diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index a813c479..b430739a 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -26,10 +26,13 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
 import org.apache.hadoop.util.LineReader;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
@@ -69,7 +72,7 @@ public Void run() throws Exception {
       LOG.info("Working on partition: " + p);
       int numFiles = p.getNumberOfFiles();
       for (int i = 0; i < numFiles; i++) {
-        extractFile(linkConfiguration, jobConfiguration, p.getFile(i), p.getOffset(i), p.getLength(i));
+        extractFile(linkConfiguration, jobConfiguration, p.getFile(i), p.getOffset(i), p.getLength(i), p.getLocations());
       }
       return null;
     }
@@ -81,7 +84,7 @@ public Void run() throws Exception {
   private void extractFile(LinkConfiguration linkConfiguration,
                            FromJobConfiguration fromJobConfiguration,
-                           Path file, long start, long length)
+                           Path file, long start, long length, String[] locations)
       throws IOException {
     long end = start + length;
     LOG.info("Extracting file " + file);
@@ -89,9 +92,9 @@ private void extractFile(LinkConfiguration linkConfiguration,
     LOG.info("\t to offset " + end);
     LOG.info("\t of length " + length);
     if(isSequenceFile(file)) {
-      extractSequenceFile(linkConfiguration, fromJobConfiguration, file, start, length);
+      extractSequenceFile(linkConfiguration, fromJobConfiguration, file, start, length, locations);
     } else {
-      extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length);
+      extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length, locations);
     }
   }
@@ -105,29 +108,22 @@ private void extractFile(LinkConfiguration linkConfiguration,
   @SuppressWarnings("deprecation")
   private void extractSequenceFile(LinkConfiguration linkConfiguration,
                                    FromJobConfiguration fromJobConfiguration,
-                                   Path file, long start, long length)
+                                   Path file, long start, long length, String[] locations)
       throws IOException {
     LOG.info("Extracting sequence file");
-    long end = start + length;
-    SequenceFile.Reader filereader = new SequenceFile.Reader(
-        file.getFileSystem(conf), file, conf);
+    SequenceFileRecordReader<Text, NullWritable> sequenceFileRecordReader = new SequenceFileRecordReader<Text, NullWritable>();
 
-    if (start > filereader.getPosition()) {
-      filereader.sync(start); // sync to start
-    }
-
-    Text line = new Text();
-    boolean hasNext = filereader.next(line);
-    while (hasNext) {
-      rowsRead++;
-      extractRow(linkConfiguration, fromJobConfiguration, line);
-      line = new Text();
-      hasNext = filereader.next(line);
-      if (filereader.getPosition() >= end && filereader.syncSeen()) {
-        break;
+    try {
+      sequenceFileRecordReader.initialize(new FileSplit(file, start, length, locations), new SqoopTaskAttemptContext(conf));
+      while (sequenceFileRecordReader.nextKeyValue()) {
+        rowsRead++;
+        extractRow(linkConfiguration, fromJobConfiguration, sequenceFileRecordReader.getCurrentKey());
       }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } finally {
+      sequenceFileRecordReader.close();
     }
-    filereader.close();
   }
 
   /**
@@ -140,7 +136,7 @@ private void extractSequenceFile(LinkConfiguration linkConfiguration,
   @SuppressWarnings("resource")
   private void extractTextFile(LinkConfiguration linkConfiguration,
                                FromJobConfiguration fromJobConfiguration,
-                               Path file, long start, long length)
+                               Path file, long start, long length, String[] locations)
       throws IOException {
     LOG.info("Extracting text file");
     long end = start + length;
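The rewritten extractSequenceFile() above leans on SequenceFileRecordReader to handle split boundaries: the reader seeks to the first sync point at or after the split's start offset and stops once it has read past start + length, which is exactly what the deleted sync()/getPosition()/syncSeen() bookkeeping did by hand. A minimal standalone sketch of the same read loop, using Hadoop's stock TaskAttemptContextImpl rather than the SqoopTaskAttemptContext wrapper added in this patch (the file path argument and empty host list are illustrative only):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class SequenceFileSplitRead {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Read the whole file as a single split; a real caller would pass the
    // partition's per-file offset, length and block locations instead.
    FileSplit split = new FileSplit(new Path(args[0]), 0L, Long.MAX_VALUE, new String[0]);
    SequenceFileRecordReader<Text, NullWritable> reader =
        new SequenceFileRecordReader<Text, NullWritable>();
    try {
      // initialize() opens the file and syncs to the split's start offset.
      reader.initialize(split, new TaskAttemptContextImpl(conf, new TaskAttemptID()));
      while (reader.nextKeyValue()) { // stops once past the split boundary
        System.out.println(reader.getCurrentKey());
      }
    } finally {
      reader.close();
    }
  }
}
```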
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java
index 644de605..3af5fa90 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java
@@ -72,6 +72,12 @@ public long getLength(int i) {
     return lengths[i];
   }
 
+  public String[] getLocations() {
+    String[] locationsCopy = new String[locations.length];
+    System.arraycopy(locations, 0, locationsCopy, 0, locations.length);
+    return locationsCopy;
+  }
+
   @Override
   public void readFields(DataInput in) throws IOException {
     numFiles = in.readInt();
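getLocations() hands out a defensive copy so callers cannot mutate the partition's internal locations array after the fact. A minimal standalone illustration of the pattern, using a hypothetical Holder stand-in rather than HdfsPartition itself (whose constructor is outside this patch):

```java
public class Holder {
  private final String[] locations;

  public Holder(String[] locations) {
    this.locations = locations.clone(); // copy in
  }

  public String[] getLocations() {
    // Copy out, exactly as HdfsPartition.getLocations() does above.
    String[] locationsCopy = new String[locations.length];
    System.arraycopy(locations, 0, locationsCopy, 0, locations.length);
    return locationsCopy;
  }

  public static void main(String[] args) {
    Holder h = new Holder(new String[] {"host1", "host2"});
    h.getLocations()[0] = "mutated";         // scribbling on the returned copy...
    System.out.println(h.getLocations()[0]); // ...still prints "host1"
  }
}
```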
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/SqoopTaskAttemptContext.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/SqoopTaskAttemptContext.java
new file mode 100644
index 00000000..5bec482c
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/SqoopTaskAttemptContext.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.IOException;
+import java.net.URI;
+
+// Simple TaskAttemptContext wrapper for passing through a conf object
+// for hadoop classes
+public class SqoopTaskAttemptContext implements TaskAttemptContext {
+
+  private Configuration conf;
+
+  public SqoopTaskAttemptContext(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public TaskAttemptID getTaskAttemptID() {
+    return null;
+  }
+
+  @Override
+  public void setStatus(String msg) {
+
+  }
+
+  @Override
+  public String getStatus() {
+    return null;
+  }
+
+  @Override
+  public float getProgress() {
+    return 0;
+  }
+
+  @Override
+  public Counter getCounter(Enum<?> counterName) {
+    return null;
+  }
+
+  @Override
+  public Counter getCounter(String groupName, String counterName) {
+    return null;
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public Credentials getCredentials() {
+    return null;
+  }
+
+  @Override
+  public JobID getJobID() {
+    return null;
+  }
+
+  @Override
+  public int getNumReduceTasks() {
+    return 0;
+  }
+
+  @Override
+  public Path getWorkingDirectory() throws IOException {
+    return null;
+  }
+
+  @Override
+  public Class<?> getOutputKeyClass() {
+    return null;
+  }
+
+  @Override
+  public Class<?> getOutputValueClass() {
+    return null;
+  }
+
+  @Override
+  public Class<?> getMapOutputKeyClass() {
+    return null;
+  }
+
+  @Override
+  public Class<?> getMapOutputValueClass() {
+    return null;
+  }
+
+  @Override
+  public String getJobName() {
+    return null;
+  }
+
+  @Override
+  public Class<? extends InputFormat<?, ?>> getInputFormatClass() throws
+      ClassNotFoundException {
+    return null;
+  }
+
+  @Override
+  public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass() throws
+      ClassNotFoundException {
+    return null;
+  }
+
+  @Override
+  public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass() throws
+      ClassNotFoundException {
+    return null;
+  }
+
+  @Override
+  public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass() throws
+      ClassNotFoundException {
+    return null;
+  }
+
+  @Override
+  public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
+      throws ClassNotFoundException {
+    return null;
+  }
+
+  @Override
+  public Class<? extends Partitioner<?, ?>> getPartitionerClass() throws
+      ClassNotFoundException {
+    return null;
+  }
+
+  @Override
+  public RawComparator<?> getSortComparator() {
+    return null;
+  }
+
+  @Override
+  public String getJar() {
+    return null;
+  }
+
+  @Override
+  public RawComparator<?> getCombinerKeyGroupingComparator() {
+    return null;
+  }
+
+  @Override
+  public RawComparator<?> getGroupingComparator() {
+    return null;
+  }
+
+  @Override
+  public boolean getJobSetupCleanupNeeded() {
+    return false;
+  }
+
+  @Override
+  public boolean getTaskCleanupNeeded() {
+    return false;
+  }
+
+  @Override
+  public boolean getProfileEnabled() {
+    return false;
+  }
+
+  @Override
+  public String getProfileParams() {
+    return null;
+  }
+
+  @Override
+  public Configuration.IntegerRanges getProfileTaskRange(boolean isMap) {
+    return null;
+  }
+
+  @Override
+  public String getUser() {
+    return null;
+  }
+
+  @Override
+  public boolean getSymlink() {
+    return false;
+  }
+
+  @Override
+  public Path[] getArchiveClassPaths() {
+    return new Path[0];
+  }
+
+  @Override
+  public URI[] getCacheArchives() throws IOException {
+    return new URI[0];
+  }
+
+  @Override
+  public URI[] getCacheFiles() throws IOException {
+    return new URI[0];
+  }
+
+  @Override
+  public Path[] getLocalCacheArchives() throws IOException {
+    return new Path[0];
+  }
+
+  @Override
+  public Path[] getLocalCacheFiles() throws IOException {
+    return new Path[0];
+  }
+
+  @Override
+  public Path[] getFileClassPaths() {
+    return new Path[0];
+  }
+
+  @Override
+  public String[] getArchiveTimestamps() {
+    return new String[0];
+  }
+
+  @Override
+  public String[] getFileTimestamps() {
+    return new String[0];
+  }
+
+  @Override
+  public int getMaxMapAttempts() {
+    return 0;
+  }
+
+  @Override
+  public int getMaxReduceAttempts() {
+    return 0;
+  }
+
+  @Override
+  public void progress() {
+
+  }
+}
\ No newline at end of file
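SqoopTaskAttemptContext can stub out almost the entire interface because, on this code path, SequenceFileRecordReader.initialize() is only expected to consult the context for its Configuration; the nulls, zeros and empty arrays above should never be read. A hedged end-to-end sketch of how the pieces introduced by this patch compose (class name, path handling and split bounds are illustrative only):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.sqoop.connector.hdfs.SqoopTaskAttemptContext;

public class WrapperSmokeTest {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    SequenceFileRecordReader<Text, NullWritable> reader =
        new SequenceFileRecordReader<Text, NullWritable>();
    try {
      // The wrapper carries nothing but conf; that is all initialize() needs.
      reader.initialize(
          new FileSplit(new Path(args[0]), 0L, Long.MAX_VALUE, new String[0]),
          new SqoopTaskAttemptContext(conf));
      while (reader.nextKeyValue()) {
        Text key = reader.getCurrentKey(); // row payload; value is NullWritable
        System.out.println(key);
      }
    } finally {
      reader.close();
    }
  }
}
```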