
SQOOP-705: Framework-defined text/sequence partitioner/extractor for HDFS

(Bilung Lee via Jarek Jarcec Cecho)
Jarek Jarcec Cecho 2012-12-04 12:08:20 -08:00
parent cf3d71049d
commit c4f9ef846c
6 changed files with 1193 additions and 0 deletions
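The new partitioner and extractor classes are selected purely through job configuration keys, as exercised by the new TestHdfsExtract test at the end of this commit. A minimal sketch of that wiring, mirroring the body of testUncompressedText() below; MyLoader and the input directory are illustrative placeholders, not part of the patch:

// Hypothetical driver mirroring TestHdfsExtract.testUncompressedText() below.
// MyLoader and the input path are placeholders, not part of this patch.
public void runTextExport() throws Exception {
  Configuration conf = new Configuration();
  conf.set(JobConstants.JOB_TYPE, "EXPORT");
  conf.set(JobConstants.JOB_ETL_PARTITIONER, HdfsExportPartitioner.class.getName());
  conf.set(JobConstants.JOB_ETL_EXTRACTOR, HdfsTextExportExtractor.class.getName());
  conf.set(JobConstants.JOB_ETL_LOADER, MyLoader.class.getName());  // any Loader implementation
  conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");               // number of partitions to create
  conf.set(FileInputFormat.INPUT_DIR, "/tmp/sqoop/export-input");   // HDFS directory to export
  JobUtils.runJob(conf);                                            // test harness used by TestHdfsExtract
}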


@@ -59,4 +59,12 @@ public int getInt(String key, int defaultValue) {
public boolean getBoolean(String key, boolean defaultValue) {
return configuration.getBoolean(prefix + key, defaultValue);
}
/*
* TODO: Use getter methods for retrieval instead of
* exposing configuration directly.
*/
public Configuration getConfiguration() {
return configuration;
}
}


@@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.etl;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
/**
* This class derives mostly from CombineFileSplit of Hadoop, i.e.
* org.apache.hadoop.mapreduce.lib.input.CombineFileSplit.
*/
public class HdfsExportPartition extends Partition {
private long lenFiles;
private int numFiles;
private Path[] files;
private long[] offsets;
private long[] lengths;
private String[] locations;
public HdfsExportPartition() {}
public HdfsExportPartition(Path[] files, long[] offsets,
long[] lengths, String[] locations) {
for(long length : lengths) {
this.lenFiles += length;
}
this.numFiles = files.length;
this.files = files;
this.offsets = offsets;
this.lengths = lengths;
this.locations = locations;
}
public long getLengthOfFiles() {
return lenFiles;
}
public int getNumberOfFiles() {
return numFiles;
}
public Path getFile(int i) {
return files[i];
}
public long getOffset(int i) {
return offsets[i];
}
public long getLength(int i) {
return lengths[i];
}
public String[] getLocations() {
return locations;
}
@Override
public void readFields(DataInput in) throws IOException {
numFiles = in.readInt();
files = new Path[numFiles];
for(int i=0; i<numFiles; i++) {
files[i] = new Path(in.readUTF());
}
offsets = new long[numFiles];
for(int i=0; i<numFiles; i++) {
offsets[i] = in.readLong();
}
lengths = new long[numFiles];
for(int i=0; i<numFiles; i++) {
lengths[i] = in.readLong();
}
for(long length : lengths) {
lenFiles += length;
}
int numLocations = in.readInt();
if (numLocations == 0) {
locations = null;
} else {
locations = new String[numLocations];
for(int i=0; i<numLocations; i++) {
locations[i] = in.readUTF();
}
}
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(numFiles);
for(Path file : files) {
out.writeUTF(file.toString());
}
for(long offset : offsets) {
out.writeLong(offset);
}
for(long length : lengths) {
out.writeLong(length);
}
if (locations == null || locations.length == 0) {
out.writeInt(0);
} else {
out.writeInt(locations.length);
for(String location : locations) {
out.writeUTF(location);
}
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{");
sb.append(files[0]);
for(int i = 1; i < files.length; i++) {
sb.append(", " + files[i]);
}
sb.append("}");
return sb.toString();
}
}
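HdfsExportPartition serializes itself with the Writable-style write()/readFields() pair above. A small round-trip sketch, using my own scaffolding rather than anything from the patch; the path, length, and host are made-up example values:

// Serialize one partition with write() and rebuild it with readFields().
public void partitionRoundTrip() throws java.io.IOException {
  HdfsExportPartition original = new HdfsExportPartition(
      new Path[] { new Path("/data/part-00000") },
      new long[] { 0L },
      new long[] { 128L * 1024 * 1024 },
      new String[] { "node1.example.com" });

  java.io.ByteArrayOutputStream buffer = new java.io.ByteArrayOutputStream();
  original.write(new java.io.DataOutputStream(buffer));

  HdfsExportPartition copy = new HdfsExportPartition();
  copy.readFields(new java.io.DataInputStream(
      new java.io.ByteArrayInputStream(buffer.toByteArray())));

  assert copy.getNumberOfFiles() == 1;
  assert copy.getLength(0) == 128L * 1024 * 1024;
}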


@@ -0,0 +1,554 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.etl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.HashMap;
import java.util.Set;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.Constants;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext;
/**
* This class derives mostly from CombineFileInputFormat of Hadoop, i.e.
* org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.
*/
public class HdfsExportPartitioner extends Partitioner {
public static final String SPLIT_MINSIZE_PERNODE =
"mapreduce.input.fileinputformat.split.minsize.per.node";
public static final String SPLIT_MINSIZE_PERRACK =
"mapreduce.input.fileinputformat.split.minsize.per.rack";
// ability to limit the size of a single split
private long maxSplitSize = 0;
private long minSplitSizeNode = 0;
private long minSplitSizeRack = 0;
// mapping from a rack name to the set of Nodes in the rack
private HashMap<String, Set<String>> rackToNodes =
new HashMap<String, Set<String>>();
@Override
public List<Partition> getPartitions(ImmutableContext context,
Object connectionConfiguration, Object jobConfiguration) {
Configuration conf = ((PrefixContext)context).getConfiguration();
try {
int numTasks = Integer.parseInt(conf.get(
Constants.JOB_ETL_NUMBER_PARTITIONS));
long numInputBytes = getInputSize(conf);
maxSplitSize = numInputBytes / numTasks;
long minSizeNode = 0;
long minSizeRack = 0;
long maxSize = 0;
// the values specified by setXxxSplitSize() take precedence over the
// values that might have been specified in the config
if (minSplitSizeNode != 0) {
minSizeNode = minSplitSizeNode;
} else {
minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
}
if (minSplitSizeRack != 0) {
minSizeRack = minSplitSizeRack;
} else {
minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
}
if (maxSplitSize != 0) {
maxSize = maxSplitSize;
} else {
maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
}
if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
throw new IOException("Minimum split size pernode " + minSizeNode +
" cannot be larger than maximum split size " +
maxSize);
}
if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
throw new IOException("Minimum split size per rack" + minSizeRack +
" cannot be larger than maximum split size " +
maxSize);
}
if (minSizeRack != 0 && minSizeNode > minSizeRack) {
throw new IOException("Minimum split size per node" + minSizeNode +
" cannot be smaller than minimum split " +
"size per rack " + minSizeRack);
}
// all the files in input set
String indir = conf.get(FileInputFormat.INPUT_DIR);
FileSystem fs = FileSystem.get(conf);
Path[] paths = FileUtil.stat2Paths(fs.listStatus(new Path(indir)));
List<Partition> partitions = new ArrayList<Partition>();
if (paths.length == 0) {
return partitions;
}
// Convert them to Paths first. This is a costly operation and
// we should do it first, otherwise we would incur the cost
// multiple times, once for each pool in the next loop.
List<Path> newpaths = new ArrayList<Path>();
for (int i = 0; i < paths.length; i++) {
Path p = new Path(paths[i].toUri().getPath());
newpaths.add(p);
}
paths = null;
// create splits for all files that are not in any pool.
getMoreSplits(conf, newpaths.toArray(new Path[newpaths.size()]),
maxSize, minSizeNode, minSizeRack, partitions);
// free up rackToNodes map
rackToNodes.clear();
return partitions;
} catch (IOException e) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0021, e);
}
}
private long getInputSize(Configuration conf) throws IOException {
String indir = conf.get(FileInputFormat.INPUT_DIR);
FileSystem fs = FileSystem.get(conf);
FileStatus[] files = fs.listStatus(new Path(indir));
long count = 0;
for (FileStatus file : files) {
count += file.getLen();
}
return count;
}
/**
* Return all the splits in the specified set of paths
*/
private void getMoreSplits(Configuration conf, Path[] paths,
long maxSize, long minSizeNode, long minSizeRack,
List<Partition> partitions) throws IOException {
// all blocks for all the files in input set
OneFileInfo[] files;
// mapping from a rack name to the list of blocks it has
HashMap<String, List<OneBlockInfo>> rackToBlocks =
new HashMap<String, List<OneBlockInfo>>();
// mapping from a block to the nodes on which it has replicas
HashMap<OneBlockInfo, String[]> blockToNodes =
new HashMap<OneBlockInfo, String[]>();
// mapping from a node to the list of blocks that it contains
HashMap<String, List<OneBlockInfo>> nodeToBlocks =
new HashMap<String, List<OneBlockInfo>>();
files = new OneFileInfo[paths.length];
if (paths.length == 0) {
return;
}
// populate all the blocks for all files
for (int i = 0; i < paths.length; i++) {
files[i] = new OneFileInfo(paths[i], conf, isSplitable(conf, paths[i]),
rackToBlocks, blockToNodes, nodeToBlocks,
rackToNodes, maxSize);
}
ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
Set<String> nodes = new HashSet<String>();
long curSplitSize = 0;
// process all nodes and create splits that are local
// to a node.
for (Iterator<Map.Entry<String,
List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
iter.hasNext();) {
Map.Entry<String, List<OneBlockInfo>> one = iter.next();
nodes.add(one.getKey());
List<OneBlockInfo> blocksInNode = one.getValue();
// for each block, copy it into validBlocks. Delete it from
// blockToNodes so that the same block does not appear in
// two different splits.
for (OneBlockInfo oneblock : blocksInNode) {
if (blockToNodes.containsKey(oneblock)) {
validBlocks.add(oneblock);
blockToNodes.remove(oneblock);
curSplitSize += oneblock.length;
// if the accumulated split size exceeds the maximum, then
// create this split.
if (maxSize != 0 && curSplitSize >= maxSize) {
// create an input split and add it to the splits array
addCreatedSplit(partitions, nodes, validBlocks);
curSplitSize = 0;
validBlocks.clear();
}
}
}
// if there were any blocks left over and their combined size is
// larger than minSizeNode, then combine them into one split.
// Otherwise add them back to the unprocessed pool. It is likely
// that they will be combined with other blocks from the
// same rack later on.
if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
// create an input split and add it to the splits array
addCreatedSplit(partitions, nodes, validBlocks);
} else {
for (OneBlockInfo oneblock : validBlocks) {
blockToNodes.put(oneblock, oneblock.hosts);
}
}
validBlocks.clear();
nodes.clear();
curSplitSize = 0;
}
// if blocks in a rack are below the specified minimum size, then keep them
// in 'overflow'. After the processing of all racks is complete, these
// overflow blocks will be combined into splits.
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
Set<String> racks = new HashSet<String>();
// Process all racks over and over again until there is no more work to do.
while (blockToNodes.size() > 0) {
// Create one split for this rack before moving over to the next rack.
// Come back to this rack after creating a single split for each of the
// remaining racks.
// Process one rack location at a time, combining all possible blocks
// that reside on this rack into one split (constrained by the minimum
// and maximum split size).
// iterate over all racks
for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
rackToBlocks.entrySet().iterator(); iter.hasNext();) {
Map.Entry<String, List<OneBlockInfo>> one = iter.next();
racks.add(one.getKey());
List<OneBlockInfo> blocks = one.getValue();
// for each block, copy it into validBlocks. Delete it from
// blockToNodes so that the same block does not appear in
// two different splits.
boolean createdSplit = false;
for (OneBlockInfo oneblock : blocks) {
if (blockToNodes.containsKey(oneblock)) {
validBlocks.add(oneblock);
blockToNodes.remove(oneblock);
curSplitSize += oneblock.length;
// if the accumulated split size exceeds the maximum, then
// create this split.
if (maxSize != 0 && curSplitSize >= maxSize) {
// create an input split and add it to the splits array
addCreatedSplit(partitions, getHosts(racks), validBlocks);
createdSplit = true;
break;
}
}
}
// if we created a split, then just go to the next rack
if (createdSplit) {
curSplitSize = 0;
validBlocks.clear();
racks.clear();
continue;
}
if (!validBlocks.isEmpty()) {
if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
// if there is a minimum size specified, then create a single split
// otherwise, store these blocks into overflow data structure
addCreatedSplit(partitions, getHosts(racks), validBlocks);
} else {
// There were a few blocks in this rack that
// remained to be processed. Keep them in 'overflow' block list.
// These will be combined later.
overflowBlocks.addAll(validBlocks);
}
}
curSplitSize = 0;
validBlocks.clear();
racks.clear();
}
}
assert blockToNodes.isEmpty();
assert curSplitSize == 0;
assert validBlocks.isEmpty();
assert racks.isEmpty();
// Process all overflow blocks
for (OneBlockInfo oneblock : overflowBlocks) {
validBlocks.add(oneblock);
curSplitSize += oneblock.length;
// This might cause an existing rack location to be re-added,
// but it should be ok.
for (int i = 0; i < oneblock.racks.length; i++) {
racks.add(oneblock.racks[i]);
}
// if the accumulated split size exceeds the maximum, then
// create this split.
if (maxSize != 0 && curSplitSize >= maxSize) {
// create an input split and add it to the splits array
addCreatedSplit(partitions, getHosts(racks), validBlocks);
curSplitSize = 0;
validBlocks.clear();
racks.clear();
}
}
// Process any remaining blocks.
if (!validBlocks.isEmpty()) {
addCreatedSplit(partitions, getHosts(racks), validBlocks);
}
}
private boolean isSplitable(Configuration conf, Path file) {
final CompressionCodec codec =
new CompressionCodecFactory(conf).getCodec(file);
if (null == codec) {
return true;
}
return codec instanceof SplittableCompressionCodec;
}
/**
* Create a single split from the list of blocks specified in validBlocks
* and add this new split to the list.
*/
private void addCreatedSplit(List<Partition> partitions,
Collection<String> locations,
ArrayList<OneBlockInfo> validBlocks) {
// create an input split
Path[] files = new Path[validBlocks.size()];
long[] offsets = new long[validBlocks.size()];
long[] lengths = new long[validBlocks.size()];
for (int i = 0; i < validBlocks.size(); i++) {
files[i] = validBlocks.get(i).onepath;
offsets[i] = validBlocks.get(i).offset;
lengths[i] = validBlocks.get(i).length;
}
// add this split to the list that is returned
HdfsExportPartition partition = new HdfsExportPartition(
files, offsets, lengths, locations.toArray(new String[0]));
partitions.add(partition);
}
private Set<String> getHosts(Set<String> racks) {
Set<String> hosts = new HashSet<String>();
for (String rack : racks) {
if (rackToNodes.containsKey(rack)) {
hosts.addAll(rackToNodes.get(rack));
}
}
return hosts;
}
private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
String rack, String host) {
Set<String> hosts = rackToNodes.get(rack);
if (hosts == null) {
hosts = new HashSet<String>();
rackToNodes.put(rack, hosts);
}
hosts.add(host);
}
/**
* information about one file from the File System
*/
private static class OneFileInfo {
private long fileSize; // size of the file
private OneBlockInfo[] blocks; // all blocks in this file
OneFileInfo(Path path, Configuration conf,
boolean isSplitable,
HashMap<String, List<OneBlockInfo>> rackToBlocks,
HashMap<OneBlockInfo, String[]> blockToNodes,
HashMap<String, List<OneBlockInfo>> nodeToBlocks,
HashMap<String, Set<String>> rackToNodes,
long maxSize)
throws IOException {
this.fileSize = 0;
// get block locations from file system
FileSystem fs = path.getFileSystem(conf);
FileStatus stat = fs.getFileStatus(path);
BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
stat.getLen());
// create a list of all block and their locations
if (locations == null) {
blocks = new OneBlockInfo[0];
} else {
if (!isSplitable) {
// if the file is not splitable, just create the one block with
// full file length
blocks = new OneBlockInfo[1];
fileSize = stat.getLen();
blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
.getHosts(), locations[0].getTopologyPaths());
} else {
ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
locations.length);
for (int i = 0; i < locations.length; i++) {
fileSize += locations[i].getLength();
// each split can be a maximum of maxSize
long left = locations[i].getLength();
long myOffset = locations[i].getOffset();
long myLength = 0;
do {
if (maxSize == 0) {
myLength = left;
} else {
if (left > maxSize && left < 2 * maxSize) {
// if remainder is between max and 2*max - then
// instead of creating splits of size max, left-max we
// create splits of size left/2 and left/2. This is
// a heuristic to avoid creating really really small
// splits.
myLength = left / 2;
} else {
myLength = Math.min(maxSize, left);
}
}
OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
myLength, locations[i].getHosts(), locations[i]
.getTopologyPaths());
left -= myLength;
myOffset += myLength;
blocksList.add(oneblock);
} while (left > 0);
}
blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
}
for (OneBlockInfo oneblock : blocks) {
// add this block to the block --> node locations map
blockToNodes.put(oneblock, oneblock.hosts);
// For blocks that do not have host/rack information,
// assign to default rack.
String[] racks = null;
if (oneblock.hosts.length == 0) {
racks = new String[]{NetworkTopology.DEFAULT_RACK};
} else {
racks = oneblock.racks;
}
// add this block to the rack --> block map
for (int j = 0; j < racks.length; j++) {
String rack = racks[j];
List<OneBlockInfo> blklist = rackToBlocks.get(rack);
if (blklist == null) {
blklist = new ArrayList<OneBlockInfo>();
rackToBlocks.put(rack, blklist);
}
blklist.add(oneblock);
if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
// Add this host to rackToNodes map
addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
}
}
// add this block to the node --> block map
for (int j = 0; j < oneblock.hosts.length; j++) {
String node = oneblock.hosts[j];
List<OneBlockInfo> blklist = nodeToBlocks.get(node);
if (blklist == null) {
blklist = new ArrayList<OneBlockInfo>();
nodeToBlocks.put(node, blklist);
}
blklist.add(oneblock);
}
}
}
}
}
/**
* information about one block from the File System
*/
private static class OneBlockInfo {
Path onepath; // name of this file
long offset; // offset in file
long length; // length of this block
String[] hosts; // nodes on which this block resides
String[] racks; // network topology of hosts
OneBlockInfo(Path path, long offset, long len,
String[] hosts, String[] topologyPaths) {
this.onepath = path;
this.offset = offset;
this.hosts = hosts;
this.length = len;
assert (hosts.length == topologyPaths.length ||
topologyPaths.length == 0);
// if the file system does not have any rack information, then
// use dummy rack location.
if (topologyPaths.length == 0) {
topologyPaths = new String[hosts.length];
for (int i = 0; i < topologyPaths.length; i++) {
topologyPaths[i] = (new NodeBase(hosts[i],
NetworkTopology.DEFAULT_RACK)).toString();
}
}
// The topology paths have the host name included as the last
// component. Strip it.
this.racks = new String[topologyPaths.length];
for (int i = 0; i < topologyPaths.length; i++) {
this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
}
}
}
}
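One detail worth noting in the OneFileInfo constructor above: a block whose remaining length falls strictly between maxSize and 2*maxSize is cut into two equal halves instead of a full-size chunk plus a small tail. A standalone illustration of that heuristic with made-up sizes (not part of the patch):

// Demonstrates the chunking heuristic from OneFileInfo with maxSize = 64 MB
// and a single 100 MB block: the loop emits two 50 MB chunks rather than
// a 64 MB chunk followed by a 36 MB remainder.
public class SplitHeuristicDemo {
  public static void main(String[] args) {
    long maxSize = 64L << 20;   // cap on a single chunk
    long left = 100L << 20;     // bytes remaining in one block
    long offset = 0;
    while (left > 0) {
      long myLength = (left > maxSize && left < 2 * maxSize)
          ? left / 2                  // halve to avoid a tiny trailing chunk
          : Math.min(maxSize, left);
      System.out.println("chunk at offset " + offset + ", length " + myLength);
      left -= myLength;
      offset += myLength;
    }
  }
}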


@@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.etl;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.DataWriter;
public class HdfsSequenceExportExtractor extends Extractor {
private Configuration conf;
private DataWriter datawriter;
private final char fieldDelimiter;
public HdfsSequenceExportExtractor() {
fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
}
@Override
public void run(ImmutableContext context, Object connectionConfiguration,
Object jobConfiguration, Partition partition, DataWriter writer) {
writer.setFieldDelimiter(fieldDelimiter);
conf = ((PrefixContext)context).getConfiguration();
datawriter = writer;
try {
HdfsExportPartition p = (HdfsExportPartition)partition;
int numFiles = p.getNumberOfFiles();
for (int i=0; i<numFiles; i++) {
extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
}
} catch (IOException e) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
}
}
private void extractFile(Path file, long offset, long length)
throws IOException {
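// Note: the length argument is not used below; extraction starts at the first
// sync mark after 'offset' (or at the beginning of the file) and reads through
// to the end of the file.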
long start = offset;
SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
SequenceFile.Reader.file(file));
if (start > filereader.getPosition()) {
filereader.sync(start); // sync to start
}
Text line = new Text();
boolean hasNext = filereader.next(line);
while (hasNext) {
datawriter.writeCsvRecord(line.toString());
hasNext = filereader.next(line);
}
}
@Override
public long getRowsRead() {
// TODO need to return the rows read
return 0;
}
}


@@ -0,0 +1,140 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.etl;
import java.io.IOException;
import java.nio.charset.Charset;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.util.LineReader;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.DataWriter;
public class HdfsTextExportExtractor extends Extractor {
private Configuration conf;
private DataWriter datawriter;
private final char fieldDelimiter;
public HdfsTextExportExtractor() {
fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
}
@Override
public void run(ImmutableContext context, Object connectionConfiguration,
Object jobConfiguration, Partition partition, DataWriter writer) {
writer.setFieldDelimiter(fieldDelimiter);
conf = ((PrefixContext)context).getConfiguration();
datawriter = writer;
try {
HdfsExportPartition p = (HdfsExportPartition)partition;
int numFiles = p.getNumberOfFiles();
for (int i=0; i<numFiles; i++) {
extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
}
} catch (IOException e) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
}
}
private void extractFile(Path file, long offset, long length)
throws IOException {
long start = offset;
long end = start + length;
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream filestream = fs.open(file);
CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
LineReader filereader;
Seekable fileseeker;
if (codec == null) {
filestream.seek(start);
byte[] recordDelimiterBytes = String.valueOf(
Data.DEFAULT_RECORD_DELIMITER).getBytes(
Charset.forName(Data.CHARSET_NAME));
filereader = new LineReader(filestream, conf,
recordDelimiterBytes);
fileseeker = filestream;
} else if (codec instanceof SplittableCompressionCodec) {
SplitCompressionInputStream compressionstream =
((SplittableCompressionCodec)codec).createInputStream(
filestream, codec.createDecompressor(), start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
byte[] recordDelimiterBytes = String.valueOf(
Data.DEFAULT_RECORD_DELIMITER).getBytes(
Charset.forName(Data.CHARSET_NAME));
filereader = new LineReader(compressionstream,
conf, recordDelimiterBytes);
fileseeker = compressionstream;
start = compressionstream.getAdjustedStart();
end = compressionstream.getAdjustedEnd();
} else {
byte[] recordDelimiterBytes = String.valueOf(
Data.DEFAULT_RECORD_DELIMITER).getBytes(
Charset.forName(Data.CHARSET_NAME));
filereader = new LineReader(
codec.createInputStream(filestream, codec.createDecompressor()),
conf, recordDelimiterBytes);
fileseeker = filestream;
}
if (start != 0) {
// always throw away the first record because
// the previous split already read one extra line
start += filereader.readLine(new Text(), 0);
}
Text line = new Text();
int size;
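// Read until the stream position passes 'end'; the record that straddles the
// split boundary is consumed here and skipped by the next split (which throws
// away its first record, see above).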
while (fileseeker.getPos() <= end) {
size = filereader.readLine(line, Integer.MAX_VALUE);
if (size == 0) {
break;
}
datawriter.writeCsvRecord(line.toString());
}
}
@Override
public long getRowsRead() {
// TODO need to return the rows read
return 0;
}
}


@@ -0,0 +1,253 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.job.etl.HdfsExportPartitioner;
import org.apache.sqoop.job.etl.HdfsSequenceExportExtractor;
import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
import org.apache.sqoop.job.etl.HdfsTextExportExtractor;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.DataReader;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
import org.junit.Test;
public class TestHdfsExtract extends TestCase {
private static final String INPUT_ROOT = "/tmp/sqoop/warehouse/";
private static final int NUMBER_OF_FILES = 5;
private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
private String indir;
public TestHdfsExtract() {
indir = INPUT_ROOT + getClass().getSimpleName();
}
@Test
public void testUncompressedText() throws Exception {
FileUtils.delete(indir);
FileUtils.mkdirs(indir);
createTextInput(null);
Configuration conf = new Configuration();
conf.set(JobConstants.JOB_TYPE, "EXPORT");
conf.set(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR,
HdfsTextExportExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
conf.set(FileInputFormat.INPUT_DIR, indir);
JobUtils.runJob(conf);
}
@Test
public void testCompressedText() throws Exception {
FileUtils.delete(indir);
FileUtils.mkdirs(indir);
createTextInput(SqoopFileOutputFormat.DEFAULT_CODEC);
Configuration conf = new Configuration();
conf.set(JobConstants.JOB_TYPE, "EXPORT");
conf.set(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR,
HdfsTextExportExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
conf.set(FileInputFormat.INPUT_DIR, indir);
JobUtils.runJob(conf);
FileUtils.delete(indir);
FileUtils.mkdirs(indir);
createTextInput(BZip2Codec.class);
conf.set(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR,
HdfsTextExportExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
conf.set(FileInputFormat.INPUT_DIR, indir);
JobUtils.runJob(conf);
}
@Test
public void testUncompressedSequence() throws Exception {
FileUtils.delete(indir);
FileUtils.mkdirs(indir);
createSequenceInput(null);
Configuration conf = new Configuration();
conf.set(JobConstants.JOB_TYPE, "EXPORT");
conf.set(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR,
HdfsSequenceExportExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
conf.set(FileInputFormat.INPUT_DIR, indir);
JobUtils.runJob(conf);
}
@Test
public void testCompressedSequence() throws Exception {
FileUtils.delete(indir);
FileUtils.mkdirs(indir);
createSequenceInput(SqoopFileOutputFormat.DEFAULT_CODEC);
Configuration conf = new Configuration();
conf.set(JobConstants.JOB_TYPE, "EXPORT");
conf.set(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR,
HdfsSequenceExportExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
conf.set(FileInputFormat.INPUT_DIR, indir);
JobUtils.runJob(conf);
}
private void createTextInput(Class<? extends CompressionCodec> clz)
throws IOException, InstantiationException, IllegalAccessException {
Configuration conf = new Configuration();
CompressionCodec codec = null;
String extension = "";
if (clz != null) {
codec = clz.newInstance();
if (codec instanceof Configurable) {
((Configurable) codec).setConf(conf);
}
extension = codec.getDefaultExtension();
}
int index = 1;
for (int fi=0; fi<NUMBER_OF_FILES; fi++) {
String fileName = indir + "/" + "part-r-" + padZeros(fi, 5) + extension;
OutputStream filestream = FileUtils.create(fileName);
BufferedWriter filewriter;
if (codec != null) {
filewriter = new BufferedWriter(new OutputStreamWriter(
codec.createOutputStream(filestream, codec.createCompressor()),
Data.CHARSET_NAME));
} else {
filewriter = new BufferedWriter(new OutputStreamWriter(
filestream, Data.CHARSET_NAME));
}
for (int ri=0; ri<NUMBER_OF_ROWS_PER_FILE; ri++) {
String row = index + "," + (double)index + ",'" + index + "'";
filewriter.write(row + Data.DEFAULT_RECORD_DELIMITER);
index++;
}
filewriter.close();
}
}
private void createSequenceInput(Class<? extends CompressionCodec> clz)
throws IOException, InstantiationException, IllegalAccessException {
Configuration conf = new Configuration();
CompressionCodec codec = null;
if (clz != null) {
codec = clz.newInstance();
if (codec instanceof Configurable) {
((Configurable) codec).setConf(conf);
}
}
int index = 1;
for (int fi=0; fi<NUMBER_OF_FILES; fi++) {
Path filepath = new Path(indir,
"part-r-" + padZeros(fi, 5) + HdfsSequenceImportLoader.EXTENSION);
SequenceFile.Writer filewriter;
if (codec != null) {
filewriter = SequenceFile.createWriter(conf,
SequenceFile.Writer.file(filepath),
SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(NullWritable.class),
SequenceFile.Writer.compression(CompressionType.BLOCK, codec));
} else {
filewriter = SequenceFile.createWriter(conf,
SequenceFile.Writer.file(filepath),
SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(NullWritable.class),
SequenceFile.Writer.compression(CompressionType.NONE));
}
Text text = new Text();
for (int ri=0; ri<NUMBER_OF_ROWS_PER_FILE; ri++) {
String row = index + "," + (double)index + ",'" + index + "'";
text.set(row);
filewriter.append(text, NullWritable.get());
index++;
}
filewriter.close();
}
}
private String padZeros(int number, int digits) {
String string = String.valueOf(number);
for (int i=(digits-string.length()); i>0; i--) {
string = "0" + string;
}
return string;
}
public static class DummyLoader extends Loader {
@Override
public void run(ImmutableContext context, DataReader reader)
throws Exception {
int index = 1;
int sum = 0;
Object[] array;
while ((array = reader.readArrayRecord()) != null) {
sum += Integer.valueOf(array[0].toString());
index++;
}
int numbers = NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE;
assertEquals((1+numbers)*numbers/2, sum);
assertEquals(NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE, index-1);
}
}
}