Mirror of https://github.com/apache/sqoop.git
SQOOP-761: HDFSTextExportExtractor loses lines around partition boundaries

(Hari Shreedharan via Jarek Jarcec Cecho)

parent 2455f74238
commit c417148888
@@ -85,6 +85,9 @@ private void extractFile(Path file, long start, long length)
    while (hasNext) {
      datawriter.writeCsvRecord(line.toString());
      hasNext = filereader.next(line);
+      if(filereader.getPosition() >= end && filereader.syncSeen()) {
+        break;
+      }
    }
  }

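This first hunk is the sequence-file side of the fix: a record that straddles the partition boundary must still be emitted by the split that started reading it, so the loop only stops after the reader has moved past the split end and crossed a sync marker. A minimal, self-contained sketch of the same idea (hypothetical class and wiring, using SequenceFile.Reader directly rather than the extractor's fields):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Hypothetical sketch, not the committed class: read one split of a SequenceFile
// and stop only after passing the split end AND a sync marker, mirroring the
// getPosition()/syncSeen() check added above.
public class SequenceSplitReadSketch {

  public static void readSplit(Configuration conf, Path file, long start, long length)
      throws Exception {
    long end = start + length;
    FileSystem fs = file.getFileSystem(conf);
    SequenceFile.Reader filereader = new SequenceFile.Reader(fs, file, conf);
    filereader.sync(start);          // position on the first sync point at or after 'start'

    Text line = new Text();
    boolean hasNext = filereader.next(line);
    while (hasNext) {
      System.out.println(line);      // stand-in for datawriter.writeCsvRecord(...)
      hasNext = filereader.next(line);
      // Keep reading past 'end' until a sync marker is seen, so the record that
      // straddles the boundary is emitted by exactly one split.
      if (filereader.getPosition() >= end && filereader.syncSeen()) {
        break;
      }
    }
    filereader.close();
  }
}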
@@ -84,25 +84,16 @@ private void extractFile(Path file, long start, long length)
    FSDataInputStream filestream = fs.open(file);
    CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
    LineReader filereader;
-    Seekable fileseeker;
+    Seekable fileseeker = filestream;

-    if (codec == null) {
-      filestream.seek(start);
-      byte[] recordDelimiterBytes = String.valueOf(
-          Data.DEFAULT_RECORD_DELIMITER).getBytes(
-              Charset.forName(Data.CHARSET_NAME));
-      // Hadoop 1.0 do not have support for custom record delimiter and thus we
-      // are supporting only default one.
-      filereader = new LineReader(filestream, conf);
-      fileseeker = filestream;
-      // We might add another "else if" case for SplittableCompressionCodec once
-      // we drop support for Hadoop 1.0.
+    // Hadoop 1.0 does not have support for custom record delimiter and thus we
+    // are supporting only default one.
+    // We might add another "else if" case for SplittableCompressionCodec once
+    // we drop support for Hadoop 1.0.
+    if (codec == null) {
+      filestream.seek(start);
+      filereader = new LineReader(filestream);
    } else {
-      byte[] recordDelimiterBytes = String.valueOf(
-          Data.DEFAULT_RECORD_DELIMITER).getBytes(
-              Charset.forName(Data.CHARSET_NAME));
-      // Hadoop 1.0 do not have support for custom record delimiter and thus we
-      // are supporting only default one.
      filereader = new LineReader(
          codec.createInputStream(filestream, codec.createDecompressor()), conf);
      fileseeker = filestream;
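This hunk reworks how the text extractor builds its reader: the Seekable is now always the raw FSDataInputStream, the uncompressed path seeks to the split start and uses the default-delimiter LineReader, and the compressed path wraps the stream in the codec's input stream. A condensed sketch of that setup logic (hypothetical helper class, not the committed code):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.LineReader;

// Hypothetical helper, not the committed class: choose a LineReader for one
// split of a text file, seeking only when the file is uncompressed.
public class LineReaderSetupSketch {

  public static LineReader open(Configuration conf, Path file, long start)
      throws Exception {
    FileSystem fs = file.getFileSystem(conf);
    FSDataInputStream filestream = fs.open(file);
    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);

    if (codec == null) {
      // Uncompressed: jump to the split start and read with the default
      // record delimiter (the only one Hadoop 1.0 supports).
      filestream.seek(start);
      return new LineReader(filestream);
    }
    // Compressed: decompress from the beginning of the file; the raw stream
    // position is still consulted later to detect the split end.
    return new LineReader(
        codec.createInputStream(filestream, codec.createDecompressor()), conf);
  }
}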
@@ -113,15 +104,20 @@ private void extractFile(Path file, long start, long length)
      // one extra line is read in previous split
      start += filereader.readLine(new Text(), 0);
    }

    Text line = new Text();
    int size;
-    while (fileseeker.getPos() <= end) {
+    LOG.info("Start position: " + String.valueOf(start));
+    long next = start;
+    while (next <= end) {
      size = filereader.readLine(line, Integer.MAX_VALUE);
      if (size == 0) {
        break;
      }
+      if (codec == null) {
+        next += size;
+      } else {
+        next = fileseeker.getPos();
+      }
      datawriter.writeCsvRecord(line.toString());
    }
    LOG.info("Extracting ended on position: " + fileseeker.getPos());
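This is the heart of the fix for text files. LineReader buffers the underlying stream, so the raw stream position used by the old check, while (fileseeker.getPos() <= end), can pass the split end while unreturned lines are still sitting in the buffer, dropping them. The new loop tracks a logical offset, next, advanced by the byte count each readLine() call reports, and only falls back to the stream position when a codec is in use. A hypothetical, self-contained version of that loop (names are illustrative, not the committed code):

import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;

// Hypothetical helper, not the committed class: read the lines of one split,
// tracking position by the bytes readLine() actually consumed rather than by
// the buffered stream offset.
public class SplitLineLoopSketch {

  public static long readLines(LineReader filereader, Seekable fileseeker,
      boolean compressed, long start, long end) throws Exception {
    long rows = 0;

    if (start != 0) {
      // Throw away the first, partial line; the previous split already read it.
      start += filereader.readLine(new Text(), 0);
    }

    Text line = new Text();
    long next = start;
    while (next <= end) {
      int size = filereader.readLine(line, Integer.MAX_VALUE);
      if (size == 0) {
        break;                       // end of file
      }
      if (compressed) {
        next = fileseeker.getPos();  // compressed: underlying stream offset
      } else {
        next += size;                // uncompressed: bytes consumed so far
      }
      rows++;                        // stand-in for datawriter.writeCsvRecord(line.toString())
    }
    return rows;
  }
}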
@@ -132,5 +128,4 @@ public long getRowsRead() {
    // TODO need to return the rows read
    return 0;
  }
-
}
@@ -107,7 +107,7 @@ public void testCompressedText() throws Exception {
  }

  @Test
-  public void testUncompressedSequence() throws Exception {
+  public void testCompressedSequence() throws Exception {
    FileUtils.delete(indir);
    FileUtils.mkdirs(indir);
    createSequenceInput(SqoopFileOutputFormat.DEFAULT_CODEC);
@@ -125,7 +125,7 @@ public void testUncompressedSequence() throws Exception {
  }

  @Test
-  public void testCompressedSequence() throws Exception {
+  public void testUncompressedSequence() throws Exception {
    FileUtils.delete(indir);
    FileUtils.mkdirs(indir);
    createSequenceInput(null);
@@ -241,9 +241,9 @@ public void load(ImmutableContext context, Object oc, Object oj, DataReader read
      int numbers = NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE;
      // This test is not currently working due to bug in HdfsExtractor.
      // Check SQOOP-761 for more details.
-      // assertEquals((1+numbers)*numbers/2, sum);
-      //
-      // assertEquals(NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE, index-1);
+      assertEquals((1+numbers)*numbers/2, sum);
+
+      assertEquals(NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE, index-1);
    }
  }

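The re-enabled assertions check the sum of the generated values and the row count, relying on the closed form 1 + 2 + ... + n = (1 + n) * n / 2. A standalone check of that arithmetic (hypothetical value of n, not the test's constants):

// Quick check of the identity used by the assertion:
// 1 + 2 + ... + n == (1 + n) * n / 2.
public class SumFormulaCheck {
  public static void main(String[] args) {
    int numbers = 10;                // hypothetical small value, not the test's constant
    long sum = 0;
    for (int i = 1; i <= numbers; i++) {
      sum += i;
    }
    System.out.println(sum == (1L + numbers) * numbers / 2);   // prints true
  }
}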
pom.xml (16 changed lines)
@@ -353,8 +353,24 @@ limitations under the License.
          <artifactId>maven-jar-plugin</artifactId>
          <version>2.3.2</version>
        </plugin>
+
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-surefire-plugin</artifactId>
+          <version>2.12</version>
+          <configuration>
+            <forkMode>always</forkMode>
+            <forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds>
+            <redirectTestOutputToFile>true</redirectTestOutputToFile>
+            <argLine>-Xms256m -Xmx1g</argLine>
+          </configuration>
+        </plugin>
+
      </plugins>
    </pluginManagement>

  </build>

  <!-- All reports might be generated using mvn site command -->
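For reference, the new pluginManagement entry uses standard Surefire options: forkMode=always runs each test class in a fresh JVM, forkedProcessTimeoutInSeconds=900 kills a forked JVM after 15 minutes, redirectTestOutputToFile=true writes test output to files under target/surefire-reports instead of the console, and the argLine gives each forked JVM a 256 MB initial and 1 GB maximum heap.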