From ec8f687d9743eb097449a57f2921cf4d3b308f54 Mon Sep 17 00:00:00 2001
From: Andrew Bayer
Date: Fri, 22 Jul 2011 20:03:28 +0000
Subject: [PATCH] MAPREDUCE-1169. Improvements to mysqldump use in Sqoop.
 Contributed by Aaron Kimball.

From: Thomas White

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149840 13f79535-47bb-0310-9956-ffa450edef68
---
 doc/Sqoop-manpage.txt                         |  8 ++++
 doc/direct.txt                                | 24 ++++++++++--
 .../apache/hadoop/sqoop/ImportOptions.java    | 31 +++++++++++++++
 .../manager/DirectPostgresqlManager.java      | 39 +++++++++----------
 .../sqoop/manager/LocalMySQLManager.java      | 37 ++++++++++++++++--
 .../hadoop/sqoop/util/DirectImportUtils.java  | 19 +++++++++
 .../hadoop/sqoop/manager/LocalMySQLTest.java  | 31 ++++++++++++---
 7 files changed, 155 insertions(+), 34 deletions(-)

diff --git a/doc/Sqoop-manpage.txt b/doc/Sqoop-manpage.txt
index 203f4552..d4a3d668 100644
--- a/doc/Sqoop-manpage.txt
+++ b/doc/Sqoop-manpage.txt
@@ -163,6 +163,14 @@ no import or code generation is performed.
 --list-tables::
   List tables in database and exit
 
+Database-specific options
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Additional arguments may be passed to the database manager
+after a lone '-' on the command-line.
+
+In MySQL direct mode, additional arguments are passed directly to
+mysqldump.
 
 ENVIRONMENT
 -----------
diff --git a/doc/direct.txt b/doc/direct.txt
index 4601f113..4c020689 100644
--- a/doc/direct.txt
+++ b/doc/direct.txt
@@ -18,16 +18,15 @@
 
 Direct-mode Imports
------------------
+-------------------
 
 While the JDBC-based import method used by Sqoop provides it with the
 ability to read from a variety of databases using a generic driver, it
 is not the most high-performance method available. Sqoop can read from
 certain database systems faster by using their built-in export tools.
 
-For example, Sqoop can read from a local MySQL database by using the +mysqldump+
-tool distributed with MySQL. If you run Sqoop on the same machine where a
-MySQL database is present, you can take advantage of this faster
+For example, Sqoop can read from a MySQL database by using the +mysqldump+
+tool distributed with MySQL. You can take advantage of this faster
 import method by running Sqoop with the +--direct+ argument.
 
 This combined with a connect string that begins with +jdbc:mysql://+
 will inform Sqoop that it should select the faster access method.
@@ -58,4 +57,21 @@
 each. If compressing the HDFS files with +--compress+, this will allow
 subsequent MapReduce programs to use multiple mappers across your data
 in parallel.
+
+Tool-specific arguments
+~~~~~~~~~~~~~~~~~~~~~~~
+
+Sqoop will generate a set of command-line arguments with which it invokes
+the underlying direct-mode tool (e.g., mysqldump). You can specify additional
+arguments which should be passed to the tool by passing them to Sqoop
+after a single '+-+' argument. e.g.:
+
+----
+$ sqoop --connect jdbc:mysql://localhost/db --table foo --direct - --lock-tables
+----
+
+The +--lock-tables+ argument (and anything else to the right of the +-+ argument)
+will be passed directly to mysqldump.
+
+
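The documentation above relies on Sqoop recognizing a MySQL connect string before it switches to the mysqldump fast path. The sketch below only illustrates that kind of scheme check; it is not Sqoop's actual manager-selection code, and the class and method names are invented for the example.

----
// Illustrative only: a connect-string check like the one the docs describe,
// where a URL beginning with jdbc:mysql:// selects the mysqldump fast path.
public class ConnectStringCheck {

  /** @return true if the connect string points at a MySQL database. */
  static boolean isMySqlUrl(String connectString) {
    return connectString != null && connectString.startsWith("jdbc:mysql://");
  }

  public static void main(String[] args) {
    System.out.println(isMySqlUrl("jdbc:mysql://localhost/db"));       // true
    System.out.println(isMySqlUrl("jdbc:postgresql://localhost/db"));  // false
  }
}
----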
diff --git a/src/java/org/apache/hadoop/sqoop/ImportOptions.java b/src/java/org/apache/hadoop/sqoop/ImportOptions.java
index 591193d2..6eacbc78 100644
--- a/src/java/org/apache/hadoop/sqoop/ImportOptions.java
+++ b/src/java/org/apache/hadoop/sqoop/ImportOptions.java
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
@@ -124,6 +125,8 @@ public enum FileLayout {
 
   private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
 
+  private String [] extraArgs;
+
   public ImportOptions() {
     initDefaults();
   }
@@ -254,6 +257,8 @@ private void initDefaults() {
 
     this.conf = new Configuration();
 
+    this.extraArgs = null;
+
     loadFromProperties();
   }
 
@@ -327,6 +332,10 @@ public static void printUsage() {
     System.out.println("--list-databases             List all databases available and exit");
     System.out.println("--debug-sql (statement)      Execute 'statement' in SQL and exit");
     System.out.println("");
+    System.out.println("Database-specific options:");
+    System.out.println("Arguments may be passed to the database manager after a lone '-':");
+    System.out.println("    MySQL direct mode: arguments passed directly to mysqldump");
+    System.out.println("");
     System.out.println("Generic Hadoop command-line options:");
     ToolRunner.printGenericCommandUsage(System.out);
     System.out.println("");
@@ -546,6 +555,13 @@ public void parse(String [] args) throws InvalidOptionsException {
       } else if (args[i].equals("--help")) {
         printUsage();
         throw new InvalidOptionsException("");
+      } else if (args[i].equals("-")) {
+        // Everything after a lone '-' goes into extraArgs.
+        ArrayList<String> extra = new ArrayList<String>();
+        for (i++; i < args.length; i++) {
+          extra.add(args[i]);
+        }
+        this.extraArgs = extra.toArray(new String[0]);
       } else {
         throw new InvalidOptionsException("Invalid argument: " + args[i] + ".\n"
             + "Try --help for usage.");
@@ -882,4 +898,19 @@ public Configuration getConf() {
   public void setConf(Configuration config) {
     this.conf = config;
   }
+
+  /**
+   * @return command-line arguments after a '-'
+   */
+  public String [] getExtraArgs() {
+    if (extraArgs == null) {
+      return null;
+    }
+
+    String [] out = new String[extraArgs.length];
+    for (int i = 0; i < extraArgs.length; i++) {
+      out[i] = extraArgs[i];
+    }
+    return out;
+  }
 }
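To show how the new ImportOptions pieces fit together, here is a hypothetical caller that feeds a command line containing the lone '-' separator through parse() and reads the collected arguments back via getExtraArgs(). It assumes the Sqoop classes from this patch are on the classpath; it is not part of the patch itself.

----
import org.apache.hadoop.sqoop.ImportOptions;

public class ExtraArgsDemo {
  public static void main(String[] argv) throws Exception {
    ImportOptions opts = new ImportOptions();
    // Everything to the right of the lone "-" is collected verbatim for the
    // direct-mode tool instead of being interpreted as a Sqoop option.
    opts.parse(new String[] {
        "--connect", "jdbc:mysql://localhost/db",
        "--table", "foo",
        "--direct",
        "-", "--lock-tables"
    });

    for (String arg : opts.getExtraArgs()) {
      System.out.println("extra arg for mysqldump: " + arg);
    }
  }
}
----

Note that getExtraArgs() hands back a copy of the stored array, so a caller cannot mutate the options object through the returned reference.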
diff --git a/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java b/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
index 53a5c5a5..a3f9454d 100644
--- a/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
+++ b/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
@@ -27,8 +27,6 @@
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -38,14 +36,15 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.sqoop.ImportOptions;
 import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
+import org.apache.hadoop.sqoop.util.AsyncSink;
 import org.apache.hadoop.sqoop.util.DirectImportUtils;
 import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
 import org.apache.hadoop.sqoop.util.ErrorableThread;
 import org.apache.hadoop.sqoop.util.Executor;
 import org.apache.hadoop.sqoop.util.ImportError;
 import org.apache.hadoop.sqoop.util.JdbcUrl;
+import org.apache.hadoop.sqoop.util.LoggingAsyncSink;
 import org.apache.hadoop.sqoop.util.PerfCounters;
-import org.apache.hadoop.sqoop.util.AsyncSink;
 
 /**
  * Manages direct dumps from Postgresql databases via psql COPY TO STDOUT
@@ -273,23 +272,6 @@ private String writePasswordFile(String password) throws IOException {
     return tempFile.toString();
   }
 
-  /** @return true if someHost refers to localhost.
-   */
-  private boolean isLocalhost(String someHost) {
-    if (null == someHost) {
-      return false;
-    }
-
-    try {
-      InetAddress localHostAddr = InetAddress.getLocalHost();
-      InetAddress someAddr = InetAddress.getByName(someHost);
-
-      return localHostAddr.equals(someAddr);
-    } catch (UnknownHostException uhe) {
-      return false;
-    }
-  }
-
   @Override
   /**
    * Import the table into HDFS by using psql to pull the data out of the db
@@ -315,6 +297,7 @@ public void importTable(ImportJobContext context)
     String passwordFilename = null;
     Process p = null;
     AsyncSink sink = null;
+    AsyncSink errSink = null;
     PerfCounters counters = new PerfCounters();
 
     try {
@@ -361,7 +344,7 @@ public void importTable(ImportJobContext context)
         }
       }
 
-      if (!isLocalhost(hostname) || port != -1) {
+      if (!DirectImportUtils.isLocalhost(hostname) || port != -1) {
         args.add("--host");
         args.add(hostname);
         args.add("--port");
@@ -397,6 +380,8 @@ public void importTable(ImportJobContext context)
       LOG.debug("Starting stream sink");
       counters.startClock();
       sink.processStream(is);
+      errSink = new LoggingAsyncSink(LOG);
+      errSink.processStream(p.getErrorStream());
     } finally {
       // block until the process is done.
       LOG.debug("Waiting for process completion");
@@ -444,6 +429,18 @@ public void importTable(ImportJobContext context)
       }
     }
 
+    // Attempt to block for stderr stream sink; errors are advisory.
+    if (null != errSink) {
+      try {
+        if (0 != errSink.join()) {
+          LOG.info("Encountered exception reading stderr stream");
+        }
+      } catch (InterruptedException ie) {
+        LOG.info("Thread interrupted waiting for stderr to complete: " +
+          ie.toString());
+      }
+    }
+
     LOG.info("Transfer loop complete.");
 
     if (0 != result) {
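The LoggingAsyncSink calls added above exist so the child process's stderr is always drained; if nothing reads that stream, psql (or mysqldump) can fill its pipe buffer and stall. The class below is a simplified, hypothetical stand-in for that idea using a plain thread and System.err; it is not Sqoop's actual sink implementation.

----
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

// Minimal sketch of an asynchronous stderr drainer. Sqoop's LoggingAsyncSink
// plays this role but routes each line to its logger instead of System.err.
public class StderrDrainer extends Thread {

  private final InputStream errStream;

  public StderrDrainer(InputStream errStream) {
    this.errStream = errStream;
    setDaemon(true);
  }

  @Override
  public void run() {
    BufferedReader r = new BufferedReader(new InputStreamReader(errStream));
    try {
      String line;
      while ((line = r.readLine()) != null) {
        System.err.println("[child stderr] " + line);
      }
    } catch (IOException ioe) {
      // As in the patch, problems reading stderr are advisory only.
      System.err.println("Error reading child stderr: " + ioe);
    }
  }
}
----

A caller would start it right after launching the child, e.g. new StderrDrainer(p.getErrorStream()).start(), and could join it after the process exits, mirroring how the patch joins errSink before reporting the final result.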
diff --git a/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java b/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
index 9f319d23..263f3eb7 100644
--- a/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
+++ b/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
@@ -38,13 +38,14 @@
 import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
 import org.apache.hadoop.sqoop.lib.FieldFormatter;
 import org.apache.hadoop.sqoop.lib.RecordParser;
+import org.apache.hadoop.sqoop.util.AsyncSink;
 import org.apache.hadoop.sqoop.util.DirectImportUtils;
 import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
 import org.apache.hadoop.sqoop.util.ErrorableThread;
 import org.apache.hadoop.sqoop.util.ImportError;
 import org.apache.hadoop.sqoop.util.JdbcUrl;
+import org.apache.hadoop.sqoop.util.LoggingAsyncSink;
 import org.apache.hadoop.sqoop.util.PerfCounters;
-import org.apache.hadoop.sqoop.util.AsyncSink;
 
 /**
  * Manages direct connections to MySQL databases
@@ -365,6 +366,8 @@ public void importTable(ImportJobContext context)
     // (everything before '://') and replace it with 'http', which we know will work.
     String connectString = options.getConnectString();
     String databaseName = JdbcUrl.getDatabaseName(connectString);
+    String hostname = JdbcUrl.getHostName(connectString);
+    int port = JdbcUrl.getPort(connectString);
     if (null == databaseName) {
       throw new ImportError("Could not determine database name");
     }
@@ -378,6 +381,7 @@ public void importTable(ImportJobContext context)
 
     Process p = null;
     AsyncSink sink = null;
+    AsyncSink errSink = null;
     PerfCounters counters = new PerfCounters();
     try {
       // --defaults-file must be the first argument.
@@ -394,20 +398,31 @@ public void importTable(ImportJobContext context)
         args.add(whereClause);
       }
 
+      if (!DirectImportUtils.isLocalhost(hostname) || port != -1) {
+        args.add("--host=" + hostname);
+        args.add("--port=" + Integer.toString(port));
+      }
+
      args.add("--skip-opt");
      args.add("--compact");
      args.add("--no-create-db");
      args.add("--no-create-info");
      args.add("--quick"); // no buffering
-     // TODO(aaron): Add a flag to allow --lock-tables instead for MyISAM data
      args.add("--single-transaction");
-     // TODO(aaron): Add --host and --port arguments to support remote direct connects.
 
      String username = options.getUsername();
      if (null != username) {
        args.add("--user=" + username);
      }
 
+      // If the user supplied extra args, add them here.
+      String [] extra = options.getExtraArgs();
+      if (null != extra) {
+        for (String arg : extra) {
+          args.add(arg);
+        }
+      }
+
      args.add(databaseName);
      args.add(tableName);
 
@@ -442,6 +457,10 @@ public void importTable(ImportJobContext context)
       // Start an async thread to read and upload the whole stream.
       counters.startClock();
       sink.processStream(is);
+
+      // Start an async thread to send stderr to log4j.
+      errSink = new LoggingAsyncSink(LOG);
+      errSink.processStream(p.getErrorStream());
 
     } finally {
       // block until the process is done.
@@ -482,6 +501,18 @@ public void importTable(ImportJobContext context)
       }
     }
 
+    // Try to wait for stderr to finish, but regard any errors as advisory.
+    if (null != errSink) {
+      try {
+        if (0 != errSink.join()) {
+          LOG.info("Encountered exception reading stderr stream");
+        }
+      } catch (InterruptedException ie) {
+        LOG.info("Thread interrupted waiting for stderr to complete: " +
+          ie.toString());
+      }
+    }
+
     LOG.info("Transfer loop complete.");
 
     if (0 != result) {
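For reference, this is roughly the argument order the patched LocalMySQLManager now builds for mysqldump: defaults file first, --host/--port only for non-local targets or an explicit port, the fixed output flags, the optional user, any pass-through arguments from getExtraArgs(), and finally the database and table. The helper below is a hypothetical sketch of that ordering with a simplified localhost check, not the code Sqoop actually runs.

----
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the mysqldump argument vector assembled in the patch.
public class MysqldumpArgs {

  static List<String> buildArgs(String defaultsFile, String hostname, int port,
      String username, String[] extraArgs, String database, String table) {
    List<String> args = new ArrayList<String>();
    args.add("mysqldump");
    args.add("--defaults-file=" + defaultsFile);  // must be the first option
    // Simplified stand-in for DirectImportUtils.isLocalhost().
    if (hostname != null && (!hostname.equals("localhost") || port != -1)) {
      args.add("--host=" + hostname);
      args.add("--port=" + Integer.toString(port));
    }
    args.add("--skip-opt");             // fixed output flags, as in the patch
    args.add("--compact");
    args.add("--no-create-db");
    args.add("--no-create-info");
    args.add("--quick");                // no buffering
    args.add("--single-transaction");
    if (username != null) {
      args.add("--user=" + username);
    }
    if (extraArgs != null) {            // user-supplied args after the lone '-'
      for (String arg : extraArgs) {
        args.add(arg);
      }
    }
    args.add(database);
    args.add(table);
    return args;
  }

  public static void main(String[] unused) {
    System.out.println(buildArgs("/tmp/mysql-defaults.cnf", "db.example.com", 3306,
        "sqoop", new String[] { "--lock-tables" }, "mydb", "foo"));
  }
}
----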
diff --git a/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java b/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
index 0718f53b..9ba47838 100644
--- a/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
+++ b/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
@@ -20,6 +20,8 @@
 import java.io.IOException;
 import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -88,5 +90,22 @@ public static SplittableBufferedWriter createHdfsSink(Configuration conf,
         new SplittingOutputStream(conf, destDir, "data-",
         options.getDirectSplitSize(), options.shouldUseCompression()));
   }
+
+  /** @return true if someHost refers to localhost.
+   */
+  public static boolean isLocalhost(String someHost) {
+    if (null == someHost) {
+      return false;
+    }
+
+    try {
+      InetAddress localHostAddr = InetAddress.getLocalHost();
+      InetAddress someAddr = InetAddress.getByName(someHost);
+
+      return localHostAddr.equals(someAddr);
+    } catch (UnknownHostException uhe) {
+      return false;
+    }
+  }
 
 }
diff --git a/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java b/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
index 6f98cd09..28d99169 100644
--- a/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
+++ b/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
@@ -181,7 +181,7 @@ private String getCurrentUser() {
     }
   }
 
-  private String [] getArgv(boolean mysqlOutputDelims) {
+  private String [] getArgv(boolean mysqlOutputDelims, String... extraArgs) {
     ArrayList<String> args = new ArrayList<String>();
 
     args.add("-D");
@@ -205,11 +205,17 @@ private String getCurrentUser() {
       args.add("--mysql-delimiters");
     }
 
+    if (null != extraArgs) {
+      for (String arg : extraArgs) {
+        args.add(arg);
+      }
+    }
+
     return args.toArray(new String[0]);
   }
 
-  private void doLocalBulkImport(boolean mysqlOutputDelims, String [] expectedResults)
-      throws IOException {
+  private void doLocalBulkImport(boolean mysqlOutputDelims,
+      String [] expectedResults, String [] extraArgs) throws IOException {
     Path warehousePath = new Path(this.getWarehouseDir());
     Path tablePath = new Path(warehousePath, TABLE_NAME);
 
@@ -221,7 +227,7 @@ private void doLocalBulkImport(boolean mysqlOutputDelims, String [] expectedResu
       FileListing.recursiveDeleteDir(tableFile);
     }
 
-    String [] argv = getArgv(mysqlOutputDelims);
+    String [] argv = getArgv(mysqlOutputDelims, extraArgs);
     try {
       runImport(argv);
     } catch (IOException ioe) {
@@ -256,7 +262,20 @@ public void testLocalBulkImportWithDefaultDelims() throws IOException {
         "3,Fred,2009-01-23,15,marketing"
     };
 
-    doLocalBulkImport(false, expectedResults);
+    doLocalBulkImport(false, expectedResults, null);
+  }
+
+  @Test
+  public void testWithExtraParams() throws IOException {
+    // no quoting of strings allowed.
+    String [] expectedResults = {
+        "2,Bob,2009-04-20,400,sales",
+        "3,Fred,2009-01-23,15,marketing"
+    };
+
+    String [] extraArgs = { "-", "--lock-tables" };
+
+    doLocalBulkImport(false, expectedResults, extraArgs);
   }
 
   @Test
@@ -267,6 +286,6 @@ public void testLocalBulkImportWithMysqlQuotes() throws IOException {
         "3,'Fred','2009-01-23',15,'marketing'"
     };
 
-    doLocalBulkImport(true, expectedResults);
+    doLocalBulkImport(true, expectedResults, null);
   }
 }
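The isLocalhost() check that moves into DirectImportUtils simply compares the resolved address of the candidate host with InetAddress.getLocalHost(). The standalone copy below (class name invented for the example) can be run on its own to see how it behaves; note that an alias such as 127.0.0.1 may not compare equal to getLocalHost() on a multi-homed machine, which may be why the managers also pass --host and --port whenever an explicit port was given.

----
import java.net.InetAddress;
import java.net.UnknownHostException;

// Standalone copy of the check from DirectImportUtils, plus a tiny driver.
public class LocalhostCheck {

  /** @return true if someHost resolves to this machine's primary address. */
  public static boolean isLocalhost(String someHost) {
    if (null == someHost) {
      return false;
    }
    try {
      InetAddress localHostAddr = InetAddress.getLocalHost();
      InetAddress someAddr = InetAddress.getByName(someHost);
      return localHostAddr.equals(someAddr);
    } catch (UnknownHostException uhe) {
      return false;
    }
  }

  public static void main(String[] args) throws UnknownHostException {
    String self = InetAddress.getLocalHost().getHostName();
    System.out.println(self + " -> " + isLocalhost(self));              // usually true
    System.out.println("example.org -> " + isLocalhost("example.org")); // usually false
  }
}
----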