5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-16 00:41:23 +08:00

SQOOP-1463: Sqoop2: From/To: Re-enable generic-jdbc-connector test cases

(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2014-08-20 16:05:33 -07:00 committed by Abraham Elmahrek
parent 4d9513cea8
commit 2873a4b1ec
18 changed files with 1667 additions and 1665 deletions

View File

@ -42,9 +42,9 @@ public final class GenericJdbcConnectorConstants {
public static final String CONNECTOR_JDBC_PARTITION_MAXVALUE = public static final String CONNECTOR_JDBC_PARTITION_MAXVALUE =
PREFIX_CONNECTOR_JDBC_CONFIG + "partition.maxvalue"; PREFIX_CONNECTOR_JDBC_CONFIG + "partition.maxvalue";
public static final String CONNECTOR_FROM_JDBC_DATA_SQL = public static final String CONNECTOR_JDBC_FROM_DATA_SQL =
PREFIX_CONNECTOR_JDBC_CONFIG + "from.data.sql"; PREFIX_CONNECTOR_JDBC_CONFIG + "from.data.sql";
public static final String CONNECTOR_TO_JDBC_DATA_SQL = public static final String CONNECTOR_JDBC_TO_DATA_SQL =
PREFIX_CONNECTOR_JDBC_CONFIG + "to.data.sql"; PREFIX_CONNECTOR_JDBC_CONFIG + "to.data.sql";
public static final String SQL_CONDITIONS_TOKEN = "${CONDITIONS}"; public static final String SQL_CONDITIONS_TOKEN = "${CONDITIONS}";

View File

@ -79,7 +79,9 @@ public enum GenericJdbcConnectorError implements ErrorCode {
GENERIC_JDBC_CONNECTOR_0018("Error occurred while transferring data from " + GENERIC_JDBC_CONNECTOR_0018("Error occurred while transferring data from " +
"stage table to destination table."), "stage table to destination table."),
GENERIC_JDBC_CONNECTOR_0019("Table name extraction not supported.") GENERIC_JDBC_CONNECTOR_0019("Table name extraction not supported."),
GENERIC_JDBC_CONNECTOR_0020("Unknown direction.")
; ;
private final String message; private final String message;

View File

@ -41,7 +41,7 @@ public void extract(ExtractorContext context, ConnectionConfiguration connection
String password = connection.connection.password; String password = connection.connection.password;
GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password); GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_FROM_JDBC_DATA_SQL); String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL);
String conditions = partition.getConditions(); String conditions = partition.getConditions();
query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions); query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions);
LOG.info("Using query: " + query); LOG.info("Using query: " + query);

View File

@ -80,7 +80,7 @@ public Schema getSchema(InitializerContext context, ConnectionConfiguration conn
ResultSetMetaData rsmt = null; ResultSetMetaData rsmt = null;
try { try {
rs = executor.executeQuery( rs = executor.executeQuery(
context.getString(GenericJdbcConnectorConstants.CONNECTOR_FROM_JDBC_DATA_SQL) context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL)
.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0") .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0")
); );
@ -316,7 +316,7 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur
LOG.info("Using dataSql: " + dataSql); LOG.info("Using dataSql: " + dataSql);
LOG.info("Field names: " + fieldNames); LOG.info("Field names: " + fieldNames);
context.setString(GenericJdbcConnectorConstants.CONNECTOR_FROM_JDBC_DATA_SQL, dataSql); context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL, dataSql);
context.setString(Constants.JOB_ETL_FIELD_NAMES, fieldNames); context.setString(Constants.JOB_ETL_FIELD_NAMES, fieldNames);
} }
} }

View File

@ -38,7 +38,7 @@ public void load(LoaderContext context, ConnectionConfiguration connection, ToJo
GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password); GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
executor.setAutoCommit(false); executor.setAutoCommit(false);
String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_TO_JDBC_DATA_SQL); String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL);
executor.beginBatch(sql); executor.beginBatch(sql);
try { try {
int numberOfRows = 0; int numberOfRows = 0;

View File

@ -216,7 +216,7 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008); GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
} }
context.setString(GenericJdbcConnectorConstants.CONNECTOR_TO_JDBC_DATA_SQL, context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL,
dataSql); dataSql);
} }
} }

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.sqoop.connector.jdbc; package org.apache.sqoop.connector.jdbc;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration; import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
@ -67,43 +68,48 @@ public Validation validateConnection(Object configuration) {
@Override @Override
public Validation validateJob(Object jobConfiguration) { public Validation validateJob(Object jobConfiguration) {
return super.validateJob(jobConfiguration); if (jobConfiguration instanceof FromJobConfiguration) {
return validateFromJobConfiguration((FromJobConfiguration)jobConfiguration);
} else if (jobConfiguration instanceof ToJobConfiguration) {
return validateToJobConfiguration((ToJobConfiguration)jobConfiguration);
} else {
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0020,
"Configuration object for unknown direction.");
}
} }
private Validation validateExportJob(Object jobConfiguration) { private Validation validateToJobConfiguration(ToJobConfiguration configuration) {
Validation validation = new Validation(ToJobConfiguration.class); Validation validation = new Validation(ToJobConfiguration.class);
ToJobConfiguration configuration = (ToJobConfiguration)jobConfiguration;
if(configuration.toTable.tableName == null && configuration.toTable.sql == null) { if(configuration.toTable.tableName == null && configuration.toTable.sql == null) {
validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Either fromTable name or SQL must be specified"); validation.addMessage(Status.UNACCEPTABLE, "toTable", "Either table name or SQL must be specified");
} }
if(configuration.toTable.tableName != null && configuration.toTable.sql != null) { if(configuration.toTable.tableName != null && configuration.toTable.sql != null) {
validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Both fromTable name and SQL cannot be specified"); validation.addMessage(Status.UNACCEPTABLE, "toTable", "Both table name and SQL cannot be specified");
} }
if(configuration.toTable.tableName == null && if(configuration.toTable.tableName == null &&
configuration.toTable.stageTableName != null) { configuration.toTable.stageTableName != null) {
validation.addMessage(Status.UNACCEPTABLE, "fromTable", validation.addMessage(Status.UNACCEPTABLE, "toTable",
"Stage fromTable name cannot be specified without specifying fromTable name"); "Stage table name cannot be specified without specifying table name");
} }
if(configuration.toTable.stageTableName == null && if(configuration.toTable.stageTableName == null &&
configuration.toTable.clearStageTable != null) { configuration.toTable.clearStageTable != null) {
validation.addMessage(Status.UNACCEPTABLE, "fromTable", validation.addMessage(Status.UNACCEPTABLE, "toTable",
"Clear stage fromTable cannot be specified without specifying name of " + "Clear stage table cannot be specified without specifying name of " +
"the stage fromTable."); "the stage table.");
} }
return validation; return validation;
} }
private Validation validateImportJob(Object jobConfiguration) { private Validation validateFromJobConfiguration(FromJobConfiguration configuration) {
Validation validation = new Validation(FromJobConfiguration.class); Validation validation = new Validation(ToJobConfiguration.class);
FromJobConfiguration configuration = (FromJobConfiguration)jobConfiguration;
if(configuration.fromTable.tableName == null && configuration.fromTable.sql == null) { if(configuration.fromTable.tableName == null && configuration.fromTable.sql == null) {
validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Either fromTable name or SQL must be specified"); validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Either table name or SQL must be specified");
} }
if(configuration.fromTable.tableName != null && configuration.fromTable.sql != null) { if(configuration.fromTable.tableName != null && configuration.fromTable.sql != null) {
validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Both fromTable name and SQL cannot be specified"); validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Both table name and SQL cannot be specified");
} }
if(configuration.fromTable.schemaName != null && configuration.fromTable.sql != null) { if(configuration.fromTable.schemaName != null && configuration.fromTable.sql != null) {
validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Both schema name and SQL cannot be specified"); validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Both schema name and SQL cannot be specified");

View File

@ -20,69 +20,69 @@
import junit.framework.TestCase; import junit.framework.TestCase;
public class GenericJdbcExecutorTest extends TestCase { public class GenericJdbcExecutorTest extends TestCase {
// private final String fromTable; private final String table;
// private final String emptyTable; private final String emptyTable;
// private final GenericJdbcExecutor executor; private final GenericJdbcExecutor executor;
//
// private static final int START = -50; private static final int START = -50;
// private static final int NUMBER_OF_ROWS = 974; private static final int NUMBER_OF_ROWS = 974;
//
// public GenericJdbcExecutorTest() { public GenericJdbcExecutorTest() {
// fromTable = getClass().getSimpleName().toUpperCase(); table = getClass().getSimpleName().toUpperCase();
// emptyTable = fromTable + "_EMPTY"; emptyTable = table + "_EMPTY";
// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
// GenericJdbcTestConstants.URL, null, null); GenericJdbcTestConstants.URL, null, null);
// } }
//
// @Override @Override
// public void setUp() { public void setUp() {
// if(executor.existTable(emptyTable)) { if(executor.existTable(emptyTable)) {
// executor.executeUpdate("DROP TABLE " + emptyTable); executor.executeUpdate("DROP TABLE " + emptyTable);
// } }
// executor.executeUpdate("CREATE TABLE " executor.executeUpdate("CREATE TABLE "
// + emptyTable + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))"); + emptyTable + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
//
// if(executor.existTable(fromTable)) { if(executor.existTable(table)) {
// executor.executeUpdate("DROP TABLE " + fromTable); executor.executeUpdate("DROP TABLE " + table);
// } }
// executor.executeUpdate("CREATE TABLE " executor.executeUpdate("CREATE TABLE "
// + fromTable + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))"); + table + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
//
// for (int i = 0; i < NUMBER_OF_ROWS; i++) { for (int i = 0; i < NUMBER_OF_ROWS; i++) {
// int value = START + i; int value = START + i;
// String sql = "INSERT INTO " + fromTable String sql = "INSERT INTO " + table
// + " VALUES(" + value + ", '" + value + "')"; + " VALUES(" + value + ", '" + value + "')";
// executor.executeUpdate(sql); executor.executeUpdate(sql);
// } }
// } }
//
// @SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
// public void testDeleteTableData() throws Exception { public void testDeleteTableData() throws Exception {
// executor.deleteTableData(fromTable); executor.deleteTableData(table);
// assertEquals("Table " + fromTable + " is expected to be empty.", assertEquals("Table " + table + " is expected to be empty.",
// 0, executor.getTableRowCount(fromTable)); 0, executor.getTableRowCount(table));
// } }
//
// @SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
// public void testMigrateData() throws Exception { public void testMigrateData() throws Exception {
// assertEquals("Table " + emptyTable + " is expected to be empty.", assertEquals("Table " + emptyTable + " is expected to be empty.",
// 0, executor.getTableRowCount(emptyTable)); 0, executor.getTableRowCount(emptyTable));
// assertEquals("Table " + fromTable + " is expected to have " + assertEquals("Table " + table + " is expected to have " +
// NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS, NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS,
// executor.getTableRowCount(fromTable)); executor.getTableRowCount(table));
//
// executor.migrateData(fromTable, emptyTable); executor.migrateData(table, emptyTable);
//
// assertEquals("Table " + fromTable + " is expected to be empty.", 0, assertEquals("Table " + table + " is expected to be empty.", 0,
// executor.getTableRowCount(fromTable)); executor.getTableRowCount(table));
// assertEquals("Table " + emptyTable + " is expected to have " + assertEquals("Table " + emptyTable + " is expected to have " +
// NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS, NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS,
// executor.getTableRowCount(emptyTable)); executor.getTableRowCount(emptyTable));
// } }
//
// @SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
// public void testGetTableRowCount() throws Exception { public void testGetTableRowCount() throws Exception {
// assertEquals("Table " + fromTable + " is expected to be empty.", assertEquals("Table " + table + " is expected to be empty.",
// NUMBER_OF_ROWS, executor.getTableRowCount(fromTable)); NUMBER_OF_ROWS, executor.getTableRowCount(table));
// } }
} }

View File

@ -1,365 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc;
import junit.framework.TestCase;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
//import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.Validation;
public class TestExportInitializer extends TestCase {
// private final String schemaName;
// private final String tableName;
// private final String schemalessTableName;
// private final String stageTableName;
// private final String tableSql;
// private final String schemalessTableSql;
// private final String tableColumns;
//
// private GenericJdbcExecutor executor;
//
// public TestExportInitializer() {
// schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA";
// tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA";
// schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE";
// stageTableName = getClass().getSimpleName().toUpperCase() +
// "_STAGE_TABLE";
// tableSql = "INSERT INTO " + tableName + " VALUES (?,?,?)";
// schemalessTableSql = "INSERT INTO " + schemalessTableName + " VALUES (?,?,?)";
// tableColumns = "ICOL,VCOL";
// }
//
// @Override
// public void setUp() {
// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
// GenericJdbcTestConstants.URL, null, null);
//
// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
// if (!executor.existTable(tableName)) {
// executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName));
// executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
// }
//
// fullTableName = executor.delimitIdentifier(schemalessTableName);
// if (!executor.existTable(schemalessTableName)) {
// executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
// }
// }
//
// @Override
// public void tearDown() {
// executor.close();
// }
//
// @SuppressWarnings("unchecked")
// public void testTableName() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ExportJobConfiguration jobConf = new ExportJobConfiguration();
//
// String fullTableName = executor.delimitIdentifier(schemalessTableName);
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.tableName = schemalessTableName;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcExportInitializer();
// initializer.initialize(initializerContext, connConf, jobConf);
//
// verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)");
// }
//
// @SuppressWarnings("unchecked")
// public void testTableNameWithTableColumns() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ExportJobConfiguration jobConf = new ExportJobConfiguration();
//
// String fullTableName = executor.delimitIdentifier(schemalessTableName);
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.tableName = schemalessTableName;
// jobConf.fromTable.columns = tableColumns;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcExportInitializer();
// initializer.initialize(initializerContext, connConf, jobConf);
//
// verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)");
// }
//
// @SuppressWarnings("unchecked")
// public void testTableSql() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ExportJobConfiguration jobConf = new ExportJobConfiguration();
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.sql = schemalessTableSql;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcExportInitializer();
// initializer.initialize(initializerContext, connConf, jobConf);
//
// verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(schemalessTableName) + " VALUES (?,?,?)");
// }
//
// @SuppressWarnings("unchecked")
// public void testTableNameWithSchema() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ExportJobConfiguration jobConf = new ExportJobConfiguration();
//
// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.schemaName = schemaName;
// jobConf.fromTable.tableName = tableName;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcExportInitializer();
// initializer.initialize(initializerContext, connConf, jobConf);
//
// verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)");
// }
//
// @SuppressWarnings("unchecked")
// public void testTableNameWithTableColumnsWithSchema() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ExportJobConfiguration jobConf = new ExportJobConfiguration();
//
// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.schemaName = schemaName;
// jobConf.fromTable.tableName = tableName;
// jobConf.fromTable.columns = tableColumns;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcExportInitializer();
// initializer.initialize(initializerContext, connConf, jobConf);
//
// verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)");
// }
//
// @SuppressWarnings("unchecked")
// public void testTableSqlWithSchema() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ExportJobConfiguration jobConf = new ExportJobConfiguration();
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.schemaName = schemaName;
// jobConf.fromTable.sql = tableSql;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcExportInitializer();
// initializer.initialize(initializerContext, connConf, jobConf);
//
// verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?)");
// }
//
// private void verifyResult(MutableContext context, String dataSql) {
// assertEquals(dataSql, context.getString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL));
// }
//
// private void createTable(String tableName) {
// try {
// executor.executeUpdate("DROP TABLE " + tableName);
// } catch(SqoopException e) {
// //Ok to fail as the table might not exist
// }
// executor.executeUpdate("CREATE TABLE " + tableName +
// "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
// }
//
// public void testNonExistingStageTable() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ExportJobConfiguration jobConf = new ExportJobConfiguration();
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.tableName = schemalessTableName;
// jobConf.fromTable.stageTableName = stageTableName;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcExportInitializer();
// try {
// initializer.initialize(initializerContext, connConf, jobConf);
// fail("Initialization should fail for non-existing stage table.");
// } catch(SqoopException se) {
// //expected
// }
// }
//
// @SuppressWarnings("unchecked")
// public void testNonEmptyStageTable() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ExportJobConfiguration jobConf = new ExportJobConfiguration();
//
// String fullStageTableName = executor.delimitIdentifier(stageTableName);
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.tableName = schemalessTableName;
// jobConf.fromTable.stageTableName = stageTableName;
// createTable(fullStageTableName);
// executor.executeUpdate("INSERT INTO " + fullStageTableName +
// " VALUES(1, 1.1, 'one')");
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcExportInitializer();
// try {
// initializer.initialize(initializerContext, connConf, jobConf);
// fail("Initialization should fail for non-empty stage table.");
// } catch(SqoopException se) {
// //expected
// }
// }
//
// @SuppressWarnings("unchecked")
// public void testClearStageTableValidation() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ExportJobConfiguration jobConf = new ExportJobConfiguration();
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// //specifying clear stage table flag without specifying name of
// // the stage table
// jobConf.fromTable.tableName = schemalessTableName;
// jobConf.fromTable.clearStageTable = false;
// GenericJdbcValidator validator = new GenericJdbcValidator();
// Validation validation = validator.validateJob(MJob.Type.EXPORT, jobConf);
// assertEquals("User should not specify clear stage table flag without " +
// "specifying name of the stage table",
// Status.UNACCEPTABLE,
// validation.getStatus());
// assertTrue(validation.getMessages().containsKey(
// new Validation.FormInput("fromTable")));
//
// jobConf.fromTable.clearStageTable = true;
// validation = validator.validateJob(MJob.Type.EXPORT, jobConf);
// assertEquals("User should not specify clear stage table flag without " +
// "specifying name of the stage table",
// Status.UNACCEPTABLE,
// validation.getStatus());
// assertTrue(validation.getMessages().containsKey(
// new Validation.FormInput("fromTable")));
// }
//
// @SuppressWarnings("unchecked")
// public void testStageTableWithoutTable() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ExportJobConfiguration jobConf = new ExportJobConfiguration();
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// //specifying stage table without specifying table name
// jobConf.fromTable.stageTableName = stageTableName;
// jobConf.fromTable.sql = "";
//
// GenericJdbcValidator validator = new GenericJdbcValidator();
// Validation validation = validator.validateJob(MJob.Type.EXPORT, jobConf);
// assertEquals("Stage table name cannot be specified without specifying " +
// "table name", Status.UNACCEPTABLE, validation.getStatus());
// assertTrue(validation.getMessages().containsKey(
// new Validation.FormInput("fromTable")));
// }
//
// @SuppressWarnings("unchecked")
// public void testClearStageTable() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ExportJobConfiguration jobConf = new ExportJobConfiguration();
//
// String fullStageTableName = executor.delimitIdentifier(stageTableName);
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.tableName = schemalessTableName;
// jobConf.fromTable.stageTableName = stageTableName;
// jobConf.fromTable.clearStageTable = true;
// createTable(fullStageTableName);
// executor.executeUpdate("INSERT INTO " + fullStageTableName +
// " VALUES(1, 1.1, 'one')");
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcExportInitializer();
// initializer.initialize(initializerContext, connConf, jobConf);
// assertEquals("Stage table should have been cleared", 0,
// executor.getTableRowCount(stageTableName));
// }
//
// @SuppressWarnings("unchecked")
// public void testStageTable() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ExportJobConfiguration jobConf = new ExportJobConfiguration();
//
// String fullStageTableName = executor.delimitIdentifier(stageTableName);
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.tableName = schemalessTableName;
// jobConf.fromTable.stageTableName = stageTableName;
// createTable(fullStageTableName);
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcExportInitializer();
// initializer.initialize(initializerContext, connConf, jobConf);
//
// verifyResult(context, "INSERT INTO " + fullStageTableName +
// " VALUES (?,?,?)");
// }
}

View File

@ -1,143 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.sql.ResultSet;
import java.util.Arrays;
import java.util.Collection;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
//import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class TestExportLoader {
// private final String tableName;
//
// private GenericJdbcExecutor executor;
//
// private static final int START = -50;
//
// private int numberOfRows;
//
// @Parameters
// public static Collection<Object[]> data() {
// return Arrays.asList(new Object[][] {{50}, {100}, {101}, {150}, {200}});
// }
//
// public TestExportLoader(int numberOfRows) {
// this.numberOfRows = numberOfRows;
// tableName = getClass().getSimpleName().toUpperCase();
// }
//
// @Before
// public void setUp() {
// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
// GenericJdbcTestConstants.URL, null, null);
//
// if (!executor.existTable(tableName)) {
// executor.executeUpdate("CREATE TABLE "
// + executor.delimitIdentifier(tableName)
// + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
// } else {
// executor.deleteTableData(tableName);
// }
// }
//
// @After
// public void tearDown() {
// executor.close();
// }
//
// @Test
// public void testInsert() throws Exception {
// MutableContext context = new MutableMapContext();
//
// ConnectionConfiguration connectionConfig = new ConnectionConfiguration();
//
// connectionConfig.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connectionConfig.connection.connectionString = GenericJdbcTestConstants.URL;
//
// ExportJobConfiguration jobConfig = new ExportJobConfiguration();
//
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL,
// "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?)");
//
// Loader loader = new GenericJdbcExportLoader();
// DummyReader reader = new DummyReader();
// LoaderContext loaderContext = new LoaderContext(context, reader, null);
// loader.load(loaderContext, connectionConfig, jobConfig);
//
// int index = START;
// ResultSet rs = executor.executeQuery("SELECT * FROM "
// + executor.delimitIdentifier(tableName) + " ORDER BY ICOL");
// while (rs.next()) {
// assertEquals(index, rs.getObject(1));
// assertEquals((double) index, rs.getObject(2));
// assertEquals(String.valueOf(index), rs.getObject(3));
// index++;
// }
// assertEquals(numberOfRows, index-START);
// }
//
// public class DummyReader extends DataReader {
// int index = 0;
//
// @Override
// public Object[] readArrayRecord() {
// if (index < numberOfRows) {
// Object[] array = new Object[] {
// START + index,
// (double) (START + index),
// String.valueOf(START+index) };
// index++;
// return array;
// } else {
// return null;
// }
// }
//
// @Override
// public String readTextRecord() {
// fail("This method should not be invoked.");
// return null;
// }
//
// @Override
// public Object readContent() throws Exception {
// fail("This method should not be invoked.");
// return null;
// }
//
// }
}

View File

@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc;
import junit.framework.TestCase;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.etl.io.DataWriter;
public class TestExtractor extends TestCase {
private final String tableName;
private GenericJdbcExecutor executor;
private static final int START = -50;
private static final int NUMBER_OF_ROWS = 101;
public TestExtractor() {
tableName = getClass().getSimpleName().toUpperCase();
}
@Override
public void setUp() {
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
GenericJdbcTestConstants.URL, null, null);
if (!executor.existTable(tableName)) {
executor.executeUpdate("CREATE TABLE "
+ executor.delimitIdentifier(tableName)
+ "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
for (int i = 0; i < NUMBER_OF_ROWS; i++) {
int value = START + i;
String sql = "INSERT INTO " + executor.delimitIdentifier(tableName)
+ " VALUES(" + value + ", " + value + ", '" + value + "')";
executor.executeUpdate(sql);
}
}
}
@Override
public void tearDown() {
executor.close();
}
public void testQuery() throws Exception {
MutableContext context = new MutableMapContext();
ConnectionConfiguration connectionConfig = new ConnectionConfiguration();
connectionConfig.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connectionConfig.connection.connectionString = GenericJdbcTestConstants.URL;
FromJobConfiguration jobConfig = new FromJobConfiguration();
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
"SELECT * FROM " + executor.delimitIdentifier(tableName) + " WHERE ${CONDITIONS}");
GenericJdbcPartition partition;
Extractor extractor = new GenericJdbcExtractor();
DummyWriter writer = new DummyWriter();
ExtractorContext extractorContext = new ExtractorContext(context, writer, null);
partition = new GenericJdbcPartition();
partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665");
extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
partition = new GenericJdbcPartition();
partition.setConditions("-16.6666666666666665 <= DCOL AND DCOL < 16.666666666666667");
extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
partition = new GenericJdbcPartition();
partition.setConditions("16.666666666666667 <= DCOL AND DCOL <= 50.0");
extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
}
public void testSubquery() throws Exception {
MutableContext context = new MutableMapContext();
ConnectionConfiguration connectionConfig = new ConnectionConfiguration();
connectionConfig.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connectionConfig.connection.connectionString = GenericJdbcTestConstants.URL;
FromJobConfiguration jobConfig = new FromJobConfiguration();
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
"SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "
+ "(SELECT * FROM " + executor.delimitIdentifier(tableName)
+ " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
GenericJdbcPartition partition;
Extractor extractor = new GenericJdbcExtractor();
DummyWriter writer = new DummyWriter();
ExtractorContext extractorContext = new ExtractorContext(context, writer, null);
partition = new GenericJdbcPartition();
partition.setConditions("-50 <= ICOL AND ICOL < -16");
extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
partition = new GenericJdbcPartition();
partition.setConditions("-16 <= ICOL AND ICOL < 17");
extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
partition = new GenericJdbcPartition();
partition.setConditions("17 <= ICOL AND ICOL < 50");
extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
}
public class DummyWriter extends DataWriter {
int indx = START;
@Override
public void writeArrayRecord(Object[] array) {
for (int i = 0; i < array.length; i++) {
if (array[i] instanceof Integer) {
assertEquals(indx, ((Integer)array[i]).intValue());
} else if (array[i] instanceof Double) {
assertEquals((double)indx, ((Double)array[i]).doubleValue());
} else {
assertEquals(String.valueOf(indx), array[i].toString());
}
}
indx++;
}
@Override
public void writeStringRecord(String text) {
fail("This method should not be invoked.");
}
@Override
public void writeRecord(Object content) {
fail("This method should not be invoked.");
}
}
}

View File

@ -0,0 +1,404 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc;
import java.sql.Types;
import junit.framework.TestCase;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
import org.apache.sqoop.job.Constants;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Text;
public class TestFromInitializer extends TestCase {
private final String schemaName;
private final String tableName;
private final String schemalessTableName;
private final String tableSql;
private final String schemalessTableSql;
private final String tableColumns;
private GenericJdbcExecutor executor;
private static final int START = -50;
private static final int NUMBER_OF_ROWS = 101;
public TestFromInitializer() {
schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA";
tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA";
schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE";
tableSql = "SELECT * FROM " + schemaName + "." + tableName + " WHERE ${CONDITIONS}";
schemalessTableSql = "SELECT * FROM " + schemalessTableName + " WHERE ${CONDITIONS}";
tableColumns = "ICOL,VCOL";
}
@Override
public void setUp() {
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
GenericJdbcTestConstants.URL, null, null);
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
if (!executor.existTable(tableName)) {
executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName));
executor.executeUpdate("CREATE TABLE "
+ fullTableName
+ "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
for (int i = 0; i < NUMBER_OF_ROWS; i++) {
int value = START + i;
String sql = "INSERT INTO " + fullTableName
+ " VALUES(" + value + ", " + value + ", '" + value + "')";
executor.executeUpdate(sql);
}
}
fullTableName = executor.delimitIdentifier(schemalessTableName);
if (!executor.existTable(schemalessTableName)) {
executor.executeUpdate("CREATE TABLE "
+ fullTableName
+ "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
for (int i = 0; i < NUMBER_OF_ROWS; i++) {
int value = START + i;
String sql = "INSERT INTO " + fullTableName
+ " VALUES(" + value + ", " + value + ", '" + value + "')";
executor.executeUpdate(sql);
}
}
}
/**
* Return Schema representation for the testing table.
*
* @param name Name that should be used for the generated schema.
* @return
*/
public Schema getSchema(String name) {
return new Schema(name)
.addColumn(new FixedPoint("ICOL"))
.addColumn(new FloatingPoint("DCOL"))
.addColumn(new Text("VCOL"))
;
}
@Override
public void tearDown() {
executor.close();
}
@SuppressWarnings("unchecked")
public void testTableName() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.fromTable.tableName = schemalessTableName;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcFromInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
+ " WHERE ${CONDITIONS}",
"ICOL,DCOL,VCOL",
"ICOL",
String.valueOf(Types.INTEGER),
String.valueOf(START),
String.valueOf(START+NUMBER_OF_ROWS-1));
}
@SuppressWarnings("unchecked")
public void testTableNameWithTableColumns() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.fromTable.tableName = schemalessTableName;
jobConf.fromTable.columns = tableColumns;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcFromInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(schemalessTableName)
+ " WHERE ${CONDITIONS}",
tableColumns,
"ICOL",
String.valueOf(Types.INTEGER),
String.valueOf(START),
String.valueOf(START+NUMBER_OF_ROWS-1));
}
@SuppressWarnings("unchecked")
public void testTableSql() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.fromTable.sql = schemalessTableSql;
jobConf.fromTable.partitionColumn = "DCOL";
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcFromInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
+ " WHERE ${CONDITIONS}",
"ICOL,DCOL,VCOL",
"DCOL",
String.valueOf(Types.DOUBLE),
String.valueOf((double)START),
String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
}
@SuppressWarnings("unchecked")
public void testTableSqlWithTableColumns() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.fromTable.sql = schemalessTableSql;
jobConf.fromTable.columns = tableColumns;
jobConf.fromTable.partitionColumn = "DCOL";
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcFromInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "
+ "(SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
+ " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS",
tableColumns,
"DCOL",
String.valueOf(Types.DOUBLE),
String.valueOf((double)START),
String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
}
@SuppressWarnings("unchecked")
public void testTableNameWithSchema() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.fromTable.schemaName = schemaName;
jobConf.fromTable.tableName = tableName;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcFromInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"SELECT * FROM " + fullTableName
+ " WHERE ${CONDITIONS}",
"ICOL,DCOL,VCOL",
"ICOL",
String.valueOf(Types.INTEGER),
String.valueOf(START),
String.valueOf(START+NUMBER_OF_ROWS-1));
}
@SuppressWarnings("unchecked")
public void testTableNameWithTableColumnsWithSchema() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.fromTable.schemaName = schemaName;
jobConf.fromTable.tableName = tableName;
jobConf.fromTable.columns = tableColumns;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcFromInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"SELECT ICOL,VCOL FROM " + fullTableName
+ " WHERE ${CONDITIONS}",
tableColumns,
"ICOL",
String.valueOf(Types.INTEGER),
String.valueOf(START),
String.valueOf(START+NUMBER_OF_ROWS-1));
}
@SuppressWarnings("unchecked")
public void testTableSqlWithSchema() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.fromTable.schemaName = schemaName;
jobConf.fromTable.sql = tableSql;
jobConf.fromTable.partitionColumn = "DCOL";
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcFromInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"SELECT * FROM " + fullTableName
+ " WHERE ${CONDITIONS}",
"ICOL,DCOL,VCOL",
"DCOL",
String.valueOf(Types.DOUBLE),
String.valueOf((double)START),
String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
}
@SuppressWarnings("unchecked")
public void testGetSchemaForTable() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.fromTable.schemaName = schemaName;
jobConf.fromTable.tableName = tableName;
jobConf.fromTable.partitionColumn = "DCOL";
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcFromInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
Schema schema = initializer.getSchema(initializerContext, connConf, jobConf);
assertEquals(getSchema(jobConf.fromTable.schemaName + "." + tableName), schema);
}
@SuppressWarnings("unchecked")
public void testGetSchemaForSql() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.fromTable.schemaName = schemaName;
jobConf.fromTable.sql = tableSql;
jobConf.fromTable.partitionColumn = "DCOL";
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcFromInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
Schema schema = initializer.getSchema(initializerContext, connConf, jobConf);
assertEquals(getSchema("Query"), schema);
}
@SuppressWarnings("unchecked")
public void testTableSqlWithTableColumnsWithSchema() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.fromTable.schemaName = schemaName;
jobConf.fromTable.sql = tableSql;
jobConf.fromTable.columns = tableColumns;
jobConf.fromTable.partitionColumn = "DCOL";
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcFromInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "
+ "(SELECT * FROM " + fullTableName
+ " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS",
tableColumns,
"DCOL",
String.valueOf(Types.DOUBLE),
String.valueOf((double)START),
String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
}
private void verifyResult(MutableContext context,
String dataSql, String fieldNames,
String partitionColumnName, String partitionColumnType,
String partitionMinValue, String partitionMaxValue) {
assertEquals(dataSql, context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL));
assertEquals(fieldNames, context.getString(
Constants.JOB_ETL_FIELD_NAMES));
assertEquals(partitionColumnName, context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME));
assertEquals(partitionColumnType, context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE));
assertEquals(partitionMinValue, context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE));
assertEquals(partitionMaxValue, context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE));
}
}

View File

@ -1,160 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc;
import junit.framework.TestCase;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
//import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.etl.io.DataWriter;
public class TestImportExtractor extends TestCase {
// private final String tableName;
//
// private GenericJdbcExecutor executor;
//
// private static final int START = -50;
// private static final int NUMBER_OF_ROWS = 101;
//
// public TestImportExtractor() {
// tableName = getClass().getSimpleName().toUpperCase();
// }
//
// @Override
// public void setUp() {
// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
// GenericJdbcTestConstants.URL, null, null);
//
// if (!executor.existTable(tableName)) {
// executor.executeUpdate("CREATE TABLE "
// + executor.delimitIdentifier(tableName)
// + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
//
// for (int i = 0; i < NUMBER_OF_ROWS; i++) {
// int value = START + i;
// String sql = "INSERT INTO " + executor.delimitIdentifier(tableName)
// + " VALUES(" + value + ", " + value + ", '" + value + "')";
// executor.executeUpdate(sql);
// }
// }
// }
//
// @Override
// public void tearDown() {
// executor.close();
// }
//
// public void testQuery() throws Exception {
// MutableContext context = new MutableMapContext();
//
// ConnectionConfiguration connectionConfig = new ConnectionConfiguration();
//
// connectionConfig.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connectionConfig.connection.connectionString = GenericJdbcTestConstants.URL;
//
// ImportJobConfiguration jobConfig = new ImportJobConfiguration();
//
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL,
// "SELECT * FROM " + executor.delimitIdentifier(tableName) + " WHERE ${CONDITIONS}");
//
// GenericJdbcImportPartition partition;
//
// Extractor extractor = new GenericJdbcImportExtractor();
// DummyWriter writer = new DummyWriter();
// ExtractorContext extractorContext = new ExtractorContext(context, writer, null);
//
// partition = new GenericJdbcImportPartition();
// partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665");
// extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
//
// partition = new GenericJdbcImportPartition();
// partition.setConditions("-16.6666666666666665 <= DCOL AND DCOL < 16.666666666666667");
// extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
//
// partition = new GenericJdbcImportPartition();
// partition.setConditions("16.666666666666667 <= DCOL AND DCOL <= 50.0");
// extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
// }
//
// public void testSubquery() throws Exception {
// MutableContext context = new MutableMapContext();
//
// ConnectionConfiguration connectionConfig = new ConnectionConfiguration();
//
// connectionConfig.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connectionConfig.connection.connectionString = GenericJdbcTestConstants.URL;
//
// ImportJobConfiguration jobConfig = new ImportJobConfiguration();
//
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL,
// "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "
// + "(SELECT * FROM " + executor.delimitIdentifier(tableName)
// + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
//
// GenericJdbcImportPartition partition;
//
// Extractor extractor = new GenericJdbcImportExtractor();
// DummyWriter writer = new DummyWriter();
// ExtractorContext extractorContext = new ExtractorContext(context, writer, null);
//
// partition = new GenericJdbcImportPartition();
// partition.setConditions("-50 <= ICOL AND ICOL < -16");
// extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
//
// partition = new GenericJdbcImportPartition();
// partition.setConditions("-16 <= ICOL AND ICOL < 17");
// extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
//
// partition = new GenericJdbcImportPartition();
// partition.setConditions("17 <= ICOL AND ICOL < 50");
// extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
// }
//
// public class DummyWriter extends DataWriter {
// int indx = START;
//
// @Override
// public void writeArrayRecord(Object[] array) {
// for (int i = 0; i < array.length; i++) {
// if (array[i] instanceof Integer) {
// assertEquals(indx, ((Integer)array[i]).intValue());
// } else if (array[i] instanceof Double) {
// assertEquals((double)indx, ((Double)array[i]).doubleValue());
// } else {
// assertEquals(String.valueOf(indx), array[i].toString());
// }
// }
// indx++;
// }
//
// @Override
// public void writeStringRecord(String text) {
// fail("This method should not be invoked.");
// }
//
// @Override
// public void writeRecord(Object content) {
// fail("This method should not be invoked.");
// }
// }
}

View File

@ -1,404 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc;
import java.sql.Types;
import junit.framework.TestCase;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
//import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
import org.apache.sqoop.job.Constants;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Text;
public class TestImportInitializer extends TestCase {
// private final String schemaName;
// private final String tableName;
// private final String schemalessTableName;
// private final String tableSql;
// private final String schemalessTableSql;
// private final String tableColumns;
//
// private GenericJdbcExecutor executor;
//
// private static final int START = -50;
// private static final int NUMBER_OF_ROWS = 101;
//
// public TestImportInitializer() {
// schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA";
// tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA";
// schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE";
// tableSql = "SELECT * FROM " + schemaName + "." + tableName + " WHERE ${CONDITIONS}";
// schemalessTableSql = "SELECT * FROM " + schemalessTableName + " WHERE ${CONDITIONS}";
// tableColumns = "ICOL,VCOL";
// }
//
// @Override
// public void setUp() {
// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
// GenericJdbcTestConstants.URL, null, null);
//
// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
// if (!executor.existTable(tableName)) {
// executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName));
// executor.executeUpdate("CREATE TABLE "
// + fullTableName
// + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
//
// for (int i = 0; i < NUMBER_OF_ROWS; i++) {
// int value = START + i;
// String sql = "INSERT INTO " + fullTableName
// + " VALUES(" + value + ", " + value + ", '" + value + "')";
// executor.executeUpdate(sql);
// }
// }
//
// fullTableName = executor.delimitIdentifier(schemalessTableName);
// if (!executor.existTable(schemalessTableName)) {
// executor.executeUpdate("CREATE TABLE "
// + fullTableName
// + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
//
// for (int i = 0; i < NUMBER_OF_ROWS; i++) {
// int value = START + i;
// String sql = "INSERT INTO " + fullTableName
// + " VALUES(" + value + ", " + value + ", '" + value + "')";
// executor.executeUpdate(sql);
// }
// }
// }
//
// /**
// * Return Schema representation for the testing fromTable.
// *
// * @param name Name that should be used for the generated schema.
// * @return
// */
// public Schema getSchema(String name) {
// return new Schema(name)
// .addColumn(new FixedPoint("ICOL"))
// .addColumn(new FloatingPoint("DCOL"))
// .addColumn(new Text("VCOL"))
// ;
// }
//
// @Override
// public void tearDown() {
// executor.close();
// }
//
// @SuppressWarnings("unchecked")
// public void testTableName() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.tableName = schemalessTableName;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcImportInitializer();
// initializer.initialize(initializerContext, connConf, jobConf);
//
// verifyResult(context,
// "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
// + " WHERE ${CONDITIONS}",
// "ICOL,DCOL,VCOL",
// "ICOL",
// String.valueOf(Types.INTEGER),
// String.valueOf(START),
// String.valueOf(START+NUMBER_OF_ROWS-1));
// }
//
// @SuppressWarnings("unchecked")
// public void testTableNameWithTableColumns() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.tableName = schemalessTableName;
// jobConf.fromTable.columns = tableColumns;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcImportInitializer();
// initializer.initialize(initializerContext, connConf, jobConf);
//
// verifyResult(context,
// "SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(schemalessTableName)
// + " WHERE ${CONDITIONS}",
// tableColumns,
// "ICOL",
// String.valueOf(Types.INTEGER),
// String.valueOf(START),
// String.valueOf(START+NUMBER_OF_ROWS-1));
// }
//
// @SuppressWarnings("unchecked")
// public void testTableSql() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.sql = schemalessTableSql;
// jobConf.fromTable.partitionColumn = "DCOL";
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcImportInitializer();
// initializer.initialize(initializerContext, connConf, jobConf);
//
// verifyResult(context,
// "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
// + " WHERE ${CONDITIONS}",
// "ICOL,DCOL,VCOL",
// "DCOL",
// String.valueOf(Types.DOUBLE),
// String.valueOf((double)START),
// String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
// }
//
// @SuppressWarnings("unchecked")
// public void testTableSqlWithTableColumns() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.sql = schemalessTableSql;
// jobConf.fromTable.columns = tableColumns;
// jobConf.fromTable.partitionColumn = "DCOL";
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcImportInitializer();
// initializer.initialize(initializerContext, connConf, jobConf);
//
// verifyResult(context,
// "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "
// + "(SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
// + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS",
// tableColumns,
// "DCOL",
// String.valueOf(Types.DOUBLE),
// String.valueOf((double)START),
// String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
// }
//
// @SuppressWarnings("unchecked")
// public void testTableNameWithSchema() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.schemaName = schemaName;
// jobConf.fromTable.tableName = tableName;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcImportInitializer();
// initializer.initialize(initializerContext, connConf, jobConf);
//
// verifyResult(context,
// "SELECT * FROM " + fullTableName
// + " WHERE ${CONDITIONS}",
// "ICOL,DCOL,VCOL",
// "ICOL",
// String.valueOf(Types.INTEGER),
// String.valueOf(START),
// String.valueOf(START+NUMBER_OF_ROWS-1));
// }
//
// @SuppressWarnings("unchecked")
// public void testTableNameWithTableColumnsWithSchema() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.schemaName = schemaName;
// jobConf.fromTable.tableName = tableName;
// jobConf.fromTable.columns = tableColumns;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcImportInitializer();
// initializer.initialize(initializerContext, connConf, jobConf);
//
// verifyResult(context,
// "SELECT ICOL,VCOL FROM " + fullTableName
// + " WHERE ${CONDITIONS}",
// tableColumns,
// "ICOL",
// String.valueOf(Types.INTEGER),
// String.valueOf(START),
// String.valueOf(START+NUMBER_OF_ROWS-1));
// }
//
// @SuppressWarnings("unchecked")
// public void testTableSqlWithSchema() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.schemaName = schemaName;
// jobConf.fromTable.sql = tableSql;
// jobConf.fromTable.partitionColumn = "DCOL";
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcImportInitializer();
// initializer.initialize(initializerContext, connConf, jobConf);
//
// verifyResult(context,
// "SELECT * FROM " + fullTableName
// + " WHERE ${CONDITIONS}",
// "ICOL,DCOL,VCOL",
// "DCOL",
// String.valueOf(Types.DOUBLE),
// String.valueOf((double)START),
// String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
// }
//
//
// @SuppressWarnings("unchecked")
// public void testGetSchemaForTable() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.schemaName = schemaName;
// jobConf.fromTable.tableName = tableName;
// jobConf.fromTable.partitionColumn = "DCOL";
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcImportInitializer();
// initializer.initialize(initializerContext, connConf, jobConf);
// Schema schema = initializer.getSchema(initializerContext, connConf, jobConf);
// assertEquals(getSchema(jobConf.table.schemaName + "." + tableName), schema);
// }
//
// @SuppressWarnings("unchecked")
// public void testGetSchemaForSql() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.schemaName = schemaName;
// jobConf.fromTable.sql = tableSql;
// jobConf.fromTable.partitionColumn = "DCOL";
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcImportInitializer();
// initializer.initialize(initializerContext, connConf, jobConf);
// Schema schema = initializer.getSchema(initializerContext, connConf, jobConf);
// assertEquals(getSchema("Query"), schema);
// }
//
// @SuppressWarnings("unchecked")
// public void testTableSqlWithTableColumnsWithSchema() throws Exception {
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.fromTable.schemaName = schemaName;
// jobConf.fromTable.sql = tableSql;
// jobConf.fromTable.columns = tableColumns;
// jobConf.fromTable.partitionColumn = "DCOL";
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
//
// @SuppressWarnings("rawtypes")
// Initializer initializer = new GenericJdbcImportInitializer();
// initializer.initialize(initializerContext, connConf, jobConf);
//
// verifyResult(context,
// "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "
// + "(SELECT * FROM " + fullTableName
// + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS",
// tableColumns,
// "DCOL",
// String.valueOf(Types.DOUBLE),
// String.valueOf((double)START),
// String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
// }
//
// private void verifyResult(MutableContext context,
// String dataSql, String fieldNames,
// String partitionColumnName, String partitionColumnType,
// String partitionMinValue, String partitionMaxValue) {
// assertEquals(dataSql, context.getString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL));
// assertEquals(fieldNames, context.getString(
// Constants.JOB_ETL_FIELD_NAMES));
//
// assertEquals(partitionColumnName, context.getString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME));
// assertEquals(partitionColumnType, context.getString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE));
// assertEquals(partitionMinValue, context.getString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE));
// assertEquals(partitionMaxValue, context.getString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE));
// }
}

View File

@ -1,505 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.Iterator;
import java.util.List;
import junit.framework.TestCase;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
//import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
import org.apache.sqoop.job.Constants;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
public class TestImportPartitioner extends TestCase {
// private static final int START = -5;
// private static final int NUMBER_OF_ROWS = 11;
//
// public void testIntegerEvenPartition() throws Exception {
// MutableContext context = new MutableMapContext();
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
// "ICOL");
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
// String.valueOf(Types.INTEGER));
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
// String.valueOf(START));
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
// String.valueOf(START + NUMBER_OF_ROWS - 1));
//
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// Partitioner partitioner = new GenericJdbcImportPartitioner();
// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
//
// verifyResult(partitions, new String[] {
// "-5 <= ICOL AND ICOL < -3",
// "-3 <= ICOL AND ICOL < -1",
// "-1 <= ICOL AND ICOL < 1",
// "1 <= ICOL AND ICOL < 3",
// "3 <= ICOL AND ICOL <= 5"
// });
// }
//
// public void testIntegerUnevenPartition() throws Exception {
// MutableContext context = new MutableMapContext();
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
// "ICOL");
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
// String.valueOf(Types.INTEGER));
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
// String.valueOf(START));
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
// String.valueOf(START + NUMBER_OF_ROWS - 1));
//
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// Partitioner partitioner = new GenericJdbcImportPartitioner();
// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
//
// verifyResult(partitions, new String[] {
// "-5 <= ICOL AND ICOL < -1",
// "-1 <= ICOL AND ICOL < 2",
// "2 <= ICOL AND ICOL <= 5"
// });
// }
//
// public void testIntegerOverPartition() throws Exception {
// MutableContext context = new MutableMapContext();
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
// "ICOL");
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
// String.valueOf(Types.INTEGER));
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
// String.valueOf(START));
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
// String.valueOf(START + NUMBER_OF_ROWS - 1));
//
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// Partitioner partitioner = new GenericJdbcImportPartitioner();
// PartitionerContext partitionerContext = new PartitionerContext(context, 13, null);
// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
//
// verifyResult(partitions, new String[] {
// "-5 <= ICOL AND ICOL < -4",
// "-4 <= ICOL AND ICOL < -3",
// "-3 <= ICOL AND ICOL < -2",
// "-2 <= ICOL AND ICOL < -1",
// "-1 <= ICOL AND ICOL < 0",
// "0 <= ICOL AND ICOL < 1",
// "1 <= ICOL AND ICOL < 2",
// "2 <= ICOL AND ICOL < 3",
// "3 <= ICOL AND ICOL < 4",
// "4 <= ICOL AND ICOL <= 5"
// });
// }
//
// public void testFloatingPointEvenPartition() throws Exception {
// MutableContext context = new MutableMapContext();
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
// "DCOL");
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
// String.valueOf(Types.DOUBLE));
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
// String.valueOf((double)START));
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
// String.valueOf((double)(START + NUMBER_OF_ROWS - 1)));
//
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// Partitioner partitioner = new GenericJdbcImportPartitioner();
// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
//
// verifyResult(partitions, new String[] {
// "-5.0 <= DCOL AND DCOL < -3.0",
// "-3.0 <= DCOL AND DCOL < -1.0",
// "-1.0 <= DCOL AND DCOL < 1.0",
// "1.0 <= DCOL AND DCOL < 3.0",
// "3.0 <= DCOL AND DCOL <= 5.0"
// });
// }
//
// public void testFloatingPointUnevenPartition() throws Exception {
// MutableContext context = new MutableMapContext();
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
// "DCOL");
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
// String.valueOf(Types.DOUBLE));
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
// String.valueOf((double)START));
// context.setString(
// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
// String.valueOf((double)(START + NUMBER_OF_ROWS - 1)));
//
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// Partitioner partitioner = new GenericJdbcImportPartitioner();
// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
//
// verifyResult(partitions, new String[] {
// "-5.0 <= DCOL AND DCOL < -1.6666666666666665",
// "-1.6666666666666665 <= DCOL AND DCOL < 1.666666666666667",
// "1.666666666666667 <= DCOL AND DCOL <= 5.0"
// });
// }
//
// public void testNumericEvenPartition() throws Exception {
// MutableContext context = new MutableMapContext();
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "ICOL");
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC));
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(START));
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(START + NUMBER_OF_ROWS - 1));
//
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// Partitioner partitioner = new GenericJdbcImportPartitioner();
// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
//
// verifyResult(partitions, new String[] {
// "-5 <= ICOL AND ICOL < -3",
// "-3 <= ICOL AND ICOL < -1",
// "-1 <= ICOL AND ICOL < 1",
// "1 <= ICOL AND ICOL < 3",
// "3 <= ICOL AND ICOL <= 5"
// });
// }
//
// public void testNumericUnevenPartition() throws Exception {
// MutableContext context = new MutableMapContext();
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL");
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC));
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(START)));
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(START + NUMBER_OF_ROWS - 1)));
//
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// Partitioner partitioner = new GenericJdbcImportPartitioner();
// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
//
// verifyResult(partitions, new String[]{
// "-5 <= DCOL AND DCOL < -2",
// "-2 <= DCOL AND DCOL < 1",
// "1 <= DCOL AND DCOL <= 5"
// });
// }
//
// public void testNumericSinglePartition() throws Exception {
// MutableContext context = new MutableMapContext();
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL");
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC));
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(START)));
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(START)));
//
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// Partitioner partitioner = new GenericJdbcImportPartitioner();
// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
//
// verifyResult(partitions, new String[]{
// "DCOL = -5",
// });
// }
//
//
// public void testDatePartition() throws Exception {
// MutableContext context = new MutableMapContext();
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL");
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.DATE));
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
// Date.valueOf("2004-10-20").toString());
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_MAXVALUE, Date.valueOf("2013-10-17")
// .toString());
//
//
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// Partitioner partitioner = new GenericJdbcImportPartitioner();
// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
//
//
// verifyResult(partitions, new String[]{
// "'2004-10-20' <= DCOL AND DCOL < '2007-10-19'",
// "'2007-10-19' <= DCOL AND DCOL < '2010-10-18'",
// "'2010-10-18' <= DCOL AND DCOL <= '2013-10-17'",
// });
//
// }
//
// public void testTimePartition() throws Exception {
// MutableContext context = new MutableMapContext();
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TCOL");
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIME));
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
// Time.valueOf("01:01:01").toString());
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
// Time.valueOf("10:40:50").toString());
//
//
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// Partitioner partitioner = new GenericJdbcImportPartitioner();
// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
//
// verifyResult(partitions, new String[]{
// "'01:01:01' <= TCOL AND TCOL < '04:14:17'",
// "'04:14:17' <= TCOL AND TCOL < '07:27:33'",
// "'07:27:33' <= TCOL AND TCOL <= '10:40:50'",
// });
// }
//
// public void testTimestampPartition() throws Exception {
// MutableContext context = new MutableMapContext();
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TSCOL");
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIMESTAMP));
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
// Timestamp.valueOf("2013-01-01 01:01:01.123").toString());
// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
// Timestamp.valueOf("2013-12-31 10:40:50.654").toString());
//
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// Partitioner partitioner = new GenericJdbcImportPartitioner();
// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
// verifyResult(partitions, new String[]{
// "'2013-01-01 01:01:01.123' <= TSCOL AND TSCOL < '2013-05-02 12:14:17.634'",
// "'2013-05-02 12:14:17.634' <= TSCOL AND TSCOL < '2013-08-31 23:27:34.144'",
// "'2013-08-31 23:27:34.144' <= TSCOL AND TSCOL <= '2013-12-31 10:40:50.654'",
// });
// }
//
// public void testBooleanPartition() throws Exception {
// MutableContext context = new MutableMapContext();
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "BCOL");
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.BOOLEAN));
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_MINVALUE, "0");
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_MAXVALUE, "1");
//
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// Partitioner partitioner = new GenericJdbcImportPartitioner();
// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
// verifyResult(partitions, new String[]{
// "BCOL = TRUE",
// "BCOL = FALSE",
// });
// }
//
// public void testVarcharPartition() throws Exception {
// MutableContext context = new MutableMapContext();
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL");
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR));
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_MINVALUE, "A");
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_MAXVALUE, "Z");
//
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// Partitioner partitioner = new GenericJdbcImportPartitioner();
// PartitionerContext partitionerContext = new PartitionerContext(context, 25, null);
// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
//
// verifyResult(partitions, new String[] {
// "'A' <= VCCOL AND VCCOL < 'B'",
// "'B' <= VCCOL AND VCCOL < 'C'",
// "'C' <= VCCOL AND VCCOL < 'D'",
// "'D' <= VCCOL AND VCCOL < 'E'",
// "'E' <= VCCOL AND VCCOL < 'F'",
// "'F' <= VCCOL AND VCCOL < 'G'",
// "'G' <= VCCOL AND VCCOL < 'H'",
// "'H' <= VCCOL AND VCCOL < 'I'",
// "'I' <= VCCOL AND VCCOL < 'J'",
// "'J' <= VCCOL AND VCCOL < 'K'",
// "'K' <= VCCOL AND VCCOL < 'L'",
// "'L' <= VCCOL AND VCCOL < 'M'",
// "'M' <= VCCOL AND VCCOL < 'N'",
// "'N' <= VCCOL AND VCCOL < 'O'",
// "'O' <= VCCOL AND VCCOL < 'P'",
// "'P' <= VCCOL AND VCCOL < 'Q'",
// "'Q' <= VCCOL AND VCCOL < 'R'",
// "'R' <= VCCOL AND VCCOL < 'S'",
// "'S' <= VCCOL AND VCCOL < 'T'",
// "'T' <= VCCOL AND VCCOL < 'U'",
// "'U' <= VCCOL AND VCCOL < 'V'",
// "'V' <= VCCOL AND VCCOL < 'W'",
// "'W' <= VCCOL AND VCCOL < 'X'",
// "'X' <= VCCOL AND VCCOL < 'Y'",
// "'Y' <= VCCOL AND VCCOL <= 'Z'",
// });
// }
//
// public void testVarcharPartition2() throws Exception {
// MutableContext context = new MutableMapContext();
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL");
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR));
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_MINVALUE, "Breezy Badger");
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_MAXVALUE, "Warty Warthog");
//
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
// Partitioner partitioner = new GenericJdbcImportPartitioner();
// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
// assertEquals(partitions.size(), 5);
// // First partition needs to contain entire upper bound
// assertTrue(partitions.get(0).toString().contains("Breezy Badger"));
// // Last partition needs to contain entire lower bound
// assertTrue(partitions.get(4).toString().contains("Warty Warthog"));
// }
//
// public void testVarcharPartitionWithCommonPrefix() throws Exception {
// MutableContext context = new MutableMapContext();
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL");
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR));
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_MINVALUE, "AAA");
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_MAXVALUE, "AAF");
//
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
//
// Partitioner partitioner = new GenericJdbcImportPartitioner();
// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
//
// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
//
// verifyResult(partitions, new String[] {
// "'AAA' <= VCCOL AND VCCOL < 'AAB'",
// "'AAB' <= VCCOL AND VCCOL < 'AAC'",
// "'AAC' <= VCCOL AND VCCOL < 'AAD'",
// "'AAD' <= VCCOL AND VCCOL < 'AAE'",
// "'AAE' <= VCCOL AND VCCOL <= 'AAF'",
// });
//
// }
//
// public void testPatitionWithNullValues() throws Exception {
// MutableContext context = new MutableMapContext();
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL");
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR));
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_MINVALUE, "AAA");
// context.setString(GenericJdbcConnectorConstants
// .CONNECTOR_JDBC_PARTITION_MAXVALUE, "AAE");
//
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
// jobConf.fromTable.partitionColumnNull = true;
//
// Partitioner partitioner = new GenericJdbcImportPartitioner();
// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
//
// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
//
// verifyResult(partitions, new String[] {
// "VCCOL IS NULL",
// "'AAA' <= VCCOL AND VCCOL < 'AAB'",
// "'AAB' <= VCCOL AND VCCOL < 'AAC'",
// "'AAC' <= VCCOL AND VCCOL < 'AAD'",
// "'AAD' <= VCCOL AND VCCOL <= 'AAE'",
// });
//
// }
//
// private void verifyResult(List<Partition> partitions,
// String[] expected) {
// assertEquals(expected.length, partitions.size());
//
// Iterator<Partition> iterator = partitions.iterator();
// for (int i = 0; i < expected.length; i++) {
// assertEquals(expected[i],
// ((GenericJdbcImportPartition)iterator.next()).getConditions());
// }
// }
}

View File

@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.sql.ResultSet;
import java.util.Arrays;
import java.util.Collection;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class TestLoader {
private final String tableName;
private GenericJdbcExecutor executor;
private static final int START = -50;
private int numberOfRows;
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {{50}, {100}, {101}, {150}, {200}});
}
public TestLoader(int numberOfRows) {
this.numberOfRows = numberOfRows;
tableName = getClass().getSimpleName().toUpperCase();
}
@Before
public void setUp() {
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
GenericJdbcTestConstants.URL, null, null);
if (!executor.existTable(tableName)) {
executor.executeUpdate("CREATE TABLE "
+ executor.delimitIdentifier(tableName)
+ "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
} else {
executor.deleteTableData(tableName);
}
}
@After
public void tearDown() {
executor.close();
}
@Test
public void testInsert() throws Exception {
MutableContext context = new MutableMapContext();
ConnectionConfiguration connectionConfig = new ConnectionConfiguration();
connectionConfig.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connectionConfig.connection.connectionString = GenericJdbcTestConstants.URL;
ToJobConfiguration jobConfig = new ToJobConfiguration();
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL,
"INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?)");
Loader loader = new GenericJdbcLoader();
DummyReader reader = new DummyReader();
LoaderContext loaderContext = new LoaderContext(context, reader, null);
loader.load(loaderContext, connectionConfig, jobConfig);
int index = START;
ResultSet rs = executor.executeQuery("SELECT * FROM "
+ executor.delimitIdentifier(tableName) + " ORDER BY ICOL");
while (rs.next()) {
assertEquals(index, rs.getObject(1));
assertEquals((double) index, rs.getObject(2));
assertEquals(String.valueOf(index), rs.getObject(3));
index++;
}
assertEquals(numberOfRows, index-START);
}
public class DummyReader extends DataReader {
int index = 0;
@Override
public Object[] readArrayRecord() {
if (index < numberOfRows) {
Object[] array = new Object[] {
START + index,
(double) (START + index),
String.valueOf(START+index) };
index++;
return array;
} else {
return null;
}
}
@Override
public String readTextRecord() {
fail("This method should not be invoked.");
return null;
}
@Override
public Object readContent() throws Exception {
fail("This method should not be invoked.");
return null;
}
}
}

View File

@ -0,0 +1,503 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.Iterator;
import java.util.List;
import junit.framework.TestCase;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
public class TestPartitioner extends TestCase {
private static final int START = -5;
private static final int NUMBER_OF_ROWS = 11;
public void testIntegerEvenPartition() throws Exception {
MutableContext context = new MutableMapContext();
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
"ICOL");
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
String.valueOf(Types.INTEGER));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
String.valueOf(START));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
String.valueOf(START + NUMBER_OF_ROWS - 1));
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
Partitioner partitioner = new GenericJdbcPartitioner();
PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
"-5 <= ICOL AND ICOL < -3",
"-3 <= ICOL AND ICOL < -1",
"-1 <= ICOL AND ICOL < 1",
"1 <= ICOL AND ICOL < 3",
"3 <= ICOL AND ICOL <= 5"
});
}
public void testIntegerUnevenPartition() throws Exception {
MutableContext context = new MutableMapContext();
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
"ICOL");
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
String.valueOf(Types.INTEGER));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
String.valueOf(START));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
String.valueOf(START + NUMBER_OF_ROWS - 1));
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
Partitioner partitioner = new GenericJdbcPartitioner();
PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
"-5 <= ICOL AND ICOL < -1",
"-1 <= ICOL AND ICOL < 2",
"2 <= ICOL AND ICOL <= 5"
});
}
public void testIntegerOverPartition() throws Exception {
MutableContext context = new MutableMapContext();
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
"ICOL");
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
String.valueOf(Types.INTEGER));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
String.valueOf(START));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
String.valueOf(START + NUMBER_OF_ROWS - 1));
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
Partitioner partitioner = new GenericJdbcPartitioner();
PartitionerContext partitionerContext = new PartitionerContext(context, 13, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
"-5 <= ICOL AND ICOL < -4",
"-4 <= ICOL AND ICOL < -3",
"-3 <= ICOL AND ICOL < -2",
"-2 <= ICOL AND ICOL < -1",
"-1 <= ICOL AND ICOL < 0",
"0 <= ICOL AND ICOL < 1",
"1 <= ICOL AND ICOL < 2",
"2 <= ICOL AND ICOL < 3",
"3 <= ICOL AND ICOL < 4",
"4 <= ICOL AND ICOL <= 5"
});
}
public void testFloatingPointEvenPartition() throws Exception {
MutableContext context = new MutableMapContext();
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
"DCOL");
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
String.valueOf(Types.DOUBLE));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
String.valueOf((double)START));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
String.valueOf((double)(START + NUMBER_OF_ROWS - 1)));
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
Partitioner partitioner = new GenericJdbcPartitioner();
PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
"-5.0 <= DCOL AND DCOL < -3.0",
"-3.0 <= DCOL AND DCOL < -1.0",
"-1.0 <= DCOL AND DCOL < 1.0",
"1.0 <= DCOL AND DCOL < 3.0",
"3.0 <= DCOL AND DCOL <= 5.0"
});
}
public void testFloatingPointUnevenPartition() throws Exception {
MutableContext context = new MutableMapContext();
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
"DCOL");
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
String.valueOf(Types.DOUBLE));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
String.valueOf((double)START));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
String.valueOf((double)(START + NUMBER_OF_ROWS - 1)));
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
Partitioner partitioner = new GenericJdbcPartitioner();
PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
"-5.0 <= DCOL AND DCOL < -1.6666666666666665",
"-1.6666666666666665 <= DCOL AND DCOL < 1.666666666666667",
"1.666666666666667 <= DCOL AND DCOL <= 5.0"
});
}
public void testNumericEvenPartition() throws Exception {
MutableContext context = new MutableMapContext();
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "ICOL");
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(START));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(START + NUMBER_OF_ROWS - 1));
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
Partitioner partitioner = new GenericJdbcPartitioner();
PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
"-5 <= ICOL AND ICOL < -3",
"-3 <= ICOL AND ICOL < -1",
"-1 <= ICOL AND ICOL < 1",
"1 <= ICOL AND ICOL < 3",
"3 <= ICOL AND ICOL <= 5"
});
}
public void testNumericUnevenPartition() throws Exception {
MutableContext context = new MutableMapContext();
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL");
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(START)));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(START + NUMBER_OF_ROWS - 1)));
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
Partitioner partitioner = new GenericJdbcPartitioner();
PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[]{
"-5 <= DCOL AND DCOL < -2",
"-2 <= DCOL AND DCOL < 1",
"1 <= DCOL AND DCOL <= 5"
});
}
public void testNumericSinglePartition() throws Exception {
MutableContext context = new MutableMapContext();
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL");
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(START)));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(START)));
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
Partitioner partitioner = new GenericJdbcPartitioner();
PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[]{
"DCOL = -5",
});
}
public void testDatePartition() throws Exception {
MutableContext context = new MutableMapContext();
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL");
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.DATE));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
Date.valueOf("2004-10-20").toString());
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_MAXVALUE, Date.valueOf("2013-10-17")
.toString());
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
Partitioner partitioner = new GenericJdbcPartitioner();
PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[]{
"'2004-10-20' <= DCOL AND DCOL < '2007-10-19'",
"'2007-10-19' <= DCOL AND DCOL < '2010-10-18'",
"'2010-10-18' <= DCOL AND DCOL <= '2013-10-17'",
});
}
public void testTimePartition() throws Exception {
MutableContext context = new MutableMapContext();
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TCOL");
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIME));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
Time.valueOf("01:01:01").toString());
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
Time.valueOf("10:40:50").toString());
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
Partitioner partitioner = new GenericJdbcPartitioner();
PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[]{
"'01:01:01' <= TCOL AND TCOL < '04:14:17'",
"'04:14:17' <= TCOL AND TCOL < '07:27:33'",
"'07:27:33' <= TCOL AND TCOL <= '10:40:50'",
});
}
public void testTimestampPartition() throws Exception {
MutableContext context = new MutableMapContext();
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TSCOL");
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIMESTAMP));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
Timestamp.valueOf("2013-01-01 01:01:01.123").toString());
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
Timestamp.valueOf("2013-12-31 10:40:50.654").toString());
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
Partitioner partitioner = new GenericJdbcPartitioner();
PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[]{
"'2013-01-01 01:01:01.123' <= TSCOL AND TSCOL < '2013-05-02 12:14:17.634'",
"'2013-05-02 12:14:17.634' <= TSCOL AND TSCOL < '2013-08-31 23:27:34.144'",
"'2013-08-31 23:27:34.144' <= TSCOL AND TSCOL <= '2013-12-31 10:40:50.654'",
});
}
public void testBooleanPartition() throws Exception {
MutableContext context = new MutableMapContext();
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "BCOL");
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.BOOLEAN));
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_MINVALUE, "0");
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_MAXVALUE, "1");
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
Partitioner partitioner = new GenericJdbcPartitioner();
PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[]{
"BCOL = TRUE",
"BCOL = FALSE",
});
}
public void testVarcharPartition() throws Exception {
MutableContext context = new MutableMapContext();
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL");
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR));
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_MINVALUE, "A");
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_MAXVALUE, "Z");
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
Partitioner partitioner = new GenericJdbcPartitioner();
PartitionerContext partitionerContext = new PartitionerContext(context, 25, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
"'A' <= VCCOL AND VCCOL < 'B'",
"'B' <= VCCOL AND VCCOL < 'C'",
"'C' <= VCCOL AND VCCOL < 'D'",
"'D' <= VCCOL AND VCCOL < 'E'",
"'E' <= VCCOL AND VCCOL < 'F'",
"'F' <= VCCOL AND VCCOL < 'G'",
"'G' <= VCCOL AND VCCOL < 'H'",
"'H' <= VCCOL AND VCCOL < 'I'",
"'I' <= VCCOL AND VCCOL < 'J'",
"'J' <= VCCOL AND VCCOL < 'K'",
"'K' <= VCCOL AND VCCOL < 'L'",
"'L' <= VCCOL AND VCCOL < 'M'",
"'M' <= VCCOL AND VCCOL < 'N'",
"'N' <= VCCOL AND VCCOL < 'O'",
"'O' <= VCCOL AND VCCOL < 'P'",
"'P' <= VCCOL AND VCCOL < 'Q'",
"'Q' <= VCCOL AND VCCOL < 'R'",
"'R' <= VCCOL AND VCCOL < 'S'",
"'S' <= VCCOL AND VCCOL < 'T'",
"'T' <= VCCOL AND VCCOL < 'U'",
"'U' <= VCCOL AND VCCOL < 'V'",
"'V' <= VCCOL AND VCCOL < 'W'",
"'W' <= VCCOL AND VCCOL < 'X'",
"'X' <= VCCOL AND VCCOL < 'Y'",
"'Y' <= VCCOL AND VCCOL <= 'Z'",
});
}
public void testVarcharPartition2() throws Exception {
MutableContext context = new MutableMapContext();
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL");
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR));
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_MINVALUE, "Breezy Badger");
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_MAXVALUE, "Warty Warthog");
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
Partitioner partitioner = new GenericJdbcPartitioner();
PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
assertEquals(partitions.size(), 5);
// First partition needs to contain entire upper bound
assertTrue(partitions.get(0).toString().contains("Breezy Badger"));
// Last partition needs to contain entire lower bound
assertTrue(partitions.get(4).toString().contains("Warty Warthog"));
}
public void testVarcharPartitionWithCommonPrefix() throws Exception {
MutableContext context = new MutableMapContext();
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL");
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR));
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_MINVALUE, "AAA");
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_MAXVALUE, "AAF");
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
Partitioner partitioner = new GenericJdbcPartitioner();
PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
"'AAA' <= VCCOL AND VCCOL < 'AAB'",
"'AAB' <= VCCOL AND VCCOL < 'AAC'",
"'AAC' <= VCCOL AND VCCOL < 'AAD'",
"'AAD' <= VCCOL AND VCCOL < 'AAE'",
"'AAE' <= VCCOL AND VCCOL <= 'AAF'",
});
}
public void testPatitionWithNullValues() throws Exception {
MutableContext context = new MutableMapContext();
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL");
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR));
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_MINVALUE, "AAA");
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_MAXVALUE, "AAE");
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
jobConf.fromTable.partitionColumnNull = true;
Partitioner partitioner = new GenericJdbcPartitioner();
PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
"VCCOL IS NULL",
"'AAA' <= VCCOL AND VCCOL < 'AAB'",
"'AAB' <= VCCOL AND VCCOL < 'AAC'",
"'AAC' <= VCCOL AND VCCOL < 'AAD'",
"'AAD' <= VCCOL AND VCCOL <= 'AAE'",
});
}
private void verifyResult(List<Partition> partitions,
String[] expected) {
assertEquals(expected.length, partitions.size());
Iterator<Partition> iterator = partitions.iterator();
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i],
((GenericJdbcPartition)iterator.next()).getConditions());
}
}
}

View File

@ -0,0 +1,362 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc;
import junit.framework.TestCase;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.Validation;
public class TestToInitializer extends TestCase {
private final String schemaName;
private final String tableName;
private final String schemalessTableName;
private final String stageTableName;
private final String tableSql;
private final String schemalessTableSql;
private final String tableColumns;
private GenericJdbcExecutor executor;
public TestToInitializer() {
schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA";
tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA";
schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE";
stageTableName = getClass().getSimpleName().toUpperCase() +
"_STAGE_TABLE";
tableSql = "INSERT INTO " + tableName + " VALUES (?,?,?)";
schemalessTableSql = "INSERT INTO " + schemalessTableName + " VALUES (?,?,?)";
tableColumns = "ICOL,VCOL";
}
@Override
public void setUp() {
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
GenericJdbcTestConstants.URL, null, null);
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
if (!executor.existTable(tableName)) {
executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName));
executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
}
fullTableName = executor.delimitIdentifier(schemalessTableName);
if (!executor.existTable(schemalessTableName)) {
executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
}
}
@Override
public void tearDown() {
executor.close();
}
@SuppressWarnings("unchecked")
public void testTableName() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
String fullTableName = executor.delimitIdentifier(schemalessTableName);
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.toTable.tableName = schemalessTableName;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcToInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)");
}
@SuppressWarnings("unchecked")
public void testTableNameWithTableColumns() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
String fullTableName = executor.delimitIdentifier(schemalessTableName);
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.toTable.tableName = schemalessTableName;
jobConf.toTable.columns = tableColumns;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcToInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)");
}
@SuppressWarnings("unchecked")
public void testTableSql() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.toTable.sql = schemalessTableSql;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcToInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(schemalessTableName) + " VALUES (?,?,?)");
}
@SuppressWarnings("unchecked")
public void testTableNameWithSchema() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.toTable.schemaName = schemaName;
jobConf.toTable.tableName = tableName;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcToInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)");
}
@SuppressWarnings("unchecked")
public void testTableNameWithTableColumnsWithSchema() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.toTable.schemaName = schemaName;
jobConf.toTable.tableName = tableName;
jobConf.toTable.columns = tableColumns;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcToInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)");
}
@SuppressWarnings("unchecked")
public void testTableSqlWithSchema() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.toTable.schemaName = schemaName;
jobConf.toTable.sql = tableSql;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcToInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?)");
}
private void verifyResult(MutableContext context, String dataSql) {
assertEquals(dataSql, context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL));
}
private void createTable(String tableName) {
try {
executor.executeUpdate("DROP TABLE " + tableName);
} catch(SqoopException e) {
//Ok to fail as the table might not exist
}
executor.executeUpdate("CREATE TABLE " + tableName +
"(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
}
public void testNonExistingStageTable() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.toTable.tableName = schemalessTableName;
jobConf.toTable.stageTableName = stageTableName;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcToInitializer();
try {
initializer.initialize(initializerContext, connConf, jobConf);
fail("Initialization should fail for non-existing stage table.");
} catch(SqoopException se) {
//expected
}
}
@SuppressWarnings("unchecked")
public void testNonEmptyStageTable() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
String fullStageTableName = executor.delimitIdentifier(stageTableName);
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.toTable.tableName = schemalessTableName;
jobConf.toTable.stageTableName = stageTableName;
createTable(fullStageTableName);
executor.executeUpdate("INSERT INTO " + fullStageTableName +
" VALUES(1, 1.1, 'one')");
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcToInitializer();
try {
initializer.initialize(initializerContext, connConf, jobConf);
fail("Initialization should fail for non-empty stage table.");
} catch(SqoopException se) {
//expected
}
}
@SuppressWarnings("unchecked")
public void testClearStageTableValidation() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
//specifying clear stage table flag without specifying name of
// the stage table
jobConf.toTable.tableName = schemalessTableName;
jobConf.toTable.clearStageTable = false;
GenericJdbcValidator validator = new GenericJdbcValidator();
Validation validation = validator.validateJob(jobConf);
assertEquals("User should not specify clear stage table flag without " +
"specifying name of the stage table",
Status.UNACCEPTABLE,
validation.getStatus());
assertTrue(validation.getMessages().containsKey(
new Validation.FormInput("toTable")));
jobConf.toTable.clearStageTable = true;
validation = validator.validateJob(jobConf);
assertEquals("User should not specify clear stage table flag without " +
"specifying name of the stage table",
Status.UNACCEPTABLE,
validation.getStatus());
assertTrue(validation.getMessages().containsKey(
new Validation.FormInput("toTable")));
}
@SuppressWarnings("unchecked")
public void testStageTableWithoutTable() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
//specifying stage table without specifying table name
jobConf.toTable.stageTableName = stageTableName;
jobConf.toTable.sql = "";
GenericJdbcValidator validator = new GenericJdbcValidator();
Validation validation = validator.validateJob(jobConf);
assertEquals("Stage table name cannot be specified without specifying " +
"table name", Status.UNACCEPTABLE, validation.getStatus());
assertTrue(validation.getMessages().containsKey(
new Validation.FormInput("toTable")));
}
@SuppressWarnings("unchecked")
public void testClearStageTable() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
String fullStageTableName = executor.delimitIdentifier(stageTableName);
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.toTable.tableName = schemalessTableName;
jobConf.toTable.stageTableName = stageTableName;
jobConf.toTable.clearStageTable = true;
createTable(fullStageTableName);
executor.executeUpdate("INSERT INTO " + fullStageTableName +
" VALUES(1, 1.1, 'one')");
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcToInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
assertEquals("Stage table should have been cleared", 0,
executor.getTableRowCount(stageTableName));
}
@SuppressWarnings("unchecked")
public void testStageTable() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
String fullStageTableName = executor.delimitIdentifier(stageTableName);
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.toTable.tableName = schemalessTableName;
jobConf.toTable.stageTableName = stageTableName;
createTable(fullStageTableName);
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcToInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context, "INSERT INTO " + fullStageTableName +
" VALUES (?,?,?)");
}
}