Mirror of https://github.com/apache/sqoop.git
SQOOP-761: HDFSTextExportExtractor loses lines around partition boundaries
(Hari Shreedharan via Jarek Jarcec Cecho)
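The fix revolves around a simple convention for text splits: every split emits each line that starts at or before its end offset (reading past the boundary to finish the last record), and every split except the first throws away its first, possibly partial, line because the previous split already emitted it. Below is a minimal, hypothetical sketch of that convention in plain java.io; class and method names are made up, and the real extractor uses Hadoop's FSDataInputStream and LineReader instead.

import java.io.IOException;
import java.io.RandomAccessFile;

// Sketch only; not Sqoop code. Skip the first (possibly partial) line of every
// split except the first, and emit every line that starts at or before `end`.
public class SplitConventionSketch {

  static void extractRange(String path, long start, long length) throws IOException {
    long end = start + length;
    try (RandomAccessFile in = new RandomAccessFile(path, "r")) {
      in.seek(start);
      if (start != 0) {
        in.readLine();                 // already emitted by the previous split
      }
      long next = in.getFilePointer(); // offset where the next line starts
      String line;
      while (next <= end && (line = in.readLine()) != null) {
        System.out.println(line);      // stand-in for datawriter.writeCsvRecord(...)
        next = in.getFilePointer();
      }
    }
  }

  public static void main(String[] args) throws IOException {
    // Two touching splits cover the file: no line is lost or written twice.
    extractRange(args[0], 0, 1024);
    extractRange(args[0], 1024, Long.MAX_VALUE - 1024);
  }
}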
commit bd53c33f79
parent 75559f52e1
@@ -85,6 +85,9 @@ private void extractFile(Path file, long start, long length)
     while (hasNext) {
       datawriter.writeCsvRecord(line.toString());
       hasNext = filereader.next(line);
+      if(filereader.getPosition() >= end && filereader.syncSeen()) {
+        break;
+      }
     }
   }
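The hunk above is the sequence-file side of the fix: instead of stopping as soon as the reader's position passes the split end, the loop keeps going until a sync marker has also been passed, and the record that triggered the break is left for the next split, which positions itself on that same sync point. A rough sketch of the whole read loop against Hadoop's SequenceFile.Reader API follows; the surrounding method and the sync-to-start step are assumptions here, only the loop body mirrors the diff.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Sketch: read one split [start, start + length) of a SequenceFile with Text keys.
class SequenceSplitSketch {

  static void readSplit(Configuration conf, Path file, long start, long length)
      throws Exception {
    long end = start + length;
    FileSystem fs = file.getFileSystem(conf);
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
    try {
      if (start > reader.getPosition()) {
        reader.sync(start);             // jump to the first sync point at/after start
      }
      Text line = new Text();
      boolean hasNext = reader.next(line);
      while (hasNext) {
        System.out.println(line);       // stand-in for datawriter.writeCsvRecord(...)
        hasNext = reader.next(line);
        // Past the boundary *and* past a sync marker: the record just read
        // belongs to the next split, so drop it and stop.
        if (reader.getPosition() >= end && reader.syncSeen()) {
          break;
        }
      }
    } finally {
      reader.close();
    }
  }
}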
@@ -84,25 +84,16 @@ private void extractFile(Path file, long start, long length)
     FSDataInputStream filestream = fs.open(file);
     CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
     LineReader filereader;
-    Seekable fileseeker;
+    Seekable fileseeker = filestream;
 
-    if (codec == null) {
-      filestream.seek(start);
-      byte[] recordDelimiterBytes = String.valueOf(
-          Data.DEFAULT_RECORD_DELIMITER).getBytes(
-              Charset.forName(Data.CHARSET_NAME));
-      // Hadoop 1.0 do not have support for custom record delimiter and thus we
-      // are supporting only default one.
-      filereader = new LineReader(filestream, conf);
-      fileseeker = filestream;
+    // Hadoop 1.0 does not have support for custom record delimiter and thus we
+    // are supporting only default one.
+    // We might add another "else if" case for SplittableCompressionCodec once
+    // we drop support for Hadoop 1.0.
+    if (codec == null) {
+      filestream.seek(start);
+      filereader = new LineReader(filestream);
     } else {
-      byte[] recordDelimiterBytes = String.valueOf(
-          Data.DEFAULT_RECORD_DELIMITER).getBytes(
-              Charset.forName(Data.CHARSET_NAME));
-      // Hadoop 1.0 do not have support for custom record delimiter and thus we
-      // are supporting only default one.
       filereader = new LineReader(
           codec.createInputStream(filestream, codec.createDecompressor()), conf);
       fileseeker = filestream;
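This setup hunk also explains why two position-tracking strategies are needed later: plain text can be seeked to `start` directly and line sizes added up, while a compressed file must be decompressed from the stream, so the only usable offset is the position of the underlying compressed stream (`fileseeker`). Whether a file takes the codec branch is decided by its extension; a small hypothetical fragment (a Configuration named conf is assumed to be in scope):

// CompressionCodecFactory maps file extensions to codecs and returns null for
// anything it does not recognize, which is exactly the branch taken above.
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
System.out.println(factory.getCodec(new Path("part-00000")));     // null -> seek + plain LineReader
System.out.println(factory.getCodec(new Path("part-00000.gz")));  // gzip codec -> decompressing LineReader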
@@ -113,15 +104,20 @@ private void extractFile(Path file, long start, long length)
       // one extra line is read in previous split
       start += filereader.readLine(new Text(), 0);
     }
 
     Text line = new Text();
     int size;
-    while (fileseeker.getPos() <= end) {
+    LOG.info("Start position: " + String.valueOf(start));
+    long next = start;
+    while (next <= end) {
       size = filereader.readLine(line, Integer.MAX_VALUE);
       if (size == 0) {
         break;
       }
 
+      if (codec == null) {
+        next += size;
+      } else {
+        next = fileseeker.getPos();
+      }
       datawriter.writeCsvRecord(line.toString());
     }
+    LOG.info("Extracting ended on position: " + fileseeker.getPos());
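This loop rewrite is the heart of SQOOP-761. The old condition, while (fileseeker.getPos() <= end), asked the underlying stream where it was, but LineReader reads ahead into an internal buffer, so after the first readLine() the stream position typically sits a full buffer past the records actually returned; the loop therefore ended early and the lines between the last returned record and the split boundary were lost. The fix keeps its own cursor instead: for uncompressed input, next advances by the byte count each readLine() reports, and for compressed input, where those counts refer to decompressed bytes, it falls back to the compressed stream's position. A tiny hypothetical fragment illustrating the read-ahead (fs, file and conf are assumed to be in scope; the printed position is usually the buffer size, not the record length):

FSDataInputStream in = fs.open(file);
LineReader lr = new LineReader(in, conf);
Text first = new Text();
int size = lr.readLine(first, Integer.MAX_VALUE);
System.out.println("bytes in first record:        " + size);
System.out.println("stream position after 1 line: " + in.getPos());  // usually several KB ahead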
@@ -132,5 +128,4 @@ public long getRowsRead() {
     // TODO need to return the rows read
     return 0;
   }
-
 }
@@ -107,7 +107,7 @@ public void testCompressedText() throws Exception {
   }
 
   @Test
-  public void testUncompressedSequence() throws Exception {
+  public void testCompressedSequence() throws Exception {
     FileUtils.delete(indir);
     FileUtils.mkdirs(indir);
     createSequenceInput(SqoopFileOutputFormat.DEFAULT_CODEC);
@@ -125,7 +125,7 @@ public void testUncompressedSequence() throws Exception {
   }
 
   @Test
-  public void testCompressedSequence() throws Exception {
+  public void testUncompressedSequence() throws Exception {
     FileUtils.delete(indir);
     FileUtils.mkdirs(indir);
     createSequenceInput(null);
@@ -241,9 +241,9 @@ public void load(ImmutableContext context, Object oc, Object oj, DataReader read
     int numbers = NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE;
-    // This test is not currently working due to bug in HdfsExtractor.
-    // Check SQOOP-761 for more details.
-    // assertEquals((1+numbers)*numbers/2, sum);
-    //
-    // assertEquals(NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE, index-1);
+    assertEquals((1+numbers)*numbers/2, sum);
+
+    assertEquals(NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE, index-1);
   }
 }
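The re-enabled assertions are the regression check for this bug: they only hold if every generated row is extracted exactly once, with no line dropped at a partition boundary. Assuming the rows carry the values 1 through N, where N = NUMBER_OF_FILES * NUMBER_OF_ROWS_PER_FILE, their sum must be the arithmetic-series total N(N+1)/2, which is what (1+numbers)*numbers/2 computes, and index-1 must equal N. A trivial standalone check of that closed form (the constants here are made up, not the test's):

// Hypothetical values; the point is only the closed form for 1 + 2 + ... + n.
int numberOfFiles = 10, rowsPerFile = 1000;
int n = numberOfFiles * rowsPerFile;
long sum = 0;
for (int i = 1; i <= n; i++) {
  sum += i;
}
assert sum == (long) n * (n + 1) / 2;   // 50_005_000 for these values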
pom.xml (16 lines changed)
@@ -353,8 +353,24 @@ limitations under the License.
           <artifactId>maven-jar-plugin</artifactId>
           <version>2.3.2</version>
         </plugin>
+
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-surefire-plugin</artifactId>
+          <version>2.12</version>
+          <configuration>
+            <forkMode>always</forkMode>
+            <forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds>
+            <redirectTestOutputToFile>true</redirectTestOutputToFile>
+            <argLine>-Xms256m -Xmx1g</argLine>
+          </configuration>
+        </plugin>
+
       </plugins>
     </pluginManagement>
 
   </build>
 
   <!-- All reports might be generated using mvn site command -->
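The new surefire block is a test-stability companion to the fix rather than part of it: with forkMode set to always, each test class runs in its own JVM (presumably so the MapReduce-based tests do not share state), a hung fork is killed after 900 seconds, test output is redirected to files under target/surefire-reports, and every forked JVM runs with a 256 MB initial and 1 GB maximum heap.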