diff --git a/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java b/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java index 4f58d7d8..7d2be385 100644 --- a/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java +++ b/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.LinkedList; import java.util.HashSet; import java.util.List; @@ -35,6 +36,8 @@ import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; @@ -42,12 +45,19 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.NetworkTopology; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * This file was ported from Hadoop 2.0.2-alpha + */ +// CHECKSTYLE:OFF /** * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in * {@link InputFormat#getSplits(JobContext)} method. * - * Splits are constructed from the files under the input path + * Splits are constructed from the files under the input paths. * A split cannot have files from different pools. * Each split returned may contain blocks from different files. * If a maxSplitSize is specified, then blocks on the same node are @@ -69,6 +79,14 @@ public abstract class CombineFileInputFormat extends FileInputFormat { + public static final Log LOG = + LogFactory.getLog(CombineFileInputFormat.class.getName()); + + + public static final String SPLIT_MINSIZE_PERNODE = + "mapreduce.input.fileinputformat.split.minsize.per.node"; + public static final String SPLIT_MINSIZE_PERRACK = + "mapreduce.input.fileinputformat.split.minsize.per.rack"; // ability to limit the size of a single split private long maxSplitSize = 0; private long minSplitSizeNode = 0; @@ -85,8 +103,8 @@ public abstract class CombineFileInputFormat * Specify the maximum size (in bytes) of each split. Each split is * approximately equal to the specified size. */ - protected void setMaxSplitSize(long val) { - this.maxSplitSize = val; + protected void setMaxSplitSize(long maxSplitSize) { + this.maxSplitSize = maxSplitSize; } /** @@ -96,8 +114,8 @@ protected void setMaxSplitSize(long val) { * This leftover data will be combined into its own split if its size * exceeds minSplitSizeNode. */ - protected void setMinSplitSizeNode(long val) { - this.minSplitSizeNode = val; + protected void setMinSplitSizeNode(long minSplitSizeNode) { + this.minSplitSizeNode = minSplitSizeNode; } /** @@ -107,8 +125,8 @@ protected void setMinSplitSizeNode(long val) { * This leftover data will be combined into its own split if its size * exceeds minSplitSizeRack. */ - protected void setMinSplitSizeRack(long val) { - this.minSplitSizeRack = val; + protected void setMinSplitSizeRack(long minSplitSizeRack) { + this.minSplitSizeRack = minSplitSizeRack; } /** @@ -132,8 +150,21 @@ protected void createPool(PathFilter... filters) { pools.add(multi); } + @Override + protected boolean isSplitable(JobContext context, Path file) { + final CompressionCodec codec = + new CompressionCodecFactory(context.getConfiguration()).getCodec(file); + if (null == codec) { + return true; + } + + // Once we remove support for Hadoop < 2.0 + //return codec instanceof SplittableCompressionCodec; + return false; + } + /** - * default constructor. + * default constructor */ public CombineFileInputFormat() { } @@ -152,12 +183,12 @@ public List getSplits(JobContext job) if (minSplitSizeNode != 0) { minSizeNode = minSplitSizeNode; } else { - minSizeNode = conf.getLong("mapred.min.split.size.per.node", 0); + minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0); } if (minSplitSizeRack != 0) { minSizeRack = minSplitSizeRack; } else { - minSizeRack = conf.getLong("mapred.min.split.size.per.rack", 0); + minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0); } if (maxSplitSize != 0) { maxSize = maxSplitSize; @@ -165,19 +196,19 @@ public List getSplits(JobContext job) maxSize = conf.getLong("mapred.max.split.size", 0); } if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) { - throw new IOException("Minimum split size pernode " + minSizeNode - + " cannot be larger than maximum split size " - + maxSize); + throw new IOException("Minimum split size pernode " + minSizeNode + + " cannot be larger than maximum split size " + + maxSize); } if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) { - throw new IOException("Minimum split size per rack" + minSizeRack - + " cannot be larger than maximum split size " - + maxSize); + throw new IOException("Minimum split size per rack" + minSizeRack + + " cannot be larger than maximum split size " + + maxSize); } if (minSizeRack != 0 && minSizeNode > minSizeRack) { - throw new IOException("Minimum split size per node" + minSizeNode - + " cannot be smaller than minimum split " - + "size per rack " + minSizeRack); + throw new IOException("Minimum split size per node" + minSizeNode + + " cannot be smaller than minimum split " + + "size per rack " + minSizeRack); } // all the files in input set @@ -214,12 +245,12 @@ public List getSplits(JobContext job) } } // create splits for all files in this pool. - getMoreSplits(conf, myPaths.toArray(new Path[myPaths.size()]), + getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), maxSize, minSizeNode, minSizeRack, splits); } // create splits for all files that are not in any pool. - getMoreSplits(conf, newpaths.toArray(new Path[newpaths.size()]), + getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]), maxSize, minSizeNode, minSizeRack, splits); // free up rackToNodes map @@ -228,13 +259,13 @@ public List getSplits(JobContext job) } /** - * Return all the splits in the specified set of paths. + * Return all the splits in the specified set of paths */ - // CHECKSTYLE:OFF - private void getMoreSplits(Configuration conf, Path[] paths, + private void getMoreSplits(JobContext job, Path[] paths, long maxSize, long minSizeNode, long minSizeRack, List splits) throws IOException { + Configuration conf = job.getConfiguration(); // all blocks for all the files in input set OneFileInfo[] files; @@ -259,13 +290,14 @@ private void getMoreSplits(Configuration conf, Path[] paths, // populate all the blocks for all files long totLength = 0; for (int i = 0; i < paths.length; i++) { - files[i] = new OneFileInfo(paths[i], conf, rackToBlocks, blockToNodes, - nodeToBlocks, rackToNodes); + files[i] = new OneFileInfo(paths[i], conf, isSplitable(job, paths[i]), + rackToBlocks, blockToNodes, nodeToBlocks, + rackToNodes, maxSize); totLength += files[i].getLength(); } ArrayList validBlocks = new ArrayList(); - ArrayList nodes = new ArrayList(); + Set nodes = new HashSet(); long curSplitSize = 0; // process all nodes and create splits that are local @@ -319,7 +351,7 @@ private void getMoreSplits(Configuration conf, Path[] paths, // in 'overflow'. After the processing of all racks is complete, these // overflow blocks will be combined into splits. ArrayList overflowBlocks = new ArrayList(); - ArrayList racks = new ArrayList(); + Set racks = new HashSet(); // Process all racks over and over again until there is no more work to do. while (blockToNodes.size() > 0) { @@ -375,8 +407,8 @@ private void getMoreSplits(Configuration conf, Path[] paths, addCreatedSplit(splits, getHosts(racks), validBlocks); } else { // There were a few blocks in this rack that - // remained to be processed. Keep them in 'overflow' block list. - // These will be combined later. + // remained to be processed. Keep them in 'overflow' block list. + // These will be combined later. overflowBlocks.addAll(validBlocks); } } @@ -418,14 +450,13 @@ private void getMoreSplits(Configuration conf, Path[] paths, addCreatedSplit(splits, getHosts(racks), validBlocks); } } - // CHECKSTYLE:ON /** * Create a single split from the list of blocks specified in validBlocks * Add this new split into splitList. */ private void addCreatedSplit(List splitList, - List locations, + Collection locations, ArrayList validBlocks) { // create an input split Path[] fl = new Path[validBlocks.size()]; @@ -450,17 +481,19 @@ public abstract RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException; /** - * information about one file from the File System. + * information about one file from the File System */ private static class OneFileInfo { private long fileSize; // size of the file private OneBlockInfo[] blocks; // all blocks in this file OneFileInfo(Path path, Configuration conf, + boolean isSplitable, HashMap> rackToBlocks, HashMap blockToNodes, HashMap> nodeToBlocks, - HashMap> rackToNodes) + HashMap> rackToNodes, + long maxSize) throws IOException { this.fileSize = 0; @@ -473,32 +506,82 @@ private static class OneFileInfo { if (locations == null) { blocks = new OneBlockInfo[0]; } else { - blocks = new OneBlockInfo[locations.length]; - for (int i = 0; i < locations.length; i++) { - fileSize += locations[i].getLength(); - OneBlockInfo oneblock = new OneBlockInfo(path, - locations[i].getOffset(), - locations[i].getLength(), - locations[i].getHosts(), - locations[i].getTopologyPaths()); - blocks[i] = oneblock; + if(locations.length == 0) { + locations = new BlockLocation[] { new BlockLocation() }; + } + if (!isSplitable) { + // if the file is not splitable, just create the one block with + // full file length + blocks = new OneBlockInfo[1]; + fileSize = stat.getLen(); + blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0] + .getHosts(), locations[0].getTopologyPaths()); + } else { + ArrayList blocksList = new ArrayList( + locations.length); + for (int i = 0; i < locations.length; i++) { + fileSize += locations[i].getLength(); + + // each split can be a maximum of maxSize + long left = locations[i].getLength(); + long myOffset = locations[i].getOffset(); + long myLength = 0; + do { + if (maxSize == 0) { + myLength = left; + } else { + if (left > maxSize && left < 2 * maxSize) { + // if remainder is between max and 2*max - then + // instead of creating splits of size max, left-max we + // create splits of size left/2 and left/2. This is + // a heuristic to avoid creating really really small + // splits. + myLength = left / 2; + } else { + myLength = Math.min(maxSize, left); + } + } + OneBlockInfo oneblock = new OneBlockInfo(path, myOffset, + myLength, locations[i].getHosts(), locations[i] + .getTopologyPaths()); + left -= myLength; + myOffset += myLength; + + blocksList.add(oneblock); + } while (left > 0); + } + blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]); + } + + for (OneBlockInfo oneblock : blocks) { // add this block to the block --> node locations map blockToNodes.put(oneblock, oneblock.hosts); + // For blocks that do not have host/rack information, + // assign to default rack. + String[] racks = null; + if (oneblock.hosts.length == 0) { + racks = new String[]{NetworkTopology.DEFAULT_RACK}; + } else { + racks = oneblock.racks; + } + // add this block to the rack --> block map - for (int j = 0; j < oneblock.racks.length; j++) { - String rack = oneblock.racks[j]; + for (int j = 0; j < racks.length; j++) { + String rack = racks[j]; List blklist = rackToBlocks.get(rack); if (blklist == null) { blklist = new ArrayList(); rackToBlocks.put(rack, blklist); } blklist.add(oneblock); - // Add this host to rackToNodes map - addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]); - } + if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) { + // Add this host to rackToNodes map + addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]); + } + } // add this block to the node --> block map for (int j = 0; j < oneblock.hosts.length; j++) { @@ -524,16 +607,14 @@ OneBlockInfo[] getBlocks() { } /** - * information about one block from the File System. + * information about one block from the File System */ private static class OneBlockInfo { - // CHECKSTYLE:OFF Path onepath; // name of this file long offset; // offset in file long length; // length of this block String[] hosts; // nodes on which this block resides String[] racks; // network topology of hosts - // CHECKSTYLE:ON OneBlockInfo(Path path, long offset, long len, String[] hosts, String[] topologyPaths) { @@ -541,8 +622,8 @@ private static class OneBlockInfo { this.offset = offset; this.hosts = hosts; this.length = len; - assert (hosts.length == topologyPaths.length - || topologyPaths.length == 0); + assert (hosts.length == topologyPaths.length || + topologyPaths.length == 0); // if the file system does not have any rack information, then // use dummy rack location. @@ -563,6 +644,11 @@ private static class OneBlockInfo { } } + protected BlockLocation[] getFileBlockLocations( + FileSystem fs, FileStatus stat) throws IOException { + return fs.getFileBlockLocations(stat, 0, stat.getLen()); + } + private static void addHostToRack(HashMap> rackToNodes, String rack, String host) { Set hosts = rackToNodes.get(rack); @@ -573,10 +659,12 @@ private static void addHostToRack(HashMap> rackToNodes, hosts.add(host); } - private List getHosts(List racks) { - List hosts = new ArrayList(); + private Set getHosts(Set racks) { + Set hosts = new HashSet(); for (String rack : racks) { - hosts.addAll(rackToNodes.get(rack)); + if (rackToNodes.containsKey(rack)) { + hosts.addAll(rackToNodes.get(rack)); + } } return hosts; } @@ -621,3 +709,5 @@ public String toString() { } } } + +// CHECKSTYLE:ON diff --git a/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java b/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java index 6f9864b8..8d4666ce 100644 --- a/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java +++ b/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java @@ -26,6 +26,11 @@ import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.conf.Configuration; +/** + * This file was ported from Hadoop 2.0.2-alpha + */ +// CHECKSTYLE:OFF + /** * A generic RecordReader that can hand out different recordReaders * for each chunk in a {@link CombineFileSplit}. @@ -34,19 +39,16 @@ * these data chunks from different files. * @see CombineFileSplit */ - public class CombineFileRecordReader extends RecordReader { - // CHECKSTYLE:OFF static final Class [] constructorSignature = new Class [] {CombineFileSplit.class, TaskAttemptContext.class, Integer.class}; - // CHECKSTYLE:ON protected CombineFileSplit split; - protected Class> rrClass; - protected Constructor> rrConstructor; + protected Class> rrClass; + protected Constructor> rrConstructor; protected FileSystem fs; protected TaskAttemptContext context; @@ -54,10 +56,10 @@ public class CombineFileRecordReader extends RecordReader { protected long progress; protected RecordReader curReader; - public void initialize(InputSplit psplit, - TaskAttemptContext pcontext) throws IOException, InterruptedException { - this.split = (CombineFileSplit)psplit; - this.context = pcontext; + public void initialize(InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + this.split = (CombineFileSplit)split; + this.context = context; if (null != this.curReader) { this.curReader.initialize(split, context); } @@ -106,7 +108,7 @@ public float getProgress() throws IOException, InterruptedException { */ public CombineFileRecordReader(CombineFileSplit split, TaskAttemptContext context, - Class> rrClass) + Class> rrClass) throws IOException { this.split = split; this.context = context; @@ -119,8 +121,8 @@ public CombineFileRecordReader(CombineFileSplit split, rrConstructor = rrClass.getDeclaredConstructor(constructorSignature); rrConstructor.setAccessible(true); } catch (Exception e) { - throw new RuntimeException(rrClass.getName() - + " does not have valid constructor", e); + throw new RuntimeException(rrClass.getName() + + " does not have valid constructor", e); } initNextRecordReader(); } @@ -145,9 +147,6 @@ protected boolean initNextRecordReader() throws IOException { // get a record reader for the idx-th chunk try { - curReader = rrConstructor.newInstance(new Object [] - {split, context, Integer.valueOf(idx)}); - Configuration conf = context.getConfiguration(); // setup some helper config variables. conf.set("map.input.file", split.getPath(idx).toString()); @@ -163,9 +162,10 @@ protected boolean initNextRecordReader() throws IOException { curReader.initialize(split, context); } } catch (Exception e) { - throw new RuntimeException(e); + throw new RuntimeException (e); } idx++; return true; } } +// CHECKSTYLE:ON diff --git a/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java b/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java index 8cf4d54c..956e23b5 100644 --- a/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java +++ b/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java @@ -27,17 +27,21 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; +/** + * This file was ported from Hadoop 2.0.2-alpha + */ + // CHECKSTYLE:OFF + /** * A sub-collection of input files. * - * Unlike {@link FileSplit}, CombineFileSplit class does not represent + * Unlike FileSplit, CombineFileSplit class does not represent * a split of a file, but a split of input files into smaller sets. * A split may contain blocks from different file but all * the blocks in the same split are probably local to some rack
* CombineFileSplit can be used to implement {@link RecordReader}'s, * with reading one record per file. * - * @see FileSplit * @see CombineFileInputFormat */ public class CombineFileSplit extends InputSplit implements Writable { @@ -49,7 +53,7 @@ public class CombineFileSplit extends InputSplit implements Writable { private long totLength; /** - * default constructor. + * default constructor */ public CombineFileSplit() {} public CombineFileSplit(Path[] files, long[] start, @@ -58,31 +62,31 @@ public CombineFileSplit(Path[] files, long[] start, } public CombineFileSplit(Path[] files, long[] lengths) { - long[] pstartoffset = new long[files.length]; - for (int i = 0; i < pstartoffset.length; i++) { - pstartoffset[i] = 0; + long[] startoffset = new long[files.length]; + for (int i = 0; i < startoffset.length; i++) { + startoffset[i] = 0; } - String[] plocations = new String[files.length]; - for (int i = 0; i < plocations.length; i++) { - plocations[i] = ""; + String[] locations = new String[files.length]; + for (int i = 0; i < locations.length; i++) { + locations[i] = ""; } - initSplit(files, pstartoffset, lengths, plocations); + initSplit(files, startoffset, lengths, locations); } private void initSplit(Path[] files, long[] start, - long[] plengths, String[] plocations) { + long[] lengths, String[] locations) { this.startoffset = start; - this.lengths = plengths; + this.lengths = lengths; this.paths = files; this.totLength = 0; - this.locations = plocations; + this.locations = locations; for(long length : lengths) { totLength += length; } } /** - * Copy constructor. + * Copy constructor */ public CombineFileSplit(CombineFileSplit old) throws IOException { this(old.getPaths(), old.getStartOffsets(), @@ -93,42 +97,42 @@ public long getLength() { return totLength; } - /** Returns an array containing the start offsets of the files in the split.*/ + /** Returns an array containing the start offsets of the files in the split*/ public long[] getStartOffsets() { return startoffset; } - /** Returns an array containing the lengths of the files in the split.*/ + /** Returns an array containing the lengths of the files in the split*/ public long[] getLengths() { return lengths; } - /** Returns the start offset of the ith Path.*/ + /** Returns the start offset of the ith Path */ public long getOffset(int i) { return startoffset[i]; } - /** Returns the length of the ith Path. */ + /** Returns the length of the ith Path */ public long getLength(int i) { return lengths[i]; } - /** Returns the number of Paths in the split.*/ + /** Returns the number of Paths in the split */ public int getNumPaths() { return paths.length; } - /** Returns the ith Path. */ + /** Returns the ith Path */ public Path getPath(int i) { return paths[i]; } - /** Returns all the Paths in the split. */ + /** Returns all the Paths in the split */ public Path[] getPaths() { return paths; } - /** Returns all the Paths where this input-split resides. */ + /** Returns all the Paths where this input-split resides */ public String[] getLocations() throws IOException { return locations; } @@ -137,17 +141,17 @@ public void readFields(DataInput in) throws IOException { totLength = in.readLong(); int arrLength = in.readInt(); lengths = new long[arrLength]; - for(int i=0; i