From 6a7144a496b94ee3f410a68c389cd9fc21730051 Mon Sep 17 00:00:00 2001 From: Arvind Prabhakar Date: Wed, 28 Sep 2011 17:43:11 +0000 Subject: [PATCH] SQOOP-331. Support for boundary query. (Jarek Jarcec Cecho via Arvind Prabhakar) git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1176981 13f79535-47bb-0310-9956-ffa450edef68 --- src/docs/man/import-args.txt | 3 + src/docs/user/import.txt | 6 + src/java/com/cloudera/sqoop/SqoopOptions.java | 9 + .../cloudera/sqoop/manager/SqlManager.java | 4 + .../sqoop/mapreduce/DataDrivenImportJob.java | 12 +- .../mapreduce/db/DataDrivenDBInputFormat.java | 12 +- .../cloudera/sqoop/tool/BaseSqoopTool.java | 1 + .../com/cloudera/sqoop/tool/ImportTool.java | 10 ++ .../com/cloudera/sqoop/TestBoundaryQuery.java | 166 ++++++++++++++++++ .../com/cloudera/sqoop/TestSqoopOptions.java | 9 + 10 files changed, 227 insertions(+), 5 deletions(-) create mode 100644 src/test/com/cloudera/sqoop/TestBoundaryQuery.java diff --git a/src/docs/man/import-args.txt b/src/docs/man/import-args.txt index 98a62361..b6b4bb87 100644 --- a/src/docs/man/import-args.txt +++ b/src/docs/man/import-args.txt @@ -35,6 +35,9 @@ Import control options --as-textfile:: Imports data as plain text (default) +--boundary-query (query):: + Using following query to select minimal and maximal value of '--split-by' column for creating splits + --columns (col,col,col...):: Columns to export from table diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt index e2be17ed..24878b43 100644 --- a/src/docs/user/import.txt +++ b/src/docs/user/import.txt @@ -60,6 +60,7 @@ Argument Description +\--as-avrodatafile+ Imports data to Avro Data Files +\--as-sequencefile+ Imports data to SequenceFiles +\--as-textfile+ Imports data as plain text (default) ++\--boundary-query + Boundary query to use for creating splits +\--columns + Columns to import from table +\--direct+ Use direct import fast path +\--direct-split-size + Split the input stream every 'n' bytes\ @@ -114,6 +115,11 @@ form +SELECT FROM +. You can append a "id > 400"+. Only rows where the +id+ column has a value greater than 400 will be imported. +By default sqoop will use query +select min(), max() from +
+ to find out boundaries for creating splits. In some cases this query +is not the most optimal so you can specify any arbitrary query returning two +numeric columns using +\--boundary-query+ argument. + Free-form Query Imports ^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/src/java/com/cloudera/sqoop/SqoopOptions.java b/src/java/com/cloudera/sqoop/SqoopOptions.java index 1fd98852..21b0e984 100644 --- a/src/java/com/cloudera/sqoop/SqoopOptions.java +++ b/src/java/com/cloudera/sqoop/SqoopOptions.java @@ -138,6 +138,7 @@ public enum IncrementalMode { @StoredAsProperty("db.split.column") private String splitByCol; @StoredAsProperty("db.where.clause") private String whereClause; @StoredAsProperty("db.query") private String sqlQuery; + @StoredAsProperty("db.query.boundary") private String boundaryQuery; @StoredAsProperty("jdbc.driver.class") private String driverClassName; @StoredAsProperty("hdfs.warehouse.dir") private String warehouseDir; @StoredAsProperty("hdfs.target.dir") private String targetDir; @@ -1200,6 +1201,14 @@ public void setSqlQuery(String sqlStatement) { this.sqlQuery = sqlStatement; } + public String getBoundaryQuery() { + return boundaryQuery; + } + + public void setBoundaryQuery(String sqlStatement) { + boundaryQuery = sqlStatement; + } + /** * @return The JDBC driver class name specified with --driver. */ diff --git a/src/java/com/cloudera/sqoop/manager/SqlManager.java b/src/java/com/cloudera/sqoop/manager/SqlManager.java index a08d7746..687dae77 100644 --- a/src/java/com/cloudera/sqoop/manager/SqlManager.java +++ b/src/java/com/cloudera/sqoop/manager/SqlManager.java @@ -870,4 +870,8 @@ public void migrateData(String fromTable, String toTable) } } } + + public String getInputBoundsQuery(String splitByCol, String sanitizedQuery) { + return options.getBoundaryQuery(); + } } diff --git a/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java index c7cca09f..25b82807 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java +++ b/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java @@ -155,14 +155,24 @@ protected void configureInputFormat(Job job, String tableName, DataDrivenDBInputFormat.setInput(job, DBWritable.class, mgr.escapeTableName(tableName), whereClause, mgr.escapeColName(splitByCol), sqlColNames); + + // If user specified boundary query on the command line propagate it to + // the job + if(options.getBoundaryQuery() != null) { + DataDrivenDBInputFormat.setBoundingQuery(job.getConfiguration(), + options.getBoundaryQuery()); + } } else { // Import a free-form query. String inputQuery = options.getSqlQuery(); String sanitizedQuery = inputQuery.replace( DataDrivenDBInputFormat.SUBSTITUTE_TOKEN, " (1 = 1) "); - String inputBoundingQuery = + String inputBoundingQuery = options.getBoundaryQuery(); + + if(inputBoundingQuery == null) { mgr.getInputBoundsQuery(splitByCol, sanitizedQuery); + } if (inputBoundingQuery == null) { inputBoundingQuery = "SELECT MIN(" + splitByCol + "), MAX(" + splitByCol + ") FROM (" + sanitizedQuery + ") AS t1"; diff --git a/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java b/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java index dbae29c2..06056ddd 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java +++ b/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java @@ -170,10 +170,14 @@ protected DBSplitter getSplitter(int sqlDataType) { public List getSplits(JobContext job) throws IOException { int targetNumTasks = ConfigurationHelper.getJobNumMaps(job); - if (1 == targetNumTasks) { - // There's no need to run a bounding vals query; just return a split - // that separates nothing. This can be considerably more optimal for a - // large table with no index. + String boundaryQuery = getDBConf().getInputBoundingQuery(); + + // If user do not forced us to use his boundary query and we don't have to + // bacause there is only one mapper we will return single split that + // separates nothing. This can be considerably more optimal for a large + // table with no index. + if (1 == targetNumTasks + && (boundaryQuery == null || boundaryQuery.isEmpty())) { List singletonSplit = new ArrayList(); singletonSplit.add(new DataDrivenDBInputSplit("1=1", "1=1")); return singletonSplit; diff --git a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java index 55b383a8..f13cedbc 100644 --- a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java +++ b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java @@ -136,6 +136,7 @@ public abstract class BaseSqoopTool extends SqoopTool { public static final String CLASS_NAME_ARG = "class-name"; public static final String JAR_FILE_NAME_ARG = "jar-file"; public static final String SQL_QUERY_ARG = "query"; + public static final String SQL_QUERY_BOUNDARY = "boundary-query"; public static final String SQL_QUERY_SHORT_ARG = "e"; public static final String VERBOSE_ARG = "verbose"; public static final String HELP_ARG = "help"; diff --git a/src/java/com/cloudera/sqoop/tool/ImportTool.java b/src/java/com/cloudera/sqoop/tool/ImportTool.java index 48a8947c..702024c2 100644 --- a/src/java/com/cloudera/sqoop/tool/ImportTool.java +++ b/src/java/com/cloudera/sqoop/tool/ImportTool.java @@ -527,6 +527,12 @@ protected RelatedOptions getImportOptions() { .withDescription("Import results of SQL 'statement'") .withLongOpt(SQL_QUERY_ARG) .create(SQL_QUERY_SHORT_ARG)); + importOpts.addOption(OptionBuilder.withArgName("statement") + .hasArg() + .withDescription("Set boundary query for retrieving max and min" + + " value of the primary key") + .withLongOpt(SQL_QUERY_BOUNDARY) + .create()); } importOpts.addOption(OptionBuilder.withArgName("dir") @@ -721,6 +727,10 @@ public void applyOptions(CommandLine in, SqoopOptions out) if (in.hasOption(SQL_QUERY_ARG)) { out.setSqlQuery(in.getOptionValue(SQL_QUERY_ARG)); } + + if(in.hasOption(SQL_QUERY_BOUNDARY)) { + out.setBoundaryQuery(in.getOptionValue(SQL_QUERY_BOUNDARY)); + } } if (in.hasOption(WAREHOUSE_DIR_ARG)) { diff --git a/src/test/com/cloudera/sqoop/TestBoundaryQuery.java b/src/test/com/cloudera/sqoop/TestBoundaryQuery.java new file mode 100644 index 00000000..d7e821fe --- /dev/null +++ b/src/test/com/cloudera/sqoop/TestBoundaryQuery.java @@ -0,0 +1,166 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cloudera.sqoop; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.util.ReflectionUtils; + +import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException; +import com.cloudera.sqoop.orm.CompilationManager; +import com.cloudera.sqoop.testutil.CommonArgs; +import com.cloudera.sqoop.testutil.HsqldbTestServer; +import com.cloudera.sqoop.testutil.ImportJobTestCase; +import com.cloudera.sqoop.testutil.SeqFileReader; +import com.cloudera.sqoop.tool.ImportTool; +import com.cloudera.sqoop.util.ClassLoaderStack; + +/** + * Test that --boundary-query works in Sqoop. + */ +public class TestBoundaryQuery extends ImportJobTestCase { + + /** + * Create the argv to pass to Sqoop. + * @return the argv as an array of strings. + */ + protected String [] getArgv(boolean includeHadoopFlags, String boundaryQuery, + String targetDir) { + + ArrayList args = new ArrayList(); + + if (includeHadoopFlags) { + CommonArgs.addHadoopFlags(args); + } + + args.add("--table"); + args.add(HsqldbTestServer.getTableName()); + args.add("--split-by"); + args.add("INTFIELD1"); + args.add("--connect"); + args.add(HsqldbTestServer.getUrl()); + args.add("--boundary-query"); + args.add(boundaryQuery); + args.add("--as-sequencefile"); + args.add("--target-dir"); + args.add(targetDir); + args.add("--class-name"); + args.add(getTableName()); + args.add("--verbose"); + + return args.toArray(new String[0]); + } + + // this test just uses the two int table. + protected String getTableName() { + return HsqldbTestServer.getTableName(); + } + + + /** + * Given a comma-delimited list of integers, grab and parse the first int. + * @param str a comma-delimited list of values, the first of which is an int. + * @return the first field in the string, cast to int + */ + private int getFirstInt(String str) { + String [] parts = str.split(","); + return Integer.parseInt(parts[0]); + } + + public void runQueryTest(String query, int numExpectedResults, + int expectedSum, String targetDir) + throws IOException { + + ClassLoader prevClassLoader = null; + SequenceFile.Reader reader = null; + + String [] argv = getArgv(true, query, targetDir); + runImport(argv); + try { + SqoopOptions opts = new ImportTool().parseArguments( + getArgv(false, query, targetDir), + null, null, true); + + CompilationManager compileMgr = new CompilationManager(opts); + String jarFileName = compileMgr.getJarFilename(); + + prevClassLoader = ClassLoaderStack.addJarFile(jarFileName, + getTableName()); + + reader = SeqFileReader.getSeqFileReader(getDataFilePath().toString()); + + // here we can actually instantiate (k, v) pairs. + Configuration conf = new Configuration(); + Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf); + Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf); + + if (reader.next(key) == null) { + fail("Empty SequenceFile during import"); + } + + // make sure that the value we think should be at the top, is. + reader.getCurrentValue(val); + + // We know that these values are two ints separated by a ',' character. + // Since this is all dynamic, though, we don't want to actually link + // against the class and use its methods. So we just parse this back + // into int fields manually. Sum them up and ensure that we get the + // expected total for the first column, to verify that we got all the + // results from the db into the file. + int curSum = getFirstInt(val.toString()); + int totalResults = 1; + + // now sum up everything else in the file. + while (reader.next(key) != null) { + reader.getCurrentValue(val); + curSum += getFirstInt(val.toString()); + totalResults++; + } + + assertEquals("Total sum of first db column mismatch", expectedSum, + curSum); + assertEquals("Incorrect number of results for query", numExpectedResults, + totalResults); + } catch (InvalidOptionsException ioe) { + fail(ioe.toString()); + } catch (ParseException pe) { + fail(pe.toString()); + } finally { + IOUtils.closeStream(reader); + + if (null != prevClassLoader) { + ClassLoaderStack.setCurrentClassLoader(prevClassLoader); + } + } + } + + public void testBoundaryQuery() throws IOException { + System.out.println("PCYO"); + String query = "select min(intfield1), max(intfield1) from " + + getTableName() +" where intfield1 in (3, 5)"; + + runQueryTest(query, 2, 8, getTablePath().toString()); + } +} diff --git a/src/test/com/cloudera/sqoop/TestSqoopOptions.java b/src/test/com/cloudera/sqoop/TestSqoopOptions.java index deaa32ba..09f72d4d 100644 --- a/src/test/com/cloudera/sqoop/TestSqoopOptions.java +++ b/src/test/com/cloudera/sqoop/TestSqoopOptions.java @@ -248,6 +248,15 @@ public void testHivePartitionParams() throws Exception { assertEquals("20110413", opts.getHivePartitionValue()); } + public void testBoundaryQueryParams() throws Exception { + String[] args = { + "--boundary-query", "select 1, 2", + }; + + SqoopOptions opts = parse(args); + assertEquals("select 1, 2", opts.getBoundaryQuery()); + } + public void testPropertySerialization1() { // Test that if we write a SqoopOptions out to a Properties, // and then read it back in, we get all the same results.