diff --git a/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java b/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
index a9ddcb0e..55c3eebc 100644
--- a/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
+++ b/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
@@ -23,7 +23,7 @@
 import org.apache.hadoop.sqoop.lib.BlobRef;
 import org.apache.hadoop.sqoop.lib.ClobRef;
 import org.apache.hadoop.sqoop.mapreduce.DataDrivenImportJob;
-import org.apache.hadoop.sqoop.mapreduce.ExportJob;
+import org.apache.hadoop.sqoop.mapreduce.JdbcExportJob;
 import org.apache.hadoop.sqoop.util.ExportException;
 import org.apache.hadoop.sqoop.util.ImportException;
 import org.apache.hadoop.sqoop.util.ResultSetPrinter;
@@ -488,7 +488,7 @@ protected Connection makeConnection() throws SQLException {
    */
   public void exportTable(ExportJobContext context)
       throws IOException, ExportException {
-    ExportJob exportJob = new ExportJob(context);
+    JdbcExportJob exportJob = new JdbcExportJob(context);
     exportJob.runExport();
   }

diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJob.java b/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJob.java
deleted file mode 100644
index 0edbc861..00000000
--- a/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJob.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to Cloudera, Inc. under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Cloudera, Inc. licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.sqoop.mapreduce;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
-import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
-import org.apache.hadoop.sqoop.ConnFactory;
-import org.apache.hadoop.sqoop.SqoopOptions;
-import org.apache.hadoop.sqoop.lib.SqoopRecord;
-import org.apache.hadoop.sqoop.manager.ConnManager;
-import org.apache.hadoop.sqoop.manager.ExportJobContext;
-import org.apache.hadoop.sqoop.orm.TableClassName;
-import org.apache.hadoop.sqoop.util.ClassLoaderStack;
-import org.apache.hadoop.sqoop.util.ExportException;
-
-/**
- * Actually runs a jdbc export job using the ORM files generated by the sqoop.orm package.
- * Uses DBOutputFormat
- */
-public class ExportJob {
-
-  public static final Log LOG = LogFactory.getLog(ExportJob.class.getName());
-
-  public static final String SQOOP_EXPORT_TABLE_CLASS_KEY = "sqoop.export.table.class";
-
-  private ExportJobContext context;
-
-  public ExportJob(final ExportJobContext ctxt) {
-    this.context = ctxt;
-  }
-
-  /**
-   * Run an export job to dump a table from HDFS to a database
-   * @throws IOException if the export job encounters an IO error
-   * @throws ExportException if the job fails unexpectedly or is misconfigured.
-   */
-  public void runExport() throws ExportException, IOException {
-
-    SqoopOptions options = context.getOptions();
-    Configuration conf = options.getConf();
-    String tableName = context.getTableName();
-    String tableClassName = new TableClassName(options).getClassForTable(tableName);
-    String ormJarFile = context.getJarFile();
-    ConnManager mgr = null;
-
-    LOG.info("Beginning export of " + tableName);
-
-    boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
-        || "local".equals(conf.get("mapred.job.tracker"));
-    ClassLoader prevClassLoader = null;
-    if (isLocal) {
-      // If we're using the LocalJobRunner, then instead of using the compiled jar file
-      // as the job source, we're running in the current thread. Push on another classloader
-      // that loads from that jar in addition to everything currently on the classpath.
-      prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile, tableClassName);
-    }
-
-    try {
-      Job job = new Job(conf);
-
-      // Set the external jar to use for the job.
-      job.getConfiguration().set("mapred.jar", ormJarFile);
-
-      Path inputPath = new Path(context.getOptions().getExportDir());
-      inputPath = inputPath.makeQualified(FileSystem.get(conf));
-
-      boolean isSeqFiles = ExportInputFormat.isSequenceFiles(
-          context.getOptions().getConf(), inputPath);
-
-      if (isSeqFiles) {
-        job.setMapperClass(SequenceFileExportMapper.class);
-      } else {
-        job.setMapperClass(TextExportMapper.class);
-      }
-
-      job.setInputFormatClass(ExportInputFormat.class);
-      FileInputFormat.addInputPath(job, inputPath);
-      job.setNumReduceTasks(0);
-      ExportInputFormat.setNumMapTasks(job, options.getNumMappers());
-
-      // Concurrent writes of the same records would be problematic.
-      job.setMapSpeculativeExecution(false);
-
-      mgr = new ConnFactory(conf).getManager(options);
-      String username = options.getUsername();
-      if (null == username || username.length() == 0) {
-        DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
-            options.getConnectString());
-      } else {
-        DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
-            options.getConnectString(), username, options.getPassword());
-      }
-
-      String [] colNames = options.getColumns();
-      if (null == colNames) {
-        colNames = mgr.getColumnNames(tableName);
-      }
-      DBOutputFormat.setOutput(job, tableName, colNames);
-
-      job.setOutputFormatClass(DBOutputFormat.class);
-      job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
-      job.setMapOutputKeyClass(SqoopRecord.class);
-      job.setMapOutputValueClass(NullWritable.class);
-
-      try {
-        boolean success = job.waitForCompletion(false);
-        if (!success) {
-          throw new ExportException("Export job failed!");
-        }
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      } catch (ClassNotFoundException cnfe) {
-        throw new IOException(cnfe);
-      }
-    } finally {
-      if (isLocal && null != prevClassLoader) {
-        // unload the special classloader for this jar.
-        ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
-      }
-
-      if (null != mgr) {
-        try {
-          mgr.close();
-        } catch (SQLException sqlE) {
-          LOG.warn("Error closing connection: " + sqlE);
-        }
-      }
-    }
-  }
-}
diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJobBase.java
new file mode 100644
index 00000000..678ab9b2
--- /dev/null
+++ b/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJobBase.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapreduce;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import org.apache.hadoop.sqoop.ConnFactory;
+import org.apache.hadoop.sqoop.SqoopOptions;
+import org.apache.hadoop.sqoop.lib.SqoopRecord;
+import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.manager.ExportJobContext;
+import org.apache.hadoop.sqoop.orm.TableClassName;
+import org.apache.hadoop.sqoop.util.ClassLoaderStack;
+import org.apache.hadoop.sqoop.util.ExportException;
+import org.apache.hadoop.sqoop.util.PerfCounters;
+
+/**
+ * Base class for running an export MapReduce job.
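+ * Allows dependency injection, etc., for easy customization of export job
+ * types.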
+ */
+public class ExportJobBase extends JobBase {
+
+  public static final Log LOG = LogFactory.getLog(
+      ExportJobBase.class.getName());
+
+  public static final String SQOOP_EXPORT_TABLE_CLASS_KEY =
+      "sqoop.export.table.class";
+
+  protected ExportJobContext context;
+
+  public ExportJobBase() {
+    this(null);
+  }
+
+  public ExportJobBase(final ExportJobContext ctxt) {
+    this(ctxt, null, null, null);
+  }
+
+  public ExportJobBase(final ExportJobContext ctxt,
+      final Class<? extends Mapper> mapperClass,
+      final Class<? extends InputFormat> inputFormatClass,
+      final Class<? extends OutputFormat> outputFormatClass) {
+    super(ctxt.getOptions(), mapperClass, inputFormatClass, outputFormatClass);
+    this.context = ctxt;
+  }
+
+  /**
+   * @return the Path to the files we are going to export to the db.
+   */
+  protected Path getInputPath() throws IOException {
+    Path inputPath = new Path(context.getOptions().getExportDir());
+    Configuration conf = options.getConf();
+    inputPath = inputPath.makeQualified(FileSystem.get(conf));
+    return inputPath;
+  }
+
+  @Override
+  protected void configureInputFormat(Job job, String tableName,
+      String tableClassName, String splitByCol) throws IOException {
+
+    super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+    FileInputFormat.addInputPath(job, getInputPath());
+  }
+
+  @Override
+  protected Class<? extends InputFormat> getInputFormatClass() {
+    Class<? extends InputFormat> configuredIF = super.getInputFormatClass();
+    if (null == configuredIF) {
+      return ExportInputFormat.class;
+    } else {
+      return configuredIF;
+    }
+  }
+
+  @Override
+  protected void configureMapper(Job job, String tableName,
+      String tableClassName) throws IOException {
+
+    job.setMapperClass(getMapperClass());
+
+    // Concurrent writes of the same records would be problematic.
+    job.setMapSpeculativeExecution(false);
+
+    job.setMapOutputKeyClass(SqoopRecord.class);
+    job.setMapOutputValueClass(NullWritable.class);
+  }
+
+  @Override
+  protected int configureNumTasks(Job job) throws IOException {
+    int numMaps = super.configureNumTasks(job);
+    ExportInputFormat.setNumMapTasks(job, numMaps);
+    return numMaps;
+  }
+
+  @Override
+  protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
+      InterruptedException {
+
+    PerfCounters counters = new PerfCounters();
+    counters.startClock();
+
+    boolean success = job.waitForCompletion(false);
+    counters.stopClock();
+    counters.addBytes(job.getCounters().getGroup("FileSystemCounters")
+        .findCounter("HDFS_BYTES_READ").getValue());
+    LOG.info("Transferred " + counters.toString());
+    long numRecords = job.getCounters()
+        .findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
+    LOG.info("Exported " + numRecords + " records.");
+
+    return success;
+  }
+
+
+  /**
+   * Run an export job to dump a table from HDFS to a database.
+   * @throws IOException if the export job encounters an IO error.
+   * @throws ExportException if the job fails unexpectedly or is misconfigured.
+   */
+  public void runExport() throws ExportException, IOException {
+
+    SqoopOptions options = context.getOptions();
+    Configuration conf = options.getConf();
+    String tableName = context.getTableName();
+    String tableClassName = new TableClassName(options).getClassForTable(tableName);
+    String ormJarFile = context.getJarFile();
+
+    LOG.info("Beginning export of " + tableName);
+    loadJars(conf, ormJarFile, tableClassName);
+
+    try {
+      Job job = new Job(conf);
+
+      // Set the external jar to use for the job.
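+      // ("mapred.jar" is the property naming the jar shipped with the job;
+      // pointing it at the ORM jar makes the generated record class
+      // available on the task classpath.)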
+      job.getConfiguration().set("mapred.jar", ormJarFile);
+
+      configureInputFormat(job, tableName, tableClassName, null);
+      configureOutputFormat(job, tableName, tableClassName);
+      configureMapper(job, tableName, tableClassName);
+      configureNumTasks(job);
+
+      try {
+        boolean success = runJob(job);
+        if (!success) {
+          throw new ExportException("Export job failed!");
+        }
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException(cnfe);
+      }
+    } finally {
+      unloadJars();
+    }
+  }
+}
diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/hadoop/sqoop/mapreduce/ImportJobBase.java
index 9c7e3809..4a083a64 100644
--- a/src/java/org/apache/hadoop/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/org/apache/hadoop/sqoop/mapreduce/ImportJobBase.java
@@ -55,17 +55,10 @@
  * Base class for running an import MapReduce job.
  * Allows dependency injection, etc, for easy customization of import job types.
  */
-public class ImportJobBase {
+public class ImportJobBase extends JobBase {

   public static final Log LOG = LogFactory.getLog(ImportJobBase.class.getName());

-  protected SqoopOptions options;
-  protected Class<? extends Mapper> mapperClass;
-  protected Class<? extends InputFormat> inputFormatClass;
-  protected Class<? extends OutputFormat> outputFormatClass;
-
-  private ClassLoader prevClassLoader = null;
-
   public ImportJobBase() {
     this(null);
   }
@@ -78,94 +71,13 @@ public ImportJobBase(final SqoopOptions opts,
       final Class<? extends Mapper> mapperClass,
       final Class<? extends InputFormat> inputFormatClass,
       final Class<? extends OutputFormat> outputFormatClass) {
-
-    this.options = opts;
-    this.mapperClass = mapperClass;
-    this.inputFormatClass = inputFormatClass;
-    this.outputFormatClass = outputFormatClass;
-  }
-
-  /**
-   * @return the mapper class to use for the job.
-   */
-  protected Class<? extends Mapper> getMapperClass() {
-    return this.mapperClass;
-  }
-
-  /**
-   * @return the inputformat class to use for the job.
-   */
-  protected Class<? extends InputFormat> getInputFormatClass() {
-    return this.inputFormatClass;
-  }
-
-  /**
-   * @return the outputformat class to use for the job.
-   */
-  protected Class<? extends OutputFormat> getOutputFormatClass() {
-    return this.outputFormatClass;
-  }
-
-  /** Set the OutputFormat class to use for this job */
-  public void setOutputFormatClass(Class<? extends OutputFormat> cls) {
-    this.outputFormatClass = cls;
-  }
-
-  /** Set the InputFormat class to use for this job */
-  public void setInputFormatClass(Class<? extends InputFormat> cls) {
-    this.inputFormatClass = cls;
-  }
-
-  /** Set the Mapper class to use for this job */
-  public void setMapperClass(Class<? extends Mapper> cls) {
-    this.mapperClass = cls;
-  }
-
-  /**
-   * Set the SqoopOptions configuring this job
-   */
-  public void setOptions(SqoopOptions opts) {
-    this.options = opts;
-  }
-
-  /**
-   * If jars must be loaded into the local environment, do so here.
-   */
-  protected void loadJars(Configuration conf, String ormJarFile,
-      String tableClassName) throws IOException {
-    boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
-        || "local".equals(conf.get("mapred.job.tracker"));
-    if (isLocal) {
-      // If we're using the LocalJobRunner, then instead of using the compiled jar file
-      // as the job source, we're running in the current thread. Push on another classloader
-      // that loads from that jar in addition to everything currently on the classpath.
-      this.prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile,
-          tableClassName);
-    }
-  }
-
-  /**
-   * If any classloader was invoked by loadJars, free it here.
-   */
-  protected void unloadJars() {
-    if (null != this.prevClassLoader) {
-      // unload the special classloader for this jar.
-      ClassLoaderStack.setCurrentClassLoader(this.prevClassLoader);
-    }
-  }
-
-  /**
-   * Configure the inputformat to use for the job.
-   */
-  protected void configureInputFormat(Job job, String tableName,
-      String tableClassName, String splitByCol) throws IOException {
-    LOG.debug("Using InputFormat: " + inputFormatClass);
-    job.setInputFormatClass(getInputFormatClass());
+    super(opts, mapperClass, inputFormatClass, outputFormatClass);
   }

   /**
    * Configure the output format to use for the job.
    */
+  @Override
   protected void configureOutputFormat(Job job, String tableName,
       String tableClassName) throws IOException {
     String hdfsWarehouseDir = options.getWarehouseDir();
@@ -195,33 +107,12 @@ protected void configureOutputFormat(Job job, String tableName,
     FileOutputFormat.setOutputPath(job, outputPath);
   }

-  /**
-   * Set the mapper class implementation to use in the job,
-   * as well as any related configuration (e.g., map output types).
-   */
-  protected void configureMapper(Job job, String tableName,
-      String tableClassName) throws IOException {
-    job.setMapperClass(getMapperClass());
-  }
-
-  /**
-   * Configure the number of map/reduce tasks to use in the job.
-   */
-  protected void configureNumTasks(Job job) throws IOException {
-    int numMapTasks = options.getNumMappers();
-    if (numMapTasks < 1) {
-      numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
-      LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
-    }
-    job.getConfiguration().setInt(JobContext.NUM_MAPS, numMapTasks);
-    job.setNumReduceTasks(0);
-  }
-
   /**
    * Actually run the MapReduce job.
    */
-  protected void runJob(Job job) throws ClassNotFoundException, IOException,
-      ImportException, InterruptedException {
+  @Override
+  protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
+      InterruptedException {

     PerfCounters counters = new PerfCounters();
     counters.startClock();
@@ -234,10 +125,7 @@ protected void runJob(Job job) throws ClassNotFoundException, IOException,
     long numRecords = job.getCounters()
         .findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
     LOG.info("Retrieved " + numRecords + " records.");
-
-    if (!success) {
-      throw new ImportException("Import job failed!");
-    }
+    return success;
   }


@@ -270,7 +158,10 @@ public void runImport(String tableName, String ormJarFile, String splitByCol,
     configureNumTasks(job);

     try {
-      runJob(job);
+      boolean success = runJob(job);
+      if (!success) {
+        throw new ImportException("Import job failed!");
+      }
     } catch (InterruptedException ie) {
       throw new IOException(ie);
     } catch (ClassNotFoundException cnfe) {
diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/hadoop/sqoop/mapreduce/JdbcExportJob.java
new file mode 100644
index 00000000..e710333b
--- /dev/null
+++ b/src/java/org/apache/hadoop/sqoop/mapreduce/JdbcExportJob.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
+
+import org.apache.hadoop.sqoop.ConnFactory;
+import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.manager.ExportJobContext;
+
+
+/**
+ * Run an export using JDBC (DBOutputFormat).
+ */
+public class JdbcExportJob extends ExportJobBase {
+
+  public static final Log LOG = LogFactory.getLog(
+      JdbcExportJob.class.getName());
+
+  public JdbcExportJob(final ExportJobContext context) {
+    super(context);
+  }
+
+  @Override
+  protected Class<? extends Mapper> getMapperClass() {
+
+    boolean isSeqFiles;
+    try {
+      isSeqFiles = ExportInputFormat.isSequenceFiles(
+          context.getOptions().getConf(), getInputPath());
+    } catch (IOException ioe) {
+      LOG.warn("Could not check file format for export; assuming text");
+      isSeqFiles = false;
+    }
+
+    if (isSeqFiles) {
+      return SequenceFileExportMapper.class;
+    } else {
+      return TextExportMapper.class;
+    }
+  }
+
+  @Override
+  protected void configureOutputFormat(Job job, String tableName,
+      String tableClassName) throws IOException {
+
+    Configuration conf = options.getConf();
+    ConnManager mgr = new ConnFactory(conf).getManager(options);
+    try {
+      String username = options.getUsername();
+      if (null == username || username.length() == 0) {
+        DBConfiguration.configureDB(job.getConfiguration(),
+            mgr.getDriverClass(),
+            options.getConnectString());
+      } else {
+        DBConfiguration.configureDB(job.getConfiguration(),
+            mgr.getDriverClass(),
+            options.getConnectString(),
+            username, options.getPassword());
+      }
+
+      String [] colNames = options.getColumns();
+      if (null == colNames) {
+        colNames = mgr.getColumnNames(tableName);
+      }
+      DBOutputFormat.setOutput(job, tableName, colNames);
+
+      job.setOutputFormatClass(DBOutputFormat.class);
+      job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
+    } finally {
+      try {
+        mgr.close();
+      } catch (SQLException sqlE) {
+        LOG.warn("Error closing connection: " + sqlE);
+      }
+    }
+  }
+
+
+}
+
diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/JobBase.java b/src/java/org/apache/hadoop/sqoop/mapreduce/JobBase.java
new file mode 100644
index 00000000..a5e72dd8
--- /dev/null
+++ b/src/java/org/apache/hadoop/sqoop/mapreduce/JobBase.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+
+import org.apache.hadoop.sqoop.SqoopOptions;
+import org.apache.hadoop.sqoop.util.ClassLoaderStack;
+
+/**
+ * Base class for configuring and running a MapReduce job.
+ * Allows dependency injection, etc., for easy customization of job types.
+ */
+public class JobBase {
+
+  public static final Log LOG = LogFactory.getLog(JobBase.class.getName());
+
+  protected SqoopOptions options;
+  protected Class<? extends Mapper> mapperClass;
+  protected Class<? extends InputFormat> inputFormatClass;
+  protected Class<? extends OutputFormat> outputFormatClass;
+
+  private ClassLoader prevClassLoader = null;
+
+  public JobBase() {
+    this(null);
+  }
+
+  public JobBase(final SqoopOptions opts) {
+    this(opts, null, null, null);
+  }
+
+  public JobBase(final SqoopOptions opts,
+      final Class<? extends Mapper> mapperClass,
+      final Class<? extends InputFormat> inputFormatClass,
+      final Class<? extends OutputFormat> outputFormatClass) {
+
+    this.options = opts;
+    this.mapperClass = mapperClass;
+    this.inputFormatClass = inputFormatClass;
+    this.outputFormatClass = outputFormatClass;
+  }
+
+  /**
+   * @return the mapper class to use for the job.
+   */
+  protected Class<? extends Mapper> getMapperClass() {
+    return this.mapperClass;
+  }
+
+  /**
+   * @return the inputformat class to use for the job.
+   */
+  protected Class<? extends InputFormat> getInputFormatClass() {
+    return this.inputFormatClass;
+  }
+
+  /**
+   * @return the outputformat class to use for the job.
+   */
+  protected Class<? extends OutputFormat> getOutputFormatClass() {
+    return this.outputFormatClass;
+  }
+
+  /** Set the OutputFormat class to use for this job. */
+  public void setOutputFormatClass(Class<? extends OutputFormat> cls) {
+    this.outputFormatClass = cls;
+  }
+
+  /** Set the InputFormat class to use for this job. */
+  public void setInputFormatClass(Class<? extends InputFormat> cls) {
+    this.inputFormatClass = cls;
+  }
+
+  /** Set the Mapper class to use for this job. */
+  public void setMapperClass(Class<? extends Mapper> cls) {
+    this.mapperClass = cls;
+  }
+
+  /**
+   * Set the SqoopOptions configuring this job.
+   */
+  public void setOptions(SqoopOptions opts) {
+    this.options = opts;
+  }
+
+  /**
+   * If jars must be loaded into the local environment, do so here.
+   */
+  protected void loadJars(Configuration conf, String ormJarFile,
+      String tableClassName) throws IOException {
+    boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
+        || "local".equals(conf.get("mapred.job.tracker"));
+    if (isLocal) {
+      // If we're using the LocalJobRunner, then instead of using the compiled
+      // jar file as the job source, we're running in the current thread. Push
+      // on another classloader that loads from that jar in addition to
+      // everything currently on the classpath.
+      this.prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile,
+          tableClassName);
+    }
+  }
+
+  /**
+   * If any classloader was installed by loadJars, free it here.
+   */
+  protected void unloadJars() {
+    if (null != this.prevClassLoader) {
+      // Unload the special classloader for this jar.
+      ClassLoaderStack.setCurrentClassLoader(this.prevClassLoader);
+    }
+  }
+
+  /**
+   * Configure the inputformat to use for the job.
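+   * Subclasses may override this to bind job-type-specific input state;
+   * ExportJobBase, for example, adds the export input path on top of this.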
+   */
+  protected void configureInputFormat(Job job, String tableName,
+      String tableClassName, String splitByCol) throws IOException {
+    //TODO: 'splitByCol' is import-job specific; lift it out of this API.
+    Class<? extends InputFormat> ifClass = getInputFormatClass();
+    LOG.debug("Using InputFormat: " + ifClass);
+    job.setInputFormatClass(ifClass);
+  }
+
+  /**
+   * Configure the output format to use for the job.
+   */
+  protected void configureOutputFormat(Job job, String tableName,
+      String tableClassName) throws IOException {
+    Class<? extends OutputFormat> ofClass = getOutputFormatClass();
+    LOG.debug("Using OutputFormat: " + ofClass);
+    job.setOutputFormatClass(ofClass);
+  }
+
+  /**
+   * Set the mapper class implementation to use in the job,
+   * as well as any related configuration (e.g., map output types).
+   */
+  protected void configureMapper(Job job, String tableName,
+      String tableClassName) throws IOException {
+    job.setMapperClass(getMapperClass());
+  }
+
+  /**
+   * Configure the number of map/reduce tasks to use in the job.
+   */
+  protected int configureNumTasks(Job job) throws IOException {
+    int numMapTasks = options.getNumMappers();
+    if (numMapTasks < 1) {
+      numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
+      LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
+    }
+    job.getConfiguration().setInt(JobContext.NUM_MAPS, numMapTasks);
+    job.setNumReduceTasks(0);
+    return numMapTasks;
+  }
+
+  /**
+   * Actually run the MapReduce job.
+   */
+  protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
+      InterruptedException {
+    return job.waitForCompletion(true);
+  }
+}
diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/TextExportMapper.java b/src/java/org/apache/hadoop/sqoop/mapreduce/TextExportMapper.java
index d0bffc46..b70c8eea 100644
--- a/src/java/org/apache/hadoop/sqoop/mapreduce/TextExportMapper.java
+++ b/src/java/org/apache/hadoop/sqoop/mapreduce/TextExportMapper.java
@@ -43,16 +43,18 @@ public class TextExportMapper
   public TextExportMapper() {
   }

-  protected void setup(Context context) throws IOException, InterruptedException {
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
     super.setup(context);

     Configuration conf = context.getConfiguration();

     // Instantiate a copy of the user's class to hold and parse the record.
-    String recordClassName = conf.get(ExportJob.SQOOP_EXPORT_TABLE_CLASS_KEY);
+    String recordClassName = conf.get(
+        ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
     if (null == recordClassName) {
       throw new IOException("Export table class name ("
-          + ExportJob.SQOOP_EXPORT_TABLE_CLASS_KEY
+          + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
           + ") is not set!");
     }

diff --git a/src/java/org/apache/hadoop/sqoop/util/ImportException.java b/src/java/org/apache/hadoop/sqoop/util/ImportException.java
index d73ce47a..64d74ed9 100644
--- a/src/java/org/apache/hadoop/sqoop/util/ImportException.java
+++ b/src/java/org/apache/hadoop/sqoop/util/ImportException.java
@@ -19,9 +19,7 @@
 package org.apache.hadoop.sqoop.util;

 /**
- * General error during import process.
- *
- *
+ * General error during the import process.
  */
 @SuppressWarnings("serial")
 public class ImportException extends Exception {