From c4f9ef846cc142f3a292c255db370a9a267cfade Mon Sep 17 00:00:00 2001
From: Jarek Jarcec Cecho
Date: Tue, 4 Dec 2012 12:08:20 -0800
Subject: [PATCH] SQOOP-705: Framework-defined text/sequence
 partitioner/extractor for HDFS (Bilung Lee via Jarek Jarcec Cecho)
---
 .../org/apache/sqoop/job/PrefixContext.java   |   8 +
 .../sqoop/job/etl/HdfsExportPartition.java    | 150 +++++
 .../sqoop/job/etl/HdfsExportPartitioner.java  | 554 ++++++++++++++++++
 .../job/etl/HdfsSequenceExportExtractor.java  |  88 +++
 .../job/etl/HdfsTextExportExtractor.java      | 140 +++++
 .../org/apache/sqoop/job/TestHdfsExtract.java | 253 ++++++++
 6 files changed, 1193 insertions(+)
 create mode 100644 execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java
 create mode 100644 execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
 create mode 100644 execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
 create mode 100644 execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
 create mode 100644 execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java

diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
index 5488b46f..c3beed7f 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
@@ -59,4 +59,12 @@ public int getInt(String key, int defaultValue) {
   public boolean getBoolean(String key, boolean defaultValue) {
     return configuration.getBoolean(prefix + key, defaultValue);
   }
+
+  /*
+   * TODO: Use getter methods for retrieval instead of
+   * exposing configuration directly.
+   */
+  public Configuration getConfiguration() {
+    return configuration;
+  }
 }
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java
new file mode 100644
index 00000000..0e0e53fe
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.job.etl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class derives mostly from CombineFileSplit of Hadoop, i.e.
+ * org.apache.hadoop.mapreduce.lib.input.CombineFileSplit.
+ */ +public class HdfsExportPartition extends Partition { + + private long lenFiles; + private int numFiles; + private Path[] files; + private long[] offsets; + private long[] lengths; + private String[] locations; + + public HdfsExportPartition() {} + + public HdfsExportPartition(Path[] files, long[] offsets, + long[] lengths, String[] locations) { + for(long length : lengths) { + this.lenFiles += length; + } + this.numFiles = files.length; + this.files = files; + this.offsets = offsets; + this.lengths = lengths; + this.locations = locations; + } + + public long getLengthOfFiles() { + return lenFiles; + } + + public int getNumberOfFiles() { + return numFiles; + } + + public Path getFile(int i) { + return files[i]; + } + + public long getOffset(int i) { + return offsets[i]; + } + + public long getLength(int i) { + return lengths[i]; + } + + public String[] getLocations() { + return locations; + } + + @Override + public void readFields(DataInput in) throws IOException { + numFiles = in.readInt(); + + files = new Path[numFiles]; + for(int i=0; i> rackToNodes = + new HashMap>(); + + @Override + public List getPartitions(ImmutableContext context, + Object connectionConfiguration, Object jobConfiguration) { + Configuration conf = ((PrefixContext)context).getConfiguration(); + + try { + int numTasks = Integer.parseInt(conf.get( + Constants.JOB_ETL_NUMBER_PARTITIONS)); + long numInputBytes = getInputSize(conf); + maxSplitSize = numInputBytes / numTasks; + + long minSizeNode = 0; + long minSizeRack = 0; + long maxSize = 0; + + // the values specified by setxxxSplitSize() takes precedence over the + // values that might have been specified in the config + if (minSplitSizeNode != 0) { + minSizeNode = minSplitSizeNode; + } else { + minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0); + } + if (minSplitSizeRack != 0) { + minSizeRack = minSplitSizeRack; + } else { + minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0); + } + if (maxSplitSize != 0) { + maxSize = maxSplitSize; + } else { + maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0); + } + if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) { + throw new IOException("Minimum split size pernode " + minSizeNode + + " cannot be larger than maximum split size " + + maxSize); + } + if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) { + throw new IOException("Minimum split size per rack" + minSizeRack + + " cannot be larger than maximum split size " + + maxSize); + } + if (minSizeRack != 0 && minSizeNode > minSizeRack) { + throw new IOException("Minimum split size per node" + minSizeNode + + " cannot be smaller than minimum split " + + "size per rack " + minSizeRack); + } + + // all the files in input set + String indir = conf.get(FileInputFormat.INPUT_DIR); + FileSystem fs = FileSystem.get(conf); + Path[] paths = FileUtil.stat2Paths(fs.listStatus(new Path(indir))); + List partitions = new ArrayList(); + if (paths.length == 0) { + return partitions; + } + + // Convert them to Paths first. This is a costly operation and + // we should do it first, otherwise we will incur doing it multiple + // times, one time each for each pool in the next loop. + List newpaths = new ArrayList(); + for (int i = 0; i < paths.length; i++) { + Path p = new Path(paths[i].toUri().getPath()); + newpaths.add(p); + } + paths = null; + + // create splits for all files that are not in any pool. 
+ getMoreSplits(conf, newpaths.toArray(new Path[newpaths.size()]), + maxSize, minSizeNode, minSizeRack, partitions); + + // free up rackToNodes map + rackToNodes.clear(); + + return partitions; + + } catch (IOException e) { + throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0021, e); + } + } + + private long getInputSize(Configuration conf) throws IOException { + String indir = conf.get(FileInputFormat.INPUT_DIR); + FileSystem fs = FileSystem.get(conf); + FileStatus[] files = fs.listStatus(new Path(indir)); + long count = 0; + for (FileStatus file : files) { + count += file.getLen(); + } + return count; + } + + /** + * Return all the splits in the specified set of paths + */ + private void getMoreSplits(Configuration conf, Path[] paths, + long maxSize, long minSizeNode, long minSizeRack, + List partitions) throws IOException { + + // all blocks for all the files in input set + OneFileInfo[] files; + + // mapping from a rack name to the list of blocks it has + HashMap> rackToBlocks = + new HashMap>(); + + // mapping from a block to the nodes on which it has replicas + HashMap blockToNodes = + new HashMap(); + + // mapping from a node to the list of blocks that it contains + HashMap> nodeToBlocks = + new HashMap>(); + + files = new OneFileInfo[paths.length]; + if (paths.length == 0) { + return; + } + + // populate all the blocks for all files + for (int i = 0; i < paths.length; i++) { + files[i] = new OneFileInfo(paths[i], conf, isSplitable(conf, paths[i]), + rackToBlocks, blockToNodes, nodeToBlocks, + rackToNodes, maxSize); + } + + ArrayList validBlocks = new ArrayList(); + Set nodes = new HashSet(); + long curSplitSize = 0; + + // process all nodes and create splits that are local + // to a node. + for (Iterator>> iter = nodeToBlocks.entrySet().iterator(); + iter.hasNext();) { + + Map.Entry> one = iter.next(); + nodes.add(one.getKey()); + List blocksInNode = one.getValue(); + + // for each block, copy it into validBlocks. Delete it from + // blockToNodes so that the same block does not appear in + // two different splits. + for (OneBlockInfo oneblock : blocksInNode) { + if (blockToNodes.containsKey(oneblock)) { + validBlocks.add(oneblock); + blockToNodes.remove(oneblock); + curSplitSize += oneblock.length; + + // if the accumulated split size exceeds the maximum, then + // create this split. + if (maxSize != 0 && curSplitSize >= maxSize) { + // create an input split and add it to the splits array + addCreatedSplit(partitions, nodes, validBlocks); + curSplitSize = 0; + validBlocks.clear(); + } + } + } + // if there were any blocks left over and their combined size is + // larger than minSplitNode, then combine them into one split. + // Otherwise add them back to the unprocessed pool. It is likely + // that they will be combined with other blocks from the + // same rack later on. + if (minSizeNode != 0 && curSplitSize >= minSizeNode) { + // create an input split and add it to the splits array + addCreatedSplit(partitions, nodes, validBlocks); + } else { + for (OneBlockInfo oneblock : validBlocks) { + blockToNodes.put(oneblock, oneblock.hosts); + } + } + validBlocks.clear(); + nodes.clear(); + curSplitSize = 0; + } + + // if blocks in a rack are below the specified minimum size, then keep them + // in 'overflow'. After the processing of all racks is complete, these + // overflow blocks will be combined into splits. + ArrayList overflowBlocks = new ArrayList(); + Set racks = new HashSet(); + + // Process all racks over and over again until there is no more work to do. 
+ while (blockToNodes.size() > 0) { + + // Create one split for this rack before moving over to the next rack. + // Come back to this rack after creating a single split for each of the + // remaining racks. + // Process one rack location at a time, Combine all possible blocks that + // reside on this rack as one split. (constrained by minimum and maximum + // split size). + + // iterate over all racks + for (Iterator>> iter = + rackToBlocks.entrySet().iterator(); iter.hasNext();) { + + Map.Entry> one = iter.next(); + racks.add(one.getKey()); + List blocks = one.getValue(); + + // for each block, copy it into validBlocks. Delete it from + // blockToNodes so that the same block does not appear in + // two different splits. + boolean createdSplit = false; + for (OneBlockInfo oneblock : blocks) { + if (blockToNodes.containsKey(oneblock)) { + validBlocks.add(oneblock); + blockToNodes.remove(oneblock); + curSplitSize += oneblock.length; + + // if the accumulated split size exceeds the maximum, then + // create this split. + if (maxSize != 0 && curSplitSize >= maxSize) { + // create an input split and add it to the splits array + addCreatedSplit(partitions, getHosts(racks), validBlocks); + createdSplit = true; + break; + } + } + } + + // if we created a split, then just go to the next rack + if (createdSplit) { + curSplitSize = 0; + validBlocks.clear(); + racks.clear(); + continue; + } + + if (!validBlocks.isEmpty()) { + if (minSizeRack != 0 && curSplitSize >= minSizeRack) { + // if there is a minimum size specified, then create a single split + // otherwise, store these blocks into overflow data structure + addCreatedSplit(partitions, getHosts(racks), validBlocks); + } else { + // There were a few blocks in this rack that + // remained to be processed. Keep them in 'overflow' block list. + // These will be combined later. + overflowBlocks.addAll(validBlocks); + } + } + curSplitSize = 0; + validBlocks.clear(); + racks.clear(); + } + } + + assert blockToNodes.isEmpty(); + assert curSplitSize == 0; + assert validBlocks.isEmpty(); + assert racks.isEmpty(); + + // Process all overflow blocks + for (OneBlockInfo oneblock : overflowBlocks) { + validBlocks.add(oneblock); + curSplitSize += oneblock.length; + + // This might cause an exiting rack location to be re-added, + // but it should be ok. + for (int i = 0; i < oneblock.racks.length; i++) { + racks.add(oneblock.racks[i]); + } + + // if the accumulated split size exceeds the maximum, then + // create this split. + if (maxSize != 0 && curSplitSize >= maxSize) { + // create an input split and add it to the splits array + addCreatedSplit(partitions, getHosts(racks), validBlocks); + curSplitSize = 0; + validBlocks.clear(); + racks.clear(); + } + } + + // Process any remaining blocks, if any. + if (!validBlocks.isEmpty()) { + addCreatedSplit(partitions, getHosts(racks), validBlocks); + } + } + + private boolean isSplitable(Configuration conf, Path file) { + final CompressionCodec codec = + new CompressionCodecFactory(conf).getCodec(file); + if (null == codec) { + return true; + } + return codec instanceof SplittableCompressionCodec; + } + + /** + * Create a single split from the list of blocks specified in validBlocks + * Add this new split into list. 
+ */ + private void addCreatedSplit(List partitions, + Collection locations, + ArrayList validBlocks) { + // create an input split + Path[] files = new Path[validBlocks.size()]; + long[] offsets = new long[validBlocks.size()]; + long[] lengths = new long[validBlocks.size()]; + for (int i = 0; i < validBlocks.size(); i++) { + files[i] = validBlocks.get(i).onepath; + offsets[i] = validBlocks.get(i).offset; + lengths[i] = validBlocks.get(i).length; + } + + // add this split to the list that is returned + HdfsExportPartition partition = new HdfsExportPartition( + files, offsets, lengths, locations.toArray(new String[0])); + partitions.add(partition); + } + + private Set getHosts(Set racks) { + Set hosts = new HashSet(); + for (String rack : racks) { + if (rackToNodes.containsKey(rack)) { + hosts.addAll(rackToNodes.get(rack)); + } + } + return hosts; + } + + private static void addHostToRack(HashMap> rackToNodes, + String rack, String host) { + Set hosts = rackToNodes.get(rack); + if (hosts == null) { + hosts = new HashSet(); + rackToNodes.put(rack, hosts); + } + hosts.add(host); + } + + /** + * information about one file from the File System + */ + private static class OneFileInfo { + private long fileSize; // size of the file + private OneBlockInfo[] blocks; // all blocks in this file + + OneFileInfo(Path path, Configuration conf, + boolean isSplitable, + HashMap> rackToBlocks, + HashMap blockToNodes, + HashMap> nodeToBlocks, + HashMap> rackToNodes, + long maxSize) + throws IOException { + this.fileSize = 0; + + // get block locations from file system + FileSystem fs = path.getFileSystem(conf); + FileStatus stat = fs.getFileStatus(path); + BlockLocation[] locations = fs.getFileBlockLocations(stat, 0, + stat.getLen()); + // create a list of all block and their locations + if (locations == null) { + blocks = new OneBlockInfo[0]; + } else { + if (!isSplitable) { + // if the file is not splitable, just create the one block with + // full file length + blocks = new OneBlockInfo[1]; + fileSize = stat.getLen(); + blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0] + .getHosts(), locations[0].getTopologyPaths()); + } else { + ArrayList blocksList = new ArrayList( + locations.length); + for (int i = 0; i < locations.length; i++) { + fileSize += locations[i].getLength(); + + // each split can be a maximum of maxSize + long left = locations[i].getLength(); + long myOffset = locations[i].getOffset(); + long myLength = 0; + do { + if (maxSize == 0) { + myLength = left; + } else { + if (left > maxSize && left < 2 * maxSize) { + // if remainder is between max and 2*max - then + // instead of creating splits of size max, left-max we + // create splits of size left/2 and left/2. This is + // a heuristic to avoid creating really really small + // splits. + myLength = left / 2; + } else { + myLength = Math.min(maxSize, left); + } + } + OneBlockInfo oneblock = new OneBlockInfo(path, myOffset, + myLength, locations[i].getHosts(), locations[i] + .getTopologyPaths()); + left -= myLength; + myOffset += myLength; + + blocksList.add(oneblock); + } while (left > 0); + } + blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]); + } + + for (OneBlockInfo oneblock : blocks) { + // add this block to the block --> node locations map + blockToNodes.put(oneblock, oneblock.hosts); + + // For blocks that do not have host/rack information, + // assign to default rack. 
+ String[] racks = null; + if (oneblock.hosts.length == 0) { + racks = new String[]{NetworkTopology.DEFAULT_RACK}; + } else { + racks = oneblock.racks; + } + + // add this block to the rack --> block map + for (int j = 0; j < racks.length; j++) { + String rack = racks[j]; + List blklist = rackToBlocks.get(rack); + if (blklist == null) { + blklist = new ArrayList(); + rackToBlocks.put(rack, blklist); + } + blklist.add(oneblock); + if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) { + // Add this host to rackToNodes map + addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]); + } + } + + // add this block to the node --> block map + for (int j = 0; j < oneblock.hosts.length; j++) { + String node = oneblock.hosts[j]; + List blklist = nodeToBlocks.get(node); + if (blklist == null) { + blklist = new ArrayList(); + nodeToBlocks.put(node, blklist); + } + blklist.add(oneblock); + } + } + } + } + + } + + /** + * information about one block from the File System + */ + private static class OneBlockInfo { + Path onepath; // name of this file + long offset; // offset in file + long length; // length of this block + String[] hosts; // nodes on which this block resides + String[] racks; // network topology of hosts + + OneBlockInfo(Path path, long offset, long len, + String[] hosts, String[] topologyPaths) { + this.onepath = path; + this.offset = offset; + this.hosts = hosts; + this.length = len; + assert (hosts.length == topologyPaths.length || + topologyPaths.length == 0); + + // if the file system does not have any rack information, then + // use dummy rack location. + if (topologyPaths.length == 0) { + topologyPaths = new String[hosts.length]; + for (int i = 0; i < topologyPaths.length; i++) { + topologyPaths[i] = (new NodeBase(hosts[i], + NetworkTopology.DEFAULT_RACK)).toString(); + } + } + + // The topology paths have the host name included as the last + // component. Strip it. + this.racks = new String[topologyPaths.length]; + for (int i = 0; i < topologyPaths.length; i++) { + this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation(); + } + } + } + +} diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java new file mode 100644 index 00000000..1f6714dc --- /dev/null +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.job.etl; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.sqoop.common.ImmutableContext; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.job.MapreduceExecutionError; +import org.apache.sqoop.job.PrefixContext; +import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.io.DataWriter; + +public class HdfsSequenceExportExtractor extends Extractor { + + private Configuration conf; + private DataWriter datawriter; + + private final char fieldDelimiter; + + public HdfsSequenceExportExtractor() { + fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER; + } + + @Override + public void run(ImmutableContext context, Object connectionConfiguration, + Object jobConfiguration, Partition partition, DataWriter writer) { + writer.setFieldDelimiter(fieldDelimiter); + + conf = ((PrefixContext)context).getConfiguration(); + datawriter = writer; + + try { + HdfsExportPartition p = (HdfsExportPartition)partition; + int numFiles = p.getNumberOfFiles(); + for (int i=0; i filereader.getPosition()) { + filereader.sync(start); // sync to start + } + + Text line = new Text(); + boolean hasNext = filereader.next(line); + while (hasNext) { + datawriter.writeCsvRecord(line.toString()); + hasNext = filereader.next(line); + } + } + + @Override + public long getRowsRead() { + // TODO need to return the rows read + return 0; + } + +} diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java new file mode 100644 index 00000000..7f1b1443 --- /dev/null +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.job.etl; + +import java.io.IOException; +import java.nio.charset.Charset; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.SplitCompressionInputStream; +import org.apache.hadoop.io.compress.SplittableCompressionCodec; +import org.apache.hadoop.util.LineReader; +import org.apache.sqoop.common.ImmutableContext; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.job.MapreduceExecutionError; +import org.apache.sqoop.job.PrefixContext; +import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.io.DataWriter; + +public class HdfsTextExportExtractor extends Extractor { + + private Configuration conf; + private DataWriter datawriter; + + private final char fieldDelimiter; + + public HdfsTextExportExtractor() { + fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER; + } + + @Override + public void run(ImmutableContext context, Object connectionConfiguration, + Object jobConfiguration, Partition partition, DataWriter writer) { + writer.setFieldDelimiter(fieldDelimiter); + + conf = ((PrefixContext)context).getConfiguration(); + datawriter = writer; + + try { + HdfsExportPartition p = (HdfsExportPartition)partition; + int numFiles = p.getNumberOfFiles(); + for (int i=0; i clz) + throws IOException, InstantiationException, IllegalAccessException { + Configuration conf = new Configuration(); + + CompressionCodec codec = null; + String extension = ""; + if (clz != null) { + codec = clz.newInstance(); + if (codec instanceof Configurable) { + ((Configurable) codec).setConf(conf); + } + extension = codec.getDefaultExtension(); + } + + int index = 1; + for (int fi=0; fi clz) + throws IOException, InstantiationException, IllegalAccessException { + Configuration conf = new Configuration(); + + CompressionCodec codec = null; + if (clz != null) { + codec = clz.newInstance(); + if (codec instanceof Configurable) { + ((Configurable) codec).setConf(conf); + } + } + + int index = 1; + for (int fi=0; fi0; i--) { + string = "0" + string; + } + return string; + } + + public static class DummyLoader extends Loader { + @Override + public void run(ImmutableContext context, DataReader reader) + throws Exception { + int index = 1; + int sum = 0; + Object[] array; + while ((array = reader.readArrayRecord()) != null) { + sum += Integer.valueOf(array[0].toString()); + index++; + }; + + int numbers = NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE; + assertEquals((1+numbers)*numbers/2, sum); + + assertEquals(NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE, index-1); + } + } + +}
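
The chunking heuristic inside OneFileInfo above (the do/while loop that slices a block into pieces of at most maxSize, halving any remainder that falls between maxSize and 2 * maxSize so no tiny trailing split is produced) can be exercised on its own. The standalone Java sketch below mirrors only that length computation; the class name SplitSizeHeuristicSketch and the chunkLengths helper are illustrative and do not exist in the patch or in the Sqoop code base.

import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative-only rewrite of the chunking loop in OneFileInfo: a block of
 * blockLength bytes is cut into pieces of at most maxSize, except that a
 * remainder strictly between maxSize and 2 * maxSize is split into two
 * halves. A maxSize of 0 means "no limit".
 */
public class SplitSizeHeuristicSketch {

  static List<Long> chunkLengths(long blockLength, long maxSize) {
    List<Long> lengths = new ArrayList<Long>();
    long left = blockLength;
    do {
      long myLength;
      if (maxSize == 0) {
        // no maximum configured: the whole remainder becomes one chunk
        myLength = left;
      } else if (left > maxSize && left < 2 * maxSize) {
        // remainder between max and 2*max: emit two halves instead of
        // one full chunk followed by a very small one
        myLength = left / 2;
      } else {
        myLength = Math.min(maxSize, left);
      }
      lengths.add(myLength);
      left -= myLength;
    } while (left > 0);
    return lengths;
  }

  public static void main(String[] args) {
    // A 150 MB block with a 64 MB maximum split size yields chunks of
    // 64 MB, 43 MB and 43 MB rather than 64 MB, 64 MB and 22 MB.
    System.out.println(chunkLengths(150L * 1024 * 1024, 64L * 1024 * 1024));
  }
}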