From 2873a4b1ec4bad01efb7df23944af5579ce1771c Mon Sep 17 00:00:00 2001 From: Jarek Jarcec Cecho Date: Wed, 20 Aug 2014 16:05:33 -0700 Subject: [PATCH] SQOOP-1463: Sqoop2: From/To: Re-enable generic-jdbc-connector test cases (Abraham Elmahrek via Jarek Jarcec Cecho) --- .../jdbc/GenericJdbcConnectorConstants.java | 4 +- .../jdbc/GenericJdbcConnectorError.java | 4 +- .../connector/jdbc/GenericJdbcExtractor.java | 2 +- .../jdbc/GenericJdbcFromInitializer.java | 4 +- .../connector/jdbc/GenericJdbcLoader.java | 2 +- .../jdbc/GenericJdbcToInitializer.java | 2 +- .../connector/jdbc/GenericJdbcValidator.java | 36 +- .../jdbc/GenericJdbcExecutorTest.java | 130 ++--- .../connector/jdbc/TestExportInitializer.java | 365 ------------- .../connector/jdbc/TestExportLoader.java | 143 ----- .../sqoop/connector/jdbc/TestExtractor.java | 160 ++++++ .../connector/jdbc/TestFromInitializer.java | 404 ++++++++++++++ .../connector/jdbc/TestImportExtractor.java | 160 ------ .../connector/jdbc/TestImportInitializer.java | 404 -------------- .../connector/jdbc/TestImportPartitioner.java | 505 ------------------ .../sqoop/connector/jdbc/TestLoader.java | 142 +++++ .../sqoop/connector/jdbc/TestPartitioner.java | 503 +++++++++++++++++ .../connector/jdbc/TestToInitializer.java | 362 +++++++++++++ 18 files changed, 1667 insertions(+), 1665 deletions(-) delete mode 100644 connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java delete mode 100644 connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java create mode 100644 connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java create mode 100644 connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java delete mode 100644 connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java delete mode 100644 connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java delete mode 100644 connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java create mode 100644 connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java create mode 100644 connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java create mode 100644 connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java index a51fb7dc..62da2dbc 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java @@ -42,9 +42,9 @@ public final class GenericJdbcConnectorConstants { public static final String CONNECTOR_JDBC_PARTITION_MAXVALUE = PREFIX_CONNECTOR_JDBC_CONFIG + "partition.maxvalue"; - public static final String CONNECTOR_FROM_JDBC_DATA_SQL = + public static final String CONNECTOR_JDBC_FROM_DATA_SQL = PREFIX_CONNECTOR_JDBC_CONFIG + "from.data.sql"; - public static final String CONNECTOR_TO_JDBC_DATA_SQL = + public static final String CONNECTOR_JDBC_TO_DATA_SQL = PREFIX_CONNECTOR_JDBC_CONFIG + "to.data.sql"; public static final String SQL_CONDITIONS_TOKEN = "${CONDITIONS}"; diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java index c3747505..c291cb2b 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java @@ -79,7 +79,9 @@ public enum GenericJdbcConnectorError implements ErrorCode { GENERIC_JDBC_CONNECTOR_0018("Error occurred while transferring data from " + "stage table to destination table."), - GENERIC_JDBC_CONNECTOR_0019("Table name extraction not supported.") + GENERIC_JDBC_CONNECTOR_0019("Table name extraction not supported."), + + GENERIC_JDBC_CONNECTOR_0020("Unknown direction.") ; private final String message; diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java index 24281993..9915603f 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java @@ -41,7 +41,7 @@ public void extract(ExtractorContext context, ConnectionConfiguration connection String password = connection.connection.password; GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password); - String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_FROM_JDBC_DATA_SQL); + String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL); String conditions = partition.getConditions(); query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions); LOG.info("Using query: " + query); diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java index bd7b17db..fbbe5063 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java @@ -80,7 +80,7 @@ public Schema getSchema(InitializerContext context, ConnectionConfiguration conn ResultSetMetaData rsmt = null; try { rs = executor.executeQuery( - context.getString(GenericJdbcConnectorConstants.CONNECTOR_FROM_JDBC_DATA_SQL) + context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL) .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0") ); @@ -316,7 +316,7 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur LOG.info("Using dataSql: " + dataSql); LOG.info("Field names: " + fieldNames); - context.setString(GenericJdbcConnectorConstants.CONNECTOR_FROM_JDBC_DATA_SQL, dataSql); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL, dataSql); context.setString(Constants.JOB_ETL_FIELD_NAMES, fieldNames); } } diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java index 7d583c5c..07ae9888 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java @@ -38,7 +38,7 @@ public void load(LoaderContext context, ConnectionConfiguration connection, ToJo GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password); executor.setAutoCommit(false); - String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_TO_JDBC_DATA_SQL); + String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL); executor.beginBatch(sql); try { int numberOfRows = 0; diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java index 9b9b6d00..73a49b19 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java @@ -216,7 +216,7 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008); } - context.setString(GenericJdbcConnectorConstants.CONNECTOR_TO_JDBC_DATA_SQL, + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL, dataSql); } } diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java index 756bc34d..eea86b27 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java @@ -17,6 +17,7 @@ */ package org.apache.sqoop.connector.jdbc; +import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration; @@ -67,43 +68,48 @@ public Validation validateConnection(Object configuration) { @Override public Validation validateJob(Object jobConfiguration) { - return super.validateJob(jobConfiguration); + if (jobConfiguration instanceof FromJobConfiguration) { + return validateFromJobConfiguration((FromJobConfiguration)jobConfiguration); + } else if (jobConfiguration instanceof ToJobConfiguration) { + return validateToJobConfiguration((ToJobConfiguration)jobConfiguration); + } else { + throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0020, + "Configuration object for unknown direction."); + } } - private Validation validateExportJob(Object jobConfiguration) { + private Validation validateToJobConfiguration(ToJobConfiguration configuration) { Validation validation = new Validation(ToJobConfiguration.class); - ToJobConfiguration configuration = (ToJobConfiguration)jobConfiguration; if(configuration.toTable.tableName == null && configuration.toTable.sql == null) { - validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Either fromTable name or SQL must be specified"); + validation.addMessage(Status.UNACCEPTABLE, "toTable", "Either table name or SQL must be specified"); } if(configuration.toTable.tableName != null && configuration.toTable.sql != null) { - validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Both fromTable name and SQL cannot be specified"); + validation.addMessage(Status.UNACCEPTABLE, "toTable", "Both table name and SQL cannot be specified"); } if(configuration.toTable.tableName == null && configuration.toTable.stageTableName != null) { - validation.addMessage(Status.UNACCEPTABLE, "fromTable", - "Stage fromTable name cannot be specified without specifying fromTable name"); + validation.addMessage(Status.UNACCEPTABLE, "toTable", + "Stage table name cannot be specified without specifying table name"); } if(configuration.toTable.stageTableName == null && configuration.toTable.clearStageTable != null) { - validation.addMessage(Status.UNACCEPTABLE, "fromTable", - "Clear stage fromTable cannot be specified without specifying name of " + - "the stage fromTable."); + validation.addMessage(Status.UNACCEPTABLE, "toTable", + "Clear stage table cannot be specified without specifying name of " + + "the stage table."); } return validation; } - private Validation validateImportJob(Object jobConfiguration) { - Validation validation = new Validation(FromJobConfiguration.class); - FromJobConfiguration configuration = (FromJobConfiguration)jobConfiguration; + private Validation validateFromJobConfiguration(FromJobConfiguration configuration) { + Validation validation = new Validation(ToJobConfiguration.class); if(configuration.fromTable.tableName == null && configuration.fromTable.sql == null) { - validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Either fromTable name or SQL must be specified"); + validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Either table name or SQL must be specified"); } if(configuration.fromTable.tableName != null && configuration.fromTable.sql != null) { - validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Both fromTable name and SQL cannot be specified"); + validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Both table name and SQL cannot be specified"); } if(configuration.fromTable.schemaName != null && configuration.fromTable.sql != null) { validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Both schema name and SQL cannot be specified"); diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java index 3ea2c76e..e10a5b48 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java @@ -20,69 +20,69 @@ import junit.framework.TestCase; public class GenericJdbcExecutorTest extends TestCase { -// private final String fromTable; -// private final String emptyTable; -// private final GenericJdbcExecutor executor; -// -// private static final int START = -50; -// private static final int NUMBER_OF_ROWS = 974; -// -// public GenericJdbcExecutorTest() { -// fromTable = getClass().getSimpleName().toUpperCase(); -// emptyTable = fromTable + "_EMPTY"; -// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, -// GenericJdbcTestConstants.URL, null, null); -// } -// -// @Override -// public void setUp() { -// if(executor.existTable(emptyTable)) { -// executor.executeUpdate("DROP TABLE " + emptyTable); -// } -// executor.executeUpdate("CREATE TABLE " -// + emptyTable + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))"); -// -// if(executor.existTable(fromTable)) { -// executor.executeUpdate("DROP TABLE " + fromTable); -// } -// executor.executeUpdate("CREATE TABLE " -// + fromTable + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))"); -// -// for (int i = 0; i < NUMBER_OF_ROWS; i++) { -// int value = START + i; -// String sql = "INSERT INTO " + fromTable -// + " VALUES(" + value + ", '" + value + "')"; -// executor.executeUpdate(sql); -// } -// } -// -// @SuppressWarnings("unchecked") -// public void testDeleteTableData() throws Exception { -// executor.deleteTableData(fromTable); -// assertEquals("Table " + fromTable + " is expected to be empty.", -// 0, executor.getTableRowCount(fromTable)); -// } -// -// @SuppressWarnings("unchecked") -// public void testMigrateData() throws Exception { -// assertEquals("Table " + emptyTable + " is expected to be empty.", -// 0, executor.getTableRowCount(emptyTable)); -// assertEquals("Table " + fromTable + " is expected to have " + -// NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS, -// executor.getTableRowCount(fromTable)); -// -// executor.migrateData(fromTable, emptyTable); -// -// assertEquals("Table " + fromTable + " is expected to be empty.", 0, -// executor.getTableRowCount(fromTable)); -// assertEquals("Table " + emptyTable + " is expected to have " + -// NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS, -// executor.getTableRowCount(emptyTable)); -// } -// -// @SuppressWarnings("unchecked") -// public void testGetTableRowCount() throws Exception { -// assertEquals("Table " + fromTable + " is expected to be empty.", -// NUMBER_OF_ROWS, executor.getTableRowCount(fromTable)); -// } + private final String table; + private final String emptyTable; + private final GenericJdbcExecutor executor; + + private static final int START = -50; + private static final int NUMBER_OF_ROWS = 974; + + public GenericJdbcExecutorTest() { + table = getClass().getSimpleName().toUpperCase(); + emptyTable = table + "_EMPTY"; + executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, + GenericJdbcTestConstants.URL, null, null); + } + + @Override + public void setUp() { + if(executor.existTable(emptyTable)) { + executor.executeUpdate("DROP TABLE " + emptyTable); + } + executor.executeUpdate("CREATE TABLE " + + emptyTable + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))"); + + if(executor.existTable(table)) { + executor.executeUpdate("DROP TABLE " + table); + } + executor.executeUpdate("CREATE TABLE " + + table + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))"); + + for (int i = 0; i < NUMBER_OF_ROWS; i++) { + int value = START + i; + String sql = "INSERT INTO " + table + + " VALUES(" + value + ", '" + value + "')"; + executor.executeUpdate(sql); + } + } + + @SuppressWarnings("unchecked") + public void testDeleteTableData() throws Exception { + executor.deleteTableData(table); + assertEquals("Table " + table + " is expected to be empty.", + 0, executor.getTableRowCount(table)); + } + + @SuppressWarnings("unchecked") + public void testMigrateData() throws Exception { + assertEquals("Table " + emptyTable + " is expected to be empty.", + 0, executor.getTableRowCount(emptyTable)); + assertEquals("Table " + table + " is expected to have " + + NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS, + executor.getTableRowCount(table)); + + executor.migrateData(table, emptyTable); + + assertEquals("Table " + table + " is expected to be empty.", 0, + executor.getTableRowCount(table)); + assertEquals("Table " + emptyTable + " is expected to have " + + NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS, + executor.getTableRowCount(emptyTable)); + } + + @SuppressWarnings("unchecked") + public void testGetTableRowCount() throws Exception { + assertEquals("Table " + table + " is expected to be empty.", + NUMBER_OF_ROWS, executor.getTableRowCount(table)); + } } diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java deleted file mode 100644 index 97e771d1..00000000 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java +++ /dev/null @@ -1,365 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.jdbc; - -import junit.framework.TestCase; -import org.apache.sqoop.common.MutableContext; -import org.apache.sqoop.common.MutableMapContext; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -//import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; -import org.apache.sqoop.job.etl.Initializer; -import org.apache.sqoop.job.etl.InitializerContext; -import org.apache.sqoop.model.MJob; -import org.apache.sqoop.validation.Status; -import org.apache.sqoop.validation.Validation; - -public class TestExportInitializer extends TestCase { - -// private final String schemaName; -// private final String tableName; -// private final String schemalessTableName; -// private final String stageTableName; -// private final String tableSql; -// private final String schemalessTableSql; -// private final String tableColumns; -// -// private GenericJdbcExecutor executor; -// -// public TestExportInitializer() { -// schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA"; -// tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA"; -// schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE"; -// stageTableName = getClass().getSimpleName().toUpperCase() + -// "_STAGE_TABLE"; -// tableSql = "INSERT INTO " + tableName + " VALUES (?,?,?)"; -// schemalessTableSql = "INSERT INTO " + schemalessTableName + " VALUES (?,?,?)"; -// tableColumns = "ICOL,VCOL"; -// } -// -// @Override -// public void setUp() { -// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, -// GenericJdbcTestConstants.URL, null, null); -// -// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); -// if (!executor.existTable(tableName)) { -// executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName)); -// executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); -// } -// -// fullTableName = executor.delimitIdentifier(schemalessTableName); -// if (!executor.existTable(schemalessTableName)) { -// executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); -// } -// } -// -// @Override -// public void tearDown() { -// executor.close(); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableName() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ExportJobConfiguration jobConf = new ExportJobConfiguration(); -// -// String fullTableName = executor.delimitIdentifier(schemalessTableName); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.tableName = schemalessTableName; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcExportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)"); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableNameWithTableColumns() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ExportJobConfiguration jobConf = new ExportJobConfiguration(); -// -// String fullTableName = executor.delimitIdentifier(schemalessTableName); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.tableName = schemalessTableName; -// jobConf.fromTable.columns = tableColumns; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcExportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)"); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableSql() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ExportJobConfiguration jobConf = new ExportJobConfiguration(); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.sql = schemalessTableSql; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcExportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(schemalessTableName) + " VALUES (?,?,?)"); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableNameWithSchema() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ExportJobConfiguration jobConf = new ExportJobConfiguration(); -// -// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.schemaName = schemaName; -// jobConf.fromTable.tableName = tableName; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcExportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)"); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableNameWithTableColumnsWithSchema() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ExportJobConfiguration jobConf = new ExportJobConfiguration(); -// -// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.schemaName = schemaName; -// jobConf.fromTable.tableName = tableName; -// jobConf.fromTable.columns = tableColumns; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcExportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)"); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableSqlWithSchema() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ExportJobConfiguration jobConf = new ExportJobConfiguration(); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.schemaName = schemaName; -// jobConf.fromTable.sql = tableSql; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcExportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?)"); -// } -// -// private void verifyResult(MutableContext context, String dataSql) { -// assertEquals(dataSql, context.getString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL)); -// } -// -// private void createTable(String tableName) { -// try { -// executor.executeUpdate("DROP TABLE " + tableName); -// } catch(SqoopException e) { -// //Ok to fail as the table might not exist -// } -// executor.executeUpdate("CREATE TABLE " + tableName + -// "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); -// } -// -// public void testNonExistingStageTable() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ExportJobConfiguration jobConf = new ExportJobConfiguration(); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.tableName = schemalessTableName; -// jobConf.fromTable.stageTableName = stageTableName; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcExportInitializer(); -// try { -// initializer.initialize(initializerContext, connConf, jobConf); -// fail("Initialization should fail for non-existing stage table."); -// } catch(SqoopException se) { -// //expected -// } -// } -// -// @SuppressWarnings("unchecked") -// public void testNonEmptyStageTable() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ExportJobConfiguration jobConf = new ExportJobConfiguration(); -// -// String fullStageTableName = executor.delimitIdentifier(stageTableName); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.tableName = schemalessTableName; -// jobConf.fromTable.stageTableName = stageTableName; -// createTable(fullStageTableName); -// executor.executeUpdate("INSERT INTO " + fullStageTableName + -// " VALUES(1, 1.1, 'one')"); -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcExportInitializer(); -// try { -// initializer.initialize(initializerContext, connConf, jobConf); -// fail("Initialization should fail for non-empty stage table."); -// } catch(SqoopException se) { -// //expected -// } -// } -// -// @SuppressWarnings("unchecked") -// public void testClearStageTableValidation() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ExportJobConfiguration jobConf = new ExportJobConfiguration(); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// //specifying clear stage table flag without specifying name of -// // the stage table -// jobConf.fromTable.tableName = schemalessTableName; -// jobConf.fromTable.clearStageTable = false; -// GenericJdbcValidator validator = new GenericJdbcValidator(); -// Validation validation = validator.validateJob(MJob.Type.EXPORT, jobConf); -// assertEquals("User should not specify clear stage table flag without " + -// "specifying name of the stage table", -// Status.UNACCEPTABLE, -// validation.getStatus()); -// assertTrue(validation.getMessages().containsKey( -// new Validation.FormInput("fromTable"))); -// -// jobConf.fromTable.clearStageTable = true; -// validation = validator.validateJob(MJob.Type.EXPORT, jobConf); -// assertEquals("User should not specify clear stage table flag without " + -// "specifying name of the stage table", -// Status.UNACCEPTABLE, -// validation.getStatus()); -// assertTrue(validation.getMessages().containsKey( -// new Validation.FormInput("fromTable"))); -// } -// -// @SuppressWarnings("unchecked") -// public void testStageTableWithoutTable() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ExportJobConfiguration jobConf = new ExportJobConfiguration(); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// //specifying stage table without specifying table name -// jobConf.fromTable.stageTableName = stageTableName; -// jobConf.fromTable.sql = ""; -// -// GenericJdbcValidator validator = new GenericJdbcValidator(); -// Validation validation = validator.validateJob(MJob.Type.EXPORT, jobConf); -// assertEquals("Stage table name cannot be specified without specifying " + -// "table name", Status.UNACCEPTABLE, validation.getStatus()); -// assertTrue(validation.getMessages().containsKey( -// new Validation.FormInput("fromTable"))); -// } -// -// @SuppressWarnings("unchecked") -// public void testClearStageTable() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ExportJobConfiguration jobConf = new ExportJobConfiguration(); -// -// String fullStageTableName = executor.delimitIdentifier(stageTableName); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.tableName = schemalessTableName; -// jobConf.fromTable.stageTableName = stageTableName; -// jobConf.fromTable.clearStageTable = true; -// createTable(fullStageTableName); -// executor.executeUpdate("INSERT INTO " + fullStageTableName + -// " VALUES(1, 1.1, 'one')"); -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcExportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// assertEquals("Stage table should have been cleared", 0, -// executor.getTableRowCount(stageTableName)); -// } -// -// @SuppressWarnings("unchecked") -// public void testStageTable() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ExportJobConfiguration jobConf = new ExportJobConfiguration(); -// -// String fullStageTableName = executor.delimitIdentifier(stageTableName); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.tableName = schemalessTableName; -// jobConf.fromTable.stageTableName = stageTableName; -// createTable(fullStageTableName); -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcExportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, "INSERT INTO " + fullStageTableName + -// " VALUES (?,?,?)"); -// } - -} diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java deleted file mode 100644 index 420e3ad0..00000000 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.jdbc; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.sql.ResultSet; -import java.util.Arrays; -import java.util.Collection; - -import org.apache.sqoop.common.MutableContext; -import org.apache.sqoop.common.MutableMapContext; -import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -//import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; -import org.apache.sqoop.etl.io.DataReader; -import org.apache.sqoop.job.etl.Loader; -import org.apache.sqoop.job.etl.LoaderContext; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -@RunWith(Parameterized.class) -public class TestExportLoader { - -// private final String tableName; -// -// private GenericJdbcExecutor executor; -// -// private static final int START = -50; -// -// private int numberOfRows; -// -// @Parameters -// public static Collection data() { -// return Arrays.asList(new Object[][] {{50}, {100}, {101}, {150}, {200}}); -// } -// -// public TestExportLoader(int numberOfRows) { -// this.numberOfRows = numberOfRows; -// tableName = getClass().getSimpleName().toUpperCase(); -// } -// -// @Before -// public void setUp() { -// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, -// GenericJdbcTestConstants.URL, null, null); -// -// if (!executor.existTable(tableName)) { -// executor.executeUpdate("CREATE TABLE " -// + executor.delimitIdentifier(tableName) -// + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); -// } else { -// executor.deleteTableData(tableName); -// } -// } -// -// @After -// public void tearDown() { -// executor.close(); -// } -// -// @Test -// public void testInsert() throws Exception { -// MutableContext context = new MutableMapContext(); -// -// ConnectionConfiguration connectionConfig = new ConnectionConfiguration(); -// -// connectionConfig.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connectionConfig.connection.connectionString = GenericJdbcTestConstants.URL; -// -// ExportJobConfiguration jobConfig = new ExportJobConfiguration(); -// -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL, -// "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?)"); -// -// Loader loader = new GenericJdbcExportLoader(); -// DummyReader reader = new DummyReader(); -// LoaderContext loaderContext = new LoaderContext(context, reader, null); -// loader.load(loaderContext, connectionConfig, jobConfig); -// -// int index = START; -// ResultSet rs = executor.executeQuery("SELECT * FROM " -// + executor.delimitIdentifier(tableName) + " ORDER BY ICOL"); -// while (rs.next()) { -// assertEquals(index, rs.getObject(1)); -// assertEquals((double) index, rs.getObject(2)); -// assertEquals(String.valueOf(index), rs.getObject(3)); -// index++; -// } -// assertEquals(numberOfRows, index-START); -// } -// -// public class DummyReader extends DataReader { -// int index = 0; -// -// @Override -// public Object[] readArrayRecord() { -// if (index < numberOfRows) { -// Object[] array = new Object[] { -// START + index, -// (double) (START + index), -// String.valueOf(START+index) }; -// index++; -// return array; -// } else { -// return null; -// } -// } -// -// @Override -// public String readTextRecord() { -// fail("This method should not be invoked."); -// return null; -// } -// -// @Override -// public Object readContent() throws Exception { -// fail("This method should not be invoked."); -// return null; -// } -// -// } - -} diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java new file mode 100644 index 00000000..2b1dec22 --- /dev/null +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import junit.framework.TestCase; + +import org.apache.sqoop.common.MutableContext; +import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration; +import org.apache.sqoop.job.etl.Extractor; +import org.apache.sqoop.job.etl.ExtractorContext; +import org.apache.sqoop.etl.io.DataWriter; + +public class TestExtractor extends TestCase { + + private final String tableName; + + private GenericJdbcExecutor executor; + + private static final int START = -50; + private static final int NUMBER_OF_ROWS = 101; + + public TestExtractor() { + tableName = getClass().getSimpleName().toUpperCase(); + } + + @Override + public void setUp() { + executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, + GenericJdbcTestConstants.URL, null, null); + + if (!executor.existTable(tableName)) { + executor.executeUpdate("CREATE TABLE " + + executor.delimitIdentifier(tableName) + + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + + for (int i = 0; i < NUMBER_OF_ROWS; i++) { + int value = START + i; + String sql = "INSERT INTO " + executor.delimitIdentifier(tableName) + + " VALUES(" + value + ", " + value + ", '" + value + "')"; + executor.executeUpdate(sql); + } + } + } + + @Override + public void tearDown() { + executor.close(); + } + + public void testQuery() throws Exception { + MutableContext context = new MutableMapContext(); + + ConnectionConfiguration connectionConfig = new ConnectionConfiguration(); + + connectionConfig.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connectionConfig.connection.connectionString = GenericJdbcTestConstants.URL; + + FromJobConfiguration jobConfig = new FromJobConfiguration(); + + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL, + "SELECT * FROM " + executor.delimitIdentifier(tableName) + " WHERE ${CONDITIONS}"); + + GenericJdbcPartition partition; + + Extractor extractor = new GenericJdbcExtractor(); + DummyWriter writer = new DummyWriter(); + ExtractorContext extractorContext = new ExtractorContext(context, writer, null); + + partition = new GenericJdbcPartition(); + partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665"); + extractor.extract(extractorContext, connectionConfig, jobConfig, partition); + + partition = new GenericJdbcPartition(); + partition.setConditions("-16.6666666666666665 <= DCOL AND DCOL < 16.666666666666667"); + extractor.extract(extractorContext, connectionConfig, jobConfig, partition); + + partition = new GenericJdbcPartition(); + partition.setConditions("16.666666666666667 <= DCOL AND DCOL <= 50.0"); + extractor.extract(extractorContext, connectionConfig, jobConfig, partition); + } + + public void testSubquery() throws Exception { + MutableContext context = new MutableMapContext(); + + ConnectionConfiguration connectionConfig = new ConnectionConfiguration(); + + connectionConfig.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connectionConfig.connection.connectionString = GenericJdbcTestConstants.URL; + + FromJobConfiguration jobConfig = new FromJobConfiguration(); + + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL, + "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " + + "(SELECT * FROM " + executor.delimitIdentifier(tableName) + + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS"); + + GenericJdbcPartition partition; + + Extractor extractor = new GenericJdbcExtractor(); + DummyWriter writer = new DummyWriter(); + ExtractorContext extractorContext = new ExtractorContext(context, writer, null); + + partition = new GenericJdbcPartition(); + partition.setConditions("-50 <= ICOL AND ICOL < -16"); + extractor.extract(extractorContext, connectionConfig, jobConfig, partition); + + partition = new GenericJdbcPartition(); + partition.setConditions("-16 <= ICOL AND ICOL < 17"); + extractor.extract(extractorContext, connectionConfig, jobConfig, partition); + + partition = new GenericJdbcPartition(); + partition.setConditions("17 <= ICOL AND ICOL < 50"); + extractor.extract(extractorContext, connectionConfig, jobConfig, partition); + } + + public class DummyWriter extends DataWriter { + int indx = START; + + @Override + public void writeArrayRecord(Object[] array) { + for (int i = 0; i < array.length; i++) { + if (array[i] instanceof Integer) { + assertEquals(indx, ((Integer)array[i]).intValue()); + } else if (array[i] instanceof Double) { + assertEquals((double)indx, ((Double)array[i]).doubleValue()); + } else { + assertEquals(String.valueOf(indx), array[i].toString()); + } + } + indx++; + } + + @Override + public void writeStringRecord(String text) { + fail("This method should not be invoked."); + } + + @Override + public void writeRecord(Object content) { + fail("This method should not be invoked."); + } + } +} diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java new file mode 100644 index 00000000..2d071309 --- /dev/null +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java @@ -0,0 +1,404 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import java.sql.Types; + +import junit.framework.TestCase; + +import org.apache.sqoop.common.MutableContext; +import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration; +import org.apache.sqoop.job.Constants; +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.InitializerContext; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.FixedPoint; +import org.apache.sqoop.schema.type.FloatingPoint; +import org.apache.sqoop.schema.type.Text; + +public class TestFromInitializer extends TestCase { + + private final String schemaName; + private final String tableName; + private final String schemalessTableName; + private final String tableSql; + private final String schemalessTableSql; + private final String tableColumns; + + private GenericJdbcExecutor executor; + + private static final int START = -50; + private static final int NUMBER_OF_ROWS = 101; + + public TestFromInitializer() { + schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA"; + tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA"; + schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE"; + tableSql = "SELECT * FROM " + schemaName + "." + tableName + " WHERE ${CONDITIONS}"; + schemalessTableSql = "SELECT * FROM " + schemalessTableName + " WHERE ${CONDITIONS}"; + tableColumns = "ICOL,VCOL"; + } + + @Override + public void setUp() { + executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, + GenericJdbcTestConstants.URL, null, null); + + String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + if (!executor.existTable(tableName)) { + executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName)); + executor.executeUpdate("CREATE TABLE " + + fullTableName + + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + + for (int i = 0; i < NUMBER_OF_ROWS; i++) { + int value = START + i; + String sql = "INSERT INTO " + fullTableName + + " VALUES(" + value + ", " + value + ", '" + value + "')"; + executor.executeUpdate(sql); + } + } + + fullTableName = executor.delimitIdentifier(schemalessTableName); + if (!executor.existTable(schemalessTableName)) { + executor.executeUpdate("CREATE TABLE " + + fullTableName + + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + + for (int i = 0; i < NUMBER_OF_ROWS; i++) { + int value = START + i; + String sql = "INSERT INTO " + fullTableName + + " VALUES(" + value + ", " + value + ", '" + value + "')"; + executor.executeUpdate(sql); + } + } + } + + /** + * Return Schema representation for the testing table. + * + * @param name Name that should be used for the generated schema. + * @return + */ + public Schema getSchema(String name) { + return new Schema(name) + .addColumn(new FixedPoint("ICOL")) + .addColumn(new FloatingPoint("DCOL")) + .addColumn(new Text("VCOL")) + ; + } + + @Override + public void tearDown() { + executor.close(); + } + + @SuppressWarnings("unchecked") + public void testTableName() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.fromTable.tableName = schemalessTableName; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcFromInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, + "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + + " WHERE ${CONDITIONS}", + "ICOL,DCOL,VCOL", + "ICOL", + String.valueOf(Types.INTEGER), + String.valueOf(START), + String.valueOf(START+NUMBER_OF_ROWS-1)); + } + + @SuppressWarnings("unchecked") + public void testTableNameWithTableColumns() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.fromTable.tableName = schemalessTableName; + jobConf.fromTable.columns = tableColumns; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcFromInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, + "SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(schemalessTableName) + + " WHERE ${CONDITIONS}", + tableColumns, + "ICOL", + String.valueOf(Types.INTEGER), + String.valueOf(START), + String.valueOf(START+NUMBER_OF_ROWS-1)); + } + + @SuppressWarnings("unchecked") + public void testTableSql() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.fromTable.sql = schemalessTableSql; + jobConf.fromTable.partitionColumn = "DCOL"; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcFromInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, + "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + + " WHERE ${CONDITIONS}", + "ICOL,DCOL,VCOL", + "DCOL", + String.valueOf(Types.DOUBLE), + String.valueOf((double)START), + String.valueOf((double)(START+NUMBER_OF_ROWS-1))); + } + + @SuppressWarnings("unchecked") + public void testTableSqlWithTableColumns() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.fromTable.sql = schemalessTableSql; + jobConf.fromTable.columns = tableColumns; + jobConf.fromTable.partitionColumn = "DCOL"; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcFromInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, + "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " + + "(SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS", + tableColumns, + "DCOL", + String.valueOf(Types.DOUBLE), + String.valueOf((double)START), + String.valueOf((double)(START+NUMBER_OF_ROWS-1))); + } + + @SuppressWarnings("unchecked") + public void testTableNameWithSchema() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.fromTable.schemaName = schemaName; + jobConf.fromTable.tableName = tableName; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcFromInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, + "SELECT * FROM " + fullTableName + + " WHERE ${CONDITIONS}", + "ICOL,DCOL,VCOL", + "ICOL", + String.valueOf(Types.INTEGER), + String.valueOf(START), + String.valueOf(START+NUMBER_OF_ROWS-1)); + } + + @SuppressWarnings("unchecked") + public void testTableNameWithTableColumnsWithSchema() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.fromTable.schemaName = schemaName; + jobConf.fromTable.tableName = tableName; + jobConf.fromTable.columns = tableColumns; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcFromInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, + "SELECT ICOL,VCOL FROM " + fullTableName + + " WHERE ${CONDITIONS}", + tableColumns, + "ICOL", + String.valueOf(Types.INTEGER), + String.valueOf(START), + String.valueOf(START+NUMBER_OF_ROWS-1)); + } + + @SuppressWarnings("unchecked") + public void testTableSqlWithSchema() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.fromTable.schemaName = schemaName; + jobConf.fromTable.sql = tableSql; + jobConf.fromTable.partitionColumn = "DCOL"; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcFromInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, + "SELECT * FROM " + fullTableName + + " WHERE ${CONDITIONS}", + "ICOL,DCOL,VCOL", + "DCOL", + String.valueOf(Types.DOUBLE), + String.valueOf((double)START), + String.valueOf((double)(START+NUMBER_OF_ROWS-1))); + } + + + @SuppressWarnings("unchecked") + public void testGetSchemaForTable() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.fromTable.schemaName = schemaName; + jobConf.fromTable.tableName = tableName; + jobConf.fromTable.partitionColumn = "DCOL"; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcFromInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + Schema schema = initializer.getSchema(initializerContext, connConf, jobConf); + assertEquals(getSchema(jobConf.fromTable.schemaName + "." + tableName), schema); + } + + @SuppressWarnings("unchecked") + public void testGetSchemaForSql() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.fromTable.schemaName = schemaName; + jobConf.fromTable.sql = tableSql; + jobConf.fromTable.partitionColumn = "DCOL"; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcFromInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + Schema schema = initializer.getSchema(initializerContext, connConf, jobConf); + assertEquals(getSchema("Query"), schema); + } + + @SuppressWarnings("unchecked") + public void testTableSqlWithTableColumnsWithSchema() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.fromTable.schemaName = schemaName; + jobConf.fromTable.sql = tableSql; + jobConf.fromTable.columns = tableColumns; + jobConf.fromTable.partitionColumn = "DCOL"; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcFromInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, + "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " + + "(SELECT * FROM " + fullTableName + + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS", + tableColumns, + "DCOL", + String.valueOf(Types.DOUBLE), + String.valueOf((double)START), + String.valueOf((double)(START+NUMBER_OF_ROWS-1))); + } + + private void verifyResult(MutableContext context, + String dataSql, String fieldNames, + String partitionColumnName, String partitionColumnType, + String partitionMinValue, String partitionMaxValue) { + assertEquals(dataSql, context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL)); + assertEquals(fieldNames, context.getString( + Constants.JOB_ETL_FIELD_NAMES)); + + assertEquals(partitionColumnName, context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME)); + assertEquals(partitionColumnType, context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE)); + assertEquals(partitionMinValue, context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE)); + assertEquals(partitionMaxValue, context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE)); + } +} diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java deleted file mode 100644 index 8ded5a49..00000000 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.jdbc; - -import junit.framework.TestCase; - -import org.apache.sqoop.common.MutableContext; -import org.apache.sqoop.common.MutableMapContext; -import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -//import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; -import org.apache.sqoop.job.etl.Extractor; -import org.apache.sqoop.job.etl.ExtractorContext; -import org.apache.sqoop.etl.io.DataWriter; - -public class TestImportExtractor extends TestCase { - -// private final String tableName; -// -// private GenericJdbcExecutor executor; -// -// private static final int START = -50; -// private static final int NUMBER_OF_ROWS = 101; -// -// public TestImportExtractor() { -// tableName = getClass().getSimpleName().toUpperCase(); -// } -// -// @Override -// public void setUp() { -// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, -// GenericJdbcTestConstants.URL, null, null); -// -// if (!executor.existTable(tableName)) { -// executor.executeUpdate("CREATE TABLE " -// + executor.delimitIdentifier(tableName) -// + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); -// -// for (int i = 0; i < NUMBER_OF_ROWS; i++) { -// int value = START + i; -// String sql = "INSERT INTO " + executor.delimitIdentifier(tableName) -// + " VALUES(" + value + ", " + value + ", '" + value + "')"; -// executor.executeUpdate(sql); -// } -// } -// } -// -// @Override -// public void tearDown() { -// executor.close(); -// } -// -// public void testQuery() throws Exception { -// MutableContext context = new MutableMapContext(); -// -// ConnectionConfiguration connectionConfig = new ConnectionConfiguration(); -// -// connectionConfig.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connectionConfig.connection.connectionString = GenericJdbcTestConstants.URL; -// -// ImportJobConfiguration jobConfig = new ImportJobConfiguration(); -// -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL, -// "SELECT * FROM " + executor.delimitIdentifier(tableName) + " WHERE ${CONDITIONS}"); -// -// GenericJdbcImportPartition partition; -// -// Extractor extractor = new GenericJdbcImportExtractor(); -// DummyWriter writer = new DummyWriter(); -// ExtractorContext extractorContext = new ExtractorContext(context, writer, null); -// -// partition = new GenericJdbcImportPartition(); -// partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665"); -// extractor.extract(extractorContext, connectionConfig, jobConfig, partition); -// -// partition = new GenericJdbcImportPartition(); -// partition.setConditions("-16.6666666666666665 <= DCOL AND DCOL < 16.666666666666667"); -// extractor.extract(extractorContext, connectionConfig, jobConfig, partition); -// -// partition = new GenericJdbcImportPartition(); -// partition.setConditions("16.666666666666667 <= DCOL AND DCOL <= 50.0"); -// extractor.extract(extractorContext, connectionConfig, jobConfig, partition); -// } -// -// public void testSubquery() throws Exception { -// MutableContext context = new MutableMapContext(); -// -// ConnectionConfiguration connectionConfig = new ConnectionConfiguration(); -// -// connectionConfig.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connectionConfig.connection.connectionString = GenericJdbcTestConstants.URL; -// -// ImportJobConfiguration jobConfig = new ImportJobConfiguration(); -// -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL, -// "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " -// + "(SELECT * FROM " + executor.delimitIdentifier(tableName) -// + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS"); -// -// GenericJdbcImportPartition partition; -// -// Extractor extractor = new GenericJdbcImportExtractor(); -// DummyWriter writer = new DummyWriter(); -// ExtractorContext extractorContext = new ExtractorContext(context, writer, null); -// -// partition = new GenericJdbcImportPartition(); -// partition.setConditions("-50 <= ICOL AND ICOL < -16"); -// extractor.extract(extractorContext, connectionConfig, jobConfig, partition); -// -// partition = new GenericJdbcImportPartition(); -// partition.setConditions("-16 <= ICOL AND ICOL < 17"); -// extractor.extract(extractorContext, connectionConfig, jobConfig, partition); -// -// partition = new GenericJdbcImportPartition(); -// partition.setConditions("17 <= ICOL AND ICOL < 50"); -// extractor.extract(extractorContext, connectionConfig, jobConfig, partition); -// } -// -// public class DummyWriter extends DataWriter { -// int indx = START; -// -// @Override -// public void writeArrayRecord(Object[] array) { -// for (int i = 0; i < array.length; i++) { -// if (array[i] instanceof Integer) { -// assertEquals(indx, ((Integer)array[i]).intValue()); -// } else if (array[i] instanceof Double) { -// assertEquals((double)indx, ((Double)array[i]).doubleValue()); -// } else { -// assertEquals(String.valueOf(indx), array[i].toString()); -// } -// } -// indx++; -// } -// -// @Override -// public void writeStringRecord(String text) { -// fail("This method should not be invoked."); -// } -// -// @Override -// public void writeRecord(Object content) { -// fail("This method should not be invoked."); -// } -// } -} diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java deleted file mode 100644 index cd834e8b..00000000 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java +++ /dev/null @@ -1,404 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.jdbc; - -import java.sql.Types; - -import junit.framework.TestCase; - -import org.apache.sqoop.common.MutableContext; -import org.apache.sqoop.common.MutableMapContext; -import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -//import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; -import org.apache.sqoop.job.Constants; -import org.apache.sqoop.job.etl.Initializer; -import org.apache.sqoop.job.etl.InitializerContext; -import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.schema.type.FixedPoint; -import org.apache.sqoop.schema.type.FloatingPoint; -import org.apache.sqoop.schema.type.Text; - -public class TestImportInitializer extends TestCase { - -// private final String schemaName; -// private final String tableName; -// private final String schemalessTableName; -// private final String tableSql; -// private final String schemalessTableSql; -// private final String tableColumns; -// -// private GenericJdbcExecutor executor; -// -// private static final int START = -50; -// private static final int NUMBER_OF_ROWS = 101; -// -// public TestImportInitializer() { -// schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA"; -// tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA"; -// schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE"; -// tableSql = "SELECT * FROM " + schemaName + "." + tableName + " WHERE ${CONDITIONS}"; -// schemalessTableSql = "SELECT * FROM " + schemalessTableName + " WHERE ${CONDITIONS}"; -// tableColumns = "ICOL,VCOL"; -// } -// -// @Override -// public void setUp() { -// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, -// GenericJdbcTestConstants.URL, null, null); -// -// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); -// if (!executor.existTable(tableName)) { -// executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName)); -// executor.executeUpdate("CREATE TABLE " -// + fullTableName -// + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); -// -// for (int i = 0; i < NUMBER_OF_ROWS; i++) { -// int value = START + i; -// String sql = "INSERT INTO " + fullTableName -// + " VALUES(" + value + ", " + value + ", '" + value + "')"; -// executor.executeUpdate(sql); -// } -// } -// -// fullTableName = executor.delimitIdentifier(schemalessTableName); -// if (!executor.existTable(schemalessTableName)) { -// executor.executeUpdate("CREATE TABLE " -// + fullTableName -// + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); -// -// for (int i = 0; i < NUMBER_OF_ROWS; i++) { -// int value = START + i; -// String sql = "INSERT INTO " + fullTableName -// + " VALUES(" + value + ", " + value + ", '" + value + "')"; -// executor.executeUpdate(sql); -// } -// } -// } -// -// /** -// * Return Schema representation for the testing fromTable. -// * -// * @param name Name that should be used for the generated schema. -// * @return -// */ -// public Schema getSchema(String name) { -// return new Schema(name) -// .addColumn(new FixedPoint("ICOL")) -// .addColumn(new FloatingPoint("DCOL")) -// .addColumn(new Text("VCOL")) -// ; -// } -// -// @Override -// public void tearDown() { -// executor.close(); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableName() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.tableName = schemalessTableName; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, -// "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) -// + " WHERE ${CONDITIONS}", -// "ICOL,DCOL,VCOL", -// "ICOL", -// String.valueOf(Types.INTEGER), -// String.valueOf(START), -// String.valueOf(START+NUMBER_OF_ROWS-1)); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableNameWithTableColumns() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.tableName = schemalessTableName; -// jobConf.fromTable.columns = tableColumns; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, -// "SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(schemalessTableName) -// + " WHERE ${CONDITIONS}", -// tableColumns, -// "ICOL", -// String.valueOf(Types.INTEGER), -// String.valueOf(START), -// String.valueOf(START+NUMBER_OF_ROWS-1)); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableSql() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.sql = schemalessTableSql; -// jobConf.fromTable.partitionColumn = "DCOL"; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, -// "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) -// + " WHERE ${CONDITIONS}", -// "ICOL,DCOL,VCOL", -// "DCOL", -// String.valueOf(Types.DOUBLE), -// String.valueOf((double)START), -// String.valueOf((double)(START+NUMBER_OF_ROWS-1))); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableSqlWithTableColumns() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.sql = schemalessTableSql; -// jobConf.fromTable.columns = tableColumns; -// jobConf.fromTable.partitionColumn = "DCOL"; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, -// "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " -// + "(SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) -// + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS", -// tableColumns, -// "DCOL", -// String.valueOf(Types.DOUBLE), -// String.valueOf((double)START), -// String.valueOf((double)(START+NUMBER_OF_ROWS-1))); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableNameWithSchema() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.schemaName = schemaName; -// jobConf.fromTable.tableName = tableName; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, -// "SELECT * FROM " + fullTableName -// + " WHERE ${CONDITIONS}", -// "ICOL,DCOL,VCOL", -// "ICOL", -// String.valueOf(Types.INTEGER), -// String.valueOf(START), -// String.valueOf(START+NUMBER_OF_ROWS-1)); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableNameWithTableColumnsWithSchema() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.schemaName = schemaName; -// jobConf.fromTable.tableName = tableName; -// jobConf.fromTable.columns = tableColumns; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, -// "SELECT ICOL,VCOL FROM " + fullTableName -// + " WHERE ${CONDITIONS}", -// tableColumns, -// "ICOL", -// String.valueOf(Types.INTEGER), -// String.valueOf(START), -// String.valueOf(START+NUMBER_OF_ROWS-1)); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableSqlWithSchema() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.schemaName = schemaName; -// jobConf.fromTable.sql = tableSql; -// jobConf.fromTable.partitionColumn = "DCOL"; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, -// "SELECT * FROM " + fullTableName -// + " WHERE ${CONDITIONS}", -// "ICOL,DCOL,VCOL", -// "DCOL", -// String.valueOf(Types.DOUBLE), -// String.valueOf((double)START), -// String.valueOf((double)(START+NUMBER_OF_ROWS-1))); -// } -// -// -// @SuppressWarnings("unchecked") -// public void testGetSchemaForTable() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.schemaName = schemaName; -// jobConf.fromTable.tableName = tableName; -// jobConf.fromTable.partitionColumn = "DCOL"; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// Schema schema = initializer.getSchema(initializerContext, connConf, jobConf); -// assertEquals(getSchema(jobConf.table.schemaName + "." + tableName), schema); -// } -// -// @SuppressWarnings("unchecked") -// public void testGetSchemaForSql() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.schemaName = schemaName; -// jobConf.fromTable.sql = tableSql; -// jobConf.fromTable.partitionColumn = "DCOL"; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// Schema schema = initializer.getSchema(initializerContext, connConf, jobConf); -// assertEquals(getSchema("Query"), schema); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableSqlWithTableColumnsWithSchema() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.schemaName = schemaName; -// jobConf.fromTable.sql = tableSql; -// jobConf.fromTable.columns = tableColumns; -// jobConf.fromTable.partitionColumn = "DCOL"; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, -// "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " -// + "(SELECT * FROM " + fullTableName -// + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS", -// tableColumns, -// "DCOL", -// String.valueOf(Types.DOUBLE), -// String.valueOf((double)START), -// String.valueOf((double)(START+NUMBER_OF_ROWS-1))); -// } -// -// private void verifyResult(MutableContext context, -// String dataSql, String fieldNames, -// String partitionColumnName, String partitionColumnType, -// String partitionMinValue, String partitionMaxValue) { -// assertEquals(dataSql, context.getString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL)); -// assertEquals(fieldNames, context.getString( -// Constants.JOB_ETL_FIELD_NAMES)); -// -// assertEquals(partitionColumnName, context.getString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME)); -// assertEquals(partitionColumnType, context.getString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE)); -// assertEquals(partitionMinValue, context.getString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE)); -// assertEquals(partitionMaxValue, context.getString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE)); -// } -} diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java deleted file mode 100644 index 958f75f7..00000000 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java +++ /dev/null @@ -1,505 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.jdbc; - -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; -import java.sql.Types; -import java.util.Iterator; -import java.util.List; - -import junit.framework.TestCase; - -import org.apache.sqoop.common.MutableContext; -import org.apache.sqoop.common.MutableMapContext; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -//import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; -import org.apache.sqoop.job.Constants; -import org.apache.sqoop.job.etl.Partition; -import org.apache.sqoop.job.etl.Partitioner; -import org.apache.sqoop.job.etl.PartitionerContext; - -public class TestImportPartitioner extends TestCase { - -// private static final int START = -5; -// private static final int NUMBER_OF_ROWS = 11; -// -// public void testIntegerEvenPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, -// "ICOL"); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -// String.valueOf(Types.INTEGER)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, -// String.valueOf(START)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, -// String.valueOf(START + NUMBER_OF_ROWS - 1)); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); -// List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[] { -// "-5 <= ICOL AND ICOL < -3", -// "-3 <= ICOL AND ICOL < -1", -// "-1 <= ICOL AND ICOL < 1", -// "1 <= ICOL AND ICOL < 3", -// "3 <= ICOL AND ICOL <= 5" -// }); -// } -// -// public void testIntegerUnevenPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, -// "ICOL"); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -// String.valueOf(Types.INTEGER)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, -// String.valueOf(START)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, -// String.valueOf(START + NUMBER_OF_ROWS - 1)); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); -// List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[] { -// "-5 <= ICOL AND ICOL < -1", -// "-1 <= ICOL AND ICOL < 2", -// "2 <= ICOL AND ICOL <= 5" -// }); -// } -// -// public void testIntegerOverPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, -// "ICOL"); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -// String.valueOf(Types.INTEGER)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, -// String.valueOf(START)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, -// String.valueOf(START + NUMBER_OF_ROWS - 1)); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 13, null); -// List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[] { -// "-5 <= ICOL AND ICOL < -4", -// "-4 <= ICOL AND ICOL < -3", -// "-3 <= ICOL AND ICOL < -2", -// "-2 <= ICOL AND ICOL < -1", -// "-1 <= ICOL AND ICOL < 0", -// "0 <= ICOL AND ICOL < 1", -// "1 <= ICOL AND ICOL < 2", -// "2 <= ICOL AND ICOL < 3", -// "3 <= ICOL AND ICOL < 4", -// "4 <= ICOL AND ICOL <= 5" -// }); -// } -// -// public void testFloatingPointEvenPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, -// "DCOL"); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -// String.valueOf(Types.DOUBLE)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, -// String.valueOf((double)START)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, -// String.valueOf((double)(START + NUMBER_OF_ROWS - 1))); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); -// List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[] { -// "-5.0 <= DCOL AND DCOL < -3.0", -// "-3.0 <= DCOL AND DCOL < -1.0", -// "-1.0 <= DCOL AND DCOL < 1.0", -// "1.0 <= DCOL AND DCOL < 3.0", -// "3.0 <= DCOL AND DCOL <= 5.0" -// }); -// } -// -// public void testFloatingPointUnevenPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, -// "DCOL"); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -// String.valueOf(Types.DOUBLE)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, -// String.valueOf((double)START)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, -// String.valueOf((double)(START + NUMBER_OF_ROWS - 1))); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); -// List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[] { -// "-5.0 <= DCOL AND DCOL < -1.6666666666666665", -// "-1.6666666666666665 <= DCOL AND DCOL < 1.666666666666667", -// "1.666666666666667 <= DCOL AND DCOL <= 5.0" -// }); -// } -// -// public void testNumericEvenPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "ICOL"); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC)); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(START)); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(START + NUMBER_OF_ROWS - 1)); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); -// List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[] { -// "-5 <= ICOL AND ICOL < -3", -// "-3 <= ICOL AND ICOL < -1", -// "-1 <= ICOL AND ICOL < 1", -// "1 <= ICOL AND ICOL < 3", -// "3 <= ICOL AND ICOL <= 5" -// }); -// } -// -// public void testNumericUnevenPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL"); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC)); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(START))); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(START + NUMBER_OF_ROWS - 1))); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); -// List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[]{ -// "-5 <= DCOL AND DCOL < -2", -// "-2 <= DCOL AND DCOL < 1", -// "1 <= DCOL AND DCOL <= 5" -// }); -// } -// -// public void testNumericSinglePartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL"); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC)); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(START))); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(START))); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); -// List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[]{ -// "DCOL = -5", -// }); -// } -// -// -// public void testDatePartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.DATE)); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, -// Date.valueOf("2004-10-20").toString()); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MAXVALUE, Date.valueOf("2013-10-17") -// .toString()); -// -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); -// List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// -// verifyResult(partitions, new String[]{ -// "'2004-10-20' <= DCOL AND DCOL < '2007-10-19'", -// "'2007-10-19' <= DCOL AND DCOL < '2010-10-18'", -// "'2010-10-18' <= DCOL AND DCOL <= '2013-10-17'", -// }); -// -// } -// -// public void testTimePartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TCOL"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIME)); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, -// Time.valueOf("01:01:01").toString()); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, -// Time.valueOf("10:40:50").toString()); -// -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); -// List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[]{ -// "'01:01:01' <= TCOL AND TCOL < '04:14:17'", -// "'04:14:17' <= TCOL AND TCOL < '07:27:33'", -// "'07:27:33' <= TCOL AND TCOL <= '10:40:50'", -// }); -// } -// -// public void testTimestampPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TSCOL"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIMESTAMP)); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, -// Timestamp.valueOf("2013-01-01 01:01:01.123").toString()); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, -// Timestamp.valueOf("2013-12-31 10:40:50.654").toString()); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); -// List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// verifyResult(partitions, new String[]{ -// "'2013-01-01 01:01:01.123' <= TSCOL AND TSCOL < '2013-05-02 12:14:17.634'", -// "'2013-05-02 12:14:17.634' <= TSCOL AND TSCOL < '2013-08-31 23:27:34.144'", -// "'2013-08-31 23:27:34.144' <= TSCOL AND TSCOL <= '2013-12-31 10:40:50.654'", -// }); -// } -// -// public void testBooleanPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "BCOL"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.BOOLEAN)); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MINVALUE, "0"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MAXVALUE, "1"); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); -// List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// verifyResult(partitions, new String[]{ -// "BCOL = TRUE", -// "BCOL = FALSE", -// }); -// } -// -// public void testVarcharPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR)); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MINVALUE, "A"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MAXVALUE, "Z"); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 25, null); -// List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[] { -// "'A' <= VCCOL AND VCCOL < 'B'", -// "'B' <= VCCOL AND VCCOL < 'C'", -// "'C' <= VCCOL AND VCCOL < 'D'", -// "'D' <= VCCOL AND VCCOL < 'E'", -// "'E' <= VCCOL AND VCCOL < 'F'", -// "'F' <= VCCOL AND VCCOL < 'G'", -// "'G' <= VCCOL AND VCCOL < 'H'", -// "'H' <= VCCOL AND VCCOL < 'I'", -// "'I' <= VCCOL AND VCCOL < 'J'", -// "'J' <= VCCOL AND VCCOL < 'K'", -// "'K' <= VCCOL AND VCCOL < 'L'", -// "'L' <= VCCOL AND VCCOL < 'M'", -// "'M' <= VCCOL AND VCCOL < 'N'", -// "'N' <= VCCOL AND VCCOL < 'O'", -// "'O' <= VCCOL AND VCCOL < 'P'", -// "'P' <= VCCOL AND VCCOL < 'Q'", -// "'Q' <= VCCOL AND VCCOL < 'R'", -// "'R' <= VCCOL AND VCCOL < 'S'", -// "'S' <= VCCOL AND VCCOL < 'T'", -// "'T' <= VCCOL AND VCCOL < 'U'", -// "'U' <= VCCOL AND VCCOL < 'V'", -// "'V' <= VCCOL AND VCCOL < 'W'", -// "'W' <= VCCOL AND VCCOL < 'X'", -// "'X' <= VCCOL AND VCCOL < 'Y'", -// "'Y' <= VCCOL AND VCCOL <= 'Z'", -// }); -// } -// -// public void testVarcharPartition2() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR)); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MINVALUE, "Breezy Badger"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MAXVALUE, "Warty Warthog"); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); -// List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// assertEquals(partitions.size(), 5); -// // First partition needs to contain entire upper bound -// assertTrue(partitions.get(0).toString().contains("Breezy Badger")); -// // Last partition needs to contain entire lower bound -// assertTrue(partitions.get(4).toString().contains("Warty Warthog")); -// } -// -// public void testVarcharPartitionWithCommonPrefix() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR)); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MINVALUE, "AAA"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MAXVALUE, "AAF"); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); -// -// List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[] { -// "'AAA' <= VCCOL AND VCCOL < 'AAB'", -// "'AAB' <= VCCOL AND VCCOL < 'AAC'", -// "'AAC' <= VCCOL AND VCCOL < 'AAD'", -// "'AAD' <= VCCOL AND VCCOL < 'AAE'", -// "'AAE' <= VCCOL AND VCCOL <= 'AAF'", -// }); -// -// } -// -// public void testPatitionWithNullValues() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR)); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MINVALUE, "AAA"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MAXVALUE, "AAE"); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// jobConf.fromTable.partitionColumnNull = true; -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); -// -// List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[] { -// "VCCOL IS NULL", -// "'AAA' <= VCCOL AND VCCOL < 'AAB'", -// "'AAB' <= VCCOL AND VCCOL < 'AAC'", -// "'AAC' <= VCCOL AND VCCOL < 'AAD'", -// "'AAD' <= VCCOL AND VCCOL <= 'AAE'", -// }); -// -// } -// -// private void verifyResult(List partitions, -// String[] expected) { -// assertEquals(expected.length, partitions.size()); -// -// Iterator iterator = partitions.iterator(); -// for (int i = 0; i < expected.length; i++) { -// assertEquals(expected[i], -// ((GenericJdbcImportPartition)iterator.next()).getConditions()); -// } -// } -} diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java new file mode 100644 index 00000000..d7e8c6ca --- /dev/null +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.sql.ResultSet; +import java.util.Arrays; +import java.util.Collection; + +import org.apache.sqoop.common.MutableContext; +import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration; +import org.apache.sqoop.etl.io.DataReader; +import org.apache.sqoop.job.etl.Loader; +import org.apache.sqoop.job.etl.LoaderContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class TestLoader { + + private final String tableName; + + private GenericJdbcExecutor executor; + + private static final int START = -50; + + private int numberOfRows; + + @Parameters + public static Collection data() { + return Arrays.asList(new Object[][] {{50}, {100}, {101}, {150}, {200}}); + } + + public TestLoader(int numberOfRows) { + this.numberOfRows = numberOfRows; + tableName = getClass().getSimpleName().toUpperCase(); + } + + @Before + public void setUp() { + executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, + GenericJdbcTestConstants.URL, null, null); + + if (!executor.existTable(tableName)) { + executor.executeUpdate("CREATE TABLE " + + executor.delimitIdentifier(tableName) + + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + } else { + executor.deleteTableData(tableName); + } + } + + @After + public void tearDown() { + executor.close(); + } + + @Test + public void testInsert() throws Exception { + MutableContext context = new MutableMapContext(); + + ConnectionConfiguration connectionConfig = new ConnectionConfiguration(); + + connectionConfig.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connectionConfig.connection.connectionString = GenericJdbcTestConstants.URL; + + ToJobConfiguration jobConfig = new ToJobConfiguration(); + + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL, + "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?)"); + + Loader loader = new GenericJdbcLoader(); + DummyReader reader = new DummyReader(); + LoaderContext loaderContext = new LoaderContext(context, reader, null); + loader.load(loaderContext, connectionConfig, jobConfig); + + int index = START; + ResultSet rs = executor.executeQuery("SELECT * FROM " + + executor.delimitIdentifier(tableName) + " ORDER BY ICOL"); + while (rs.next()) { + assertEquals(index, rs.getObject(1)); + assertEquals((double) index, rs.getObject(2)); + assertEquals(String.valueOf(index), rs.getObject(3)); + index++; + } + assertEquals(numberOfRows, index-START); + } + + public class DummyReader extends DataReader { + int index = 0; + + @Override + public Object[] readArrayRecord() { + if (index < numberOfRows) { + Object[] array = new Object[] { + START + index, + (double) (START + index), + String.valueOf(START+index) }; + index++; + return array; + } else { + return null; + } + } + + @Override + public String readTextRecord() { + fail("This method should not be invoked."); + return null; + } + + @Override + public Object readContent() throws Exception { + fail("This method should not be invoked."); + return null; + } + + } +} diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java new file mode 100644 index 00000000..f1023c85 --- /dev/null +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java @@ -0,0 +1,503 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.Iterator; +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.sqoop.common.MutableContext; +import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration; +import org.apache.sqoop.job.etl.Partition; +import org.apache.sqoop.job.etl.Partitioner; +import org.apache.sqoop.job.etl.PartitionerContext; + +public class TestPartitioner extends TestCase { + + private static final int START = -5; + private static final int NUMBER_OF_ROWS = 11; + + public void testIntegerEvenPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, + "ICOL"); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, + String.valueOf(Types.INTEGER)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + String.valueOf(START)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + String.valueOf(START + NUMBER_OF_ROWS - 1)); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); + List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "-5 <= ICOL AND ICOL < -3", + "-3 <= ICOL AND ICOL < -1", + "-1 <= ICOL AND ICOL < 1", + "1 <= ICOL AND ICOL < 3", + "3 <= ICOL AND ICOL <= 5" + }); + } + + public void testIntegerUnevenPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, + "ICOL"); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, + String.valueOf(Types.INTEGER)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + String.valueOf(START)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + String.valueOf(START + NUMBER_OF_ROWS - 1)); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "-5 <= ICOL AND ICOL < -1", + "-1 <= ICOL AND ICOL < 2", + "2 <= ICOL AND ICOL <= 5" + }); + } + + public void testIntegerOverPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, + "ICOL"); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, + String.valueOf(Types.INTEGER)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + String.valueOf(START)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + String.valueOf(START + NUMBER_OF_ROWS - 1)); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 13, null); + List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "-5 <= ICOL AND ICOL < -4", + "-4 <= ICOL AND ICOL < -3", + "-3 <= ICOL AND ICOL < -2", + "-2 <= ICOL AND ICOL < -1", + "-1 <= ICOL AND ICOL < 0", + "0 <= ICOL AND ICOL < 1", + "1 <= ICOL AND ICOL < 2", + "2 <= ICOL AND ICOL < 3", + "3 <= ICOL AND ICOL < 4", + "4 <= ICOL AND ICOL <= 5" + }); + } + + public void testFloatingPointEvenPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, + "DCOL"); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, + String.valueOf(Types.DOUBLE)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + String.valueOf((double)START)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + String.valueOf((double)(START + NUMBER_OF_ROWS - 1))); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); + List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "-5.0 <= DCOL AND DCOL < -3.0", + "-3.0 <= DCOL AND DCOL < -1.0", + "-1.0 <= DCOL AND DCOL < 1.0", + "1.0 <= DCOL AND DCOL < 3.0", + "3.0 <= DCOL AND DCOL <= 5.0" + }); + } + + public void testFloatingPointUnevenPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, + "DCOL"); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, + String.valueOf(Types.DOUBLE)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + String.valueOf((double)START)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + String.valueOf((double)(START + NUMBER_OF_ROWS - 1))); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "-5.0 <= DCOL AND DCOL < -1.6666666666666665", + "-1.6666666666666665 <= DCOL AND DCOL < 1.666666666666667", + "1.666666666666667 <= DCOL AND DCOL <= 5.0" + }); + } + + public void testNumericEvenPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "ICOL"); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC)); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(START)); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(START + NUMBER_OF_ROWS - 1)); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); + List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "-5 <= ICOL AND ICOL < -3", + "-3 <= ICOL AND ICOL < -1", + "-1 <= ICOL AND ICOL < 1", + "1 <= ICOL AND ICOL < 3", + "3 <= ICOL AND ICOL <= 5" + }); + } + + public void testNumericUnevenPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL"); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC)); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(START))); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(START + NUMBER_OF_ROWS - 1))); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[]{ + "-5 <= DCOL AND DCOL < -2", + "-2 <= DCOL AND DCOL < 1", + "1 <= DCOL AND DCOL <= 5" + }); + } + + public void testNumericSinglePartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL"); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC)); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(START))); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(START))); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[]{ + "DCOL = -5", + }); + } + + + public void testDatePartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.DATE)); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + Date.valueOf("2004-10-20").toString()); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MAXVALUE, Date.valueOf("2013-10-17") + .toString()); + + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + + verifyResult(partitions, new String[]{ + "'2004-10-20' <= DCOL AND DCOL < '2007-10-19'", + "'2007-10-19' <= DCOL AND DCOL < '2010-10-18'", + "'2010-10-18' <= DCOL AND DCOL <= '2013-10-17'", + }); + + } + + public void testTimePartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIME)); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + Time.valueOf("01:01:01").toString()); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + Time.valueOf("10:40:50").toString()); + + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[]{ + "'01:01:01' <= TCOL AND TCOL < '04:14:17'", + "'04:14:17' <= TCOL AND TCOL < '07:27:33'", + "'07:27:33' <= TCOL AND TCOL <= '10:40:50'", + }); + } + + public void testTimestampPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TSCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIMESTAMP)); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + Timestamp.valueOf("2013-01-01 01:01:01.123").toString()); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + Timestamp.valueOf("2013-12-31 10:40:50.654").toString()); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + verifyResult(partitions, new String[]{ + "'2013-01-01 01:01:01.123' <= TSCOL AND TSCOL < '2013-05-02 12:14:17.634'", + "'2013-05-02 12:14:17.634' <= TSCOL AND TSCOL < '2013-08-31 23:27:34.144'", + "'2013-08-31 23:27:34.144' <= TSCOL AND TSCOL <= '2013-12-31 10:40:50.654'", + }); + } + + public void testBooleanPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "BCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.BOOLEAN)); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MINVALUE, "0"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MAXVALUE, "1"); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + verifyResult(partitions, new String[]{ + "BCOL = TRUE", + "BCOL = FALSE", + }); + } + + public void testVarcharPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR)); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MINVALUE, "A"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MAXVALUE, "Z"); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 25, null); + List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "'A' <= VCCOL AND VCCOL < 'B'", + "'B' <= VCCOL AND VCCOL < 'C'", + "'C' <= VCCOL AND VCCOL < 'D'", + "'D' <= VCCOL AND VCCOL < 'E'", + "'E' <= VCCOL AND VCCOL < 'F'", + "'F' <= VCCOL AND VCCOL < 'G'", + "'G' <= VCCOL AND VCCOL < 'H'", + "'H' <= VCCOL AND VCCOL < 'I'", + "'I' <= VCCOL AND VCCOL < 'J'", + "'J' <= VCCOL AND VCCOL < 'K'", + "'K' <= VCCOL AND VCCOL < 'L'", + "'L' <= VCCOL AND VCCOL < 'M'", + "'M' <= VCCOL AND VCCOL < 'N'", + "'N' <= VCCOL AND VCCOL < 'O'", + "'O' <= VCCOL AND VCCOL < 'P'", + "'P' <= VCCOL AND VCCOL < 'Q'", + "'Q' <= VCCOL AND VCCOL < 'R'", + "'R' <= VCCOL AND VCCOL < 'S'", + "'S' <= VCCOL AND VCCOL < 'T'", + "'T' <= VCCOL AND VCCOL < 'U'", + "'U' <= VCCOL AND VCCOL < 'V'", + "'V' <= VCCOL AND VCCOL < 'W'", + "'W' <= VCCOL AND VCCOL < 'X'", + "'X' <= VCCOL AND VCCOL < 'Y'", + "'Y' <= VCCOL AND VCCOL <= 'Z'", + }); + } + + public void testVarcharPartition2() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR)); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MINVALUE, "Breezy Badger"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MAXVALUE, "Warty Warthog"); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); + List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + assertEquals(partitions.size(), 5); + // First partition needs to contain entire upper bound + assertTrue(partitions.get(0).toString().contains("Breezy Badger")); + // Last partition needs to contain entire lower bound + assertTrue(partitions.get(4).toString().contains("Warty Warthog")); + } + + public void testVarcharPartitionWithCommonPrefix() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR)); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MINVALUE, "AAA"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MAXVALUE, "AAF"); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); + + List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "'AAA' <= VCCOL AND VCCOL < 'AAB'", + "'AAB' <= VCCOL AND VCCOL < 'AAC'", + "'AAC' <= VCCOL AND VCCOL < 'AAD'", + "'AAD' <= VCCOL AND VCCOL < 'AAE'", + "'AAE' <= VCCOL AND VCCOL <= 'AAF'", + }); + + } + + public void testPatitionWithNullValues() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR)); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MINVALUE, "AAA"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MAXVALUE, "AAE"); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + jobConf.fromTable.partitionColumnNull = true; + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); + + List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "VCCOL IS NULL", + "'AAA' <= VCCOL AND VCCOL < 'AAB'", + "'AAB' <= VCCOL AND VCCOL < 'AAC'", + "'AAC' <= VCCOL AND VCCOL < 'AAD'", + "'AAD' <= VCCOL AND VCCOL <= 'AAE'", + }); + + } + + private void verifyResult(List partitions, + String[] expected) { + assertEquals(expected.length, partitions.size()); + + Iterator iterator = partitions.iterator(); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], + ((GenericJdbcPartition)iterator.next()).getConditions()); + } + } +} diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java new file mode 100644 index 00000000..4831cf83 --- /dev/null +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java @@ -0,0 +1,362 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import junit.framework.TestCase; +import org.apache.sqoop.common.MutableContext; +import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration; +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.InitializerContext; +import org.apache.sqoop.validation.Status; +import org.apache.sqoop.validation.Validation; + +public class TestToInitializer extends TestCase { + private final String schemaName; + private final String tableName; + private final String schemalessTableName; + private final String stageTableName; + private final String tableSql; + private final String schemalessTableSql; + private final String tableColumns; + + private GenericJdbcExecutor executor; + + public TestToInitializer() { + schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA"; + tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA"; + schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE"; + stageTableName = getClass().getSimpleName().toUpperCase() + + "_STAGE_TABLE"; + tableSql = "INSERT INTO " + tableName + " VALUES (?,?,?)"; + schemalessTableSql = "INSERT INTO " + schemalessTableName + " VALUES (?,?,?)"; + tableColumns = "ICOL,VCOL"; + } + + @Override + public void setUp() { + executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, + GenericJdbcTestConstants.URL, null, null); + + String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + if (!executor.existTable(tableName)) { + executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName)); + executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + } + + fullTableName = executor.delimitIdentifier(schemalessTableName); + if (!executor.existTable(schemalessTableName)) { + executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + } + } + + @Override + public void tearDown() { + executor.close(); + } + + @SuppressWarnings("unchecked") + public void testTableName() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + String fullTableName = executor.delimitIdentifier(schemalessTableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.tableName = schemalessTableName; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)"); + } + + @SuppressWarnings("unchecked") + public void testTableNameWithTableColumns() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + String fullTableName = executor.delimitIdentifier(schemalessTableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.tableName = schemalessTableName; + jobConf.toTable.columns = tableColumns; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)"); + } + + @SuppressWarnings("unchecked") + public void testTableSql() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.sql = schemalessTableSql; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(schemalessTableName) + " VALUES (?,?,?)"); + } + + @SuppressWarnings("unchecked") + public void testTableNameWithSchema() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.schemaName = schemaName; + jobConf.toTable.tableName = tableName; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)"); + } + + @SuppressWarnings("unchecked") + public void testTableNameWithTableColumnsWithSchema() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.schemaName = schemaName; + jobConf.toTable.tableName = tableName; + jobConf.toTable.columns = tableColumns; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)"); + } + + @SuppressWarnings("unchecked") + public void testTableSqlWithSchema() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.schemaName = schemaName; + jobConf.toTable.sql = tableSql; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?)"); + } + + private void verifyResult(MutableContext context, String dataSql) { + assertEquals(dataSql, context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL)); + } + + private void createTable(String tableName) { + try { + executor.executeUpdate("DROP TABLE " + tableName); + } catch(SqoopException e) { + //Ok to fail as the table might not exist + } + executor.executeUpdate("CREATE TABLE " + tableName + + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + } + + public void testNonExistingStageTable() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.tableName = schemalessTableName; + jobConf.toTable.stageTableName = stageTableName; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + try { + initializer.initialize(initializerContext, connConf, jobConf); + fail("Initialization should fail for non-existing stage table."); + } catch(SqoopException se) { + //expected + } + } + + @SuppressWarnings("unchecked") + public void testNonEmptyStageTable() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + String fullStageTableName = executor.delimitIdentifier(stageTableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.tableName = schemalessTableName; + jobConf.toTable.stageTableName = stageTableName; + createTable(fullStageTableName); + executor.executeUpdate("INSERT INTO " + fullStageTableName + + " VALUES(1, 1.1, 'one')"); + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + try { + initializer.initialize(initializerContext, connConf, jobConf); + fail("Initialization should fail for non-empty stage table."); + } catch(SqoopException se) { + //expected + } + } + + @SuppressWarnings("unchecked") + public void testClearStageTableValidation() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + //specifying clear stage table flag without specifying name of + // the stage table + jobConf.toTable.tableName = schemalessTableName; + jobConf.toTable.clearStageTable = false; + GenericJdbcValidator validator = new GenericJdbcValidator(); + Validation validation = validator.validateJob(jobConf); + assertEquals("User should not specify clear stage table flag without " + + "specifying name of the stage table", + Status.UNACCEPTABLE, + validation.getStatus()); + assertTrue(validation.getMessages().containsKey( + new Validation.FormInput("toTable"))); + + jobConf.toTable.clearStageTable = true; + validation = validator.validateJob(jobConf); + assertEquals("User should not specify clear stage table flag without " + + "specifying name of the stage table", + Status.UNACCEPTABLE, + validation.getStatus()); + assertTrue(validation.getMessages().containsKey( + new Validation.FormInput("toTable"))); + } + + @SuppressWarnings("unchecked") + public void testStageTableWithoutTable() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + //specifying stage table without specifying table name + jobConf.toTable.stageTableName = stageTableName; + jobConf.toTable.sql = ""; + + GenericJdbcValidator validator = new GenericJdbcValidator(); + Validation validation = validator.validateJob(jobConf); + assertEquals("Stage table name cannot be specified without specifying " + + "table name", Status.UNACCEPTABLE, validation.getStatus()); + assertTrue(validation.getMessages().containsKey( + new Validation.FormInput("toTable"))); + } + + @SuppressWarnings("unchecked") + public void testClearStageTable() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + String fullStageTableName = executor.delimitIdentifier(stageTableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.tableName = schemalessTableName; + jobConf.toTable.stageTableName = stageTableName; + jobConf.toTable.clearStageTable = true; + createTable(fullStageTableName); + executor.executeUpdate("INSERT INTO " + fullStageTableName + + " VALUES(1, 1.1, 'one')"); + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + assertEquals("Stage table should have been cleared", 0, + executor.getTableRowCount(stageTableName)); + } + + @SuppressWarnings("unchecked") + public void testStageTable() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + String fullStageTableName = executor.delimitIdentifier(stageTableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.tableName = schemalessTableName; + jobConf.toTable.stageTableName = stageTableName; + createTable(fullStageTableName); + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, "INSERT INTO " + fullStageTableName + + " VALUES (?,?,?)"); + } +}