mirror of
https://github.com/apache/sqoop.git
synced 2025-05-11 22:41:50 +08:00
SQOOP-601 Support custom schemas in PostgreSQL Connector
(Jarek Jarcec Cecho via Cheolsoo Park)
This commit is contained in:
parent
0b2a688d36
commit
f11c3091c2
@ -21,6 +21,38 @@
|
||||
Notes for specific connectors
|
||||
-----------------------------
|
||||
|
||||
PostgreSQL Connector
|
||||
~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Extra arguments
|
||||
^^^^^^^^^^^^^^^
|
||||
|
||||
List of all extra arguments supported by PostgreSQL Connector is shown on table
|
||||
below:
|
||||
|
||||
.Supported PostgreSQL extra arguments:
|
||||
[grid="all"]
|
||||
`----------------------------------------`---------------------------------------
|
||||
Argument Description
|
||||
---------------------------------------------------------------------------------
|
||||
+\--schema <name>+ Scheme name that sqoop should use. \
|
||||
Default is "public".
|
||||
---------------------------------------------------------------------------------
|
||||
|
||||
Schema support
|
||||
^^^^^^^^^^^^^^
|
||||
|
||||
If you need to work with table that is located in schema other than default one,
|
||||
you need to specify extra argument +\--schema+. Custom schemas are supported for
|
||||
both import and export job (optional staging table however must be present in the
|
||||
same schema as target table). Example invocation:
|
||||
|
||||
----
|
||||
$ sqoop import ... --table custom_table -- --schema custom_schema
|
||||
----
|
||||
|
||||
|
||||
|
||||
pg_bulkload connector
|
||||
~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
|
@ -29,10 +29,5 @@ public class PostgresqlManager
|
||||
public PostgresqlManager(final SqoopOptions opts) {
|
||||
super(opts);
|
||||
}
|
||||
|
||||
protected PostgresqlManager(final SqoopOptions opts, boolean ignored) {
|
||||
super(opts, ignored);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
@ -142,10 +142,11 @@ public String[] getColumnNames(String tableName) {
|
||||
Statement s = null;
|
||||
ResultSet rs = null;
|
||||
List<String> columns = new ArrayList<String>();
|
||||
String listColumnsQuery = getListColumnsQuery(tableName);
|
||||
try {
|
||||
c = getConnection();
|
||||
s = c.createStatement();
|
||||
rs = s.executeQuery(getListColumnsQuery(tableName));
|
||||
rs = s.executeQuery(listColumnsQuery);
|
||||
while (rs.next()) {
|
||||
columns.add(rs.getString(1));
|
||||
}
|
||||
@ -158,7 +159,7 @@ public String[] getColumnNames(String tableName) {
|
||||
} catch (SQLException ce) {
|
||||
LOG.error("Failed to rollback transaction", ce);
|
||||
}
|
||||
LOG.error("Failed to list columns", sqle);
|
||||
LOG.error("Failed to list columns from query: " + listColumnsQuery, sqle);
|
||||
throw new RuntimeException(sqle);
|
||||
} finally {
|
||||
if (rs != null) {
|
||||
|
@ -56,9 +56,7 @@ public class DirectPostgresqlManager
|
||||
DirectPostgresqlManager.class.getName());
|
||||
|
||||
public DirectPostgresqlManager(final SqoopOptions opts) {
|
||||
// Inform superclass that we're overriding import method via alt.
|
||||
// constructor.
|
||||
super(opts, true);
|
||||
super(opts);
|
||||
}
|
||||
|
||||
private static final String PSQL_CMD = "psql";
|
||||
|
@ -40,7 +40,7 @@ public class PGBulkloadManager extends PostgresqlManager {
|
||||
|
||||
|
||||
public PGBulkloadManager(final SqoopOptions opts) {
|
||||
super(opts, true);
|
||||
super(opts);
|
||||
}
|
||||
|
||||
|
||||
|
@ -21,11 +21,17 @@
|
||||
import java.io.IOException;
|
||||
import java.sql.SQLException;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.CommandLineParser;
|
||||
import org.apache.commons.cli.GnuParser;
|
||||
import org.apache.commons.cli.OptionBuilder;
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.cloudera.sqoop.SqoopOptions;
|
||||
import com.cloudera.sqoop.util.ImportException;
|
||||
import org.apache.sqoop.cli.RelatedOptions;
|
||||
|
||||
/**
|
||||
* Manages connections to Postgresql databases.
|
||||
@ -33,6 +39,8 @@
|
||||
public class PostgresqlManager
|
||||
extends com.cloudera.sqoop.manager.CatalogQueryManager {
|
||||
|
||||
public static final String SCHEMA = "schema";
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(
|
||||
PostgresqlManager.class.getName());
|
||||
|
||||
@ -42,13 +50,20 @@ public class PostgresqlManager
|
||||
// set to true after we warn the user that we can use direct fastpath.
|
||||
private static boolean warningPrinted = false;
|
||||
|
||||
/*
|
||||
* PostgreSQL schema that we should use.
|
||||
*/
|
||||
private String schema;
|
||||
|
||||
public PostgresqlManager(final SqoopOptions opts) {
|
||||
super(DRIVER_CLASS, opts);
|
||||
}
|
||||
|
||||
protected PostgresqlManager(final SqoopOptions opts, boolean ignored) {
|
||||
// constructor used by subclasses to avoid the --direct warning.
|
||||
super(DRIVER_CLASS, opts);
|
||||
// Try to parse extra arguments
|
||||
try {
|
||||
parseExtraArgs(opts.getExtraArgs());
|
||||
} catch (ParseException e) {
|
||||
throw new RuntimeException("Can't parse extra arguments", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -58,6 +73,11 @@ public String escapeColName(String colName) {
|
||||
|
||||
@Override
|
||||
public String escapeTableName(String tableName) {
|
||||
// Return full table name including schema if needed
|
||||
if (schema != null && !schema.isEmpty()) {
|
||||
return escapeIdentifier(schema) + "." + escapeIdentifier(tableName);
|
||||
}
|
||||
|
||||
return escapeIdentifier(tableName);
|
||||
}
|
||||
|
||||
@ -117,7 +137,7 @@ protected String getListDatabasesQuery() {
|
||||
protected String getListTablesQuery() {
|
||||
return
|
||||
"SELECT TABLENAME FROM PG_CATALOG.PG_TABLES "
|
||||
+ "WHERE SCHEMANAME = (SELECT CURRENT_SCHEMA())";
|
||||
+ "WHERE SCHEMANAME = " + getSchemaSqlFragment();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -127,7 +147,7 @@ protected String getListColumnsQuery(String tableName) {
|
||||
+ " PG_CATALOG.PG_CLASS tab, PG_CATALOG.PG_ATTRIBUTE col "
|
||||
+ "WHERE sch.OID = tab.RELNAMESPACE "
|
||||
+ " AND tab.OID = col.ATTRELID "
|
||||
+ " AND sch.NSPNAME = (SELECT CURRENT_SCHEMA()) "
|
||||
+ " AND sch.NSPNAME = " + getSchemaSqlFragment()
|
||||
+ " AND tab.RELNAME = '" + escapeLiteral(tableName) + "' "
|
||||
+ " AND col.ATTNUM >= 1"
|
||||
+ " AND col.ATTISDROPPED = 'f'";
|
||||
@ -142,12 +162,20 @@ protected String getPrimaryKeyQuery(String tableName) {
|
||||
+ "WHERE sch.OID = tab.RELNAMESPACE "
|
||||
+ " AND tab.OID = col.ATTRELID "
|
||||
+ " AND tab.OID = ind.INDRELID "
|
||||
+ " AND sch.NSPNAME = (SELECT CURRENT_SCHEMA()) "
|
||||
+ " AND sch.NSPNAME = " + getSchemaSqlFragment()
|
||||
+ " AND tab.RELNAME = '" + escapeLiteral(tableName) + "' "
|
||||
+ " AND col.ATTNUM = ANY(ind.INDKEY) "
|
||||
+ " AND ind.INDISPRIMARY";
|
||||
}
|
||||
|
||||
private String getSchemaSqlFragment() {
|
||||
if (schema != null && !schema.isEmpty()) {
|
||||
return "'" + escapeLiteral(schema) + "'";
|
||||
}
|
||||
|
||||
return "(SELECT CURRENT_SCHEMA())";
|
||||
}
|
||||
|
||||
private String escapeLiteral(String literal) {
|
||||
return literal.replace("'", "''");
|
||||
}
|
||||
@ -157,5 +185,48 @@ protected String getCurTimestampQuery() {
|
||||
return "SELECT CURRENT_TIMESTAMP";
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse extra arguments.
|
||||
*
|
||||
* @param args Extra arguments array
|
||||
* @throws ParseException
|
||||
*/
|
||||
void parseExtraArgs(String[] args) throws ParseException {
|
||||
// No-op when no extra arguments are present
|
||||
if (args == null || args.length == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// We do not need extended abilities of SqoopParser, so we're using
|
||||
// Gnu parser instead.
|
||||
CommandLineParser parser = new GnuParser();
|
||||
CommandLine cmdLine = parser.parse(getExtraOptions(), args, true);
|
||||
|
||||
// Apply extra options
|
||||
if (cmdLine.hasOption(SCHEMA)) {
|
||||
String schemaName = cmdLine.getOptionValue(SCHEMA);
|
||||
LOG.info("We will use schema " + schemaName);
|
||||
|
||||
this.schema = schemaName;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create related options for PostgreSQL extra parameters.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
@SuppressWarnings("static-access")
|
||||
private RelatedOptions getExtraOptions() {
|
||||
// Connection args (common)
|
||||
RelatedOptions extraOptions =
|
||||
new RelatedOptions("PostgreSQL extra options:");
|
||||
|
||||
extraOptions.addOption(OptionBuilder.withArgName("string").hasArg()
|
||||
.withDescription("Optional schema name")
|
||||
.withLongOpt(SCHEMA).create());
|
||||
|
||||
return extraOptions;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -766,6 +766,10 @@ public Timestamp getCurrentDbTimestamp() {
|
||||
@Override
|
||||
public long getTableRowCount(String tableName) throws SQLException {
|
||||
release(); // Release any previous ResultSet
|
||||
|
||||
// Escape used table name
|
||||
tableName = escapeTableName(tableName);
|
||||
|
||||
long result = -1;
|
||||
String countQuery = "SELECT COUNT(*) FROM " + tableName;
|
||||
Statement stmt = null;
|
||||
@ -801,6 +805,10 @@ public long getTableRowCount(String tableName) throws SQLException {
|
||||
@Override
|
||||
public void deleteAllRecords(String tableName) throws SQLException {
|
||||
release(); // Release any previous ResultSet
|
||||
|
||||
// Escape table name
|
||||
tableName = escapeTableName(tableName);
|
||||
|
||||
String deleteQuery = "DELETE FROM " + tableName;
|
||||
Statement stmt = null;
|
||||
try {
|
||||
@ -827,6 +835,11 @@ public void deleteAllRecords(String tableName) throws SQLException {
|
||||
public void migrateData(String fromTable, String toTable)
|
||||
throws SQLException {
|
||||
release(); // Release any previous ResultSet
|
||||
|
||||
// Escape all table names
|
||||
fromTable = escapeTableName(fromTable);
|
||||
toTable = escapeTableName(toTable);
|
||||
|
||||
String updateQuery = "INSERT INTO " + toTable
|
||||
+ " ( SELECT * FROM " + fromTable + " )";
|
||||
|
||||
|
@ -127,7 +127,7 @@ protected void configureOutputFormat(Job job, String tableName,
|
||||
if (null == colNames) {
|
||||
colNames = mgr.getColumnNames(tableName);
|
||||
}
|
||||
DBOutputFormat.setOutput(job, tableName, colNames);
|
||||
DBOutputFormat.setOutput(job, mgr.escapeTableName(tableName), colNames);
|
||||
|
||||
job.setOutputFormatClass(getOutputFormatClass());
|
||||
job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
|
||||
|
@ -132,7 +132,8 @@ protected void configureOutputFormat(Job job, String tableName,
|
||||
outColNames[j++] = colNames[i];
|
||||
}
|
||||
}
|
||||
DBOutputFormat.setOutput(job, tableName, outColNames);
|
||||
DBOutputFormat.setOutput(job,
|
||||
mgr.escapeTableName(tableName), outColNames);
|
||||
|
||||
job.setOutputFormatClass(getOutputFormatClass());
|
||||
job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
|
||||
|
@ -72,7 +72,7 @@ protected void configureOutputFormat(Job job, String tableName,
|
||||
throw new IOException(
|
||||
"Export column names could not be determined for " + tableName);
|
||||
}
|
||||
DBOutputFormat.setOutput(job, tableName, colNames);
|
||||
DBOutputFormat.setOutput(job, mgr.escapeTableName(tableName), colNames);
|
||||
|
||||
String updateKeyColumns = options.getUpdateKeyCol();
|
||||
if (null == updateKeyColumns) {
|
||||
|
@ -98,10 +98,10 @@ protected String getSelectQuery() {
|
||||
|
||||
query.append(" FROM ").append(tableName);
|
||||
if (!dbProductName.startsWith("ORACLE")
|
||||
&& !dbProductName.startsWith("DB2")) {
|
||||
// The AS clause is required for hsqldb, but Oracle explicitly does
|
||||
// not use it, and DB2 does not allow a qualified name in alias. Since
|
||||
// this is not necessary for Oracle and DB2, we do not append.
|
||||
&& !dbProductName.startsWith("DB2")
|
||||
&& !dbProductName.startsWith("POSTGRESQL")) {
|
||||
// The AS clause is required for hsqldb. Some other databases might have
|
||||
// issues with it, so we're skipping some of them.
|
||||
query.append(" AS ").append(tableName);
|
||||
}
|
||||
query.append(" WHERE ");
|
||||
|
@ -30,7 +30,8 @@
|
||||
import com.cloudera.sqoop.manager.OracleExportTest;
|
||||
import com.cloudera.sqoop.manager.OracleManagerTest;
|
||||
import com.cloudera.sqoop.manager.OracleCompatTest;
|
||||
import com.cloudera.sqoop.manager.PostgresqlTest;
|
||||
import com.cloudera.sqoop.manager.PostgresqlExportTest;
|
||||
import com.cloudera.sqoop.manager.PostgresqlImportTest;
|
||||
|
||||
/**
|
||||
* Test battery including all tests of vendor-specific ConnManager
|
||||
@ -53,7 +54,8 @@ public static Test suite() {
|
||||
suite.addTestSuite(OracleExportTest.class);
|
||||
suite.addTestSuite(OracleManagerTest.class);
|
||||
suite.addTestSuite(OracleCompatTest.class);
|
||||
suite.addTestSuite(PostgresqlTest.class);
|
||||
suite.addTestSuite(PostgresqlImportTest.class);
|
||||
suite.addTestSuite(PostgresqlExportTest.class);
|
||||
|
||||
return suite;
|
||||
}
|
||||
|
362
src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java
Normal file
362
src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java
Normal file
@ -0,0 +1,362 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.cloudera.sqoop.manager;
|
||||
|
||||
import com.cloudera.sqoop.SqoopOptions;
|
||||
import com.cloudera.sqoop.testutil.CommonArgs;
|
||||
import com.cloudera.sqoop.testutil.ExportJobTestCase;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.Writer;
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.ArrayList;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class PostgresqlExportTest extends ExportJobTestCase {
|
||||
public static final Log LOG = LogFactory.getLog(
|
||||
PostgresqlExportTest.class.getName());
|
||||
|
||||
static final String HOST_URL = System.getProperty(
|
||||
"sqoop.test.postgresql.connectstring.host_url",
|
||||
"jdbc:postgresql://localhost/");
|
||||
|
||||
static final String DATABASE_USER = "sqooptest";
|
||||
static final String DATABASE_NAME = "sqooptest";
|
||||
static final String TABLE_NAME = "EMPLOYEES_PG";
|
||||
static final String STAGING_TABLE_NAME = "STAGING";
|
||||
static final String SCHEMA_PUBLIC = "public";
|
||||
static final String SCHEMA_SPECIAL = "special";
|
||||
static final String CONNECT_STRING = HOST_URL + DATABASE_NAME;
|
||||
|
||||
protected Connection connection;
|
||||
|
||||
@Override
|
||||
protected boolean useHsqldbTestServer() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
super.setUp();
|
||||
|
||||
LOG.debug("Setting up postgresql test: " + CONNECT_STRING);
|
||||
|
||||
try {
|
||||
connection = DriverManager.getConnection(HOST_URL, DATABASE_USER, null);
|
||||
connection.setAutoCommit(false);
|
||||
} catch (SQLException ex) {
|
||||
LOG.error("Can't create connection", ex);
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
|
||||
createTable(TABLE_NAME, SCHEMA_PUBLIC);
|
||||
createTable(STAGING_TABLE_NAME, SCHEMA_PUBLIC);
|
||||
createTable(TABLE_NAME, SCHEMA_SPECIAL);
|
||||
createTable(STAGING_TABLE_NAME, SCHEMA_SPECIAL);
|
||||
|
||||
LOG.debug("setUp complete.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tearDown() {
|
||||
super.tearDown();
|
||||
|
||||
try {
|
||||
connection.close();
|
||||
} catch (SQLException e) {
|
||||
LOG.error("Ignoring exception in tearDown", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void createTable(String tableName, String schema) {
|
||||
SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName);
|
||||
options.setUsername(DATABASE_USER);
|
||||
|
||||
ConnManager manager = null;
|
||||
Statement st = null;
|
||||
|
||||
try {
|
||||
manager = new PostgresqlManager(options);
|
||||
st = connection.createStatement();
|
||||
|
||||
// Create schema if not exists in dummy way (always create and ignore
|
||||
// errors.
|
||||
try {
|
||||
st.executeUpdate("CREATE SCHEMA " + escapeTableOrSchemaName(schema));
|
||||
connection.commit();
|
||||
} catch (SQLException e) {
|
||||
LOG.info("Couldn't create schema " + schema + " (is o.k. as long as"
|
||||
+ "the schema already exists.", e);
|
||||
connection.rollback();
|
||||
}
|
||||
|
||||
String fullTableName = escapeTableOrSchemaName(schema)
|
||||
+ "." + escapeTableOrSchemaName(tableName);
|
||||
|
||||
try {
|
||||
// Try to remove the table first. DROP TABLE IF EXISTS didn't
|
||||
// get added until pg 8.3, so we just use "DROP TABLE" and ignore
|
||||
// any exception here if one occurs.
|
||||
st.executeUpdate("DROP TABLE " + fullTableName);
|
||||
} catch (SQLException e) {
|
||||
LOG.info("Couldn't drop table " + schema + "." + tableName + " (ok)",
|
||||
e);
|
||||
// Now we need to reset the transaction.
|
||||
connection.rollback();
|
||||
}
|
||||
|
||||
st.executeUpdate("CREATE TABLE " + fullTableName + " ("
|
||||
+ manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, "
|
||||
+ manager.escapeColName("name") + " VARCHAR(24) NOT NULL, "
|
||||
+ manager.escapeColName("start_date") + " DATE, "
|
||||
+ manager.escapeColName("salary") + " FLOAT, "
|
||||
+ manager.escapeColName("dept") + " VARCHAR(32))");
|
||||
|
||||
connection.commit();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.error("Encountered SQL Exception: " + sqlE);
|
||||
sqlE.printStackTrace();
|
||||
fail("SQLException when running test setUp(): " + sqlE);
|
||||
} finally {
|
||||
try {
|
||||
if (null != st) {
|
||||
st.close();
|
||||
}
|
||||
|
||||
if (null != manager) {
|
||||
manager.close();
|
||||
}
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Got SQLException when closing connection: " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
LOG.debug("setUp complete.");
|
||||
}
|
||||
|
||||
private String [] getArgv(String tableName,
|
||||
String... extraArgs) {
|
||||
ArrayList<String> args = new ArrayList<String>();
|
||||
|
||||
CommonArgs.addHadoopFlags(args);
|
||||
|
||||
args.add("--table");
|
||||
args.add(tableName);
|
||||
args.add("--export-dir");
|
||||
args.add(getWarehouseDir());
|
||||
args.add("--fields-terminated-by");
|
||||
args.add(",");
|
||||
args.add("--lines-terminated-by");
|
||||
args.add("\\n");
|
||||
args.add("--connect");
|
||||
args.add(CONNECT_STRING);
|
||||
args.add("--username");
|
||||
args.add(DATABASE_USER);
|
||||
args.add("-m");
|
||||
args.add("1");
|
||||
|
||||
for (String arg : extraArgs) {
|
||||
args.add(arg);
|
||||
}
|
||||
|
||||
return args.toArray(new String[0]);
|
||||
}
|
||||
|
||||
protected void createTestFile(String filename,
|
||||
String[] lines)
|
||||
throws IOException {
|
||||
new File(getWarehouseDir()).mkdirs();
|
||||
File file = new File(getWarehouseDir() + "/" + filename);
|
||||
Writer output = new BufferedWriter(new FileWriter(file));
|
||||
for(String line : lines) {
|
||||
output.write(line);
|
||||
output.write("\n");
|
||||
}
|
||||
output.close();
|
||||
}
|
||||
|
||||
public void testExport() throws IOException, SQLException {
|
||||
createTestFile("inputFile", new String[] {
|
||||
"2,Bob,2009-04-20,400,sales",
|
||||
"3,Fred,2009-01-23,15,marketing",
|
||||
});
|
||||
|
||||
runExport(getArgv(TABLE_NAME));
|
||||
|
||||
assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection);
|
||||
}
|
||||
|
||||
public void testExportStaging() throws IOException, SQLException {
|
||||
createTestFile("inputFile", new String[] {
|
||||
"2,Bob,2009-04-20,400,sales",
|
||||
"3,Fred,2009-01-23,15,marketing",
|
||||
});
|
||||
|
||||
String[] extra = new String[] {"--staging-table", STAGING_TABLE_NAME, };
|
||||
|
||||
runExport(getArgv(TABLE_NAME, extra));
|
||||
|
||||
assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection);
|
||||
}
|
||||
|
||||
public void testExportDirect() throws IOException, SQLException {
|
||||
createTestFile("inputFile", new String[] {
|
||||
"2,Bob,2009-04-20,400,sales",
|
||||
"3,Fred,2009-01-23,15,marketing",
|
||||
});
|
||||
|
||||
String[] extra = new String[] {"--direct"};
|
||||
|
||||
runExport(getArgv(TABLE_NAME, extra));
|
||||
|
||||
assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection);
|
||||
}
|
||||
|
||||
public void testExportCustomSchema() throws IOException, SQLException {
|
||||
createTestFile("inputFile", new String[] {
|
||||
"2,Bob,2009-04-20,400,sales",
|
||||
"3,Fred,2009-01-23,15,marketing",
|
||||
});
|
||||
|
||||
String[] extra = new String[] {"--",
|
||||
"--schema",
|
||||
SCHEMA_SPECIAL,
|
||||
};
|
||||
|
||||
runExport(getArgv(TABLE_NAME, extra));
|
||||
|
||||
assertRowCount(2,
|
||||
escapeTableOrSchemaName(SCHEMA_SPECIAL)
|
||||
+ "." + escapeTableOrSchemaName(TABLE_NAME),
|
||||
connection);
|
||||
}
|
||||
|
||||
public void testExportCustomSchemaStaging() throws IOException, SQLException {
|
||||
createTestFile("inputFile", new String[] {
|
||||
"2,Bob,2009-04-20,400,sales",
|
||||
"3,Fred,2009-01-23,15,marketing",
|
||||
});
|
||||
|
||||
String[] extra = new String[] {
|
||||
"--staging-table",
|
||||
STAGING_TABLE_NAME,
|
||||
"--",
|
||||
"--schema",
|
||||
SCHEMA_SPECIAL,
|
||||
};
|
||||
|
||||
runExport(getArgv(TABLE_NAME, extra));
|
||||
|
||||
assertRowCount(2,
|
||||
escapeTableOrSchemaName(SCHEMA_SPECIAL)
|
||||
+ "." + escapeTableOrSchemaName(TABLE_NAME),
|
||||
connection);
|
||||
}
|
||||
|
||||
public void testExportCustomSchemaStagingClear()
|
||||
throws IOException, SQLException {
|
||||
createTestFile("inputFile", new String[] {
|
||||
"2,Bob,2009-04-20,400,sales",
|
||||
"3,Fred,2009-01-23,15,marketing",
|
||||
});
|
||||
|
||||
String[] extra = new String[] {
|
||||
"--staging-table",
|
||||
STAGING_TABLE_NAME,
|
||||
"--clear-staging-table",
|
||||
"--",
|
||||
"--schema",
|
||||
SCHEMA_SPECIAL,
|
||||
};
|
||||
|
||||
runExport(getArgv(TABLE_NAME, extra));
|
||||
|
||||
assertRowCount(2,
|
||||
escapeTableOrSchemaName(SCHEMA_SPECIAL)
|
||||
+ "." + escapeTableOrSchemaName(TABLE_NAME),
|
||||
connection);
|
||||
}
|
||||
|
||||
public void testExportCustomSchemaDirect() throws IOException, SQLException {
|
||||
createTestFile("inputFile", new String[] {
|
||||
"2,Bob,2009-04-20,400,sales",
|
||||
"3,Fred,2009-01-23,15,marketing",
|
||||
});
|
||||
|
||||
String[] extra = new String[] {
|
||||
"--direct",
|
||||
"--",
|
||||
"--schema",
|
||||
SCHEMA_SPECIAL,
|
||||
};
|
||||
|
||||
runExport(getArgv(TABLE_NAME, extra));
|
||||
|
||||
assertRowCount(2,
|
||||
escapeTableOrSchemaName(SCHEMA_SPECIAL)
|
||||
+ "." + escapeTableOrSchemaName(TABLE_NAME),
|
||||
connection);
|
||||
}
|
||||
|
||||
public static void assertRowCount(long expected,
|
||||
String tableName,
|
||||
Connection connection) {
|
||||
Statement stmt = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
stmt = connection.createStatement();
|
||||
rs = stmt.executeQuery("SELECT count(*) FROM " + tableName);
|
||||
|
||||
rs.next();
|
||||
|
||||
assertEquals(expected, rs.getLong(1));
|
||||
} catch (SQLException e) {
|
||||
LOG.error("Can't verify number of rows", e);
|
||||
fail();
|
||||
} finally {
|
||||
try {
|
||||
connection.commit();
|
||||
|
||||
if (stmt != null) {
|
||||
stmt.close();
|
||||
}
|
||||
if (rs != null) {
|
||||
rs.close();
|
||||
}
|
||||
} catch (SQLException ex) {
|
||||
LOG.info("Ignored exception in finally block.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public String escapeTableOrSchemaName(String tableName) {
|
||||
return "\"" + tableName + "\"";
|
||||
}
|
||||
}
|
@ -49,7 +49,7 @@
|
||||
*
|
||||
* Since this requires a Postgresql installation on your local machine to use,
|
||||
* this class is named in such a way that Hadoop's default QA process does not
|
||||
* run it. You need to run this manually with -Dtestcase=PostgresqlTest or
|
||||
* run it. You need to run this manually with -Dtestcase=PostgresqlImportTest or
|
||||
* -Dthirdparty=true.
|
||||
*
|
||||
* You need to put Postgresql's JDBC driver library into a location where
|
||||
@ -76,13 +76,14 @@
|
||||
* $ sudo -u postgres psql -U postgres template1
|
||||
* template1=> CREATE USER sqooptest;
|
||||
* template1=> CREATE DATABASE sqooptest;
|
||||
* template1=> GRANT ALL ON DATABASE sqooptest TO sqooptest;
|
||||
* template1=> \q
|
||||
*
|
||||
*/
|
||||
public class PostgresqlTest extends ImportJobTestCase {
|
||||
public class PostgresqlImportTest extends ImportJobTestCase {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(
|
||||
PostgresqlTest.class.getName());
|
||||
PostgresqlImportTest.class.getName());
|
||||
|
||||
static final String HOST_URL = System.getProperty(
|
||||
"sqoop.test.postgresql.connectstring.host_url",
|
||||
@ -92,6 +93,9 @@ public class PostgresqlTest extends ImportJobTestCase {
|
||||
static final String DATABASE_NAME = "sqooptest";
|
||||
static final String TABLE_NAME = "EMPLOYEES_PG";
|
||||
static final String SPECIAL_TABLE_NAME = "EMPLOYEES_PG's";
|
||||
static final String DIFFERENT_TABLE_NAME = "DIFFERENT_TABLE";
|
||||
static final String SCHEMA_PUBLIC = "public";
|
||||
static final String SCHEMA_SPECIAL = "special";
|
||||
static final String CONNECT_STRING = HOST_URL + DATABASE_NAME;
|
||||
|
||||
@Override
|
||||
@ -105,13 +109,14 @@ public void setUp() {
|
||||
|
||||
LOG.debug("Setting up another postgresql test: " + CONNECT_STRING);
|
||||
|
||||
setUpData(TABLE_NAME);
|
||||
setUpData(SPECIAL_TABLE_NAME);
|
||||
setUpData(TABLE_NAME, SCHEMA_PUBLIC);
|
||||
setUpData(SPECIAL_TABLE_NAME, SCHEMA_PUBLIC);
|
||||
setUpData(DIFFERENT_TABLE_NAME, SCHEMA_SPECIAL);
|
||||
|
||||
LOG.debug("setUp complete.");
|
||||
}
|
||||
|
||||
public void setUpData(String tableName) {
|
||||
public void setUpData(String tableName, String schema) {
|
||||
SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName);
|
||||
options.setUsername(DATABASE_USER);
|
||||
|
||||
@ -125,33 +130,44 @@ public void setUpData(String tableName) {
|
||||
connection.setAutoCommit(false);
|
||||
st = connection.createStatement();
|
||||
|
||||
// create the database table and populate it with data.
|
||||
// Create schema if not exists in dummy way (always create and ignore
|
||||
// errors.
|
||||
try {
|
||||
st.executeUpdate("CREATE SCHEMA " + manager.escapeTableName(schema));
|
||||
connection.commit();
|
||||
} catch (SQLException e) {
|
||||
LOG.info("Couldn't create schema " + schema + " (is o.k. as long as"
|
||||
+ "the schema already exists.", e);
|
||||
connection.rollback();
|
||||
}
|
||||
|
||||
String fullTableName = manager.escapeTableName(schema)
|
||||
+ "." + manager.escapeTableName(tableName);
|
||||
|
||||
try {
|
||||
// Try to remove the table first. DROP TABLE IF EXISTS didn't
|
||||
// get added until pg 8.3, so we just use "DROP TABLE" and ignore
|
||||
// any exception here if one occurs.
|
||||
st.executeUpdate("DROP TABLE " + manager.escapeTableName(tableName));
|
||||
st.executeUpdate("DROP TABLE " + fullTableName);
|
||||
} catch (SQLException e) {
|
||||
LOG.info("Couldn't drop table " + tableName + " (ok)");
|
||||
LOG.info(e.toString());
|
||||
LOG.info("Couldn't drop table " + schema + "." + tableName + " (ok)",
|
||||
e);
|
||||
// Now we need to reset the transaction.
|
||||
connection.rollback();
|
||||
}
|
||||
|
||||
st.executeUpdate("CREATE TABLE " + manager.escapeTableName(tableName)
|
||||
+ " ("
|
||||
st.executeUpdate("CREATE TABLE " + fullTableName + " ("
|
||||
+ manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, "
|
||||
+ manager.escapeColName("name") + " VARCHAR(24) NOT NULL, "
|
||||
+ manager.escapeColName("start_date") + " DATE, "
|
||||
+ manager.escapeColName("salary") + " FLOAT, "
|
||||
+ manager.escapeColName("dept") + " VARCHAR(32))");
|
||||
|
||||
st.executeUpdate("INSERT INTO " + manager.escapeTableName(tableName)
|
||||
st.executeUpdate("INSERT INTO " + fullTableName
|
||||
+ " VALUES(1,'Aaron','2009-05-14',1000000.00,'engineering')");
|
||||
st.executeUpdate("INSERT INTO " + manager.escapeTableName(tableName)
|
||||
st.executeUpdate("INSERT INTO " + fullTableName
|
||||
+ " VALUES(2,'Bob','2009-04-20',400.00,'sales')");
|
||||
st.executeUpdate("INSERT INTO " + manager.escapeTableName(tableName)
|
||||
st.executeUpdate("INSERT INTO " + fullTableName
|
||||
+ " VALUES(3,'Fred','2009-01-23',15.00,'marketing')");
|
||||
connection.commit();
|
||||
} catch (SQLException sqlE) {
|
||||
@ -303,4 +319,32 @@ public void testIncrementalImport() throws IOException {
|
||||
|
||||
doImportAndVerify(false, expectedResults, TABLE_NAME, extraArgs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDifferentSchemaImport() throws IOException {
|
||||
String [] expectedResults = {
|
||||
"2,Bob,2009-04-20,400.0,sales",
|
||||
"3,Fred,2009-01-23,15.0,marketing",
|
||||
};
|
||||
|
||||
String [] extraArgs = { "--",
|
||||
"--schema", SCHEMA_SPECIAL,
|
||||
};
|
||||
|
||||
doImportAndVerify(false, expectedResults, DIFFERENT_TABLE_NAME, extraArgs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDifferentSchemaImportDirect() throws IOException {
|
||||
String [] expectedResults = {
|
||||
"2,Bob,2009-04-20,400,sales",
|
||||
"3,Fred,2009-01-23,15,marketing",
|
||||
};
|
||||
|
||||
String [] extraArgs = { "--",
|
||||
"--schema", SCHEMA_SPECIAL,
|
||||
};
|
||||
|
||||
doImportAndVerify(true, expectedResults, DIFFERENT_TABLE_NAME, extraArgs);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user