From 73cc549d9d70d407ed7f95bc25e623381b75c52d Mon Sep 17 00:00:00 2001 From: Jarek Jarcec Cecho Date: Tue, 9 Jul 2013 21:55:27 -0700 Subject: [PATCH] SQOOP-1097: Export using procedures feature needs fixes for working with mysql (Venkat Ranganathan via Jarek Jarcec Cecho) --- .../org/apache/sqoop/manager/ConnManager.java | 33 ++- .../apache/sqoop/manager/MySQLManager.java | 131 ++++++++++- .../apache/sqoop/manager/OracleManager.java | 103 ++++++++- .../org/apache/sqoop/manager/SqlManager.java | 43 ++++ .../com/cloudera/sqoop/ThirdPartyTests.java | 7 + .../sqoop/TestExportUsingProcedure.java | 2 +- .../manager/mysql/MySqlCallExportTest.java | 199 ++++++++++++++++ .../manager/oracle/OracleCallExportTest.java | 213 ++++++++++++++++++ 8 files changed, 724 insertions(+), 7 deletions(-) create mode 100644 src/test/org/apache/sqoop/manager/mysql/MySqlCallExportTest.java create mode 100644 src/test/org/apache/sqoop/manager/oracle/OracleCallExportTest.java diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java index c9e05da4..c84c8590 100644 --- a/src/java/org/apache/sqoop/manager/ConnManager.java +++ b/src/java/org/apache/sqoop/manager/ConnManager.java @@ -368,8 +368,17 @@ public Map getColumnTypeNamesForTable(String tableName) { } /** - * Return an unordered mapping from colname to sql type name for - * all columns in a query. + * Return an unordered mapping from colname to sql type name for all columns + * in a procedure. + */ + public Map getColumnTypeNamesForProcedure(String callName) { + LOG.error("This database does not support procedure param type names."); + return null; + } + + /** + * Return an unordered mapping from colname to sql type name for all columns + * in a query. */ public Map getColumnTypeNamesForQuery(String query) { LOG.error("This database does not support free-form query" @@ -385,11 +394,29 @@ public Map getColumnTypeNamesForQuery(String query) { * @param sqlQuery the SQL query to use if tableName is null */ public Map getColumnTypeNames(String tableName, - String sqlQuery) { + String sqlQuery) { + return getColumnTypeNames(tableName, null, sqlQuery); + } + + /** + * Return an unordered mapping from colname to sql type name for all columns + * in a table or query. + * + * @param tableName + * the name of the table + * @param callName + * the name of the procedure + * @param sqlQuery + * the SQL query to use if tableName is null + */ + public Map getColumnTypeNames(String tableName, + String callName, String sqlQuery) { Map columnTypeNames; if (null != tableName) { // We're generating a class based on a table import. columnTypeNames = getColumnTypeNamesForTable(tableName); + } else if (null != callName) { + columnTypeNames = getColumnTypeNamesForProcedure(callName); } else { // This is based on an arbitrary query. String query = sqlQuery; diff --git a/src/java/org/apache/sqoop/manager/MySQLManager.java b/src/java/org/apache/sqoop/manager/MySQLManager.java index 2090b1a2..e1d5a36e 100644 --- a/src/java/org/apache/sqoop/manager/MySQLManager.java +++ b/src/java/org/apache/sqoop/manager/MySQLManager.java @@ -22,12 +22,19 @@ import java.io.PrintWriter; import java.net.URI; import java.net.URISyntaxException; +import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Types; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.TreeMap; import org.apache.avro.Schema.Type; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -261,6 +268,126 @@ public boolean supportsStagingForExport() { return true; } + @Override + public String[] getColumnNamesForProcedure(String procedureName) { + List ret = new ArrayList(); + try { + DatabaseMetaData metaData = this.getConnection().getMetaData(); + ResultSet results = metaData.getProcedureColumns(null, null, + procedureName, null); + if (null == results) { + LOG.debug("Get Procedure Columns returns null"); + return null; + } + + try { + while (results.next()) { + if (results.getInt("COLUMN_TYPE") + != DatabaseMetaData.procedureColumnReturn) { + String name = results.getString("COLUMN_NAME"); + ret.add(name); + } + } + String[] result = ret.toArray(new String[ret.size()]); + LOG.debug("getColumnsNamesForProcedure returns " + + StringUtils.join(ret, ",")); + return result; + } finally { + results.close(); + getConnection().commit(); + } + } catch (SQLException e) { + LoggingUtils.logAll(LOG, "Error reading procedure metadata: ", e); + throw new RuntimeException("Can't fetch column names for procedure.", e); + } + } + + @Override + public Map getColumnTypesForProcedure(String procedureName) { + Map ret = new TreeMap(); + try { + DatabaseMetaData metaData = this.getConnection().getMetaData(); + ResultSet results = metaData.getProcedureColumns(null, null, + procedureName, null); + if (null == results) { + LOG.debug("getColumnTypesForProcedure returns null"); + return null; + } + + try { + while (results.next()) { + if (results.getInt("COLUMN_TYPE") + != DatabaseMetaData.procedureColumnReturn) { + // we don't care if we get several rows for the + // same ORDINAL_POSITION (e.g. like H2 gives us) + // as we'll just overwrite the entry in the map: + ret.put( + results.getString("COLUMN_NAME"), + results.getInt("DATA_TYPE")); + } + } + + LOG.debug("Columns returned = " + StringUtils.join(ret.keySet(), ",")); + LOG.debug("Types returned = " + StringUtils.join(ret.values(), ",")); + + return ret.isEmpty() ? null : ret; + } finally { + if (results != null) { + results.close(); + } + getConnection().commit(); + } + } catch (SQLException sqlException) { + LoggingUtils.logAll(LOG, "Error reading primary key metadata: " + + sqlException.toString(), sqlException); + return null; + } + } + + @Override + public Map + getColumnTypeNamesForProcedure(String procedureName) { + Map ret = new TreeMap(); + try { + DatabaseMetaData metaData = this.getConnection().getMetaData(); + ResultSet results = metaData.getProcedureColumns(null, null, + procedureName, null); + if (null == results) { + LOG.debug("getColumnTypesForProcedure returns null"); + return null; + } + + try { + while (results.next()) { + if (results.getInt("COLUMN_TYPE") + != DatabaseMetaData.procedureColumnReturn) { + // we don't care if we get several rows for the + // same ORDINAL_POSITION (e.g. like H2 gives us) + // as we'll just overwrite the entry in the map: + ret.put( + results.getString("COLUMN_NAME"), + results.getString("TYPE_NAME")); + } + } + + LOG.debug("Columns returned = " + StringUtils.join(ret.keySet(), ",")); + LOG.debug( + "Type names returned = " + StringUtils.join(ret.values(), ",")); + + return ret.isEmpty() ? null : ret; + } finally { + if (results != null) { + results.close(); + } + getConnection().commit(); + } + } catch (SQLException sqlException) { + LoggingUtils.logAll(LOG, "Error reading primary key metadata: " + + sqlException.toString(), sqlException); + return null; + } + } + @Override protected String getListDatabasesQuery() { return "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA"; @@ -276,8 +403,10 @@ protected String getSchemaQuery() { private int overrideSqlType(String tableName, String columnName, int sqlType) { + if (colTypeNames == null) { - colTypeNames = getColumnTypeNames(tableName, options.getSqlQuery()); + colTypeNames = getColumnTypeNames(tableName, options.getCall(), + options.getSqlQuery()); } if ("YEAR".equalsIgnoreCase(colTypeNames.get(columnName))) { diff --git a/src/java/org/apache/sqoop/manager/OracleManager.java b/src/java/org/apache/sqoop/manager/OracleManager.java index edc888e5..686bc197 100644 --- a/src/java/org/apache/sqoop/manager/OracleManager.java +++ b/src/java/org/apache/sqoop/manager/OracleManager.java @@ -38,6 +38,7 @@ import java.util.Properties; import java.util.Set; import java.util.StringTokenizer; +import java.util.TreeMap; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -482,7 +483,8 @@ public ResultSet readTable(String tableName, String[] columns) */ private String toDbSpecificJavaType(String tableName, String colName) { if (columnTypeNames == null) { - columnTypeNames = getColumnTypeNames(tableName, options.getSqlQuery()); + columnTypeNames = getColumnTypeNames(tableName, options.getCall(), + options.getSqlQuery()); } String colTypeName = columnTypeNames.get(colName); @@ -490,9 +492,15 @@ private String toDbSpecificJavaType(String tableName, String colName) { if (colTypeName.equalsIgnoreCase("BINARY_FLOAT")) { return "Float"; } + if (colTypeName.equalsIgnoreCase("FLOAT")) { + return "Float"; + } if (colTypeName.equalsIgnoreCase("BINARY_DOUBLE")) { return "Double"; } + if (colTypeName.equalsIgnoreCase("DOUBLE")) { + return "Double"; + } if (colTypeName.toUpperCase().startsWith("TIMESTAMP")) { return "java.sql.Timestamp"; } @@ -508,8 +516,14 @@ private String toDbSpecificJavaType(String tableName, String colName) { */ private String toDbSpecificHiveType(String tableName, String colName) { if (columnTypeNames == null) { - columnTypeNames = getColumnTypeNames(tableName, options.getSqlQuery()); + columnTypeNames = getColumnTypeNames(tableName, options.getCall(), + options.getSqlQuery()); } + LOG.debug("Column Types and names returned = (" + + StringUtils.join(columnTypeNames.keySet(), ",") + + ")=>(" + + StringUtils.join(columnTypeNames.values(), ",") + + ")"); String colTypeName = columnTypeNames.get(colName); if (colTypeName != null) { @@ -753,6 +767,91 @@ public String[] getColumnNamesForProcedure(String procedureName) { } } + @Override + public Map + getColumnTypesForProcedure(String procedureName) { + Map ret = new TreeMap(); + try { + DatabaseMetaData metaData = this.getConnection().getMetaData(); + ResultSet results = metaData.getProcedureColumns(null, null, + procedureName, null); + if (null == results) { + return null; + } + + try { + while (results.next()) { + if (results.getInt("COLUMN_TYPE") + != DatabaseMetaData.procedureColumnReturn) { + int index = results.getInt("ORDINAL_POSITION"); + if (index < 0) { + continue; // actually the return type + } + // we don't care if we get several rows for the + // same ORDINAL_POSITION (e.g. like H2 gives us) + // as we'll just overwrite the entry in the map: + ret.put( + results.getString("COLUMN_NAME"), + results.getInt("DATA_TYPE")); + } + } + LOG.debug("Columns returned = " + StringUtils.join(ret.keySet(), ",")); + LOG.debug("Types returned = " + StringUtils.join(ret.values(), ",")); + return ret.isEmpty() ? null : ret; + } finally { + results.close(); + getConnection().commit(); + } + } catch (SQLException sqlException) { + LoggingUtils.logAll(LOG, "Error reading primary key metadata: " + + sqlException.toString(), sqlException); + return null; + } + } + + @Override + public Map + getColumnTypeNamesForProcedure(String procedureName) { + Map ret = new TreeMap(); + try { + DatabaseMetaData metaData = this.getConnection().getMetaData(); + ResultSet results = metaData.getProcedureColumns(null, null, + procedureName, null); + if (null == results) { + return null; + } + + try { + while (results.next()) { + if (results.getInt("COLUMN_TYPE") + != DatabaseMetaData.procedureColumnReturn) { + int index = results.getInt("ORDINAL_POSITION"); + if (index < 0) { + continue; // actually the return type + } + // we don't care if we get several rows for the + // same ORDINAL_POSITION (e.g. like H2 gives us) + // as we'll just overwrite the entry in the map: + ret.put( + results.getString("COLUMN_NAME"), + results.getString("TYPE_NAME")); + } + } + LOG.debug("Columns returned = " + StringUtils.join(ret.keySet(), ",")); + LOG.debug( + "Type names returned = " + StringUtils.join(ret.values(), ",")); + return ret.isEmpty() ? null : ret; + } finally { + results.close(); + getConnection().commit(); + } + } catch (SQLException sqlException) { + LoggingUtils.logAll(LOG, "Error reading primary key metadata: " + + sqlException.toString(), sqlException); + return null; + } + } + @Override public String[] getColumnNames(String tableName) { Connection conn = null; diff --git a/src/java/org/apache/sqoop/manager/SqlManager.java b/src/java/org/apache/sqoop/manager/SqlManager.java index e96368ba..2a4992d1 100644 --- a/src/java/org/apache/sqoop/manager/SqlManager.java +++ b/src/java/org/apache/sqoop/manager/SqlManager.java @@ -235,6 +235,7 @@ public Map getColumnTypesForQuery(String query) { */ protected Map getColumnTypesForRawQuery(String stmt) { ResultSet results; + LOG.debug("Execute getColumnTypesRawQuery : " + stmt); try { results = execute(stmt); } catch (SQLException sqlE) { @@ -399,6 +400,8 @@ public Map getColumnTypesForProcedure(String procedureName) { results.getInt("DATA_TYPE")); } } + LOG.debug("Columns returned = " + StringUtils.join(ret.keySet(), ",")); + LOG.debug("Types returned = " + StringUtils.join(ret.values(), ",")); return ret.isEmpty() ? null : ret; } finally { results.close(); @@ -411,6 +414,46 @@ public Map getColumnTypesForProcedure(String procedureName) { } } + @Override + public Map + getColumnTypeNamesForProcedure(String procedureName) { + Map ret = new TreeMap(); + try { + DatabaseMetaData metaData = this.getConnection().getMetaData(); + ResultSet results = metaData.getProcedureColumns(null, null, + procedureName, null); + if (null == results) { + return null; + } + + try { + while (results.next()) { + if (results.getInt("COLUMN_TYPE") + != DatabaseMetaData.procedureColumnReturn + && results.getInt("ORDINAL_POSITION") > 0) { + // we don't care if we get several rows for the + // same ORDINAL_POSITION (e.g. like H2 gives us) + // as we'll just overwrite the entry in the map: + ret.put( + results.getString("COLUMN_NAME"), + results.getString("TYPE_NAME")); + } + } + LOG.debug("Columns returned = " + StringUtils.join(ret.keySet(), ",")); + LOG.debug( + "Type names returned = " + StringUtils.join(ret.values(), ",")); + return ret.isEmpty() ? null : ret; + } finally { + results.close(); + getConnection().commit(); + } + } catch (SQLException sqlException) { + LoggingUtils.logAll(LOG, "Error reading primary key metadata: " + + sqlException.toString(), sqlException); + return null; + } + } + @Override public String[] listTables() { ResultSet results = null; diff --git a/src/test/com/cloudera/sqoop/ThirdPartyTests.java b/src/test/com/cloudera/sqoop/ThirdPartyTests.java index 7fae0520..ada5c72f 100644 --- a/src/test/com/cloudera/sqoop/ThirdPartyTests.java +++ b/src/test/com/cloudera/sqoop/ThirdPartyTests.java @@ -41,6 +41,9 @@ import com.cloudera.sqoop.manager.OracleCompatTest; import com.cloudera.sqoop.manager.PostgresqlExportTest; import com.cloudera.sqoop.manager.PostgresqlImportTest; + +import org.apache.sqoop.manager.mysql.MySqlCallExportTest; +import org.apache.sqoop.manager.oracle.OracleCallExportTest; import org.apache.sqoop.manager.sqlserver.SQLServerDatatypeExportDelimitedFileManualTest; import org.apache.sqoop.manager.sqlserver.SQLServerDatatypeExportSequenceFileManualTest; import org.apache.sqoop.manager.sqlserver.SQLServerDatatypeImportDelimitedFileManualTest; @@ -110,6 +113,10 @@ public static Test suite() { suite.addTestSuite(HCatalogImportTest.class); suite.addTestSuite(HCatalogExportTest.class); + // Call Export tests + suite.addTestSuite(MySqlCallExportTest.class); + suite.addTestSuite(OracleCallExportTest.class); + return suite; } diff --git a/src/test/org/apache/sqoop/TestExportUsingProcedure.java b/src/test/org/apache/sqoop/TestExportUsingProcedure.java index 6414ef7b..78d688d1 100644 --- a/src/test/org/apache/sqoop/TestExportUsingProcedure.java +++ b/src/test/org/apache/sqoop/TestExportUsingProcedure.java @@ -74,7 +74,7 @@ public void createTable(ColumnGenerator... extraColumns) throws SQLException { createProcedure(names, types); } - private void createProcedure(String[] extraNames, String[] extraTypes) + protected void createProcedure(String[] extraNames, String[] extraTypes) throws SQLException { StringBuilder drop = new StringBuilder("DROP ALIAS IF EXISTS "); drop.append(PROCEDURE_NAME); diff --git a/src/test/org/apache/sqoop/manager/mysql/MySqlCallExportTest.java b/src/test/org/apache/sqoop/manager/mysql/MySqlCallExportTest.java new file mode 100644 index 00000000..3be447ea --- /dev/null +++ b/src/test/org/apache/sqoop/manager/mysql/MySqlCallExportTest.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.manager.mysql; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Writer; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.manager.MySQLTestUtils; +import com.cloudera.sqoop.testutil.CommonArgs; +import com.cloudera.sqoop.testutil.ExportJobTestCase; + +/** + * Test free form query import with the MySQL db. + */ +public class MySqlCallExportTest extends ExportJobTestCase { + + public static final Log LOG = LogFactory.getLog( + MySqlCallExportTest.class.getName()); + + private final String tableName = "MYSQL_CALL_EXPORT_BASE_TABLE"; + private final String procName = "MYSQL_CALL_EXPORT_PROC"; + + @Override + public void setUp() { + super.setUp(); + createObjects(); + } + + private String[] getArgv(String... extraArgs) { + ArrayList args = new ArrayList(); + + CommonArgs.addHadoopFlags(args); + + args.add("--call"); + args.add(procName); + args.add("--export-dir"); + args.add(getWarehouseDir()); + args.add("--fields-terminated-by"); + args.add(","); + args.add("--lines-terminated-by"); + args.add("\\n"); + args.add("--connect"); + args.add(getConnectString()); + args.add("-m"); + args.add("1"); + + for (String arg : extraArgs) { + args.add(arg); + } + + return args.toArray(new String[0]); + } + + private void createObjects() { + + String createTableSql = "CREATE TABLE " + tableName + " ( " + + "id INT NOT NULL PRIMARY KEY, " + + "msg VARCHAR(24) NOT NULL, " + + "d DATE, " + + "f FLOAT, " + + "vc VARCHAR(32))"; + + String createProcSql = "CREATE PROCEDURE " + procName + " ( " + + "IN id INT," + + "IN msg VARCHAR(24)," + + "IN d DATE," + + "IN f FLOAT) BEGIN " + + "INSERT INTO " + tableName + " " + + "VALUES(id," + + "msg," + + "d," + + "f," + + "concat(msg, '_2')); END"; + + try { + dropTableIfExists(tableName); + dropProcedureIfExists(procName); + } catch (SQLException sqle) { + throw new AssertionError(sqle.getMessage()); + } + Connection conn = getConnection(); + + try { + Statement st = conn.createStatement(); + st.executeUpdate(createTableSql); + LOG.debug("Successfully created table " + tableName); + st.executeUpdate(createProcSql); + LOG.debug("Successfully created procedure " + procName); + st.close(); + } catch (SQLException sqle) { + throw new AssertionError(sqle.getMessage()); + } + } + + @Override + protected Connection getConnection() { + try { + return getManager().getConnection(); + } catch (SQLException sqle) { + throw new AssertionError(sqle.getMessage()); + } + } + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + @Override + protected String getConnectString() { + return MySQLTestUtils.CONNECT_STRING; + } + + @Override + protected SqoopOptions getSqoopOptions(Configuration conf) { + SqoopOptions opts = new SqoopOptions(conf); + opts.setUsername(MySQLTestUtils.getCurrentUser()); + return opts; + } + + @Override + protected String getTableName() { + return tableName; + } + + @Override + protected void dropTableIfExists(String table) throws SQLException { + Connection conn = getManager().getConnection(); + PreparedStatement statement = conn.prepareStatement( + "DROP TABLE IF EXISTS " + table, + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try { + statement.executeUpdate(); + conn.commit(); + } finally { + statement.close(); + } + } + + protected void dropProcedureIfExists(String proc) throws SQLException { + Connection conn = getManager().getConnection(); + PreparedStatement statement = conn.prepareStatement( + "DROP PROCEDURE IF EXISTS " + proc, + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try { + statement.executeUpdate(); + conn.commit(); + } finally { + statement.close(); + } + } + + public void testExportUsingProcedure() throws IOException, SQLException { + String[] lines = { + "0,textfield0,2002-12-29,3300", + "1,textfield1,2007-06-04,4400", + }; + new File(getWarehouseDir()).mkdirs(); + File file = new File(getWarehouseDir() + "/part-00000"); + Writer output = new BufferedWriter(new FileWriter(file)); + for (String line : lines) { + output.write(line); + output.write("\n"); + } + output.close(); + runExport(getArgv()); + verifyExport(2, getConnection()); + } +} diff --git a/src/test/org/apache/sqoop/manager/oracle/OracleCallExportTest.java b/src/test/org/apache/sqoop/manager/oracle/OracleCallExportTest.java new file mode 100644 index 00000000..44b2f9af --- /dev/null +++ b/src/test/org/apache/sqoop/manager/oracle/OracleCallExportTest.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.manager.oracle; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Writer; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.manager.OracleUtils; +import com.cloudera.sqoop.testutil.CommonArgs; +import com.cloudera.sqoop.testutil.ExportJobTestCase; + +/** + * Test free form query import with the MySQL db. + */ +public class OracleCallExportTest extends ExportJobTestCase { + + public static final Log LOG = LogFactory.getLog( + OracleCallExportTest.class.getName()); + + private final String tableName = "ORACLE_CALL_EXPORT_BASE_TABLE"; + private final String procName = "ORACLE_CALL_EXPORT_PROC"; + + @Override + public void setUp() { + super.setUp(); + createObjects(); + } + + + private String[] getArgv(String... extraArgs) { + ArrayList args = new ArrayList(); + + CommonArgs.addHadoopFlags(args); + + args.add("--call"); + args.add(procName); + args.add("--export-dir"); + args.add(getWarehouseDir()); + args.add("--fields-terminated-by"); + args.add(","); + args.add("--lines-terminated-by"); + args.add("\\n"); + args.add("--connect"); + args.add(getConnectString()); + args.add("--username"); + args.add(OracleUtils.ORACLE_USER_NAME); + args.add("--password"); + args.add(OracleUtils.ORACLE_USER_PASS); + args.add("-m"); + args.add("1"); + + for (String arg : extraArgs) { + args.add(arg); + } + + return args.toArray(new String[0]); + } + + + private void createObjects() { + + String createTableSql = "CREATE TABLE " + tableName + " ( " + + "id INT NOT NULL PRIMARY KEY, " + + "msg VARCHAR(24) NOT NULL, " + + "d DATE, " + + "f FLOAT, " + + "vc VARCHAR(32))"; + + String createProcSql = "CREATE PROCEDURE " + procName + " ( " + + "id IN INT," + + "msg IN VARCHAR," + + "d IN DATE," + + "f IN FLOAT) IS BEGIN " + + "INSERT INTO " + tableName + " " + + "VALUES(id," + + "msg," + + "d," + + "f," + + "msg || '_2'); END;"; + + try { + dropTableIfExists(tableName); + dropProcedureIfExists(procName); + } catch (SQLException sqle) { + throw new AssertionError(sqle.getMessage()); + } + Connection conn = getConnection(); + + try { + Statement st = conn.createStatement(); + st.executeUpdate(createTableSql); + LOG.debug("Successfully created table " + tableName); + st.executeUpdate(createProcSql); + LOG.debug("Successfully created procedure " + procName); + st.close(); + } catch (SQLException sqle) { + throw new AssertionError(sqle.getMessage()); + } + } + + @Override + protected Connection getConnection() { + try { + return getManager().getConnection(); + } catch (SQLException sqle) { + throw new AssertionError(sqle.getMessage()); + } + } + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + @Override + protected String getConnectString() { + return OracleUtils.CONNECT_STRING; + } + + @Override + protected SqoopOptions getSqoopOptions(Configuration conf) { + SqoopOptions opts = new SqoopOptions(conf); + opts.setUsername(OracleUtils.ORACLE_USER_NAME); + opts.setPassword(OracleUtils.ORACLE_USER_PASS); + return opts; + } + + @Override + protected String getTableName() { + return tableName; + } + + @Override + protected void dropTableIfExists(String table) throws SQLException { + Connection conn = getManager().getConnection(); + PreparedStatement statement = conn.prepareStatement( + "DROP TABLE " + table, + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try { + try { + statement.executeUpdate(); + } catch (SQLException sqle) { + // Ignore + } + conn.commit(); + } finally { + statement.close(); + } + } + + protected void dropProcedureIfExists(String proc) throws SQLException { + Connection conn = getManager().getConnection(); + PreparedStatement statement = conn.prepareStatement( + "DROP PROCEDURE " + proc, + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try { + try { + statement.executeUpdate(); + } catch (SQLException sqle) { + // Ignore + } + conn.commit(); + } finally { + statement.close(); + } + } + public void testExportUsingProcedure() throws IOException, SQLException { + String[] lines = { + "0,textfield0,2002-12-29 08:40:00,3300", + "1,textfield1,2007-06-04 13:15:10,4400", + }; + new File(getWarehouseDir()).mkdirs(); + File file = new File(getWarehouseDir() + "/part-00000"); + Writer output = new BufferedWriter(new FileWriter(file)); + for (String line : lines) { + output.write(line); + output.write("\n"); + } + output.close(); + runExport(getArgv()); + verifyExport(2, getConnection()); + } +}