From df76e995e8fe7e91c93db378abc27ea9b063d1a0 Mon Sep 17 00:00:00 2001
From: Andrew Bayer
Date: Fri, 22 Jul 2011 20:03:38 +0000
Subject: [PATCH] Users can precisely control export parallelism.

Uses CombineFileInputFormat to run exports over a target number of mappers
independent of the number of input files.

From: Aaron Kimball

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149869 13f79535-47bb-0310-9956-ffa450edef68
---
 .../mapreduce/CombineShimRecordReader.java  | 134 ++++++++++++
 .../sqoop/mapreduce/ExportInputFormat.java  | 194 ++++++++++++++++++
 .../hadoop/sqoop/mapreduce/ExportJob.java   |  73 +------
 .../org/apache/hadoop/sqoop/TestExport.java |  39 +++-
 .../sqoop/testutil/ExportJobTestCase.java   |   2 +
 5 files changed, 370 insertions(+), 72 deletions(-)
 create mode 100644 src/java/org/apache/hadoop/sqoop/mapreduce/CombineShimRecordReader.java
 create mode 100644 src/java/org/apache/hadoop/sqoop/mapreduce/ExportInputFormat.java

diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/CombineShimRecordReader.java b/src/java/org/apache/hadoop/sqoop/mapreduce/CombineShimRecordReader.java
new file mode 100644
index 00000000..ee2a724a
--- /dev/null
+++ b/src/java/org/apache/hadoop/sqoop/mapreduce/CombineShimRecordReader.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * RecordReader that CombineFileRecordReader can instantiate; it translates
+ * one indexed file of a CombineFileSplit into a FileSplit for a child reader.
+ */
+public class CombineShimRecordReader
+    extends RecordReader<LongWritable, Object> {
+
+  public static final Log LOG =
+      LogFactory.getLog(CombineShimRecordReader.class.getName());
+
+  private CombineFileSplit split;
+  private TaskAttemptContext context;
+  private int index;
+  private RecordReader<LongWritable, Object> rr;
+
+  /**
+   * Constructor invoked by CombineFileRecordReader; the index identifies
+   * which part of the CombineFileSplit this reader operates on.
+   */
+  public CombineShimRecordReader(CombineFileSplit split,
+      TaskAttemptContext context, Integer index)
+      throws IOException, InterruptedException {
+    this.index = index;
+    this.split = split;
+    this.context = context;
+
+    createChildReader();
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    this.split = (CombineFileSplit) split;
+    this.context = context;
+
+    if (null == rr) {
+      createChildReader();
+    }
+
+    FileSplit fileSplit = new FileSplit(this.split.getPath(index),
+        this.split.getOffset(index), this.split.getLength(index),
+        this.split.getLocations());
+    this.rr.initialize(fileSplit, context);
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return rr.getProgress();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (null != rr) {
+      rr.close();
+      rr = null;
+    }
+  }
+
+  @Override
+  public LongWritable getCurrentKey()
+      throws IOException, InterruptedException {
+    return rr.getCurrentKey();
+  }
+
+  @Override
+  public Object getCurrentValue()
+      throws IOException, InterruptedException {
+    return rr.getCurrentValue();
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return rr.nextKeyValue();
+  }
+
+  /**
+   * Actually instantiate the user's chosen RecordReader implementation.
+   */
+  @SuppressWarnings("unchecked")
+  private void createChildReader() throws IOException, InterruptedException {
+    LOG.debug("ChildSplit operates on: " + split.getPath(index));
+
+    Configuration conf = context.getConfiguration();
+
+    // Determine the file format we're reading.
+    Class rrClass;
+    if (ExportInputFormat.isSequenceFiles(conf, split.getPath(index))) {
+      rrClass = SequenceFileRecordReader.class;
+    } else {
+      rrClass = LineRecordReader.class;
+    }
+
+    // Create the appropriate record reader.
+    this.rr = (RecordReader<LongWritable, Object>)
+        ReflectionUtils.newInstance(rrClass, conf);
+  }
+}
diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/ExportInputFormat.java b/src/java/org/apache/hadoop/sqoop/mapreduce/ExportInputFormat.java
new file mode 100644
index 00000000..e86e0fe9
--- /dev/null
+++ b/src/java/org/apache/hadoop/sqoop/mapreduce/ExportInputFormat.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapreduce;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+/**
+ * InputFormat that generates a user-defined number of splits to feed the
+ * mappers that inject data into the database.
+ */
+public class ExportInputFormat
+    extends CombineFileInputFormat<LongWritable, Object> {
+
+  public static final Log LOG =
+      LogFactory.getLog(ExportInputFormat.class.getName());
+
+  /** How many map tasks to use for export. */
+  public static final String EXPORT_MAP_TASKS_KEY =
+      "sqoop.mapreduce.export.map.tasks";
+
+  public static final int DEFAULT_NUM_MAP_TASKS = 4;
+
+  public ExportInputFormat() {
+  }
+
+  /**
+   * @return true if p is a SequenceFile, or a directory containing
+   * SequenceFiles.
+   */
+  public static boolean isSequenceFiles(Configuration conf, Path p)
+      throws IOException {
+    FileSystem fs = p.getFileSystem(conf);
+
+    try {
+      FileStatus stat = fs.getFileStatus(p);
+
+      if (null == stat) {
+        // Couldn't get the item.
+        LOG.warn("Input path " + p + " does not exist");
+        return false;
+      }
+
+      if (stat.isDir()) {
+        FileStatus [] subitems = fs.listStatus(p);
+        if (subitems == null || subitems.length == 0) {
+          LOG.warn("Input path " + p + " contains no files");
+          return false; // empty dir.
+        }
+
+        // Pick the first child entry to examine instead.
+        stat = subitems[0];
+      }
+
+      if (null == stat) {
+        LOG.warn("null FileStatus object in isSequenceFiles(); assuming false.");
+        return false;
+      }
+
+      Path target = stat.getPath();
+      // Test target's header to see if it contains magic numbers indicating
+      // it's a SequenceFile.
+      byte [] header = new byte[3];
+      FSDataInputStream is = null;
+      try {
+        is = fs.open(target);
+        is.readFully(header);
+      } catch (IOException ioe) {
+        // Error reading header or EOF; assume not a SequenceFile.
+        LOG.warn("IOException checking SequenceFile header: " + ioe);
+        return false;
+      } finally {
+        try {
+          if (null != is) {
+            is.close();
+          }
+        } catch (IOException ioe) {
+          // ignore; closing.
+          LOG.warn("IOException closing input stream: " + ioe + "; ignoring.");
+        }
+      }
+
+      // Return true (isSequenceFile) iff the magic number matches.
+      return header[0] == 'S' && header[1] == 'E' && header[2] == 'Q';
+    } catch (FileNotFoundException fnfe) {
+      LOG.warn("Input path " + p + " does not exist");
+      return false; // doesn't exist!
+    }
+  }
+
+  /**
+   * @return the number of bytes across all files in the job.
+   */
+  private long getJobSize(JobContext job) throws IOException {
+    List<FileStatus> stats = listStatus(job);
+    long count = 0;
+    for (FileStatus stat : stats) {
+      count += stat.getLen();
+    }
+
+    return count;
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    // Set the max split size based on the number of map tasks we want.
+    long numTasks = getNumMapTasks(job);
+    long numFileBytes = getJobSize(job);
+    long maxSplitSize = numFileBytes / numTasks;
+
+    setMaxSplitSize(maxSplitSize);
+
+    LOG.debug("Target numMapTasks=" + numTasks);
+    LOG.debug("Total input bytes=" + numFileBytes);
+    LOG.debug("maxSplitSize=" + maxSplitSize);
+
+    List<InputSplit> splits = super.getSplits(job);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generated splits:");
+      for (InputSplit split : splits) {
+        LOG.debug("  " + split);
+      }
+    }
+    return splits;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public RecordReader<LongWritable, Object> createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+
+    CombineFileSplit combineSplit = (CombineFileSplit) split;
+
+    // Use CombineFileRecordReader, since it can handle a CombineFileSplit
+    // by instantiating a child RecordReader per file chunk; the child used
+    // here is CombineShimRecordReader.
+    RecordReader<LongWritable, Object> rr = new CombineFileRecordReader(
+        combineSplit, context, CombineShimRecordReader.class);
+
+    return rr;
+  }
+
+  /**
+   * Allows the user to control the number of map tasks used for this
+   * export job.
+   */
+  public static void setNumMapTasks(JobContext job, int numTasks) {
+    job.getConfiguration().setInt(EXPORT_MAP_TASKS_KEY, numTasks);
+  }
+
+  /**
+   * @return the number of map tasks to use in this export job.
+   */
+  public static int getNumMapTasks(JobContext job) {
+    return job.getConfiguration().getInt(EXPORT_MAP_TASKS_KEY,
+        DEFAULT_NUM_MAP_TASKS);
+  }
+
+}
diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJob.java b/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJob.java
index a0092539..e8cacf25 100644
--- a/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJob.java
+++ b/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJob.java
@@ -63,70 +63,6 @@ public ExportJob(final ExportJobContext ctxt) {
     this.context = ctxt;
   }
 
-  /**
-   * @return true if p is a SequenceFile, or a directory containing
-   * SequenceFiles.
-   */
-  private boolean isSequenceFiles(Path p) throws IOException {
-    Configuration conf = context.getOptions().getConf();
-    FileSystem fs = p.getFileSystem(conf);
-
-    try {
-      FileStatus stat = fs.getFileStatus(p);
-
-      if (null == stat) {
-        // Couldn't get the item.
-        LOG.warn("Input path " + p + " does not exist");
-        return false;
-      }
-
-      if (stat.isDir()) {
-        FileStatus [] subitems = fs.listStatus(p);
-        if (subitems == null || subitems.length == 0) {
-          LOG.warn("Input path " + p + " contains no files");
-          return false; // empty dir.
-        }
-
-        // Pick a random child entry to examine instead.
-        stat = subitems[0];
-      }
-
-      if (null == stat) {
-        LOG.warn("null FileStatus object in isSequenceFiles(); assuming false.");
-        return false;
-      }
-
-      Path target = stat.getPath();
-      // Test target's header to see if it contains magic numbers indicating it's
-      // a SequenceFile.
-      byte [] header = new byte[3];
-      FSDataInputStream is = null;
-      try {
-        is = fs.open(target);
-        is.readFully(header);
-      } catch (IOException ioe) {
-        // Error reading header or EOF; assume not a SequenceFile.
-        LOG.warn("IOException checking SequenceFile header: " + ioe);
-        return false;
-      } finally {
-        try {
-          if (null != is) {
-            is.close();
-          }
-        } catch (IOException ioe) {
-          // ignore; closing.
-          LOG.warn("IOException closing input stream: " + ioe + "; ignoring.");
-        }
-      }
-
-      // Return true (isSequenceFile) iff the magic number sticks.
-      return header[0] == 'S' && header[1] == 'E' && header[2] == 'Q';
-    } catch (FileNotFoundException fnfe) {
-      LOG.warn("Input path " + p + " does not exist");
-      return false; // doesn't exist!
-    }
-  }
-
   /**
    * Run an export job to dump a table from HDFS to a database
    * @throws IOException if the export job encounters an IO error
@@ -161,16 +97,19 @@ public void runExport() throws ExportException, IOException {
     Path inputPath = new Path(context.getOptions().getExportDir());
     inputPath = inputPath.makeQualified(FileSystem.get(conf));
 
-    if (isSequenceFiles(inputPath)) {
-      job.setInputFormatClass(SequenceFileInputFormat.class);
+    boolean isSeqFiles = ExportInputFormat.isSequenceFiles(
+        context.getOptions().getConf(), inputPath);
+
+    if (isSeqFiles) {
       job.setMapperClass(SequenceFileExportMapper.class);
     } else {
-      job.setInputFormatClass(TextInputFormat.class);
       job.setMapperClass(TextExportMapper.class);
     }
 
+    job.setInputFormatClass(ExportInputFormat.class);
     FileInputFormat.addInputPath(job, inputPath);
     job.setNumReduceTasks(0);
+    ExportInputFormat.setNumMapTasks(job, options.getNumMappers());
 
     // Concurrent writes of the same records would be problematic.
     job.setMapSpeculativeExecution(false);
diff --git a/src/test/org/apache/hadoop/sqoop/TestExport.java b/src/test/org/apache/hadoop/sqoop/TestExport.java
index f8dda2f3..a5b36956 100644
--- a/src/test/org/apache/hadoop/sqoop/TestExport.java
+++ b/src/test/org/apache/hadoop/sqoop/TestExport.java
@@ -305,15 +305,44 @@ private void assertColMinAndMax(String colName, ColumnGenerator generator)
     assertColValForRowId(maxId, colName, expectedMax);
   }
 
+  private void multiFileTest(int numFiles, int recordsPerMap, int numMaps)
+      throws IOException, SQLException {
+
+    final int TOTAL_RECORDS = numFiles * recordsPerMap;
+
+    try {
+      LOG.info("Beginning test: numFiles=" + numFiles + "; recordsPerMap="
+          + recordsPerMap + "; numMaps=" + numMaps);
+
+      for (int i = 0; i < numFiles; i++) {
+        createTextFile(i, recordsPerMap, false);
+      }
+
+      createTable();
+      runExport(getArgv(true, "-m", "" + numMaps));
+      verifyExport(TOTAL_RECORDS);
+    } finally {
+      LOG.info("multi-file test complete");
+    }
+  }
+
   /** Export 10 rows, make sure they load in correctly */
   public void testTextExport() throws IOException, SQLException {
+    multiFileTest(1, 10, 1);
+  }
 
-    final int TOTAL_RECORDS = 10;
+  /** Make sure we can use CombineFileInputFormat to handle multiple
+   * files in a single mapper.
+   */
+  public void testMultiFilesOneMapper() throws IOException, SQLException {
+    multiFileTest(2, 10, 1);
+  }
 
-    createTextFile(0, TOTAL_RECORDS, false);
-    createTable();
-    runExport(getArgv(true));
-    verifyExport(TOTAL_RECORDS);
+  /** Make sure we can use CombineFileInputFormat to handle multiple
+   * files and multiple maps.
+   */
+  public void testMultiFilesMultiMaps() throws IOException, SQLException {
+    multiFileTest(2, 10, 2);
   }
 
   /** Export 10 rows from gzipped text files. */
diff --git a/src/test/org/apache/hadoop/sqoop/testutil/ExportJobTestCase.java b/src/test/org/apache/hadoop/sqoop/testutil/ExportJobTestCase.java
index 072457b6..2b36d199 100644
--- a/src/test/org/apache/hadoop/sqoop/testutil/ExportJobTestCase.java
+++ b/src/test/org/apache/hadoop/sqoop/testutil/ExportJobTestCase.java
@@ -66,6 +66,8 @@ protected String getTablePrefix() {
       args.add("\\t");
       args.add("--lines-terminated-by");
       args.add("\\n");
+      args.add("-m");
+      args.add("1");
 
       if (null != additionalArgv) {
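
Note: the following driver sketch is illustrative and not part of the patch.
It shows how a client could wire the new ExportInputFormat into a job and pin
the mapper count. The class name, job name, and input path are hypothetical;
ExportInputFormat, setNumMapTasks(), getNumMapTasks(), and
EXPORT_MAP_TASKS_KEY are the API added above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.sqoop.mapreduce.ExportInputFormat;

    public class ExportParallelismSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "export-parallelism-sketch");

        // Route every file under the export directory through the
        // combining input format added by this patch.
        job.setInputFormatClass(ExportInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setNumReduceTasks(0);

        // Request eight mappers regardless of how many input files exist.
        // This only records sqoop.mapreduce.export.map.tasks in the job
        // conf; ExportInputFormat.getSplits() reads it back through
        // getNumMapTasks() when computing the max split size.
        ExportInputFormat.setNumMapTasks(job, 8);
      }
    }

From the command line, the same knob is the existing -m option, which the new
tests exercise via runExport(getArgv(true, "-m", "" + numMaps)).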
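Note: a worked example of the sizing rule in getSplits(), derived from the
code above rather than from an actual run. With 50 input files totaling
1,024 MB and -m 4, getNumMapTasks() returns 4, so maxSplitSize =
1,024 MB / 4 = 256 MB, and CombineFileInputFormat packs the files into
roughly four combined splits, one per requested mapper. A single 1,024 MB
file produces about the same four splits, which is what decouples export
parallelism from the number of input files.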