From f11c3091c2ef3e52717acbc40c88def238b1cc3a Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Mon, 17 Sep 2012 11:04:18 -0700 Subject: [PATCH] SQOOP-601 Support custom schemas in PostgreSQL Connector (Jarek Jarcec Cecho via Cheolsoo Park) --- src/docs/user/connectors.txt | 32 ++ .../sqoop/manager/PostgresqlManager.java | 5 - .../sqoop/manager/CatalogQueryManager.java | 5 +- .../manager/DirectPostgresqlManager.java | 4 +- .../sqoop/manager/PGBulkloadManager.java | 2 +- .../sqoop/manager/PostgresqlManager.java | 85 +++- .../org/apache/sqoop/manager/SqlManager.java | 13 + .../apache/sqoop/mapreduce/JdbcExportJob.java | 2 +- .../sqoop/mapreduce/JdbcUpdateExportJob.java | 3 +- .../sqoop/mapreduce/JdbcUpsertExportJob.java | 2 +- .../db/DataDrivenDBRecordReader.java | 8 +- .../com/cloudera/sqoop/ThirdPartyTests.java | 6 +- .../sqoop/manager/PostgresqlExportTest.java | 362 ++++++++++++++++++ ...sqlTest.java => PostgresqlImportTest.java} | 74 +++- 14 files changed, 561 insertions(+), 42 deletions(-) create mode 100644 src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java rename src/test/com/cloudera/sqoop/manager/{PostgresqlTest.java => PostgresqlImportTest.java} (80%) diff --git a/src/docs/user/connectors.txt b/src/docs/user/connectors.txt index a93f14eb..930a4996 100644 --- a/src/docs/user/connectors.txt +++ b/src/docs/user/connectors.txt @@ -21,6 +21,38 @@ Notes for specific connectors ----------------------------- +PostgreSQL Connector +~~~~~~~~~~~~~~~~~~~~~ + +Extra arguments +^^^^^^^^^^^^^^^ + +List of all extra arguments supported by PostgreSQL Connector is shown on table +below: + +.Supported PostgreSQL extra arguments: +[grid="all"] +`----------------------------------------`--------------------------------------- +Argument Description +--------------------------------------------------------------------------------- ++\--schema + Scheme name that sqoop should use. \ + Default is "public". +--------------------------------------------------------------------------------- + +Schema support +^^^^^^^^^^^^^^ + +If you need to work with table that is located in schema other than default one, +you need to specify extra argument +\--schema+. Custom schemas are supported for +both import and export job (optional staging table however must be present in the +same schema as target table). Example invocation: + +---- +$ sqoop import ... --table custom_table -- --schema custom_schema +---- + + + pg_bulkload connector ~~~~~~~~~~~~~~~~~~~~~ diff --git a/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java b/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java index 16adeb2f..354d260f 100644 --- a/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java +++ b/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java @@ -29,10 +29,5 @@ public class PostgresqlManager public PostgresqlManager(final SqoopOptions opts) { super(opts); } - - protected PostgresqlManager(final SqoopOptions opts, boolean ignored) { - super(opts, ignored); - } - } diff --git a/src/java/org/apache/sqoop/manager/CatalogQueryManager.java b/src/java/org/apache/sqoop/manager/CatalogQueryManager.java index 5f2f89fd..fa7661e3 100644 --- a/src/java/org/apache/sqoop/manager/CatalogQueryManager.java +++ b/src/java/org/apache/sqoop/manager/CatalogQueryManager.java @@ -142,10 +142,11 @@ public String[] getColumnNames(String tableName) { Statement s = null; ResultSet rs = null; List columns = new ArrayList(); + String listColumnsQuery = getListColumnsQuery(tableName); try { c = getConnection(); s = c.createStatement(); - rs = s.executeQuery(getListColumnsQuery(tableName)); + rs = s.executeQuery(listColumnsQuery); while (rs.next()) { columns.add(rs.getString(1)); } @@ -158,7 +159,7 @@ public String[] getColumnNames(String tableName) { } catch (SQLException ce) { LOG.error("Failed to rollback transaction", ce); } - LOG.error("Failed to list columns", sqle); + LOG.error("Failed to list columns from query: " + listColumnsQuery, sqle); throw new RuntimeException(sqle); } finally { if (rs != null) { diff --git a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java index a557aa13..ea91fc69 100644 --- a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java +++ b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java @@ -56,9 +56,7 @@ public class DirectPostgresqlManager DirectPostgresqlManager.class.getName()); public DirectPostgresqlManager(final SqoopOptions opts) { - // Inform superclass that we're overriding import method via alt. - // constructor. - super(opts, true); + super(opts); } private static final String PSQL_CMD = "psql"; diff --git a/src/java/org/apache/sqoop/manager/PGBulkloadManager.java b/src/java/org/apache/sqoop/manager/PGBulkloadManager.java index 92174f8e..091fd15b 100644 --- a/src/java/org/apache/sqoop/manager/PGBulkloadManager.java +++ b/src/java/org/apache/sqoop/manager/PGBulkloadManager.java @@ -40,7 +40,7 @@ public class PGBulkloadManager extends PostgresqlManager { public PGBulkloadManager(final SqoopOptions opts) { - super(opts, true); + super(opts); } diff --git a/src/java/org/apache/sqoop/manager/PostgresqlManager.java b/src/java/org/apache/sqoop/manager/PostgresqlManager.java index d18321c3..7e6284ef 100644 --- a/src/java/org/apache/sqoop/manager/PostgresqlManager.java +++ b/src/java/org/apache/sqoop/manager/PostgresqlManager.java @@ -21,11 +21,17 @@ import java.io.IOException; import java.sql.SQLException; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.cloudera.sqoop.SqoopOptions; import com.cloudera.sqoop.util.ImportException; +import org.apache.sqoop.cli.RelatedOptions; /** * Manages connections to Postgresql databases. @@ -33,6 +39,8 @@ public class PostgresqlManager extends com.cloudera.sqoop.manager.CatalogQueryManager { + public static final String SCHEMA = "schema"; + public static final Log LOG = LogFactory.getLog( PostgresqlManager.class.getName()); @@ -42,13 +50,20 @@ public class PostgresqlManager // set to true after we warn the user that we can use direct fastpath. private static boolean warningPrinted = false; + /* + * PostgreSQL schema that we should use. + */ + private String schema; + public PostgresqlManager(final SqoopOptions opts) { super(DRIVER_CLASS, opts); - } - protected PostgresqlManager(final SqoopOptions opts, boolean ignored) { - // constructor used by subclasses to avoid the --direct warning. - super(DRIVER_CLASS, opts); + // Try to parse extra arguments + try { + parseExtraArgs(opts.getExtraArgs()); + } catch (ParseException e) { + throw new RuntimeException("Can't parse extra arguments", e); + } } @Override @@ -58,6 +73,11 @@ public String escapeColName(String colName) { @Override public String escapeTableName(String tableName) { + // Return full table name including schema if needed + if (schema != null && !schema.isEmpty()) { + return escapeIdentifier(schema) + "." + escapeIdentifier(tableName); + } + return escapeIdentifier(tableName); } @@ -117,7 +137,7 @@ protected String getListDatabasesQuery() { protected String getListTablesQuery() { return "SELECT TABLENAME FROM PG_CATALOG.PG_TABLES " - + "WHERE SCHEMANAME = (SELECT CURRENT_SCHEMA())"; + + "WHERE SCHEMANAME = " + getSchemaSqlFragment(); } @Override @@ -127,7 +147,7 @@ protected String getListColumnsQuery(String tableName) { + " PG_CATALOG.PG_CLASS tab, PG_CATALOG.PG_ATTRIBUTE col " + "WHERE sch.OID = tab.RELNAMESPACE " + " AND tab.OID = col.ATTRELID " - + " AND sch.NSPNAME = (SELECT CURRENT_SCHEMA()) " + + " AND sch.NSPNAME = " + getSchemaSqlFragment() + " AND tab.RELNAME = '" + escapeLiteral(tableName) + "' " + " AND col.ATTNUM >= 1" + " AND col.ATTISDROPPED = 'f'"; @@ -142,12 +162,20 @@ protected String getPrimaryKeyQuery(String tableName) { + "WHERE sch.OID = tab.RELNAMESPACE " + " AND tab.OID = col.ATTRELID " + " AND tab.OID = ind.INDRELID " - + " AND sch.NSPNAME = (SELECT CURRENT_SCHEMA()) " + + " AND sch.NSPNAME = " + getSchemaSqlFragment() + " AND tab.RELNAME = '" + escapeLiteral(tableName) + "' " + " AND col.ATTNUM = ANY(ind.INDKEY) " + " AND ind.INDISPRIMARY"; } + private String getSchemaSqlFragment() { + if (schema != null && !schema.isEmpty()) { + return "'" + escapeLiteral(schema) + "'"; + } + + return "(SELECT CURRENT_SCHEMA())"; + } + private String escapeLiteral(String literal) { return literal.replace("'", "''"); } @@ -157,5 +185,48 @@ protected String getCurTimestampQuery() { return "SELECT CURRENT_TIMESTAMP"; } + /** + * Parse extra arguments. + * + * @param args Extra arguments array + * @throws ParseException + */ + void parseExtraArgs(String[] args) throws ParseException { + // No-op when no extra arguments are present + if (args == null || args.length == 0) { + return; + } + + // We do not need extended abilities of SqoopParser, so we're using + // Gnu parser instead. + CommandLineParser parser = new GnuParser(); + CommandLine cmdLine = parser.parse(getExtraOptions(), args, true); + + // Apply extra options + if (cmdLine.hasOption(SCHEMA)) { + String schemaName = cmdLine.getOptionValue(SCHEMA); + LOG.info("We will use schema " + schemaName); + + this.schema = schemaName; + } + } + + /** + * Create related options for PostgreSQL extra parameters. + * + * @return + */ + @SuppressWarnings("static-access") + private RelatedOptions getExtraOptions() { + // Connection args (common) + RelatedOptions extraOptions = + new RelatedOptions("PostgreSQL extra options:"); + + extraOptions.addOption(OptionBuilder.withArgName("string").hasArg() + .withDescription("Optional schema name") + .withLongOpt(SCHEMA).create()); + + return extraOptions; + } } diff --git a/src/java/org/apache/sqoop/manager/SqlManager.java b/src/java/org/apache/sqoop/manager/SqlManager.java index ea961cd7..3a52c6dc 100644 --- a/src/java/org/apache/sqoop/manager/SqlManager.java +++ b/src/java/org/apache/sqoop/manager/SqlManager.java @@ -766,6 +766,10 @@ public Timestamp getCurrentDbTimestamp() { @Override public long getTableRowCount(String tableName) throws SQLException { release(); // Release any previous ResultSet + + // Escape used table name + tableName = escapeTableName(tableName); + long result = -1; String countQuery = "SELECT COUNT(*) FROM " + tableName; Statement stmt = null; @@ -801,6 +805,10 @@ public long getTableRowCount(String tableName) throws SQLException { @Override public void deleteAllRecords(String tableName) throws SQLException { release(); // Release any previous ResultSet + + // Escape table name + tableName = escapeTableName(tableName); + String deleteQuery = "DELETE FROM " + tableName; Statement stmt = null; try { @@ -827,6 +835,11 @@ public void deleteAllRecords(String tableName) throws SQLException { public void migrateData(String fromTable, String toTable) throws SQLException { release(); // Release any previous ResultSet + + // Escape all table names + fromTable = escapeTableName(fromTable); + toTable = escapeTableName(toTable); + String updateQuery = "INSERT INTO " + toTable + " ( SELECT * FROM " + fromTable + " )"; diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java index b574f82c..bd52f008 100644 --- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java @@ -127,7 +127,7 @@ protected void configureOutputFormat(Job job, String tableName, if (null == colNames) { colNames = mgr.getColumnNames(tableName); } - DBOutputFormat.setOutput(job, tableName, colNames); + DBOutputFormat.setOutput(job, mgr.escapeTableName(tableName), colNames); job.setOutputFormatClass(getOutputFormatClass()); job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName); diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java index 7be5ed99..c8e17c23 100644 --- a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java @@ -132,7 +132,8 @@ protected void configureOutputFormat(Job job, String tableName, outColNames[j++] = colNames[i]; } } - DBOutputFormat.setOutput(job, tableName, outColNames); + DBOutputFormat.setOutput(job, + mgr.escapeTableName(tableName), outColNames); job.setOutputFormatClass(getOutputFormatClass()); job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName); diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java index f299f982..c17b4bbb 100644 --- a/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java @@ -72,7 +72,7 @@ protected void configureOutputFormat(Job job, String tableName, throw new IOException( "Export column names could not be determined for " + tableName); } - DBOutputFormat.setOutput(job, tableName, colNames); + DBOutputFormat.setOutput(job, mgr.escapeTableName(tableName), colNames); String updateKeyColumns = options.getUpdateKeyCol(); if (null == updateKeyColumns) { diff --git a/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java b/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java index 38e9fb9c..a56b93da 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java +++ b/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java @@ -98,10 +98,10 @@ protected String getSelectQuery() { query.append(" FROM ").append(tableName); if (!dbProductName.startsWith("ORACLE") - && !dbProductName.startsWith("DB2")) { - // The AS clause is required for hsqldb, but Oracle explicitly does - // not use it, and DB2 does not allow a qualified name in alias. Since - // this is not necessary for Oracle and DB2, we do not append. + && !dbProductName.startsWith("DB2") + && !dbProductName.startsWith("POSTGRESQL")) { + // The AS clause is required for hsqldb. Some other databases might have + // issues with it, so we're skipping some of them. query.append(" AS ").append(tableName); } query.append(" WHERE "); diff --git a/src/test/com/cloudera/sqoop/ThirdPartyTests.java b/src/test/com/cloudera/sqoop/ThirdPartyTests.java index eeab7f35..949b02d7 100644 --- a/src/test/com/cloudera/sqoop/ThirdPartyTests.java +++ b/src/test/com/cloudera/sqoop/ThirdPartyTests.java @@ -30,7 +30,8 @@ import com.cloudera.sqoop.manager.OracleExportTest; import com.cloudera.sqoop.manager.OracleManagerTest; import com.cloudera.sqoop.manager.OracleCompatTest; -import com.cloudera.sqoop.manager.PostgresqlTest; +import com.cloudera.sqoop.manager.PostgresqlExportTest; +import com.cloudera.sqoop.manager.PostgresqlImportTest; /** * Test battery including all tests of vendor-specific ConnManager @@ -53,7 +54,8 @@ public static Test suite() { suite.addTestSuite(OracleExportTest.class); suite.addTestSuite(OracleManagerTest.class); suite.addTestSuite(OracleCompatTest.class); - suite.addTestSuite(PostgresqlTest.class); + suite.addTestSuite(PostgresqlImportTest.class); + suite.addTestSuite(PostgresqlExportTest.class); return suite; } diff --git a/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java b/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java new file mode 100644 index 00000000..be449e4e --- /dev/null +++ b/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java @@ -0,0 +1,362 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cloudera.sqoop.manager; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.testutil.CommonArgs; +import com.cloudera.sqoop.testutil.ExportJobTestCase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Before; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Writer; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; + +/** + * + */ +public class PostgresqlExportTest extends ExportJobTestCase { + public static final Log LOG = LogFactory.getLog( + PostgresqlExportTest.class.getName()); + + static final String HOST_URL = System.getProperty( + "sqoop.test.postgresql.connectstring.host_url", + "jdbc:postgresql://localhost/"); + + static final String DATABASE_USER = "sqooptest"; + static final String DATABASE_NAME = "sqooptest"; + static final String TABLE_NAME = "EMPLOYEES_PG"; + static final String STAGING_TABLE_NAME = "STAGING"; + static final String SCHEMA_PUBLIC = "public"; + static final String SCHEMA_SPECIAL = "special"; + static final String CONNECT_STRING = HOST_URL + DATABASE_NAME; + + protected Connection connection; + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + @Before + public void setUp() { + super.setUp(); + + LOG.debug("Setting up postgresql test: " + CONNECT_STRING); + + try { + connection = DriverManager.getConnection(HOST_URL, DATABASE_USER, null); + connection.setAutoCommit(false); + } catch (SQLException ex) { + LOG.error("Can't create connection", ex); + throw new RuntimeException(ex); + } + + createTable(TABLE_NAME, SCHEMA_PUBLIC); + createTable(STAGING_TABLE_NAME, SCHEMA_PUBLIC); + createTable(TABLE_NAME, SCHEMA_SPECIAL); + createTable(STAGING_TABLE_NAME, SCHEMA_SPECIAL); + + LOG.debug("setUp complete."); + } + + @Override + public void tearDown() { + super.tearDown(); + + try { + connection.close(); + } catch (SQLException e) { + LOG.error("Ignoring exception in tearDown", e); + } + } + + public void createTable(String tableName, String schema) { + SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName); + options.setUsername(DATABASE_USER); + + ConnManager manager = null; + Statement st = null; + + try { + manager = new PostgresqlManager(options); + st = connection.createStatement(); + + // Create schema if not exists in dummy way (always create and ignore + // errors. + try { + st.executeUpdate("CREATE SCHEMA " + escapeTableOrSchemaName(schema)); + connection.commit(); + } catch (SQLException e) { + LOG.info("Couldn't create schema " + schema + " (is o.k. as long as" + + "the schema already exists.", e); + connection.rollback(); + } + + String fullTableName = escapeTableOrSchemaName(schema) + + "." + escapeTableOrSchemaName(tableName); + + try { + // Try to remove the table first. DROP TABLE IF EXISTS didn't + // get added until pg 8.3, so we just use "DROP TABLE" and ignore + // any exception here if one occurs. + st.executeUpdate("DROP TABLE " + fullTableName); + } catch (SQLException e) { + LOG.info("Couldn't drop table " + schema + "." + tableName + " (ok)", + e); + // Now we need to reset the transaction. + connection.rollback(); + } + + st.executeUpdate("CREATE TABLE " + fullTableName + " (" + + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, " + + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, " + + manager.escapeColName("start_date") + " DATE, " + + manager.escapeColName("salary") + " FLOAT, " + + manager.escapeColName("dept") + " VARCHAR(32))"); + + connection.commit(); + } catch (SQLException sqlE) { + LOG.error("Encountered SQL Exception: " + sqlE); + sqlE.printStackTrace(); + fail("SQLException when running test setUp(): " + sqlE); + } finally { + try { + if (null != st) { + st.close(); + } + + if (null != manager) { + manager.close(); + } + } catch (SQLException sqlE) { + LOG.warn("Got SQLException when closing connection: " + sqlE); + } + } + + LOG.debug("setUp complete."); + } + + private String [] getArgv(String tableName, + String... extraArgs) { + ArrayList args = new ArrayList(); + + CommonArgs.addHadoopFlags(args); + + args.add("--table"); + args.add(tableName); + args.add("--export-dir"); + args.add(getWarehouseDir()); + args.add("--fields-terminated-by"); + args.add(","); + args.add("--lines-terminated-by"); + args.add("\\n"); + args.add("--connect"); + args.add(CONNECT_STRING); + args.add("--username"); + args.add(DATABASE_USER); + args.add("-m"); + args.add("1"); + + for (String arg : extraArgs) { + args.add(arg); + } + + return args.toArray(new String[0]); + } + + protected void createTestFile(String filename, + String[] lines) + throws IOException { + new File(getWarehouseDir()).mkdirs(); + File file = new File(getWarehouseDir() + "/" + filename); + Writer output = new BufferedWriter(new FileWriter(file)); + for(String line : lines) { + output.write(line); + output.write("\n"); + } + output.close(); + } + + public void testExport() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + runExport(getArgv(TABLE_NAME)); + + assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection); + } + + public void testExportStaging() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + String[] extra = new String[] {"--staging-table", STAGING_TABLE_NAME, }; + + runExport(getArgv(TABLE_NAME, extra)); + + assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection); + } + + public void testExportDirect() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + String[] extra = new String[] {"--direct"}; + + runExport(getArgv(TABLE_NAME, extra)); + + assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection); + } + + public void testExportCustomSchema() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + String[] extra = new String[] {"--", + "--schema", + SCHEMA_SPECIAL, + }; + + runExport(getArgv(TABLE_NAME, extra)); + + assertRowCount(2, + escapeTableOrSchemaName(SCHEMA_SPECIAL) + + "." + escapeTableOrSchemaName(TABLE_NAME), + connection); + } + + public void testExportCustomSchemaStaging() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + String[] extra = new String[] { + "--staging-table", + STAGING_TABLE_NAME, + "--", + "--schema", + SCHEMA_SPECIAL, + }; + + runExport(getArgv(TABLE_NAME, extra)); + + assertRowCount(2, + escapeTableOrSchemaName(SCHEMA_SPECIAL) + + "." + escapeTableOrSchemaName(TABLE_NAME), + connection); + } + + public void testExportCustomSchemaStagingClear() + throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + String[] extra = new String[] { + "--staging-table", + STAGING_TABLE_NAME, + "--clear-staging-table", + "--", + "--schema", + SCHEMA_SPECIAL, + }; + + runExport(getArgv(TABLE_NAME, extra)); + + assertRowCount(2, + escapeTableOrSchemaName(SCHEMA_SPECIAL) + + "." + escapeTableOrSchemaName(TABLE_NAME), + connection); + } + + public void testExportCustomSchemaDirect() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + String[] extra = new String[] { + "--direct", + "--", + "--schema", + SCHEMA_SPECIAL, + }; + + runExport(getArgv(TABLE_NAME, extra)); + + assertRowCount(2, + escapeTableOrSchemaName(SCHEMA_SPECIAL) + + "." + escapeTableOrSchemaName(TABLE_NAME), + connection); + } + + public static void assertRowCount(long expected, + String tableName, + Connection connection) { + Statement stmt = null; + ResultSet rs = null; + try { + stmt = connection.createStatement(); + rs = stmt.executeQuery("SELECT count(*) FROM " + tableName); + + rs.next(); + + assertEquals(expected, rs.getLong(1)); + } catch (SQLException e) { + LOG.error("Can't verify number of rows", e); + fail(); + } finally { + try { + connection.commit(); + + if (stmt != null) { + stmt.close(); + } + if (rs != null) { + rs.close(); + } + } catch (SQLException ex) { + LOG.info("Ignored exception in finally block."); + } + } + } + + public String escapeTableOrSchemaName(String tableName) { + return "\"" + tableName + "\""; + } +} diff --git a/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java b/src/test/com/cloudera/sqoop/manager/PostgresqlImportTest.java similarity index 80% rename from src/test/com/cloudera/sqoop/manager/PostgresqlTest.java rename to src/test/com/cloudera/sqoop/manager/PostgresqlImportTest.java index 0dfd1fc4..267ccd04 100644 --- a/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java +++ b/src/test/com/cloudera/sqoop/manager/PostgresqlImportTest.java @@ -49,7 +49,7 @@ * * Since this requires a Postgresql installation on your local machine to use, * this class is named in such a way that Hadoop's default QA process does not - * run it. You need to run this manually with -Dtestcase=PostgresqlTest or + * run it. You need to run this manually with -Dtestcase=PostgresqlImportTest or * -Dthirdparty=true. * * You need to put Postgresql's JDBC driver library into a location where @@ -76,13 +76,14 @@ * $ sudo -u postgres psql -U postgres template1 * template1=> CREATE USER sqooptest; * template1=> CREATE DATABASE sqooptest; + * template1=> GRANT ALL ON DATABASE sqooptest TO sqooptest; * template1=> \q * */ -public class PostgresqlTest extends ImportJobTestCase { +public class PostgresqlImportTest extends ImportJobTestCase { public static final Log LOG = LogFactory.getLog( - PostgresqlTest.class.getName()); + PostgresqlImportTest.class.getName()); static final String HOST_URL = System.getProperty( "sqoop.test.postgresql.connectstring.host_url", @@ -92,6 +93,9 @@ public class PostgresqlTest extends ImportJobTestCase { static final String DATABASE_NAME = "sqooptest"; static final String TABLE_NAME = "EMPLOYEES_PG"; static final String SPECIAL_TABLE_NAME = "EMPLOYEES_PG's"; + static final String DIFFERENT_TABLE_NAME = "DIFFERENT_TABLE"; + static final String SCHEMA_PUBLIC = "public"; + static final String SCHEMA_SPECIAL = "special"; static final String CONNECT_STRING = HOST_URL + DATABASE_NAME; @Override @@ -105,13 +109,14 @@ public void setUp() { LOG.debug("Setting up another postgresql test: " + CONNECT_STRING); - setUpData(TABLE_NAME); - setUpData(SPECIAL_TABLE_NAME); + setUpData(TABLE_NAME, SCHEMA_PUBLIC); + setUpData(SPECIAL_TABLE_NAME, SCHEMA_PUBLIC); + setUpData(DIFFERENT_TABLE_NAME, SCHEMA_SPECIAL); LOG.debug("setUp complete."); } - public void setUpData(String tableName) { + public void setUpData(String tableName, String schema) { SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName); options.setUsername(DATABASE_USER); @@ -125,33 +130,44 @@ public void setUpData(String tableName) { connection.setAutoCommit(false); st = connection.createStatement(); - // create the database table and populate it with data. + // Create schema if not exists in dummy way (always create and ignore + // errors. + try { + st.executeUpdate("CREATE SCHEMA " + manager.escapeTableName(schema)); + connection.commit(); + } catch (SQLException e) { + LOG.info("Couldn't create schema " + schema + " (is o.k. as long as" + + "the schema already exists.", e); + connection.rollback(); + } + + String fullTableName = manager.escapeTableName(schema) + + "." + manager.escapeTableName(tableName); try { // Try to remove the table first. DROP TABLE IF EXISTS didn't // get added until pg 8.3, so we just use "DROP TABLE" and ignore // any exception here if one occurs. - st.executeUpdate("DROP TABLE " + manager.escapeTableName(tableName)); + st.executeUpdate("DROP TABLE " + fullTableName); } catch (SQLException e) { - LOG.info("Couldn't drop table " + tableName + " (ok)"); - LOG.info(e.toString()); + LOG.info("Couldn't drop table " + schema + "." + tableName + " (ok)", + e); // Now we need to reset the transaction. connection.rollback(); } - st.executeUpdate("CREATE TABLE " + manager.escapeTableName(tableName) - + " (" + st.executeUpdate("CREATE TABLE " + fullTableName + " (" + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, " + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, " + manager.escapeColName("start_date") + " DATE, " + manager.escapeColName("salary") + " FLOAT, " + manager.escapeColName("dept") + " VARCHAR(32))"); - st.executeUpdate("INSERT INTO " + manager.escapeTableName(tableName) + st.executeUpdate("INSERT INTO " + fullTableName + " VALUES(1,'Aaron','2009-05-14',1000000.00,'engineering')"); - st.executeUpdate("INSERT INTO " + manager.escapeTableName(tableName) + st.executeUpdate("INSERT INTO " + fullTableName + " VALUES(2,'Bob','2009-04-20',400.00,'sales')"); - st.executeUpdate("INSERT INTO " + manager.escapeTableName(tableName) + st.executeUpdate("INSERT INTO " + fullTableName + " VALUES(3,'Fred','2009-01-23',15.00,'marketing')"); connection.commit(); } catch (SQLException sqlE) { @@ -303,4 +319,32 @@ public void testIncrementalImport() throws IOException { doImportAndVerify(false, expectedResults, TABLE_NAME, extraArgs); } + + @Test + public void testDifferentSchemaImport() throws IOException { + String [] expectedResults = { + "2,Bob,2009-04-20,400.0,sales", + "3,Fred,2009-01-23,15.0,marketing", + }; + + String [] extraArgs = { "--", + "--schema", SCHEMA_SPECIAL, + }; + + doImportAndVerify(false, expectedResults, DIFFERENT_TABLE_NAME, extraArgs); + } + + @Test + public void testDifferentSchemaImportDirect() throws IOException { + String [] expectedResults = { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }; + + String [] extraArgs = { "--", + "--schema", SCHEMA_SPECIAL, + }; + + doImportAndVerify(true, expectedResults, DIFFERENT_TABLE_NAME, extraArgs); + } }