SQOOP-3. Add ability to import arbitrary queries.

Add ConnManager.importQuery() API. Change BaseSqoopTool.DEBUG_SQL_ARG to
SQL_QUERY_ARG to reflect the broader applicability of the argument. Change the
'debugSqlCmd' member of SqoopOptions to 'sqlQuery'. CompilationManager now
sets the jar name based on the specified class name. Add tests for query
support. Add documentation for query-based import.

From: Aaron Kimball <aaron@cloudera.com>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149932 13f79535-47bb-0310-9956-ffa450edef68

This commit is contained in:
parent 84b613e92d
commit b0cd632b58
@ -45,11 +45,14 @@ Import control options
--num-mappers (n)::
  Use 'n' map tasks to import in parallel

--query (statement)::
  Imports the results of +statement+ instead of a table

--split-by (column-name)::
  Column of the table used to split the table for parallel import

--table (table-name)::
  The table to read (required)
  The table to import

--target-dir (dir)::
  Explicit HDFS target directory for the import.

@ -52,11 +52,14 @@ Import control options
-m::
  Use 'n' map tasks to import in parallel

--query (statement)::
  Imports the results of +statement+ instead of a table

--split-by (column-name)::
  Column of the table used to split the table for parallel import

--table (table-name)::
  The table to read (required)
  The table to import

--target-dir (dir)::
  Explicit HDFS target directory for the import.

@ -62,6 +62,7 @@ Argument Description
                                     when importing in direct mode
+\--inline-lob-limit <n>+            Set the maximum size for an inline LOB
+-m,\--num-mappers <n>+              Use 'n' map tasks to import in parallel
+-e,\--query <statement>+            Import the results of '+statement+'.
+\--split-by <column-name>+          Column of the table used to split work\
                                     units
+\--table <table-name>+              Table to read

@ -76,7 +77,7 @@ Argument Description
Selecting the Data to Import
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Sqoop currently imports data in a table-centric fashion. Use the
Sqoop typically imports data in a table-centric fashion. Use the
+\--table+ argument to select the table to import. For example, +\--table
employees+. This argument can also identify a +VIEW+ or other table-like
entity in a database.

@ -103,6 +104,38 @@ form +SELECT <column list> FROM <table name>+. You can append a
"id > 400"+. Only rows where the +id+ column has a value greater than
400 will be imported.

Free-form Query Imports
^^^^^^^^^^^^^^^^^^^^^^^

Sqoop can also import the result set of an arbitrary SQL query. Instead of
using the +\--table+, +\--columns+ and +\--where+ arguments, you can specify
a SQL statement with the +\--query+ argument.

When importing a free-form query, you must specify a destination directory
with +\--target-dir+.

If you want to import the results of a query in parallel, then each map task
will need to execute a copy of the query, with results partitioned by bounding
conditions inferred by Sqoop. Your query must include the token +$CONDITIONS+
which each Sqoop process will replace with a unique condition expression.
You must also select a splitting column with +\--split-by+.

For example:

----
$ sqoop import \
  --query 'SELECT a.*, b.* FROM a JOIN b on (a.id == b.id) WHERE $CONDITIONS' \
  --split-by a.id --target-dir /user/foo/joinresults
----

Alternately, the query can be executed once and imported serially, by
specifying a single map task with +-m 1+:

----
$ sqoop import --query 'SELECT a.*, b.* FROM a JOIN b on (a.id == b.id)' \
  -m 1 --target-dir /user/foo/joinresults
----

Controlling Parallelism
^^^^^^^^^^^^^^^^^^^^^^^
@ -83,7 +83,7 @@ public enum FileLayout {
  private String hadoopHome;
  private String splitByCol;
  private String whereClause;
  private String debugSqlCmd;
  private String sqlQuery;
  private String driverClassName;
  private String warehouseDir;
  private String targetDir;

@ -591,12 +591,12 @@ public void setHadoopHome(String home) {
  /**
   * @return a sql command to execute and exit with.
   */
  public String getDebugSqlCmd() {
    return debugSqlCmd;
  public String getSqlQuery() {
    return sqlQuery;
  }

  public void setDebugSqlCmd(String sqlStatement) {
    this.debugSqlCmd = sqlStatement;
  public void setSqlQuery(String sqlStatement) {
    this.sqlQuery = sqlStatement;
  }

  /**
@ -24,6 +24,9 @@
import java.sql.SQLException;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.cloudera.sqoop.util.ExportException;
import com.cloudera.sqoop.util.ImportException;

@ -34,6 +37,8 @@
 */
public abstract class ConnManager {

  public static final Log LOG = LogFactory.getLog(SqlManager.class.getName());

  /**
   * Return a list of all databases on a server.
   */

@ -49,6 +54,14 @@ public abstract class ConnManager {
   */
  public abstract String [] getColumnNames(String tableName);

  /**
   * Return a list of column names in query in the order returned by the db.
   */
  public String [] getColumnNamesForQuery(String query) {
    LOG.error("This database does not support free-form query column names.");
    return null;
  }

  /**
   * Return the name of the primary key for a table, or null if there is none.
   */

@ -76,6 +89,17 @@ public abstract class ConnManager {
   */
  public abstract Map<String, Integer> getColumnTypes(String tableName);

  /**
   * Return an unordered mapping from colname to sqltype for
   * all columns in a query.
   *
   * The Integer type id is a constant from java.sql.Types
   */
  public Map<String, Integer> getColumnTypesForQuery(String query) {
    LOG.error("This database does not support free-form query column types.");
    return null;
  }

  /**
   * Execute a SQL statement to read the named set of columns from a table.
   * If columns is null, all columns from the table are read. This is a direct

@ -109,6 +133,15 @@ public abstract ResultSet readTable(String tableName, String [] columns)
  public abstract void importTable(ImportJobContext context)
      throws IOException, ImportException;

  /**
   * Perform an import of a free-form query from the database into HDFS.
   */
  public void importQuery(ImportJobContext context)
      throws IOException, ImportException {
    throw new ImportException(
        "This database only supports table-based imports.");
  }

  /**
   * When using a column name in a generated SQL query, how (if at all)
   * should we escape that column name? e.g., a column named "table"
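The hunk above turns importQuery() into an overridable extension point: connectors that cannot handle free-form queries inherit the exception-throwing default, while others can override it. A minimal sketch of such an override, assuming SqlManager exposes a SqoopOptions constructor; the class name and the pushdown idea are illustrative, not part of this commit:

----
package com.cloudera.sqoop.manager;

import java.io.IOException;

import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.util.ImportException;

/**
 * Hypothetical connector showing how the new importQuery() hook could be
 * customized. Only the overridden method signature comes from this commit.
 */
public abstract class FastQueryManager extends SqlManager {

  public FastQueryManager(final SqoopOptions opts) {
    super(opts); // assumes SqlManager's SqoopOptions constructor
  }

  @Override
  public void importQuery(ImportJobContext context)
      throws IOException, ImportException {
    String query = context.getOptions().getSqlQuery();
    if (query == null || query.indexOf(SUBSTITUTE_TOKEN) == -1) {
      throw new ImportException("Free-form import requires a query "
          + "containing " + SUBSTITUTE_TOKEN);
    }
    // A real connector might push the query down to a native bulk tool here;
    // otherwise defer to the generic DataDrivenImportJob-based default.
    super.importQuery(context);
  }
}
----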
@ -47,6 +47,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;

/**
 * ConnManager implementation for generic SQL-compliant database.

@ -57,6 +58,12 @@ public abstract class SqlManager extends ConnManager {

  public static final Log LOG = LogFactory.getLog(SqlManager.class.getName());

  /** Substring that must appear in free-form queries submitted by users.
   * This is the string '$CONDITIONS'.
   */
  public static final String SUBSTITUTE_TOKEN =
      DataDrivenDBInputFormat.SUBSTITUTE_TOKEN;

  protected SqoopOptions options;
  private Statement lastStatement;

@ -78,9 +85,23 @@ protected String getColNamesQuery(String tableName) {
  }

  @Override
  /** {@inheritDoc} */
  public String[] getColumnNames(String tableName) {
    String stmt = getColNamesQuery(tableName);
    return getColumnNamesForRawQuery(stmt);
  }

  @Override
  /** {@inheritDoc} */
  public String [] getColumnNamesForQuery(String query) {
    String rawQuery = query.replace(SUBSTITUTE_TOKEN, " (1 = 0) ");
    return getColumnNamesForRawQuery(rawQuery);
  }

  /**
   * Get column names for a query statement that we do not modify further.
   */
  public String[] getColumnNamesForRawQuery(String stmt) {
    ResultSet results;
    try {
      results = execute(stmt);

@ -98,6 +119,9 @@ public String[] getColumnNames(String tableName) {
      String colName = metadata.getColumnName(i);
      if (colName == null || colName.equals("")) {
        colName = metadata.getColumnLabel(i);
        if (null == colName) {
          colName = "_RESULT_" + i;
        }
      }
      columns.add(colName);
    }

@ -128,7 +152,20 @@ protected String getColTypesQuery(String tableName) {
  @Override
  public Map<String, Integer> getColumnTypes(String tableName) {
    String stmt = getColTypesQuery(tableName);
    return getColumnTypesForRawQuery(stmt);
  }

  @Override
  public Map<String, Integer> getColumnTypesForQuery(String query) {
    // Manipulate the query to return immediately, with zero rows.
    String rawQuery = query.replace(SUBSTITUTE_TOKEN, " (1 = 0) ");
    return getColumnTypesForRawQuery(rawQuery);
  }

  /**
   * Get column types for a query statement that we do not modify further.
   */
  protected Map<String, Integer> getColumnTypesForRawQuery(String stmt) {
    ResultSet results;
    try {
      results = execute(stmt);

@ -285,7 +322,7 @@ public String getPrimaryKey(String tableName) {
   */
  protected String getSplitColumn(SqoopOptions opts, String tableName) {
    String splitCol = opts.getSplitByCol();
    if (null == splitCol) {
    if (null == splitCol && null != tableName) {
      // If the user didn't specify a splitting column, try to infer one.
      splitCol = getPrimaryKey(tableName);
    }

@ -317,6 +354,30 @@ public void importTable(ImportJobContext context)
    importer.runImport(tableName, jarFile, splitCol, opts.getConf());
  }

  /**
   * Default implementation of importQuery() is to launch a MapReduce job
   * via DataDrivenImportJob to read the table with DataDrivenDBInputFormat,
   * using its free-form query importer.
   */
  public void importQuery(ImportJobContext context)
      throws IOException, ImportException {
    String jarFile = context.getJarFile();
    SqoopOptions opts = context.getOptions();

    DataDrivenImportJob importer =
        new DataDrivenImportJob(opts, context.getInputFormat(), context);

    String splitCol = getSplitColumn(opts, null);
    if (null == splitCol && opts.getNumMappers() > 1) {
      // Can't infer a primary key.
      throw new ImportException("A split-by column must be specified for "
          + "parallel free-form query imports. Please specify one with "
          + "--split-by or perform a sequential import with '-m 1'.");
    }

    importer.runImport(null, jarFile, splitCol, opts.getConf());
  }

  /**
   * Executes an arbitrary SQL statement.
   * @param stmt The SQL statement to execute
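SqlManager's new getColumnNamesForQuery() and getColumnTypesForQuery() replace the $CONDITIONS token with (1 = 0) so the statement can be executed for its ResultSet metadata without returning rows, while at run time each map task substitutes its own bounding condition. A standalone illustration of that substitution; the sample query and the per-task range are made up:

----
// Standalone demo of the $CONDITIONS token handling described above.
public class ConditionsTokenDemo {
  // Mirrors DataDrivenDBInputFormat.SUBSTITUTE_TOKEN used by SqlManager.
  private static final String TOKEN = "$CONDITIONS";

  public static void main(String[] args) {
    String userQuery = "SELECT id, name FROM employees WHERE $CONDITIONS";

    // Metadata probe: returns zero rows, but ResultSetMetaData is populated.
    String metadataQuery = userQuery.replace(TOKEN, " (1 = 0) ");

    // Run time: each map task gets its own condition on the split column
    // (this particular range is only an example).
    String mapTask0 = userQuery.replace(TOKEN, " id >= 0 AND id < 500 ");

    System.out.println(metadataQuery);
    System.out.println(mapTask0);
  }
}
----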
@ -117,29 +117,43 @@ protected void configureInputFormat(Job job, String tableName,
        username, options.getPassword());
    }

    String [] colNames = options.getColumns();
    if (null == colNames) {
      colNames = mgr.getColumnNames(tableName);
    }

    String [] sqlColNames = null;
    if (null != colNames) {
      sqlColNames = new String[colNames.length];
      for (int i = 0; i < colNames.length; i++) {
        sqlColNames[i] = mgr.escapeColName(colNames[i]);
    if (null != tableName) {
      // Import a table.
      String [] colNames = options.getColumns();
      if (null == colNames) {
        colNames = mgr.getColumnNames(tableName);
      }

      String [] sqlColNames = null;
      if (null != colNames) {
        sqlColNames = new String[colNames.length];
        for (int i = 0; i < colNames.length; i++) {
          sqlColNames[i] = mgr.escapeColName(colNames[i]);
        }
      }

      // It's ok if the where clause is null in DBInputFormat.setInput.
      String whereClause = options.getWhereClause();

      // We can't set the class properly in here, because we may not have the
      // jar loaded in this JVM. So we start by calling setInput() with
      // DBWritable and then overriding the string manually.
      DataDrivenDBInputFormat.setInput(job, DBWritable.class,
          mgr.escapeTableName(tableName), whereClause,
          mgr.escapeColName(splitByCol), sqlColNames);
    } else {
      // Import a free-form query.
      String inputQuery = options.getSqlQuery();
      String sanitizedQuery = inputQuery.replace(
          DataDrivenDBInputFormat.SUBSTITUTE_TOKEN, " (1 = 1) ");
      String inputBoundingQuery = "SELECT MIN(" + splitByCol
          + "), MAX(" + splitByCol + ") FROM (" + sanitizedQuery + ") AS t1";
      DataDrivenDBInputFormat.setInput(job, DBWritable.class,
          inputQuery, inputBoundingQuery);
      new DBConfiguration(job.getConfiguration()).setInputOrderBy(
          splitByCol);
    }

    // It's ok if the where clause is null in DBInputFormat.setInput.
    String whereClause = options.getWhereClause();

    // We can't set the class properly in here, because we may not have the
    // jar loaded in this JVM. So we start by calling setInput() with
    // DBWritable and then overriding the string manually.
    DataDrivenDBInputFormat.setInput(job, DBWritable.class,
        mgr.escapeTableName(tableName), whereClause,
        mgr.escapeColName(splitByCol), sqlColNames);

    LOG.debug("Using table class: " + tableClassName);
    job.getConfiguration().set(HadoopShim.get().getDbInputClassProperty(),
        tableClassName);
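For the free-form branch above, split boundaries come from a generated bounding query: $CONDITIONS is replaced with (1 = 1) and the result is wrapped in a MIN/MAX select over the split column. A standalone restatement of that string construction; the query here is modeled on, but not copied from, the documentation example:

----
// Builds the bounding query the same way the hunk above does (illustrative).
public class BoundingQueryDemo {
  public static void main(String[] args) {
    String inputQuery =
        "SELECT a.*, b.* FROM a JOIN b ON (a.id = b.id) WHERE $CONDITIONS";
    String splitByCol = "a.id";

    String sanitizedQuery = inputQuery.replace("$CONDITIONS", " (1 = 1) ");
    String inputBoundingQuery = "SELECT MIN(" + splitByCol
        + "), MAX(" + splitByCol + ") FROM (" + sanitizedQuery + ") AS t1";

    // Prints something like:
    // SELECT MIN(a.id), MAX(a.id) FROM (SELECT a.*, b.* FROM a JOIN b
    //   ON (a.id = b.id) WHERE  (1 = 1) ) AS t1
    System.out.println(inputBoundingQuery);
  }
}
----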
@ -117,7 +117,9 @@ protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
  /**
   * Run an import job to read a table in to HDFS.
   *
   * @param tableName the database table to read
   * @param tableName the database table to read; may be null if a free-form
   * query is specified in the SqoopOptions, and the ImportJobBase subclass
   * supports free-form queries.
   * @param ormJarFile the Jar file to insert into the dcache classpath.
   * (may be null)
   * @param splitByCol the column of the database table to use to split

@ -130,7 +132,12 @@ protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
  public void runImport(String tableName, String ormJarFile, String splitByCol,
      Configuration conf) throws IOException, ImportException {

    LOG.info("Beginning import of " + tableName);
    if (null != tableName) {
      LOG.info("Beginning import of " + tableName);
    } else {
      LOG.info("Beginning query import.");
    }

    String tableClassName =
        new TableClassName(options).getClassForTable(tableName);
    loadJars(conf, ormJarFile, tableClassName);
@ -64,6 +64,13 @@ protected void configureInputFormat(Job job, String tableName,
      String tableClassName, String splitByCol)
      throws ClassNotFoundException, IOException {

    if (null == tableName) {
      LOG.error(
          "mysqldump-based import cannot support free-form query imports.");
      LOG.error("Do not use --direct and --query together for MySQL.");
      throw new IOException("null tableName for MySQLDumpImportJob.");
    }

    ConnManager mgr = new ConnFactory(options.getConf()).getManager(options);

    try {
@ -21,6 +21,7 @@
import org.apache.hadoop.io.BytesWritable;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.SqlManager;
import com.cloudera.sqoop.lib.BigDecimalSerializer;
import com.cloudera.sqoop.lib.DelimiterSet;
import com.cloudera.sqoop.lib.FieldFormatter;

@ -114,10 +115,12 @@ public class ClassWriter {
  private CompilationManager compileManager;

  /**
   * Creates a new ClassWriter to generate an ORM class for a table.
   * Creates a new ClassWriter to generate an ORM class for a table
   * or arbitrary query.
   * @param opts program-wide options
   * @param connMgr the connection manager used to describe the table.
   * @param table the name of the table to read.
   * @param table the name of the table to read. If null, query is taken
   * from the SqoopOptions.
   */
  public ClassWriter(final SqoopOptions opts, final ConnManager connMgr,
      final String table, final CompilationManager compMgr) {

@ -843,11 +846,32 @@ private void generateHadoopWrite(Map<String, Integer> columnTypes,
   * Generate the ORM code for the class.
   */
  public void generate() throws IOException {
    Map<String, Integer> columnTypes = connManager.getColumnTypes(tableName);
    Map<String, Integer> columnTypes;

    if (null != tableName) {
      // We're generating a class based on a table import.
      columnTypes = connManager.getColumnTypes(tableName);
    } else {
      // This is based on an arbitrary query.
      String query = this.options.getSqlQuery();
      if (query.indexOf(SqlManager.SUBSTITUTE_TOKEN) == -1) {
        throw new IOException("Query must contain '"
            + SqlManager.SUBSTITUTE_TOKEN + "' in WHERE clause.");
      }

      columnTypes = connManager.getColumnTypesForQuery(query);
    }

    String [] colNames = options.getColumns();
    if (null == colNames) {
      colNames = connManager.getColumnNames(tableName);
      if (null != tableName) {
        // Table-based import. Read column names from table.
        colNames = connManager.getColumnNames(tableName);
      } else {
        // Infer/assign column names for arbitrary query.
        colNames = connManager.getColumnNamesForQuery(
            this.options.getSqlQuery());
      }
    }

    // Translate all the column names into names that are safe to
@ -53,6 +53,10 @@
 */
public class CompilationManager {

  /** If we cannot infer a jar name from a table name, etc., use this. */
  public static final String DEFAULT_CODEGEN_JAR_NAME =
      "sqoop-codegen-created.jar";

  public static final Log LOG = LogFactory.getLog(
      CompilationManager.class.getName());

@ -202,7 +206,11 @@ public void compile() throws IOException {
  public String getJarFilename() {
    String jarOutDir = options.getJarOutputDir();
    String tableName = options.getTableName();
    if (null != tableName && tableName.length() > 0) {
    String specificClassName = options.getClassName();

    if (specificClassName != null && specificClassName.length() > 0) {
      return jarOutDir + specificClassName + ".jar";
    } else if (null != tableName && tableName.length() > 0) {
      return jarOutDir + tableName + ".jar";
    } else if (this.sources.size() == 1) {
      // if we only have one source file, find it's base name,

@ -213,7 +221,7 @@ public String getJarFilename() {
      String preExtPart = parts[0];
      return jarOutDir + preExtPart + ".jar";
    } else {
      return jarOutDir + "sqoop.jar";
      return jarOutDir + DEFAULT_CODEGEN_JAR_NAME;
    }
  }
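With this change the generated jar for a query-based import, which has no table name, is named after the class requested with --class-name, falling back to the table name, then the single generated source file, then the new DEFAULT_CODEGEN_JAR_NAME. A standalone restatement of that precedence; the directory and names passed in main() are made up:

----
// Standalone sketch of the jar-name precedence implemented above.
public class JarNameDemo {
  static String jarName(String jarOutDir, String className,
      String tableName, String singleSourceBase) {
    if (className != null && className.length() > 0) {
      return jarOutDir + className + ".jar";
    } else if (tableName != null && tableName.length() > 0) {
      return jarOutDir + tableName + ".jar";
    } else if (singleSourceBase != null) {
      return jarOutDir + singleSourceBase + ".jar";
    } else {
      return jarOutDir + "sqoop-codegen-created.jar";
    }
  }

  public static void main(String[] args) {
    // A --query import with --class-name QueryRecord:
    System.out.println(jarName("/tmp/sqoop/compile/", "QueryRecord", null, null));
    // A table import with no explicit class name:
    System.out.println(jarName("/tmp/sqoop/compile/", null, "employees", null));
  }
}
----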
@ -76,29 +76,30 @@ public String getPackageForTable() {
   * @return the full name of the class to generate/use to import a table.
   */
  public String getClassForTable(String tableName) {
    if (null == tableName) {
      return null;
    }

    String predefinedClass = options.getClassName();
    if (predefinedClass != null) {
      // The user's chosen a specific class name for this job.
      return predefinedClass;
    }

    String queryName = tableName;
    if (null == queryName) {
      queryName = "QueryResult";
    }

    String packageName = options.getPackageName();
    if (null != packageName) {
      // return packageName.tableName.
      return packageName + "." + tableName;
      // return packageName.queryName.
      return packageName + "." + queryName;
    }

    // no specific class; no specific package.
    // Just make sure it's a legal identifier.
    return ClassWriter.toIdentifier(tableName);
    return ClassWriter.toIdentifier(queryName);
  }

  /**
   * @return just the last spegment of the class name -- all package info
   * @return just the last segment of the class name -- all package info
   * stripped.
   */
  public String getShortClassForTable(String tableName) {
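getClassForTable() now falls back to the name QueryResult when no table name is available, so --query imports still produce a usable generated class name. A standalone restatement of the naming rule, with made-up values in main():

----
// Standalone sketch of the class-naming rule from getClassForTable() above.
public class QueryClassNameDemo {
  static String classForTable(String predefinedClass, String packageName,
      String tableName) {
    if (predefinedClass != null) {
      return predefinedClass;        // user picked --class-name
    }
    String queryName = (tableName != null) ? tableName : "QueryResult";
    if (packageName != null) {
      return packageName + "." + queryName;
    }
    // In Sqoop itself this last case also goes through ClassWriter.toIdentifier().
    return queryName;
  }

  public static void main(String[] args) {
    // --query import with --package-name com.example:
    System.out.println(classForTable(null, "com.example", null)); // com.example.QueryResult
    // plain table import:
    System.out.println(classForTable(null, null, "employees"));   // employees
  }
}
----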
@ -106,8 +106,8 @@ public abstract class BaseSqoopTool extends SqoopTool {
  public static final String PACKAGE_NAME_ARG = "package-name";
  public static final String CLASS_NAME_ARG = "class-name";
  public static final String JAR_FILE_NAME_ARG = "jar-file";
  public static final String DEBUG_SQL_ARG = "query";
  public static final String DEBUG_SQL_SHORT_ARG = "e";
  public static final String SQL_QUERY_ARG = "query";
  public static final String SQL_QUERY_SHORT_ARG = "e";
  public static final String VERBOSE_ARG = "verbose";
  public static final String HELP_ARG = "help";
@ -48,7 +48,7 @@ public int run(SqoopOptions options) {

    try {
      // just run a SQL statement for debugging purposes.
      manager.execAndPrint(options.getDebugSqlCmd());
      manager.execAndPrint(options.getSqlQuery());
    } finally {
      destroy(options);
    }

@ -65,8 +65,8 @@ public void configureOptions(ToolOptions toolOptions) {
    evalOpts.addOption(OptionBuilder.withArgName("statement")
        .hasArg()
        .withDescription("Execute 'statement' in SQL and exit")
        .withLongOpt(DEBUG_SQL_ARG)
        .create(DEBUG_SQL_SHORT_ARG));
        .withLongOpt(SQL_QUERY_ARG)
        .create(SQL_QUERY_SHORT_ARG));

    toolOptions.addUniqueOptions(evalOpts);
  }

@ -77,8 +77,8 @@ public void applyOptions(CommandLine in, SqoopOptions out)
      throws InvalidOptionsException {

    applyCommonOptions(in, out);
    if (in.hasOption(DEBUG_SQL_ARG)) {
      out.setDebugSqlCmd(in.getOptionValue(DEBUG_SQL_ARG));
    if (in.hasOption(SQL_QUERY_ARG)) {
      out.setSqlQuery(in.getOptionValue(SQL_QUERY_ARG));
    }
  }

@ -91,10 +91,10 @@ public void validateOptions(SqoopOptions options)
      throw new InvalidOptionsException(HELP_STR);
    }

    String sqlCmd = options.getDebugSqlCmd();
    String sqlCmd = options.getSqlQuery();
    if (null == sqlCmd || sqlCmd.length() == 0) {
      throw new InvalidOptionsException(
          "This command requires the " + DEBUG_SQL_ARG + " argument."
          "This command requires the " + SQL_QUERY_ARG + " argument."
          + HELP_STR);
    }
@ -26,6 +26,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.util.StringUtils;

import com.cloudera.sqoop.Sqoop;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;

@ -86,7 +88,11 @@ protected void importTable(SqoopOptions options, String tableName,
    ImportJobContext context = new ImportJobContext(tableName, jarFile,
        options, getOutputPath(options, tableName));

    manager.importTable(context);
    if (null != tableName) {
      manager.importTable(context);
    } else {
      manager.importQuery(context);
    }

    if (options.isAppendMode()) {
      AppendUtils app = new AppendUtils(context);

@ -150,11 +156,11 @@ public int run(SqoopOptions options) {
        hiveImport = new HiveImport(options, manager, options.getConf(), false);
      }

      // Import a single table the user specified.
      // Import a single table (or query) the user specified.
      importTable(options, options.getTableName(), hiveImport);
    } catch (IOException ioe) {
      LOG.error("Encountered IOException running import job: "
          + ioe.toString());
          + StringUtils.stringifyException(ioe));
      if (System.getProperty(Sqoop.SQOOP_RETHROW_PROPERTY) != null) {
        throw new RuntimeException(ioe);
      } else {

@ -215,6 +221,11 @@ protected RelatedOptions getImportOptions() {
          .hasArg().withDescription("HDFS plain table destination")
          .withLongOpt(TARGET_DIR_ARG)
          .create());
      importOpts.addOption(OptionBuilder.withArgName("statement")
          .hasArg()
          .withDescription("Import results of SQL 'statement'")
          .withLongOpt(SQL_QUERY_ARG)
          .create(SQL_QUERY_SHORT_ARG));
    }

    importOpts.addOption(OptionBuilder.withArgName("dir")

@ -329,13 +340,16 @@ public void applyOptions(CommandLine in, SqoopOptions out)
      if (in.hasOption(APPEND_ARG)) {
        out.setAppendMode(true);
      }

      if (in.hasOption(SQL_QUERY_ARG)) {
        out.setSqlQuery(in.getOptionValue(SQL_QUERY_ARG));
      }
    }

    if (in.hasOption(WAREHOUSE_DIR_ARG)) {
      out.setWarehouseDir(in.getOptionValue(WAREHOUSE_DIR_ARG));
    }

    if (in.hasOption(FMT_SEQUENCEFILE_ARG)) {
      out.setFileLayout(SqoopOptions.FileLayout.SequenceFile);
    }

@ -382,9 +396,11 @@ public void applyOptions(CommandLine in, SqoopOptions out)
   */
  protected void validateImportOptions(SqoopOptions options)
      throws InvalidOptionsException {
    if (!allTables && options.getTableName() == null) {
    if (!allTables && options.getTableName() == null
        && options.getSqlQuery() == null) {
      throw new InvalidOptionsException(
          "--table is required for import. (Or use sqoop import-all-tables.)"
          "--table or --" + SQL_QUERY_ARG + " is required for import. "
          + "(Or use sqoop import-all-tables.)"
          + HELP_STR);
    } else if (options.getExistingJarName() != null
        && options.getClassName() == null) {

@ -393,8 +409,28 @@ protected void validateImportOptions(SqoopOptions options)
    } else if (options.getTargetDir() != null
        && options.getWarehouseDir() != null) {
      throw new InvalidOptionsException(
          "--target-dir with --warehouse-dir are incompatible options"
          "--target-dir with --warehouse-dir are incompatible options."
          + HELP_STR);
    } else if (options.getTableName() != null
        && options.getSqlQuery() != null) {
      throw new InvalidOptionsException(
          "Cannot specify --" + SQL_QUERY_ARG + " and --table together."
          + HELP_STR);
    } else if (options.getSqlQuery() != null
        && options.getTargetDir() == null) {
      throw new InvalidOptionsException(
          "Must specify destination with --target-dir."
          + HELP_STR);
    } else if (options.getSqlQuery() != null && options.doHiveImport()
        && options.getHiveTableName() == null) {
      throw new InvalidOptionsException(
          "When importing a query to Hive, you must specify --"
          + HIVE_TABLE_ARG + "." + HELP_STR);
    } else if (options.getSqlQuery() != null && options.getNumMappers() > 1
        && options.getSplitByCol() == null) {
      throw new InvalidOptionsException(
          "When importing query results in parallel, you must specify --"
          + SPLIT_BY_ARG + "." + HELP_STR);
    }
  }
@ -56,6 +56,7 @@ public static Test suite() {
    suite.addTestSuite(TestMultiCols.class);
    suite.addTestSuite(TestMultiMaps.class);
    suite.addTestSuite(TestSplitBy.class);
    suite.addTestSuite(TestQuery.class);
    suite.addTestSuite(TestWhere.class);
    suite.addTestSuite(TestTargetDir.class);
    suite.addTestSuite(TestAppendUtils.class);
src/test/com/cloudera/sqoop/TestQuery.java (new file, 186 lines)
@ -0,0 +1,186 @@
/**
 * Licensed to Cloudera, Inc. under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Cloudera, Inc. licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.cloudera.sqoop;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.util.ReflectionUtils;

import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
import com.cloudera.sqoop.orm.CompilationManager;
import com.cloudera.sqoop.testutil.CommonArgs;
import com.cloudera.sqoop.testutil.HsqldbTestServer;
import com.cloudera.sqoop.testutil.ImportJobTestCase;
import com.cloudera.sqoop.testutil.SeqFileReader;
import com.cloudera.sqoop.tool.ImportTool;
import com.cloudera.sqoop.util.ClassLoaderStack;

/**
 * Test that --query works in Sqoop.
 */
public class TestQuery extends ImportJobTestCase {

  /**
   * Create the argv to pass to Sqoop.
   * @return the argv as an array of strings.
   */
  protected String [] getArgv(boolean includeHadoopFlags, String query,
      String targetDir, boolean allowParallel) {

    ArrayList<String> args = new ArrayList<String>();

    if (includeHadoopFlags) {
      CommonArgs.addHadoopFlags(args);
    }

    args.add("--query");
    args.add(query);
    args.add("--split-by");
    args.add("INTFIELD1");
    args.add("--connect");
    args.add(HsqldbTestServer.getUrl());
    args.add("--as-sequencefile");
    args.add("--target-dir");
    args.add(targetDir);
    args.add("--class-name");
    args.add(getTableName());
    if (allowParallel) {
      args.add("--num-mappers");
      args.add("2");
    } else {
      args.add("--num-mappers");
      args.add("1");
    }

    return args.toArray(new String[0]);
  }

  // this test just uses the two int table.
  protected String getTableName() {
    return HsqldbTestServer.getTableName();
  }

  /**
   * Given a comma-delimited list of integers, grab and parse the first int.
   * @param str a comma-delimited list of values, the first of which is an int.
   * @return the first field in the string, cast to int
   */
  private int getFirstInt(String str) {
    String [] parts = str.split(",");
    return Integer.parseInt(parts[0]);
  }

  public void runQueryTest(String query, String firstValStr,
      int numExpectedResults, int expectedSum, String targetDir)
      throws IOException {

    ClassLoader prevClassLoader = null;
    SequenceFile.Reader reader = null;

    String [] argv = getArgv(true, query, targetDir, false);
    runImport(argv);
    try {
      SqoopOptions opts = new ImportTool().parseArguments(
          getArgv(false, query, targetDir, false),
          null, null, true);

      CompilationManager compileMgr = new CompilationManager(opts);
      String jarFileName = compileMgr.getJarFilename();

      prevClassLoader = ClassLoaderStack.addJarFile(jarFileName,
          getTableName());

      reader = SeqFileReader.getSeqFileReader(getDataFilePath().toString());

      // here we can actually instantiate (k, v) pairs.
      Configuration conf = new Configuration();
      Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf);

      if (reader.next(key) == null) {
        fail("Empty SequenceFile during import");
      }

      // make sure that the value we think should be at the top, is.
      reader.getCurrentValue(val);
      assertEquals("Invalid ordering within sorted SeqFile", firstValStr,
          val.toString());

      // We know that these values are two ints separated by a ',' character.
      // Since this is all dynamic, though, we don't want to actually link
      // against the class and use its methods. So we just parse this back
      // into int fields manually. Sum them up and ensure that we get the
      // expected total for the first column, to verify that we got all the
      // results from the db into the file.
      int curSum = getFirstInt(val.toString());
      int totalResults = 1;

      // now sum up everything else in the file.
      while (reader.next(key) != null) {
        reader.getCurrentValue(val);
        curSum += getFirstInt(val.toString());
        totalResults++;
      }

      assertEquals("Total sum of first db column mismatch", expectedSum,
          curSum);
      assertEquals("Incorrect number of results for query", numExpectedResults,
          totalResults);
    } catch (InvalidOptionsException ioe) {
      fail(ioe.toString());
    } catch (ParseException pe) {
      fail(pe.toString());
    } finally {
      IOUtils.closeStream(reader);

      if (null != prevClassLoader) {
        ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
      }
    }
  }

  public void testSelectStar() throws IOException {
    runQueryTest("SELECT * FROM " + getTableName()
        + " WHERE INTFIELD2 > 4 AND $CONDITIONS",
        "1,8\n", 2, 4, getTablePath().toString());
  }

  public void testCompoundWhere() throws IOException {
    runQueryTest("SELECT * FROM " + getTableName()
        + " WHERE INTFIELD1 > 4 AND INTFIELD2 < 3 AND $CONDITIONS",
        "7,2\n", 1, 7, getTablePath().toString());
  }

  public void testFailNoConditions() throws IOException {
    String [] argv = getArgv(true, "SELECT * FROM " + getTableName(),
        getTablePath().toString(), true);
    try {
      runImport(argv);
      fail("Expected exception running import without $CONDITIONS");
    } catch (Exception e) {
      LOG.info("Got exception " + e + " running job (expected; ok)");
    }
  }
}