diff --git a/execution/mapreduce/pom.xml b/execution/mapreduce/pom.xml
index daf60fa3..9c92a117 100644
--- a/execution/mapreduce/pom.xml
+++ b/execution/mapreduce/pom.xml
@@ -83,8 +83,6 @@ limitations under the License.
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>${guava.version}</version>
-      <scope>provided</scope>
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 06872ca7..b201a8d6 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -17,8 +17,8 @@
  */
 package org.apache.sqoop.execution.mapreduce;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.framework.ExecutionEngine;
@@ -61,6 +61,9 @@ public void prepareImportSubmission(SubmissionRequest gRequest) {
     MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
     ImportJobConfiguration jobConf = (ImportJobConfiguration) request.getConfigFrameworkJob();
 
+    // Add jar dependencies
+    addDependencies(request);
+
     // Configure map-reduce classes for import
     request.setInputFormatClass(SqoopInputFormat.class);
@@ -103,6 +106,9 @@ public void prepareExportSubmission(SubmissionRequest gRequest) {
     MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
     ExportJobConfiguration jobConf = (ExportJobConfiguration) request.getConfigFrameworkJob();
 
+    // Add jar dependencies
+    addDependencies(request);
+
     // Configure map-reduce classes for import
     request.setInputFormatClass(SqoopInputFormat.class);
@@ -124,10 +130,22 @@ public void prepareExportSubmission(SubmissionRequest gRequest) {
     // We should make one extractor that will be able to read all supported file types
     context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsTextExportExtractor.class.getName());
 
-    context.setString(FileInputFormat.INPUT_DIR, jobConf.input.inputDirectory);
+    context.setString(JobConstants.HADOOP_INPUTDIR, jobConf.input.inputDirectory);
 
     if(request.getExtractors() != null) {
       context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
     }
   }
+
+  /**
+   * Our execution engine has additional dependencies that need to be available
+   * at mapreduce job time. This method registers all such dependencies in the
+   * request object.
+   *
+   * @param request Active request object.
+   */
+  protected void addDependencies(MRSubmissionRequest request) {
+    // Guava
+    request.addJarForClass(ThreadFactoryBuilder.class);
+  }
 }
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
index f5123a21..e16a2c47 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -83,6 +83,8 @@ public final class JobConstants extends Constants {
   // We're using constants from Hadoop 1. Hadoop 2 has different names, but
   // provides backward compatibility layer for those names as well.
+  public static final String HADOOP_INPUTDIR = "mapred.input.dir";
+
   public static final String HADOOP_OUTDIR = "mapred.output.dir";
 
   public static final String HADOOP_COMPRESS = "mapred.output.compress";
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
index 7ffd97c3..71e00606 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
@@ -36,7 +36,6 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
@@ -113,12 +112,12 @@ public List<Partition> getPartitions(ImmutableContext context,
     }
 
     // all the files in input set
-    String indir = conf.get(FileInputFormat.INPUT_DIR);
+    String indir = conf.get(JobConstants.HADOOP_INPUTDIR);
     FileSystem fs = FileSystem.get(conf);
 
     List<Path> paths = new LinkedList<Path>();
     for(FileStatus status : fs.listStatus(new Path(indir))) {
-      if(!status.isDirectory()) {
+      if(!status.isDir()) {
         paths.add(status.getPath());
       }
     }
@@ -143,7 +142,7 @@ public List<Partition> getPartitions(ImmutableContext context,
   }
 
   private long getInputSize(Configuration conf) throws IOException {
-    String indir = conf.get(FileInputFormat.INPUT_DIR);
+    String indir = conf.get(JobConstants.HADOOP_INPUTDIR);
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] files = fs.listStatus(new Path(indir));
     long count = 0;
@@ -345,10 +344,11 @@ private void getMoreSplits(Configuration conf, List<Path> paths,
   private boolean isSplitable(Configuration conf, Path file) {
     final CompressionCodec codec =
       new CompressionCodecFactory(conf).getCodec(file);
-    if (null == codec) {
-      return true;
-    }
-    return codec instanceof SplittableCompressionCodec;
+
+    // This method might be improved to use SplittableCompressionCodec once we
+    // drop support for Hadoop 1.0.
+    return null == codec;
+
   }
 
   /**
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
index 0693a095..2261a7cb 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
@@ -74,8 +74,7 @@ private void extractFile(Path file, long start, long length)
     LOG.info("\t to offset " + end);
     LOG.info("\t of length " + length);
 
-    SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
-        SequenceFile.Reader.file(file));
+    SequenceFile.Reader filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
 
     if (start > filereader.getPosition()) {
       filereader.sync(start); // sync to start
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
index c412c810..fdc7d67f 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
@@ -30,8 +30,6 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.SplitCompressionInputStream;
-import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.util.LineReader;
 import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.SqoopException;
@@ -93,32 +91,20 @@ private void extractFile(Path file, long start, long length)
       byte[] recordDelimiterBytes = String.valueOf(
           Data.DEFAULT_RECORD_DELIMITER).getBytes(
           Charset.forName(Data.CHARSET_NAME));
-      filereader = new LineReader(filestream, conf,
-          recordDelimiterBytes);
+      // Hadoop 1.0 does not support a custom record delimiter, so only the
+      // default one is supported here.
+      filereader = new LineReader(filestream, conf);
       fileseeker = filestream;
-
-    } else if (codec instanceof SplittableCompressionCodec) {
-      SplitCompressionInputStream compressionstream =
-          ((SplittableCompressionCodec)codec).createInputStream(
-              filestream, codec.createDecompressor(), start, end,
-              SplittableCompressionCodec.READ_MODE.BYBLOCK);
-      byte[] recordDelimiterBytes = String.valueOf(
-          Data.DEFAULT_RECORD_DELIMITER).getBytes(
-          Charset.forName(Data.CHARSET_NAME));
-      filereader = new LineReader(compressionstream,
-          conf, recordDelimiterBytes);
-      fileseeker = compressionstream;
-
-      start = compressionstream.getAdjustedStart();
-      end = compressionstream.getAdjustedEnd();
-
+    // We might add another "else if" case for SplittableCompressionCodec once
+    // we drop support for Hadoop 1.0.
     } else {
       byte[] recordDelimiterBytes = String.valueOf(
          Data.DEFAULT_RECORD_DELIMITER).getBytes(
          Charset.forName(Data.CHARSET_NAME));
+      // Hadoop 1.0 does not support a custom record delimiter, so only the
+      // default one is supported here.
       filereader = new LineReader(
-          codec.createInputStream(filestream, codec.createDecompressor()),
-          conf, recordDelimiterBytes);
+          codec.createInputStream(filestream, codec.createDecompressor()), conf);
       fileseeker = filestream;
     }
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
index ba44de94..484eb205 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
@@ -71,7 +71,7 @@ public void testUncompressedText() throws Exception {
         HdfsTextExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
 
     JobUtils.runJob(conf);
   }
@@ -89,7 +89,7 @@ public void testCompressedText() throws Exception {
         HdfsTextExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
     JobUtils.runJob(conf);
 
     FileUtils.delete(indir);
@@ -102,7 +102,7 @@ public void testCompressedText() throws Exception {
         HdfsTextExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
 
     JobUtils.runJob(conf);
   }
@@ -120,7 +120,7 @@ public void testUncompressedSequence() throws Exception {
         HdfsSequenceExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
 
     JobUtils.runJob(conf);
   }
@@ -138,7 +138,7 @@ public void testCompressedSequence() throws Exception {
         HdfsSequenceExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
 
     JobUtils.runJob(conf);
   }
@@ -198,17 +198,12 @@ private void createSequenceInput(Class clz)
           "part-r-" + padZeros(fi, 5) + HdfsSequenceImportLoader.EXTENSION);
       SequenceFile.Writer filewriter;
       if (codec != null) {
-        filewriter = SequenceFile.createWriter(conf,
-          SequenceFile.Writer.file(filepath),
-          SequenceFile.Writer.keyClass(Text.class),
-          SequenceFile.Writer.valueClass(NullWritable.class),
-          SequenceFile.Writer.compression(CompressionType.BLOCK, codec));
+        filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+          conf, filepath, Text.class, NullWritable.class,
+          CompressionType.BLOCK, codec);
       } else {
-        filewriter = SequenceFile.createWriter(conf,
-          SequenceFile.Writer.file(filepath),
-          SequenceFile.Writer.keyClass(Text.class),
-          SequenceFile.Writer.valueClass(NullWritable.class),
-          SequenceFile.Writer.compression(CompressionType.NONE));
+        filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+          conf, filepath, Text.class, NullWritable.class, CompressionType.NONE);
       }
 
       Text text = new Text();
diff --git a/pom.xml b/pom.xml
index 05fb2512..4ab133fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,7 +130,6 @@ limitations under the License.
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
         <version>${guava.version}</version>
-        <scope>provided</scope>
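
Note (not part of the patch): the snippet below is a minimal, self-contained sketch of the Hadoop 1.x-compatible SequenceFile calls this patch standardizes on, i.e. the FileSystem-based createWriter factory and Reader constructor rather than the Hadoop 2 Writer.Option/Reader.Option variants. The class name, file path, key/value types, and record content are illustrative assumptions only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;

public class SequenceFileCompatSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Illustrative path only; any HDFS or local filesystem path works.
    Path file = new Path("/tmp/compat-example.seq");

    // Write side: FileSystem-based factory method, available in both Hadoop 1.x and 2.x.
    SequenceFile.Writer writer = SequenceFile.createWriter(
        file.getFileSystem(conf), conf, file,
        Text.class, NullWritable.class, CompressionType.NONE);
    writer.append(new Text("example record"), NullWritable.get());
    writer.close();

    // Read side: FileSystem-based constructor, mirroring HdfsSequenceExportExtractor.
    SequenceFile.Reader reader =
        new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
    Text key = new Text();
    while (reader.next(key, NullWritable.get())) {
      System.out.println(key);
    }
    reader.close();
  }
}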