5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-08 04:41:31 +08:00

SQOOP-1384: Sqoop2: From/To: Refactor Generic JDBC Connector

(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2014-08-13 16:48:27 -07:00
parent a8fb4df820
commit 2b6447df5d
16 changed files with 204 additions and 194 deletions

View File

@ -68,11 +68,11 @@ public List<String> getJars(InitializerContext context, ConnectionConfiguration
public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, FromJobConfiguration fromJobConfiguration) {
configureJdbcProperties(context.getContext(), connectionConfiguration, fromJobConfiguration);
String schemaName = fromJobConfiguration.table.tableName;
String schemaName = fromJobConfiguration.fromTable.tableName;
if(schemaName == null) {
schemaName = "Query";
} else if(fromJobConfiguration.table.schemaName != null) {
schemaName = fromJobConfiguration.table.schemaName + "." + schemaName;
} else if(fromJobConfiguration.fromTable.schemaName != null) {
schemaName = fromJobConfiguration.fromTable.schemaName + "." + schemaName;
}
Schema schema = new Schema(schemaName);
@ -129,12 +129,12 @@ private void configureJdbcProperties(MutableContext context, ConnectionConfigura
private void configurePartitionProperties(MutableContext context, ConnectionConfiguration connectionConfig, FromJobConfiguration jobConfig) {
// ----- configure column name -----
String partitionColumnName = jobConfig.table.partitionColumn;
String partitionColumnName = jobConfig.fromTable.partitionColumn;
if (partitionColumnName == null) {
// if column is not specified by the user,
// find the primary key of the table (when there is a table).
String tableName = jobConfig.table.tableName;
// find the primary key of the fromTable (when there is a fromTable).
String tableName = jobConfig.fromTable.tableName;
if (tableName != null) {
partitionColumnName = executor.getPrimaryKey(tableName);
}
@ -152,22 +152,22 @@ private void configurePartitionProperties(MutableContext context, ConnectionConf
// ----- configure column type, min value, and max value -----
String minMaxQuery = jobConfig.table.boundaryQuery;
String minMaxQuery = jobConfig.fromTable.boundaryQuery;
if (minMaxQuery == null) {
StringBuilder builder = new StringBuilder();
String schemaName = jobConfig.table.schemaName;
String tableName = jobConfig.table.tableName;
String tableSql = jobConfig.table.sql;
String schemaName = jobConfig.fromTable.schemaName;
String tableName = jobConfig.fromTable.tableName;
String tableSql = jobConfig.fromTable.sql;
if (tableName != null && tableSql != null) {
// when both table name and table sql are specified:
// when both fromTable name and fromTable sql are specified:
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
} else if (tableName != null) {
// when table name is specified:
// when fromTable name is specified:
// For databases that support schemas (IE: postgresql).
String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
@ -235,18 +235,18 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur
String dataSql;
String fieldNames;
String schemaName = jobConfig.table.schemaName;
String tableName = jobConfig.table.tableName;
String tableSql = jobConfig.table.sql;
String tableColumns = jobConfig.table.columns;
String schemaName = jobConfig.fromTable.schemaName;
String tableName = jobConfig.fromTable.tableName;
String tableSql = jobConfig.fromTable.sql;
String tableColumns = jobConfig.fromTable.columns;
if (tableName != null && tableSql != null) {
// when both table name and table sql are specified:
// when both fromTable name and fromTable sql are specified:
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
} else if (tableName != null) {
// when table name is specified:
// when fromTable name is specified:
// For databases that support schemas (IE: postgresql).
String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
@ -276,7 +276,7 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur
fieldNames = tableColumns;
}
} else if (tableSql != null) {
// when table sql is specified:
// when fromTable sql is specified:
assert tableSql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);

View File

@ -56,7 +56,7 @@ public List<Partition> getPartitions(PartitionerContext context,ConnectionConfig
partitionMinValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE);
partitionMaxValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE);
partitionColumnNull = job.table.partitionColumnNull;
partitionColumnNull = job.fromTable.partitionColumnNull;
if (partitionColumnNull == null) {
partitionColumnNull = false;
}

View File

@ -31,8 +31,8 @@ public class GenericJdbcToDestroyer extends Destroyer<ConnectionConfiguration, T
public void destroy(DestroyerContext context, ConnectionConfiguration connection, ToJobConfiguration job) {
LOG.info("Running generic JDBC connector destroyer");
final String tableName = job.table.tableName;
final String stageTableName = job.table.stageTableName;
final String tableName = job.toTable.tableName;
final String stageTableName = job.toTable.stageTableName;
final boolean stageEnabled = stageTableName != null &&
stageTableName.length() > 0;
if(stageEnabled) {
@ -50,11 +50,11 @@ private void moveDataToDestinationTable(ConnectionConfiguration connectorConf,
connectorConf.connection.password);
if(success) {
LOG.info("Job completed, transferring data from stage table to " +
"destination table.");
LOG.info("Job completed, transferring data from stage fromTable to " +
"destination fromTable.");
executor.migrateData(stageTableName, tableName);
} else {
LOG.warn("Job failed, clearing stage table.");
LOG.warn("Job failed, clearing stage fromTable.");
executor.deleteTableData(stageTableName);
}
}

View File

@ -65,15 +65,15 @@ public List<String> getJars(InitializerContext context, ConnectionConfiguration
public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ToJobConfiguration toJobConfiguration) {
configureJdbcProperties(context.getContext(), connectionConfiguration, toJobConfiguration);
String schemaName = toJobConfiguration.table.tableName;
String schemaName = toJobConfiguration.toTable.tableName;
if (schemaName == null) {
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0019,
"Table name extraction not supported yet.");
}
if(toJobConfiguration.table.schemaName != null) {
schemaName = toJobConfiguration.table.schemaName + "." + schemaName;
if(toJobConfiguration.toTable.schemaName != null) {
schemaName = toJobConfiguration.toTable.schemaName + "." + schemaName;
}
Schema schema = new Schema(schemaName);
@ -127,23 +127,23 @@ private void configureJdbcProperties(MutableContext context, ConnectionConfigura
private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ToJobConfiguration jobConfig) {
String dataSql;
String schemaName = jobConfig.table.schemaName;
String tableName = jobConfig.table.tableName;
String stageTableName = jobConfig.table.stageTableName;
boolean clearStageTable = jobConfig.table.clearStageTable == null ?
false : jobConfig.table.clearStageTable;
String schemaName = jobConfig.toTable.schemaName;
String tableName = jobConfig.toTable.tableName;
String stageTableName = jobConfig.toTable.stageTableName;
boolean clearStageTable = jobConfig.toTable.clearStageTable == null ?
false : jobConfig.toTable.clearStageTable;
final boolean stageEnabled =
stageTableName != null && stageTableName.length() > 0;
String tableSql = jobConfig.table.sql;
String tableColumns = jobConfig.table.columns;
String tableSql = jobConfig.toTable.sql;
String tableColumns = jobConfig.toTable.columns;
if (tableName != null && tableSql != null) {
// when both table name and table sql are specified:
// when both fromTable name and fromTable sql are specified:
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
} else if (tableName != null) {
// when table name is specified:
// when fromTable name is specified:
if(stageEnabled) {
LOG.info("Stage has been enabled.");
LOG.info("Use stageTable: " + stageTableName +
@ -195,7 +195,7 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur
dataSql = builder.toString();
}
} else if (tableSql != null) {
// when table sql is specified:
// when fromTable sql is specified:
if (tableSql.indexOf(
GenericJdbcConnectorConstants.SQL_PARAMETER_MARKER) == -1) {

View File

@ -74,22 +74,22 @@ private Validation validateExportJob(Object jobConfiguration) {
Validation validation = new Validation(ToJobConfiguration.class);
ToJobConfiguration configuration = (ToJobConfiguration)jobConfiguration;
if(configuration.table.tableName == null && configuration.table.sql == null) {
validation.addMessage(Status.UNACCEPTABLE, "table", "Either table name or SQL must be specified");
if(configuration.toTable.tableName == null && configuration.toTable.sql == null) {
validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Either fromTable name or SQL must be specified");
}
if(configuration.table.tableName != null && configuration.table.sql != null) {
validation.addMessage(Status.UNACCEPTABLE, "table", "Both table name and SQL cannot be specified");
if(configuration.toTable.tableName != null && configuration.toTable.sql != null) {
validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Both fromTable name and SQL cannot be specified");
}
if(configuration.table.tableName == null &&
configuration.table.stageTableName != null) {
validation.addMessage(Status.UNACCEPTABLE, "table",
"Stage table name cannot be specified without specifying table name");
if(configuration.toTable.tableName == null &&
configuration.toTable.stageTableName != null) {
validation.addMessage(Status.UNACCEPTABLE, "fromTable",
"Stage fromTable name cannot be specified without specifying fromTable name");
}
if(configuration.table.stageTableName == null &&
configuration.table.clearStageTable != null) {
validation.addMessage(Status.UNACCEPTABLE, "table",
"Clear stage table cannot be specified without specifying name of " +
"the stage table.");
if(configuration.toTable.stageTableName == null &&
configuration.toTable.clearStageTable != null) {
validation.addMessage(Status.UNACCEPTABLE, "fromTable",
"Clear stage fromTable cannot be specified without specifying name of " +
"the stage fromTable.");
}
return validation;
@ -99,18 +99,18 @@ private Validation validateImportJob(Object jobConfiguration) {
Validation validation = new Validation(FromJobConfiguration.class);
FromJobConfiguration configuration = (FromJobConfiguration)jobConfiguration;
if(configuration.table.tableName == null && configuration.table.sql == null) {
validation.addMessage(Status.UNACCEPTABLE, "table", "Either table name or SQL must be specified");
if(configuration.fromTable.tableName == null && configuration.fromTable.sql == null) {
validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Either fromTable name or SQL must be specified");
}
if(configuration.table.tableName != null && configuration.table.sql != null) {
validation.addMessage(Status.UNACCEPTABLE, "table", "Both table name and SQL cannot be specified");
if(configuration.fromTable.tableName != null && configuration.fromTable.sql != null) {
validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Both fromTable name and SQL cannot be specified");
}
if(configuration.table.schemaName != null && configuration.table.sql != null) {
validation.addMessage(Status.UNACCEPTABLE, "table", "Both schema name and SQL cannot be specified");
if(configuration.fromTable.schemaName != null && configuration.fromTable.sql != null) {
validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Both schema name and SQL cannot be specified");
}
if(configuration.table.sql != null && !configuration.table.sql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN)) {
validation.addMessage(Status.UNACCEPTABLE, "table", "sql", "SQL statement must contain placeholder for auto generated "
if(configuration.fromTable.sql != null && !configuration.fromTable.sql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN)) {
validation.addMessage(Status.UNACCEPTABLE, "fromTable", "sql", "SQL statement must contain placeholder for auto generated "
+ "conditions - " + GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
}

View File

@ -25,9 +25,9 @@
*/
@ConfigurationClass
public class FromJobConfiguration {
@Form public FromTableForm table;
@Form public FromTableForm fromTable;
public FromJobConfiguration() {
table = new FromTableForm();
fromTable = new FromTableForm();
}
}

View File

@ -25,9 +25,9 @@
*/
@ConfigurationClass
public class ToJobConfiguration {
@Form public ToTableForm table;
@Form public ToTableForm toTable;
public ToJobConfiguration() {
table = new ToTableForm();
toTable = new ToTableForm();
}
}

View File

@ -47,55 +47,77 @@ connection.jdbcProperties.label = JDBC Connection Properties
connection.jdbcProperties.help = Enter any JDBC properties that should be \
supplied during the creation of connection.
# Table Form
# From Table Form
#
table.label = Database configuration
table.help = You must supply the information requested in order to create \
a job object.
fromTable.label = From database configuration
fromTable.help = You must supply the information requested in order to create \
a job object.
# Schema name
table.schemaName.label = Schema name
table.schemaName.help = Schema name to process data in the remote database
# From schema name
fromTable.schemaName.label = Schema name
fromTable.schemaName.help = Schema name to process data in the remote database
# Table name
table.tableName.label = Table name
table.tableName.help = Table name to process data in the remote database
# From table name
fromTable.tableName.label = Table name
fromTable.tableName.help = Table name to process data in the remote database
# Table SQL
table.sql.label = Table SQL statement
table.sql.help = SQL statement to process data in the remote database
# From table SQL
fromTable.sql.label = Table SQL statement
fromTable.sql.help = SQL statement to process data in the remote database
# Table columns
table.columns.label = Table column names
table.columns.help = Specific columns of a table name or a table SQL
# From table columns
fromTable.columns.label = Table column names
fromTable.columns.help = Specific columns of a table name or a table SQL
# Table warehouse
table.warehouse.label = Data warehouse
table.warehouse.help = The root directory for data
# From table warehouse
fromTable.warehouse.label = Data warehouse
fromTable.warehouse.help = The root directory for data
# Stage table name
table.stageTableName.label = Stage table name
table.stageTableName.help = Name of the stage table to use
# From table datadir
fromTable.dataDirectory.label = Data directory
fromTable.dataDirectory.help = The sub-directory under warehouse for data
# Clear stage table
table.clearStageTable.label = Clear stage table
table.clearStageTable.help = Indicate if the stage table should be cleared
# From table pcol
fromTable.partitionColumn.label = Partition column name
fromTable.partitionColumn.help = A specific column for data partition
# Table datadir
table.dataDirectory.label = Data directory
table.dataDirectory.help = The sub-directory under warehouse for data
# From table pcol is null
fromTable.partitionColumnNull.label = Nulls in partition column
fromTable.partitionColumnNull.help = Whether there are null values in partition column
# Table pcol
table.partitionColumn.label = Partition column name
table.partitionColumn.help = A specific column for data partition
# From table boundary
fromTable.boundaryQuery.label = Boundary query
fromTable.boundaryQuery.help = The boundary query for data partition
# Table pcol is null
table.partitionColumnNull.label = Nulls in partition column
table.partitionColumnNull.help = Whether there are null values in partition column
# To table form
#
toTable.label = To database configuration
toTable.help = You must supply the information requested in order to create \
a job object.
# Table boundary
table.boundaryQuery.label = Boundary query
table.boundaryQuery.help = The boundary query for data partition
# From schema name
toTable.schemaName.label = Schema name
toTable.schemaName.help = Schema name to process data in the remote database
# From table name
toTable.tableName.label = Table name
toTable.tableName.help = Table name to process data in the remote database
# From table SQL
toTable.sql.label = Table SQL statement
toTable.sql.help = SQL statement to process data in the remote database
# From table columns
toTable.columns.label = Table column names
toTable.columns.help = Specific columns of a table name or a table SQL
# To stage table name
toTable.stageTableName.label = Stage table name
toTable.stageTableName.help = Name of the stage table to use
# To clear stage table
toTable.clearStageTable.label = Clear stage table
toTable.clearStageTable.help = Indicate if the stage table should be cleared
# Placeholders to have some entities created
ignored.label = Ignored

View File

@ -20,7 +20,7 @@
import junit.framework.TestCase;
public class GenericJdbcExecutorTest extends TestCase {
// private final String table;
// private final String fromTable;
// private final String emptyTable;
// private final GenericJdbcExecutor executor;
//
@ -28,8 +28,8 @@ public class GenericJdbcExecutorTest extends TestCase {
// private static final int NUMBER_OF_ROWS = 974;
//
// public GenericJdbcExecutorTest() {
// table = getClass().getSimpleName().toUpperCase();
// emptyTable = table + "_EMPTY";
// fromTable = getClass().getSimpleName().toUpperCase();
// emptyTable = fromTable + "_EMPTY";
// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
// GenericJdbcTestConstants.URL, null, null);
// }
@ -42,15 +42,15 @@ public class GenericJdbcExecutorTest extends TestCase {
// executor.executeUpdate("CREATE TABLE "
// + emptyTable + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
//
// if(executor.existTable(table)) {
// executor.executeUpdate("DROP TABLE " + table);
// if(executor.existTable(fromTable)) {
// executor.executeUpdate("DROP TABLE " + fromTable);
// }
// executor.executeUpdate("CREATE TABLE "
// + table + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
// + fromTable + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
//
// for (int i = 0; i < NUMBER_OF_ROWS; i++) {
// int value = START + i;
// String sql = "INSERT INTO " + table
// String sql = "INSERT INTO " + fromTable
// + " VALUES(" + value + ", '" + value + "')";
// executor.executeUpdate(sql);
// }
@ -58,23 +58,23 @@ public class GenericJdbcExecutorTest extends TestCase {
//
// @SuppressWarnings("unchecked")
// public void testDeleteTableData() throws Exception {
// executor.deleteTableData(table);
// assertEquals("Table " + table + " is expected to be empty.",
// 0, executor.getTableRowCount(table));
// executor.deleteTableData(fromTable);
// assertEquals("Table " + fromTable + " is expected to be empty.",
// 0, executor.getTableRowCount(fromTable));
// }
//
// @SuppressWarnings("unchecked")
// public void testMigrateData() throws Exception {
// assertEquals("Table " + emptyTable + " is expected to be empty.",
// 0, executor.getTableRowCount(emptyTable));
// assertEquals("Table " + table + " is expected to have " +
// assertEquals("Table " + fromTable + " is expected to have " +
// NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS,
// executor.getTableRowCount(table));
// executor.getTableRowCount(fromTable));
//
// executor.migrateData(table, emptyTable);
// executor.migrateData(fromTable, emptyTable);
//
// assertEquals("Table " + table + " is expected to be empty.", 0,
// executor.getTableRowCount(table));
// assertEquals("Table " + fromTable + " is expected to be empty.", 0,
// executor.getTableRowCount(fromTable));
// assertEquals("Table " + emptyTable + " is expected to have " +
// NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS,
// executor.getTableRowCount(emptyTable));
@ -82,7 +82,7 @@ public class GenericJdbcExecutorTest extends TestCase {
//
// @SuppressWarnings("unchecked")
// public void testGetTableRowCount() throws Exception {
// assertEquals("Table " + table + " is expected to be empty.",
// NUMBER_OF_ROWS, executor.getTableRowCount(table));
// assertEquals("Table " + fromTable + " is expected to be empty.",
// NUMBER_OF_ROWS, executor.getTableRowCount(fromTable));
// }
}

View File

@ -83,7 +83,7 @@ public class TestExportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.tableName = schemalessTableName;
// jobConf.fromTable.tableName = schemalessTableName;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
@ -104,8 +104,8 @@ public class TestExportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.tableName = schemalessTableName;
// jobConf.table.columns = tableColumns;
// jobConf.fromTable.tableName = schemalessTableName;
// jobConf.fromTable.columns = tableColumns;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
@ -124,7 +124,7 @@ public class TestExportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.sql = schemalessTableSql;
// jobConf.fromTable.sql = schemalessTableSql;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
@ -145,8 +145,8 @@ public class TestExportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.schemaName = schemaName;
// jobConf.table.tableName = tableName;
// jobConf.fromTable.schemaName = schemaName;
// jobConf.fromTable.tableName = tableName;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
@ -167,9 +167,9 @@ public class TestExportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.schemaName = schemaName;
// jobConf.table.tableName = tableName;
// jobConf.table.columns = tableColumns;
// jobConf.fromTable.schemaName = schemaName;
// jobConf.fromTable.tableName = tableName;
// jobConf.fromTable.columns = tableColumns;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
@ -188,8 +188,8 @@ public class TestExportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.schemaName = schemaName;
// jobConf.table.sql = tableSql;
// jobConf.fromTable.schemaName = schemaName;
// jobConf.fromTable.sql = tableSql;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
@ -222,8 +222,8 @@ public class TestExportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.tableName = schemalessTableName;
// jobConf.table.stageTableName = stageTableName;
// jobConf.fromTable.tableName = schemalessTableName;
// jobConf.fromTable.stageTableName = stageTableName;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
@ -247,8 +247,8 @@ public class TestExportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.tableName = schemalessTableName;
// jobConf.table.stageTableName = stageTableName;
// jobConf.fromTable.tableName = schemalessTableName;
// jobConf.fromTable.stageTableName = stageTableName;
// createTable(fullStageTableName);
// executor.executeUpdate("INSERT INTO " + fullStageTableName +
// " VALUES(1, 1.1, 'one')");
@ -274,8 +274,8 @@ public class TestExportInitializer extends TestCase {
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// //specifying clear stage table flag without specifying name of
// // the stage table
// jobConf.table.tableName = schemalessTableName;
// jobConf.table.clearStageTable = false;
// jobConf.fromTable.tableName = schemalessTableName;
// jobConf.fromTable.clearStageTable = false;
// GenericJdbcValidator validator = new GenericJdbcValidator();
// Validation validation = validator.validateJob(MJob.Type.EXPORT, jobConf);
// assertEquals("User should not specify clear stage table flag without " +
@ -283,16 +283,16 @@ public class TestExportInitializer extends TestCase {
// Status.UNACCEPTABLE,
// validation.getStatus());
// assertTrue(validation.getMessages().containsKey(
// new Validation.FormInput("table")));
// new Validation.FormInput("fromTable")));
//
// jobConf.table.clearStageTable = true;
// jobConf.fromTable.clearStageTable = true;
// validation = validator.validateJob(MJob.Type.EXPORT, jobConf);
// assertEquals("User should not specify clear stage table flag without " +
// "specifying name of the stage table",
// Status.UNACCEPTABLE,
// validation.getStatus());
// assertTrue(validation.getMessages().containsKey(
// new Validation.FormInput("table")));
// new Validation.FormInput("fromTable")));
// }
//
// @SuppressWarnings("unchecked")
@ -303,15 +303,15 @@ public class TestExportInitializer extends TestCase {
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// //specifying stage table without specifying table name
// jobConf.table.stageTableName = stageTableName;
// jobConf.table.sql = "";
// jobConf.fromTable.stageTableName = stageTableName;
// jobConf.fromTable.sql = "";
//
// GenericJdbcValidator validator = new GenericJdbcValidator();
// Validation validation = validator.validateJob(MJob.Type.EXPORT, jobConf);
// assertEquals("Stage table name cannot be specified without specifying " +
// "table name", Status.UNACCEPTABLE, validation.getStatus());
// assertTrue(validation.getMessages().containsKey(
// new Validation.FormInput("table")));
// new Validation.FormInput("fromTable")));
// }
//
// @SuppressWarnings("unchecked")
@ -323,9 +323,9 @@ public class TestExportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.tableName = schemalessTableName;
// jobConf.table.stageTableName = stageTableName;
// jobConf.table.clearStageTable = true;
// jobConf.fromTable.tableName = schemalessTableName;
// jobConf.fromTable.stageTableName = stageTableName;
// jobConf.fromTable.clearStageTable = true;
// createTable(fullStageTableName);
// executor.executeUpdate("INSERT INTO " + fullStageTableName +
// " VALUES(1, 1.1, 'one')");
@ -348,8 +348,8 @@ public class TestExportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.tableName = schemalessTableName;
// jobConf.table.stageTableName = stageTableName;
// jobConf.fromTable.tableName = schemalessTableName;
// jobConf.fromTable.stageTableName = stageTableName;
// createTable(fullStageTableName);
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);

View File

@ -92,7 +92,7 @@ public class TestImportInitializer extends TestCase {
// }
//
// /**
// * Return Schema representation for the testing table.
// * Return Schema representation for the testing fromTable.
// *
// * @param name Name that should be used for the generated schema.
// * @return
@ -117,7 +117,7 @@ public class TestImportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.tableName = schemalessTableName;
// jobConf.fromTable.tableName = schemalessTableName;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
@ -143,8 +143,8 @@ public class TestImportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.tableName = schemalessTableName;
// jobConf.table.columns = tableColumns;
// jobConf.fromTable.tableName = schemalessTableName;
// jobConf.fromTable.columns = tableColumns;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
@ -170,8 +170,8 @@ public class TestImportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.sql = schemalessTableSql;
// jobConf.table.partitionColumn = "DCOL";
// jobConf.fromTable.sql = schemalessTableSql;
// jobConf.fromTable.partitionColumn = "DCOL";
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
@ -197,9 +197,9 @@ public class TestImportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.sql = schemalessTableSql;
// jobConf.table.columns = tableColumns;
// jobConf.table.partitionColumn = "DCOL";
// jobConf.fromTable.sql = schemalessTableSql;
// jobConf.fromTable.columns = tableColumns;
// jobConf.fromTable.partitionColumn = "DCOL";
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
@ -228,8 +228,8 @@ public class TestImportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.schemaName = schemaName;
// jobConf.table.tableName = tableName;
// jobConf.fromTable.schemaName = schemaName;
// jobConf.fromTable.tableName = tableName;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
@ -257,9 +257,9 @@ public class TestImportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.schemaName = schemaName;
// jobConf.table.tableName = tableName;
// jobConf.table.columns = tableColumns;
// jobConf.fromTable.schemaName = schemaName;
// jobConf.fromTable.tableName = tableName;
// jobConf.fromTable.columns = tableColumns;
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
@ -287,9 +287,9 @@ public class TestImportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.schemaName = schemaName;
// jobConf.table.sql = tableSql;
// jobConf.table.partitionColumn = "DCOL";
// jobConf.fromTable.schemaName = schemaName;
// jobConf.fromTable.sql = tableSql;
// jobConf.fromTable.partitionColumn = "DCOL";
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
@ -316,9 +316,9 @@ public class TestImportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.schemaName = schemaName;
// jobConf.table.tableName = tableName;
// jobConf.table.partitionColumn = "DCOL";
// jobConf.fromTable.schemaName = schemaName;
// jobConf.fromTable.tableName = tableName;
// jobConf.fromTable.partitionColumn = "DCOL";
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
@ -337,9 +337,9 @@ public class TestImportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.schemaName = schemaName;
// jobConf.table.sql = tableSql;
// jobConf.table.partitionColumn = "DCOL";
// jobConf.fromTable.schemaName = schemaName;
// jobConf.fromTable.sql = tableSql;
// jobConf.fromTable.partitionColumn = "DCOL";
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);
@ -360,10 +360,10 @@ public class TestImportInitializer extends TestCase {
//
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
// jobConf.table.schemaName = schemaName;
// jobConf.table.sql = tableSql;
// jobConf.table.columns = tableColumns;
// jobConf.table.partitionColumn = "DCOL";
// jobConf.fromTable.schemaName = schemaName;
// jobConf.fromTable.sql = tableSql;
// jobConf.fromTable.columns = tableColumns;
// jobConf.fromTable.partitionColumn = "DCOL";
//
// MutableContext context = new MutableMapContext();
// InitializerContext initializerContext = new InitializerContext(context);

View File

@ -475,7 +475,7 @@ public class TestImportPartitioner extends TestCase {
//
// ConnectionConfiguration connConf = new ConnectionConfiguration();
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
// jobConf.table.partitionColumnNull = true;
// jobConf.fromTable.partitionColumnNull = true;
//
// Partitioner partitioner = new GenericJdbcImportPartitioner();
// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);

View File

@ -160,7 +160,7 @@ public static void setFrameworkConnectionConfig(ConnectorType type, Job job, Obj
* @param job MapReduce job object
* @param obj Configuration object
*/
public static void setConfigFrameworkJob(Job job, Object obj) {
public static void setFrameworkJobConfig(Job job, Object obj) {
job.getConfiguration().set(JOB_CONFIG_CLASS_FRAMEWORK_JOB, obj.getClass().getName());
job.getCredentials().addSecretKey(JOB_CONFIG_FRAMEWORK_JOB_KEY, FormUtils.toJson(obj).getBytes());
}
@ -225,7 +225,7 @@ public static Object getFrameworkConnectionConfig(ConnectorType type, Configurat
* @param configuration MapReduce configuration object
* @return Configuration object
*/
public static Object getConfigFrameworkJob(Configuration configuration) {
public static Object getFrameworkJobConfig(Configuration configuration) {
return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FRAMEWORK_JOB, JOB_CONFIG_FRAMEWORK_JOB_KEY);
}
@ -248,22 +248,10 @@ public static void setConnectorSchema(ConnectorType type, Job job, Schema schema
}
/**
* Persist Framework generated schema.
*
* @param job MapReduce Job object
* @param schema Schema
*/
public static void setHioSchema(Job job, Schema schema) {
if(schema != null) {
job.getCredentials().addSecretKey(SCHEMA_HIO_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
}
}
/**
* Retrieve From Connector generated schema.
* Retrieve Connector generated schema.
*
* @param type The FROM or TO connector
* @param configuration MapReduce configuration object
* @return Schema
*/
public static Schema getConnectorSchema(ConnectorType type, Configuration configuration) {
switch (type) {

View File

@ -68,7 +68,7 @@ public class MapreduceExecutionEngineTest {
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
// jobConf.output.outputFormat = OutputFormat.TEXT_FILE;
// jobConf.output.compression = comprssionFormat;
// request.setConfigFrameworkJob(jobConf);
// request.setFrameworkJobConfig(jobConf);
// request.setConnectorCallbacks(new Importer(Initializer.class,
// Partitioner.class, Extractor.class, Destroyer.class) {
// });
@ -95,7 +95,7 @@ public class MapreduceExecutionEngineTest {
// jobConf.output.outputFormat = OutputFormat.TEXT_FILE;
// jobConf.output.compression = OutputCompression.CUSTOM;
// jobConf.output.customCompression = customCodecName;
// request.setConfigFrameworkJob(jobConf);
// request.setFrameworkJobConfig(jobConf);
// request.setConnectorCallbacks(new Importer(Initializer.class,
// Partitioner.class, Extractor.class, Destroyer.class) {
// });

View File

@ -89,9 +89,9 @@ public class TestConfigurationUtils {
//
// @Test
// public void testConfigFrameworkJob() throws Exception {
// ConfigurationUtils.setConfigFrameworkJob(job, getConfig());
// ConfigurationUtils.setFrameworkJobConfig(job, getConfig());
// setUpJobConf();
// assertEquals(getConfig(), ConfigurationUtils.getConfigFrameworkJob(jobConf));
// assertEquals(getConfig(), ConfigurationUtils.getFrameworkJobConfig(jobConf));
// }
//
// @Test

View File

@ -206,7 +206,7 @@ public boolean submit(SubmissionRequest generalRequest) {
ConfigurationUtils.setConnectorJobConfig(ConnectorType.TO, job, request.getConnectorJobConfig(ConnectorType.TO));
ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.FROM, job, request.getFrameworkConnectionConfig(ConnectorType.FROM));
ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.TO, job, request.getFrameworkConnectionConfig(ConnectorType.TO));
ConfigurationUtils.setConfigFrameworkJob(job, request.getConfigFrameworkJob());
ConfigurationUtils.setFrameworkJobConfig(job, request.getConfigFrameworkJob());
// @TODO(Abe): Persist TO schema.
ConfigurationUtils.setConnectorSchema(ConnectorType.FROM, job, request.getSummary().getConnectorSchema());