
Users can precisely control export parallelism.

Uses CombineFileInputFormat to run exports over a target number
of mappers independent of the number of input files.

From: Aaron Kimball <aaron@cloudera.com>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149869 13f79535-47bb-0310-9956-ffa450edef68
Committed by: Andrew Bayer, 2011-07-22 20:03:38 +00:00
parent a0dd7e7490
commit df76e995e8
5 changed files with 370 additions and 72 deletions
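
As context for the diff below, here is a rough sketch (not part of the commit itself) of how the new ExportInputFormat knob described above is meant to be wired into a job; the class name, input path, and the target of 8 mappers are made-up placeholders, and on the sqoop export command line the same setting is exercised through the -m flag used in the updated tests:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.sqoop.mapreduce.ExportInputFormat;

public class ExportParallelismSketch {
  public static void main(String[] args) throws IOException {
    Job job = new Job(new Configuration());
    // ExportInputFormat combines however many input files exist into splits
    // sized so that roughly the requested number of map tasks run the export.
    job.setInputFormatClass(ExportInputFormat.class);
    // Placeholder input directory; a real export points at the table's HDFS data.
    FileInputFormat.addInputPath(job, new Path("/user/example/export-input"));
    // Placeholder target of 8 export mappers, regardless of the file count.
    ExportInputFormat.setNumMapTasks(job, 8);
    job.setNumReduceTasks(0);
  }
}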


@@ -0,0 +1,134 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.mapreduce;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.util.ReflectionUtils;
/**
* RecordReader that CombineFileRecordReader can instantiate, which itself
* translates a CombineFileSplit into a FileSplit.
*/
public class CombineShimRecordReader
extends RecordReader<LongWritable, Object> {
public static final Log LOG =
LogFactory.getLog(CombineShimRecordReader.class.getName());
private CombineFileSplit split;
private TaskAttemptContext context;
private int index;
private RecordReader<LongWritable, Object> rr;
/**
* Constructor invoked by CombineFileRecordReader that identifies part of a
* CombineFileSplit to use.
*/
public CombineShimRecordReader(CombineFileSplit split,
TaskAttemptContext context, Integer index)
throws IOException, InterruptedException {
this.index = index;
this.split = (CombineFileSplit) split;
this.context = context;
createChildReader();
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
this.split = (CombineFileSplit) split;
this.context = context;
if (null == rr) {
createChildReader();
}
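// Build a FileSplit covering only the index'th file of the CombineFileSplit
// and let the delegate reader (line-based or SequenceFile) handle it.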
FileSplit fileSplit = new FileSplit(this.split.getPath(index),
this.split.getOffset(index), this.split.getLength(index),
this.split.getLocations());
this.rr.initialize(fileSplit, context);
}
@Override
public float getProgress() throws IOException, InterruptedException {
return rr.getProgress();
}
@Override
public void close() throws IOException {
if (null != rr) {
rr.close();
rr = null;
}
}
@Override
public LongWritable getCurrentKey()
throws IOException, InterruptedException {
return rr.getCurrentKey();
}
@Override
public Object getCurrentValue()
throws IOException, InterruptedException {
return rr.getCurrentValue();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return rr.nextKeyValue();
}
/**
* Actually instantiate the user's chosen RecordReader implementation.
*/
@SuppressWarnings("unchecked")
private void createChildReader() throws IOException, InterruptedException {
LOG.debug("ChildSplit operates on: " + split.getPath(index));
Configuration conf = context.getConfiguration();
// Determine the file format we're reading.
Class rrClass;
if (ExportInputFormat.isSequenceFiles(conf, split.getPath(index))) {
rrClass = SequenceFileRecordReader.class;
} else {
rrClass = LineRecordReader.class;
}
// Create the appropriate record reader.
this.rr = (RecordReader<LongWritable, Object>)
ReflectionUtils.newInstance(rrClass, conf);
}
}


@@ -0,0 +1,194 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.mapreduce;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
/**
* InputFormat that generates a user-defined number of splits to inject data
* into the database.
*/
public class ExportInputFormat
extends CombineFileInputFormat<LongWritable, Object> {
public static final Log LOG =
LogFactory.getLog(ExportInputFormat.class.getName());
/** How many map tasks to use for export */
public static final String EXPORT_MAP_TASKS_KEY =
"sqoop.mapreduce.export.map.tasks";
public static final int DEFAULT_NUM_MAP_TASKS = 4;
public ExportInputFormat() {
}
/**
* @return true if p is a SequenceFile, or a directory containing
* SequenceFiles.
*/
public static boolean isSequenceFiles(Configuration conf, Path p)
throws IOException {
FileSystem fs = p.getFileSystem(conf);
try {
FileStatus stat = fs.getFileStatus(p);
if (null == stat) {
// Couldn't get the item.
LOG.warn("Input path " + p + " does not exist");
return false;
}
if (stat.isDir()) {
FileStatus [] subitems = fs.listStatus(p);
if (subitems == null || subitems.length == 0) {
LOG.warn("Input path " + p + " contains no files");
return false; // empty dir.
}
// Pick a random child entry to examine instead.
stat = subitems[0];
}
if (null == stat) {
LOG.warn("null FileStatus object in isSequenceFiles(); assuming false.");
return false;
}
Path target = stat.getPath();
// Test target's header to see if it contains magic numbers indicating it's
// a SequenceFile.
byte [] header = new byte[3];
FSDataInputStream is = null;
try {
is = fs.open(target);
is.readFully(header);
} catch (IOException ioe) {
// Error reading header or EOF; assume not a SequenceFile.
LOG.warn("IOException checking SequenceFile header: " + ioe);
return false;
} finally {
try {
if (null != is) {
is.close();
}
} catch (IOException ioe) {
// ignore; closing.
LOG.warn("IOException closing input stream: " + ioe + "; ignoring.");
}
}
// Return true (isSequenceFile) iff the magic number sticks.
return header[0] == 'S' && header[1] == 'E' && header[2] == 'Q';
} catch (FileNotFoundException fnfe) {
LOG.warn("Input path " + p + " does not exist");
return false; // doesn't exist!
}
}
/**
* @return the number of bytes across all files in the job.
*/
private long getJobSize(JobContext job) throws IOException {
List<FileStatus> stats = listStatus(job);
long count = 0;
for (FileStatus stat : stats) {
count += stat.getLen();
}
return count;
}
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
// Set the max split size based on the number of map tasks we want.
long numTasks = getNumMapTasks(job);
long numFileBytes = getJobSize(job);
long maxSplitSize = numFileBytes / numTasks;
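// For example, with 1 GB of total input and the default of 4 map tasks,
// maxSplitSize is 256 MB, so CombineFileInputFormat packs the input files
// into roughly four splits regardless of how many files there are.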
setMaxSplitSize(maxSplitSize);
LOG.debug("Target numMapTasks=" + numTasks);
LOG.debug("Total input bytes=" + numFileBytes);
LOG.debug("maxSplitSize=" + maxSplitSize);
List<InputSplit> splits = super.getSplits(job);
if (LOG.isDebugEnabled()) {
LOG.debug("Generated splits:");
for (InputSplit split : splits) {
LOG.debug(" " + split);
}
}
return splits;
}
@Override
@SuppressWarnings("unchecked")
public RecordReader createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException {
CombineFileSplit combineSplit = (CombineFileSplit) split;
// Use CombineFileRecordReader since this can handle CombineFileSplits
// and instantiate another RecordReader in a loop; do this with the
// CombineShimRecordReader.
RecordReader rr = new CombineFileRecordReader(combineSplit, context,
CombineShimRecordReader.class);
return rr;
}
/**
* Allows the user to control the number of map tasks used for this
* export job.
*/
public static void setNumMapTasks(JobContext job, int numTasks) {
job.getConfiguration().setInt(EXPORT_MAP_TASKS_KEY, numTasks);
}
/**
* @return the number of map tasks to use in this export job.
*/
public static int getNumMapTasks(JobContext job) {
return job.getConfiguration().getInt(EXPORT_MAP_TASKS_KEY,
DEFAULT_NUM_MAP_TASKS);
}
}


@@ -63,70 +63,6 @@ public ExportJob(final ExportJobContext ctxt) {
this.context = ctxt;
}
- /**
- * @return true if p is a SequenceFile, or a directory containing
- * SequenceFiles.
- */
- private boolean isSequenceFiles(Path p) throws IOException {
- Configuration conf = context.getOptions().getConf();
- FileSystem fs = p.getFileSystem(conf);
- try {
- FileStatus stat = fs.getFileStatus(p);
- if (null == stat) {
- // Couldn't get the item.
- LOG.warn("Input path " + p + " does not exist");
- return false;
- }
- if (stat.isDir()) {
- FileStatus [] subitems = fs.listStatus(p);
- if (subitems == null || subitems.length == 0) {
- LOG.warn("Input path " + p + " contains no files");
- return false; // empty dir.
- }
- // Pick a random child entry to examine instead.
- stat = subitems[0];
- }
- if (null == stat) {
- LOG.warn("null FileStatus object in isSequenceFiles(); assuming false.");
- return false;
- }
- Path target = stat.getPath();
- // Test target's header to see if it contains magic numbers indicating it's
- // a SequenceFile.
- byte [] header = new byte[3];
- FSDataInputStream is = null;
- try {
- is = fs.open(target);
- is.readFully(header);
- } catch (IOException ioe) {
- // Error reading header or EOF; assume not a SequenceFile.
- LOG.warn("IOException checking SequenceFile header: " + ioe);
- return false;
- } finally {
- try {
- if (null != is) {
- is.close();
- }
- } catch (IOException ioe) {
- // ignore; closing.
- LOG.warn("IOException closing input stream: " + ioe + "; ignoring.");
- }
- }
- // Return true (isSequenceFile) iff the magic number sticks.
- return header[0] == 'S' && header[1] == 'E' && header[2] == 'Q';
- } catch (FileNotFoundException fnfe) {
- LOG.warn("Input path " + p + " does not exist");
- return false; // doesn't exist!
- }
- }
/**
* Run an export job to dump a table from HDFS to a database
* @throws IOException if the export job encounters an IO error
@@ -161,16 +97,19 @@ public void runExport() throws ExportException, IOException {
Path inputPath = new Path(context.getOptions().getExportDir());
inputPath = inputPath.makeQualified(FileSystem.get(conf));
- if (isSequenceFiles(inputPath)) {
- job.setInputFormatClass(SequenceFileInputFormat.class);
+ boolean isSeqFiles = ExportInputFormat.isSequenceFiles(
+ context.getOptions().getConf(), inputPath);
+ if (isSeqFiles) {
job.setMapperClass(SequenceFileExportMapper.class);
} else {
- job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(TextExportMapper.class);
}
+ job.setInputFormatClass(ExportInputFormat.class);
FileInputFormat.addInputPath(job, inputPath);
job.setNumReduceTasks(0);
+ ExportInputFormat.setNumMapTasks(job, options.getNumMappers());
// Concurrent writes of the same records would be problematic.
job.setMapSpeculativeExecution(false);


@@ -305,15 +305,44 @@ private void assertColMinAndMax(String colName, ColumnGenerator generator)
assertColValForRowId(maxId, colName, expectedMax);
}
+ private void multiFileTest(int numFiles, int recordsPerMap, int numMaps)
+ throws IOException, SQLException {
+ final int TOTAL_RECORDS = numFiles * recordsPerMap;
+ try {
+ LOG.info("Beginning test: numFiles=" + numFiles + "; recordsPerMap="
+ + recordsPerMap + "; numMaps=" + numMaps);
+ for (int i = 0; i < numFiles; i++) {
+ createTextFile(i, recordsPerMap, false);
+ }
+ createTable();
+ runExport(getArgv(true, "-m", "" + numMaps));
+ verifyExport(TOTAL_RECORDS);
+ } finally {
+ LOG.info("multi-file test complete");
+ }
+ }
/** Export 10 rows, make sure they load in correctly */
public void testTextExport() throws IOException, SQLException {
- final int TOTAL_RECORDS = 10;
- createTextFile(0, TOTAL_RECORDS, false);
- createTable();
- runExport(getArgv(true));
- verifyExport(TOTAL_RECORDS);
+ multiFileTest(1, 10, 1);
}
+ /** Make sure we can use CombineFileInputFormat to handle multiple
+ * files in a single mapper.
+ */
+ public void testMultiFilesOneMapper() throws IOException, SQLException {
+ multiFileTest(2, 10, 1);
+ }
+ /** Make sure we can use CombineFileInputFormat to handle multiple
+ * files and multiple maps
+ */
+ public void testMultiFilesMultiMaps() throws IOException, SQLException {
+ multiFileTest(2, 10, 2);
+ }
/** Export 10 rows from gzipped text files. */


@@ -66,6 +66,8 @@ protected String getTablePrefix() {
args.add("\\t");
args.add("--lines-terminated-by");
args.add("\\n");
+ args.add("-m");
+ args.add("1");
if (null != additionalArgv) {