From 37cadedc8fd99b6e592f69b743810c2bebb3a8ab Mon Sep 17 00:00:00 2001 From: Andrew Bayer Date: Fri, 22 Jul 2011 20:03:24 +0000 Subject: [PATCH] MAPREDUCE-907. Sqoop should use more intelligent splits. Contributed by Aaron Kimball. From: Thomas White git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149828 13f79535-47bb-0310-9956-ffa450edef68 --- build.xml | 42 +++- ivy.xml | 12 ++ .../apache/hadoop/sqoop/ImportOptions.java | 45 ++++- src/java/org/apache/hadoop/sqoop/Sqoop.java | 23 ++- .../hadoop/sqoop/manager/OracleManager.java | 23 +++ .../hadoop/sqoop/manager/SqlManager.java | 20 +- .../sqoop/mapreduce/AutoProgressMapper.java | 185 ++++++++++++++++++ .../sqoop/mapreduce/DataDrivenImportJob.java | 176 +++++++++++++++++ .../mapreduce/RawKeyTextOutputFormat.java | 102 ++++++++++ .../sqoop/mapreduce/TextImportMapper.java | 46 +++++ .../org/apache/hadoop/sqoop/AllTests.java | 13 +- .../apache/hadoop/sqoop/TestAllTables.java | 7 +- .../apache/hadoop/sqoop/TestColumnTypes.java | 13 +- .../apache/hadoop/sqoop/TestMultiMaps.java | 175 +++++++++++++++++ .../{TestOrderBy.java => TestSplitBy.java} | 42 ++-- .../org/apache/hadoop/sqoop/TestWhere.java | 4 +- .../hadoop/sqoop/hive/TestHiveImport.java | 4 +- .../hadoop/sqoop/manager/LocalMySQLTest.java | 2 + .../hadoop/sqoop/manager/MySQLAuthTest.java | 2 + .../sqoop/manager/OracleManagerTest.java | 2 + .../hadoop/sqoop/manager/PostgresqlTest.java | 2 +- .../hadoop/sqoop/mapred/MapredTests.java | 37 ++++ .../sqoop/mapreduce/MapreduceTests.java | 37 ++++ .../sqoop/mapreduce/TestTextImportMapper.java | 79 ++++++++ .../hadoop/sqoop/orm/TestParseMethods.java | 7 +- .../sqoop/testutil/ImportJobTestCase.java | 13 +- 26 files changed, 1040 insertions(+), 73 deletions(-) create mode 100644 src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java create mode 100644 src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java create mode 100644 src/java/org/apache/hadoop/sqoop/mapreduce/RawKeyTextOutputFormat.java create mode 100644 src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java create mode 100644 src/test/org/apache/hadoop/sqoop/TestMultiMaps.java rename src/test/org/apache/hadoop/sqoop/{TestOrderBy.java => TestSplitBy.java} (80%) create mode 100644 src/test/org/apache/hadoop/sqoop/mapred/MapredTests.java create mode 100644 src/test/org/apache/hadoop/sqoop/mapreduce/MapreduceTests.java create mode 100644 src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java diff --git a/build.xml b/build.xml index d9e7f4d4..21bbeb78 100644 --- a/build.xml +++ b/build.xml @@ -26,6 +26,38 @@ to call at top-level: ant deploy-contrib compile-core-test + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -54,6 +86,11 @@ to call at top-level: ant deploy-contrib compile-core-test + + + + - + + + + diff --git a/ivy.xml b/ivy.xml index f3bd660a..2b2591d1 100644 --- a/ivy.xml +++ b/ivy.xml @@ -64,6 +64,18 @@ name="hsqldb" rev="${hsqldb.version}" conf="common->default"/> + + + + extends Mapper { + + public static final Log LOG = LogFactory.getLog(AutoProgressMapper.class.getName()); + + /** Total number of millis for which progress will be reported + by the auto-progress thread. If this is zero, then the auto-progress + thread will never voluntarily exit. + */ + private int maxProgressPeriod; + + /** Number of milliseconds to sleep for between loop iterations. Must be less + than report interval. + */ + private int sleepInterval; + + /** Number of milliseconds between calls to Reporter.progress(). 
Should be a multiple + of the sleepInterval. + */ + private int reportInterval; + + public static final String MAX_PROGRESS_PERIOD_KEY = "sqoop.mapred.auto.progress.max"; + public static final String SLEEP_INTERVAL_KEY = "sqoop.mapred.auto.progress.sleep"; + public static final String REPORT_INTERVAL_KEY = "sqoop.mapred.auto.progress.report"; + + // Sleep for 10 seconds at a time. + static final int DEFAULT_SLEEP_INTERVAL = 10000; + + // Report progress every 30 seconds. + static final int DEFAULT_REPORT_INTERVAL = 30000; + + // Disable max progress, by default. + static final int DEFAULT_MAX_PROGRESS = 0; + + private class ProgressThread extends Thread { + + private volatile boolean keepGoing; // while this is true, thread runs. + private Context context; + private long startTimeMillis; + private long lastReportMillis; + + public ProgressThread(final Context ctxt) { + this.context = ctxt; + this.keepGoing = true; + } + + public void signalShutdown() { + this.keepGoing = false; // volatile update. + this.interrupt(); + } + + public void run() { + this.lastReportMillis = System.currentTimeMillis(); + this.startTimeMillis = this.lastReportMillis; + + final long MAX_PROGRESS = AutoProgressMapper.this.maxProgressPeriod; + final long REPORT_INTERVAL = AutoProgressMapper.this.reportInterval; + final long SLEEP_INTERVAL = AutoProgressMapper.this.sleepInterval; + + // in a loop: + // * Check that we haven't run for too long (maxProgressPeriod) + // * If it's been a report interval since we last made progress, make more. + // * Sleep for a bit. + // * If the parent thread has signaled for exit, do so. + while (this.keepGoing) { + long curTimeMillis = System.currentTimeMillis(); + + if (MAX_PROGRESS != 0 && curTimeMillis - this.startTimeMillis > MAX_PROGRESS) { + this.keepGoing = false; + LOG.info("Auto-progress thread exiting after " + MAX_PROGRESS + " ms."); + break; + } + + if (curTimeMillis - this.lastReportMillis > REPORT_INTERVAL) { + // It's been a full report interval -- claim progress. + LOG.debug("Auto-progress thread reporting progress"); + this.context.progress(); + this.lastReportMillis = curTimeMillis; + } + + // Unless we got an interrupt while we were working, + // sleep a bit before doing more work. + if (!this.interrupted()) { + try { + Thread.sleep(SLEEP_INTERVAL); + } catch (InterruptedException ie) { + // we were notified on something; not necessarily an error. + } + } + } + + LOG.info("Auto-progress thread is finished. keepGoing=" + this.keepGoing); + } + } + + /** + * Set configuration parameters for the auto-progress thread. 
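+   * The intervals are read from the job Configuration under the
+   * sqoop.mapred.auto.progress.* keys; out-of-range values fall back to the
+   * defaults defined above.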
+ */ + private final void configureAutoProgress(Configuration job) { + this.maxProgressPeriod = job.getInt(MAX_PROGRESS_PERIOD_KEY, DEFAULT_MAX_PROGRESS); + this.sleepInterval = job.getInt(SLEEP_INTERVAL_KEY, DEFAULT_SLEEP_INTERVAL); + this.reportInterval = job.getInt(REPORT_INTERVAL_KEY, DEFAULT_REPORT_INTERVAL); + + if (this.reportInterval < 1) { + LOG.warn("Invalid " + REPORT_INTERVAL_KEY + "; setting to " + DEFAULT_REPORT_INTERVAL); + this.reportInterval = DEFAULT_REPORT_INTERVAL; + } + + if (this.sleepInterval > this.reportInterval || this.sleepInterval < 1) { + LOG.warn("Invalid " + SLEEP_INTERVAL_KEY + "; setting to " + DEFAULT_SLEEP_INTERVAL); + this.sleepInterval = DEFAULT_SLEEP_INTERVAL; + } + + if (this.maxProgressPeriod < 0) { + LOG.warn("Invalid " + MAX_PROGRESS_PERIOD_KEY + "; setting to " + DEFAULT_MAX_PROGRESS); + this.maxProgressPeriod = DEFAULT_MAX_PROGRESS; + } + } + + + // map() method intentionally omitted; Mapper.map() is the identity mapper. + + + /** + * Run the mapping process for this task, wrapped in an auto-progress system. + */ + public void run(Context context) throws IOException, InterruptedException { + configureAutoProgress(context.getConfiguration()); + ProgressThread thread = this.new ProgressThread(context); + + try { + thread.setDaemon(true); + thread.start(); + + // use default run() method to actually drive the mapping. + super.run(context); + } finally { + // Tell the progress thread to exit.. + LOG.debug("Instructing auto-progress thread to quit."); + thread.signalShutdown(); + try { + // And wait for that to happen. + LOG.debug("Waiting for progress thread shutdown..."); + thread.join(); + LOG.debug("Progress thread shutdown detected."); + } catch (InterruptedException ie) { + LOG.warn("Interrupted when waiting on auto-progress thread: " + ie.toString()); + } + } + } + +} diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java new file mode 100644 index 00000000..6031a4d0 --- /dev/null +++ b/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.sqoop.mapreduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; +import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; + +import org.apache.hadoop.sqoop.ConnFactory; +import org.apache.hadoop.sqoop.ImportOptions; +import org.apache.hadoop.sqoop.manager.ConnManager; +import org.apache.hadoop.sqoop.orm.TableClassName; +import org.apache.hadoop.sqoop.util.ClassLoaderStack; +import org.apache.hadoop.sqoop.util.PerfCounters; + +/** + * Actually runs a jdbc import job using the ORM files generated by the sqoop.orm package. + * Uses DataDrivenDBInputFormat + */ +public class DataDrivenImportJob { + + public static final Log LOG = LogFactory.getLog(DataDrivenImportJob.class.getName()); + + private ImportOptions options; + + public DataDrivenImportJob(final ImportOptions opts) { + this.options = opts; + } + + /** + * Run an import job to read a table in to HDFS + * + * @param tableName the database table to read + * @param ormJarFile the Jar file to insert into the dcache classpath. (may be null) + * @param splitByCol the column of the database table to use to split the import + * @param conf A fresh Hadoop Configuration to use to build an MR job. + */ + public void runImport(String tableName, String ormJarFile, String splitByCol, + Configuration conf) throws IOException { + + LOG.info("Beginning data-driven import of " + tableName); + + String tableClassName = new TableClassName(options).getClassForTable(tableName); + + boolean isLocal = "local".equals(conf.get("mapred.job.tracker")); + ClassLoader prevClassLoader = null; + if (isLocal) { + // If we're using the LocalJobRunner, then instead of using the compiled jar file + // as the job source, we're running in the current thread. Push on another classloader + // that loads from that jar in addition to everything currently on the classpath. + prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile, tableClassName); + } + + try { + Job job = new Job(conf); + + // Set the external jar to use for the job. 
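+      // Shipping the generated ORM jar with the job lets the map tasks load the
+      // table's record class at runtime.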
+ job.getConfiguration().set("mapred.jar", ormJarFile); + + String hdfsWarehouseDir = options.getWarehouseDir(); + Path outputPath; + + if (null != hdfsWarehouseDir) { + Path hdfsWarehousePath = new Path(hdfsWarehouseDir); + hdfsWarehousePath.makeQualified(FileSystem.get(job.getConfiguration())); + outputPath = new Path(hdfsWarehousePath, tableName); + } else { + outputPath = new Path(tableName); + } + + if (options.getFileLayout() == ImportOptions.FileLayout.TextFile) { + job.setOutputFormatClass(RawKeyTextOutputFormat.class); + job.setMapperClass(TextImportMapper.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(NullWritable.class); + } else if (options.getFileLayout() == ImportOptions.FileLayout.SequenceFile) { + job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setMapperClass(AutoProgressMapper.class); + SequenceFileOutputFormat.setCompressOutput(job, true); + SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK); + job.getConfiguration().set("mapred.output.value.class", tableClassName); + } else { + LOG.warn("Unknown file layout specified: " + options.getFileLayout() + "; using text."); + } + + int numMapTasks = options.getNumMappers(); + if (numMapTasks < 1) { + numMapTasks = ImportOptions.DEFAULT_NUM_MAPPERS; + LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers."); + } + job.getConfiguration().setInt("mapred.map.tasks", numMapTasks); + job.setNumReduceTasks(0); + + job.setInputFormatClass(DataDrivenDBInputFormat.class); + + FileOutputFormat.setOutputPath(job, outputPath); + + ConnManager mgr = new ConnFactory(conf).getManager(options); + String username = options.getUsername(); + if (null == username || username.length() == 0) { + DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), + options.getConnectString()); + } else { + DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), + options.getConnectString(), username, options.getPassword()); + } + + String [] colNames = options.getColumns(); + if (null == colNames) { + colNames = mgr.getColumnNames(tableName); + } + + // It's ok if the where clause is null in DBInputFormat.setInput. + String whereClause = options.getWhereClause(); + + // We can't set the class properly in here, because we may not have the + // jar loaded in this JVM. So we start by calling setInput() with DBWritable, + // and then overriding the string manually. + DataDrivenDBInputFormat.setInput(job, DBWritable.class, tableName, whereClause, + splitByCol, colNames); + job.getConfiguration().set(DBConfiguration.INPUT_CLASS_PROPERTY, tableClassName); + + PerfCounters counters = new PerfCounters(); + counters.startClock(); + + try { + job.waitForCompletion(false); + } catch (InterruptedException ie) { + throw new IOException(ie); + } catch (ClassNotFoundException cnfe) { + throw new IOException(cnfe); + } + + counters.stopClock(); + counters.addBytes(job.getCounters().getGroup("FileSystemCounters") + .findCounter("HDFS_BYTES_WRITTEN").getValue()); + LOG.info("Transferred " + counters.toString()); + } finally { + if (isLocal && null != prevClassLoader) { + // unload the special classloader for this jar. 
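+        // Restore whichever classloader was active before the import started.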
+ ClassLoaderStack.setCurrentClassLoader(prevClassLoader); + } + } + } +} diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/RawKeyTextOutputFormat.java b/src/java/org/apache/hadoop/sqoop/mapreduce/RawKeyTextOutputFormat.java new file mode 100644 index 00000000..c216ac60 --- /dev/null +++ b/src/java/org/apache/hadoop/sqoop/mapreduce/RawKeyTextOutputFormat.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.mapreduce; + +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.*; + +/** An {@link OutputFormat} that writes plain text files. + * Only writes the key. Does not write any delimiter/newline after the key. + */ +public class RawKeyTextOutputFormat extends FileOutputFormat { + + protected static class RawKeyRecordWriter extends RecordWriter { + private static final String utf8 = "UTF-8"; + + protected DataOutputStream out; + + public RawKeyRecordWriter(DataOutputStream out) { + this.out = out; + } + + /** + * Write the object to the byte stream, handling Text as a special + * case. 
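+     * Text instances are written as their raw bytes; any other object is
+     * converted with toString() and encoded as UTF-8.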
+ * @param o the object to print + * @throws IOException if the write throws, we pass it on + */ + private void writeObject(Object o) throws IOException { + if (o instanceof Text) { + Text to = (Text) o; + out.write(to.getBytes(), 0, to.getLength()); + } else { + out.write(o.toString().getBytes(utf8)); + } + } + + public synchronized void write(K key, V value) throws IOException { + writeObject(key); + } + + public synchronized void close(TaskAttemptContext context) throws IOException { + out.close(); + } + } + + public RecordWriter getRecordWriter(TaskAttemptContext context) + throws IOException { + boolean isCompressed = getCompressOutput(context); + Configuration conf = context.getConfiguration(); + String ext = ""; + CompressionCodec codec = null; + + if (isCompressed) { + // create the named codec + Class codecClass = + getOutputCompressorClass(context, GzipCodec.class); + codec = ReflectionUtils.newInstance(codecClass, conf); + + ext = codec.getDefaultExtension(); + } + + Path file = getDefaultWorkFile(context, ext); + FileSystem fs = file.getFileSystem(conf); + FSDataOutputStream fileOut = fs.create(file, false); + DataOutputStream ostream = fileOut; + + if (isCompressed) { + ostream = new DataOutputStream(codec.createOutputStream(fileOut)); + } + + return new RawKeyRecordWriter(ostream); + } +} + diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java b/src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java new file mode 100644 index 00000000..3970105d --- /dev/null +++ b/src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper.Context; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; + +/** + * Converts an input record into a string representation and emit it. 
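+ * The record's toString() form is emitted as the output key, paired with a
+ * NullWritable value.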
+ */ +public class TextImportMapper + extends AutoProgressMapper { + + private Text outkey; + + public TextImportMapper() { + outkey = new Text(); + } + + public void map(LongWritable key, DBWritable val, Context context) + throws IOException, InterruptedException { + outkey.set(val.toString()); + context.write(outkey, NullWritable.get()); + } +} diff --git a/src/test/org/apache/hadoop/sqoop/AllTests.java b/src/test/org/apache/hadoop/sqoop/AllTests.java index 213fb0ef..f396fd39 100644 --- a/src/test/org/apache/hadoop/sqoop/AllTests.java +++ b/src/test/org/apache/hadoop/sqoop/AllTests.java @@ -23,7 +23,8 @@ import org.apache.hadoop.sqoop.lib.TestRecordParser; import org.apache.hadoop.sqoop.manager.TestHsqldbManager; import org.apache.hadoop.sqoop.manager.TestSqlManager; -import org.apache.hadoop.sqoop.mapred.TestAutoProgressMapRunner; +import org.apache.hadoop.sqoop.mapred.MapredTests; +import org.apache.hadoop.sqoop.mapreduce.MapreduceTests; import org.apache.hadoop.sqoop.orm.TestClassWriter; import org.apache.hadoop.sqoop.orm.TestParseMethods; @@ -32,24 +33,22 @@ /** * All tests for Sqoop (org.apache.hadoop.sqoop) - * - * */ -public final class AllTests { +public final class AllTests { private AllTests() { } public static Test suite() { TestSuite suite = new TestSuite("Tests for org.apache.hadoop.sqoop"); - suite.addTestSuite(TestAutoProgressMapRunner.class); suite.addTestSuite(TestAllTables.class); suite.addTestSuite(TestHsqldbManager.class); suite.addTestSuite(TestSqlManager.class); suite.addTestSuite(TestClassWriter.class); suite.addTestSuite(TestColumnTypes.class); suite.addTestSuite(TestMultiCols.class); - suite.addTestSuite(TestOrderBy.class); + suite.addTestSuite(TestMultiMaps.class); + suite.addTestSuite(TestSplitBy.class); suite.addTestSuite(TestWhere.class); suite.addTestSuite(TestHiveImport.class); suite.addTestSuite(TestRecordParser.class); @@ -58,6 +57,8 @@ public static Test suite() { suite.addTestSuite(TestParseMethods.class); suite.addTestSuite(TestConnFactory.class); suite.addTest(ThirdPartyTests.suite()); + suite.addTest(MapredTests.suite()); + suite.addTest(MapreduceTests.suite()); return suite; } diff --git a/src/test/org/apache/hadoop/sqoop/TestAllTables.java b/src/test/org/apache/hadoop/sqoop/TestAllTables.java index de86e076..2f05e09d 100644 --- a/src/test/org/apache/hadoop/sqoop/TestAllTables.java +++ b/src/test/org/apache/hadoop/sqoop/TestAllTables.java @@ -36,9 +36,6 @@ /** * Test the --all-tables functionality that can import multiple tables. - * ; - * - * */ public class TestAllTables extends ImportJobTestCase { @@ -63,6 +60,8 @@ public class TestAllTables extends ImportJobTestCase { args.add(getWarehouseDir()); args.add("--connect"); args.add(HsqldbTestServer.getUrl()); + args.add("--num-mappers"); + args.add("1"); return args.toArray(new String[0]); } @@ -107,7 +106,7 @@ public void testMultiTableImport() throws IOException { Path warehousePath = new Path(this.getWarehouseDir()); for (String tableName : this.tableNames) { Path tablePath = new Path(warehousePath, tableName); - Path filePath = new Path(tablePath, "part-00000"); + Path filePath = new Path(tablePath, "part-m-00000"); // dequeue the expected value for this table. This // list has the same order as the tableNames list. 
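      // (the dequeued value is what we expect to find in that table's part-m-00000 file)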
diff --git a/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java b/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java index 82d8e79c..3a5e1ddf 100644 --- a/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java +++ b/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java @@ -36,9 +36,6 @@ * write(DataOutput), readFields(DataInput) * - And optionally, that we can push to the database: * write(PreparedStatement) - * - * - * */ public class TestColumnTypes extends ImportJobTestCase { @@ -217,14 +214,24 @@ public void testTimestamp1() { @Test public void testTimestamp2() { + try { + LOG.debug("Beginning testTimestamp2"); verifyType("TIMESTAMP", "'2009-04-24 18:24:00.0002'", "2009-04-24 18:24:00.000200000", "2009-04-24 18:24:00.0002"); + } finally { + LOG.debug("End testTimestamp2"); + } } @Test public void testTimestamp3() { + try { + LOG.debug("Beginning testTimestamp3"); verifyType("TIMESTAMP", "null", null); + } finally { + LOG.debug("End testTimestamp3"); + } } @Test diff --git a/src/test/org/apache/hadoop/sqoop/TestMultiMaps.java b/src/test/org/apache/hadoop/sqoop/TestMultiMaps.java new file mode 100644 index 00000000..f66af202 --- /dev/null +++ b/src/test/org/apache/hadoop/sqoop/TestMultiMaps.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.util.ReflectionUtils; + +import org.apache.hadoop.sqoop.ImportOptions.InvalidOptionsException; +import org.apache.hadoop.sqoop.orm.CompilationManager; +import org.apache.hadoop.sqoop.testutil.HsqldbTestServer; +import org.apache.hadoop.sqoop.testutil.ImportJobTestCase; +import org.apache.hadoop.sqoop.testutil.SeqFileReader; +import org.apache.hadoop.sqoop.util.ClassLoaderStack; + +/** + * Test that using multiple mapper splits works. + */ +public class TestMultiMaps extends ImportJobTestCase { + + /** + * Create the argv to pass to Sqoop + * @return the argv as an array of strings. 
+ */ + private String [] getArgv(boolean includeHadoopFlags, String [] colNames, String splitByCol) { + String columnsString = ""; + for (String col : colNames) { + columnsString += col + ","; + } + + ArrayList args = new ArrayList(); + + if (includeHadoopFlags) { + args.add("-D"); + args.add("mapred.job.tracker=local"); + args.add("-D"); + args.add("fs.default.name=file:///"); + } + + args.add("--table"); + args.add(HsqldbTestServer.getTableName()); + args.add("--columns"); + args.add(columnsString); + args.add("--split-by"); + args.add(splitByCol); + args.add("--warehouse-dir"); + args.add(getWarehouseDir()); + args.add("--connect"); + args.add(HsqldbTestServer.getUrl()); + args.add("--as-sequencefile"); + args.add("--num-mappers"); + args.add("2"); + + return args.toArray(new String[0]); + } + + // this test just uses the two int table. + protected String getTableName() { + return HsqldbTestServer.getTableName(); + } + + /** @return a list of Path objects for each data file */ + protected List getDataFilePaths() throws IOException { + List paths = new ArrayList(); + Configuration conf = new Configuration(); + conf.set("fs.default.name", "file:///"); + FileSystem fs = FileSystem.get(conf); + + FileStatus [] stats = fs.listStatus(getTablePath()); + for (FileStatus stat : stats) { + paths.add(stat.getPath()); + } + + return paths; + } + + /** + * Given a comma-delimited list of integers, grab and parse the first int + * @param str a comma-delimited list of values, the first of which is an int. + * @return the first field in the string, cast to int + */ + private int getFirstInt(String str) { + String [] parts = str.split(","); + return Integer.parseInt(parts[0]); + } + + public void runMultiMapTest(String splitByCol, int expectedSum) + throws IOException { + + String [] columns = HsqldbTestServer.getFieldNames(); + ClassLoader prevClassLoader = null; + SequenceFile.Reader reader = null; + + String [] argv = getArgv(true, columns, splitByCol); + runImport(argv); + try { + ImportOptions opts = new ImportOptions(); + opts.parse(getArgv(false, columns, splitByCol)); + + CompilationManager compileMgr = new CompilationManager(opts); + String jarFileName = compileMgr.getJarFilename(); + + prevClassLoader = ClassLoaderStack.addJarFile(jarFileName, getTableName()); + + List paths = getDataFilePaths(); + Configuration conf = new Configuration(); + int curSum = 0; + + assertTrue("Found only " + paths.size() + " path(s); expected > 1.", paths.size() > 1); + + // We expect multiple files. We need to open all the files and sum up the + // first column across all of them. + for (Path p : paths) { + reader = SeqFileReader.getSeqFileReader(p.toString()); + + // here we can actually instantiate (k, v) pairs. + Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf); + Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf); + + // We know that these values are two ints separated by a ',' character. + // Since this is all dynamic, though, we don't want to actually link against + // the class and use its methods. So we just parse this back into int fields manually. + // Sum them up and ensure that we get the expected total for the first column, to + // verify that we got all the results from the db into the file. + + // now sum up everything in the file. 
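+        // reader.next(key) returns null once the end of this part file is reached,
+        // which ends the loop.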
+ while (reader.next(key) != null) { + reader.getCurrentValue(val); + curSum += getFirstInt(val.toString()); + } + + IOUtils.closeStream(reader); + reader = null; + } + + assertEquals("Total sum of first db column mismatch", expectedSum, curSum); + } catch (InvalidOptionsException ioe) { + fail(ioe.toString()); + } finally { + IOUtils.closeStream(reader); + + if (null != prevClassLoader) { + ClassLoaderStack.setCurrentClassLoader(prevClassLoader); + } + } + } + + public void testSplitByFirstCol() throws IOException { + runMultiMapTest("INTFIELD1", HsqldbTestServer.getFirstColSum()); + } +} diff --git a/src/test/org/apache/hadoop/sqoop/TestOrderBy.java b/src/test/org/apache/hadoop/sqoop/TestSplitBy.java similarity index 80% rename from src/test/org/apache/hadoop/sqoop/TestOrderBy.java rename to src/test/org/apache/hadoop/sqoop/TestSplitBy.java index d39d29eb..b4e11264 100644 --- a/src/test/org/apache/hadoop/sqoop/TestOrderBy.java +++ b/src/test/org/apache/hadoop/sqoop/TestSplitBy.java @@ -34,15 +34,15 @@ import org.apache.hadoop.sqoop.util.ClassLoaderStack; /** - * Test that --order-by works + * Test that --split-by works */ -public class TestOrderBy extends ImportJobTestCase { +public class TestSplitBy extends ImportJobTestCase { /** * Create the argv to pass to Sqoop * @return the argv as an array of strings. */ - private String [] getArgv(boolean includeHadoopFlags, String [] colNames, String orderByCol) { + private String [] getArgv(boolean includeHadoopFlags, String [] colNames, String splitByCol) { String columnsString = ""; for (String col : colNames) { columnsString += col + ","; @@ -63,13 +63,15 @@ public class TestOrderBy extends ImportJobTestCase { args.add(HsqldbTestServer.getTableName()); args.add("--columns"); args.add(columnsString); - args.add("--order-by"); - args.add(orderByCol); + args.add("--split-by"); + args.add(splitByCol); args.add("--warehouse-dir"); args.add(getWarehouseDir()); args.add("--connect"); args.add(HsqldbTestServer.getUrl()); args.add("--as-sequencefile"); + args.add("--num-mappers"); + args.add("1"); return args.toArray(new String[0]); } @@ -90,18 +92,18 @@ private int getFirstInt(String str) { return Integer.parseInt(parts[0]); } - public void runOrderByTest(String orderByCol, String firstValStr, int expectedSum) + public void runSplitByTest(String splitByCol, int expectedSum) throws IOException { String [] columns = HsqldbTestServer.getFieldNames(); ClassLoader prevClassLoader = null; SequenceFile.Reader reader = null; - String [] argv = getArgv(true, columns, orderByCol); + String [] argv = getArgv(true, columns, splitByCol); runImport(argv); try { ImportOptions opts = new ImportOptions(); - opts.parse(getArgv(false, columns, orderByCol)); + opts.parse(getArgv(false, columns, splitByCol)); CompilationManager compileMgr = new CompilationManager(opts); String jarFileName = compileMgr.getJarFilename(); @@ -115,22 +117,14 @@ public void runOrderByTest(String orderByCol, String firstValStr, int expectedSu Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf); Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf); - if (reader.next(key) == null) { - fail("Empty SequenceFile during import"); - } - - // make sure that the value we think should be at the top, is. - reader.getCurrentValue(val); - assertEquals("Invalid ordering within sorted SeqFile", firstValStr, val.toString()); - // We know that these values are two ints separated by a ',' character. 
// Since this is all dynamic, though, we don't want to actually link against // the class and use its methods. So we just parse this back into int fields manually. // Sum them up and ensure that we get the expected total for the first column, to // verify that we got all the results from the db into the file. - int curSum = getFirstInt(val.toString()); - // now sum up everything else in the file. + // Sum up everything in the file. + int curSum = 0; while (reader.next(key) != null) { reader.getCurrentValue(val); curSum += getFirstInt(val.toString()); @@ -148,13 +142,13 @@ public void runOrderByTest(String orderByCol, String firstValStr, int expectedSu } } - public void testOrderByFirstCol() throws IOException { - String orderByCol = "INTFIELD1"; - runOrderByTest(orderByCol, "1,8\n", HsqldbTestServer.getFirstColSum()); + public void testSplitByFirstCol() throws IOException { + String splitByCol = "INTFIELD1"; + runSplitByTest(splitByCol, HsqldbTestServer.getFirstColSum()); } - public void testOrderBySecondCol() throws IOException { - String orderByCol = "INTFIELD2"; - runOrderByTest(orderByCol, "7,2\n", HsqldbTestServer.getFirstColSum()); + public void testSplitBySecondCol() throws IOException { + String splitByCol = "INTFIELD2"; + runSplitByTest(splitByCol, HsqldbTestServer.getFirstColSum()); } } diff --git a/src/test/org/apache/hadoop/sqoop/TestWhere.java b/src/test/org/apache/hadoop/sqoop/TestWhere.java index a2ac80d3..669bd186 100644 --- a/src/test/org/apache/hadoop/sqoop/TestWhere.java +++ b/src/test/org/apache/hadoop/sqoop/TestWhere.java @@ -68,13 +68,15 @@ public class TestWhere extends ImportJobTestCase { args.add(columnsString); args.add("--where"); args.add(whereClause); - args.add("--order-by"); + args.add("--split-by"); args.add("INTFIELD1"); args.add("--warehouse-dir"); args.add(getWarehouseDir()); args.add("--connect"); args.add(HsqldbTestServer.getUrl()); args.add("--as-sequencefile"); + args.add("--num-mappers"); + args.add("1"); return args.toArray(new String[0]); } diff --git a/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java b/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java index 9ff9459c..96581a16 100644 --- a/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java +++ b/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java @@ -61,8 +61,10 @@ public class TestHiveImport extends ImportJobTestCase { args.add("--connect"); args.add(HsqldbTestServer.getUrl()); args.add("--hive-import"); - args.add("--order-by"); + args.add("--split-by"); args.add(getColNames()[0]); + args.add("--num-mappers"); + args.add("1"); if (null != moreArgs) { for (String arg: moreArgs) { diff --git a/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java b/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java index fc72aa86..6f98cd09 100644 --- a/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java +++ b/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java @@ -198,6 +198,8 @@ private String getCurrentUser() { args.add(getCurrentUser()); args.add("--where"); args.add("id > 1"); + args.add("--num-mappers"); + args.add("1"); if (mysqlOutputDelims) { args.add("--mysql-delimiters"); diff --git a/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java b/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java index 49166b59..5d972b85 100644 --- a/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java +++ b/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java @@ -150,6 +150,8 @@ public void tearDown() { args.add("--password"); 
args.add(AUTH_TEST_PASS); args.add("--mysql-delimiters"); + args.add("--num-mappers"); + args.add("1"); return args.toArray(new String[0]); } diff --git a/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java b/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java index 289ca2e7..fdccd07e 100644 --- a/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java +++ b/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java @@ -156,6 +156,8 @@ public void tearDown() { args.add(ORACLE_USER_NAME); args.add("--password"); args.add(ORACLE_USER_PASS); + args.add("--num-mappers"); + args.add("1"); return args.toArray(new String[0]); } diff --git a/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java b/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java index 01b63a8d..dde921f2 100644 --- a/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java +++ b/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java @@ -181,7 +181,7 @@ private void doImportAndVerify(boolean isDirect, String [] expectedResults) if (isDirect) { filePath = new Path(tablePath, "data-00000"); } else { - filePath = new Path(tablePath, "part-00000"); + filePath = new Path(tablePath, "part-m-00000"); } File tableFile = new File(tablePath.toString()); diff --git a/src/test/org/apache/hadoop/sqoop/mapred/MapredTests.java b/src/test/org/apache/hadoop/sqoop/mapred/MapredTests.java new file mode 100644 index 00000000..948dddc0 --- /dev/null +++ b/src/test/org/apache/hadoop/sqoop/mapred/MapredTests.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.mapred; + +import junit.framework.Test; +import junit.framework.TestSuite; + +/** + * All tests for Sqoop old mapred-api (org.apache.hadoop.sqoop.mapred) + */ +public final class MapredTests { + + private MapredTests() { } + + public static Test suite() { + TestSuite suite = new TestSuite("Tests for org.apache.hadoop.sqoop.mapred"); + suite.addTestSuite(TestAutoProgressMapRunner.class); + return suite; + } +} + diff --git a/src/test/org/apache/hadoop/sqoop/mapreduce/MapreduceTests.java b/src/test/org/apache/hadoop/sqoop/mapreduce/MapreduceTests.java new file mode 100644 index 00000000..6ec92703 --- /dev/null +++ b/src/test/org/apache/hadoop/sqoop/mapreduce/MapreduceTests.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.mapreduce; + +import junit.framework.Test; +import junit.framework.TestSuite; + +/** + * All tests for Sqoop new mapreduce-api (org.apache.hadoop.sqoop.mapreduce) + */ +public final class MapreduceTests { + + private MapreduceTests() { } + + public static Test suite() { + TestSuite suite = new TestSuite("Tests for org.apache.hadoop.sqoop.mapreduce"); + suite.addTestSuite(TestTextImportMapper.class); + return suite; + } +} + diff --git a/src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java b/src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java new file mode 100644 index 00000000..ad330187 --- /dev/null +++ b/src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.sqoop.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.hadoop.mrunit.mapreduce.MapDriver; + +import junit.framework.TestCase; + +/** + * Test the TextImportMapper + */ +public class TestTextImportMapper extends TestCase { + + + static class DummyDBWritable implements DBWritable { + long field; + + public DummyDBWritable(final long val) { + this.field = val; + } + + public void readFields(DataInput in) throws IOException { + field = in.readLong(); + } + + public void write(DataOutput out) throws IOException { + out.writeLong(field); + } + + public void readFields(ResultSet rs) throws SQLException { + field = rs.getLong(1); + } + + public void write(PreparedStatement s) throws SQLException { + s.setLong(1, field); + } + + public String toString() { + return "" + field; + } + } + + public void testTextImport() { + TextImportMapper m = new TextImportMapper(); + MapDriver driver = + new MapDriver(m); + + driver.withInput(new LongWritable(0), new DummyDBWritable(42)) + .withOutput(new Text("42"), NullWritable.get()) + .runTest(); + } +} diff --git a/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java b/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java index 014161b8..fdc4f218 100644 --- a/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java +++ b/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java @@ -73,8 +73,8 @@ public class TestParseMethods extends ImportJobTestCase { args.add("--connect"); args.add(HsqldbTestServer.getUrl()); args.add("--as-textfile"); - args.add("--order-by"); - args.add("DATA_COL0"); // always order by first column. + args.add("--split-by"); + args.add("DATA_COL0"); // always split by first column. 
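+    // The remaining arguments configure the delimiters and enclosers under test.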
args.add("--fields-terminated-by"); args.add(fieldTerminator); args.add("--lines-terminated-by"); @@ -87,7 +87,8 @@ public class TestParseMethods extends ImportJobTestCase { args.add("--optionally-enclosed-by"); } args.add(encloser); - + args.add("--num-mappers"); + args.add("1"); return args.toArray(new String[0]); } diff --git a/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java b/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java index 34165a62..a3090119 100644 --- a/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java +++ b/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java @@ -277,7 +277,7 @@ protected void verifyReadback(int colNum, String expectedVal) { colNames = getColNames(); } - String orderByCol = colNames[0]; + String splitByCol = colNames[0]; String columnsString = ""; for (String col : colNames) { columnsString += col + ","; @@ -298,13 +298,15 @@ protected void verifyReadback(int colNum, String expectedVal) { args.add(getTableName()); args.add("--columns"); args.add(columnsString); - args.add("--order-by"); - args.add(orderByCol); + args.add("--split-by"); + args.add(splitByCol); args.add("--warehouse-dir"); args.add(getWarehouseDir()); args.add("--connect"); args.add(HsqldbTestServer.getUrl()); args.add("--as-sequencefile"); + args.add("--num-mappers"); + args.add("1"); return args.toArray(new String[0]); } @@ -316,7 +318,7 @@ protected Path getTablePath() { } protected Path getDataFilePath() { - return new Path(getTablePath(), "part-00000"); + return new Path(getTablePath(), "part-m-00000"); } protected void removeTableDir() { @@ -350,8 +352,7 @@ protected void verifyImport(String expectedVal, String [] importCols) { ret = ToolRunner.run(importer, getArgv(true, importCols)); } catch (Exception e) { LOG.error("Got exception running Sqoop: " + e.toString()); - e.printStackTrace(); - ret = 1; + throw new RuntimeException(e); } // expect a successful return.