From 57d4d3aa0290fa7cfcc82ce2814bdec908cc7014 Mon Sep 17 00:00:00 2001 From: Jarek Jarcec Cecho Date: Fri, 15 Mar 2013 13:22:07 -0700 Subject: [PATCH] SQOOP-903: Sqoop2: Add schema support to Generic JDBC Connector (Abraham Elmahrek via Jarek Jarcec Cecho) --- .../jdbc/GenericJdbcExportInitializer.java | 10 +- .../jdbc/GenericJdbcImportInitializer.java | 15 +- .../connector/jdbc/GenericJdbcValidator.java | 3 + .../jdbc/configuration/ExportTableForm.java | 1 + .../jdbc/configuration/ImportTableForm.java | 1 + ...eneric-jdbc-connector-resources.properties | 4 + .../connector/jdbc/TestExportInitializer.java | 110 +++++++++-- .../connector/jdbc/TestImportInitializer.java | 173 ++++++++++++++++-- 8 files changed, 285 insertions(+), 32 deletions(-) diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java index 520b0bbc..40a77749 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java @@ -67,6 +67,7 @@ private void configureJdbcProperties(MutableContext context, ConnectionConfigura private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) { String dataSql; + String schemaName = jobConfig.table.schemaName; String tableName = jobConfig.table.tableName; String tableSql = jobConfig.table.sql; String tableColumns = jobConfig.table.columns; @@ -79,12 +80,15 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur } else if (tableName != null) { // when table name is specified: + // For databases that support schemas (IE: postgresql). + String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + if (tableColumns == null) { String[] columns = executor.getQueryColumns("SELECT * FROM " - + executor.delimitIdentifier(tableName) + " WHERE 1 = 0"); + + fullTableName + " WHERE 1 = 0"); StringBuilder builder = new StringBuilder(); builder.append("INSERT INTO "); - builder.append(executor.delimitIdentifier(tableName)); + builder.append(fullTableName); builder.append(" VALUES (?"); for (int i = 1; i < columns.length; i++) { builder.append(",?"); @@ -96,7 +100,7 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur String[] columns = StringUtils.split(tableColumns, ','); StringBuilder builder = new StringBuilder(); builder.append("INSERT INTO "); - builder.append(executor.delimitIdentifier(tableName)); + builder.append(fullTableName); builder.append(" ("); builder.append(tableColumns); builder.append(") VALUES (?"); diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java index 46c7ee7c..3e9789c3 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java @@ -104,6 +104,7 @@ private void configurePartitionProperties(MutableContext context, ConnectionConf if (minMaxQuery == null) { StringBuilder builder = new StringBuilder(); + String schemaName = jobConfig.table.schemaName; String tableName = jobConfig.table.tableName; String tableSql = jobConfig.table.sql; @@ -114,13 +115,17 @@ private void configurePartitionProperties(MutableContext context, ConnectionConf } else if (tableName != null) { // when table name is specified: + + // For databases that support schemas (IE: postgresql). + String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + String column = partitionColumnName; builder.append("SELECT MIN("); builder.append(column); builder.append("), MAX("); builder.append(column); builder.append(") FROM "); - builder.append(executor.delimitIdentifier(tableName)); + builder.append(fullTableName); } else if (tableSql != null) { String column = executor.qualify( @@ -177,6 +182,7 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur String dataSql; String fieldNames; + String schemaName = jobConfig.table.schemaName; String tableName = jobConfig.table.tableName; String tableSql = jobConfig.table.sql; String tableColumns = jobConfig.table.columns; @@ -189,10 +195,13 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur } else if (tableName != null) { // when table name is specified: + // For databases that support schemas (IE: postgresql). + String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + if (tableColumns == null) { StringBuilder builder = new StringBuilder(); builder.append("SELECT * FROM "); - builder.append(executor.delimitIdentifier(tableName)); + builder.append(fullTableName); builder.append(" WHERE "); builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN); dataSql = builder.toString(); @@ -206,7 +215,7 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur builder.append("SELECT "); builder.append(tableColumns); builder.append(" FROM "); - builder.append(executor.delimitIdentifier(tableName)); + builder.append(fullTableName); builder.append(" WHERE "); builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN); dataSql = builder.toString(); diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java index e098fbc1..4e24517f 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java @@ -102,6 +102,9 @@ private Validation validateImportJob(Object jobConfiguration) { if(configuration.table.tableName != null && configuration.table.sql != null) { validation.addMessage(Status.UNACCEPTABLE, "table", "Both table name and SQL cannot be specified"); } + if(configuration.table.schemaName != null && configuration.table.sql != null) { + validation.addMessage(Status.UNACCEPTABLE, "table", "Both schema name and SQL cannot be specified"); + } if(configuration.table.sql != null && !configuration.table.sql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN)) { validation.addMessage(Status.UNACCEPTABLE, "table", "sql", "SQL statement must contain placeholder for auto generated " diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java index 718d1fbc..ee4bb6e5 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java @@ -25,6 +25,7 @@ */ @FormClass public class ExportTableForm { + @Input(size = 50) public String schemaName; @Input(size = 50) public String tableName; @Input(size = 50) public String sql; @Input(size = 50) public String columns; diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java index d150779f..3422a8fa 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java @@ -25,6 +25,7 @@ */ @FormClass public class ImportTableForm { + @Input(size = 50) public String schemaName; @Input(size = 50) public String tableName; @Input(size = 50) public String sql; @Input(size = 50) public String columns; diff --git a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties index 6ab42967..44fc984d 100644 --- a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties +++ b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties @@ -53,6 +53,10 @@ table.label = Database configuration table.help = You must supply the information requested in order to create \ a job object. +# Schema name +table.schemaName.label = Schema name +table.schemaName.help = Schema name to process data in the remote database + # Table name table.tableName.label = Table name table.tableName.help = Table name to process data in the remote database diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java index bb0c23b9..f83aaa28 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java @@ -28,15 +28,21 @@ public class TestExportInitializer extends TestCase { + private final String schemaName; private final String tableName; + private final String schemalessTableName; private final String tableSql; + private final String schemalessTableSql; private final String tableColumns; private GenericJdbcExecutor executor; public TestExportInitializer() { - tableName = getClass().getSimpleName().toUpperCase(); + schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA"; + tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA"; + schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE"; tableSql = "INSERT INTO " + tableName + " VALUES (?,?,?)"; + schemalessTableSql = "INSERT INTO " + schemalessTableName + " VALUES (?,?,?)"; tableColumns = "ICOL,VCOL"; } @@ -45,10 +51,15 @@ public void setUp() { executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, GenericJdbcTestConstants.URL, null, null); + String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); if (!executor.existTable(tableName)) { - executor.executeUpdate("CREATE TABLE " - + executor.delimitIdentifier(tableName) - + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName)); + executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + } + + fullTableName = executor.delimitIdentifier(schemalessTableName); + if (!executor.existTable(schemalessTableName)) { + executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); } } @@ -57,62 +68,131 @@ public void tearDown() { executor.close(); } + @SuppressWarnings("unchecked") public void testTableName() throws Exception { ConnectionConfiguration connConf = new ConnectionConfiguration(); ExportJobConfiguration jobConf = new ExportJobConfiguration(); + String fullTableName = executor.delimitIdentifier(schemalessTableName); + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; connConf.connection.connectionString = GenericJdbcTestConstants.URL; - jobConf.table.tableName = tableName; + jobConf.table.tableName = schemalessTableName; MutableContext context = new MutableMapContext(); InitializerContext initializerContext = new InitializerContext(context); + @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcExportInitializer(); initializer.initialize(initializerContext, connConf, jobConf); - verifyResult(context, - "INSERT INTO " + executor.delimitIdentifier(tableName) - + " VALUES (?,?,?)"); + verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)"); } + @SuppressWarnings("unchecked") public void testTableNameWithTableColumns() throws Exception { ConnectionConfiguration connConf = new ConnectionConfiguration(); ExportJobConfiguration jobConf = new ExportJobConfiguration(); + String fullTableName = executor.delimitIdentifier(schemalessTableName); + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; connConf.connection.connectionString = GenericJdbcTestConstants.URL; - jobConf.table.tableName = tableName; + jobConf.table.tableName = schemalessTableName; jobConf.table.columns = tableColumns; MutableContext context = new MutableMapContext(); InitializerContext initializerContext = new InitializerContext(context); + @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcExportInitializer(); initializer.initialize(initializerContext, connConf, jobConf); - verifyResult(context, - "INSERT INTO " + executor.delimitIdentifier(tableName) - + " (" + tableColumns + ") VALUES (?,?)"); + verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)"); } + @SuppressWarnings("unchecked") public void testTableSql() throws Exception { ConnectionConfiguration connConf = new ConnectionConfiguration(); ExportJobConfiguration jobConf = new ExportJobConfiguration(); connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.table.sql = schemalessTableSql; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcExportInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(schemalessTableName) + " VALUES (?,?,?)"); + } + + @SuppressWarnings("unchecked") + public void testTableNameWithSchema() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ExportJobConfiguration jobConf = new ExportJobConfiguration(); + + String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.table.schemaName = schemaName; + jobConf.table.tableName = tableName; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcExportInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)"); + } + + @SuppressWarnings("unchecked") + public void testTableNameWithTableColumnsWithSchema() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ExportJobConfiguration jobConf = new ExportJobConfiguration(); + + String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.table.schemaName = schemaName; + jobConf.table.tableName = tableName; + jobConf.table.columns = tableColumns; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcExportInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)"); + } + + @SuppressWarnings("unchecked") + public void testTableSqlWithSchema() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ExportJobConfiguration jobConf = new ExportJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.table.schemaName = schemaName; jobConf.table.sql = tableSql; MutableContext context = new MutableMapContext(); InitializerContext initializerContext = new InitializerContext(context); + @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcExportInitializer(); initializer.initialize(initializerContext, connConf, jobConf); - verifyResult(context, - "INSERT INTO " + executor.delimitIdentifier(tableName) - + " VALUES (?,?,?)"); + verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?)"); } private void verifyResult(MutableContext context, String dataSql) { diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java index 45835bd6..9f4269a0 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java @@ -31,8 +31,11 @@ public class TestImportInitializer extends TestCase { + private final String schemaName; private final String tableName; + private final String schemalessTableName; private final String tableSql; + private final String schemalessTableSql; private final String tableColumns; private GenericJdbcExecutor executor; @@ -41,8 +44,11 @@ public class TestImportInitializer extends TestCase { private static final int NUMBER_OF_ROWS = 101; public TestImportInitializer() { - tableName = getClass().getSimpleName().toUpperCase(); - tableSql = "SELECT * FROM " + tableName + " WHERE ${CONDITIONS}"; + schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA"; + tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA"; + schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE"; + tableSql = "SELECT * FROM " + schemaName + "." + tableName + " WHERE ${CONDITIONS}"; + schemalessTableSql = "SELECT * FROM " + schemalessTableName + " WHERE ${CONDITIONS}"; tableColumns = "ICOL,VCOL"; } @@ -51,14 +57,30 @@ public void setUp() { executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, GenericJdbcTestConstants.URL, null, null); + String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); if (!executor.existTable(tableName)) { + executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName)); executor.executeUpdate("CREATE TABLE " - + executor.delimitIdentifier(tableName) + + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); for (int i = 0; i < NUMBER_OF_ROWS; i++) { int value = START + i; - String sql = "INSERT INTO " + executor.delimitIdentifier(tableName) + String sql = "INSERT INTO " + fullTableName + + " VALUES(" + value + ", " + value + ", '" + value + "')"; + executor.executeUpdate(sql); + } + } + + fullTableName = executor.delimitIdentifier(schemalessTableName); + if (!executor.existTable(schemalessTableName)) { + executor.executeUpdate("CREATE TABLE " + + fullTableName + + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + + for (int i = 0; i < NUMBER_OF_ROWS; i++) { + int value = START + i; + String sql = "INSERT INTO " + fullTableName + " VALUES(" + value + ", " + value + ", '" + value + "')"; executor.executeUpdate(sql); } @@ -70,22 +92,24 @@ public void tearDown() { executor.close(); } + @SuppressWarnings("unchecked") public void testTableName() throws Exception { ConnectionConfiguration connConf = new ConnectionConfiguration(); ImportJobConfiguration jobConf = new ImportJobConfiguration(); connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; connConf.connection.connectionString = GenericJdbcTestConstants.URL; - jobConf.table.tableName = tableName; + jobConf.table.tableName = schemalessTableName; MutableContext context = new MutableMapContext(); InitializerContext initializerContext = new InitializerContext(context); + @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcImportInitializer(); initializer.initialize(initializerContext, connConf, jobConf); verifyResult(context, - "SELECT * FROM " + executor.delimitIdentifier(tableName) + "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}", "ICOL,DCOL,VCOL", "ICOL", @@ -94,23 +118,25 @@ public void testTableName() throws Exception { String.valueOf(START+NUMBER_OF_ROWS-1)); } + @SuppressWarnings("unchecked") public void testTableNameWithTableColumns() throws Exception { ConnectionConfiguration connConf = new ConnectionConfiguration(); ImportJobConfiguration jobConf = new ImportJobConfiguration(); connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; connConf.connection.connectionString = GenericJdbcTestConstants.URL; - jobConf.table.tableName = tableName; + jobConf.table.tableName = schemalessTableName; jobConf.table.columns = tableColumns; MutableContext context = new MutableMapContext(); InitializerContext initializerContext = new InitializerContext(context); + @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcImportInitializer(); initializer.initialize(initializerContext, connConf, jobConf); verifyResult(context, - "SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(tableName) + "SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}", tableColumns, "ICOL", @@ -119,23 +145,25 @@ public void testTableNameWithTableColumns() throws Exception { String.valueOf(START+NUMBER_OF_ROWS-1)); } + @SuppressWarnings("unchecked") public void testTableSql() throws Exception { ConnectionConfiguration connConf = new ConnectionConfiguration(); ImportJobConfiguration jobConf = new ImportJobConfiguration(); connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; connConf.connection.connectionString = GenericJdbcTestConstants.URL; - jobConf.table.sql = tableSql; + jobConf.table.sql = schemalessTableSql; jobConf.table.partitionColumn = "DCOL"; MutableContext context = new MutableMapContext(); InitializerContext initializerContext = new InitializerContext(context); + @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcImportInitializer(); initializer.initialize(initializerContext, connConf, jobConf); verifyResult(context, - "SELECT * FROM " + executor.delimitIdentifier(tableName) + "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}", "ICOL,DCOL,VCOL", "DCOL", @@ -144,12 +172,134 @@ public void testTableSql() throws Exception { String.valueOf((double)(START+NUMBER_OF_ROWS-1))); } + @SuppressWarnings("unchecked") public void testTableSqlWithTableColumns() throws Exception { ConnectionConfiguration connConf = new ConnectionConfiguration(); ImportJobConfiguration jobConf = new ImportJobConfiguration(); connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.table.sql = schemalessTableSql; + jobConf.table.columns = tableColumns; + jobConf.table.partitionColumn = "DCOL"; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcImportInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, + "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " + + "(SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS", + tableColumns, + "DCOL", + String.valueOf(Types.DOUBLE), + String.valueOf((double)START), + String.valueOf((double)(START+NUMBER_OF_ROWS-1))); + } + + @SuppressWarnings("unchecked") + public void testTableNameWithSchema() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ImportJobConfiguration jobConf = new ImportJobConfiguration(); + + String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.table.schemaName = schemaName; + jobConf.table.tableName = tableName; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcImportInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, + "SELECT * FROM " + fullTableName + + " WHERE ${CONDITIONS}", + "ICOL,DCOL,VCOL", + "ICOL", + String.valueOf(Types.INTEGER), + String.valueOf(START), + String.valueOf(START+NUMBER_OF_ROWS-1)); + } + + @SuppressWarnings("unchecked") + public void testTableNameWithTableColumnsWithSchema() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ImportJobConfiguration jobConf = new ImportJobConfiguration(); + + String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.table.schemaName = schemaName; + jobConf.table.tableName = tableName; + jobConf.table.columns = tableColumns; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcImportInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, + "SELECT ICOL,VCOL FROM " + fullTableName + + " WHERE ${CONDITIONS}", + tableColumns, + "ICOL", + String.valueOf(Types.INTEGER), + String.valueOf(START), + String.valueOf(START+NUMBER_OF_ROWS-1)); + } + + @SuppressWarnings("unchecked") + public void testTableSqlWithSchema() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ImportJobConfiguration jobConf = new ImportJobConfiguration(); + + String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.table.schemaName = schemaName; + jobConf.table.sql = tableSql; + jobConf.table.partitionColumn = "DCOL"; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcImportInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, + "SELECT * FROM " + fullTableName + + " WHERE ${CONDITIONS}", + "ICOL,DCOL,VCOL", + "DCOL", + String.valueOf(Types.DOUBLE), + String.valueOf((double)START), + String.valueOf((double)(START+NUMBER_OF_ROWS-1))); + } + + @SuppressWarnings("unchecked") + public void testTableSqlWithTableColumnsWithSchema() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ImportJobConfiguration jobConf = new ImportJobConfiguration(); + + String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.table.schemaName = schemaName; jobConf.table.sql = tableSql; jobConf.table.columns = tableColumns; jobConf.table.partitionColumn = "DCOL"; @@ -157,12 +307,13 @@ public void testTableSqlWithTableColumns() throws Exception { MutableContext context = new MutableMapContext(); InitializerContext initializerContext = new InitializerContext(context); + @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcImportInitializer(); initializer.initialize(initializerContext, connConf, jobConf); verifyResult(context, "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " - + "(SELECT * FROM " + executor.delimitIdentifier(tableName) + + "(SELECT * FROM " + fullTableName + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS", tableColumns, "DCOL",