diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
index 2261a7cb..16afcdb6 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
@@ -85,6 +85,9 @@ private void extractFile(Path file, long start, long length)
     while (hasNext) {
       datawriter.writeCsvRecord(line.toString());
       hasNext = filereader.next(line);
+      if(filereader.getPosition() >= end && filereader.syncSeen()) {
+        break;
+      }
     }
   }
 
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
index fdc7d67f..80551406 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
@@ -84,25 +84,16 @@ private void extractFile(Path file, long start, long length)
     FSDataInputStream filestream = fs.open(file);
     CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
     LineReader filereader;
-    Seekable fileseeker;
+    Seekable fileseeker = filestream;
 
-    if (codec == null) {
-      filestream.seek(start);
-      byte[] recordDelimiterBytes = String.valueOf(
-          Data.DEFAULT_RECORD_DELIMITER).getBytes(
-              Charset.forName(Data.CHARSET_NAME));
-      // Hadoop 1.0 do not have support for custom record delimiter and thus we
-      // are supporting only default one.
-      filereader = new LineReader(filestream, conf);
-      fileseeker = filestream;
+    // Hadoop 1.0 does not have support for custom record delimiter and thus we
+    // are supporting only default one.
     // We might add another "else if" case for SplittableCompressionCodec once
     // we drop support for Hadoop 1.0.
+    if (codec == null) {
+      filestream.seek(start);
+      filereader = new LineReader(filestream);
     } else {
-      byte[] recordDelimiterBytes = String.valueOf(
-          Data.DEFAULT_RECORD_DELIMITER).getBytes(
-              Charset.forName(Data.CHARSET_NAME));
-      // Hadoop 1.0 do not have support for custom record delimiter and thus we
-      // are supporting only default one.
       filereader = new LineReader(
           codec.createInputStream(filestream, codec.createDecompressor()), conf);
       fileseeker = filestream;
@@ -113,15 +104,20 @@ private void extractFile(Path file, long start, long length)
       // one extra line is read in previous split
      start += filereader.readLine(new Text(), 0);
     }
-
     Text line = new Text();
     int size;
-    while (fileseeker.getPos() <= end) {
+    LOG.info("Start position: " + String.valueOf(start));
+    long next = start;
+    while (next <= end) {
       size = filereader.readLine(line, Integer.MAX_VALUE);
       if (size == 0) {
         break;
       }
-
+      if (codec == null) {
+        next += size;
+      } else {
+        next = fileseeker.getPos();
+      }
       datawriter.writeCsvRecord(line.toString());
     }
     LOG.info("Extracting ended on position: " + fileseeker.getPos());
@@ -132,5 +128,4 @@ public long getRowsRead() {
     // TODO need to return the rows read
     return 0;
   }
-
 }
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
index 484eb205..95cfe856 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
@@ -107,7 +107,7 @@ public void testCompressedText() throws Exception {
   }
 
   @Test
-  public void testUncompressedSequence() throws Exception {
+  public void testCompressedSequence() throws Exception {
     FileUtils.delete(indir);
     FileUtils.mkdirs(indir);
     createSequenceInput(SqoopFileOutputFormat.DEFAULT_CODEC);
@@ -125,7 +125,7 @@ public void testUncompressedSequence() throws Exception {
   }
 
   @Test
-  public void testCompressedSequence() throws Exception {
+  public void testUncompressedSequence() throws Exception {
     FileUtils.delete(indir);
     FileUtils.mkdirs(indir);
     createSequenceInput(null);
@@ -241,9 +241,9 @@ public void load(ImmutableContext context, Object oc, Object oj, DataReader read
       int numbers = NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE;
       // This test is not currently working due to bug in HdfsExtractor.
       // Check SQOOP-761 for more details.
-//      assertEquals((1+numbers)*numbers/2, sum);
-//
-//      assertEquals(NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE, index-1);
+      assertEquals((1+numbers)*numbers/2, sum);
+
+      assertEquals(NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE, index-1);
     }
   }
diff --git a/pom.xml b/pom.xml
index be4f1b62..2e66f7f7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -353,8 +353,24 @@ limitations under the License.
           <artifactId>maven-jar-plugin</artifactId>
           <version>2.3.2</version>
         </plugin>
       </plugins>
     </pluginManagement>
+
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.12</version>
+        <configuration>
+          <forkMode>always</forkMode>
+          <forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds>
+          <redirectTestOutputToFile>true</redirectTestOutputToFile>
+          <argLine>-Xms256m -Xmx1g</argLine>
+        </configuration>
+      </plugin>
+    </plugins>
   </build>