
SQOOP-768 Compilation on hadoop profile 100 will fail

(Jarek Jarcec Cecho)
Bilung Lee authored on 2012-12-14 11:52:14 -08:00, committed by Jarek Jarcec Cecho
parent 993c6e42af
commit 543aeb24ef
8 changed files with 49 additions and 52 deletions

View File

@@ -83,8 +83,6 @@ limitations under the License.
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>${guava.version}</version>
-      <scope>provided</scope>
     </dependency>
     <dependency>

View File

@@ -17,8 +17,8 @@
  */
 package org.apache.sqoop.execution.mapreduce;

+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.framework.ExecutionEngine;
@@ -61,6 +61,9 @@ public void prepareImportSubmission(SubmissionRequest gRequest) {
     MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
     ImportJobConfiguration jobConf = (ImportJobConfiguration) request.getConfigFrameworkJob();

+    // Add jar dependencies
+    addDependencies(request);
+
     // Configure map-reduce classes for import
     request.setInputFormatClass(SqoopInputFormat.class);
@@ -103,6 +106,9 @@ public void prepareExportSubmission(SubmissionRequest gRequest) {
     MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
     ExportJobConfiguration jobConf = (ExportJobConfiguration) request.getConfigFrameworkJob();

+    // Add jar dependencies
+    addDependencies(request);
+
     // Configure map-reduce classes for import
     request.setInputFormatClass(SqoopInputFormat.class);
@@ -124,10 +130,22 @@ public void prepareExportSubmission(SubmissionRequest gRequest) {
     // We should make one extractor that will be able to read all supported file types
     context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsTextExportExtractor.class.getName());
-    context.setString(FileInputFormat.INPUT_DIR, jobConf.input.inputDirectory);
+    context.setString(JobConstants.HADOOP_INPUTDIR, jobConf.input.inputDirectory);

     if(request.getExtractors() != null) {
       context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
     }
   }
+
+  /**
+   * Our execution engine have additional dependencies that needs to be available
+   * at mapreduce job time. This method will register all dependencies in the request
+   * object.
+   *
+   * @param request Active request object.
+   */
+  protected void addDependencies(MRSubmissionRequest request) {
+    // Guava
+    request.addJarForClass(ThreadFactoryBuilder.class);
+  }
 }
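Note on the new addDependencies() hook: the commit makes the Guava jar available to the MapReduce job by pointing addJarForClass() at a class known to live in that jar. The sketch below is not Sqoop code and not the body of addJarForClass(); it only illustrates, under the usual ProtectionDomain/CodeSource technique, how a jar could be resolved from a representative class such as ThreadFactoryBuilder. The class name JarLocator is hypothetical.

// Hypothetical illustration only; not taken from the Sqoop sources.
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class JarLocator {
  // Returns the local path of the jar (or class directory) that provides clazz.
  public static String jarForClass(Class<?> clazz) {
    return clazz.getProtectionDomain().getCodeSource().getLocation().getPath();
  }

  public static void main(String[] args) {
    // For example, the Guava jar that would be shipped with the job.
    System.out.println(jarForClass(ThreadFactoryBuilder.class));
  }
}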

View File

@@ -83,6 +83,8 @@ public final class JobConstants extends Constants {
   // We're using constants from Hadoop 1. Hadoop 2 has different names, but
   // provides backward compatibility layer for those names as well.
+  public static final String HADOOP_INPUTDIR = "mapred.input.dir";
   public static final String HADOOP_OUTDIR = "mapred.output.dir";
   public static final String HADOOP_COMPRESS = "mapred.output.compress";
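The comment above relies on Hadoop 2's configuration deprecation layer: the Hadoop 1 key names keep working because Hadoop 2 maps them onto their newer equivalents. A small self-contained check of that behaviour is sketched below; it assumes a Hadoop 2 runtime on the classpath, and on Hadoop 1.0 only the old key would return a value. The class name DeprecatedKeyCheck and the input path are placeholders.

import org.apache.hadoop.conf.Configuration;

public class DeprecatedKeyCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Set the value through the Hadoop 1 name used by JobConstants.
    conf.set("mapred.input.dir", "/tmp/sqoop-input");
    // On Hadoop 2 the deprecation layer exposes the same value under the new
    // name as well; on Hadoop 1.0 only the first lookup is populated.
    System.out.println(conf.get("mapred.input.dir"));
    System.out.println(conf.get("mapreduce.input.fileinputformat.inputdir"));
  }
}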

View File

@@ -36,7 +36,6 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
@@ -113,12 +112,12 @@ public List<Partition> getPartitions(ImmutableContext context,
     }

     // all the files in input set
-    String indir = conf.get(FileInputFormat.INPUT_DIR);
+    String indir = conf.get(JobConstants.HADOOP_INPUTDIR);
     FileSystem fs = FileSystem.get(conf);

     List<Path> paths = new LinkedList<Path>();
     for(FileStatus status : fs.listStatus(new Path(indir))) {
-      if(!status.isDirectory()) {
+      if(!status.isDir()) {
         paths.add(status.getPath());
       }
     }
@@ -143,7 +142,7 @@ public List<Partition> getPartitions(ImmutableContext context,
   }

   private long getInputSize(Configuration conf) throws IOException {
-    String indir = conf.get(FileInputFormat.INPUT_DIR);
+    String indir = conf.get(JobConstants.HADOOP_INPUTDIR);
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] files = fs.listStatus(new Path(indir));
     long count = 0;
@@ -345,10 +344,11 @@ private void getMoreSplits(Configuration conf, List<Path> paths,
   private boolean isSplitable(Configuration conf, Path file) {
     final CompressionCodec codec =
       new CompressionCodecFactory(conf).getCodec(file);
-    if (null == codec) {
-      return true;
-    }
-    return codec instanceof SplittableCompressionCodec;
+
+    // This method might be improved for SplittableCompression codec when we
+    // drop support for Hadoop 1.0
+    return null == codec;
   }

   /**
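The new comment in isSplitable() points at a future improvement once Hadoop 1.0 support is dropped. A hedged sketch of what that check could look like then is below; it essentially restores the shape of the removed lines, since SplittableCompressionCodec is not available in the Hadoop 1.0 line, which is what broke the hadoop-100 profile. The wrapper class SplitabilityCheck is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;

public class SplitabilityCheck {
  // Possible future form of the check, usable only once the hadoop-100 profile is gone.
  static boolean isSplitable(Configuration conf, Path file) {
    final CompressionCodec codec =
        new CompressionCodecFactory(conf).getCodec(file);
    if (null == codec) {
      return true; // uncompressed files can always be split
    }
    // e.g. bzip2 implements SplittableCompressionCodec on newer Hadoop versions
    return codec instanceof SplittableCompressionCodec;
  }
}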

View File

@@ -74,8 +74,7 @@ private void extractFile(Path file, long start, long length)
     LOG.info("\t to offset " + end);
     LOG.info("\t of length " + length);

-    SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
-        SequenceFile.Reader.file(file));
+    SequenceFile.Reader filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);

     if (start > filereader.getPosition()) {
       filereader.sync(start); // sync to start
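The one-line change above swaps the Hadoop 2 option-based SequenceFile.Reader constructor for the older (FileSystem, Path, Configuration) form that also exists on Hadoop 1.0. A minimal standalone reading sketch using that older form follows; the file path and the key/value types are placeholders, not taken from the extractor.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path("/tmp/part-r-00000.seq"); // placeholder path
    // Hadoop 1.0-compatible constructor; still present (deprecated) in Hadoop 2.
    SequenceFile.Reader reader =
        new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
    Text key = new Text();
    NullWritable value = NullWritable.get();
    while (reader.next(key, value)) { // iterate key/value records
      System.out.println(key);
    }
    reader.close();
  }
}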

View File

@@ -30,8 +30,6 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.SplitCompressionInputStream;
-import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.util.LineReader;
 import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.SqoopException;
@@ -93,32 +91,20 @@ private void extractFile(Path file, long start, long length)
       byte[] recordDelimiterBytes = String.valueOf(
           Data.DEFAULT_RECORD_DELIMITER).getBytes(
               Charset.forName(Data.CHARSET_NAME));
-      filereader = new LineReader(filestream, conf,
-          recordDelimiterBytes);
+      // Hadoop 1.0 do not have support for custom record delimiter and thus we
+      // are supporting only default one.
+      filereader = new LineReader(filestream, conf);
       fileseeker = filestream;

-    } else if (codec instanceof SplittableCompressionCodec) {
-      SplitCompressionInputStream compressionstream =
-          ((SplittableCompressionCodec)codec).createInputStream(
-              filestream, codec.createDecompressor(), start, end,
-              SplittableCompressionCodec.READ_MODE.BYBLOCK);
-      byte[] recordDelimiterBytes = String.valueOf(
-          Data.DEFAULT_RECORD_DELIMITER).getBytes(
-              Charset.forName(Data.CHARSET_NAME));
-      filereader = new LineReader(compressionstream,
-          conf, recordDelimiterBytes);
-      fileseeker = compressionstream;
-      start = compressionstream.getAdjustedStart();
-      end = compressionstream.getAdjustedEnd();
+    // We might add another "else if" case for SplittableCompressionCodec once
+    // we drop support for Hadoop 1.0.
     } else {
       byte[] recordDelimiterBytes = String.valueOf(
           Data.DEFAULT_RECORD_DELIMITER).getBytes(
               Charset.forName(Data.CHARSET_NAME));
+      // Hadoop 1.0 do not have support for custom record delimiter and thus we
+      // are supporting only default one.
       filereader = new LineReader(
-          codec.createInputStream(filestream, codec.createDecompressor()),
-          conf, recordDelimiterBytes);
+          codec.createInputStream(filestream, codec.createDecompressor()), conf);
       fileseeker = filestream;
     }
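The rewritten block falls back to the two-argument LineReader constructor because the custom-delimiter overload does not exist in Hadoop 1.0, so only the default newline delimiter is supported. A small standalone sketch of reading with that constructor follows; the in-memory input stream and class name LineReadSketch are placeholders.

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;

public class LineReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    InputStream in = new ByteArrayInputStream("a\nb\nc\n".getBytes("UTF-8"));
    // Hadoop 1.0-compatible constructor: default '\n' record delimiter only.
    LineReader reader = new LineReader(in, conf);
    Text line = new Text();
    while (reader.readLine(line) > 0) { // returns bytes consumed; 0 means end of stream
      System.out.println(line);
    }
    reader.close();
  }
}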

View File

@@ -71,7 +71,7 @@ public void testUncompressedText() throws Exception {
         HdfsTextExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
     JobUtils.runJob(conf);
   }
@@ -89,7 +89,7 @@ public void testCompressedText() throws Exception {
         HdfsTextExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
     JobUtils.runJob(conf);

     FileUtils.delete(indir);
@@ -102,7 +102,7 @@ public void testCompressedText() throws Exception {
         HdfsTextExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
     JobUtils.runJob(conf);
   }
@@ -120,7 +120,7 @@ public void testUncompressedSequence() throws Exception {
         HdfsSequenceExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
     JobUtils.runJob(conf);
   }
@@ -138,7 +138,7 @@ public void testCompressedSequence() throws Exception {
         HdfsSequenceExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
     JobUtils.runJob(conf);
   }
@@ -198,17 +198,12 @@ private void createSequenceInput(Class<? extends CompressionCodec> clz)
           "part-r-" + padZeros(fi, 5) + HdfsSequenceImportLoader.EXTENSION);
       SequenceFile.Writer filewriter;
       if (codec != null) {
-        filewriter = SequenceFile.createWriter(conf,
-          SequenceFile.Writer.file(filepath),
-          SequenceFile.Writer.keyClass(Text.class),
-          SequenceFile.Writer.valueClass(NullWritable.class),
-          SequenceFile.Writer.compression(CompressionType.BLOCK, codec));
+        filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+          conf, filepath, Text.class, NullWritable.class,
+          CompressionType.BLOCK, codec);
       } else {
-        filewriter = SequenceFile.createWriter(conf,
-          SequenceFile.Writer.file(filepath),
-          SequenceFile.Writer.keyClass(Text.class),
-          SequenceFile.Writer.valueClass(NullWritable.class),
-          SequenceFile.Writer.compression(CompressionType.NONE));
+        filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+          conf, filepath, Text.class, NullWritable.class, CompressionType.NONE);
       }

       Text text = new Text();
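The test now uses the classic SequenceFile.createWriter(FileSystem, Configuration, Path, keyClass, valueClass, CompressionType) overload, which compiles on Hadoop 1.0 as well. A trimmed standalone sketch of the same call pattern follows; the output path and class name SequenceWriteSketch are placeholders, and compression is left off to keep it minimal.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;

public class SequenceWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path filepath = new Path("/tmp/part-r-00000.seq"); // placeholder path
    // Hadoop 1.0-compatible factory method (deprecated, but still present, in Hadoop 2).
    SequenceFile.Writer writer = SequenceFile.createWriter(
        filepath.getFileSystem(conf), conf, filepath,
        Text.class, NullWritable.class, CompressionType.NONE);
    writer.append(new Text("example record"), NullWritable.get());
    writer.close();
  }
}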

View File

@@ -130,7 +130,6 @@ limitations under the License.
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
       <version>${guava.version}</version>
-      <scope>provided</scope>
     </dependency>
     <dependency>