
SQOOP-721 Duplicating rows on export when exporting from compressed files.

(Jarek Jarcec Cecho via Cheolsoo Park)
Cheolsoo Park 2012-11-29 20:40:25 -08:00
parent 33a7a81412
commit a840f41fc7
3 changed files with 195 additions and 100 deletions
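
For context on the fix: CombineFileInputFormat previously carved every input file, compressed or not, into maxSize-bounded chunks. A gzip-style stream cannot be decoded from an arbitrary offset, so chunks pointing into the middle of a compressed file can end up replaying rows that other chunks already cover, which is one way the duplicated export rows described in the commit message arise. The patch below overrides isSplitable() so that any file with a matching compression codec stays in a single chunk. A standalone sketch of that check follows (class name and paths are illustrative, not from the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class SplittabilityCheck {
  // Mirrors the isSplitable() added in the patch: a file is only splittable
  // when no compression codec is registered for its extension.
  static boolean isSplitable(Configuration conf, Path file) {
    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
    return codec == null;
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    System.out.println(isSplitable(conf, new Path("/tmp/part-00000.gz"))); // false
    System.out.println(isSplitable(conf, new Path("/tmp/part-00000")));    // true
  }
}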

View File

@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.HashSet;
import java.util.List;
@ -35,6 +36,8 @@
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
@ -42,12 +45,19 @@
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* This file was ported from Hadoop 2.0.2-alpha
*/
// CHECKSTYLE:OFF
/**
* An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in
* {@link InputFormat#getSplits(JobContext)} method.
*
* Splits are constructed from the files under the input path
* Splits are constructed from the files under the input paths.
* A split cannot have files from different pools.
* Each split returned may contain blocks from different files.
* If a maxSplitSize is specified, then blocks on the same node are
@ -69,6 +79,14 @@
public abstract class CombineFileInputFormat<K, V>
extends FileInputFormat<K, V> {
public static final Log LOG =
LogFactory.getLog(CombineFileInputFormat.class.getName());
public static final String SPLIT_MINSIZE_PERNODE =
"mapreduce.input.fileinputformat.split.minsize.per.node";
public static final String SPLIT_MINSIZE_PERRACK =
"mapreduce.input.fileinputformat.split.minsize.per.rack";
// ability to limit the size of a single split
private long maxSplitSize = 0;
private long minSplitSizeNode = 0;
@ -85,8 +103,8 @@ public abstract class CombineFileInputFormat<K, V>
* Specify the maximum size (in bytes) of each split. Each split is
* approximately equal to the specified size.
*/
protected void setMaxSplitSize(long val) {
this.maxSplitSize = val;
protected void setMaxSplitSize(long maxSplitSize) {
this.maxSplitSize = maxSplitSize;
}
/**
@ -96,8 +114,8 @@ protected void setMaxSplitSize(long val) {
* This leftover data will be combined into its own split if its size
* exceeds minSplitSizeNode.
*/
protected void setMinSplitSizeNode(long val) {
this.minSplitSizeNode = val;
protected void setMinSplitSizeNode(long minSplitSizeNode) {
this.minSplitSizeNode = minSplitSizeNode;
}
/**
@ -107,8 +125,8 @@ protected void setMinSplitSizeNode(long val) {
* This leftover data will be combined into its own split if its size
* exceeds minSplitSizeRack.
*/
protected void setMinSplitSizeRack(long val) {
this.minSplitSizeRack = val;
protected void setMinSplitSizeRack(long minSplitSizeRack) {
this.minSplitSizeRack = minSplitSizeRack;
}
/**
@ -132,8 +150,21 @@ protected void createPool(PathFilter... filters) {
pools.add(multi);
}
@Override
protected boolean isSplitable(JobContext context, Path file) {
final CompressionCodec codec =
new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
if (null == codec) {
return true;
}
// Once we remove support for Hadoop < 2.0
//return codec instanceof SplittableCompressionCodec;
return false;
}
/**
* default constructor.
* default constructor
*/
public CombineFileInputFormat() {
}
@ -152,12 +183,12 @@ public List<InputSplit> getSplits(JobContext job)
if (minSplitSizeNode != 0) {
minSizeNode = minSplitSizeNode;
} else {
minSizeNode = conf.getLong("mapred.min.split.size.per.node", 0);
minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
}
if (minSplitSizeRack != 0) {
minSizeRack = minSplitSizeRack;
} else {
minSizeRack = conf.getLong("mapred.min.split.size.per.rack", 0);
minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
}
if (maxSplitSize != 0) {
maxSize = maxSplitSize;
@ -165,19 +196,19 @@ public List<InputSplit> getSplits(JobContext job)
maxSize = conf.getLong("mapred.max.split.size", 0);
}
if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
throw new IOException("Minimum split size pernode " + minSizeNode
+ " cannot be larger than maximum split size "
+ maxSize);
throw new IOException("Minimum split size pernode " + minSizeNode +
" cannot be larger than maximum split size " +
maxSize);
}
if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
throw new IOException("Minimum split size per rack" + minSizeRack
+ " cannot be larger than maximum split size "
+ maxSize);
throw new IOException("Minimum split size per rack" + minSizeRack +
" cannot be larger than maximum split size " +
maxSize);
}
if (minSizeRack != 0 && minSizeNode > minSizeRack) {
throw new IOException("Minimum split size per node" + minSizeNode
+ " cannot be smaller than minimum split "
+ "size per rack " + minSizeRack);
throw new IOException("Minimum split size per node" + minSizeNode +
" cannot be smaller than minimum split " +
"size per rack " + minSizeRack);
}
// all the files in input set
@ -214,12 +245,12 @@ public List<InputSplit> getSplits(JobContext job)
}
}
// create splits for all files in this pool.
getMoreSplits(conf, myPaths.toArray(new Path[myPaths.size()]),
getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
maxSize, minSizeNode, minSizeRack, splits);
}
// create splits for all files that are not in any pool.
getMoreSplits(conf, newpaths.toArray(new Path[newpaths.size()]),
getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]),
maxSize, minSizeNode, minSizeRack, splits);
// free up rackToNodes map
@ -228,13 +259,13 @@ public List<InputSplit> getSplits(JobContext job)
}
/**
* Return all the splits in the specified set of paths.
* Return all the splits in the specified set of paths
*/
// CHECKSTYLE:OFF
private void getMoreSplits(Configuration conf, Path[] paths,
private void getMoreSplits(JobContext job, Path[] paths,
long maxSize, long minSizeNode, long minSizeRack,
List<InputSplit> splits)
throws IOException {
Configuration conf = job.getConfiguration();
// all blocks for all the files in input set
OneFileInfo[] files;
@ -259,13 +290,14 @@ private void getMoreSplits(Configuration conf, Path[] paths,
// populate all the blocks for all files
long totLength = 0;
for (int i = 0; i < paths.length; i++) {
files[i] = new OneFileInfo(paths[i], conf, rackToBlocks, blockToNodes,
nodeToBlocks, rackToNodes);
files[i] = new OneFileInfo(paths[i], conf, isSplitable(job, paths[i]),
rackToBlocks, blockToNodes, nodeToBlocks,
rackToNodes, maxSize);
totLength += files[i].getLength();
}
ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
ArrayList<String> nodes = new ArrayList<String>();
Set<String> nodes = new HashSet<String>();
long curSplitSize = 0;
// process all nodes and create splits that are local
@ -319,7 +351,7 @@ private void getMoreSplits(Configuration conf, Path[] paths,
// in 'overflow'. After the processing of all racks is complete, these
// overflow blocks will be combined into splits.
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
ArrayList<String> racks = new ArrayList<String>();
Set<String> racks = new HashSet<String>();
// Process all racks over and over again until there is no more work to do.
while (blockToNodes.size() > 0) {
@ -375,8 +407,8 @@ private void getMoreSplits(Configuration conf, Path[] paths,
addCreatedSplit(splits, getHosts(racks), validBlocks);
} else {
// There were a few blocks in this rack that
// remained to be processed. Keep them in 'overflow' block list.
// These will be combined later.
// remained to be processed. Keep them in 'overflow' block list.
// These will be combined later.
overflowBlocks.addAll(validBlocks);
}
}
@ -418,14 +450,13 @@ private void getMoreSplits(Configuration conf, Path[] paths,
addCreatedSplit(splits, getHosts(racks), validBlocks);
}
}
// CHECKSTYLE:ON
/**
* Create a single split from the list of blocks specified in validBlocks
* Add this new split into splitList.
*/
private void addCreatedSplit(List<InputSplit> splitList,
List<String> locations,
Collection<String> locations,
ArrayList<OneBlockInfo> validBlocks) {
// create an input split
Path[] fl = new Path[validBlocks.size()];
@ -450,17 +481,19 @@ public abstract RecordReader<K, V> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException;
/**
* information about one file from the File System.
* information about one file from the File System
*/
private static class OneFileInfo {
private long fileSize; // size of the file
private OneBlockInfo[] blocks; // all blocks in this file
OneFileInfo(Path path, Configuration conf,
boolean isSplitable,
HashMap<String, List<OneBlockInfo>> rackToBlocks,
HashMap<OneBlockInfo, String[]> blockToNodes,
HashMap<String, List<OneBlockInfo>> nodeToBlocks,
HashMap<String, Set<String>> rackToNodes)
HashMap<String, Set<String>> rackToNodes,
long maxSize)
throws IOException {
this.fileSize = 0;
@ -473,32 +506,82 @@ private static class OneFileInfo {
if (locations == null) {
blocks = new OneBlockInfo[0];
} else {
blocks = new OneBlockInfo[locations.length];
for (int i = 0; i < locations.length; i++) {
fileSize += locations[i].getLength();
OneBlockInfo oneblock = new OneBlockInfo(path,
locations[i].getOffset(),
locations[i].getLength(),
locations[i].getHosts(),
locations[i].getTopologyPaths());
blocks[i] = oneblock;
if(locations.length == 0) {
locations = new BlockLocation[] { new BlockLocation() };
}
if (!isSplitable) {
// if the file is not splitable, just create the one block with
// full file length
blocks = new OneBlockInfo[1];
fileSize = stat.getLen();
blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
.getHosts(), locations[0].getTopologyPaths());
} else {
ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
locations.length);
for (int i = 0; i < locations.length; i++) {
fileSize += locations[i].getLength();
// each split can be a maximum of maxSize
long left = locations[i].getLength();
long myOffset = locations[i].getOffset();
long myLength = 0;
do {
if (maxSize == 0) {
myLength = left;
} else {
if (left > maxSize && left < 2 * maxSize) {
// if remainder is between max and 2*max - then
// instead of creating splits of size max, left-max we
// create splits of size left/2 and left/2. This is
// a heuristic to avoid creating really really small
// splits.
myLength = left / 2;
} else {
myLength = Math.min(maxSize, left);
}
}
OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
myLength, locations[i].getHosts(), locations[i]
.getTopologyPaths());
left -= myLength;
myOffset += myLength;
blocksList.add(oneblock);
} while (left > 0);
}
blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
}
for (OneBlockInfo oneblock : blocks) {
// add this block to the block --> node locations map
blockToNodes.put(oneblock, oneblock.hosts);
// For blocks that do not have host/rack information,
// assign to default rack.
String[] racks = null;
if (oneblock.hosts.length == 0) {
racks = new String[]{NetworkTopology.DEFAULT_RACK};
} else {
racks = oneblock.racks;
}
// add this block to the rack --> block map
for (int j = 0; j < oneblock.racks.length; j++) {
String rack = oneblock.racks[j];
for (int j = 0; j < racks.length; j++) {
String rack = racks[j];
List<OneBlockInfo> blklist = rackToBlocks.get(rack);
if (blklist == null) {
blklist = new ArrayList<OneBlockInfo>();
rackToBlocks.put(rack, blklist);
}
blklist.add(oneblock);
// Add this host to rackToNodes map
addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]);
}
if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
// Add this host to rackToNodes map
addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
}
}
// add this block to the node --> block map
for (int j = 0; j < oneblock.hosts.length; j++) {
@ -524,16 +607,14 @@ OneBlockInfo[] getBlocks() {
}
/**
* information about one block from the File System.
* information about one block from the File System
*/
private static class OneBlockInfo {
// CHECKSTYLE:OFF
Path onepath; // name of this file
long offset; // offset in file
long length; // length of this block
String[] hosts; // nodes on which this block resides
String[] racks; // network topology of hosts
// CHECKSTYLE:ON
OneBlockInfo(Path path, long offset, long len,
String[] hosts, String[] topologyPaths) {
@ -541,8 +622,8 @@ private static class OneBlockInfo {
this.offset = offset;
this.hosts = hosts;
this.length = len;
assert (hosts.length == topologyPaths.length
|| topologyPaths.length == 0);
assert (hosts.length == topologyPaths.length ||
topologyPaths.length == 0);
// if the file system does not have any rack information, then
// use dummy rack location.
@ -563,6 +644,11 @@ private static class OneBlockInfo {
}
}
protected BlockLocation[] getFileBlockLocations(
FileSystem fs, FileStatus stat) throws IOException {
return fs.getFileBlockLocations(stat, 0, stat.getLen());
}
private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
String rack, String host) {
Set<String> hosts = rackToNodes.get(rack);
@ -573,10 +659,12 @@ private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
hosts.add(host);
}
private List<String> getHosts(List<String> racks) {
List<String> hosts = new ArrayList<String>();
private Set<String> getHosts(Set<String> racks) {
Set<String> hosts = new HashSet<String>();
for (String rack : racks) {
hosts.addAll(rackToNodes.get(rack));
if (rackToNodes.containsKey(rack)) {
hosts.addAll(rackToNodes.get(rack));
}
}
return hosts;
}
@ -621,3 +709,5 @@ public String toString() {
}
}
}
// CHECKSTYLE:ON
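
The getSplits() changes above also move the per-node and per-rack minimums onto the mapreduce.* keys. A minimal sketch (illustrative sizes, assumed job setup) of driving those knobs through configuration instead of the protected setters:

import org.apache.hadoop.conf.Configuration;

public class SplitSizeConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Upper bound on one combined split; read as "mapred.max.split.size" above.
    conf.setLong("mapred.max.split.size", 256L * 1024 * 1024);
    // Leftover blocks on a node are combined into their own split once they
    // reach this size (SPLIT_MINSIZE_PERNODE).
    conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.node",
        64L * 1024 * 1024);
    // Same idea at rack granularity (SPLIT_MINSIZE_PERRACK); getSplits()
    // requires node minimum <= rack minimum <= max split size.
    conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack",
        128L * 1024 * 1024);
  }
}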

View File

@ -26,6 +26,11 @@
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.Configuration;
/**
* This file was ported from Hadoop 2.0.2-alpha
*/
// CHECKSTYLE:OFF
/**
* A generic RecordReader that can hand out different recordReaders
* for each chunk in a {@link CombineFileSplit}.
@ -34,19 +39,16 @@
* these data chunks from different files.
* @see CombineFileSplit
*/
public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
// CHECKSTYLE:OFF
static final Class [] constructorSignature = new Class []
{CombineFileSplit.class,
TaskAttemptContext.class,
Integer.class};
// CHECKSTYLE:ON
protected CombineFileSplit split;
protected Class<? extends RecordReader<K, V>> rrClass;
protected Constructor<? extends RecordReader<K, V>> rrConstructor;
protected Class<? extends RecordReader<K,V>> rrClass;
protected Constructor<? extends RecordReader<K,V>> rrConstructor;
protected FileSystem fs;
protected TaskAttemptContext context;
@ -54,10 +56,10 @@ public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
protected long progress;
protected RecordReader<K, V> curReader;
public void initialize(InputSplit psplit,
TaskAttemptContext pcontext) throws IOException, InterruptedException {
this.split = (CombineFileSplit)psplit;
this.context = pcontext;
public void initialize(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
this.split = (CombineFileSplit)split;
this.context = context;
if (null != this.curReader) {
this.curReader.initialize(split, context);
}
@ -106,7 +108,7 @@ public float getProgress() throws IOException, InterruptedException {
*/
public CombineFileRecordReader(CombineFileSplit split,
TaskAttemptContext context,
Class<? extends RecordReader<K, V>> rrClass)
Class<? extends RecordReader<K,V>> rrClass)
throws IOException {
this.split = split;
this.context = context;
@ -119,8 +121,8 @@ public CombineFileRecordReader(CombineFileSplit split,
rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
rrConstructor.setAccessible(true);
} catch (Exception e) {
throw new RuntimeException(rrClass.getName()
+ " does not have valid constructor", e);
throw new RuntimeException(rrClass.getName() +
" does not have valid constructor", e);
}
initNextRecordReader();
}
@ -145,9 +147,6 @@ protected boolean initNextRecordReader() throws IOException {
// get a record reader for the idx-th chunk
try {
curReader = rrConstructor.newInstance(new Object []
{split, context, Integer.valueOf(idx)});
Configuration conf = context.getConfiguration();
// setup some helper config variables.
conf.set("map.input.file", split.getPath(idx).toString());
@ -163,9 +162,10 @@ protected boolean initNextRecordReader() throws IOException {
curReader.initialize(split, context);
}
} catch (Exception e) {
throw new RuntimeException(e);
throw new RuntimeException (e);
}
idx++;
return true;
}
}
// CHECKSTYLE:ON
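
Since the ported CombineFileInputFormat is abstract and CombineFileRecordReader expects a per-chunk reader with a (CombineFileSplit, TaskAttemptContext, Integer) constructor, wiring the two together looks roughly like the following. This is a hypothetical sketch, not code from the patch; the class names are made up and the per-chunk reader simply delegates to Hadoop's LineRecordReader:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Hypothetical concrete format: only createRecordReader() has to be supplied.
public class CombinedTextInputFormat
    extends CombineFileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    // CombineFileRecordReader instantiates one ChunkReader per chunk of the
    // combined split and advances to the next chunk when the current reader
    // is exhausted.
    return new CombineFileRecordReader<LongWritable, Text>(
        (CombineFileSplit) split, context, ChunkReader.class);
  }

  // Per-chunk reader: must expose a (CombineFileSplit, TaskAttemptContext,
  // Integer) constructor to match constructorSignature above.
  public static class ChunkReader extends RecordReader<LongWritable, Text> {
    private final LineRecordReader delegate = new LineRecordReader();
    private final FileSplit chunk;

    public ChunkReader(CombineFileSplit split, TaskAttemptContext context,
        Integer idx) {
      // Narrow the combined split back down to the idx-th (path, offset, length).
      chunk = new FileSplit(split.getPath(idx), split.getOffset(idx),
          split.getLength(idx), new String[0]);
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException {
      delegate.initialize(chunk, context);
    }

    @Override
    public boolean nextKeyValue() throws IOException {
      return delegate.nextKeyValue();
    }

    @Override
    public LongWritable getCurrentKey() { return delegate.getCurrentKey(); }

    @Override
    public Text getCurrentValue() { return delegate.getCurrentValue(); }

    @Override
    public float getProgress() throws IOException { return delegate.getProgress(); }

    @Override
    public void close() throws IOException { delegate.close(); }
  }
}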

View File

@ -27,17 +27,21 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
/**
* This file was ported from Hadoop 2.0.2-alpha
*/
// CHECKSTYLE:OFF
/**
* A sub-collection of input files.
*
* Unlike {@link FileSplit}, CombineFileSplit class does not represent
* Unlike FileSplit, CombineFileSplit class does not represent
* a split of a file, but a split of input files into smaller sets.
* A split may contain blocks from different file but all
* the blocks in the same split are probably local to some rack <br>
* CombineFileSplit can be used to implement {@link RecordReader}'s,
* with reading one record per file.
*
* @see FileSplit
* @see CombineFileInputFormat
*/
public class CombineFileSplit extends InputSplit implements Writable {
@ -49,7 +53,7 @@ public class CombineFileSplit extends InputSplit implements Writable {
private long totLength;
/**
* default constructor.
* default constructor
*/
public CombineFileSplit() {}
public CombineFileSplit(Path[] files, long[] start,
@ -58,31 +62,31 @@ public CombineFileSplit(Path[] files, long[] start,
}
public CombineFileSplit(Path[] files, long[] lengths) {
long[] pstartoffset = new long[files.length];
for (int i = 0; i < pstartoffset.length; i++) {
pstartoffset[i] = 0;
long[] startoffset = new long[files.length];
for (int i = 0; i < startoffset.length; i++) {
startoffset[i] = 0;
}
String[] plocations = new String[files.length];
for (int i = 0; i < plocations.length; i++) {
plocations[i] = "";
String[] locations = new String[files.length];
for (int i = 0; i < locations.length; i++) {
locations[i] = "";
}
initSplit(files, pstartoffset, lengths, plocations);
initSplit(files, startoffset, lengths, locations);
}
private void initSplit(Path[] files, long[] start,
long[] plengths, String[] plocations) {
long[] lengths, String[] locations) {
this.startoffset = start;
this.lengths = plengths;
this.lengths = lengths;
this.paths = files;
this.totLength = 0;
this.locations = plocations;
this.locations = locations;
for(long length : lengths) {
totLength += length;
}
}
/**
* Copy constructor.
* Copy constructor
*/
public CombineFileSplit(CombineFileSplit old) throws IOException {
this(old.getPaths(), old.getStartOffsets(),
@ -93,42 +97,42 @@ public long getLength() {
return totLength;
}
/** Returns an array containing the start offsets of the files in the split.*/
/** Returns an array containing the start offsets of the files in the split*/
public long[] getStartOffsets() {
return startoffset;
}
/** Returns an array containing the lengths of the files in the split.*/
/** Returns an array containing the lengths of the files in the split*/
public long[] getLengths() {
return lengths;
}
/** Returns the start offset of the i<sup>th</sup> Path.*/
/** Returns the start offset of the i<sup>th</sup> Path */
public long getOffset(int i) {
return startoffset[i];
}
/** Returns the length of the i<sup>th</sup> Path. */
/** Returns the length of the i<sup>th</sup> Path */
public long getLength(int i) {
return lengths[i];
}
/** Returns the number of Paths in the split.*/
/** Returns the number of Paths in the split */
public int getNumPaths() {
return paths.length;
}
/** Returns the i<sup>th</sup> Path. */
/** Returns the i<sup>th</sup> Path */
public Path getPath(int i) {
return paths[i];
}
/** Returns all the Paths in the split. */
/** Returns all the Paths in the split */
public Path[] getPaths() {
return paths;
}
/** Returns all the Paths where this input-split resides. */
/** Returns all the Paths where this input-split resides */
public String[] getLocations() throws IOException {
return locations;
}
@ -137,17 +141,17 @@ public void readFields(DataInput in) throws IOException {
totLength = in.readLong();
int arrLength = in.readInt();
lengths = new long[arrLength];
for(int i=0; i<arrLength; i++) {
for(int i=0; i<arrLength;i++) {
lengths[i] = in.readLong();
}
int filesLength = in.readInt();
paths = new Path[filesLength];
for(int i=0; i<filesLength; i++) {
for(int i=0; i<filesLength;i++) {
paths[i] = new Path(Text.readString(in));
}
arrLength = in.readInt();
startoffset = new long[arrLength];
for(int i=0; i<arrLength; i++) {
for(int i=0; i<arrLength;i++) {
startoffset[i] = in.readLong();
}
}
@ -172,11 +176,11 @@ public void write(DataOutput out) throws IOException {
public String toString() {
StringBuffer sb = new StringBuffer();
for (int i = 0; i < paths.length; i++) {
if (i == 0) {
if (i == 0 ) {
sb.append("Paths:");
}
sb.append(paths[i].toUri().getPath() + ":" + startoffset[i]
+ "+" + lengths[i]);
sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] +
"+" + lengths[i]);
if (i < paths.length -1) {
sb.append(",");
}
@ -193,3 +197,4 @@ public String toString() {
return sb.toString();
}
}
// CHECKSTYLE:ON
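
A quick usage sketch of the ported split container itself (paths and lengths are made up): the two-argument constructor treats every file as one chunk starting at offset zero with an empty location, and toString() prints the "Paths:" listing seen in task logs.

import org.apache.hadoop.fs.Path;

public class CombineFileSplitDemo {
  public static void main(String[] args) throws Exception {
    Path[] files = {
        new Path("/tmp/export/part-00000.gz"),
        new Path("/tmp/export/part-00001.gz"),
    };
    long[] lengths = { 1048576L, 524288L };

    // (Path[], long[]) constructor: offsets default to 0, locations to "".
    CombineFileSplit split = new CombineFileSplit(files, lengths);

    System.out.println(split.getNumPaths());  // 2
    System.out.println(split.getLength());    // 1572864, the sum of the lengths
    System.out.println(split.getOffset(1));   // 0
    System.out.println(split);                // Paths:/tmp/export/part-00000.gz:0+1048576,...
  }
}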