diff --git a/build.xml b/build.xml
index d9e7f4d4..21bbeb78 100644
--- a/build.xml
+++ b/build.xml
@@ -26,6 +26,38 @@ to call at top-level: ant deploy-contrib compile-core-test
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -54,6 +86,11 @@ to call at top-level: ant deploy-contrib compile-core-test
+
+
+
+
-
+
+
+
+
diff --git a/ivy.xml b/ivy.xml
index f3bd660a..2b2591d1 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -64,6 +64,18 @@
name="hsqldb"
rev="${hsqldb.version}"
conf="common->default"/>
+
+
+
diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java b/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java
new file mode 100644
--- /dev/null
+++ b/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java
+public class AutoProgressMapper<KEYIN, VALIN, KEYOUT, VALOUT>
+ extends Mapper<KEYIN, VALIN, KEYOUT, VALOUT> {
+
+ public static final Log LOG = LogFactory.getLog(AutoProgressMapper.class.getName());
+
+ /** Total number of millis for which progress will be reported
+ by the auto-progress thread. If this is zero, then the auto-progress
+ thread will never voluntarily exit.
+ */
+ private int maxProgressPeriod;
+
+ /** Number of milliseconds to sleep for between loop iterations. Must be less
+ than report interval.
+ */
+ private int sleepInterval;
+
+ /** Number of milliseconds between calls to Reporter.progress(). Should be a multiple
+ of the sleepInterval.
+ */
+ private int reportInterval;
+
+ public static final String MAX_PROGRESS_PERIOD_KEY = "sqoop.mapred.auto.progress.max";
+ public static final String SLEEP_INTERVAL_KEY = "sqoop.mapred.auto.progress.sleep";
+ public static final String REPORT_INTERVAL_KEY = "sqoop.mapred.auto.progress.report";
+
+ // Sleep for 10 seconds at a time.
+ static final int DEFAULT_SLEEP_INTERVAL = 10000;
+
+ // Report progress every 30 seconds.
+ static final int DEFAULT_REPORT_INTERVAL = 30000;
+
+ // Disable max progress, by default.
+ static final int DEFAULT_MAX_PROGRESS = 0;
+
+ private class ProgressThread extends Thread {
+
+ private volatile boolean keepGoing; // while this is true, thread runs.
+ private Context context;
+ private long startTimeMillis;
+ private long lastReportMillis;
+
+ public ProgressThread(final Context ctxt) {
+ this.context = ctxt;
+ this.keepGoing = true;
+ }
+
+ public void signalShutdown() {
+ this.keepGoing = false; // volatile update.
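+ // Interrupt the thread in case it is sleeping, so it notices the flag promptly.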
+ this.interrupt();
+ }
+
+ public void run() {
+ this.lastReportMillis = System.currentTimeMillis();
+ this.startTimeMillis = this.lastReportMillis;
+
+ final long MAX_PROGRESS = AutoProgressMapper.this.maxProgressPeriod;
+ final long REPORT_INTERVAL = AutoProgressMapper.this.reportInterval;
+ final long SLEEP_INTERVAL = AutoProgressMapper.this.sleepInterval;
+
+ // in a loop:
+ // * Check that we haven't run for too long (maxProgressPeriod)
+ // * If it's been a report interval since we last made progress, make more.
+ // * Sleep for a bit.
+ // * If the parent thread has signaled for exit, do so.
+ while (this.keepGoing) {
+ long curTimeMillis = System.currentTimeMillis();
+
+ if (MAX_PROGRESS != 0 && curTimeMillis - this.startTimeMillis > MAX_PROGRESS) {
+ this.keepGoing = false;
+ LOG.info("Auto-progress thread exiting after " + MAX_PROGRESS + " ms.");
+ break;
+ }
+
+ if (curTimeMillis - this.lastReportMillis > REPORT_INTERVAL) {
+ // It's been a full report interval -- claim progress.
+ LOG.debug("Auto-progress thread reporting progress");
+ this.context.progress();
+ this.lastReportMillis = curTimeMillis;
+ }
+
+ // Unless we got an interrupt while we were working,
+ // sleep a bit before doing more work.
+ if (!this.interrupted()) {
+ try {
+ Thread.sleep(SLEEP_INTERVAL);
+ } catch (InterruptedException ie) {
+ // we were notified on something; not necessarily an error.
+ }
+ }
+ }
+
+ LOG.info("Auto-progress thread is finished. keepGoing=" + this.keepGoing);
+ }
+ }
+
+ /**
+ * Set configuration parameters for the auto-progress thread.
+ */
+ private final void configureAutoProgress(Configuration job) {
+ this.maxProgressPeriod = job.getInt(MAX_PROGRESS_PERIOD_KEY, DEFAULT_MAX_PROGRESS);
+ this.sleepInterval = job.getInt(SLEEP_INTERVAL_KEY, DEFAULT_SLEEP_INTERVAL);
+ this.reportInterval = job.getInt(REPORT_INTERVAL_KEY, DEFAULT_REPORT_INTERVAL);
+
+ if (this.reportInterval < 1) {
+ LOG.warn("Invalid " + REPORT_INTERVAL_KEY + "; setting to " + DEFAULT_REPORT_INTERVAL);
+ this.reportInterval = DEFAULT_REPORT_INTERVAL;
+ }
+
+ if (this.sleepInterval > this.reportInterval || this.sleepInterval < 1) {
+ LOG.warn("Invalid " + SLEEP_INTERVAL_KEY + "; setting to " + DEFAULT_SLEEP_INTERVAL);
+ this.sleepInterval = DEFAULT_SLEEP_INTERVAL;
+ }
+
+ if (this.maxProgressPeriod < 0) {
+ LOG.warn("Invalid " + MAX_PROGRESS_PERIOD_KEY + "; setting to " + DEFAULT_MAX_PROGRESS);
+ this.maxProgressPeriod = DEFAULT_MAX_PROGRESS;
+ }
+ }
+
+
+ // map() method intentionally omitted; Mapper.map() is the identity mapper.
+
+
+ /**
+ * Run the mapping process for this task, wrapped in an auto-progress system.
+ */
+ public void run(Context context) throws IOException, InterruptedException {
+ configureAutoProgress(context.getConfiguration());
+ ProgressThread thread = this.new ProgressThread(context);
+
+ try {
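+ // Run the progress thread as a daemon so it cannot keep the task JVM alive on its own.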
+ thread.setDaemon(true);
+ thread.start();
+
+ // use default run() method to actually drive the mapping.
+ super.run(context);
+ } finally {
+ // Tell the progress thread to exit.
+ LOG.debug("Instructing auto-progress thread to quit.");
+ thread.signalShutdown();
+ try {
+ // And wait for that to happen.
+ LOG.debug("Waiting for progress thread shutdown...");
+ thread.join();
+ LOG.debug("Progress thread shutdown detected.");
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted when waiting on auto-progress thread: " + ie.toString());
+ }
+ }
+ }
+
+}
diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java
new file mode 100644
index 00000000..6031a4d0
--- /dev/null
+++ b/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+import org.apache.hadoop.sqoop.ConnFactory;
+import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.orm.TableClassName;
+import org.apache.hadoop.sqoop.util.ClassLoaderStack;
+import org.apache.hadoop.sqoop.util.PerfCounters;
+
+/**
+ * Actually runs a JDBC import job using the ORM files generated by the sqoop.orm package.
+ * Uses DataDrivenDBInputFormat.
+ */
+public class DataDrivenImportJob {
+
+ public static final Log LOG = LogFactory.getLog(DataDrivenImportJob.class.getName());
+
+ private ImportOptions options;
+
+ public DataDrivenImportJob(final ImportOptions opts) {
+ this.options = opts;
+ }
+
+ /**
+ * Run an import job to read a table into HDFS.
+ *
+ * @param tableName the database table to read
+ * @param ormJarFile the Jar file to insert into the dcache classpath. (may be null)
+ * @param splitByCol the column of the database table to use to split the import
+ * @param conf A fresh Hadoop Configuration to use to build an MR job.
+ */
+ public void runImport(String tableName, String ormJarFile, String splitByCol,
+ Configuration conf) throws IOException {
+
+ LOG.info("Beginning data-driven import of " + tableName);
+
+ String tableClassName = new TableClassName(options).getClassForTable(tableName);
+
+ boolean isLocal = "local".equals(conf.get("mapred.job.tracker"));
+ ClassLoader prevClassLoader = null;
+ if (isLocal) {
+ // If we're using the LocalJobRunner, then instead of using the compiled jar file
+ // as the job source, we're running in the current thread. Push on another classloader
+ // that loads from that jar in addition to everything currently on the classpath.
+ prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile, tableClassName);
+ }
+
+ try {
+ Job job = new Job(conf);
+
+ // Set the external jar to use for the job.
+ job.getConfiguration().set("mapred.jar", ormJarFile);
+
+ String hdfsWarehouseDir = options.getWarehouseDir();
+ Path outputPath;
+
+ if (null != hdfsWarehouseDir) {
+ Path hdfsWarehousePath = new Path(hdfsWarehouseDir);
+ hdfsWarehousePath.makeQualified(FileSystem.get(job.getConfiguration()));
+ outputPath = new Path(hdfsWarehousePath, tableName);
+ } else {
+ outputPath = new Path(tableName);
+ }
+
+ if (options.getFileLayout() == ImportOptions.FileLayout.TextFile) {
+ job.setOutputFormatClass(RawKeyTextOutputFormat.class);
+ job.setMapperClass(TextImportMapper.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(NullWritable.class);
+ } else if (options.getFileLayout() == ImportOptions.FileLayout.SequenceFile) {
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setMapperClass(AutoProgressMapper.class);
+ SequenceFileOutputFormat.setCompressOutput(job, true);
+ SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
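+ // Set the value class by name: the generated record class may not be loaded
+ // in this JVM (same reason the input class is overridden by name below).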
+ job.getConfiguration().set("mapred.output.value.class", tableClassName);
+ } else {
+ LOG.warn("Unknown file layout specified: " + options.getFileLayout() + "; using text.");
+ }
+
+ int numMapTasks = options.getNumMappers();
+ if (numMapTasks < 1) {
+ numMapTasks = ImportOptions.DEFAULT_NUM_MAPPERS;
+ LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
+ }
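+ // Pass the requested mapper count as a hint; the input format uses it when computing splits.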
+ job.getConfiguration().setInt("mapred.map.tasks", numMapTasks);
+ job.setNumReduceTasks(0);
+
+ job.setInputFormatClass(DataDrivenDBInputFormat.class);
+
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ ConnManager mgr = new ConnFactory(conf).getManager(options);
+ String username = options.getUsername();
+ if (null == username || username.length() == 0) {
+ DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
+ options.getConnectString());
+ } else {
+ DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
+ options.getConnectString(), username, options.getPassword());
+ }
+
+ String [] colNames = options.getColumns();
+ if (null == colNames) {
+ colNames = mgr.getColumnNames(tableName);
+ }
+
+ // It's ok if the where clause is null in DBInputFormat.setInput.
+ String whereClause = options.getWhereClause();
+
+ // We can't set the class properly in here, because we may not have the
+ // jar loaded in this JVM. So we start by calling setInput() with DBWritable,
+ // and then overriding the string manually.
+ DataDrivenDBInputFormat.setInput(job, DBWritable.class, tableName, whereClause,
+ splitByCol, colNames);
+ job.getConfiguration().set(DBConfiguration.INPUT_CLASS_PROPERTY, tableClassName);
+
+ PerfCounters counters = new PerfCounters();
+ counters.startClock();
+
+ try {
+ job.waitForCompletion(false);
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException(cnfe);
+ }
+
+ counters.stopClock();
+ counters.addBytes(job.getCounters().getGroup("FileSystemCounters")
+ .findCounter("HDFS_BYTES_WRITTEN").getValue());
+ LOG.info("Transferred " + counters.toString());
+ } finally {
+ if (isLocal && null != prevClassLoader) {
+ // unload the special classloader for this jar.
+ ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
+ }
+ }
+ }
+}
diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/RawKeyTextOutputFormat.java b/src/java/org/apache/hadoop/sqoop/mapreduce/RawKeyTextOutputFormat.java
new file mode 100644
index 00000000..c216ac60
--- /dev/null
+++ b/src/java/org/apache/hadoop/sqoop/mapreduce/RawKeyTextOutputFormat.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapreduce;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.*;
+
+/** An {@link OutputFormat} that writes plain text files.
+ * Only writes the key. Does not write any delimiter/newline after the key.
+ */
+public class RawKeyTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
+
+ protected static class RawKeyRecordWriter<K, V> extends RecordWriter<K, V> {
+ private static final String utf8 = "UTF-8";
+
+ protected DataOutputStream out;
+
+ public RawKeyRecordWriter(DataOutputStream out) {
+ this.out = out;
+ }
+
+ /**
+ * Write the object to the byte stream, handling Text as a special
+ * case.
+ * @param o the object to print
+ * @throws IOException if the write throws, we pass it on
+ */
+ private void writeObject(Object o) throws IOException {
+ if (o instanceof Text) {
+ Text to = (Text) o;
+ out.write(to.getBytes(), 0, to.getLength());
+ } else {
+ out.write(o.toString().getBytes(utf8));
+ }
+ }
+
+ public synchronized void write(K key, V value) throws IOException {
+ writeObject(key);
+ }
+
+ public synchronized void close(TaskAttemptContext context) throws IOException {
+ out.close();
+ }
+ }
+
+ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+ throws IOException {
+ boolean isCompressed = getCompressOutput(context);
+ Configuration conf = context.getConfiguration();
+ String ext = "";
+ CompressionCodec codec = null;
+
+ if (isCompressed) {
+ // create the named codec
+ Class<? extends CompressionCodec> codecClass =
+ getOutputCompressorClass(context, GzipCodec.class);
+ codec = ReflectionUtils.newInstance(codecClass, conf);
+
+ ext = codec.getDefaultExtension();
+ }
+
+ Path file = getDefaultWorkFile(context, ext);
+ FileSystem fs = file.getFileSystem(conf);
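+ // Create the task output file; the 'false' argument means an existing file will not be overwritten.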
+ FSDataOutputStream fileOut = fs.create(file, false);
+ DataOutputStream ostream = fileOut;
+
+ if (isCompressed) {
+ ostream = new DataOutputStream(codec.createOutputStream(fileOut));
+ }
+
+ return new RawKeyRecordWriter<K, V>(ostream);
+ }
+}
+
diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java b/src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java
new file mode 100644
index 00000000..3970105d
--- /dev/null
+++ b/src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+/**
+ * Converts an input record into a string representation and emits it.
+ */
+public class TextImportMapper
+ extends AutoProgressMapper<LongWritable, DBWritable, Text, NullWritable> {
+
+ private Text outkey;
+
+ public TextImportMapper() {
+ outkey = new Text();
+ }
+
+ public void map(LongWritable key, DBWritable val, Context context)
+ throws IOException, InterruptedException {
+ outkey.set(val.toString());
+ context.write(outkey, NullWritable.get());
+ }
+}
diff --git a/src/test/org/apache/hadoop/sqoop/AllTests.java b/src/test/org/apache/hadoop/sqoop/AllTests.java
index 213fb0ef..f396fd39 100644
--- a/src/test/org/apache/hadoop/sqoop/AllTests.java
+++ b/src/test/org/apache/hadoop/sqoop/AllTests.java
@@ -23,7 +23,8 @@
import org.apache.hadoop.sqoop.lib.TestRecordParser;
import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
import org.apache.hadoop.sqoop.manager.TestSqlManager;
-import org.apache.hadoop.sqoop.mapred.TestAutoProgressMapRunner;
+import org.apache.hadoop.sqoop.mapred.MapredTests;
+import org.apache.hadoop.sqoop.mapreduce.MapreduceTests;
import org.apache.hadoop.sqoop.orm.TestClassWriter;
import org.apache.hadoop.sqoop.orm.TestParseMethods;
@@ -32,24 +33,22 @@
/**
* All tests for Sqoop (org.apache.hadoop.sqoop)
- *
- *
*/
-public final class AllTests {
+public final class AllTests {
private AllTests() { }
public static Test suite() {
TestSuite suite = new TestSuite("Tests for org.apache.hadoop.sqoop");
- suite.addTestSuite(TestAutoProgressMapRunner.class);
suite.addTestSuite(TestAllTables.class);
suite.addTestSuite(TestHsqldbManager.class);
suite.addTestSuite(TestSqlManager.class);
suite.addTestSuite(TestClassWriter.class);
suite.addTestSuite(TestColumnTypes.class);
suite.addTestSuite(TestMultiCols.class);
- suite.addTestSuite(TestOrderBy.class);
+ suite.addTestSuite(TestMultiMaps.class);
+ suite.addTestSuite(TestSplitBy.class);
suite.addTestSuite(TestWhere.class);
suite.addTestSuite(TestHiveImport.class);
suite.addTestSuite(TestRecordParser.class);
@@ -58,6 +57,8 @@ public static Test suite() {
suite.addTestSuite(TestParseMethods.class);
suite.addTestSuite(TestConnFactory.class);
suite.addTest(ThirdPartyTests.suite());
+ suite.addTest(MapredTests.suite());
+ suite.addTest(MapreduceTests.suite());
return suite;
}
diff --git a/src/test/org/apache/hadoop/sqoop/TestAllTables.java b/src/test/org/apache/hadoop/sqoop/TestAllTables.java
index de86e076..2f05e09d 100644
--- a/src/test/org/apache/hadoop/sqoop/TestAllTables.java
+++ b/src/test/org/apache/hadoop/sqoop/TestAllTables.java
@@ -36,9 +36,6 @@
/**
* Test the --all-tables functionality that can import multiple tables.
- * ;
- *
- *
*/
public class TestAllTables extends ImportJobTestCase {
@@ -63,6 +60,8 @@ public class TestAllTables extends ImportJobTestCase {
args.add(getWarehouseDir());
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
+ args.add("--num-mappers");
+ args.add("1");
return args.toArray(new String[0]);
}
@@ -107,7 +106,7 @@ public void testMultiTableImport() throws IOException {
Path warehousePath = new Path(this.getWarehouseDir());
for (String tableName : this.tableNames) {
Path tablePath = new Path(warehousePath, tableName);
- Path filePath = new Path(tablePath, "part-00000");
+ Path filePath = new Path(tablePath, "part-m-00000");
// dequeue the expected value for this table. This
// list has the same order as the tableNames list.
diff --git a/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java b/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java
index 82d8e79c..3a5e1ddf 100644
--- a/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java
+++ b/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java
@@ -36,9 +36,6 @@
* write(DataOutput), readFields(DataInput)
* - And optionally, that we can push to the database:
* write(PreparedStatement)
- *
- *
- *
*/
public class TestColumnTypes extends ImportJobTestCase {
@@ -217,14 +214,24 @@ public void testTimestamp1() {
@Test
public void testTimestamp2() {
+ try {
+ LOG.debug("Beginning testTimestamp2");
verifyType("TIMESTAMP", "'2009-04-24 18:24:00.0002'",
"2009-04-24 18:24:00.000200000",
"2009-04-24 18:24:00.0002");
+ } finally {
+ LOG.debug("End testTimestamp2");
+ }
}
@Test
public void testTimestamp3() {
+ try {
+ LOG.debug("Beginning testTimestamp3");
verifyType("TIMESTAMP", "null", null);
+ } finally {
+ LOG.debug("End testTimestamp3");
+ }
}
@Test
diff --git a/src/test/org/apache/hadoop/sqoop/TestMultiMaps.java b/src/test/org/apache/hadoop/sqoop/TestMultiMaps.java
new file mode 100644
index 00000000..f66af202
--- /dev/null
+++ b/src/test/org/apache/hadoop/sqoop/TestMultiMaps.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.hadoop.sqoop.ImportOptions.InvalidOptionsException;
+import org.apache.hadoop.sqoop.orm.CompilationManager;
+import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
+import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
+import org.apache.hadoop.sqoop.testutil.SeqFileReader;
+import org.apache.hadoop.sqoop.util.ClassLoaderStack;
+
+/**
+ * Test that using multiple mapper splits works.
+ */
+public class TestMultiMaps extends ImportJobTestCase {
+
+ /**
+ * Create the argv to pass to Sqoop
+ * @return the argv as an array of strings.
+ */
+ private String [] getArgv(boolean includeHadoopFlags, String [] colNames, String splitByCol) {
+ String columnsString = "";
+ for (String col : colNames) {
+ columnsString += col + ",";
+ }
+
+ ArrayList<String> args = new ArrayList<String>();
+
+ if (includeHadoopFlags) {
+ args.add("-D");
+ args.add("mapred.job.tracker=local");
+ args.add("-D");
+ args.add("fs.default.name=file:///");
+ }
+
+ args.add("--table");
+ args.add(HsqldbTestServer.getTableName());
+ args.add("--columns");
+ args.add(columnsString);
+ args.add("--split-by");
+ args.add(splitByCol);
+ args.add("--warehouse-dir");
+ args.add(getWarehouseDir());
+ args.add("--connect");
+ args.add(HsqldbTestServer.getUrl());
+ args.add("--as-sequencefile");
+ args.add("--num-mappers");
+ args.add("2");
+
+ return args.toArray(new String[0]);
+ }
+
+ // this test just uses the two int table.
+ protected String getTableName() {
+ return HsqldbTestServer.getTableName();
+ }
+
+ /** @return a list of Path objects for each data file */
+ protected List<Path> getDataFilePaths() throws IOException {
+ List<Path> paths = new ArrayList<Path>();
+ Configuration conf = new Configuration();
+ conf.set("fs.default.name", "file:///");
+ FileSystem fs = FileSystem.get(conf);
+
+ FileStatus [] stats = fs.listStatus(getTablePath());
+ for (FileStatus stat : stats) {
+ paths.add(stat.getPath());
+ }
+
+ return paths;
+ }
+
+ /**
+ * Given a comma-delimited list of integers, grab and parse the first int
+ * @param str a comma-delimited list of values, the first of which is an int.
+ * @return the first field in the string, cast to int
+ */
+ private int getFirstInt(String str) {
+ String [] parts = str.split(",");
+ return Integer.parseInt(parts[0]);
+ }
+
+ public void runMultiMapTest(String splitByCol, int expectedSum)
+ throws IOException {
+
+ String [] columns = HsqldbTestServer.getFieldNames();
+ ClassLoader prevClassLoader = null;
+ SequenceFile.Reader reader = null;
+
+ String [] argv = getArgv(true, columns, splitByCol);
+ runImport(argv);
+ try {
+ ImportOptions opts = new ImportOptions();
+ opts.parse(getArgv(false, columns, splitByCol));
+
+ CompilationManager compileMgr = new CompilationManager(opts);
+ String jarFileName = compileMgr.getJarFilename();
+
+ prevClassLoader = ClassLoaderStack.addJarFile(jarFileName, getTableName());
+
+ List<Path> paths = getDataFilePaths();
+ Configuration conf = new Configuration();
+ int curSum = 0;
+
+ assertTrue("Found only " + paths.size() + " path(s); expected > 1.", paths.size() > 1);
+
+ // We expect multiple files. We need to open all the files and sum up the
+ // first column across all of them.
+ for (Path p : paths) {
+ reader = SeqFileReader.getSeqFileReader(p.toString());
+
+ // here we can actually instantiate (k, v) pairs.
+ Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+ // We know that these values are two ints separated by a ',' character.
+ // Since this is all dynamic, though, we don't want to actually link against
+ // the class and use its methods. So we just parse this back into int fields manually.
+ // Sum them up and ensure that we get the expected total for the first column, to
+ // verify that we got all the results from the db into the file.
+
+ // now sum up everything in the file.
+ while (reader.next(key) != null) {
+ reader.getCurrentValue(val);
+ curSum += getFirstInt(val.toString());
+ }
+
+ IOUtils.closeStream(reader);
+ reader = null;
+ }
+
+ assertEquals("Total sum of first db column mismatch", expectedSum, curSum);
+ } catch (InvalidOptionsException ioe) {
+ fail(ioe.toString());
+ } finally {
+ IOUtils.closeStream(reader);
+
+ if (null != prevClassLoader) {
+ ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
+ }
+ }
+ }
+
+ public void testSplitByFirstCol() throws IOException {
+ runMultiMapTest("INTFIELD1", HsqldbTestServer.getFirstColSum());
+ }
+}
diff --git a/src/test/org/apache/hadoop/sqoop/TestOrderBy.java b/src/test/org/apache/hadoop/sqoop/TestSplitBy.java
similarity index 80%
rename from src/test/org/apache/hadoop/sqoop/TestOrderBy.java
rename to src/test/org/apache/hadoop/sqoop/TestSplitBy.java
index d39d29eb..b4e11264 100644
--- a/src/test/org/apache/hadoop/sqoop/TestOrderBy.java
+++ b/src/test/org/apache/hadoop/sqoop/TestSplitBy.java
@@ -34,15 +34,15 @@
import org.apache.hadoop.sqoop.util.ClassLoaderStack;
/**
- * Test that --order-by works
+ * Test that --split-by works
*/
-public class TestOrderBy extends ImportJobTestCase {
+public class TestSplitBy extends ImportJobTestCase {
/**
* Create the argv to pass to Sqoop
* @return the argv as an array of strings.
*/
- private String [] getArgv(boolean includeHadoopFlags, String [] colNames, String orderByCol) {
+ private String [] getArgv(boolean includeHadoopFlags, String [] colNames, String splitByCol) {
String columnsString = "";
for (String col : colNames) {
columnsString += col + ",";
@@ -63,13 +63,15 @@ public class TestOrderBy extends ImportJobTestCase {
args.add(HsqldbTestServer.getTableName());
args.add("--columns");
args.add(columnsString);
- args.add("--order-by");
- args.add(orderByCol);
+ args.add("--split-by");
+ args.add(splitByCol);
args.add("--warehouse-dir");
args.add(getWarehouseDir());
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--as-sequencefile");
+ args.add("--num-mappers");
+ args.add("1");
return args.toArray(new String[0]);
}
@@ -90,18 +92,18 @@ private int getFirstInt(String str) {
return Integer.parseInt(parts[0]);
}
- public void runOrderByTest(String orderByCol, String firstValStr, int expectedSum)
+ public void runSplitByTest(String splitByCol, int expectedSum)
throws IOException {
String [] columns = HsqldbTestServer.getFieldNames();
ClassLoader prevClassLoader = null;
SequenceFile.Reader reader = null;
- String [] argv = getArgv(true, columns, orderByCol);
+ String [] argv = getArgv(true, columns, splitByCol);
runImport(argv);
try {
ImportOptions opts = new ImportOptions();
- opts.parse(getArgv(false, columns, orderByCol));
+ opts.parse(getArgv(false, columns, splitByCol));
CompilationManager compileMgr = new CompilationManager(opts);
String jarFileName = compileMgr.getJarFilename();
@@ -115,22 +117,14 @@ public void runOrderByTest(String orderByCol, String firstValStr, int expectedSu
Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf);
- if (reader.next(key) == null) {
- fail("Empty SequenceFile during import");
- }
-
- // make sure that the value we think should be at the top, is.
- reader.getCurrentValue(val);
- assertEquals("Invalid ordering within sorted SeqFile", firstValStr, val.toString());
-
// We know that these values are two ints separated by a ',' character.
// Since this is all dynamic, though, we don't want to actually link against
// the class and use its methods. So we just parse this back into int fields manually.
// Sum them up and ensure that we get the expected total for the first column, to
// verify that we got all the results from the db into the file.
- int curSum = getFirstInt(val.toString());
- // now sum up everything else in the file.
+ // Sum up everything in the file.
+ int curSum = 0;
while (reader.next(key) != null) {
reader.getCurrentValue(val);
curSum += getFirstInt(val.toString());
@@ -148,13 +142,13 @@ public void runOrderByTest(String orderByCol, String firstValStr, int expectedSu
}
}
- public void testOrderByFirstCol() throws IOException {
- String orderByCol = "INTFIELD1";
- runOrderByTest(orderByCol, "1,8\n", HsqldbTestServer.getFirstColSum());
+ public void testSplitByFirstCol() throws IOException {
+ String splitByCol = "INTFIELD1";
+ runSplitByTest(splitByCol, HsqldbTestServer.getFirstColSum());
}
- public void testOrderBySecondCol() throws IOException {
- String orderByCol = "INTFIELD2";
- runOrderByTest(orderByCol, "7,2\n", HsqldbTestServer.getFirstColSum());
+ public void testSplitBySecondCol() throws IOException {
+ String splitByCol = "INTFIELD2";
+ runSplitByTest(splitByCol, HsqldbTestServer.getFirstColSum());
}
}
diff --git a/src/test/org/apache/hadoop/sqoop/TestWhere.java b/src/test/org/apache/hadoop/sqoop/TestWhere.java
index a2ac80d3..669bd186 100644
--- a/src/test/org/apache/hadoop/sqoop/TestWhere.java
+++ b/src/test/org/apache/hadoop/sqoop/TestWhere.java
@@ -68,13 +68,15 @@ public class TestWhere extends ImportJobTestCase {
args.add(columnsString);
args.add("--where");
args.add(whereClause);
- args.add("--order-by");
+ args.add("--split-by");
args.add("INTFIELD1");
args.add("--warehouse-dir");
args.add(getWarehouseDir());
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--as-sequencefile");
+ args.add("--num-mappers");
+ args.add("1");
return args.toArray(new String[0]);
}
diff --git a/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java b/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java
index 9ff9459c..96581a16 100644
--- a/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java
+++ b/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java
@@ -61,8 +61,10 @@ public class TestHiveImport extends ImportJobTestCase {
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--hive-import");
- args.add("--order-by");
+ args.add("--split-by");
args.add(getColNames()[0]);
+ args.add("--num-mappers");
+ args.add("1");
if (null != moreArgs) {
for (String arg: moreArgs) {
diff --git a/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java b/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
index fc72aa86..6f98cd09 100644
--- a/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
+++ b/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
@@ -198,6 +198,8 @@ private String getCurrentUser() {
args.add(getCurrentUser());
args.add("--where");
args.add("id > 1");
+ args.add("--num-mappers");
+ args.add("1");
if (mysqlOutputDelims) {
args.add("--mysql-delimiters");
diff --git a/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java b/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java
index 49166b59..5d972b85 100644
--- a/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java
+++ b/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java
@@ -150,6 +150,8 @@ public void tearDown() {
args.add("--password");
args.add(AUTH_TEST_PASS);
args.add("--mysql-delimiters");
+ args.add("--num-mappers");
+ args.add("1");
return args.toArray(new String[0]);
}
diff --git a/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java b/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java
index 289ca2e7..fdccd07e 100644
--- a/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java
+++ b/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java
@@ -156,6 +156,8 @@ public void tearDown() {
args.add(ORACLE_USER_NAME);
args.add("--password");
args.add(ORACLE_USER_PASS);
+ args.add("--num-mappers");
+ args.add("1");
return args.toArray(new String[0]);
}
diff --git a/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java b/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java
index 01b63a8d..dde921f2 100644
--- a/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java
+++ b/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java
@@ -181,7 +181,7 @@ private void doImportAndVerify(boolean isDirect, String [] expectedResults)
if (isDirect) {
filePath = new Path(tablePath, "data-00000");
} else {
- filePath = new Path(tablePath, "part-00000");
+ filePath = new Path(tablePath, "part-m-00000");
}
File tableFile = new File(tablePath.toString());
diff --git a/src/test/org/apache/hadoop/sqoop/mapred/MapredTests.java b/src/test/org/apache/hadoop/sqoop/mapred/MapredTests.java
new file mode 100644
index 00000000..948dddc0
--- /dev/null
+++ b/src/test/org/apache/hadoop/sqoop/mapred/MapredTests.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapred;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+/**
+ * All tests for Sqoop's old mapred API (org.apache.hadoop.sqoop.mapred).
+ */
+public final class MapredTests {
+
+ private MapredTests() { }
+
+ public static Test suite() {
+ TestSuite suite = new TestSuite("Tests for org.apache.hadoop.sqoop.mapred");
+ suite.addTestSuite(TestAutoProgressMapRunner.class);
+ return suite;
+ }
+}
+
diff --git a/src/test/org/apache/hadoop/sqoop/mapreduce/MapreduceTests.java b/src/test/org/apache/hadoop/sqoop/mapreduce/MapreduceTests.java
new file mode 100644
index 00000000..6ec92703
--- /dev/null
+++ b/src/test/org/apache/hadoop/sqoop/mapreduce/MapreduceTests.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapreduce;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+/**
+ * All tests for Sqoop's new mapreduce API (org.apache.hadoop.sqoop.mapreduce).
+ */
+public final class MapreduceTests {
+
+ private MapreduceTests() { }
+
+ public static Test suite() {
+ TestSuite suite = new TestSuite("Tests for org.apache.hadoop.sqoop.mapreduce");
+ suite.addTestSuite(TestTextImportMapper.class);
+ return suite;
+ }
+}
+
diff --git a/src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java b/src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java
new file mode 100644
index 00000000..ad330187
--- /dev/null
+++ b/src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+
+import junit.framework.TestCase;
+
+/**
+ * Test the TextImportMapper
+ */
+public class TestTextImportMapper extends TestCase {
+
+
+ static class DummyDBWritable implements DBWritable {
+ long field;
+
+ public DummyDBWritable(final long val) {
+ this.field = val;
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ field = in.readLong();
+ }
+
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(field);
+ }
+
+ public void readFields(ResultSet rs) throws SQLException {
+ field = rs.getLong(1);
+ }
+
+ public void write(PreparedStatement s) throws SQLException {
+ s.setLong(1, field);
+ }
+
+ public String toString() {
+ return "" + field;
+ }
+ }
+
+ public void testTextImport() {
+ TextImportMapper m = new TextImportMapper();
+ MapDriver<LongWritable, DBWritable, Text, NullWritable> driver =
+ new MapDriver<LongWritable, DBWritable, Text, NullWritable>(m);
+
+ driver.withInput(new LongWritable(0), new DummyDBWritable(42))
+ .withOutput(new Text("42"), NullWritable.get())
+ .runTest();
+ }
+}
diff --git a/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java b/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java
index 014161b8..fdc4f218 100644
--- a/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java
+++ b/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java
@@ -73,8 +73,8 @@ public class TestParseMethods extends ImportJobTestCase {
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--as-textfile");
- args.add("--order-by");
- args.add("DATA_COL0"); // always order by first column.
+ args.add("--split-by");
+ args.add("DATA_COL0"); // always split by first column.
args.add("--fields-terminated-by");
args.add(fieldTerminator);
args.add("--lines-terminated-by");
@@ -87,7 +87,8 @@ public class TestParseMethods extends ImportJobTestCase {
args.add("--optionally-enclosed-by");
}
args.add(encloser);
-
+ args.add("--num-mappers");
+ args.add("1");
return args.toArray(new String[0]);
}
diff --git a/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java b/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java
index 34165a62..a3090119 100644
--- a/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java
+++ b/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java
@@ -277,7 +277,7 @@ protected void verifyReadback(int colNum, String expectedVal) {
colNames = getColNames();
}
- String orderByCol = colNames[0];
+ String splitByCol = colNames[0];
String columnsString = "";
for (String col : colNames) {
columnsString += col + ",";
@@ -298,13 +298,15 @@ protected void verifyReadback(int colNum, String expectedVal) {
args.add(getTableName());
args.add("--columns");
args.add(columnsString);
- args.add("--order-by");
- args.add(orderByCol);
+ args.add("--split-by");
+ args.add(splitByCol);
args.add("--warehouse-dir");
args.add(getWarehouseDir());
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--as-sequencefile");
+ args.add("--num-mappers");
+ args.add("1");
return args.toArray(new String[0]);
}
@@ -316,7 +318,7 @@ protected Path getTablePath() {
}
protected Path getDataFilePath() {
- return new Path(getTablePath(), "part-00000");
+ return new Path(getTablePath(), "part-m-00000");
}
protected void removeTableDir() {
@@ -350,8 +352,7 @@ protected void verifyImport(String expectedVal, String [] importCols) {
ret = ToolRunner.run(importer, getArgv(true, importCols));
} catch (Exception e) {
LOG.error("Got exception running Sqoop: " + e.toString());
- e.printStackTrace();
- ret = 1;
+ throw new RuntimeException(e);
}
// expect a successful return.