mirror of
https://github.com/apache/sqoop.git
synced 2025-05-03 20:40:58 +08:00
MAPREDUCE-674. Sqoop should allow a "where" clause to avoid having to export entire tables. Contributed by Kevin Weil.
From: Thomas White <tomwhite@apache.org> git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149813 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8b7bde6f35
commit
9dfc7ba898
@ -88,6 +88,7 @@ public enum FileLayout {
|
|||||||
private ControlAction action;
|
private ControlAction action;
|
||||||
private String hadoopHome;
|
private String hadoopHome;
|
||||||
private String orderByCol;
|
private String orderByCol;
|
||||||
|
private String whereClause;
|
||||||
private String debugSqlCmd;
|
private String debugSqlCmd;
|
||||||
private String driverClassName;
|
private String driverClassName;
|
||||||
private String warehouseDir;
|
private String warehouseDir;
|
||||||
@ -137,6 +138,7 @@ private void loadFromProperties() {
|
|||||||
this.tableName = props.getProperty("db.table", this.tableName);
|
this.tableName = props.getProperty("db.table", this.tableName);
|
||||||
this.connectString = props.getProperty("db.connect.url", this.connectString);
|
this.connectString = props.getProperty("db.connect.url", this.connectString);
|
||||||
this.orderByCol = props.getProperty("db.sort.column", this.orderByCol);
|
this.orderByCol = props.getProperty("db.sort.column", this.orderByCol);
|
||||||
|
this.whereClause = props.getProperty("db.where.clause", this.whereClause);
|
||||||
this.driverClassName = props.getProperty("jdbc.driver", this.driverClassName);
|
this.driverClassName = props.getProperty("jdbc.driver", this.driverClassName);
|
||||||
this.warehouseDir = props.getProperty("hdfs.warehouse.dir", this.warehouseDir);
|
this.warehouseDir = props.getProperty("hdfs.warehouse.dir", this.warehouseDir);
|
||||||
this.hiveHome = props.getProperty("hive.home", this.hiveHome);
|
this.hiveHome = props.getProperty("hive.home", this.hiveHome);
|
||||||
@ -213,6 +215,7 @@ public static void printUsage() {
|
|||||||
System.out.println("--table (tablename) Table to read");
|
System.out.println("--table (tablename) Table to read");
|
||||||
System.out.println("--columns (col,col,col...) Columns to export from table");
|
System.out.println("--columns (col,col,col...) Columns to export from table");
|
||||||
System.out.println("--order-by (column-name) Column of the table used to order results");
|
System.out.println("--order-by (column-name) Column of the table used to order results");
|
||||||
|
System.out.println("--where (where clause) Where clause to use during export");
|
||||||
System.out.println("--hadoop-home (dir) Override $HADOOP_HOME");
|
System.out.println("--hadoop-home (dir) Override $HADOOP_HOME");
|
||||||
System.out.println("--hive-home (dir) Override $HIVE_HOME");
|
System.out.println("--hive-home (dir) Override $HIVE_HOME");
|
||||||
System.out.println("--warehouse-dir (dir) HDFS path for table destination");
|
System.out.println("--warehouse-dir (dir) HDFS path for table destination");
|
||||||
@ -260,6 +263,8 @@ public void parse(String [] args) throws InvalidOptionsException {
|
|||||||
this.columns = columnString.split(",");
|
this.columns = columnString.split(",");
|
||||||
} else if (args[i].equals("--order-by")) {
|
} else if (args[i].equals("--order-by")) {
|
||||||
this.orderByCol = args[++i];
|
this.orderByCol = args[++i];
|
||||||
|
} else if (args[i].equals("--where")) {
|
||||||
|
this.whereClause = args[++i];
|
||||||
} else if (args[i].equals("--list-tables")) {
|
} else if (args[i].equals("--list-tables")) {
|
||||||
this.action = ControlAction.ListTables;
|
this.action = ControlAction.ListTables;
|
||||||
} else if (args[i].equals("--all-tables")) {
|
} else if (args[i].equals("--all-tables")) {
|
||||||
@ -364,6 +369,10 @@ public String[] getColumns() {
|
|||||||
public String getOrderByCol() {
|
public String getOrderByCol() {
|
||||||
return orderByCol;
|
return orderByCol;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getWhereClause() {
|
||||||
|
return whereClause;
|
||||||
|
}
|
||||||
|
|
||||||
public ControlAction getAction() {
|
public ControlAction getAction() {
|
||||||
return action;
|
return action;
|
||||||
|
@ -20,24 +20,17 @@
|
|||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.BufferedWriter;
|
import java.io.BufferedWriter;
|
||||||
import java.io.File;
|
|
||||||
import java.io.FileInputStream;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.io.OutputStreamWriter;
|
import java.io.OutputStreamWriter;
|
||||||
import java.io.Reader;
|
|
||||||
import java.io.Writer;
|
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.sql.ResultSet;
|
|
||||||
import java.sql.SQLException;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
@ -128,6 +121,14 @@ public void importTable(String tableName, String jarFile, Configuration conf)
|
|||||||
// TODO(aaron): This is really insecure.
|
// TODO(aaron): This is really insecure.
|
||||||
args.add("--password=" + password);
|
args.add("--password=" + password);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String whereClause = options.getWhereClause();
|
||||||
|
if (null != whereClause) {
|
||||||
|
// Don't use the --where="<whereClause>" version because spaces in it can confuse
|
||||||
|
// Java, and adding in surrounding quotes confuses Java as well.
|
||||||
|
args.add("-w");
|
||||||
|
args.add(whereClause);
|
||||||
|
}
|
||||||
|
|
||||||
args.add("--quick"); // no buffering
|
args.add("--quick"); // no buffering
|
||||||
// TODO(aaron): Add a flag to allow --lock-tables instead for MyISAM data
|
// TODO(aaron): Add a flag to allow --lock-tables instead for MyISAM data
|
||||||
@ -135,7 +136,7 @@ public void importTable(String tableName, String jarFile, Configuration conf)
|
|||||||
|
|
||||||
args.add(databaseName);
|
args.add(databaseName);
|
||||||
args.add(tableName);
|
args.add(tableName);
|
||||||
|
|
||||||
Process p = null;
|
Process p = null;
|
||||||
try {
|
try {
|
||||||
// begin the import in an external process.
|
// begin the import in an external process.
|
||||||
@ -143,9 +144,9 @@ public void importTable(String tableName, String jarFile, Configuration conf)
|
|||||||
for (String arg : args) {
|
for (String arg : args) {
|
||||||
LOG.debug(" " + arg);
|
LOG.debug(" " + arg);
|
||||||
}
|
}
|
||||||
|
|
||||||
p = Runtime.getRuntime().exec(args.toArray(new String[0]));
|
p = Runtime.getRuntime().exec(args.toArray(new String[0]));
|
||||||
|
|
||||||
// read from the pipe, into HDFS.
|
// read from the pipe, into HDFS.
|
||||||
InputStream is = p.getInputStream();
|
InputStream is = p.getInputStream();
|
||||||
OutputStream os = null;
|
OutputStream os = null;
|
||||||
|
@ -128,11 +128,14 @@ public void runImport(String tableName, String ormJarFile, String orderByCol,
|
|||||||
if (null == colNames) {
|
if (null == colNames) {
|
||||||
colNames = mgr.getColumnNames(tableName);
|
colNames = mgr.getColumnNames(tableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// It's ok if the where clause is null in DBInputFormat.setInput.
|
||||||
|
String whereClause = options.getWhereClause();
|
||||||
|
|
||||||
// We can't set the class properly in here, because we may not have the
|
// We can't set the class properly in here, because we may not have the
|
||||||
// jar loaded in this JVM. So we start by calling setInput() with DBWritable,
|
// jar loaded in this JVM. So we start by calling setInput() with DBWritable,
|
||||||
// and then overriding the string manually.
|
// and then overriding the string manually.
|
||||||
DBInputFormat.setInput(job, DBWritable.class, tableName, null,
|
DBInputFormat.setInput(job, DBWritable.class, tableName, whereClause,
|
||||||
orderByCol, colNames);
|
orderByCol, colNames);
|
||||||
job.set(DBConfiguration.INPUT_CLASS_PROPERTY, tableClassName);
|
job.set(DBConfiguration.INPUT_CLASS_PROPERTY, tableClassName);
|
||||||
|
|
||||||
|
@ -46,6 +46,7 @@ public static Test suite() {
|
|||||||
suite.addTestSuite(TestColumnTypes.class);
|
suite.addTestSuite(TestColumnTypes.class);
|
||||||
suite.addTestSuite(TestMultiCols.class);
|
suite.addTestSuite(TestMultiCols.class);
|
||||||
suite.addTestSuite(TestOrderBy.class);
|
suite.addTestSuite(TestOrderBy.class);
|
||||||
|
suite.addTestSuite(TestWhere.class);
|
||||||
suite.addTestSuite(LocalMySQLTest.class);
|
suite.addTestSuite(LocalMySQLTest.class);
|
||||||
suite.addTestSuite(TestHiveImport.class);
|
suite.addTestSuite(TestHiveImport.class);
|
||||||
|
|
||||||
|
170
src/test/org/apache/hadoop/sqoop/TestWhere.java
Normal file
170
src/test/org/apache/hadoop/sqoop/TestWhere.java
Normal file
@ -0,0 +1,170 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.sqoop;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.io.SequenceFile;
|
||||||
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
|
||||||
|
import org.apache.hadoop.sqoop.ImportOptions.InvalidOptionsException;
|
||||||
|
import org.apache.hadoop.sqoop.orm.CompilationManager;
|
||||||
|
import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
|
||||||
|
import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
|
||||||
|
import org.apache.hadoop.sqoop.testutil.SeqFileReader;
|
||||||
|
import org.apache.hadoop.sqoop.util.ClassLoaderStack;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that --where works in Sqoop.
|
||||||
|
* Methods essentially copied out of the other Test* classes.
|
||||||
|
* TODO(kevin or aaron): Factor out these common test methods
|
||||||
|
* so that every new Test* class doesn't need to copy the code.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class TestWhere extends ImportJobTestCase {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create the argv to pass to Sqoop
|
||||||
|
* @return the argv as an array of strings.
|
||||||
|
*/
|
||||||
|
private String [] getArgv(boolean includeHadoopFlags, String [] colNames, String whereClause) {
|
||||||
|
String columnsString = "";
|
||||||
|
for (String col : colNames) {
|
||||||
|
columnsString += col + ",";
|
||||||
|
}
|
||||||
|
|
||||||
|
ArrayList<String> args = new ArrayList<String>();
|
||||||
|
|
||||||
|
if (includeHadoopFlags) {
|
||||||
|
args.add("-D");
|
||||||
|
args.add("mapred.job.tracker=local");
|
||||||
|
args.add("-D");
|
||||||
|
args.add("mapred.map.tasks=1");
|
||||||
|
args.add("-D");
|
||||||
|
args.add("fs.default.name=file:///");
|
||||||
|
}
|
||||||
|
|
||||||
|
args.add("--table");
|
||||||
|
args.add(HsqldbTestServer.getTableName());
|
||||||
|
args.add("--columns");
|
||||||
|
args.add(columnsString);
|
||||||
|
args.add("--where");
|
||||||
|
args.add(whereClause);
|
||||||
|
args.add("--order-by");
|
||||||
|
args.add("INTFIELD1");
|
||||||
|
args.add("--warehouse-dir");
|
||||||
|
args.add(getWarehouseDir());
|
||||||
|
args.add("--connect");
|
||||||
|
args.add(HsqldbTestServer.getUrl());
|
||||||
|
args.add("--as-sequencefile");
|
||||||
|
|
||||||
|
return args.toArray(new String[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// this test just uses the two int table.
|
||||||
|
protected String getTableName() {
|
||||||
|
return HsqldbTestServer.getTableName();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given a comma-delimited list of integers, grab and parse the first int
|
||||||
|
* @param str a comma-delimited list of values, the first of which is an int.
|
||||||
|
* @return the first field in the string, cast to int
|
||||||
|
*/
|
||||||
|
private int getFirstInt(String str) {
|
||||||
|
String [] parts = str.split(",");
|
||||||
|
return Integer.parseInt(parts[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void runWhereTest(String whereClause, String firstValStr, int numExpectedResults, int expectedSum)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
String [] columns = HsqldbTestServer.getFieldNames();
|
||||||
|
ClassLoader prevClassLoader = null;
|
||||||
|
SequenceFile.Reader reader = null;
|
||||||
|
|
||||||
|
String [] argv = getArgv(true, columns, whereClause);
|
||||||
|
runImport(argv);
|
||||||
|
try {
|
||||||
|
ImportOptions opts = new ImportOptions();
|
||||||
|
opts.parse(getArgv(false, columns, whereClause));
|
||||||
|
|
||||||
|
CompilationManager compileMgr = new CompilationManager(opts);
|
||||||
|
String jarFileName = compileMgr.getJarFilename();
|
||||||
|
|
||||||
|
prevClassLoader = ClassLoaderStack.addJarFile(jarFileName, getTableName());
|
||||||
|
|
||||||
|
reader = SeqFileReader.getSeqFileReader(getDataFilePath().toString());
|
||||||
|
|
||||||
|
// here we can actually instantiate (k, v) pairs.
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
|
||||||
|
Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf);
|
||||||
|
|
||||||
|
if (reader.next(key) == null) {
|
||||||
|
fail("Empty SequenceFile during import");
|
||||||
|
}
|
||||||
|
|
||||||
|
// make sure that the value we think should be at the top, is.
|
||||||
|
reader.getCurrentValue(val);
|
||||||
|
assertEquals("Invalid ordering within sorted SeqFile", firstValStr, val.toString());
|
||||||
|
|
||||||
|
// We know that these values are two ints separated by a ',' character.
|
||||||
|
// Since this is all dynamic, though, we don't want to actually link against
|
||||||
|
// the class and use its methods. So we just parse this back into int fields manually.
|
||||||
|
// Sum them up and ensure that we get the expected total for the first column, to
|
||||||
|
// verify that we got all the results from the db into the file.
|
||||||
|
int curSum = getFirstInt(val.toString());
|
||||||
|
int totalResults = 1;
|
||||||
|
|
||||||
|
// now sum up everything else in the file.
|
||||||
|
while (reader.next(key) != null) {
|
||||||
|
reader.getCurrentValue(val);
|
||||||
|
curSum += getFirstInt(val.toString());
|
||||||
|
totalResults++;
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals("Total sum of first db column mismatch", expectedSum, curSum);
|
||||||
|
assertEquals("Incorrect number of results for query", numExpectedResults, totalResults);
|
||||||
|
} catch (InvalidOptionsException ioe) {
|
||||||
|
fail(ioe.toString());
|
||||||
|
} finally {
|
||||||
|
IOUtils.closeStream(reader);
|
||||||
|
|
||||||
|
if (null != prevClassLoader) {
|
||||||
|
ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSingleClauseWhere() throws IOException {
|
||||||
|
String whereClause = "INTFIELD2 > 4";
|
||||||
|
runWhereTest(whereClause, "1,8", 2, 4);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testMultiClauseWhere() throws IOException {
|
||||||
|
String whereClause = "INTFIELD1 > 4 AND INTFIELD2 < 3";
|
||||||
|
runWhereTest(whereClause, "7,2", 1, 7);
|
||||||
|
}
|
||||||
|
}
|
@ -201,6 +201,8 @@ private String getCurrentUser() {
|
|||||||
args.add("--local");
|
args.add("--local");
|
||||||
args.add("--username");
|
args.add("--username");
|
||||||
args.add(getCurrentUser());
|
args.add(getCurrentUser());
|
||||||
|
args.add("--where");
|
||||||
|
args.add("id > 1");
|
||||||
|
|
||||||
return args.toArray(new String[0]);
|
return args.toArray(new String[0]);
|
||||||
}
|
}
|
||||||
@ -226,7 +228,6 @@ public void testLocalBulkImport() {
|
|||||||
try {
|
try {
|
||||||
// Read through the file and make sure it's all there.
|
// Read through the file and make sure it's all there.
|
||||||
r = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
|
r = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
|
||||||
assertEquals("1,'Aaron','2009-05-14',1e+06,'engineering'", r.readLine());
|
|
||||||
assertEquals("2,'Bob','2009-04-20',400,'sales'", r.readLine());
|
assertEquals("2,'Bob','2009-04-20',400,'sales'", r.readLine());
|
||||||
assertEquals("3,'Fred','2009-01-23',15,'marketing'", r.readLine());
|
assertEquals("3,'Fred','2009-01-23',15,'marketing'", r.readLine());
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
|
Loading…
Reference in New Issue
Block a user