
Refactor ExportJob to facilitate multiple kinds of export jobs.

Add 'JobBase', which attempts to unify ExportJob with ImportJobBase.
ImportJobBase and ExportJobBase override job-type-specific behavior.

From: Aaron Kimball <aaron@cloudera.com>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149875 13f79535-47bb-0310-9956-ffa450edef68
Andrew Bayer 2011-07-22 20:03:40 +00:00
parent 8147d262d8
commit abbfab1941
8 changed files with 511 additions and 293 deletions
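
For orientation before the diffs: a minimal Java sketch of the class hierarchy this commit introduces, assuming the shape described in the commit message. A shared JobBase drives the common configure-and-run lifecycle, ExportJobBase supplies export-side defaults, and JdbcExportJob plugs in the JDBC-specific pieces. The class and method names mirror the Sqoop sources below, but the bodies are illustrative placeholders, not the real Hadoop-backed implementations.

// Hypothetical sketch only: simplified stand-ins for the Sqoop classes in this commit.
public class JobHierarchySketch {

  /** Shared configure-and-run skeleton; subclasses override the hooks (template method). */
  static class JobBase {
    protected void configureInputFormat()  { System.out.println("default InputFormat"); }
    protected void configureOutputFormat() { System.out.println("default OutputFormat"); }
    protected void configureMapper()       { System.out.println("default Mapper"); }
    protected boolean runJob()             { return true; }

    /** Common lifecycle shared by import and export jobs. */
    public final boolean run() {
      configureInputFormat();
      configureOutputFormat();
      configureMapper();
      return runJob();
    }
  }

  /** Export-side defaults, e.g. a map-only job reading files from HDFS. */
  static class ExportJobBase extends JobBase {
    @Override protected void configureMapper() {
      System.out.println("export mapper, speculative execution off");
    }
  }

  /** Concrete export over JDBC: overrides only the output side. */
  static class JdbcExportJob extends ExportJobBase {
    @Override protected void configureOutputFormat() {
      System.out.println("DBOutputFormat writing to the target table");
    }
  }

  public static void main(String[] args) {
    new JdbcExportJob().run();  // prints the hooks in the order JobBase invokes them
  }
}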

View File

@@ -23,7 +23,7 @@
import org.apache.hadoop.sqoop.lib.BlobRef;
import org.apache.hadoop.sqoop.lib.ClobRef;
import org.apache.hadoop.sqoop.mapreduce.DataDrivenImportJob;
import org.apache.hadoop.sqoop.mapreduce.ExportJob;
import org.apache.hadoop.sqoop.mapreduce.JdbcExportJob;
import org.apache.hadoop.sqoop.util.ExportException;
import org.apache.hadoop.sqoop.util.ImportException;
import org.apache.hadoop.sqoop.util.ResultSetPrinter;
@@ -488,7 +488,7 @@ protected Connection makeConnection() throws SQLException {
*/
public void exportTable(ExportJobContext context)
throws IOException, ExportException {
ExportJob exportJob = new ExportJob(context);
JdbcExportJob exportJob = new JdbcExportJob(context);
exportJob.runExport();
}

View File

@@ -1,165 +0,0 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.mapreduce;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.sqoop.ConnFactory;
import org.apache.hadoop.sqoop.SqoopOptions;
import org.apache.hadoop.sqoop.lib.SqoopRecord;
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.manager.ExportJobContext;
import org.apache.hadoop.sqoop.orm.TableClassName;
import org.apache.hadoop.sqoop.util.ClassLoaderStack;
import org.apache.hadoop.sqoop.util.ExportException;
/**
* Actually runs a jdbc export job using the ORM files generated by the sqoop.orm package.
* Uses DBOutputFormat
*/
public class ExportJob {
public static final Log LOG = LogFactory.getLog(ExportJob.class.getName());
public static final String SQOOP_EXPORT_TABLE_CLASS_KEY = "sqoop.export.table.class";
private ExportJobContext context;
public ExportJob(final ExportJobContext ctxt) {
this.context = ctxt;
}
/**
* Run an export job to dump a table from HDFS to a database
* @throws IOException if the export job encounters an IO error
* @throws ExportException if the job fails unexpectedly or is misconfigured.
*/
public void runExport() throws ExportException, IOException {
SqoopOptions options = context.getOptions();
Configuration conf = options.getConf();
String tableName = context.getTableName();
String tableClassName = new TableClassName(options).getClassForTable(tableName);
String ormJarFile = context.getJarFile();
ConnManager mgr = null;
LOG.info("Beginning export of " + tableName);
boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
|| "local".equals(conf.get("mapred.job.tracker"));
ClassLoader prevClassLoader = null;
if (isLocal) {
// If we're using the LocalJobRunner, then instead of using the compiled jar file
// as the job source, we're running in the current thread. Push on another classloader
// that loads from that jar in addition to everything currently on the classpath.
prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile, tableClassName);
}
try {
Job job = new Job(conf);
// Set the external jar to use for the job.
job.getConfiguration().set("mapred.jar", ormJarFile);
Path inputPath = new Path(context.getOptions().getExportDir());
inputPath = inputPath.makeQualified(FileSystem.get(conf));
boolean isSeqFiles = ExportInputFormat.isSequenceFiles(
context.getOptions().getConf(), inputPath);
if (isSeqFiles) {
job.setMapperClass(SequenceFileExportMapper.class);
} else {
job.setMapperClass(TextExportMapper.class);
}
job.setInputFormatClass(ExportInputFormat.class);
FileInputFormat.addInputPath(job, inputPath);
job.setNumReduceTasks(0);
ExportInputFormat.setNumMapTasks(job, options.getNumMappers());
// Concurrent writes of the same records would be problematic.
job.setMapSpeculativeExecution(false);
mgr = new ConnFactory(conf).getManager(options);
String username = options.getUsername();
if (null == username || username.length() == 0) {
DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
options.getConnectString());
} else {
DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
options.getConnectString(), username, options.getPassword());
}
String [] colNames = options.getColumns();
if (null == colNames) {
colNames = mgr.getColumnNames(tableName);
}
DBOutputFormat.setOutput(job, tableName, colNames);
job.setOutputFormatClass(DBOutputFormat.class);
job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
job.setMapOutputKeyClass(SqoopRecord.class);
job.setMapOutputValueClass(NullWritable.class);
try {
boolean success = job.waitForCompletion(false);
if (!success) {
throw new ExportException("Export job failed!");
}
} catch (InterruptedException ie) {
throw new IOException(ie);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
} finally {
if (isLocal && null != prevClassLoader) {
// unload the special classloader for this jar.
ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
}
if (null != mgr) {
try {
mgr.close();
} catch (SQLException sqlE) {
LOG.warn("Error closing connection: " + sqlE);
}
}
}
}
}

View File

@@ -0,0 +1,194 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.mapreduce;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.sqoop.ConnFactory;
import org.apache.hadoop.sqoop.SqoopOptions;
import org.apache.hadoop.sqoop.lib.SqoopRecord;
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.manager.ExportJobContext;
import org.apache.hadoop.sqoop.orm.TableClassName;
import org.apache.hadoop.sqoop.util.ClassLoaderStack;
import org.apache.hadoop.sqoop.util.ExportException;
import org.apache.hadoop.sqoop.util.PerfCounters;
/**
* Base class for running an export MapReduce job.
*/
public class ExportJobBase extends JobBase {
public static final Log LOG = LogFactory.getLog(
ExportJobBase.class.getName());
public static final String SQOOP_EXPORT_TABLE_CLASS_KEY =
"sqoop.export.table.class";
protected ExportJobContext context;
public ExportJobBase() {
this(null);
}
public ExportJobBase(final ExportJobContext ctxt) {
this(ctxt, null, null, null);
}
public ExportJobBase(final ExportJobContext ctxt,
final Class<? extends Mapper> mapperClass,
final Class<? extends InputFormat> inputFormatClass,
final Class<? extends OutputFormat> outputFormatClass) {
super(ctxt.getOptions(), mapperClass, inputFormatClass, outputFormatClass);
this.context = ctxt;
}
/**
* @return the Path to the files we are going to export to the db.
*/
protected Path getInputPath() throws IOException {
Path inputPath = new Path(context.getOptions().getExportDir());
Configuration conf = options.getConf();
inputPath = inputPath.makeQualified(FileSystem.get(conf));
return inputPath;
}
@Override
protected void configureInputFormat(Job job, String tableName,
String tableClassName, String splitByCol) throws IOException {
super.configureInputFormat(job, tableName, tableClassName, splitByCol);
FileInputFormat.addInputPath(job, getInputPath());
}
@Override
protected Class<? extends InputFormat> getInputFormatClass() {
Class<? extends InputFormat> configuredIF = super.getInputFormatClass();
if (null == configuredIF) {
return ExportInputFormat.class;
} else {
return configuredIF;
}
}
@Override
protected void configureMapper(Job job, String tableName,
String tableClassName) throws IOException {
job.setMapperClass(getMapperClass());
// Concurrent writes of the same records would be problematic.
job.setMapSpeculativeExecution(false);
job.setMapOutputKeyClass(SqoopRecord.class);
job.setMapOutputValueClass(NullWritable.class);
}
@Override
protected int configureNumTasks(Job job) throws IOException {
int numMaps = super.configureNumTasks(job);
ExportInputFormat.setNumMapTasks(job, numMaps);
return numMaps;
}
@Override
protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
InterruptedException {
PerfCounters counters = new PerfCounters();
counters.startClock();
boolean success = job.waitForCompletion(false);
counters.stopClock();
counters.addBytes(job.getCounters().getGroup("FileSystemCounters")
.findCounter("HDFS_BYTES_READ").getValue());
LOG.info("Transferred " + counters.toString());
long numRecords = job.getCounters()
.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
LOG.info("Exported " + numRecords + " records.");
return success;
}
/**
* Run an export job to dump a table from HDFS to a database
* @throws IOException if the export job encounters an IO error
* @throws ExportException if the job fails unexpectedly or is misconfigured.
*/
public void runExport() throws ExportException, IOException {
SqoopOptions options = context.getOptions();
Configuration conf = options.getConf();
String tableName = context.getTableName();
String tableClassName = new TableClassName(options).getClassForTable(tableName);
String ormJarFile = context.getJarFile();
LOG.info("Beginning export of " + tableName);
loadJars(conf, ormJarFile, tableClassName);
try {
Job job = new Job(conf);
// Set the external jar to use for the job.
job.getConfiguration().set("mapred.jar", ormJarFile);
configureInputFormat(job, tableName, tableClassName, null);
configureOutputFormat(job, tableName, tableClassName);
configureMapper(job, tableName, tableClassName);
configureNumTasks(job);
try {
boolean success = runJob(job);
if (!success) {
throw new ExportException("Export job failed!");
}
} catch (InterruptedException ie) {
throw new IOException(ie);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
} finally {
unloadJars();
}
}
}

View File

@@ -55,17 +55,10 @@
* Base class for running an import MapReduce job.
* Allows dependency injection, etc, for easy customization of import job types.
*/
public class ImportJobBase {
public class ImportJobBase extends JobBase {
public static final Log LOG = LogFactory.getLog(ImportJobBase.class.getName());
protected SqoopOptions options;
protected Class<? extends Mapper> mapperClass;
protected Class<? extends InputFormat> inputFormatClass;
protected Class<? extends OutputFormat> outputFormatClass;
private ClassLoader prevClassLoader = null;
public ImportJobBase() {
this(null);
}
@@ -78,94 +71,13 @@ public ImportJobBase(final SqoopOptions opts,
final Class<? extends Mapper> mapperClass,
final Class<? extends InputFormat> inputFormatClass,
final Class<? extends OutputFormat> outputFormatClass) {
this.options = opts;
this.mapperClass = mapperClass;
this.inputFormatClass = inputFormatClass;
this.outputFormatClass = outputFormatClass;
}
/**
* @return the mapper class to use for the job.
*/
protected Class<? extends Mapper> getMapperClass() {
return this.mapperClass;
}
/**
* @return the inputformat class to use for the job.
*/
protected Class<? extends InputFormat> getInputFormatClass() {
return this.inputFormatClass;
}
/**
* @return the outputformat class to use for the job.
*/
protected Class<? extends OutputFormat> getOutputFormatClass() {
return this.outputFormatClass;
}
/** Set the OutputFormat class to use for this job */
public void setOutputFormatClass(Class<? extends OutputFormat> cls) {
this.outputFormatClass = cls;
}
/** Set the InputFormat class to use for this job */
public void setInputFormatClass(Class<? extends InputFormat> cls) {
this.inputFormatClass = cls;
}
/** Set the Mapper class to use for this job */
public void setMapperClass(Class<? extends Mapper> cls) {
this.mapperClass = cls;
}
/**
* Set the SqoopOptions configuring this job
*/
public void setOptions(SqoopOptions opts) {
this.options = opts;
}
/**
* If jars must be loaded into the local environment, do so here.
*/
protected void loadJars(Configuration conf, String ormJarFile,
String tableClassName) throws IOException {
boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
|| "local".equals(conf.get("mapred.job.tracker"));
if (isLocal) {
// If we're using the LocalJobRunner, then instead of using the compiled jar file
// as the job source, we're running in the current thread. Push on another classloader
// that loads from that jar in addition to everything currently on the classpath.
this.prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile,
tableClassName);
}
}
/**
* If any classloader was invoked by loadJars, free it here.
*/
protected void unloadJars() {
if (null != this.prevClassLoader) {
// unload the special classloader for this jar.
ClassLoaderStack.setCurrentClassLoader(this.prevClassLoader);
}
}
/**
* Configure the inputformat to use for the job.
*/
protected void configureInputFormat(Job job, String tableName,
String tableClassName, String splitByCol) throws IOException {
LOG.debug("Using InputFormat: " + inputFormatClass);
job.setInputFormatClass(getInputFormatClass());
super(opts, mapperClass, inputFormatClass, outputFormatClass);
}
/**
* Configure the output format to use for the job.
*/
@Override
protected void configureOutputFormat(Job job, String tableName,
String tableClassName) throws IOException {
String hdfsWarehouseDir = options.getWarehouseDir();
@@ -195,33 +107,12 @@ protected void configureOutputFormat(Job job, String tableName,
FileOutputFormat.setOutputPath(job, outputPath);
}
/**
* Set the mapper class implementation to use in the job,
* as well as any related configuration (e.g., map output types).
*/
protected void configureMapper(Job job, String tableName,
String tableClassName) throws IOException {
job.setMapperClass(getMapperClass());
}
/**
* Configure the number of map/reduce tasks to use in the job.
*/
protected void configureNumTasks(Job job) throws IOException {
int numMapTasks = options.getNumMappers();
if (numMapTasks < 1) {
numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
}
job.getConfiguration().setInt(JobContext.NUM_MAPS, numMapTasks);
job.setNumReduceTasks(0);
}
/**
* Actually run the MapReduce job.
*/
protected void runJob(Job job) throws ClassNotFoundException, IOException,
ImportException, InterruptedException {
@Override
protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
InterruptedException {
PerfCounters counters = new PerfCounters();
counters.startClock();
@@ -234,10 +125,7 @@ protected void runJob(Job job) throws ClassNotFoundException, IOException,
long numRecords = job.getCounters()
.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
LOG.info("Retrieved " + numRecords + " records.");
if (!success) {
throw new ImportException("Import job failed!");
}
return success;
}
@@ -270,7 +158,10 @@ public void runImport(String tableName, String ormJarFile, String splitByCol,
configureNumTasks(job);
try {
runJob(job);
boolean success = runJob(job);
if (!success) {
throw new ImportException("Import job failed!");
}
} catch (InterruptedException ie) {
throw new IOException(ie);
} catch (ClassNotFoundException cnfe) {

View File

@@ -0,0 +1,108 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.mapreduce;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.sqoop.ConnFactory;
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.manager.ExportJobContext;
/**
* Run an export using JDBC (DBOutputFormat)
*/
public class JdbcExportJob extends ExportJobBase {
public static final Log LOG = LogFactory.getLog(
JdbcExportJob.class.getName());
public JdbcExportJob(final ExportJobContext context) {
super(context);
}
@Override
protected Class<? extends Mapper> getMapperClass() {
boolean isSeqFiles;
try {
isSeqFiles = ExportInputFormat.isSequenceFiles(
context.getOptions().getConf(), getInputPath());
} catch (IOException ioe) {
LOG.warn("Could not check file format for export; assuming text");
isSeqFiles = false;
}
if (isSeqFiles) {
return SequenceFileExportMapper.class;
} else {
return TextExportMapper.class;
}
}
@Override
protected void configureOutputFormat(Job job, String tableName,
String tableClassName) throws IOException {
Configuration conf = options.getConf();
ConnManager mgr = new ConnFactory(conf).getManager(options);
try {
String username = options.getUsername();
if (null == username || username.length() == 0) {
DBConfiguration.configureDB(job.getConfiguration(),
mgr.getDriverClass(),
options.getConnectString());
} else {
DBConfiguration.configureDB(job.getConfiguration(),
mgr.getDriverClass(),
options.getConnectString(),
username, options.getPassword());
}
String [] colNames = options.getColumns();
if (null == colNames) {
colNames = mgr.getColumnNames(tableName);
}
DBOutputFormat.setOutput(job, tableName, colNames);
job.setOutputFormatClass(DBOutputFormat.class);
job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
} finally {
try {
mgr.close();
} catch (SQLException sqlE) {
LOG.warn("Error closing connection: " + sqlE);
}
}
}
}

View File

@@ -0,0 +1,190 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.mapreduce;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.sqoop.SqoopOptions;
import org.apache.hadoop.sqoop.util.ClassLoaderStack;
/**
* Base class for configuring and running a MapReduce job.
* Allows dependency injection, etc, for easy customization of import job types.
*/
public class JobBase {
public static final Log LOG = LogFactory.getLog(JobBase.class.getName());
protected SqoopOptions options;
protected Class<? extends Mapper> mapperClass;
protected Class<? extends InputFormat> inputFormatClass;
protected Class<? extends OutputFormat> outputFormatClass;
private ClassLoader prevClassLoader = null;
public JobBase() {
this(null);
}
public JobBase(final SqoopOptions opts) {
this(opts, null, null, null);
}
public JobBase(final SqoopOptions opts,
final Class<? extends Mapper> mapperClass,
final Class<? extends InputFormat> inputFormatClass,
final Class<? extends OutputFormat> outputFormatClass) {
this.options = opts;
this.mapperClass = mapperClass;
this.inputFormatClass = inputFormatClass;
this.outputFormatClass = outputFormatClass;
}
/**
* @return the mapper class to use for the job.
*/
protected Class<? extends Mapper> getMapperClass() {
return this.mapperClass;
}
/**
* @return the inputformat class to use for the job.
*/
protected Class<? extends InputFormat> getInputFormatClass() {
return this.inputFormatClass;
}
/**
* @return the outputformat class to use for the job.
*/
protected Class<? extends OutputFormat> getOutputFormatClass() {
return this.outputFormatClass;
}
/** Set the OutputFormat class to use for this job */
public void setOutputFormatClass(Class<? extends OutputFormat> cls) {
this.outputFormatClass = cls;
}
/** Set the InputFormat class to use for this job */
public void setInputFormatClass(Class<? extends InputFormat> cls) {
this.inputFormatClass = cls;
}
/** Set the Mapper class to use for this job */
public void setMapperClass(Class<? extends Mapper> cls) {
this.mapperClass = cls;
}
/**
* Set the SqoopOptions configuring this job
*/
public void setOptions(SqoopOptions opts) {
this.options = opts;
}
/**
* If jars must be loaded into the local environment, do so here.
*/
protected void loadJars(Configuration conf, String ormJarFile,
String tableClassName) throws IOException {
boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
|| "local".equals(conf.get("mapred.job.tracker"));
if (isLocal) {
// If we're using the LocalJobRunner, then instead of using the compiled jar file
// as the job source, we're running in the current thread. Push on another classloader
// that loads from that jar in addition to everything currently on the classpath.
this.prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile,
tableClassName);
}
}
/**
* If any classloader was invoked by loadJars, free it here.
*/
protected void unloadJars() {
if (null != this.prevClassLoader) {
// unload the special classloader for this jar.
ClassLoaderStack.setCurrentClassLoader(this.prevClassLoader);
}
}
/**
* Configure the inputformat to use for the job.
*/
protected void configureInputFormat(Job job, String tableName,
String tableClassName, String splitByCol) throws IOException {
//TODO: 'splitByCol' is import-job specific; lift it out of this API.
Class<? extends InputFormat> ifClass = getInputFormatClass();
LOG.debug("Using InputFormat: " + ifClass);
job.setInputFormatClass(ifClass);
}
/**
* Configure the output format to use for the job.
*/
protected void configureOutputFormat(Job job, String tableName,
String tableClassName) throws IOException {
Class<? extends OutputFormat> ofClass = getOutputFormatClass();
LOG.debug("Using OutputFormat: " + ofClass);
job.setOutputFormatClass(ofClass);
}
/**
* Set the mapper class implementation to use in the job,
* as well as any related configuration (e.g., map output types).
*/
protected void configureMapper(Job job, String tableName,
String tableClassName) throws IOException {
job.setMapperClass(getMapperClass());
}
/**
* Configure the number of map/reduce tasks to use in the job.
*/
protected int configureNumTasks(Job job) throws IOException {
int numMapTasks = options.getNumMappers();
if (numMapTasks < 1) {
numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
}
job.getConfiguration().setInt(JobContext.NUM_MAPS, numMapTasks);
job.setNumReduceTasks(0);
return numMapTasks;
}
/**
* Actually run the MapReduce job.
*/
protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
InterruptedException {
return job.waitForCompletion(true);
}
}

View File

@@ -43,16 +43,18 @@ public class TextExportMapper
public TextExportMapper() {
}
protected void setup(Context context) throws IOException, InterruptedException {
protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
// Instantiate a copy of the user's class to hold and parse the record.
String recordClassName = conf.get(ExportJob.SQOOP_EXPORT_TABLE_CLASS_KEY);
String recordClassName = conf.get(
ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
if (null == recordClassName) {
throw new IOException("Export table class name ("
+ ExportJob.SQOOP_EXPORT_TABLE_CLASS_KEY
+ ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
+ ") is not set!");
}

View File

@@ -19,9 +19,7 @@
package org.apache.hadoop.sqoop.util;
/**
* General error during import process.
*
*
* General error during the import process.
*/
@SuppressWarnings("serial")
public class ImportException extends Exception {