
MAPREDUCE-1169. Improvements to mysqldump use in Sqoop. Contributed by Aaron Kimball.

From: Thomas White <tomwhite@apache.org>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149840 13f79535-47bb-0310-9956-ffa450edef68
Andrew Bayer 2011-07-22 20:03:28 +00:00
parent 84adbeea26
commit ec8f687d97
7 changed files with 155 additions and 34 deletions


@@ -163,6 +163,14 @@ no import or code generation is performed.
--list-tables::
List tables in database and exit
Database-specific options
~~~~~~~~~~~~~~~~~~~~~~~~~
Additional arguments may be passed to the database manager
after a lone '-' on the command-line.
In MySQL direct mode, additional arguments are passed directly to
mysqldump.
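For example, to pass +--lock-tables+ through to mysqldump (this mirrors the
sample invocation in the direct-import guide below):

----
$ sqoop --connect jdbc:mysql://localhost/db --table foo --direct - --lock-tables
----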
ENVIRONMENT
-----------


@@ -18,16 +18,15 @@
Direct-mode Imports
------------------
-------------------
While the JDBC-based import method used by Sqoop provides it with the
ability to read from a variety of databases using a generic driver, it
is not the most high-performance method available. Sqoop can read from
certain database systems faster by using their built-in export tools.
For example, Sqoop can read from a local MySQL database by using the +mysqldump+
tool distributed with MySQL. If you run Sqoop on the same machine where a
MySQL database is present, you can take advantage of this faster
For example, Sqoop can read from a MySQL database by using the +mysqldump+
tool distributed with MySQL. You can take advantage of this faster
import method by running Sqoop with the +--direct+ argument. This,
combined with a connect string that begins with +jdbc:mysql://+, will
inform Sqoop that it should select the faster access method.
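For example, the following invocation selects the direct access path
(connect string and table name as in the sample further below):

----
$ sqoop --connect jdbc:mysql://localhost/db --table foo --direct
----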
@@ -58,4 +57,21 @@ each. If compressing the HDFS files with +--compress+, this will allow
subsequent MapReduce programs to use multiple mappers across your data
in parallel.
Tool-specific arguments
~~~~~~~~~~~~~~~~~~~~~~~
Sqoop will generate a set of command-line arguments with which it invokes
the underlying direct-mode tool (e.g., mysqldump). You can specify additional
arguments which should be passed to the tool by passing them to Sqoop
after a single '+-+' argument, e.g.:
----
$ sqoop --connect jdbc:mysql://localhost/db --table foo --direct - --lock-tables
----
The +--lock-tables+ argument (and anything else to the right of the +-+ argument)
will be passed directly to mysqldump.


@@ -24,6 +24,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.commons.logging.Log;
@@ -124,6 +125,8 @@ public enum FileLayout {
private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
private String [] extraArgs;
public ImportOptions() {
initDefaults();
}
@@ -254,6 +257,8 @@ private void initDefaults() {
this.conf = new Configuration();
this.extraArgs = null;
loadFromProperties();
}
@@ -327,6 +332,10 @@ public static void printUsage() {
System.out.println("--list-databases List all databases available and exit");
System.out.println("--debug-sql (statement) Execute 'statement' in SQL and exit");
System.out.println("");
System.out.println("Database-specific options:");
System.out.println("Arguments may be passed to the database manager after a lone '-':");
System.out.println(" MySQL direct mode: arguments passed directly to mysqldump");
System.out.println("");
System.out.println("Generic Hadoop command-line options:");
ToolRunner.printGenericCommandUsage(System.out);
System.out.println("");
@@ -546,6 +555,13 @@ public void parse(String [] args) throws InvalidOptionsException {
} else if (args[i].equals("--help")) {
printUsage();
throw new InvalidOptionsException("");
} else if (args[i].equals("-")) {
// Everything after a lone '-' goes into extraArgs.
ArrayList<String> extra = new ArrayList<String>();
for (i++; i < args.length; i++) {
extra.add(args[i]);
}
this.extraArgs = extra.toArray(new String[0]);
} else {
throw new InvalidOptionsException("Invalid argument: " + args[i] + ".\n"
+ "Try --help for usage.");
@@ -882,4 +898,19 @@ public Configuration getConf() {
public void setConf(Configuration config) {
this.conf = config;
}
/**
* @return command-line arguments after a '-'
*/
public String [] getExtraArgs() {
if (extraArgs == null) {
return null;
}
String [] out = new String[extraArgs.length];
for (int i = 0; i < extraArgs.length; i++) {
out[i] = extraArgs[i];
}
return out;
}
}
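A minimal sketch of how the new plumbing fits together (classes, methods,
and flags as in the diff above; the driver code itself is illustrative):

----
// Illustrative driver, not part of this commit. parse() throws
// InvalidOptionsException on malformed input.
ImportOptions options = new ImportOptions();
options.parse(new String[] {
    "--connect", "jdbc:mysql://localhost/db",
    "--table", "foo",
    "--direct",
    "-",              // lone '-': everything after it is collected verbatim
    "--lock-tables"
});

// Returns a defensive copy: { "--lock-tables" }
String [] extra = options.getExtraArgs();
----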


@@ -27,8 +27,6 @@
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
@@ -38,14 +36,15 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
import org.apache.hadoop.sqoop.util.AsyncSink;
import org.apache.hadoop.sqoop.util.DirectImportUtils;
import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
import org.apache.hadoop.sqoop.util.ErrorableThread;
import org.apache.hadoop.sqoop.util.Executor;
import org.apache.hadoop.sqoop.util.ImportError;
import org.apache.hadoop.sqoop.util.JdbcUrl;
import org.apache.hadoop.sqoop.util.LoggingAsyncSink;
import org.apache.hadoop.sqoop.util.PerfCounters;
import org.apache.hadoop.sqoop.util.AsyncSink;
/**
* Manages direct dumps from Postgresql databases via psql COPY TO STDOUT
@@ -273,23 +272,6 @@ private String writePasswordFile(String password) throws IOException {
return tempFile.toString();
}
/** @return true if someHost refers to localhost.
*/
private boolean isLocalhost(String someHost) {
if (null == someHost) {
return false;
}
try {
InetAddress localHostAddr = InetAddress.getLocalHost();
InetAddress someAddr = InetAddress.getByName(someHost);
return localHostAddr.equals(someAddr);
} catch (UnknownHostException uhe) {
return false;
}
}
@Override
/**
* Import the table into HDFS by using psql to pull the data out of the db
@@ -315,6 +297,7 @@ public void importTable(ImportJobContext context)
String passwordFilename = null;
Process p = null;
AsyncSink sink = null;
AsyncSink errSink = null;
PerfCounters counters = new PerfCounters();
try {
@@ -361,7 +344,7 @@ public void importTable(ImportJobContext context)
}
}
if (!isLocalhost(hostname) || port != -1) {
if (!DirectImportUtils.isLocalhost(hostname) || port != -1) {
args.add("--host");
args.add(hostname);
args.add("--port");
@@ -397,6 +380,8 @@ public void importTable(ImportJobContext context)
LOG.debug("Starting stream sink");
counters.startClock();
sink.processStream(is);
errSink = new LoggingAsyncSink(LOG);
errSink.processStream(p.getErrorStream());
} finally {
// block until the process is done.
LOG.debug("Waiting for process completion");
@@ -444,6 +429,18 @@ public void importTable(ImportJobContext context)
}
}
// Attempt to block for stderr stream sink; errors are advisory.
if (null != errSink) {
try {
if (0 != errSink.join()) {
LOG.info("Encountered exception reading stderr stream");
}
} catch (InterruptedException ie) {
LOG.info("Thread interrupted waiting for stderr to complete: "
+ ie.toString());
}
}
LOG.info("Transfer loop complete.");
if (0 != result) {

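The stderr handling added above follows the same pattern in both direct
managers; condensed into a sketch (exception handling elided, API as
shown in this diff):

----
// Drain the child's stderr to log4j on a background thread so the
// process cannot block on a full stderr pipe.
AsyncSink errSink = new LoggingAsyncSink(LOG);
errSink.processStream(p.getErrorStream());

// ... consume stdout, wait for process exit ...

// Join the stderr sink last; failures here are advisory only.
// join() may also throw InterruptedException.
if (0 != errSink.join()) {
    LOG.info("Encountered exception reading stderr stream");
}
----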

@@ -38,13 +38,14 @@
import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
import org.apache.hadoop.sqoop.lib.FieldFormatter;
import org.apache.hadoop.sqoop.lib.RecordParser;
import org.apache.hadoop.sqoop.util.AsyncSink;
import org.apache.hadoop.sqoop.util.DirectImportUtils;
import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
import org.apache.hadoop.sqoop.util.ErrorableThread;
import org.apache.hadoop.sqoop.util.ImportError;
import org.apache.hadoop.sqoop.util.JdbcUrl;
import org.apache.hadoop.sqoop.util.LoggingAsyncSink;
import org.apache.hadoop.sqoop.util.PerfCounters;
import org.apache.hadoop.sqoop.util.AsyncSink;
/**
* Manages direct connections to MySQL databases
@@ -365,6 +366,8 @@ public void importTable(ImportJobContext context)
// (everything before '://') and replace it with 'http', which we know will work.
String connectString = options.getConnectString();
String databaseName = JdbcUrl.getDatabaseName(connectString);
String hostname = JdbcUrl.getHostName(connectString);
int port = JdbcUrl.getPort(connectString);
if (null == databaseName) {
throw new ImportError("Could not determine database name");
@@ -378,6 +381,7 @@ public void importTable(ImportJobContext context)
Process p = null;
AsyncSink sink = null;
AsyncSink errSink = null;
PerfCounters counters = new PerfCounters();
try {
// --defaults-file must be the first argument.
@@ -394,20 +398,31 @@
args.add(whereClause);
}
if (!DirectImportUtils.isLocalhost(hostname) || port != -1) {
args.add("--host=" + hostname);
args.add("--port=" + Integer.toString(port));
}
args.add("--skip-opt");
args.add("--compact");
args.add("--no-create-db");
args.add("--no-create-info");
args.add("--quick"); // no buffering
// TODO(aaron): Add a flag to allow --lock-tables instead for MyISAM data
args.add("--single-transaction");
// TODO(aaron): Add --host and --port arguments to support remote direct connects.
String username = options.getUsername();
if (null != username) {
args.add("--user=" + username);
}
// If the user supplied extra args, add them here.
String [] extra = options.getExtraArgs();
if (null != extra) {
for (String arg : extra) {
args.add(arg);
}
}
args.add(databaseName);
args.add(tableName);
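Assembled, the resulting mysqldump invocation looks roughly like this
(defaults file, host, and user are illustrative; +--lock-tables+ appears
only if the user passed it after the lone '-'):

----
mysqldump --defaults-file=... --host=db.example.com --port=3306 \
    --skip-opt --compact --no-create-db --no-create-info --quick \
    --single-transaction --user=someuser --lock-tables db foo
----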
@@ -442,6 +457,10 @@ public void importTable(ImportJobContext context)
// Start an async thread to read and upload the whole stream.
counters.startClock();
sink.processStream(is);
// Start an async thread to send stderr to log4j.
errSink = new LoggingAsyncSink(LOG);
errSink.processStream(p.getErrorStream());
} finally {
// block until the process is done.
@@ -482,6 +501,18 @@ public void importTable(ImportJobContext context)
}
}
// Try to wait for stderr to finish, but regard any errors as advisory.
if (null != errSink) {
try {
if (0 != errSink.join()) {
LOG.info("Encountered exception reading stderr stream");
}
} catch (InterruptedException ie) {
LOG.info("Thread interrupted waiting for stderr to complete: "
+ ie.toString());
}
}
LOG.info("Transfer loop complete.");
if (0 != result) {


@@ -20,6 +20,8 @@
import java.io.IOException;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -88,5 +90,22 @@ public static SplittableBufferedWriter createHdfsSink(Configuration conf,
new SplittingOutputStream(conf, destDir, "data-",
options.getDirectSplitSize(), options.shouldUseCompression()));
}
/** @return true if someHost refers to localhost.
*/
public static boolean isLocalhost(String someHost) {
if (null == someHost) {
return false;
}
try {
InetAddress localHostAddr = InetAddress.getLocalHost();
InetAddress someAddr = InetAddress.getByName(someHost);
return localHostAddr.equals(someAddr);
} catch (UnknownHostException uhe) {
return false;
}
}
}
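Both direct managers now share this check; their calling pattern from
the diffs above, condensed (argument form follows the MySQL manager):

----
// Only pass --host/--port to the external tool when the database is
// remote or an explicit port was given.
String hostname = JdbcUrl.getHostName(options.getConnectString());
int port = JdbcUrl.getPort(options.getConnectString());

if (!DirectImportUtils.isLocalhost(hostname) || port != -1) {
    args.add("--host=" + hostname);
    args.add("--port=" + Integer.toString(port));
}
----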


@@ -181,7 +181,7 @@ private String getCurrentUser() {
}
}
private String [] getArgv(boolean mysqlOutputDelims) {
private String [] getArgv(boolean mysqlOutputDelims, String... extraArgs) {
ArrayList<String> args = new ArrayList<String>();
args.add("-D");
@@ -205,11 +205,17 @@ private String getCurrentUser() {
args.add("--mysql-delimiters");
}
if (null != extraArgs) {
for (String arg : extraArgs) {
args.add(arg);
}
}
return args.toArray(new String[0]);
}
private void doLocalBulkImport(boolean mysqlOutputDelims, String [] expectedResults)
throws IOException {
private void doLocalBulkImport(boolean mysqlOutputDelims,
String [] expectedResults, String [] extraArgs) throws IOException {
Path warehousePath = new Path(this.getWarehouseDir());
Path tablePath = new Path(warehousePath, TABLE_NAME);
@@ -221,7 +227,7 @@ private void doLocalBulkImport(boolean mysqlOutputDelims, String [] expectedResu
FileListing.recursiveDeleteDir(tableFile);
}
String [] argv = getArgv(mysqlOutputDelims);
String [] argv = getArgv(mysqlOutputDelims, extraArgs);
try {
runImport(argv);
} catch (IOException ioe) {
@@ -256,7 +262,20 @@ public void testLocalBulkImportWithDefaultDelims() throws IOException {
"3,Fred,2009-01-23,15,marketing"
};
doLocalBulkImport(false, expectedResults);
doLocalBulkImport(false, expectedResults, null);
}
@Test
public void testWithExtraParams() throws IOException {
// no quoting of strings allowed.
String [] expectedResults = {
"2,Bob,2009-04-20,400,sales",
"3,Fred,2009-01-23,15,marketing"
};
String [] extraArgs = { "-", "--lock-tables" };
doLocalBulkImport(false, expectedResults, extraArgs);
}
@Test
@@ -267,6 +286,6 @@ public void testLocalBulkImportWithMysqlQuotes() throws IOException {
"3,'Fred','2009-01-23',15,'marketing'"
};
doLocalBulkImport(true, expectedResults);
doLocalBulkImport(true, expectedResults, null);
}
}