
SQOOP-2811: Sqoop2: Extracting sequence files may result in duplicates

(Abraham Fine via Jarek Jarcec Cecho)
Jarek Jarcec Cecho committed on 2016-02-01 11:13:17 -08:00
parent 0c20d1f069
commit 118aa7c4f9
3 changed files with 297 additions and 23 deletions
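
The commit message points at a subtle correctness issue: the old extractSequenceFile() (removed in the hunks below) synced the reader to the partition's start offset and then decided when to stop by checking getPosition() and syncSeen() after each record, which, as the issue title suggests, could let two adjacent partitions both emit rows near their shared boundary. The rewrite hands that boundary bookkeeping to Hadoop's SequenceFileRecordReader. Below is a minimal, illustrative sketch (not part of the commit) of the new read pattern; it assumes the SqoopTaskAttemptContext stub added by the third file in this change is available, and the class and method names of the sketch itself are invented.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;

public class SplitReadSketch {

  // Reads only the records owned by the byte range [start, start + length):
  // SequenceFileRecordReader skips ahead to the first sync mark at or after
  // 'start' and stops once it crosses 'end', so adjacent ranges that cover a
  // file end to end emit every record exactly once.
  static List<String> readSplit(Path file, long start, long length,
                                String[] hosts, Configuration conf)
      throws IOException, InterruptedException {
    List<String> rows = new ArrayList<String>();
    SequenceFileRecordReader<Text, NullWritable> reader =
        new SequenceFileRecordReader<Text, NullWritable>();
    try {
      reader.initialize(new FileSplit(file, start, length, hosts),
          new SqoopTaskAttemptContext(conf));
      while (reader.nextKeyValue()) {
        rows.add(reader.getCurrentKey().toString());
      }
    } finally {
      reader.close();
    }
    return rows;
  }
}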

Changed file 1 of 3:

@@ -26,10 +26,13 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
 import org.apache.hadoop.util.LineReader;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
@@ -69,7 +72,7 @@ public Void run() throws Exception {
       LOG.info("Working on partition: " + p);
       int numFiles = p.getNumberOfFiles();
       for (int i = 0; i < numFiles; i++) {
-        extractFile(linkConfiguration, jobConfiguration, p.getFile(i), p.getOffset(i), p.getLength(i));
+        extractFile(linkConfiguration, jobConfiguration, p.getFile(i), p.getOffset(i), p.getLength(i), p.getLocations());
       }
       return null;
     }
@@ -81,7 +84,7 @@ public Void run() throws Exception {
   private void extractFile(LinkConfiguration linkConfiguration,
                            FromJobConfiguration fromJobConfiguration,
-                           Path file, long start, long length)
+                           Path file, long start, long length, String[] locations)
       throws IOException {
     long end = start + length;
     LOG.info("Extracting file " + file);
@@ -89,9 +92,9 @@ private void extractFile(LinkConfiguration linkConfiguration,
     LOG.info("\t to offset " + end);
     LOG.info("\t of length " + length);
     if(isSequenceFile(file)) {
-      extractSequenceFile(linkConfiguration, fromJobConfiguration, file, start, length);
+      extractSequenceFile(linkConfiguration, fromJobConfiguration, file, start, length, locations);
     } else {
-      extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length);
+      extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length, locations);
     }
   }
@@ -105,29 +108,22 @@ private void extractFile(LinkConfiguration linkConfiguration,
   @SuppressWarnings("deprecation")
   private void extractSequenceFile(LinkConfiguration linkConfiguration,
                                    FromJobConfiguration fromJobConfiguration,
-                                   Path file, long start, long length)
+                                   Path file, long start, long length, String[] locations)
       throws IOException {
     LOG.info("Extracting sequence file");
-    long end = start + length;
-    SequenceFile.Reader filereader = new SequenceFile.Reader(
-        file.getFileSystem(conf), file, conf);
-    if (start > filereader.getPosition()) {
-      filereader.sync(start); // sync to start
-    }
-    Text line = new Text();
-    boolean hasNext = filereader.next(line);
-    while (hasNext) {
+    SequenceFileRecordReader<Text, NullWritable> sequenceFileRecordReader = new SequenceFileRecordReader();
+    try {
+      sequenceFileRecordReader.initialize(new FileSplit(file, start, length, locations), new SqoopTaskAttemptContext(conf));
+      while (sequenceFileRecordReader.nextKeyValue()) {
       rowsRead++;
-      extractRow(linkConfiguration, fromJobConfiguration, line);
-      line = new Text();
-      hasNext = filereader.next(line);
-      if (filereader.getPosition() >= end && filereader.syncSeen()) {
-        break;
+        extractRow(linkConfiguration, fromJobConfiguration, sequenceFileRecordReader.getCurrentKey());
       }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } finally {
+      sequenceFileRecordReader.close();
     }
-    filereader.close();
   }

   /**
@@ -140,7 +136,7 @@ private void extractSequenceFile(LinkConfiguration linkConfiguration,
   @SuppressWarnings("resource")
   private void extractTextFile(LinkConfiguration linkConfiguration,
                                FromJobConfiguration fromJobConfiguration,
-                               Path file, long start, long length)
+                               Path file, long start, long length, String[] locations)
       throws IOException {
     LOG.info("Extracting text file");
     long end = start + length;
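
The record reader in the extractSequenceFile() hunk above is parameterized as <Text, NullWritable>, which implies the sequence files being extracted carry the whole row in the Text key and store no value. As a hedged sketch (not from the commit; the file path and row contents are invented), this is one way to produce such a file; the sync marks the writer periodically inserts are what make the byte-range splits above meaningful:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class WriteRowsSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path file = new Path("/tmp/rows.seq");   // hypothetical location
    SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(file),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(NullWritable.class));
    try {
      for (int i = 0; i < 1000; i++) {
        // One CSV-style row per record, mirroring the Text-key/NullWritable-value
        // shape the extractor reads back.
        writer.append(new Text(i + ",row-" + i), NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }
}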

Changed file 2 of 3:

@@ -72,6 +72,12 @@ public long getLength(int i) {
     return lengths[i];
   }

+  public String[] getLocations() {
+    String[] locationsCopy = new String[locations.length];
+    System.arraycopy(locations, 0, locationsCopy, 0, locations.length);
+    return locationsCopy;
+  }
+
   @Override
   public void readFields(DataInput in) throws IOException {
     numFiles = in.readInt();
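
getLocations() hands back a copy of the partition's internal array, so callers cannot mutate it, and the strings themselves are only locality hints: in the extractor they become the hosts argument of the FileSplit. A small illustrative sketch (host names and path are hypothetical):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class LocationsSketch {
  public static void main(String[] args) throws Exception {
    String[] locations = new String[] {"datanode-1", "datanode-2"};  // hypothetical hosts
    FileSplit split = new FileSplit(new Path("/tmp/rows.seq"), 0L, 1L << 20, locations);
    // The hosts are scheduling/locality hints only; they do not change
    // which bytes of the file the split covers.
    for (String host : split.getLocations()) {
      System.out.println(host);
    }
  }
}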

Changed file 3 of 3 (new file):

@@ -0,0 +1,272 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.security.Credentials;

import java.io.IOException;
import java.net.URI;

// Simple TaskAttemptContext wrapper for passing through a conf object
// for hadoop classes
public class SqoopTaskAttemptContext implements TaskAttemptContext {

  private Configuration conf;

  public SqoopTaskAttemptContext(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public TaskAttemptID getTaskAttemptID() {
    return null;
  }

  @Override
  public void setStatus(String msg) {
  }

  @Override
  public String getStatus() {
    return null;
  }

  @Override
  public float getProgress() {
    return 0;
  }

  @Override
  public Counter getCounter(Enum<?> counterName) {
    return null;
  }

  @Override
  public Counter getCounter(String groupName, String counterName) {
    return null;
  }

  @Override
  public Configuration getConfiguration() {
    return conf;
  }

  @Override
  public Credentials getCredentials() {
    return null;
  }

  @Override
  public JobID getJobID() {
    return null;
  }

  @Override
  public int getNumReduceTasks() {
    return 0;
  }

  @Override
  public Path getWorkingDirectory() throws IOException {
    return null;
  }

  @Override
  public Class<?> getOutputKeyClass() {
    return null;
  }

  @Override
  public Class<?> getOutputValueClass() {
    return null;
  }

  @Override
  public Class<?> getMapOutputKeyClass() {
    return null;
  }

  @Override
  public Class<?> getMapOutputValueClass() {
    return null;
  }

  @Override
  public String getJobName() {
    return null;
  }

  @Override
  public Class<? extends InputFormat<?, ?>> getInputFormatClass() throws ClassNotFoundException {
    return null;
  }

  @Override
  public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass() throws ClassNotFoundException {
    return null;
  }

  @Override
  public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass() throws ClassNotFoundException {
    return null;
  }

  @Override
  public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass() throws ClassNotFoundException {
    return null;
  }

  @Override
  public Class<? extends OutputFormat<?, ?>> getOutputFormatClass() throws ClassNotFoundException {
    return null;
  }

  @Override
  public Class<? extends Partitioner<?, ?>> getPartitionerClass() throws ClassNotFoundException {
    return null;
  }

  @Override
  public RawComparator<?> getSortComparator() {
    return null;
  }

  @Override
  public String getJar() {
    return null;
  }

  @Override
  public RawComparator<?> getCombinerKeyGroupingComparator() {
    return null;
  }

  @Override
  public RawComparator<?> getGroupingComparator() {
    return null;
  }

  @Override
  public boolean getJobSetupCleanupNeeded() {
    return false;
  }

  @Override
  public boolean getTaskCleanupNeeded() {
    return false;
  }

  @Override
  public boolean getProfileEnabled() {
    return false;
  }

  @Override
  public String getProfileParams() {
    return null;
  }

  @Override
  public Configuration.IntegerRanges getProfileTaskRange(boolean isMap) {
    return null;
  }

  @Override
  public String getUser() {
    return null;
  }

  @Override
  public boolean getSymlink() {
    return false;
  }

  @Override
  public Path[] getArchiveClassPaths() {
    return new Path[0];
  }

  @Override
  public URI[] getCacheArchives() throws IOException {
    return new URI[0];
  }

  @Override
  public URI[] getCacheFiles() throws IOException {
    return new URI[0];
  }

  @Override
  public Path[] getLocalCacheArchives() throws IOException {
    return new Path[0];
  }

  @Override
  public Path[] getLocalCacheFiles() throws IOException {
    return new Path[0];
  }

  @Override
  public Path[] getFileClassPaths() {
    return new Path[0];
  }

  @Override
  public String[] getArchiveTimestamps() {
    return new String[0];
  }

  @Override
  public String[] getFileTimestamps() {
    return new String[0];
  }

  @Override
  public int getMaxMapAttempts() {
    return 0;
  }

  @Override
  public int getMaxReduceAttempts() {
    return 0;
  }

  @Override
  public void progress() {
  }
}
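
SqoopTaskAttemptContext is a deliberately thin stub: everything is a no-op or a default value except getConfiguration(), which appears to be all that SequenceFileRecordReader.initialize() asks of its context. A minimal usage sketch, assuming it is compiled alongside the class above; the property used here is just an example of a value flowing through:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class ContextSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt("io.file.buffer.size", 64 * 1024);
    TaskAttemptContext context = new SqoopTaskAttemptContext(conf);
    // The wrapped Configuration is handed back as-is; all other methods are
    // stubs, which is fine for code paths that only ask for the conf.
    System.out.println(context.getConfiguration().getInt("io.file.buffer.size", -1));  // prints 65536
  }
}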