
MAPREDUCE-907. Sqoop should use more intelligent splits. Contributed by Aaron Kimball.

From: Thomas White <tomwhite@apache.org>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149828 13f79535-47bb-0310-9956-ffa450edef68
Andrew Bayer 2011-07-22 20:03:24 +00:00
parent 629559c3fb
commit 37cadedc8f
26 changed files with 1040 additions and 73 deletions
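
The renamed --split-by flag and the new -m/--num-mappers flag introduced by this commit can be driven through the standard Tool interface, just as the test cases below do via ToolRunner. A minimal sketch (connect string, table, and column names are placeholders, not taken from this commit; the no-arg Sqoop constructor is assumed, as used by its main()):

import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.sqoop.Sqoop;

public class SplitByImportExample {
  public static void main(String[] args) throws Exception {
    String[] sqoopArgs = {
        "--connect", "jdbc:hsqldb:hsql://localhost/db",  // placeholder JDBC URL
        "--table", "EMPLOYEES",                          // placeholder table name
        "--split-by", "ID",                              // column used to partition work units
        "--num-mappers", "4"                             // run four parallel map tasks (same as -m 4)
    };
    // Sqoop implements Tool, so ToolRunner also handles generic -D options.
    int ret = ToolRunner.run(new Sqoop(), sqoopArgs);
    System.exit(ret);
  }
}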


@ -26,6 +26,38 @@ to call at top-level: ant deploy-contrib compile-core-test
<import file="../build-contrib.xml"/>
<property environment="env"/>
<property name="sqoop.thirdparty.lib.dir" value="" />
<property name="mrunit.class.dir" value="${build.dir}/../mrunit/classes" />
<!-- ================================================================== -->
<!-- Compile test code -->
<!-- Override with our own version so we can enforce build dependencies -->
<!-- on compile-mapred-test for MiniMRCluster, and MRUnit. -->
<!-- ================================================================== -->
<target name="compile-test" depends="compile-examples" if="test.available">
<echo message="Compiling ${name} dependencies" />
<!-- need top-level compile-mapred-test for MiniMRCluster -->
<subant target="compile-mapred-test">
<fileset dir="../../.." includes="build.xml" />
</subant>
<!-- Need MRUnit compiled for some tests -->
<subant target="compile">
<fileset dir="../mrunit" includes="build.xml" />
</subant>
<echo message="contrib: ${name}"/>
<javac
encoding="${build.encoding}"
srcdir="${src.test}"
includes="**/*.java"
destdir="${build.test}"
debug="${javac.debug}">
<classpath>
<path refid="test.classpath"/>
<pathelement path="${mrunit.class.dir}" />
</classpath>
</javac>
</target>
<!-- ================================================================== -->
<!-- Run unit tests -->
@ -54,6 +86,11 @@ to call at top-level: ant deploy-contrib compile-core-test
<sysproperty key="build.test" value="${build.test}"/>
<sysproperty key="contrib.name" value="${name}"/>
<!-- define this property to force Sqoop to throw better exceptions on errors
during testing, instead of printing a short message and exiting with status 1. -->
<sysproperty key="sqoop.throwOnError" value="" />
<!--
Added property needed to use the .class files for compilation
instead of depending on hadoop-*-core.jar
@ -92,13 +129,16 @@ to call at top-level: ant deploy-contrib compile-core-test
-->
<sysproperty key="hive.home" value="${basedir}/testdata/hive" />
<!-- tools.jar from Sun JDK also required to invoke javac. -->
<classpath>
<path refid="test.classpath"/>
<path refid="contrib-classpath"/>
<!-- tools.jar from Sun JDK also required to invoke javac. -->
<pathelement path="${env.JAVA_HOME}/lib/tools.jar" />
<!-- need thirdparty JDBC drivers for thirdparty tests -->
<fileset dir="${sqoop.thirdparty.lib.dir}"
includes="*.jar" />
<!-- need MRUnit for some tests -->
<pathelement path="${mrunit.class.dir}" />
</classpath>
<formatter type="${test.junit.output.format}" />
<batchtest todir="${build.test}" unless="testcase">

ivy.xml

@ -64,6 +64,18 @@
name="hsqldb"
rev="${hsqldb.version}"
conf="common->default"/>
<dependency org="javax.servlet"
name="servlet-api"
rev="${servlet-api.version}"
conf="common->master"/>
<dependency org="org.mortbay.jetty"
name="jetty"
rev="${jetty.version}"
conf="common->master"/>
<dependency org="org.mortbay.jetty"
name="jetty-util"
rev="${jetty-util.version}"
conf="common->master"/>
<dependency org="org.apache.hadoop"
name="avro"
rev="1.0.0"


@ -87,7 +87,7 @@ public enum FileLayout {
private String jarOutputDir;
private ControlAction action;
private String hadoopHome;
private String orderByCol;
private String splitByCol;
private String whereClause;
private String debugSqlCmd;
private String driverClassName;
@ -99,6 +99,7 @@ public enum FileLayout {
private boolean hiveImport;
private String packageName; // package to prepend to auto-named classes.
private String className; // package+class to apply to individual table import.
private int numMappers;
private char inputFieldDelim;
private char inputRecordDelim;
@ -114,6 +115,8 @@ public enum FileLayout {
private boolean areDelimsManuallySet;
public static final int DEFAULT_NUM_MAPPERS = 4;
private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
public ImportOptions() {
@ -153,7 +156,7 @@ private void loadFromProperties() {
this.password = props.getProperty("db.password", this.password);
this.tableName = props.getProperty("db.table", this.tableName);
this.connectString = props.getProperty("db.connect.url", this.connectString);
this.orderByCol = props.getProperty("db.sort.column", this.orderByCol);
this.splitByCol = props.getProperty("db.split.column", this.splitByCol);
this.whereClause = props.getProperty("db.where.clause", this.whereClause);
this.driverClassName = props.getProperty("jdbc.driver", this.driverClassName);
this.warehouseDir = props.getProperty("hdfs.warehouse.dir", this.warehouseDir);
@ -227,6 +230,8 @@ private void initDefaults() {
this.areDelimsManuallySet = false;
this.numMappers = DEFAULT_NUM_MAPPERS;
loadFromProperties();
}
@ -255,7 +260,7 @@ public static void printUsage() {
System.out.println("Import control options:");
System.out.println("--table (tablename) Table to read");
System.out.println("--columns (col,col,col...) Columns to export from table");
System.out.println("--order-by (column-name) Column of the table used to order results");
System.out.println("--split-by (column-name) Column of the table used to split work units");
System.out.println("--where (where clause) Where clause to use during export");
System.out.println("--hadoop-home (dir) Override $HADOOP_HOME");
System.out.println("--hive-home (dir) Override $HIVE_HOME");
@ -263,9 +268,10 @@ public static void printUsage() {
System.out.println("--as-sequencefile Imports data to SequenceFiles");
System.out.println("--as-textfile Imports data as plain text (default)");
System.out.println("--all-tables Import all tables in database");
System.out.println(" (Ignores --table, --columns and --order-by)");
System.out.println(" (Ignores --table, --columns and --split-by)");
System.out.println("--hive-import If set, then import the table into Hive.");
System.out.println(" (Uses Hive's default delimiters if none are set.)");
System.out.println("-m, --num-mappers (n) Use 'n' map tasks to import in parallel");
System.out.println("");
System.out.println("Output line formatting options:");
System.out.println("--fields-terminated-by (char) Sets the field separator character");
@ -409,8 +415,8 @@ public void parse(String [] args) throws InvalidOptionsException {
} else if (args[i].equals("--columns")) {
String columnString = args[++i];
this.columns = columnString.split(",");
} else if (args[i].equals("--order-by")) {
this.orderByCol = args[++i];
} else if (args[i].equals("--split-by")) {
this.splitByCol = args[++i];
} else if (args[i].equals("--where")) {
this.whereClause = args[++i];
} else if (args[i].equals("--list-tables")) {
@ -441,6 +447,14 @@ public void parse(String [] args) throws InvalidOptionsException {
this.hiveHome = args[++i];
} else if (args[i].equals("--hive-import")) {
this.hiveImport = true;
} else if (args[i].equals("--num-mappers") || args[i].equals("-m")) {
String numMappersStr = args[++i];
try {
this.numMappers = Integer.valueOf(numMappersStr);
} catch (NumberFormatException nfe) {
throw new InvalidOptionsException("Invalid argument; expected "
+ args[i - 1] + " (number).");
}
} else if (args[i].equals("--fields-terminated-by")) {
this.outputFieldDelim = ImportOptions.toChar(args[++i]);
this.areDelimsManuallySet = true;
@ -530,9 +544,9 @@ public void validate() throws InvalidOptionsException {
// If we're reading all tables in a database, can't filter column names.
throw new InvalidOptionsException("--columns and --all-tables are incompatible options."
+ HELP_STR);
} else if (this.allTables && this.orderByCol != null) {
} else if (this.allTables && this.splitByCol != null) {
// If we're reading all tables in a database, can't set pkey
throw new InvalidOptionsException("--order-by and --all-tables are incompatible options."
throw new InvalidOptionsException("--split-by and --all-tables are incompatible options."
+ HELP_STR);
} else if (this.allTables && this.className != null) {
// If we're reading all tables, can't set individual class name
@ -544,6 +558,10 @@ public void validate() throws InvalidOptionsException {
} else if (this.className != null && this.packageName != null) {
throw new InvalidOptionsException(
"--class-name overrides --package-name. You cannot use both." + HELP_STR);
} else if (this.action == ControlAction.FullImport && !this.allTables
&& this.tableName == null) {
throw new InvalidOptionsException(
"One of --table or --all-tables is required for import." + HELP_STR);
}
if (this.hiveImport) {
@ -594,8 +612,8 @@ public String[] getColumns() {
}
}
public String getOrderByCol() {
return orderByCol;
public String getSplitByCol() {
return splitByCol;
}
public String getWhereClause() {
@ -622,6 +640,13 @@ public boolean isDirect() {
return direct;
}
/**
* @return the number of map tasks to use for import
*/
public int getNumMappers() {
return this.numMappers;
}
/**
* @return the user-specified absolute class name for the table
*/
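As a quick illustration of the new option surface shown above, a hedged fragment that parses and validates the renamed flag (all values are placeholders; parse() and validate() both throw InvalidOptionsException):

ImportOptions opts = new ImportOptions();
opts.parse(new String[] {
    "--connect", "jdbc:hsqldb:hsql://localhost/db",   // placeholder URL
    "--table", "EMPLOYEES",
    "--split-by", "ID",
    "-m", "8"                                         // shorthand for --num-mappers 8
});
opts.validate();   // rejects bad combinations, e.g. --split-by together with --all-tables
String splitCol = opts.getSplitByCol();   // "ID"
int mappers = opts.getNumMappers();       // 8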


@ -42,6 +42,11 @@ public class Sqoop extends Configured implements Tool {
public static final Log LOG = LogFactory.getLog(Sqoop.class.getName());
/** If this System property is set, always throw an exception, do not just
exit with status 1.
*/
public static final String SQOOP_RETHROW_PROPERTY = "sqoop.throwOnError";
static {
Configuration.addDefaultResource("sqoop-default.xml");
Configuration.addDefaultResource("sqoop-site.xml");
@ -112,7 +117,11 @@ public int run(String [] args) {
manager = new ConnFactory(getConf()).getManager(options);
} catch (Exception e) {
LOG.error("Got error creating database manager: " + e.toString());
return 1;
if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
throw new RuntimeException(e);
} else {
return 1;
}
}
if (options.doHiveImport()) {
@ -167,10 +176,18 @@ public int run(String [] args) {
}
} catch (IOException ioe) {
LOG.error("Encountered IOException running import job: " + ioe.toString());
return 1;
if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
throw new RuntimeException(ioe);
} else {
return 1;
}
} catch (ImportError ie) {
LOG.error("Error during import: " + ie.toString());
return 1;
if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
throw new RuntimeException(ie);
} else {
return 1;
}
}
}
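
The rethrow behaviour keys off System.getProperty(SQOOP_RETHROW_PROPERTY) being non-null, which is why the build file above can set it to the empty string. A hedged sketch of using it from a test harness (importArgs is a placeholder argv):

// Any non-null value enables rethrowing; the Ant build sets sqoop.throwOnError to "".
System.setProperty(Sqoop.SQOOP_RETHROW_PROPERTY, "");
try {
  ToolRunner.run(new Sqoop(), importArgs);
} catch (RuntimeException re) {
  // With the property set, the wrapped IOException or ImportError surfaces here
  // instead of being reduced to an exit status of 1.
  re.printStackTrace();
}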


@ -30,6 +30,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.mapred.ImportJob;
import org.apache.hadoop.sqoop.util.ImportError;
/**
@ -84,5 +85,27 @@ protected Connection makeConnection() throws SQLException {
return connection;
}
/**
* This importTable() implementation continues to use the older DBInputFormat
* because DataDrivenDBInputFormat does not currently work with Oracle.
*/
public void importTable(String tableName, String jarFile, Configuration conf)
throws IOException, ImportError {
ImportJob importer = new ImportJob(options);
String splitCol = options.getSplitByCol();
if (null == splitCol) {
// If the user didn't specify a splitting column, try to infer one.
splitCol = getPrimaryKey(tableName);
}
if (null == splitCol) {
// Can't infer a primary key.
throw new ImportError("No primary key could be found for table " + tableName
+ ". Please specify one with --split-by.");
}
importer.runImport(tableName, jarFile, splitCol, conf);
}
}


@ -19,7 +19,7 @@
package org.apache.hadoop.sqoop.manager;
import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.mapred.ImportJob;
import org.apache.hadoop.sqoop.mapreduce.DataDrivenImportJob;
import org.apache.hadoop.sqoop.util.ImportError;
import org.apache.hadoop.sqoop.util.ResultSetPrinter;
@ -258,24 +258,24 @@ public String getPrimaryKey(String tableName) {
/**
* Default implementation of importTable() is to launch a MapReduce job
* via ImportJob to read the table with DBInputFormat.
* via DataDrivenImportJob to read the table with DataDrivenDBInputFormat.
*/
public void importTable(String tableName, String jarFile, Configuration conf)
throws IOException, ImportError {
ImportJob importer = new ImportJob(options);
String orderCol = options.getOrderByCol();
if (null == orderCol) {
// If the user didn't specify an ordering column, try to infer one.
orderCol = getPrimaryKey(tableName);
DataDrivenImportJob importer = new DataDrivenImportJob(options);
String splitCol = options.getSplitByCol();
if (null == splitCol) {
// If the user didn't specify a splitting column, try to infer one.
splitCol = getPrimaryKey(tableName);
}
if (null == orderCol) {
if (null == splitCol) {
// Can't infer a primary key.
throw new ImportError("No primary key could be found for table " + tableName
+ ". Please specify one with --order-by.");
+ ". Please specify one with --split-by.");
}
importer.runImport(tableName, jarFile, orderCol, conf);
importer.runImport(tableName, jarFile, splitCol, conf);
}
/**


@ -0,0 +1,185 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.mapreduce;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
/**
* Identity mapper that continuously reports progress via a background thread.
*/
public class AutoProgressMapper<KEYIN, VALIN, KEYOUT, VALOUT>
extends Mapper<KEYIN, VALIN, KEYOUT, VALOUT> {
public static final Log LOG = LogFactory.getLog(AutoProgressMapper.class.getName());
/** Total number of millis for which progress will be reported
by the auto-progress thread. If this is zero, then the auto-progress
thread will never voluntarily exit.
*/
private int maxProgressPeriod;
/** Number of milliseconds to sleep for between loop iterations. Must be less
than report interval.
*/
private int sleepInterval;
/** Number of milliseconds between calls to Reporter.progress(). Should be a multiple
of the sleepInterval.
*/
private int reportInterval;
public static final String MAX_PROGRESS_PERIOD_KEY = "sqoop.mapred.auto.progress.max";
public static final String SLEEP_INTERVAL_KEY = "sqoop.mapred.auto.progress.sleep";
public static final String REPORT_INTERVAL_KEY = "sqoop.mapred.auto.progress.report";
// Sleep for 10 seconds at a time.
static final int DEFAULT_SLEEP_INTERVAL = 10000;
// Report progress every 30 seconds.
static final int DEFAULT_REPORT_INTERVAL = 30000;
// Disable max progress, by default.
static final int DEFAULT_MAX_PROGRESS = 0;
private class ProgressThread extends Thread {
private volatile boolean keepGoing; // while this is true, thread runs.
private Context context;
private long startTimeMillis;
private long lastReportMillis;
public ProgressThread(final Context ctxt) {
this.context = ctxt;
this.keepGoing = true;
}
public void signalShutdown() {
this.keepGoing = false; // volatile update.
this.interrupt();
}
public void run() {
this.lastReportMillis = System.currentTimeMillis();
this.startTimeMillis = this.lastReportMillis;
final long MAX_PROGRESS = AutoProgressMapper.this.maxProgressPeriod;
final long REPORT_INTERVAL = AutoProgressMapper.this.reportInterval;
final long SLEEP_INTERVAL = AutoProgressMapper.this.sleepInterval;
// in a loop:
// * Check that we haven't run for too long (maxProgressPeriod)
// * If it's been a report interval since we last made progress, make more.
// * Sleep for a bit.
// * If the parent thread has signaled for exit, do so.
while (this.keepGoing) {
long curTimeMillis = System.currentTimeMillis();
if (MAX_PROGRESS != 0 && curTimeMillis - this.startTimeMillis > MAX_PROGRESS) {
this.keepGoing = false;
LOG.info("Auto-progress thread exiting after " + MAX_PROGRESS + " ms.");
break;
}
if (curTimeMillis - this.lastReportMillis > REPORT_INTERVAL) {
// It's been a full report interval -- claim progress.
LOG.debug("Auto-progress thread reporting progress");
this.context.progress();
this.lastReportMillis = curTimeMillis;
}
// Unless we got an interrupt while we were working,
// sleep a bit before doing more work.
if (!this.interrupted()) {
try {
Thread.sleep(SLEEP_INTERVAL);
} catch (InterruptedException ie) {
// we were notified on something; not necessarily an error.
}
}
}
LOG.info("Auto-progress thread is finished. keepGoing=" + this.keepGoing);
}
}
/**
* Set configuration parameters for the auto-progress thread.
*/
private final void configureAutoProgress(Configuration job) {
this.maxProgressPeriod = job.getInt(MAX_PROGRESS_PERIOD_KEY, DEFAULT_MAX_PROGRESS);
this.sleepInterval = job.getInt(SLEEP_INTERVAL_KEY, DEFAULT_SLEEP_INTERVAL);
this.reportInterval = job.getInt(REPORT_INTERVAL_KEY, DEFAULT_REPORT_INTERVAL);
if (this.reportInterval < 1) {
LOG.warn("Invalid " + REPORT_INTERVAL_KEY + "; setting to " + DEFAULT_REPORT_INTERVAL);
this.reportInterval = DEFAULT_REPORT_INTERVAL;
}
if (this.sleepInterval > this.reportInterval || this.sleepInterval < 1) {
LOG.warn("Invalid " + SLEEP_INTERVAL_KEY + "; setting to " + DEFAULT_SLEEP_INTERVAL);
this.sleepInterval = DEFAULT_SLEEP_INTERVAL;
}
if (this.maxProgressPeriod < 0) {
LOG.warn("Invalid " + MAX_PROGRESS_PERIOD_KEY + "; setting to " + DEFAULT_MAX_PROGRESS);
this.maxProgressPeriod = DEFAULT_MAX_PROGRESS;
}
}
// map() method intentionally omitted; Mapper.map() is the identity mapper.
/**
* Run the mapping process for this task, wrapped in an auto-progress system.
*/
public void run(Context context) throws IOException, InterruptedException {
configureAutoProgress(context.getConfiguration());
ProgressThread thread = this.new ProgressThread(context);
try {
thread.setDaemon(true);
thread.start();
// use default run() method to actually drive the mapping.
super.run(context);
} finally {
// Tell the progress thread to exit.
LOG.debug("Instructing auto-progress thread to quit.");
thread.signalShutdown();
try {
// And wait for that to happen.
LOG.debug("Waiting for progress thread shutdown...");
thread.join();
LOG.debug("Progress thread shutdown detected.");
} catch (InterruptedException ie) {
LOG.warn("Interrupted when waiting on auto-progress thread: " + ie.toString());
}
}
}
}
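
The progress thread's behaviour is controlled entirely by the three configuration keys declared above; a hedged sketch of tuning them on a job configuration (all values are in milliseconds, and the Job instance is a placeholder):

Configuration conf = job.getConfiguration();
// Wake every 5 seconds; claim progress every 15 seconds.
conf.setInt(AutoProgressMapper.SLEEP_INTERVAL_KEY, 5000);
conf.setInt(AutoProgressMapper.REPORT_INTERVAL_KEY, 15000);
// 0 (the default) means the thread never voluntarily stops reporting.
conf.setInt(AutoProgressMapper.MAX_PROGRESS_PERIOD_KEY, 0);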


@ -0,0 +1,176 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.mapreduce;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.sqoop.ConnFactory;
import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.orm.TableClassName;
import org.apache.hadoop.sqoop.util.ClassLoaderStack;
import org.apache.hadoop.sqoop.util.PerfCounters;
/**
* Actually runs a JDBC import job using the ORM files generated by the sqoop.orm package.
* Uses DataDrivenDBInputFormat.
*/
public class DataDrivenImportJob {
public static final Log LOG = LogFactory.getLog(DataDrivenImportJob.class.getName());
private ImportOptions options;
public DataDrivenImportJob(final ImportOptions opts) {
this.options = opts;
}
/**
* Run an import job to read a table into HDFS
*
* @param tableName the database table to read
* @param ormJarFile the Jar file to insert into the dcache classpath. (may be null)
* @param splitByCol the column of the database table to use to split the import
* @param conf A fresh Hadoop Configuration to use to build an MR job.
*/
public void runImport(String tableName, String ormJarFile, String splitByCol,
Configuration conf) throws IOException {
LOG.info("Beginning data-driven import of " + tableName);
String tableClassName = new TableClassName(options).getClassForTable(tableName);
boolean isLocal = "local".equals(conf.get("mapred.job.tracker"));
ClassLoader prevClassLoader = null;
if (isLocal) {
// If we're using the LocalJobRunner, then instead of using the compiled jar file
// as the job source, we're running in the current thread. Push on another classloader
// that loads from that jar in addition to everything currently on the classpath.
prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile, tableClassName);
}
try {
Job job = new Job(conf);
// Set the external jar to use for the job.
job.getConfiguration().set("mapred.jar", ormJarFile);
String hdfsWarehouseDir = options.getWarehouseDir();
Path outputPath;
if (null != hdfsWarehouseDir) {
Path hdfsWarehousePath = new Path(hdfsWarehouseDir);
hdfsWarehousePath.makeQualified(FileSystem.get(job.getConfiguration()));
outputPath = new Path(hdfsWarehousePath, tableName);
} else {
outputPath = new Path(tableName);
}
if (options.getFileLayout() == ImportOptions.FileLayout.TextFile) {
job.setOutputFormatClass(RawKeyTextOutputFormat.class);
job.setMapperClass(TextImportMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
} else if (options.getFileLayout() == ImportOptions.FileLayout.SequenceFile) {
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(AutoProgressMapper.class);
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
job.getConfiguration().set("mapred.output.value.class", tableClassName);
} else {
LOG.warn("Unknown file layout specified: " + options.getFileLayout() + "; using text.");
}
int numMapTasks = options.getNumMappers();
if (numMapTasks < 1) {
numMapTasks = ImportOptions.DEFAULT_NUM_MAPPERS;
LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
}
job.getConfiguration().setInt("mapred.map.tasks", numMapTasks);
job.setNumReduceTasks(0);
job.setInputFormatClass(DataDrivenDBInputFormat.class);
FileOutputFormat.setOutputPath(job, outputPath);
ConnManager mgr = new ConnFactory(conf).getManager(options);
String username = options.getUsername();
if (null == username || username.length() == 0) {
DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
options.getConnectString());
} else {
DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
options.getConnectString(), username, options.getPassword());
}
String [] colNames = options.getColumns();
if (null == colNames) {
colNames = mgr.getColumnNames(tableName);
}
// It's ok if the where clause is null in DBInputFormat.setInput.
String whereClause = options.getWhereClause();
// We can't set the class properly in here, because we may not have the
// jar loaded in this JVM. So we start by calling setInput() with DBWritable,
// and then overriding the string manually.
DataDrivenDBInputFormat.setInput(job, DBWritable.class, tableName, whereClause,
splitByCol, colNames);
job.getConfiguration().set(DBConfiguration.INPUT_CLASS_PROPERTY, tableClassName);
PerfCounters counters = new PerfCounters();
counters.startClock();
try {
job.waitForCompletion(false);
} catch (InterruptedException ie) {
throw new IOException(ie);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
counters.stopClock();
counters.addBytes(job.getCounters().getGroup("FileSystemCounters")
.findCounter("HDFS_BYTES_WRITTEN").getValue());
LOG.info("Transferred " + counters.toString());
} finally {
if (isLocal && null != prevClassLoader) {
// unload the special classloader for this jar.
ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
}
}
}
}
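
Tying the pieces together, a hedged sketch of driving this class directly; the ORM jar would normally come from sqoop.orm.CompilationManager.getJarFilename(), as the tests below demonstrate, and url/ormJar here are placeholders:

ImportOptions opts = new ImportOptions();
opts.parse(new String[] {
    "--connect", url, "--table", "EMPLOYEES",
    "--split-by", "ID", "--num-mappers", "4",
    "--as-sequencefile"
});
// Builds the MR job, wires DataDrivenDBInputFormat with the split column,
// and blocks until the job finishes.
new DataDrivenImportJob(opts).runImport("EMPLOYEES", ormJar, "ID", new Configuration());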


@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.mapreduce;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.*;
/** An {@link OutputFormat} that writes plain text files.
* Only writes the key. Does not write any delimiter/newline after the key.
*/
public class RawKeyTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
protected static class RawKeyRecordWriter<K, V> extends RecordWriter<K, V> {
private static final String utf8 = "UTF-8";
protected DataOutputStream out;
public RawKeyRecordWriter(DataOutputStream out) {
this.out = out;
}
/**
* Write the object to the byte stream, handling Text as a special
* case.
* @param o the object to print
* @throws IOException if the write throws, we pass it on
*/
private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(o.toString().getBytes(utf8));
}
}
public synchronized void write(K key, V value) throws IOException {
writeObject(key);
}
public synchronized void close(TaskAttemptContext context) throws IOException {
out.close();
}
}
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
throws IOException {
boolean isCompressed = getCompressOutput(context);
Configuration conf = context.getConfiguration();
String ext = "";
CompressionCodec codec = null;
if (isCompressed) {
// create the named codec
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(context, GzipCodec.class);
codec = ReflectionUtils.newInstance(codecClass, conf);
ext = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(context, ext);
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file, false);
DataOutputStream ostream = fileOut;
if (isCompressed) {
ostream = new DataOutputStream(codec.createOutputStream(fileOut));
}
return new RawKeyRecordWriter<K, V>(ostream);
}
}
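
For text-mode imports, DataDrivenImportJob pairs this output format with TextImportMapper; the relevant wiring, repeated here as a standalone hedged sketch:

Job job = new Job(new Configuration());
job.setMapperClass(TextImportMapper.class);
job.setOutputFormatClass(RawKeyTextOutputFormat.class);
job.setOutputKeyClass(Text.class);            // the key bytes are written verbatim; no extra delimiter is added
job.setOutputValueClass(NullWritable.class);  // the value is never written
job.setNumReduceTasks(0);                     // map-only job, so files land as part-m-NNNNN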


@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
/**
* Converts an input record into a string representation and emits it.
*/
public class TextImportMapper
extends AutoProgressMapper<LongWritable, DBWritable, Text, NullWritable> {
private Text outkey;
public TextImportMapper() {
outkey = new Text();
}
public void map(LongWritable key, DBWritable val, Context context)
throws IOException, InterruptedException {
outkey.set(val.toString());
context.write(outkey, NullWritable.get());
}
}


@ -23,7 +23,8 @@
import org.apache.hadoop.sqoop.lib.TestRecordParser;
import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
import org.apache.hadoop.sqoop.manager.TestSqlManager;
import org.apache.hadoop.sqoop.mapred.TestAutoProgressMapRunner;
import org.apache.hadoop.sqoop.mapred.MapredTests;
import org.apache.hadoop.sqoop.mapreduce.MapreduceTests;
import org.apache.hadoop.sqoop.orm.TestClassWriter;
import org.apache.hadoop.sqoop.orm.TestParseMethods;
@ -32,24 +33,22 @@
/**
* All tests for Sqoop (org.apache.hadoop.sqoop)
*
*
*/
public final class AllTests {
private AllTests() { }
public static Test suite() {
TestSuite suite = new TestSuite("Tests for org.apache.hadoop.sqoop");
suite.addTestSuite(TestAutoProgressMapRunner.class);
suite.addTestSuite(TestAllTables.class);
suite.addTestSuite(TestHsqldbManager.class);
suite.addTestSuite(TestSqlManager.class);
suite.addTestSuite(TestClassWriter.class);
suite.addTestSuite(TestColumnTypes.class);
suite.addTestSuite(TestMultiCols.class);
suite.addTestSuite(TestOrderBy.class);
suite.addTestSuite(TestMultiMaps.class);
suite.addTestSuite(TestSplitBy.class);
suite.addTestSuite(TestWhere.class);
suite.addTestSuite(TestHiveImport.class);
suite.addTestSuite(TestRecordParser.class);
@ -58,6 +57,8 @@ public static Test suite() {
suite.addTestSuite(TestParseMethods.class);
suite.addTestSuite(TestConnFactory.class);
suite.addTest(ThirdPartyTests.suite());
suite.addTest(MapredTests.suite());
suite.addTest(MapreduceTests.suite());
return suite;
}


@ -36,9 +36,6 @@
/**
* Test the --all-tables functionality that can import multiple tables.
* ;
*
*
*/
public class TestAllTables extends ImportJobTestCase {
@ -63,6 +60,8 @@ public class TestAllTables extends ImportJobTestCase {
args.add(getWarehouseDir());
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--num-mappers");
args.add("1");
return args.toArray(new String[0]);
}
@ -107,7 +106,7 @@ public void testMultiTableImport() throws IOException {
Path warehousePath = new Path(this.getWarehouseDir());
for (String tableName : this.tableNames) {
Path tablePath = new Path(warehousePath, tableName);
Path filePath = new Path(tablePath, "part-00000");
Path filePath = new Path(tablePath, "part-m-00000");
// dequeue the expected value for this table. This
// list has the same order as the tableNames list.


@ -36,9 +36,6 @@
* write(DataOutput), readFields(DataInput)
* - And optionally, that we can push to the database:
* write(PreparedStatement)
*
*
*
*/
public class TestColumnTypes extends ImportJobTestCase {
@ -217,14 +214,24 @@ public void testTimestamp1() {
@Test
public void testTimestamp2() {
try {
LOG.debug("Beginning testTimestamp2");
verifyType("TIMESTAMP", "'2009-04-24 18:24:00.0002'",
"2009-04-24 18:24:00.000200000",
"2009-04-24 18:24:00.0002");
} finally {
LOG.debug("End testTimestamp2");
}
}
@Test
public void testTimestamp3() {
try {
LOG.debug("Beginning testTimestamp3");
verifyType("TIMESTAMP", "null", null);
} finally {
LOG.debug("End testTimestamp3");
}
}
@Test


@ -0,0 +1,175 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.sqoop.ImportOptions.InvalidOptionsException;
import org.apache.hadoop.sqoop.orm.CompilationManager;
import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
import org.apache.hadoop.sqoop.testutil.SeqFileReader;
import org.apache.hadoop.sqoop.util.ClassLoaderStack;
/**
* Test that using multiple mapper splits works.
*/
public class TestMultiMaps extends ImportJobTestCase {
/**
* Create the argv to pass to Sqoop
* @return the argv as an array of strings.
*/
private String [] getArgv(boolean includeHadoopFlags, String [] colNames, String splitByCol) {
String columnsString = "";
for (String col : colNames) {
columnsString += col + ",";
}
ArrayList<String> args = new ArrayList<String>();
if (includeHadoopFlags) {
args.add("-D");
args.add("mapred.job.tracker=local");
args.add("-D");
args.add("fs.default.name=file:///");
}
args.add("--table");
args.add(HsqldbTestServer.getTableName());
args.add("--columns");
args.add(columnsString);
args.add("--split-by");
args.add(splitByCol);
args.add("--warehouse-dir");
args.add(getWarehouseDir());
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--as-sequencefile");
args.add("--num-mappers");
args.add("2");
return args.toArray(new String[0]);
}
// this test just uses the two int table.
protected String getTableName() {
return HsqldbTestServer.getTableName();
}
/** @return a list of Path objects for each data file */
protected List<Path> getDataFilePaths() throws IOException {
List<Path> paths = new ArrayList<Path>();
Configuration conf = new Configuration();
conf.set("fs.default.name", "file:///");
FileSystem fs = FileSystem.get(conf);
FileStatus [] stats = fs.listStatus(getTablePath());
for (FileStatus stat : stats) {
paths.add(stat.getPath());
}
return paths;
}
/**
* Given a comma-delimited list of integers, grab and parse the first int
* @param str a comma-delimited list of values, the first of which is an int.
* @return the first field in the string, cast to int
*/
private int getFirstInt(String str) {
String [] parts = str.split(",");
return Integer.parseInt(parts[0]);
}
public void runMultiMapTest(String splitByCol, int expectedSum)
throws IOException {
String [] columns = HsqldbTestServer.getFieldNames();
ClassLoader prevClassLoader = null;
SequenceFile.Reader reader = null;
String [] argv = getArgv(true, columns, splitByCol);
runImport(argv);
try {
ImportOptions opts = new ImportOptions();
opts.parse(getArgv(false, columns, splitByCol));
CompilationManager compileMgr = new CompilationManager(opts);
String jarFileName = compileMgr.getJarFilename();
prevClassLoader = ClassLoaderStack.addJarFile(jarFileName, getTableName());
List<Path> paths = getDataFilePaths();
Configuration conf = new Configuration();
int curSum = 0;
assertTrue("Found only " + paths.size() + " path(s); expected > 1.", paths.size() > 1);
// We expect multiple files. We need to open all the files and sum up the
// first column across all of them.
for (Path p : paths) {
reader = SeqFileReader.getSeqFileReader(p.toString());
// here we can actually instantiate (k, v) pairs.
Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf);
// We know that these values are two ints separated by a ',' character.
// Since this is all dynamic, though, we don't want to actually link against
// the class and use its methods. So we just parse this back into int fields manually.
// Sum them up and ensure that we get the expected total for the first column, to
// verify that we got all the results from the db into the file.
// now sum up everything in the file.
while (reader.next(key) != null) {
reader.getCurrentValue(val);
curSum += getFirstInt(val.toString());
}
IOUtils.closeStream(reader);
reader = null;
}
assertEquals("Total sum of first db column mismatch", expectedSum, curSum);
} catch (InvalidOptionsException ioe) {
fail(ioe.toString());
} finally {
IOUtils.closeStream(reader);
if (null != prevClassLoader) {
ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
}
}
}
public void testSplitByFirstCol() throws IOException {
runMultiMapTest("INTFIELD1", HsqldbTestServer.getFirstColSum());
}
}


@ -34,15 +34,15 @@
import org.apache.hadoop.sqoop.util.ClassLoaderStack;
/**
* Test that --order-by works
* Test that --split-by works
*/
public class TestOrderBy extends ImportJobTestCase {
public class TestSplitBy extends ImportJobTestCase {
/**
* Create the argv to pass to Sqoop
* @return the argv as an array of strings.
*/
private String [] getArgv(boolean includeHadoopFlags, String [] colNames, String orderByCol) {
private String [] getArgv(boolean includeHadoopFlags, String [] colNames, String splitByCol) {
String columnsString = "";
for (String col : colNames) {
columnsString += col + ",";
@ -63,13 +63,15 @@ public class TestOrderBy extends ImportJobTestCase {
args.add(HsqldbTestServer.getTableName());
args.add("--columns");
args.add(columnsString);
args.add("--order-by");
args.add(orderByCol);
args.add("--split-by");
args.add(splitByCol);
args.add("--warehouse-dir");
args.add(getWarehouseDir());
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--as-sequencefile");
args.add("--num-mappers");
args.add("1");
return args.toArray(new String[0]);
}
@ -90,18 +92,18 @@ private int getFirstInt(String str) {
return Integer.parseInt(parts[0]);
}
public void runOrderByTest(String orderByCol, String firstValStr, int expectedSum)
public void runSplitByTest(String splitByCol, int expectedSum)
throws IOException {
String [] columns = HsqldbTestServer.getFieldNames();
ClassLoader prevClassLoader = null;
SequenceFile.Reader reader = null;
String [] argv = getArgv(true, columns, orderByCol);
String [] argv = getArgv(true, columns, splitByCol);
runImport(argv);
try {
ImportOptions opts = new ImportOptions();
opts.parse(getArgv(false, columns, orderByCol));
opts.parse(getArgv(false, columns, splitByCol));
CompilationManager compileMgr = new CompilationManager(opts);
String jarFileName = compileMgr.getJarFilename();
@ -115,22 +117,14 @@ public void runOrderByTest(String orderByCol, String firstValStr, int expectedSu
Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf);
if (reader.next(key) == null) {
fail("Empty SequenceFile during import");
}
// make sure that the value we think should be at the top, is.
reader.getCurrentValue(val);
assertEquals("Invalid ordering within sorted SeqFile", firstValStr, val.toString());
// We know that these values are two ints separated by a ',' character.
// Since this is all dynamic, though, we don't want to actually link against
// the class and use its methods. So we just parse this back into int fields manually.
// Sum them up and ensure that we get the expected total for the first column, to
// verify that we got all the results from the db into the file.
int curSum = getFirstInt(val.toString());
// now sum up everything else in the file.
// Sum up everything in the file.
int curSum = 0;
while (reader.next(key) != null) {
reader.getCurrentValue(val);
curSum += getFirstInt(val.toString());
@ -148,13 +142,13 @@ public void runOrderByTest(String orderByCol, String firstValStr, int expectedSu
}
}
public void testOrderByFirstCol() throws IOException {
String orderByCol = "INTFIELD1";
runOrderByTest(orderByCol, "1,8\n", HsqldbTestServer.getFirstColSum());
public void testSplitByFirstCol() throws IOException {
String splitByCol = "INTFIELD1";
runSplitByTest(splitByCol, HsqldbTestServer.getFirstColSum());
}
public void testOrderBySecondCol() throws IOException {
String orderByCol = "INTFIELD2";
runOrderByTest(orderByCol, "7,2\n", HsqldbTestServer.getFirstColSum());
public void testSplitBySecondCol() throws IOException {
String splitByCol = "INTFIELD2";
runSplitByTest(splitByCol, HsqldbTestServer.getFirstColSum());
}
}


@ -68,13 +68,15 @@ public class TestWhere extends ImportJobTestCase {
args.add(columnsString);
args.add("--where");
args.add(whereClause);
args.add("--order-by");
args.add("--split-by");
args.add("INTFIELD1");
args.add("--warehouse-dir");
args.add(getWarehouseDir());
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--as-sequencefile");
args.add("--num-mappers");
args.add("1");
return args.toArray(new String[0]);
}


@ -61,8 +61,10 @@ public class TestHiveImport extends ImportJobTestCase {
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--hive-import");
args.add("--order-by");
args.add("--split-by");
args.add(getColNames()[0]);
args.add("--num-mappers");
args.add("1");
if (null != moreArgs) {
for (String arg: moreArgs) {


@ -198,6 +198,8 @@ private String getCurrentUser() {
args.add(getCurrentUser());
args.add("--where");
args.add("id > 1");
args.add("--num-mappers");
args.add("1");
if (mysqlOutputDelims) {
args.add("--mysql-delimiters");


@ -150,6 +150,8 @@ public void tearDown() {
args.add("--password");
args.add(AUTH_TEST_PASS);
args.add("--mysql-delimiters");
args.add("--num-mappers");
args.add("1");
return args.toArray(new String[0]);
}


@ -156,6 +156,8 @@ public void tearDown() {
args.add(ORACLE_USER_NAME);
args.add("--password");
args.add(ORACLE_USER_PASS);
args.add("--num-mappers");
args.add("1");
return args.toArray(new String[0]);
}


@ -181,7 +181,7 @@ private void doImportAndVerify(boolean isDirect, String [] expectedResults)
if (isDirect) {
filePath = new Path(tablePath, "data-00000");
} else {
filePath = new Path(tablePath, "part-00000");
filePath = new Path(tablePath, "part-m-00000");
}
File tableFile = new File(tablePath.toString());


@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.mapred;
import junit.framework.Test;
import junit.framework.TestSuite;
/**
* All tests for Sqoop old mapred-api (org.apache.hadoop.sqoop.mapred)
*/
public final class MapredTests {
private MapredTests() { }
public static Test suite() {
TestSuite suite = new TestSuite("Tests for org.apache.hadoop.sqoop.mapred");
suite.addTestSuite(TestAutoProgressMapRunner.class);
return suite;
}
}


@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.mapreduce;
import junit.framework.Test;
import junit.framework.TestSuite;
/**
* All tests for Sqoop new mapreduce-api (org.apache.hadoop.sqoop.mapreduce)
*/
public final class MapreduceTests {
private MapreduceTests() { }
public static Test suite() {
TestSuite suite = new TestSuite("Tests for org.apache.hadoop.sqoop.mapreduce");
suite.addTestSuite(TestTextImportMapper.class);
return suite;
}
}


@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import junit.framework.TestCase;
/**
* Test the TextImportMapper
*/
public class TestTextImportMapper extends TestCase {
static class DummyDBWritable implements DBWritable {
long field;
public DummyDBWritable(final long val) {
this.field = val;
}
public void readFields(DataInput in) throws IOException {
field = in.readLong();
}
public void write(DataOutput out) throws IOException {
out.writeLong(field);
}
public void readFields(ResultSet rs) throws SQLException {
field = rs.getLong(1);
}
public void write(PreparedStatement s) throws SQLException {
s.setLong(1, field);
}
public String toString() {
return "" + field;
}
}
public void testTextImport() {
TextImportMapper m = new TextImportMapper();
MapDriver<LongWritable, DBWritable, Text, NullWritable> driver =
new MapDriver<LongWritable, DBWritable, Text, NullWritable>(m);
driver.withInput(new LongWritable(0), new DummyDBWritable(42))
.withOutput(new Text("42"), NullWritable.get())
.runTest();
}
}


@ -73,8 +73,8 @@ public class TestParseMethods extends ImportJobTestCase {
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--as-textfile");
args.add("--order-by");
args.add("DATA_COL0"); // always order by first column.
args.add("--split-by");
args.add("DATA_COL0"); // always split by first column.
args.add("--fields-terminated-by");
args.add(fieldTerminator);
args.add("--lines-terminated-by");
@ -87,7 +87,8 @@ public class TestParseMethods extends ImportJobTestCase {
args.add("--optionally-enclosed-by");
}
args.add(encloser);
args.add("--num-mappers");
args.add("1");
return args.toArray(new String[0]);
}


@ -277,7 +277,7 @@ protected void verifyReadback(int colNum, String expectedVal) {
colNames = getColNames();
}
String orderByCol = colNames[0];
String splitByCol = colNames[0];
String columnsString = "";
for (String col : colNames) {
columnsString += col + ",";
@ -298,13 +298,15 @@ protected void verifyReadback(int colNum, String expectedVal) {
args.add(getTableName());
args.add("--columns");
args.add(columnsString);
args.add("--order-by");
args.add(orderByCol);
args.add("--split-by");
args.add(splitByCol);
args.add("--warehouse-dir");
args.add(getWarehouseDir());
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--as-sequencefile");
args.add("--num-mappers");
args.add("1");
return args.toArray(new String[0]);
}
@ -316,7 +318,7 @@ protected Path getTablePath() {
}
protected Path getDataFilePath() {
return new Path(getTablePath(), "part-00000");
return new Path(getTablePath(), "part-m-00000");
}
protected void removeTableDir() {
@ -350,8 +352,7 @@ protected void verifyImport(String expectedVal, String [] importCols) {
ret = ToolRunner.run(importer, getArgv(true, importCols));
} catch (Exception e) {
LOG.error("Got exception running Sqoop: " + e.toString());
e.printStackTrace();
ret = 1;
throw new RuntimeException(e);
}
// expect a successful return.