diff --git a/src/docs/man/import-args.txt b/src/docs/man/import-args.txt
index 86865449..538e9136 100644
--- a/src/docs/man/import-args.txt
+++ b/src/docs/man/import-args.txt
@@ -45,11 +45,14 @@ Import control options
 --num-mappers (n)::
   Use 'n' map tasks to import in parallel
 
+--query (statement)::
+  Imports the results of +statement+ instead of a table
+
 --split-by (column-name)::
   Column of the table used to split the table for parallel import
 
 --table (table-name)::
-  The table to read (required)
+  The table to import
 
 --target-dir (dir)::
   Explicit HDFS target directory for the import.
diff --git a/src/docs/man/sqoop-import.txt b/src/docs/man/sqoop-import.txt
index e6b9e1d5..23ac5d73 100644
--- a/src/docs/man/sqoop-import.txt
+++ b/src/docs/man/sqoop-import.txt
@@ -52,11 +52,14 @@ Import control options
 -m::
   Use 'n' map tasks to import in parallel
 
+--query (statement)::
+  Imports the results of +statement+ instead of a table
+
 --split-by (column-name)::
   Column of the table used to split the table for parallel import
 
 --table (table-name)::
-  The table to read (required)
+  The table to import
 
 --target-dir (dir)::
   Explicit HDFS target directory for the import.
diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt
index 5cbce045..39f0436a 100644
--- a/src/docs/user/import.txt
+++ b/src/docs/user/import.txt
@@ -62,6 +62,7 @@ Argument                          Description
                                   when importing in direct mode
 +\--inline-lob-limit <n>+         Set the maximum size for an inline LOB
 +-m,\--num-mappers <n>+           Use 'n' map tasks to import in parallel
++-e,\--query <statement>+         Import the results of '+statement+'.
 +\--split-by <column-name>+       Column of the table used to split work\
                                   units
 +\--table <table-name>+           Table to read
@@ -76,7 +77,7 @@ Argument                          Description
 Selecting the Data to Import
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-Sqoop currently imports data in a table-centric fashion. Use the
+Sqoop typically imports data in a table-centric fashion. Use the
 +\--table+ argument to select the table to import. For example, +\--table
 employees+. This argument can also identify a +VIEW+ or other table-like
 entity in a database.
@@ -103,6 +104,38 @@ form +SELECT <column list> FROM <table name>+. You can append a
 "id > 400"+. Only rows where the +id+ column has a value greater than 400
 will be imported.
 
+Free-form Query Imports
+^^^^^^^^^^^^^^^^^^^^^^^
+
+Sqoop can also import the result set of an arbitrary SQL query. Instead of
+using the +\--table+, +\--columns+ and +\--where+ arguments, you can specify
+a SQL statement with the +\--query+ argument.
+
+When importing a free-form query, you must specify a destination directory
+with +\--target-dir+.
+
+If you want to import the results of a query in parallel, then each map task
+will need to execute a copy of the query, with results partitioned by bounding
+conditions inferred by Sqoop. Your query must include the token +$CONDITIONS+
+which each Sqoop process will replace with a unique condition expression.
+You must also select a splitting column with +\--split-by+.
+
+For example:
+
+----
+$ sqoop import \
+  --query 'SELECT a.*, b.* FROM a JOIN b ON (a.id = b.id) WHERE $CONDITIONS' \
+  --split-by a.id --target-dir /user/foo/joinresults
+----
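+
+Each of the parallel map tasks executes a copy of this query with
++$CONDITIONS+ replaced by a bounding predicate on the split column. One
+task might, for instance, run the following (the bounds shown here are
+illustrative; Sqoop computes the actual values from the data at runtime):
+
+----
+SELECT a.*, b.* FROM a JOIN b ON (a.id = b.id)
+    WHERE ( a.id >= 0 ) AND ( a.id < 5000 )
+----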
+
+Alternatively, the query can be executed once and imported serially, by
+specifying a single map task with +-m 1+:
+
+----
+$ sqoop import \
+  --query 'SELECT a.*, b.* FROM a JOIN b ON (a.id = b.id) WHERE $CONDITIONS' \
+  -m 1 --target-dir /user/foo/joinresults
+----
+
 Controlling Parallelism
 ^^^^^^^^^^^^^^^^^^^^^^^
diff --git a/src/java/com/cloudera/sqoop/SqoopOptions.java b/src/java/com/cloudera/sqoop/SqoopOptions.java
index 7b811610..945b461e 100644
--- a/src/java/com/cloudera/sqoop/SqoopOptions.java
+++ b/src/java/com/cloudera/sqoop/SqoopOptions.java
@@ -83,7 +83,7 @@ public enum FileLayout {
   private String hadoopHome;
   private String splitByCol;
   private String whereClause;
-  private String debugSqlCmd;
+  private String sqlQuery;
   private String driverClassName;
   private String warehouseDir;
   private String targetDir;
@@ -591,12 +591,12 @@ public void setHadoopHome(String home) {
   /**
    * @return a sql command to execute and exit with.
    */
-  public String getDebugSqlCmd() {
-    return debugSqlCmd;
+  public String getSqlQuery() {
+    return sqlQuery;
   }
 
-  public void setDebugSqlCmd(String sqlStatement) {
-    this.debugSqlCmd = sqlStatement;
+  public void setSqlQuery(String sqlStatement) {
+    this.sqlQuery = sqlStatement;
   }
 
   /**
diff --git a/src/java/com/cloudera/sqoop/manager/ConnManager.java b/src/java/com/cloudera/sqoop/manager/ConnManager.java
index 0c7302db..effd00a5 100644
--- a/src/java/com/cloudera/sqoop/manager/ConnManager.java
+++ b/src/java/com/cloudera/sqoop/manager/ConnManager.java
@@ -24,6 +24,9 @@
 import java.sql.SQLException;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import com.cloudera.sqoop.util.ExportException;
 import com.cloudera.sqoop.util.ImportException;
 
@@ -34,6 +37,8 @@
  */
 public abstract class ConnManager {
 
+  public static final Log LOG = LogFactory.getLog(ConnManager.class.getName());
+
   /**
    * Return a list of all databases on a server.
    */
@@ -49,6 +54,14 @@ public abstract class ConnManager {
    */
   public abstract String [] getColumnNames(String tableName);
 
+  /**
+   * Return a list of column names in the query, in the order returned
+   * by the db.
+   */
+  public String [] getColumnNamesForQuery(String query) {
+    LOG.error("This database does not support free-form query column names.");
+    return null;
+  }
+
   /**
    * Return the name of the primary key for a table, or null if there is none.
    */
@@ -76,6 +89,17 @@ public abstract class ConnManager {
    */
   public abstract Map<String, Integer> getColumnTypes(String tableName);
 
+  /**
+   * Return an unordered mapping from colname to sqltype for
+   * all columns in a query.
+   *
+   * The Integer type id is a constant from java.sql.Types.
+   */
+  public Map<String, Integer> getColumnTypesForQuery(String query) {
+    LOG.error("This database does not support free-form query column types.");
+    return null;
+  }
+
   /**
    * Execute a SQL statement to read the named set of columns from a table.
    * If columns is null, all columns from the table are read. This is a direct
@@ -109,6 +133,15 @@ public abstract ResultSet readTable(String tableName, String [] columns)
   public abstract void importTable(ImportJobContext context)
       throws IOException, ImportException;
 
+  /**
+   * Perform an import of a free-form query from the database into HDFS.
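+   *
+   * This base implementation simply rejects the request; ConnManager
+   * subclasses that support free-form queries (such as SqlManager)
+   * override it.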
+   */
+  public void importQuery(ImportJobContext context)
+      throws IOException, ImportException {
+    throw new ImportException(
+        "This database only supports table-based imports.");
+  }
+
   /**
    * When using a column name in a generated SQL query, how (if at all)
    * should we escape that column name? e.g., a column named "table"
diff --git a/src/java/com/cloudera/sqoop/manager/SqlManager.java b/src/java/com/cloudera/sqoop/manager/SqlManager.java
index 5f0a62b2..74c87744 100644
--- a/src/java/com/cloudera/sqoop/manager/SqlManager.java
+++ b/src/java/com/cloudera/sqoop/manager/SqlManager.java
@@ -47,6 +47,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
 
 /**
  * ConnManager implementation for generic SQL-compliant database.
@@ -57,6 +58,12 @@ public abstract class SqlManager extends ConnManager {
 
   public static final Log LOG = LogFactory.getLog(SqlManager.class.getName());
 
+  /** Substring that must appear in free-form queries submitted by users.
+   * This is the string '$CONDITIONS'.
+   */
+  public static final String SUBSTITUTE_TOKEN =
+      DataDrivenDBInputFormat.SUBSTITUTE_TOKEN;
+
   protected SqoopOptions options;
 
   private Statement lastStatement;
@@ -78,9 +85,23 @@ protected String getColNamesQuery(String tableName) {
   }
 
   @Override
+  /** {@inheritDoc} */
   public String[] getColumnNames(String tableName) {
     String stmt = getColNamesQuery(tableName);
+    return getColumnNamesForRawQuery(stmt);
+  }
 
+  @Override
+  /** {@inheritDoc} */
+  public String [] getColumnNamesForQuery(String query) {
+    String rawQuery = query.replace(SUBSTITUTE_TOKEN, " (1 = 0) ");
+    return getColumnNamesForRawQuery(rawQuery);
+  }
+
+  /**
+   * Get column names for a query statement that we do not modify further.
+   */
+  public String[] getColumnNamesForRawQuery(String stmt) {
     ResultSet results;
     try {
       results = execute(stmt);
@@ -98,6 +119,9 @@ public String[] getColumnNames(String tableName) {
         String colName = metadata.getColumnName(i);
         if (colName == null || colName.equals("")) {
           colName = metadata.getColumnLabel(i);
+          if (null == colName) {
+            colName = "_RESULT_" + i;
+          }
         }
         columns.add(colName);
       }
@@ -128,7 +152,20 @@ protected String getColTypesQuery(String tableName) {
 
   @Override
   public Map<String, Integer> getColumnTypes(String tableName) {
     String stmt = getColTypesQuery(tableName);
+    return getColumnTypesForRawQuery(stmt);
+  }
 
+  @Override
+  public Map<String, Integer> getColumnTypesForQuery(String query) {
+    // Manipulate the query to return immediately, with zero rows.
+    String rawQuery = query.replace(SUBSTITUTE_TOKEN, " (1 = 0) ");
+    return getColumnTypesForRawQuery(rawQuery);
+  }
+
+  /**
+   * Get column types for a query statement that we do not modify further.
+   */
+  protected Map<String, Integer> getColumnTypesForRawQuery(String stmt) {
     ResultSet results;
     try {
       results = execute(stmt);
@@ -285,7 +322,7 @@ public String getPrimaryKey(String tableName) {
    */
   protected String getSplitColumn(SqoopOptions opts, String tableName) {
     String splitCol = opts.getSplitByCol();
-    if (null == splitCol) {
+    if (null == splitCol && null != tableName) {
       // If the user didn't specify a splitting column, try to infer one.
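+      // (Free-form query imports pass a null tableName here; with no
+      // table, there is no primary key to fall back on.)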
       splitCol = getPrimaryKey(tableName);
     }
 
@@ -317,6 +354,30 @@ public void importTable(ImportJobContext context)
     importer.runImport(tableName, jarFile, splitCol, opts.getConf());
   }
 
+  /**
+   * Default implementation of importQuery() launches a MapReduce job
+   * via DataDrivenImportJob, using DataDrivenDBInputFormat's free-form
+   * query importer.
+   */
+  @Override
+  public void importQuery(ImportJobContext context)
+      throws IOException, ImportException {
+    String jarFile = context.getJarFile();
+    SqoopOptions opts = context.getOptions();
+
+    DataDrivenImportJob importer =
+        new DataDrivenImportJob(opts, context.getInputFormat(), context);
+
+    String splitCol = getSplitColumn(opts, null);
+    if (null == splitCol && opts.getNumMappers() > 1) {
+      // Can't infer a primary key.
+      throw new ImportException("A split-by column must be specified for "
+          + "parallel free-form query imports. Please specify one with "
+          + "--split-by or perform a sequential import with '-m 1'.");
+    }
+
+    importer.runImport(null, jarFile, splitCol, opts.getConf());
+  }
+
   /**
    * Executes an arbitrary SQL statement.
    * @param stmt The SQL statement to execute
diff --git a/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java
index a02082c0..f561be1d 100644
--- a/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java
@@ -117,29 +117,43 @@ protected void configureInputFormat(Job job, String tableName,
           username, options.getPassword());
     }
 
-    String [] colNames = options.getColumns();
-    if (null == colNames) {
-      colNames = mgr.getColumnNames(tableName);
-    }
-
-    String [] sqlColNames = null;
-    if (null != colNames) {
-      sqlColNames = new String[colNames.length];
-      for (int i = 0; i < colNames.length; i++) {
-        sqlColNames[i] = mgr.escapeColName(colNames[i]);
+    if (null != tableName) {
+      // Import a table.
+      String [] colNames = options.getColumns();
+      if (null == colNames) {
+        colNames = mgr.getColumnNames(tableName);
       }
+
+      String [] sqlColNames = null;
+      if (null != colNames) {
+        sqlColNames = new String[colNames.length];
+        for (int i = 0; i < colNames.length; i++) {
+          sqlColNames[i] = mgr.escapeColName(colNames[i]);
+        }
+      }
+
+      // It's ok if the where clause is null in DBInputFormat.setInput.
+      String whereClause = options.getWhereClause();
+
+      // We can't set the class properly in here, because we may not have the
+      // jar loaded in this JVM. So we start by calling setInput() with
+      // DBWritable and then overriding the string manually.
+      DataDrivenDBInputFormat.setInput(job, DBWritable.class,
+          mgr.escapeTableName(tableName), whereClause,
+          mgr.escapeColName(splitByCol), sqlColNames);
+    } else {
+      // Import a free-form query.
+      String inputQuery = options.getSqlQuery();
+      String sanitizedQuery = inputQuery.replace(
+          DataDrivenDBInputFormat.SUBSTITUTE_TOKEN, " (1 = 1) ");
+      String inputBoundingQuery = "SELECT MIN(" + splitByCol
+          + "), MAX(" + splitByCol + ") FROM (" + sanitizedQuery + ") AS t1";
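+      // For example, with --split-by a.id this builds something like:
+      //   SELECT MIN(a.id), MAX(a.id)
+      //       FROM (SELECT a.*, b.* ... WHERE (1 = 1)) AS t1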
+      DataDrivenDBInputFormat.setInput(job, DBWritable.class,
+          inputQuery, inputBoundingQuery);
+      new DBConfiguration(job.getConfiguration()).setInputOrderBy(
+          splitByCol);
     }
 
-    // It's ok if the where clause is null in DBInputFormat.setInput.
-    String whereClause = options.getWhereClause();
-
-    // We can't set the class properly in here, because we may not have the
-    // jar loaded in this JVM. So we start by calling setInput() with
-    // DBWritable and then overriding the string manually.
-    DataDrivenDBInputFormat.setInput(job, DBWritable.class,
-        mgr.escapeTableName(tableName), whereClause,
-        mgr.escapeColName(splitByCol), sqlColNames);
-
     LOG.debug("Using table class: " + tableClassName);
     job.getConfiguration().set(HadoopShim.get().getDbInputClassProperty(),
         tableClassName);
diff --git a/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java b/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java
index c3ea09a4..28cf6136 100644
--- a/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java
@@ -117,7 +117,9 @@ protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
   /**
    * Run an import job to read a table in to HDFS.
    *
-   * @param tableName the database table to read
+   * @param tableName the database table to read; may be null if a free-form
+   * query is specified in the SqoopOptions, and the ImportJobBase subclass
+   * supports free-form queries.
    * @param ormJarFile the Jar file to insert into the dcache classpath.
    * (may be null)
    * @param splitByCol the column of the database table to use to split
@@ -130,7 +132,12 @@ protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
   public void runImport(String tableName, String ormJarFile, String splitByCol,
       Configuration conf) throws IOException, ImportException {
 
-    LOG.info("Beginning import of " + tableName);
+    if (null != tableName) {
+      LOG.info("Beginning import of " + tableName);
+    } else {
+      LOG.info("Beginning query import.");
+    }
+
     String tableClassName =
         new TableClassName(options).getClassForTable(tableName);
     loadJars(conf, ormJarFile, tableClassName);
diff --git a/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpImportJob.java b/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpImportJob.java
index 21769178..e6a764c3 100644
--- a/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpImportJob.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpImportJob.java
@@ -64,6 +64,13 @@ protected void configureInputFormat(Job job, String tableName,
       String tableClassName, String splitByCol)
       throws ClassNotFoundException, IOException {
 
+    if (null == tableName) {
+      LOG.error(
+          "mysqldump-based import cannot support free-form query imports.");
+      LOG.error("Do not use --direct and --query together for MySQL.");
+      throw new IOException("null tableName for MySQLDumpImportJob.");
+    }
+
     ConnManager mgr = new ConnFactory(options.getConf()).getManager(options);
 
     try {
diff --git a/src/java/com/cloudera/sqoop/orm/ClassWriter.java b/src/java/com/cloudera/sqoop/orm/ClassWriter.java
index e6a88649..dbbac640 100644
--- a/src/java/com/cloudera/sqoop/orm/ClassWriter.java
+++ b/src/java/com/cloudera/sqoop/orm/ClassWriter.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.io.BytesWritable;
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.SqlManager;
 import com.cloudera.sqoop.lib.BigDecimalSerializer;
 import com.cloudera.sqoop.lib.DelimiterSet;
 import com.cloudera.sqoop.lib.FieldFormatter;
@@ -114,10 +115,12 @@ public class ClassWriter {
   private CompilationManager compileManager;
 
   /**
-   * Creates a new ClassWriter to generate an ORM class for a table.
+   * Creates a new ClassWriter to generate an ORM class for a table
+   * or arbitrary query.
    * @param opts program-wide options
    * @param connMgr the connection manager used to describe the table.
-   * @param table the name of the table to read.
+   * @param table the name of the table to read. If null, query is taken
+   * from the SqoopOptions.
    */
   public ClassWriter(final SqoopOptions opts, final ConnManager connMgr,
       final String table, final CompilationManager compMgr) {
@@ -843,11 +846,32 @@ private void generateHadoopWrite(Map<String, Integer> columnTypes,
    * Generate the ORM code for the class.
    */
   public void generate() throws IOException {
-    Map<String, Integer> columnTypes = connManager.getColumnTypes(tableName);
+    Map<String, Integer> columnTypes;
+
+    if (null != tableName) {
+      // We're generating a class based on a table import.
+      columnTypes = connManager.getColumnTypes(tableName);
+    } else {
+      // This is based on an arbitrary query.
+      String query = this.options.getSqlQuery();
+      if (query.indexOf(SqlManager.SUBSTITUTE_TOKEN) == -1) {
+        throw new IOException("Query must contain '"
+            + SqlManager.SUBSTITUTE_TOKEN + "' in WHERE clause.");
+      }
+
+      columnTypes = connManager.getColumnTypesForQuery(query);
+    }
 
     String [] colNames = options.getColumns();
     if (null == colNames) {
-      colNames = connManager.getColumnNames(tableName);
+      if (null != tableName) {
+        // Table-based import. Read column names from table.
+        colNames = connManager.getColumnNames(tableName);
+      } else {
+        // Infer/assign column names for arbitrary query.
+        colNames = connManager.getColumnNamesForQuery(
+            this.options.getSqlQuery());
+      }
     }
 
     // Translate all the column names into names that are safe to
diff --git a/src/java/com/cloudera/sqoop/orm/CompilationManager.java b/src/java/com/cloudera/sqoop/orm/CompilationManager.java
index d4de7c4e..6ed3a6f3 100644
--- a/src/java/com/cloudera/sqoop/orm/CompilationManager.java
+++ b/src/java/com/cloudera/sqoop/orm/CompilationManager.java
@@ -53,6 +53,10 @@
  */
 public class CompilationManager {
 
+  /** If we cannot infer a jar name from a table name, etc., use this. */
+  public static final String DEFAULT_CODEGEN_JAR_NAME =
+      "sqoop-codegen-created.jar";
+
   public static final Log LOG = LogFactory.getLog(
       CompilationManager.class.getName());
 
@@ -202,7 +206,11 @@ public void compile() throws IOException {
   public String getJarFilename() {
     String jarOutDir = options.getJarOutputDir();
     String tableName = options.getTableName();
-    if (null != tableName && tableName.length() > 0) {
+    String specificClassName = options.getClassName();
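+
+    // Jar name precedence: an explicit class name from --class-name wins,
+    // then the table name, then a single source file's base name, and
+    // finally the generic default.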
+    if (specificClassName != null && specificClassName.length() > 0) {
+      return jarOutDir + specificClassName + ".jar";
+    } else if (null != tableName && tableName.length() > 0) {
       return jarOutDir + tableName + ".jar";
     } else if (this.sources.size() == 1) {
       // if we only have one source file, find it's base name,
@@ -213,7 +221,7 @@ public String getJarFilename() {
       String preExtPart = parts[0];
       return jarOutDir + preExtPart + ".jar";
     } else {
-      return jarOutDir + "sqoop.jar";
+      return jarOutDir + DEFAULT_CODEGEN_JAR_NAME;
     }
   }
 
diff --git a/src/java/com/cloudera/sqoop/orm/TableClassName.java b/src/java/com/cloudera/sqoop/orm/TableClassName.java
index 4fc0f543..acd31323 100644
--- a/src/java/com/cloudera/sqoop/orm/TableClassName.java
+++ b/src/java/com/cloudera/sqoop/orm/TableClassName.java
@@ -76,29 +76,30 @@ public String getPackageForTable() {
    * @return the full name of the class to generate/use to import a table.
    */
   public String getClassForTable(String tableName) {
-    if (null == tableName) {
-      return null;
-    }
-
     String predefinedClass = options.getClassName();
     if (predefinedClass != null) {
       // The user's chosen a specific class name for this job.
       return predefinedClass;
     }
 
+    String queryName = tableName;
+    if (null == queryName) {
+      queryName = "QueryResult";
+    }
+
     String packageName = options.getPackageName();
     if (null != packageName) {
-      // return packageName.tableName.
-      return packageName + "." + tableName;
+      // return packageName.queryName.
+      return packageName + "." + queryName;
     }
 
     // no specific class; no specific package.
     // Just make sure it's a legal identifier.
-    return ClassWriter.toIdentifier(tableName);
+    return ClassWriter.toIdentifier(queryName);
   }
 
   /**
-   * @return just the last spegment of the class name -- all package info
+   * @return just the last segment of the class name -- all package info
    * stripped.
    */
   public String getShortClassForTable(String tableName) {
diff --git a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
index 03af6a45..8d454c2e 100644
--- a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
@@ -106,8 +106,8 @@ public abstract class BaseSqoopTool extends SqoopTool {
   public static final String PACKAGE_NAME_ARG = "package-name";
   public static final String CLASS_NAME_ARG = "class-name";
   public static final String JAR_FILE_NAME_ARG = "jar-file";
-  public static final String DEBUG_SQL_ARG = "query";
-  public static final String DEBUG_SQL_SHORT_ARG = "e";
+  public static final String SQL_QUERY_ARG = "query";
+  public static final String SQL_QUERY_SHORT_ARG = "e";
   public static final String VERBOSE_ARG = "verbose";
   public static final String HELP_ARG = "help";
 
diff --git a/src/java/com/cloudera/sqoop/tool/EvalSqlTool.java b/src/java/com/cloudera/sqoop/tool/EvalSqlTool.java
index f6b21cbc..78639a0a 100644
--- a/src/java/com/cloudera/sqoop/tool/EvalSqlTool.java
+++ b/src/java/com/cloudera/sqoop/tool/EvalSqlTool.java
@@ -48,7 +48,7 @@ public int run(SqoopOptions options) {
 
     try {
       // just run a SQL statement for debugging purposes.
-      manager.execAndPrint(options.getDebugSqlCmd());
+      manager.execAndPrint(options.getSqlQuery());
     } finally {
       destroy(options);
     }
@@ -65,8 +65,8 @@ public void configureOptions(ToolOptions toolOptions) {
     evalOpts.addOption(OptionBuilder.withArgName("statement")
         .hasArg()
         .withDescription("Execute 'statement' in SQL and exit")
-        .withLongOpt(DEBUG_SQL_ARG)
-        .create(DEBUG_SQL_SHORT_ARG));
+        .withLongOpt(SQL_QUERY_ARG)
+        .create(SQL_QUERY_SHORT_ARG));
 
     toolOptions.addUniqueOptions(evalOpts);
   }
@@ -77,8 +77,8 @@ public void applyOptions(CommandLine in, SqoopOptions out)
       throws InvalidOptionsException {
     applyCommonOptions(in, out);
 
-    if (in.hasOption(DEBUG_SQL_ARG)) {
-      out.setDebugSqlCmd(in.getOptionValue(DEBUG_SQL_ARG));
+    if (in.hasOption(SQL_QUERY_ARG)) {
+      out.setSqlQuery(in.getOptionValue(SQL_QUERY_ARG));
     }
   }
 
@@ -91,10 +91,10 @@ public void validateOptions(SqoopOptions options)
       throw new InvalidOptionsException(HELP_STR);
     }
 
-    String sqlCmd = options.getDebugSqlCmd();
+    String sqlCmd = options.getSqlQuery();
     if (null == sqlCmd || sqlCmd.length() == 0) {
       throw new InvalidOptionsException(
-          "This command requires the " + DEBUG_SQL_ARG + " argument."
+          "This command requires the " + SQL_QUERY_ARG + " argument."
           + HELP_STR);
     }
 
diff --git a/src/java/com/cloudera/sqoop/tool/ImportTool.java b/src/java/com/cloudera/sqoop/tool/ImportTool.java
index 2bbd462a..460fcbc1 100644
--- a/src/java/com/cloudera/sqoop/tool/ImportTool.java
+++ b/src/java/com/cloudera/sqoop/tool/ImportTool.java
@@ -26,6 +26,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.util.StringUtils;
+
 import com.cloudera.sqoop.Sqoop;
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
@@ -86,7 +88,11 @@ protected void importTable(SqoopOptions options, String tableName,
     ImportJobContext context = new ImportJobContext(tableName, jarFile,
         options, getOutputPath(options, tableName));
 
-    manager.importTable(context);
+    if (null != tableName) {
+      manager.importTable(context);
+    } else {
+      manager.importQuery(context);
+    }
 
     if (options.isAppendMode()) {
       AppendUtils app = new AppendUtils(context);
@@ -150,11 +156,11 @@ public int run(SqoopOptions options) {
         hiveImport = new HiveImport(options, manager, options.getConf(), false);
       }
 
-      // Import a single table the user specified.
+      // Import a single table (or query) the user specified.
       importTable(options, options.getTableName(), hiveImport);
     } catch (IOException ioe) {
       LOG.error("Encountered IOException running import job: "
-          + ioe.toString());
+          + StringUtils.stringifyException(ioe));
       if (System.getProperty(Sqoop.SQOOP_RETHROW_PROPERTY) != null) {
         throw new RuntimeException(ioe);
       } else {
@@ -215,6 +221,11 @@ protected RelatedOptions getImportOptions() {
           .hasArg().withDescription("HDFS plain table destination")
           .withLongOpt(TARGET_DIR_ARG)
           .create());
+      importOpts.addOption(OptionBuilder.withArgName("statement")
+          .hasArg()
+          .withDescription("Import results of SQL 'statement'")
+          .withLongOpt(SQL_QUERY_ARG)
+          .create(SQL_QUERY_SHORT_ARG));
     }
 
     importOpts.addOption(OptionBuilder.withArgName("dir")
@@ -329,13 +340,16 @@ public void applyOptions(CommandLine in, SqoopOptions out)
       if (in.hasOption(APPEND_ARG)) {
         out.setAppendMode(true);
       }
+
+      if (in.hasOption(SQL_QUERY_ARG)) {
+        out.setSqlQuery(in.getOptionValue(SQL_QUERY_ARG));
+      }
     }
 
     if (in.hasOption(WAREHOUSE_DIR_ARG)) {
       out.setWarehouseDir(in.getOptionValue(WAREHOUSE_DIR_ARG));
     }
 
-
     if (in.hasOption(FMT_SEQUENCEFILE_ARG)) {
       out.setFileLayout(SqoopOptions.FileLayout.SequenceFile);
     }
@@ -382,9 +396,11 @@ public void applyOptions(CommandLine in, SqoopOptions out)
    */
   protected void validateImportOptions(SqoopOptions options)
       throws InvalidOptionsException {
-    if (!allTables && options.getTableName() == null) {
+    if (!allTables && options.getTableName() == null
+        && options.getSqlQuery() == null) {
       throw new InvalidOptionsException(
-          "--table is required for import. (Or use sqoop import-all-tables.)"
+          "--table or --" + SQL_QUERY_ARG + " is required for import. "
+          + "(Or use sqoop import-all-tables.)"
           + HELP_STR);
     } else if (options.getExistingJarName() != null
         && options.getClassName() == null) {
@@ -393,8 +409,28 @@ protected void validateImportOptions(SqoopOptions options)
     } else if (options.getTargetDir() != null
         && options.getWarehouseDir() != null) {
       throw new InvalidOptionsException(
-          "--target-dir with --warehouse-dir are incompatible options"
+          "--target-dir and --warehouse-dir are incompatible options."
           + HELP_STR);
+    } else if (options.getTableName() != null
+        && options.getSqlQuery() != null) {
+      throw new InvalidOptionsException(
+          "Cannot specify --" + SQL_QUERY_ARG + " and --table together."
+          + HELP_STR);
+    } else if (options.getSqlQuery() != null
+        && options.getTargetDir() == null) {
+      throw new InvalidOptionsException(
+          "Must specify destination with --target-dir."
+          + HELP_STR);
+    } else if (options.getSqlQuery() != null && options.doHiveImport()
+        && options.getHiveTableName() == null) {
+      throw new InvalidOptionsException(
+          "When importing a query to Hive, you must specify --"
+          + HIVE_TABLE_ARG + "." + HELP_STR);
+    } else if (options.getSqlQuery() != null && options.getNumMappers() > 1
+        && options.getSplitByCol() == null) {
+      throw new InvalidOptionsException(
+          "When importing query results in parallel, you must specify --"
+          + SPLIT_BY_ARG + "." + HELP_STR);
     }
   }
 
diff --git a/src/test/com/cloudera/sqoop/SmokeTests.java b/src/test/com/cloudera/sqoop/SmokeTests.java
index e7aef42c..4e052208 100644
--- a/src/test/com/cloudera/sqoop/SmokeTests.java
+++ b/src/test/com/cloudera/sqoop/SmokeTests.java
@@ -56,6 +56,7 @@ public static Test suite() {
     suite.addTestSuite(TestMultiCols.class);
     suite.addTestSuite(TestMultiMaps.class);
     suite.addTestSuite(TestSplitBy.class);
+    suite.addTestSuite(TestQuery.class);
     suite.addTestSuite(TestWhere.class);
     suite.addTestSuite(TestTargetDir.class);
     suite.addTestSuite(TestAppendUtils.class);
diff --git a/src/test/com/cloudera/sqoop/TestQuery.java b/src/test/com/cloudera/sqoop/TestQuery.java
new file mode 100644
index 00000000..b0da3760
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/TestQuery.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
+import com.cloudera.sqoop.orm.CompilationManager;
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.HsqldbTestServer;
+import com.cloudera.sqoop.testutil.ImportJobTestCase;
+import com.cloudera.sqoop.testutil.SeqFileReader;
+import com.cloudera.sqoop.tool.ImportTool;
+import com.cloudera.sqoop.util.ClassLoaderStack;
+
+/**
+ * Test that --query works in Sqoop.
+ */
+public class TestQuery extends ImportJobTestCase {
+
+  /**
+   * Create the argv to pass to Sqoop.
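+   * @param includeHadoopFlags if true, prepend the generic Hadoop flags.
+   * @param query the free-form query to import; it may contain the
+   *    $CONDITIONS token.
+   * @param targetDir the HDFS directory to import into.
+   * @param allowParallel if true, run with two map tasks; otherwise one.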
+   * @return the argv as an array of strings.
+   */
+  protected String [] getArgv(boolean includeHadoopFlags, String query,
+      String targetDir, boolean allowParallel) {
+
+    ArrayList<String> args = new ArrayList<String>();
+
+    if (includeHadoopFlags) {
+      CommonArgs.addHadoopFlags(args);
+    }
+
+    args.add("--query");
+    args.add(query);
+    args.add("--split-by");
+    args.add("INTFIELD1");
+    args.add("--connect");
+    args.add(HsqldbTestServer.getUrl());
+    args.add("--as-sequencefile");
+    args.add("--target-dir");
+    args.add(targetDir);
+    args.add("--class-name");
+    args.add(getTableName());
+    if (allowParallel) {
+      args.add("--num-mappers");
+      args.add("2");
+    } else {
+      args.add("--num-mappers");
+      args.add("1");
+    }
+
+    return args.toArray(new String[0]);
+  }
+
+  // this test just uses the two int table.
+  protected String getTableName() {
+    return HsqldbTestServer.getTableName();
+  }
+
+  /**
+   * Given a comma-delimited list of integers, grab and parse the first int.
+   * @param str a comma-delimited list of values, the first of which is an int.
+   * @return the first field in the string, cast to int
+   */
+  private int getFirstInt(String str) {
+    String [] parts = str.split(",");
+    return Integer.parseInt(parts[0]);
+  }
+
+  public void runQueryTest(String query, String firstValStr,
+      int numExpectedResults, int expectedSum, String targetDir)
+      throws IOException {
+
+    ClassLoader prevClassLoader = null;
+    SequenceFile.Reader reader = null;
+
+    String [] argv = getArgv(true, query, targetDir, false);
+    runImport(argv);
+    try {
+      SqoopOptions opts = new ImportTool().parseArguments(
+          getArgv(false, query, targetDir, false),
+          null, null, true);
+
+      CompilationManager compileMgr = new CompilationManager(opts);
+      String jarFileName = compileMgr.getJarFilename();
+
+      prevClassLoader = ClassLoaderStack.addJarFile(jarFileName,
+          getTableName());
+
+      reader = SeqFileReader.getSeqFileReader(getDataFilePath().toString());
+
+      // here we can actually instantiate (k, v) pairs.
+      Configuration conf = new Configuration();
+      Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+      Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+      if (reader.next(key) == null) {
+        fail("Empty SequenceFile during import");
+      }
+
+      // make sure that the value we think should be at the top, is.
+      reader.getCurrentValue(val);
+      assertEquals("Invalid ordering within sorted SeqFile", firstValStr,
+          val.toString());
+
+      // We know that these values are two ints separated by a ',' character.
+      // Since this is all dynamic, though, we don't want to actually link
+      // against the class and use its methods. So we just parse this back
+      // into int fields manually. Sum them up and ensure that we get the
+      // expected total for the first column, to verify that we got all the
+      // results from the db into the file.
+      int curSum = getFirstInt(val.toString());
+      int totalResults = 1;
+
+      // now sum up everything else in the file.
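+      // (SequenceFile.Reader.next(Object) returns the passed-in key, or
+      // null once the end of the file is reached.)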
+      while (reader.next(key) != null) {
+        reader.getCurrentValue(val);
+        curSum += getFirstInt(val.toString());
+        totalResults++;
+      }
+
+      assertEquals("Total sum of first db column mismatch", expectedSum,
+          curSum);
+      assertEquals("Incorrect number of results for query", numExpectedResults,
+          totalResults);
+    } catch (InvalidOptionsException ioe) {
+      fail(ioe.toString());
+    } catch (ParseException pe) {
+      fail(pe.toString());
+    } finally {
+      IOUtils.closeStream(reader);
+
+      if (null != prevClassLoader) {
+        ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
+      }
+    }
+  }
+
+  public void testSelectStar() throws IOException {
+    runQueryTest("SELECT * FROM " + getTableName()
+        + " WHERE INTFIELD2 > 4 AND $CONDITIONS",
+        "1,8\n", 2, 4, getTablePath().toString());
+  }
+
+  public void testCompoundWhere() throws IOException {
+    runQueryTest("SELECT * FROM " + getTableName()
+        + " WHERE INTFIELD1 > 4 AND INTFIELD2 < 3 AND $CONDITIONS",
+        "7,2\n", 1, 7, getTablePath().toString());
+  }
+
+  public void testFailNoConditions() throws IOException {
+    String [] argv = getArgv(true, "SELECT * FROM " + getTableName(),
+        getTablePath().toString(), true);
+    try {
+      runImport(argv);
+      fail("Expected exception running import without $CONDITIONS");
+    } catch (Exception e) {
+      LOG.info("Got exception " + e + " running job (expected; ok)");
+    }
+  }
+}