diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java index 127bac83..f6422bdc 100644 --- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java @@ -111,6 +111,36 @@ protected Class getOutputFormatClass() return null; } + /** + * Build the boundary query for the column of the result set created by + * the given query. + * @param col column name whose boundaries we're interested in. + * @param query sub-query used to create the result set. + * @return input boundary query as a string + */ + private String buildBoundaryQuery(String col, String query) { + if (col == null) { + return ""; + } + + // Replace table name with alias 't1' if column name is a fully + // qualified name. This is needed because "tableName"."columnName" + // in the input boundary query causes a SQL syntax error in most dbs + // including Oracle and MySQL. + String alias = "t1"; + int dot = col.lastIndexOf('.'); + String qualifiedName = (dot == -1) ? col : alias + col.substring(dot); + + ConnManager mgr = getContext().getConnManager(); + String ret = mgr.getInputBoundsQuery(qualifiedName, query); + if (ret != null) { + return ret; + } + + return "SELECT MIN(" + qualifiedName + "), MAX(" + qualifiedName + ") " + + "FROM (" + query + ") AS " + alias; + } + @Override protected void configureInputFormat(Job job, String tableName, String tableClassName, String splitByCol) throws IOException { @@ -165,18 +195,8 @@ protected void configureInputFormat(Job job, String tableName, DataDrivenDBInputFormat.SUBSTITUTE_TOKEN, " (1 = 1) "); String inputBoundingQuery = options.getBoundaryQuery(); - if (inputBoundingQuery == null) { - inputBoundingQuery = - mgr.getInputBoundsQuery(splitByCol, sanitizedQuery); - if (inputBoundingQuery == null) { - if (splitByCol != null) { - inputBoundingQuery = "SELECT MIN(" + splitByCol + "), MAX(" - + splitByCol + ") FROM (" + sanitizedQuery + ") AS t1"; - } else { - inputBoundingQuery = ""; - } - } + inputBoundingQuery = buildBoundaryQuery(splitByCol, sanitizedQuery); } DataDrivenDBInputFormat.setInput(job, DBWritable.class, inputQuery, inputBoundingQuery); diff --git a/src/test/com/cloudera/sqoop/TestFreeFormQueryImport.java b/src/test/com/cloudera/sqoop/TestFreeFormQueryImport.java new file mode 100644 index 00000000..81f1c597 --- /dev/null +++ b/src/test/com/cloudera/sqoop/TestFreeFormQueryImport.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; + +import com.cloudera.sqoop.testutil.CommonArgs; +import com.cloudera.sqoop.testutil.ImportJobTestCase; + +/** + * Test free form query import. + */ +public class TestFreeFormQueryImport extends ImportJobTestCase { + + private Log log; + + public TestFreeFormQueryImport() { + this.log = LogFactory.getLog(TestFreeFormQueryImport.class.getName()); + } + + /** + * @return the Log object to use for reporting during this test + */ + protected Log getLogger() { + return log; + } + + /** the names of the tables we're creating. */ + private List tableNames; + + @Override + public void tearDown() { + // Clean up the database on our way out. + for (String tableName : tableNames) { + try { + dropTableIfExists(tableName); + } catch (SQLException e) { + log.warn("Error trying to drop table '" + tableName + + "' on tearDown: " + e); + } + } + super.tearDown(); + } + + /** + * Create the argv to pass to Sqoop. + * @param splitByCol column of the table used to split work. + * @param query free form query to be used. + * @return the argv as an array of strings. + */ + protected String [] getArgv(String splitByCol, String query) { + ArrayList args = new ArrayList(); + + CommonArgs.addHadoopFlags(args); + + args.add("--connect"); + args.add(getConnectString()); + args.add("--target-dir"); + args.add(getWarehouseDir()); + args.add("--split-by"); + args.add(splitByCol); + args.add("--num-mappers"); + args.add("2"); + args.add("--query"); + args.add(query); + + return args.toArray(new String[0]); + } + + /** + * Create two tables that share the common id column. Run free-form query + * import on the result table that is created by joining the two tables on + * the id column. + */ + public void testSimpleJoin() throws IOException { + tableNames = new ArrayList(); + + String [] types1 = { "SMALLINT", }; + String [] vals1 = { "1", }; + String tableName1 = getTableName(); + createTableWithColTypes(types1, vals1); + tableNames.add(tableName1); + + incrementTableNum(); + + String [] types2 = { "SMALLINT", "VARCHAR(32)", }; + String [] vals2 = { "1", "'foo'", }; + String tableName2 = getTableName(); + createTableWithColTypes(types2, vals2); + tableNames.add(tableName2); + + String query = "SELECT " + + tableName1 + "." + getColName(0) + ", " + + tableName2 + "." + getColName(1) + " " + + "FROM " + tableName1 + " JOIN " + tableName2 + " ON (" + + tableName1 + "." + getColName(0) + " = " + + tableName2 + "." + getColName(0) + ") WHERE " + + tableName1 + "." + getColName(0) + " < 3 AND $CONDITIONS"; + + runImport(getArgv(tableName1 + "." + getColName(0), query)); + + Path warehousePath = new Path(this.getWarehouseDir()); + Path filePath = new Path(warehousePath, "part-m-00000"); + String expectedVal = "1,foo"; + + BufferedReader reader = null; + if (!isOnPhysicalCluster()) { + reader = new BufferedReader( + new InputStreamReader(new FileInputStream( + new File(filePath.toString())))); + } else { + FileSystem dfs = FileSystem.get(getConf()); + FSDataInputStream dis = dfs.open(filePath); + reader = new BufferedReader(new InputStreamReader(dis)); + } + try { + String line = reader.readLine(); + assertEquals("QueryResult expected a different string", + expectedVal, line); + } finally { + IOUtils.closeStream(reader); + } + } +} diff --git a/src/test/com/cloudera/sqoop/manager/MySQLFreeFormQueryTest.java b/src/test/com/cloudera/sqoop/manager/MySQLFreeFormQueryTest.java new file mode 100644 index 00000000..935d948b --- /dev/null +++ b/src/test/com/cloudera/sqoop/manager/MySQLFreeFormQueryTest.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.manager; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.TestFreeFormQueryImport; + +/** + * Test free form query import with the MySQL db. + */ +public class MySQLFreeFormQueryTest extends TestFreeFormQueryImport { + + public static final Log LOG = LogFactory.getLog( + MySQLFreeFormQueryTest.class.getName()); + + @Override + protected Log getLogger() { + return LOG; + } + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + @Override + protected String getConnectString() { + return MySQLTestUtils.CONNECT_STRING; + } + + @Override + protected SqoopOptions getSqoopOptions(Configuration conf) { + SqoopOptions opts = new SqoopOptions(conf); + opts.setUsername(MySQLTestUtils.getCurrentUser()); + return opts; + } + + @Override + protected void dropTableIfExists(String table) throws SQLException { + Connection conn = getManager().getConnection(); + PreparedStatement statement = conn.prepareStatement( + "DROP TABLE IF EXISTS " + table, + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try { + statement.executeUpdate(); + conn.commit(); + } finally { + statement.close(); + } + } +} diff --git a/src/test/com/cloudera/sqoop/manager/OracleFreeFormQueryTest.java b/src/test/com/cloudera/sqoop/manager/OracleFreeFormQueryTest.java new file mode 100644 index 00000000..e4f94239 --- /dev/null +++ b/src/test/com/cloudera/sqoop/manager/OracleFreeFormQueryTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.manager; + +import java.sql.SQLException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.TestFreeFormQueryImport; + +/** + * Test free form query import with the Oracle db. + */ +public class OracleFreeFormQueryTest extends TestFreeFormQueryImport { + + public static final Log LOG = LogFactory.getLog( + OracleFreeFormQueryTest.class.getName()); + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + @Override + protected String getConnectString() { + return OracleUtils.CONNECT_STRING; + } + + @Override + protected SqoopOptions getSqoopOptions(Configuration conf) { + SqoopOptions opts = new SqoopOptions(conf); + OracleUtils.setOracleAuth(opts); + return opts; + } + + @Override + protected void dropTableIfExists(String table) throws SQLException { + OracleUtils.dropTable(table, getManager()); + } +} +