SQOOP-331. Support for boundary query.

(Jarek Jarcec Cecho via Arvind Prabhakar)

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1176981 13f79535-47bb-0310-9956-ffa450edef68

commit 6a7144a496, parent 028464dd7e
@@ -35,6 +35,9 @@ Import control options
 --as-textfile::
   Imports data as plain text (default)
 
+--boundary-query (query)::
+  Use the given query to select the minimal and maximal values of the '--split-by' column for creating splits
+
 --columns (col,col,col...)::
   Columns to export from table
 
@@ -60,6 +60,7 @@ Argument                          Description
 +\--as-avrodatafile+              Imports data to Avro Data Files
 +\--as-sequencefile+              Imports data to SequenceFiles
 +\--as-textfile+                  Imports data as plain text (default)
++\--boundary-query <statement>+   Boundary query to use for creating splits
 +\--columns <col,col,col...>+     Columns to import from table
 +\--direct+                       Use direct import fast path
 +\--direct-split-size <n>+        Split the input stream every 'n' bytes\
@@ -114,6 +115,11 @@ form +SELECT <column list> FROM <table name>+. You can append a
 "id > 400"+. Only rows where the +id+ column has a value greater than
 400 will be imported.
 
+By default Sqoop will use the query +select min(<split-by>), max(<split-by>)
+from <table name>+ to find boundaries for creating splits. In some cases this
+query is not optimal, so you can specify any arbitrary query returning two
+numeric columns using the +\--boundary-query+ argument.
+
 Free-form Query Imports
 ^^^^^^^^^^^^^^^^^^^^^^^
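As an illustration of the documented behavior, a hypothetical invocation
(the connect string, table, and column names are assumptions for the
example, not part of this patch) might look like:

    $ sqoop import \
        --connect jdbc:hsqldb:hsql://localhost/db \
        --table EMPLOYEES --split-by ID \
        --boundary-query "SELECT MIN(ID), MAX(ID) FROM EMPLOYEES WHERE ID > 100"

The supplied statement must return exactly two numeric columns, which become
the lower and upper bounds over which the split ranges are generated.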
@@ -138,6 +138,7 @@ public enum IncrementalMode {
   @StoredAsProperty("db.split.column") private String splitByCol;
   @StoredAsProperty("db.where.clause") private String whereClause;
   @StoredAsProperty("db.query") private String sqlQuery;
+  @StoredAsProperty("db.query.boundary") private String boundaryQuery;
   @StoredAsProperty("jdbc.driver.class") private String driverClassName;
   @StoredAsProperty("hdfs.warehouse.dir") private String warehouseDir;
   @StoredAsProperty("hdfs.target.dir") private String targetDir;
@@ -1200,6 +1201,14 @@ public void setSqlQuery(String sqlStatement) {
     this.sqlQuery = sqlStatement;
   }
 
+  public String getBoundaryQuery() {
+    return boundaryQuery;
+  }
+
+  public void setBoundaryQuery(String sqlStatement) {
+    boundaryQuery = sqlStatement;
+  }
+
   /**
    * @return The JDBC driver class name specified with --driver.
    */
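Because the new field carries +@StoredAsProperty("db.query.boundary")+, the
boundary query takes part in SqoopOptions property serialization like the
fields around it. A minimal sketch of the round trip, assuming the
writeProperties()/loadProperties() pair that SqoopOptions already exposes
(the pair exercised by testPropertySerialization1 in the final hunk of this
diff):

    import java.util.Properties;

    import com.cloudera.sqoop.SqoopOptions;

    public class BoundaryQueryRoundTrip {
      public static void main(String[] args) {
        SqoopOptions opts = new SqoopOptions();
        opts.setBoundaryQuery("SELECT MIN(id), MAX(id) FROM employees");

        // Serialize every @StoredAsProperty field, then read them back.
        Properties props = opts.writeProperties();
        SqoopOptions reloaded = new SqoopOptions();
        reloaded.loadProperties(props);

        // Prints the query preserved under the "db.query.boundary" key.
        System.out.println(reloaded.getBoundaryQuery());
      }
    }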
@@ -870,4 +870,8 @@ public void migrateData(String fromTable, String toTable)
       }
     }
   }
+
+  public String getInputBoundsQuery(String splitByCol, String sanitizedQuery) {
+    return options.getBoundaryQuery();
+  }
 }
@@ -155,14 +155,24 @@ protected void configureInputFormat(Job job, String tableName,
         DataDrivenDBInputFormat.setInput(job, DBWritable.class,
             mgr.escapeTableName(tableName), whereClause,
             mgr.escapeColName(splitByCol), sqlColNames);
+
+        // If the user specified a boundary query on the command line,
+        // propagate it to the job.
+        if (options.getBoundaryQuery() != null) {
+          DataDrivenDBInputFormat.setBoundingQuery(job.getConfiguration(),
+              options.getBoundaryQuery());
+        }
       } else {
         // Import a free-form query.
         String inputQuery = options.getSqlQuery();
         String sanitizedQuery = inputQuery.replace(
             DataDrivenDBInputFormat.SUBSTITUTE_TOKEN, " (1 = 1) ");
 
-        String inputBoundingQuery =
-            mgr.getInputBoundsQuery(splitByCol, sanitizedQuery);
+        String inputBoundingQuery = options.getBoundaryQuery();
+
+        if (inputBoundingQuery == null) {
+          inputBoundingQuery =
+              mgr.getInputBoundsQuery(splitByCol, sanitizedQuery);
+        }
         if (inputBoundingQuery == null) {
           inputBoundingQuery = "SELECT MIN(" + splitByCol + "), MAX("
               + splitByCol + ") FROM (" + sanitizedQuery + ") AS t1";
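Taken together, the free-form path now resolves its bounding query through a
three-level fallback. A condensed sketch of that order (a paraphrase of the
hunk above, not the literal Sqoop source):

    /**
     * Resolve the bounding query, in order of precedence:
     * 1. an explicit --boundary-query from the command line;
     * 2. a vendor-specific query from the connection manager;
     * 3. a generated MIN/MAX over the sanitized free-form query.
     */
    static String resolveBoundingQuery(String userBoundaryQuery,
        String managerQuery, String splitByCol, String sanitizedQuery) {
      if (userBoundaryQuery != null) {
        return userBoundaryQuery;
      }
      if (managerQuery != null) {
        return managerQuery;
      }
      return "SELECT MIN(" + splitByCol + "), MAX(" + splitByCol
          + ") FROM (" + sanitizedQuery + ") AS t1";
    }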
@@ -170,10 +170,14 @@ protected DBSplitter getSplitter(int sqlDataType) {
   public List<InputSplit> getSplits(JobContext job) throws IOException {
 
     int targetNumTasks = ConfigurationHelper.getJobNumMaps(job);
-    if (1 == targetNumTasks) {
-      // There's no need to run a bounding vals query; just return a split
-      // that separates nothing. This can be considerably more optimal for a
-      // large table with no index.
+    String boundaryQuery = getDBConf().getInputBoundingQuery();
+
+    // If the user has not forced a boundary query on us, and we do not need
+    // one because there is only one mapper, return a single split that
+    // separates nothing. This can be considerably more optimal for a large
+    // table with no index.
+    if (1 == targetNumTasks
+        && (boundaryQuery == null || boundaryQuery.isEmpty())) {
       List<InputSplit> singletonSplit = new ArrayList<InputSplit>();
       singletonSplit.add(new DataDrivenDBInputSplit("1=1", "1=1"));
       return singletonSplit;
@@ -136,6 +136,7 @@ public abstract class BaseSqoopTool extends SqoopTool {
   public static final String CLASS_NAME_ARG = "class-name";
   public static final String JAR_FILE_NAME_ARG = "jar-file";
   public static final String SQL_QUERY_ARG = "query";
+  public static final String SQL_QUERY_BOUNDARY = "boundary-query";
   public static final String SQL_QUERY_SHORT_ARG = "e";
   public static final String VERBOSE_ARG = "verbose";
   public static final String HELP_ARG = "help";
@@ -527,6 +527,12 @@ protected RelatedOptions getImportOptions() {
         .withDescription("Import results of SQL 'statement'")
         .withLongOpt(SQL_QUERY_ARG)
         .create(SQL_QUERY_SHORT_ARG));
+    importOpts.addOption(OptionBuilder.withArgName("statement")
+        .hasArg()
+        .withDescription("Set boundary query for retrieving max and min"
+            + " value of the primary key")
+        .withLongOpt(SQL_QUERY_BOUNDARY)
+        .create());
   }
 
   importOpts.addOption(OptionBuilder.withArgName("dir")
@@ -721,6 +727,10 @@ public void applyOptions(CommandLine in, SqoopOptions out)
       if (in.hasOption(SQL_QUERY_ARG)) {
         out.setSqlQuery(in.getOptionValue(SQL_QUERY_ARG));
       }
+
+      if (in.hasOption(SQL_QUERY_BOUNDARY)) {
+        out.setBoundaryQuery(in.getOptionValue(SQL_QUERY_BOUNDARY));
+      }
     }
 
     if (in.hasOption(WAREHOUSE_DIR_ARG)) {
src/test/com/cloudera/sqoop/TestBoundaryQuery.java (new file, 166 lines)
@@ -0,0 +1,166 @@
/**
 * Copyright 2011 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.cloudera.sqoop;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.util.ReflectionUtils;

import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
import com.cloudera.sqoop.orm.CompilationManager;
import com.cloudera.sqoop.testutil.CommonArgs;
import com.cloudera.sqoop.testutil.HsqldbTestServer;
import com.cloudera.sqoop.testutil.ImportJobTestCase;
import com.cloudera.sqoop.testutil.SeqFileReader;
import com.cloudera.sqoop.tool.ImportTool;
import com.cloudera.sqoop.util.ClassLoaderStack;

/**
 * Test that --boundary-query works in Sqoop.
 */
public class TestBoundaryQuery extends ImportJobTestCase {

  /**
   * Create the argv to pass to Sqoop.
   * @return the argv as an array of strings.
   */
  protected String [] getArgv(boolean includeHadoopFlags, String boundaryQuery,
      String targetDir) {

    ArrayList<String> args = new ArrayList<String>();

    if (includeHadoopFlags) {
      CommonArgs.addHadoopFlags(args);
    }

    args.add("--table");
    args.add(HsqldbTestServer.getTableName());
    args.add("--split-by");
    args.add("INTFIELD1");
    args.add("--connect");
    args.add(HsqldbTestServer.getUrl());
    args.add("--boundary-query");
    args.add(boundaryQuery);
    args.add("--as-sequencefile");
    args.add("--target-dir");
    args.add(targetDir);
    args.add("--class-name");
    args.add(getTableName());
    args.add("--verbose");

    return args.toArray(new String[0]);
  }

  // This test just uses the two-int table.
  protected String getTableName() {
    return HsqldbTestServer.getTableName();
  }

  /**
   * Given a comma-delimited list of integers, grab and parse the first int.
   * @param str a comma-delimited list of values, the first of which is an int.
   * @return the first field in the string, cast to int
   */
  private int getFirstInt(String str) {
    String [] parts = str.split(",");
    return Integer.parseInt(parts[0]);
  }

  public void runQueryTest(String query, int numExpectedResults,
      int expectedSum, String targetDir)
      throws IOException {

    ClassLoader prevClassLoader = null;
    SequenceFile.Reader reader = null;

    String [] argv = getArgv(true, query, targetDir);
    runImport(argv);
    try {
      SqoopOptions opts = new ImportTool().parseArguments(
          getArgv(false, query, targetDir),
          null, null, true);

      CompilationManager compileMgr = new CompilationManager(opts);
      String jarFileName = compileMgr.getJarFilename();

      prevClassLoader = ClassLoaderStack.addJarFile(jarFileName,
          getTableName());

      reader = SeqFileReader.getSeqFileReader(getDataFilePath().toString());

      // Here we can actually instantiate (k, v) pairs.
      Configuration conf = new Configuration();
      Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf);

      if (reader.next(key) == null) {
        fail("Empty SequenceFile during import");
      }

      // Make sure that the value we think should be at the top, is.
      reader.getCurrentValue(val);

      // We know that these values are two ints separated by a ',' character.
      // Since this is all dynamic, though, we don't want to actually link
      // against the class and use its methods. So we just parse this back
      // into int fields manually. Sum them up and ensure that we get the
      // expected total for the first column, to verify that we got all the
      // results from the db into the file.
      int curSum = getFirstInt(val.toString());
      int totalResults = 1;

      // Now sum up everything else in the file.
      while (reader.next(key) != null) {
        reader.getCurrentValue(val);
        curSum += getFirstInt(val.toString());
        totalResults++;
      }

      assertEquals("Total sum of first db column mismatch", expectedSum,
          curSum);
      assertEquals("Incorrect number of results for query", numExpectedResults,
          totalResults);
    } catch (InvalidOptionsException ioe) {
      fail(ioe.toString());
    } catch (ParseException pe) {
      fail(pe.toString());
    } finally {
      IOUtils.closeStream(reader);

      if (null != prevClassLoader) {
        ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
      }
    }
  }

  public void testBoundaryQuery() throws IOException {
    String query = "select min(intfield1), max(intfield1) from "
        + getTableName() + " where intfield1 in (3, 5)";

    runQueryTest(query, 2, 8, getTablePath().toString());
  }
}
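For orientation: the boundary query above evaluates to min = 3, max = 5, so
the generated splits only cover INTFIELD1 values between 3 and 5. Assuming
the HsqldbTestServer fixture's first column holds 1, 3, 5, and 7, exactly two
rows are imported, which matches the expected result count of 2 and the
first-column sum of 3 + 5 = 8 passed to runQueryTest().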
@@ -248,6 +248,15 @@ public void testHivePartitionParams() throws Exception {
     assertEquals("20110413", opts.getHivePartitionValue());
   }
 
+  public void testBoundaryQueryParams() throws Exception {
+    String[] args = {
+      "--boundary-query", "select 1, 2",
+    };
+
+    SqoopOptions opts = parse(args);
+    assertEquals("select 1, 2", opts.getBoundaryQuery());
+  }
+
   public void testPropertySerialization1() {
     // Test that if we write a SqoopOptions out to a Properties,
     // and then read it back in, we get all the same results.