From 9dfc7ba898e6746ad595d4523f698c5dd45bb5d6 Mon Sep 17 00:00:00 2001
From: Andrew Bayer
Date: Fri, 22 Jul 2011 20:03:21 +0000
Subject: [PATCH] MAPREDUCE-674. Sqoop should allow a "where" clause to avoid
 having to export entire tables. Contributed by Kevin Weil.

From: Thomas White

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149813 13f79535-47bb-0310-9956-ffa450edef68
---
 .../apache/hadoop/sqoop/ImportOptions.java    |   9 +
 .../sqoop/manager/LocalMySQLManager.java      |  21 +--
 .../apache/hadoop/sqoop/mapred/ImportJob.java |   5 +-
 .../org/apache/hadoop/sqoop/AllTests.java     |   1 +
 .../org/apache/hadoop/sqoop/TestWhere.java    | 170 ++++++++++++++++++
 .../hadoop/sqoop/manager/LocalMySQLTest.java  |   3 +-
 6 files changed, 197 insertions(+), 12 deletions(-)
 create mode 100644 src/test/org/apache/hadoop/sqoop/TestWhere.java

diff --git a/src/java/org/apache/hadoop/sqoop/ImportOptions.java b/src/java/org/apache/hadoop/sqoop/ImportOptions.java
index b88b3e81..e1c58b74 100644
--- a/src/java/org/apache/hadoop/sqoop/ImportOptions.java
+++ b/src/java/org/apache/hadoop/sqoop/ImportOptions.java
@@ -88,6 +88,7 @@ public enum FileLayout {
   private ControlAction action;
   private String hadoopHome;
   private String orderByCol;
+  private String whereClause;
   private String debugSqlCmd;
   private String driverClassName;
   private String warehouseDir;
@@ -137,6 +138,7 @@ private void loadFromProperties() {
     this.tableName = props.getProperty("db.table", this.tableName);
     this.connectString = props.getProperty("db.connect.url", this.connectString);
     this.orderByCol = props.getProperty("db.sort.column", this.orderByCol);
+    this.whereClause = props.getProperty("db.where.clause", this.whereClause);
     this.driverClassName = props.getProperty("jdbc.driver", this.driverClassName);
     this.warehouseDir = props.getProperty("hdfs.warehouse.dir", this.warehouseDir);
     this.hiveHome = props.getProperty("hive.home", this.hiveHome);
@@ -213,6 +215,7 @@ public static void printUsage() {
     System.out.println("--table (tablename)         Table to read");
     System.out.println("--columns (col,col,col...)  Columns to export from table");
     System.out.println("--order-by (column-name)    Column of the table used to order results");
+    System.out.println("--where (where clause)      Where clause to use during export");
     System.out.println("--hadoop-home (dir)         Override $HADOOP_HOME");
     System.out.println("--hive-home (dir)           Override $HIVE_HOME");
     System.out.println("--warehouse-dir (dir)       HDFS path for table destination");
@@ -260,6 +263,8 @@ public void parse(String [] args) throws InvalidOptionsException {
         this.columns = columnString.split(",");
       } else if (args[i].equals("--order-by")) {
         this.orderByCol = args[++i];
+      } else if (args[i].equals("--where")) {
+        this.whereClause = args[++i];
       } else if (args[i].equals("--list-tables")) {
         this.action = ControlAction.ListTables;
       } else if (args[i].equals("--all-tables")) {
@@ -364,6 +369,10 @@ public String[] getColumns() {
   public String getOrderByCol() {
     return orderByCol;
   }
+
+  public String getWhereClause() {
+    return whereClause;
+  }
 
   public ControlAction getAction() {
     return action;
diff --git a/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java b/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
index b7115197..d765701f 100644
--- a/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
+++ b/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
@@ -20,24 +20,17 @@
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
-import java.io.Reader;
-import java.io.Writer;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -128,6 +121,14 @@ public void importTable(String tableName, String jarFile, Configuration conf)
       // TODO(aaron): This is really insecure.
       args.add("--password=" + password);
     }
+
+    String whereClause = options.getWhereClause();
+    if (null != whereClause) {
+      // Don't use the --where="" version because spaces in it can confuse
+      // Java, and adding in surrounding quotes confuses Java as well.
+      args.add("-w");
+      args.add(whereClause);
+    }
     args.add("--quick"); // no buffering
     // TODO(aaron): Add a flag to allow --lock-tables instead for MyISAM data
@@ -135,7 +136,7 @@ public void importTable(String tableName, String jarFile, Configuration conf)
     args.add(databaseName);
     args.add(tableName);
-    
+
     Process p = null;
     try {
       // begin the import in an external process.
@@ -143,9 +144,9 @@ public void importTable(String tableName, String jarFile, Configuration conf)
       for (String arg : args) {
        LOG.debug("  " + arg);
       }
-      
+
       p = Runtime.getRuntime().exec(args.toArray(new String[0]));
-      
+
       // read from the pipe, into HDFS.
       InputStream is = p.getInputStream();
       OutputStream os = null;
diff --git a/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java b/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
index dfd44bb3..a84126f6 100644
--- a/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
+++ b/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
@@ -128,11 +128,14 @@ public void runImport(String tableName, String ormJarFile, String orderByCol,
     if (null == colNames) {
       colNames = mgr.getColumnNames(tableName);
     }
+
+    // It's ok if the where clause is null in DBInputFormat.setInput.
+    String whereClause = options.getWhereClause();
 
     // We can't set the class properly in here, because we may not have the
     // jar loaded in this JVM. So we start by calling setInput() with DBWritable,
     // and then overriding the string manually.
-    DBInputFormat.setInput(job, DBWritable.class, tableName, null,
+    DBInputFormat.setInput(job, DBWritable.class, tableName, whereClause,
         orderByCol, colNames);
 
     job.set(DBConfiguration.INPUT_CLASS_PROPERTY, tableClassName);
diff --git a/src/test/org/apache/hadoop/sqoop/AllTests.java b/src/test/org/apache/hadoop/sqoop/AllTests.java
index 6a410a27..ad0121b4 100644
--- a/src/test/org/apache/hadoop/sqoop/AllTests.java
+++ b/src/test/org/apache/hadoop/sqoop/AllTests.java
@@ -46,6 +46,7 @@ public static Test suite() {
     suite.addTestSuite(TestColumnTypes.class);
     suite.addTestSuite(TestMultiCols.class);
     suite.addTestSuite(TestOrderBy.class);
+    suite.addTestSuite(TestWhere.class);
     suite.addTestSuite(LocalMySQLTest.class);
     suite.addTestSuite(TestHiveImport.class);
diff --git a/src/test/org/apache/hadoop/sqoop/TestWhere.java b/src/test/org/apache/hadoop/sqoop/TestWhere.java
new file mode 100644
index 00000000..93c94141
--- /dev/null
+++ b/src/test/org/apache/hadoop/sqoop/TestWhere.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.hadoop.sqoop.ImportOptions.InvalidOptionsException;
+import org.apache.hadoop.sqoop.orm.CompilationManager;
+import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
+import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
+import org.apache.hadoop.sqoop.testutil.SeqFileReader;
+import org.apache.hadoop.sqoop.util.ClassLoaderStack;
+
+/**
+ * Test that --where works in Sqoop.
+ * Methods essentially copied out of the other Test* classes.
+ * TODO(kevin or aaron): Factor out these common test methods
+ * so that every new Test* class doesn't need to copy the code.
+ */
+public class TestWhere extends ImportJobTestCase {
+
+  /**
+   * Create the argv to pass to Sqoop.
+   * @return the argv as an array of strings.
+   */
+  private String [] getArgv(boolean includeHadoopFlags, String [] colNames, String whereClause) {
+    String columnsString = "";
+    for (String col : colNames) {
+      columnsString += col + ",";
+    }
+
+    ArrayList<String> args = new ArrayList<String>();
+
+    if (includeHadoopFlags) {
+      args.add("-D");
+      args.add("mapred.job.tracker=local");
+      args.add("-D");
+      args.add("mapred.map.tasks=1");
+      args.add("-D");
+      args.add("fs.default.name=file:///");
+    }
+
+    args.add("--table");
+    args.add(HsqldbTestServer.getTableName());
+    args.add("--columns");
+    args.add(columnsString);
+    args.add("--where");
+    args.add(whereClause);
+    args.add("--order-by");
+    args.add("INTFIELD1");
+    args.add("--warehouse-dir");
+    args.add(getWarehouseDir());
+    args.add("--connect");
+    args.add(HsqldbTestServer.getUrl());
+    args.add("--as-sequencefile");
+
+    return args.toArray(new String[0]);
+  }
+
+  // this test just uses the two-int table.
+  protected String getTableName() {
+    return HsqldbTestServer.getTableName();
+  }
+
+  /**
+   * Given a comma-delimited list of integers, grab and parse the first int.
+   * @param str a comma-delimited list of values, the first of which is an int.
+   * @return the first field in the string, cast to int
+   */
+  private int getFirstInt(String str) {
+    String [] parts = str.split(",");
+    return Integer.parseInt(parts[0]);
+  }
+
+  public void runWhereTest(String whereClause, String firstValStr, int numExpectedResults,
+      int expectedSum) throws IOException {
+
+    String [] columns = HsqldbTestServer.getFieldNames();
+    ClassLoader prevClassLoader = null;
+    SequenceFile.Reader reader = null;
+
+    String [] argv = getArgv(true, columns, whereClause);
+    runImport(argv);
+    try {
+      ImportOptions opts = new ImportOptions();
+      opts.parse(getArgv(false, columns, whereClause));
+
+      CompilationManager compileMgr = new CompilationManager(opts);
+      String jarFileName = compileMgr.getJarFilename();
+
+      prevClassLoader = ClassLoaderStack.addJarFile(jarFileName, getTableName());
+
+      reader = SeqFileReader.getSeqFileReader(getDataFilePath().toString());
+
+      // here we can actually instantiate (k, v) pairs.
+      Configuration conf = new Configuration();
+      Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+      Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+      if (reader.next(key) == null) {
+        fail("Empty SequenceFile during import");
+      }
+
+      // make sure that the value we think should be at the top, is.
+      reader.getCurrentValue(val);
+      assertEquals("Invalid ordering within sorted SeqFile", firstValStr, val.toString());
+
+      // We know that these values are two ints separated by a ',' character.
+      // Since this is all dynamic, though, we don't want to actually link against
+      // the class and use its methods. So we just parse this back into int fields manually.
+      // Sum them up and ensure that we get the expected total for the first column, to
+      // verify that we got all the results from the db into the file.
+      int curSum = getFirstInt(val.toString());
+      int totalResults = 1;
+
+      // now sum up everything else in the file.
+      while (reader.next(key) != null) {
+        reader.getCurrentValue(val);
+        curSum += getFirstInt(val.toString());
+        totalResults++;
+      }
+
+      assertEquals("Total sum of first db column mismatch", expectedSum, curSum);
+      assertEquals("Incorrect number of results for query", numExpectedResults, totalResults);
+    } catch (InvalidOptionsException ioe) {
+      fail(ioe.toString());
+    } finally {
+      IOUtils.closeStream(reader);
+
+      if (null != prevClassLoader) {
+        ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
+      }
+    }
+  }
+
+  public void testSingleClauseWhere() throws IOException {
+    String whereClause = "INTFIELD2 > 4";
+    runWhereTest(whereClause, "1,8", 2, 4);
+  }
+
+  public void testMultiClauseWhere() throws IOException {
+    String whereClause = "INTFIELD1 > 4 AND INTFIELD2 < 3";
+    runWhereTest(whereClause, "7,2", 1, 7);
+  }
+}
diff --git a/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java b/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
index 973e04c5..9eb04d3d 100644
--- a/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
+++ b/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
@@ -201,6 +201,8 @@ private String getCurrentUser() {
     args.add("--local");
     args.add("--username");
     args.add(getCurrentUser());
+    args.add("--where");
+    args.add("id > 1");
 
     return args.toArray(new String[0]);
   }
@@ -226,7 +228,6 @@ public void testLocalBulkImport() {
     try {
       // Read through the file and make sure it's all there.
       r = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
-      assertEquals("1,'Aaron','2009-05-14',1e+06,'engineering'", r.readLine());
       assertEquals("2,'Bob','2009-04-20',400,'sales'", r.readLine());
       assertEquals("3,'Fred','2009-01-23',15,'marketing'", r.readLine());
     } catch (IOException ioe) {
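
For illustration only, a hypothetical set of Sqoop arguments exercising the new flag, mirroring the options the patch adds to printUsage() and uses in TestWhere and LocalMySQLTest; the connect string and table name below are placeholders, not values taken from the patch:

  --connect jdbc:mysql://localhost/testdb --table EMPLOYEES \
  --where "id > 1" --order-by id \
  --warehouse-dir /tmp/sqoop-import --as-sequencefile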