5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-04 01:00:46 +08:00

SQOOP-468. Oracle free form queries fail.

(Cheolsoo Park via Jarek Jarcec Cecho)


git-svn-id: https://svn.apache.org/repos/asf/sqoop/trunk@1309268 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jarek Jarcec Cecho 2012-04-04 06:58:52 +00:00
parent 2a27bf28fb
commit 4373a338ba
4 changed files with 321 additions and 11 deletions

View File

@ -111,6 +111,36 @@ protected Class<? extends OutputFormat> getOutputFormatClass()
return null;
}
/**
* Build the boundary query for the column of the result set created by
* the given query.
* @param col column name whose boundaries we're interested in.
* @param query sub-query used to create the result set.
* @return input boundary query as a string
*/
private String buildBoundaryQuery(String col, String query) {
if (col == null) {
return "";
}
// Replace table name with alias 't1' if column name is a fully
// qualified name. This is needed because "tableName"."columnName"
// in the input boundary query causes a SQL syntax error in most dbs
// including Oracle and MySQL.
String alias = "t1";
int dot = col.lastIndexOf('.');
String qualifiedName = (dot == -1) ? col : alias + col.substring(dot);
ConnManager mgr = getContext().getConnManager();
String ret = mgr.getInputBoundsQuery(qualifiedName, query);
if (ret != null) {
return ret;
}
return "SELECT MIN(" + qualifiedName + "), MAX(" + qualifiedName + ") "
+ "FROM (" + query + ") AS " + alias;
}
@Override
protected void configureInputFormat(Job job, String tableName,
String tableClassName, String splitByCol) throws IOException {
@ -165,18 +195,8 @@ protected void configureInputFormat(Job job, String tableName,
DataDrivenDBInputFormat.SUBSTITUTE_TOKEN, " (1 = 1) ");
String inputBoundingQuery = options.getBoundaryQuery();
if (inputBoundingQuery == null) {
inputBoundingQuery =
mgr.getInputBoundsQuery(splitByCol, sanitizedQuery);
if (inputBoundingQuery == null) {
if (splitByCol != null) {
inputBoundingQuery = "SELECT MIN(" + splitByCol + "), MAX("
+ splitByCol + ") FROM (" + sanitizedQuery + ") AS t1";
} else {
inputBoundingQuery = "";
}
}
inputBoundingQuery = buildBoundaryQuery(splitByCol, sanitizedQuery);
}
DataDrivenDBInputFormat.setInput(job, DBWritable.class,
inputQuery, inputBoundingQuery);

View File

@ -0,0 +1,154 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import com.cloudera.sqoop.testutil.CommonArgs;
import com.cloudera.sqoop.testutil.ImportJobTestCase;
/**
* Test free form query import.
*/
public class TestFreeFormQueryImport extends ImportJobTestCase {
private Log log;
public TestFreeFormQueryImport() {
this.log = LogFactory.getLog(TestFreeFormQueryImport.class.getName());
}
/**
* @return the Log object to use for reporting during this test
*/
protected Log getLogger() {
return log;
}
/** the names of the tables we're creating. */
private List<String> tableNames;
@Override
public void tearDown() {
// Clean up the database on our way out.
for (String tableName : tableNames) {
try {
dropTableIfExists(tableName);
} catch (SQLException e) {
log.warn("Error trying to drop table '" + tableName
+ "' on tearDown: " + e);
}
}
super.tearDown();
}
/**
* Create the argv to pass to Sqoop.
* @param splitByCol column of the table used to split work.
* @param query free form query to be used.
* @return the argv as an array of strings.
*/
protected String [] getArgv(String splitByCol, String query) {
ArrayList<String> args = new ArrayList<String>();
CommonArgs.addHadoopFlags(args);
args.add("--connect");
args.add(getConnectString());
args.add("--target-dir");
args.add(getWarehouseDir());
args.add("--split-by");
args.add(splitByCol);
args.add("--num-mappers");
args.add("2");
args.add("--query");
args.add(query);
return args.toArray(new String[0]);
}
/**
* Create two tables that share the common id column. Run free-form query
* import on the result table that is created by joining the two tables on
* the id column.
*/
public void testSimpleJoin() throws IOException {
tableNames = new ArrayList<String>();
String [] types1 = { "SMALLINT", };
String [] vals1 = { "1", };
String tableName1 = getTableName();
createTableWithColTypes(types1, vals1);
tableNames.add(tableName1);
incrementTableNum();
String [] types2 = { "SMALLINT", "VARCHAR(32)", };
String [] vals2 = { "1", "'foo'", };
String tableName2 = getTableName();
createTableWithColTypes(types2, vals2);
tableNames.add(tableName2);
String query = "SELECT "
+ tableName1 + "." + getColName(0) + ", "
+ tableName2 + "." + getColName(1) + " "
+ "FROM " + tableName1 + " JOIN " + tableName2 + " ON ("
+ tableName1 + "." + getColName(0) + " = "
+ tableName2 + "." + getColName(0) + ") WHERE "
+ tableName1 + "." + getColName(0) + " < 3 AND $CONDITIONS";
runImport(getArgv(tableName1 + "." + getColName(0), query));
Path warehousePath = new Path(this.getWarehouseDir());
Path filePath = new Path(warehousePath, "part-m-00000");
String expectedVal = "1,foo";
BufferedReader reader = null;
if (!isOnPhysicalCluster()) {
reader = new BufferedReader(
new InputStreamReader(new FileInputStream(
new File(filePath.toString()))));
} else {
FileSystem dfs = FileSystem.get(getConf());
FSDataInputStream dis = dfs.open(filePath);
reader = new BufferedReader(new InputStreamReader(dis));
}
try {
String line = reader.readLine();
assertEquals("QueryResult expected a different string",
expectedVal, line);
} finally {
IOUtils.closeStream(reader);
}
}
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.manager;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.TestFreeFormQueryImport;
/**
* Test free form query import with the MySQL db.
*/
public class MySQLFreeFormQueryTest extends TestFreeFormQueryImport {
public static final Log LOG = LogFactory.getLog(
MySQLFreeFormQueryTest.class.getName());
@Override
protected Log getLogger() {
return LOG;
}
@Override
protected boolean useHsqldbTestServer() {
return false;
}
@Override
protected String getConnectString() {
return MySQLTestUtils.CONNECT_STRING;
}
@Override
protected SqoopOptions getSqoopOptions(Configuration conf) {
SqoopOptions opts = new SqoopOptions(conf);
opts.setUsername(MySQLTestUtils.getCurrentUser());
return opts;
}
@Override
protected void dropTableIfExists(String table) throws SQLException {
Connection conn = getManager().getConnection();
PreparedStatement statement = conn.prepareStatement(
"DROP TABLE IF EXISTS " + table,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
try {
statement.executeUpdate();
conn.commit();
} finally {
statement.close();
}
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.manager;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.TestFreeFormQueryImport;
/**
* Test free form query import with the Oracle db.
*/
public class OracleFreeFormQueryTest extends TestFreeFormQueryImport {
public static final Log LOG = LogFactory.getLog(
OracleFreeFormQueryTest.class.getName());
@Override
protected boolean useHsqldbTestServer() {
return false;
}
@Override
protected String getConnectString() {
return OracleUtils.CONNECT_STRING;
}
@Override
protected SqoopOptions getSqoopOptions(Configuration conf) {
SqoopOptions opts = new SqoopOptions(conf);
OracleUtils.setOracleAuth(opts);
return opts;
}
@Override
protected void dropTableIfExists(String table) throws SQLException {
OracleUtils.dropTable(table, getManager());
}
}