mirror of
https://github.com/apache/sqoop.git
synced 2025-05-09 00:29:40 +08:00
SQOOP-1384: Sqoop2: From/To: Refactor Generic JDBC Connector
(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
parent
1d4b9e96cf
commit
153b6badf7
@ -68,11 +68,11 @@ public List<String> getJars(InitializerContext context, ConnectionConfiguration
|
|||||||
public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, FromJobConfiguration fromJobConfiguration) {
|
public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, FromJobConfiguration fromJobConfiguration) {
|
||||||
configureJdbcProperties(context.getContext(), connectionConfiguration, fromJobConfiguration);
|
configureJdbcProperties(context.getContext(), connectionConfiguration, fromJobConfiguration);
|
||||||
|
|
||||||
String schemaName = fromJobConfiguration.table.tableName;
|
String schemaName = fromJobConfiguration.fromTable.tableName;
|
||||||
if(schemaName == null) {
|
if(schemaName == null) {
|
||||||
schemaName = "Query";
|
schemaName = "Query";
|
||||||
} else if(fromJobConfiguration.table.schemaName != null) {
|
} else if(fromJobConfiguration.fromTable.schemaName != null) {
|
||||||
schemaName = fromJobConfiguration.table.schemaName + "." + schemaName;
|
schemaName = fromJobConfiguration.fromTable.schemaName + "." + schemaName;
|
||||||
}
|
}
|
||||||
|
|
||||||
Schema schema = new Schema(schemaName);
|
Schema schema = new Schema(schemaName);
|
||||||
@ -129,12 +129,12 @@ private void configureJdbcProperties(MutableContext context, ConnectionConfigura
|
|||||||
private void configurePartitionProperties(MutableContext context, ConnectionConfiguration connectionConfig, FromJobConfiguration jobConfig) {
|
private void configurePartitionProperties(MutableContext context, ConnectionConfiguration connectionConfig, FromJobConfiguration jobConfig) {
|
||||||
// ----- configure column name -----
|
// ----- configure column name -----
|
||||||
|
|
||||||
String partitionColumnName = jobConfig.table.partitionColumn;
|
String partitionColumnName = jobConfig.fromTable.partitionColumn;
|
||||||
|
|
||||||
if (partitionColumnName == null) {
|
if (partitionColumnName == null) {
|
||||||
// if column is not specified by the user,
|
// if column is not specified by the user,
|
||||||
// find the primary key of the table (when there is a table).
|
// find the primary key of the fromTable (when there is a fromTable).
|
||||||
String tableName = jobConfig.table.tableName;
|
String tableName = jobConfig.fromTable.tableName;
|
||||||
if (tableName != null) {
|
if (tableName != null) {
|
||||||
partitionColumnName = executor.getPrimaryKey(tableName);
|
partitionColumnName = executor.getPrimaryKey(tableName);
|
||||||
}
|
}
|
||||||
@ -152,22 +152,22 @@ private void configurePartitionProperties(MutableContext context, ConnectionConf
|
|||||||
|
|
||||||
// ----- configure column type, min value, and max value -----
|
// ----- configure column type, min value, and max value -----
|
||||||
|
|
||||||
String minMaxQuery = jobConfig.table.boundaryQuery;
|
String minMaxQuery = jobConfig.fromTable.boundaryQuery;
|
||||||
|
|
||||||
if (minMaxQuery == null) {
|
if (minMaxQuery == null) {
|
||||||
StringBuilder builder = new StringBuilder();
|
StringBuilder builder = new StringBuilder();
|
||||||
|
|
||||||
String schemaName = jobConfig.table.schemaName;
|
String schemaName = jobConfig.fromTable.schemaName;
|
||||||
String tableName = jobConfig.table.tableName;
|
String tableName = jobConfig.fromTable.tableName;
|
||||||
String tableSql = jobConfig.table.sql;
|
String tableSql = jobConfig.fromTable.sql;
|
||||||
|
|
||||||
if (tableName != null && tableSql != null) {
|
if (tableName != null && tableSql != null) {
|
||||||
// when both table name and table sql are specified:
|
// when both fromTable name and fromTable sql are specified:
|
||||||
throw new SqoopException(
|
throw new SqoopException(
|
||||||
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
|
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
|
||||||
|
|
||||||
} else if (tableName != null) {
|
} else if (tableName != null) {
|
||||||
// when table name is specified:
|
// when fromTable name is specified:
|
||||||
|
|
||||||
// For databases that support schemas (IE: postgresql).
|
// For databases that support schemas (IE: postgresql).
|
||||||
String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
|
String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
|
||||||
@ -235,18 +235,18 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur
|
|||||||
String dataSql;
|
String dataSql;
|
||||||
String fieldNames;
|
String fieldNames;
|
||||||
|
|
||||||
String schemaName = jobConfig.table.schemaName;
|
String schemaName = jobConfig.fromTable.schemaName;
|
||||||
String tableName = jobConfig.table.tableName;
|
String tableName = jobConfig.fromTable.tableName;
|
||||||
String tableSql = jobConfig.table.sql;
|
String tableSql = jobConfig.fromTable.sql;
|
||||||
String tableColumns = jobConfig.table.columns;
|
String tableColumns = jobConfig.fromTable.columns;
|
||||||
|
|
||||||
if (tableName != null && tableSql != null) {
|
if (tableName != null && tableSql != null) {
|
||||||
// when both table name and table sql are specified:
|
// when both fromTable name and fromTable sql are specified:
|
||||||
throw new SqoopException(
|
throw new SqoopException(
|
||||||
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
|
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
|
||||||
|
|
||||||
} else if (tableName != null) {
|
} else if (tableName != null) {
|
||||||
// when table name is specified:
|
// when fromTable name is specified:
|
||||||
|
|
||||||
// For databases that support schemas (IE: postgresql).
|
// For databases that support schemas (IE: postgresql).
|
||||||
String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
|
String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
|
||||||
@ -276,7 +276,7 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur
|
|||||||
fieldNames = tableColumns;
|
fieldNames = tableColumns;
|
||||||
}
|
}
|
||||||
} else if (tableSql != null) {
|
} else if (tableSql != null) {
|
||||||
// when table sql is specified:
|
// when fromTable sql is specified:
|
||||||
|
|
||||||
assert tableSql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
|
assert tableSql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
|
||||||
|
|
||||||
|
@ -56,7 +56,7 @@ public List<Partition> getPartitions(PartitionerContext context,ConnectionConfig
|
|||||||
partitionMinValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE);
|
partitionMinValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE);
|
||||||
partitionMaxValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE);
|
partitionMaxValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE);
|
||||||
|
|
||||||
partitionColumnNull = job.table.partitionColumnNull;
|
partitionColumnNull = job.fromTable.partitionColumnNull;
|
||||||
if (partitionColumnNull == null) {
|
if (partitionColumnNull == null) {
|
||||||
partitionColumnNull = false;
|
partitionColumnNull = false;
|
||||||
}
|
}
|
||||||
|
@ -31,8 +31,8 @@ public class GenericJdbcToDestroyer extends Destroyer<ConnectionConfiguration, T
|
|||||||
public void destroy(DestroyerContext context, ConnectionConfiguration connection, ToJobConfiguration job) {
|
public void destroy(DestroyerContext context, ConnectionConfiguration connection, ToJobConfiguration job) {
|
||||||
LOG.info("Running generic JDBC connector destroyer");
|
LOG.info("Running generic JDBC connector destroyer");
|
||||||
|
|
||||||
final String tableName = job.table.tableName;
|
final String tableName = job.toTable.tableName;
|
||||||
final String stageTableName = job.table.stageTableName;
|
final String stageTableName = job.toTable.stageTableName;
|
||||||
final boolean stageEnabled = stageTableName != null &&
|
final boolean stageEnabled = stageTableName != null &&
|
||||||
stageTableName.length() > 0;
|
stageTableName.length() > 0;
|
||||||
if(stageEnabled) {
|
if(stageEnabled) {
|
||||||
@ -50,11 +50,11 @@ private void moveDataToDestinationTable(ConnectionConfiguration connectorConf,
|
|||||||
connectorConf.connection.password);
|
connectorConf.connection.password);
|
||||||
|
|
||||||
if(success) {
|
if(success) {
|
||||||
LOG.info("Job completed, transferring data from stage table to " +
|
LOG.info("Job completed, transferring data from stage fromTable to " +
|
||||||
"destination table.");
|
"destination fromTable.");
|
||||||
executor.migrateData(stageTableName, tableName);
|
executor.migrateData(stageTableName, tableName);
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Job failed, clearing stage table.");
|
LOG.warn("Job failed, clearing stage fromTable.");
|
||||||
executor.deleteTableData(stageTableName);
|
executor.deleteTableData(stageTableName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -65,15 +65,15 @@ public List<String> getJars(InitializerContext context, ConnectionConfiguration
|
|||||||
public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ToJobConfiguration toJobConfiguration) {
|
public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ToJobConfiguration toJobConfiguration) {
|
||||||
configureJdbcProperties(context.getContext(), connectionConfiguration, toJobConfiguration);
|
configureJdbcProperties(context.getContext(), connectionConfiguration, toJobConfiguration);
|
||||||
|
|
||||||
String schemaName = toJobConfiguration.table.tableName;
|
String schemaName = toJobConfiguration.toTable.tableName;
|
||||||
|
|
||||||
if (schemaName == null) {
|
if (schemaName == null) {
|
||||||
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0019,
|
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0019,
|
||||||
"Table name extraction not supported yet.");
|
"Table name extraction not supported yet.");
|
||||||
}
|
}
|
||||||
|
|
||||||
if(toJobConfiguration.table.schemaName != null) {
|
if(toJobConfiguration.toTable.schemaName != null) {
|
||||||
schemaName = toJobConfiguration.table.schemaName + "." + schemaName;
|
schemaName = toJobConfiguration.toTable.schemaName + "." + schemaName;
|
||||||
}
|
}
|
||||||
|
|
||||||
Schema schema = new Schema(schemaName);
|
Schema schema = new Schema(schemaName);
|
||||||
@ -127,23 +127,23 @@ private void configureJdbcProperties(MutableContext context, ConnectionConfigura
|
|||||||
private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ToJobConfiguration jobConfig) {
|
private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ToJobConfiguration jobConfig) {
|
||||||
String dataSql;
|
String dataSql;
|
||||||
|
|
||||||
String schemaName = jobConfig.table.schemaName;
|
String schemaName = jobConfig.toTable.schemaName;
|
||||||
String tableName = jobConfig.table.tableName;
|
String tableName = jobConfig.toTable.tableName;
|
||||||
String stageTableName = jobConfig.table.stageTableName;
|
String stageTableName = jobConfig.toTable.stageTableName;
|
||||||
boolean clearStageTable = jobConfig.table.clearStageTable == null ?
|
boolean clearStageTable = jobConfig.toTable.clearStageTable == null ?
|
||||||
false : jobConfig.table.clearStageTable;
|
false : jobConfig.toTable.clearStageTable;
|
||||||
final boolean stageEnabled =
|
final boolean stageEnabled =
|
||||||
stageTableName != null && stageTableName.length() > 0;
|
stageTableName != null && stageTableName.length() > 0;
|
||||||
String tableSql = jobConfig.table.sql;
|
String tableSql = jobConfig.toTable.sql;
|
||||||
String tableColumns = jobConfig.table.columns;
|
String tableColumns = jobConfig.toTable.columns;
|
||||||
|
|
||||||
if (tableName != null && tableSql != null) {
|
if (tableName != null && tableSql != null) {
|
||||||
// when both table name and table sql are specified:
|
// when both fromTable name and fromTable sql are specified:
|
||||||
throw new SqoopException(
|
throw new SqoopException(
|
||||||
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
|
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
|
||||||
|
|
||||||
} else if (tableName != null) {
|
} else if (tableName != null) {
|
||||||
// when table name is specified:
|
// when fromTable name is specified:
|
||||||
if(stageEnabled) {
|
if(stageEnabled) {
|
||||||
LOG.info("Stage has been enabled.");
|
LOG.info("Stage has been enabled.");
|
||||||
LOG.info("Use stageTable: " + stageTableName +
|
LOG.info("Use stageTable: " + stageTableName +
|
||||||
@ -195,7 +195,7 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur
|
|||||||
dataSql = builder.toString();
|
dataSql = builder.toString();
|
||||||
}
|
}
|
||||||
} else if (tableSql != null) {
|
} else if (tableSql != null) {
|
||||||
// when table sql is specified:
|
// when fromTable sql is specified:
|
||||||
|
|
||||||
if (tableSql.indexOf(
|
if (tableSql.indexOf(
|
||||||
GenericJdbcConnectorConstants.SQL_PARAMETER_MARKER) == -1) {
|
GenericJdbcConnectorConstants.SQL_PARAMETER_MARKER) == -1) {
|
||||||
|
@ -74,22 +74,22 @@ private Validation validateExportJob(Object jobConfiguration) {
|
|||||||
Validation validation = new Validation(ToJobConfiguration.class);
|
Validation validation = new Validation(ToJobConfiguration.class);
|
||||||
ToJobConfiguration configuration = (ToJobConfiguration)jobConfiguration;
|
ToJobConfiguration configuration = (ToJobConfiguration)jobConfiguration;
|
||||||
|
|
||||||
if(configuration.table.tableName == null && configuration.table.sql == null) {
|
if(configuration.toTable.tableName == null && configuration.toTable.sql == null) {
|
||||||
validation.addMessage(Status.UNACCEPTABLE, "table", "Either table name or SQL must be specified");
|
validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Either fromTable name or SQL must be specified");
|
||||||
}
|
}
|
||||||
if(configuration.table.tableName != null && configuration.table.sql != null) {
|
if(configuration.toTable.tableName != null && configuration.toTable.sql != null) {
|
||||||
validation.addMessage(Status.UNACCEPTABLE, "table", "Both table name and SQL cannot be specified");
|
validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Both fromTable name and SQL cannot be specified");
|
||||||
}
|
}
|
||||||
if(configuration.table.tableName == null &&
|
if(configuration.toTable.tableName == null &&
|
||||||
configuration.table.stageTableName != null) {
|
configuration.toTable.stageTableName != null) {
|
||||||
validation.addMessage(Status.UNACCEPTABLE, "table",
|
validation.addMessage(Status.UNACCEPTABLE, "fromTable",
|
||||||
"Stage table name cannot be specified without specifying table name");
|
"Stage fromTable name cannot be specified without specifying fromTable name");
|
||||||
}
|
}
|
||||||
if(configuration.table.stageTableName == null &&
|
if(configuration.toTable.stageTableName == null &&
|
||||||
configuration.table.clearStageTable != null) {
|
configuration.toTable.clearStageTable != null) {
|
||||||
validation.addMessage(Status.UNACCEPTABLE, "table",
|
validation.addMessage(Status.UNACCEPTABLE, "fromTable",
|
||||||
"Clear stage table cannot be specified without specifying name of " +
|
"Clear stage fromTable cannot be specified without specifying name of " +
|
||||||
"the stage table.");
|
"the stage fromTable.");
|
||||||
}
|
}
|
||||||
|
|
||||||
return validation;
|
return validation;
|
||||||
@ -99,18 +99,18 @@ private Validation validateImportJob(Object jobConfiguration) {
|
|||||||
Validation validation = new Validation(FromJobConfiguration.class);
|
Validation validation = new Validation(FromJobConfiguration.class);
|
||||||
FromJobConfiguration configuration = (FromJobConfiguration)jobConfiguration;
|
FromJobConfiguration configuration = (FromJobConfiguration)jobConfiguration;
|
||||||
|
|
||||||
if(configuration.table.tableName == null && configuration.table.sql == null) {
|
if(configuration.fromTable.tableName == null && configuration.fromTable.sql == null) {
|
||||||
validation.addMessage(Status.UNACCEPTABLE, "table", "Either table name or SQL must be specified");
|
validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Either fromTable name or SQL must be specified");
|
||||||
}
|
}
|
||||||
if(configuration.table.tableName != null && configuration.table.sql != null) {
|
if(configuration.fromTable.tableName != null && configuration.fromTable.sql != null) {
|
||||||
validation.addMessage(Status.UNACCEPTABLE, "table", "Both table name and SQL cannot be specified");
|
validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Both fromTable name and SQL cannot be specified");
|
||||||
}
|
}
|
||||||
if(configuration.table.schemaName != null && configuration.table.sql != null) {
|
if(configuration.fromTable.schemaName != null && configuration.fromTable.sql != null) {
|
||||||
validation.addMessage(Status.UNACCEPTABLE, "table", "Both schema name and SQL cannot be specified");
|
validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Both schema name and SQL cannot be specified");
|
||||||
}
|
}
|
||||||
|
|
||||||
if(configuration.table.sql != null && !configuration.table.sql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN)) {
|
if(configuration.fromTable.sql != null && !configuration.fromTable.sql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN)) {
|
||||||
validation.addMessage(Status.UNACCEPTABLE, "table", "sql", "SQL statement must contain placeholder for auto generated "
|
validation.addMessage(Status.UNACCEPTABLE, "fromTable", "sql", "SQL statement must contain placeholder for auto generated "
|
||||||
+ "conditions - " + GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
|
+ "conditions - " + GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -25,9 +25,9 @@
|
|||||||
*/
|
*/
|
||||||
@ConfigurationClass
|
@ConfigurationClass
|
||||||
public class FromJobConfiguration {
|
public class FromJobConfiguration {
|
||||||
@Form public FromTableForm table;
|
@Form public FromTableForm fromTable;
|
||||||
|
|
||||||
public FromJobConfiguration() {
|
public FromJobConfiguration() {
|
||||||
table = new FromTableForm();
|
fromTable = new FromTableForm();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -25,9 +25,9 @@
|
|||||||
*/
|
*/
|
||||||
@ConfigurationClass
|
@ConfigurationClass
|
||||||
public class ToJobConfiguration {
|
public class ToJobConfiguration {
|
||||||
@Form public ToTableForm table;
|
@Form public ToTableForm toTable;
|
||||||
|
|
||||||
public ToJobConfiguration() {
|
public ToJobConfiguration() {
|
||||||
table = new ToTableForm();
|
toTable = new ToTableForm();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -47,55 +47,77 @@ connection.jdbcProperties.label = JDBC Connection Properties
|
|||||||
connection.jdbcProperties.help = Enter any JDBC properties that should be \
|
connection.jdbcProperties.help = Enter any JDBC properties that should be \
|
||||||
supplied during the creation of connection.
|
supplied during the creation of connection.
|
||||||
|
|
||||||
# Table Form
|
# From Table Form
|
||||||
#
|
#
|
||||||
table.label = Database configuration
|
fromTable.label = From database configuration
|
||||||
table.help = You must supply the information requested in order to create \
|
fromTable.help = You must supply the information requested in order to create \
|
||||||
a job object.
|
a job object.
|
||||||
|
|
||||||
# Schema name
|
# From schema name
|
||||||
table.schemaName.label = Schema name
|
fromTable.schemaName.label = Schema name
|
||||||
table.schemaName.help = Schema name to process data in the remote database
|
fromTable.schemaName.help = Schema name to process data in the remote database
|
||||||
|
|
||||||
# Table name
|
# From table name
|
||||||
table.tableName.label = Table name
|
fromTable.tableName.label = Table name
|
||||||
table.tableName.help = Table name to process data in the remote database
|
fromTable.tableName.help = Table name to process data in the remote database
|
||||||
|
|
||||||
# Table SQL
|
# From table SQL
|
||||||
table.sql.label = Table SQL statement
|
fromTable.sql.label = Table SQL statement
|
||||||
table.sql.help = SQL statement to process data in the remote database
|
fromTable.sql.help = SQL statement to process data in the remote database
|
||||||
|
|
||||||
# Table columns
|
# From table columns
|
||||||
table.columns.label = Table column names
|
fromTable.columns.label = Table column names
|
||||||
table.columns.help = Specific columns of a table name or a table SQL
|
fromTable.columns.help = Specific columns of a table name or a table SQL
|
||||||
|
|
||||||
# Table warehouse
|
# From table warehouse
|
||||||
table.warehouse.label = Data warehouse
|
fromTable.warehouse.label = Data warehouse
|
||||||
table.warehouse.help = The root directory for data
|
fromTable.warehouse.help = The root directory for data
|
||||||
|
|
||||||
# Stage table name
|
# From table datadir
|
||||||
table.stageTableName.label = Stage table name
|
fromTable.dataDirectory.label = Data directory
|
||||||
table.stageTableName.help = Name of the stage table to use
|
fromTable.dataDirectory.help = The sub-directory under warehouse for data
|
||||||
|
|
||||||
# Clear stage table
|
# From table pcol
|
||||||
table.clearStageTable.label = Clear stage table
|
fromTable.partitionColumn.label = Partition column name
|
||||||
table.clearStageTable.help = Indicate if the stage table should be cleared
|
fromTable.partitionColumn.help = A specific column for data partition
|
||||||
|
|
||||||
# Table datadir
|
# From table pcol is null
|
||||||
table.dataDirectory.label = Data directory
|
fromTable.partitionColumnNull.label = Nulls in partition column
|
||||||
table.dataDirectory.help = The sub-directory under warehouse for data
|
fromTable.partitionColumnNull.help = Whether there are null values in partition column
|
||||||
|
|
||||||
# Table pcol
|
# From table boundary
|
||||||
table.partitionColumn.label = Partition column name
|
fromTable.boundaryQuery.label = Boundary query
|
||||||
table.partitionColumn.help = A specific column for data partition
|
fromTable.boundaryQuery.help = The boundary query for data partition
|
||||||
|
|
||||||
# Table pcol is null
|
# To table form
|
||||||
table.partitionColumnNull.label = Nulls in partition column
|
#
|
||||||
table.partitionColumnNull.help = Whether there are null values in partition column
|
toTable.label = To database configuration
|
||||||
|
toTable.help = You must supply the information requested in order to create \
|
||||||
|
a job object.
|
||||||
|
|
||||||
# Table boundary
|
# From schema name
|
||||||
table.boundaryQuery.label = Boundary query
|
toTable.schemaName.label = Schema name
|
||||||
table.boundaryQuery.help = The boundary query for data partition
|
toTable.schemaName.help = Schema name to process data in the remote database
|
||||||
|
|
||||||
|
# From table name
|
||||||
|
toTable.tableName.label = Table name
|
||||||
|
toTable.tableName.help = Table name to process data in the remote database
|
||||||
|
|
||||||
|
# From table SQL
|
||||||
|
toTable.sql.label = Table SQL statement
|
||||||
|
toTable.sql.help = SQL statement to process data in the remote database
|
||||||
|
|
||||||
|
# From table columns
|
||||||
|
toTable.columns.label = Table column names
|
||||||
|
toTable.columns.help = Specific columns of a table name or a table SQL
|
||||||
|
|
||||||
|
# To stage table name
|
||||||
|
toTable.stageTableName.label = Stage table name
|
||||||
|
toTable.stageTableName.help = Name of the stage table to use
|
||||||
|
|
||||||
|
# To clear stage table
|
||||||
|
toTable.clearStageTable.label = Clear stage table
|
||||||
|
toTable.clearStageTable.help = Indicate if the stage table should be cleared
|
||||||
|
|
||||||
# Placeholders to have some entities created
|
# Placeholders to have some entities created
|
||||||
ignored.label = Ignored
|
ignored.label = Ignored
|
||||||
|
@ -20,7 +20,7 @@
|
|||||||
import junit.framework.TestCase;
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
public class GenericJdbcExecutorTest extends TestCase {
|
public class GenericJdbcExecutorTest extends TestCase {
|
||||||
// private final String table;
|
// private final String fromTable;
|
||||||
// private final String emptyTable;
|
// private final String emptyTable;
|
||||||
// private final GenericJdbcExecutor executor;
|
// private final GenericJdbcExecutor executor;
|
||||||
//
|
//
|
||||||
@ -28,8 +28,8 @@ public class GenericJdbcExecutorTest extends TestCase {
|
|||||||
// private static final int NUMBER_OF_ROWS = 974;
|
// private static final int NUMBER_OF_ROWS = 974;
|
||||||
//
|
//
|
||||||
// public GenericJdbcExecutorTest() {
|
// public GenericJdbcExecutorTest() {
|
||||||
// table = getClass().getSimpleName().toUpperCase();
|
// fromTable = getClass().getSimpleName().toUpperCase();
|
||||||
// emptyTable = table + "_EMPTY";
|
// emptyTable = fromTable + "_EMPTY";
|
||||||
// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
|
// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
|
||||||
// GenericJdbcTestConstants.URL, null, null);
|
// GenericJdbcTestConstants.URL, null, null);
|
||||||
// }
|
// }
|
||||||
@ -42,15 +42,15 @@ public class GenericJdbcExecutorTest extends TestCase {
|
|||||||
// executor.executeUpdate("CREATE TABLE "
|
// executor.executeUpdate("CREATE TABLE "
|
||||||
// + emptyTable + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
|
// + emptyTable + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
|
||||||
//
|
//
|
||||||
// if(executor.existTable(table)) {
|
// if(executor.existTable(fromTable)) {
|
||||||
// executor.executeUpdate("DROP TABLE " + table);
|
// executor.executeUpdate("DROP TABLE " + fromTable);
|
||||||
// }
|
// }
|
||||||
// executor.executeUpdate("CREATE TABLE "
|
// executor.executeUpdate("CREATE TABLE "
|
||||||
// + table + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
|
// + fromTable + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
|
||||||
//
|
//
|
||||||
// for (int i = 0; i < NUMBER_OF_ROWS; i++) {
|
// for (int i = 0; i < NUMBER_OF_ROWS; i++) {
|
||||||
// int value = START + i;
|
// int value = START + i;
|
||||||
// String sql = "INSERT INTO " + table
|
// String sql = "INSERT INTO " + fromTable
|
||||||
// + " VALUES(" + value + ", '" + value + "')";
|
// + " VALUES(" + value + ", '" + value + "')";
|
||||||
// executor.executeUpdate(sql);
|
// executor.executeUpdate(sql);
|
||||||
// }
|
// }
|
||||||
@ -58,23 +58,23 @@ public class GenericJdbcExecutorTest extends TestCase {
|
|||||||
//
|
//
|
||||||
// @SuppressWarnings("unchecked")
|
// @SuppressWarnings("unchecked")
|
||||||
// public void testDeleteTableData() throws Exception {
|
// public void testDeleteTableData() throws Exception {
|
||||||
// executor.deleteTableData(table);
|
// executor.deleteTableData(fromTable);
|
||||||
// assertEquals("Table " + table + " is expected to be empty.",
|
// assertEquals("Table " + fromTable + " is expected to be empty.",
|
||||||
// 0, executor.getTableRowCount(table));
|
// 0, executor.getTableRowCount(fromTable));
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// @SuppressWarnings("unchecked")
|
// @SuppressWarnings("unchecked")
|
||||||
// public void testMigrateData() throws Exception {
|
// public void testMigrateData() throws Exception {
|
||||||
// assertEquals("Table " + emptyTable + " is expected to be empty.",
|
// assertEquals("Table " + emptyTable + " is expected to be empty.",
|
||||||
// 0, executor.getTableRowCount(emptyTable));
|
// 0, executor.getTableRowCount(emptyTable));
|
||||||
// assertEquals("Table " + table + " is expected to have " +
|
// assertEquals("Table " + fromTable + " is expected to have " +
|
||||||
// NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS,
|
// NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS,
|
||||||
// executor.getTableRowCount(table));
|
// executor.getTableRowCount(fromTable));
|
||||||
//
|
//
|
||||||
// executor.migrateData(table, emptyTable);
|
// executor.migrateData(fromTable, emptyTable);
|
||||||
//
|
//
|
||||||
// assertEquals("Table " + table + " is expected to be empty.", 0,
|
// assertEquals("Table " + fromTable + " is expected to be empty.", 0,
|
||||||
// executor.getTableRowCount(table));
|
// executor.getTableRowCount(fromTable));
|
||||||
// assertEquals("Table " + emptyTable + " is expected to have " +
|
// assertEquals("Table " + emptyTable + " is expected to have " +
|
||||||
// NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS,
|
// NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS,
|
||||||
// executor.getTableRowCount(emptyTable));
|
// executor.getTableRowCount(emptyTable));
|
||||||
@ -82,7 +82,7 @@ public class GenericJdbcExecutorTest extends TestCase {
|
|||||||
//
|
//
|
||||||
// @SuppressWarnings("unchecked")
|
// @SuppressWarnings("unchecked")
|
||||||
// public void testGetTableRowCount() throws Exception {
|
// public void testGetTableRowCount() throws Exception {
|
||||||
// assertEquals("Table " + table + " is expected to be empty.",
|
// assertEquals("Table " + fromTable + " is expected to be empty.",
|
||||||
// NUMBER_OF_ROWS, executor.getTableRowCount(table));
|
// NUMBER_OF_ROWS, executor.getTableRowCount(fromTable));
|
||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
|
@ -83,7 +83,7 @@ public class TestExportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.tableName = schemalessTableName;
|
// jobConf.fromTable.tableName = schemalessTableName;
|
||||||
//
|
//
|
||||||
// MutableContext context = new MutableMapContext();
|
// MutableContext context = new MutableMapContext();
|
||||||
// InitializerContext initializerContext = new InitializerContext(context);
|
// InitializerContext initializerContext = new InitializerContext(context);
|
||||||
@ -104,8 +104,8 @@ public class TestExportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.tableName = schemalessTableName;
|
// jobConf.fromTable.tableName = schemalessTableName;
|
||||||
// jobConf.table.columns = tableColumns;
|
// jobConf.fromTable.columns = tableColumns;
|
||||||
//
|
//
|
||||||
// MutableContext context = new MutableMapContext();
|
// MutableContext context = new MutableMapContext();
|
||||||
// InitializerContext initializerContext = new InitializerContext(context);
|
// InitializerContext initializerContext = new InitializerContext(context);
|
||||||
@ -124,7 +124,7 @@ public class TestExportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.sql = schemalessTableSql;
|
// jobConf.fromTable.sql = schemalessTableSql;
|
||||||
//
|
//
|
||||||
// MutableContext context = new MutableMapContext();
|
// MutableContext context = new MutableMapContext();
|
||||||
// InitializerContext initializerContext = new InitializerContext(context);
|
// InitializerContext initializerContext = new InitializerContext(context);
|
||||||
@ -145,8 +145,8 @@ public class TestExportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.schemaName = schemaName;
|
// jobConf.fromTable.schemaName = schemaName;
|
||||||
// jobConf.table.tableName = tableName;
|
// jobConf.fromTable.tableName = tableName;
|
||||||
//
|
//
|
||||||
// MutableContext context = new MutableMapContext();
|
// MutableContext context = new MutableMapContext();
|
||||||
// InitializerContext initializerContext = new InitializerContext(context);
|
// InitializerContext initializerContext = new InitializerContext(context);
|
||||||
@ -167,9 +167,9 @@ public class TestExportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.schemaName = schemaName;
|
// jobConf.fromTable.schemaName = schemaName;
|
||||||
// jobConf.table.tableName = tableName;
|
// jobConf.fromTable.tableName = tableName;
|
||||||
// jobConf.table.columns = tableColumns;
|
// jobConf.fromTable.columns = tableColumns;
|
||||||
//
|
//
|
||||||
// MutableContext context = new MutableMapContext();
|
// MutableContext context = new MutableMapContext();
|
||||||
// InitializerContext initializerContext = new InitializerContext(context);
|
// InitializerContext initializerContext = new InitializerContext(context);
|
||||||
@ -188,8 +188,8 @@ public class TestExportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.schemaName = schemaName;
|
// jobConf.fromTable.schemaName = schemaName;
|
||||||
// jobConf.table.sql = tableSql;
|
// jobConf.fromTable.sql = tableSql;
|
||||||
//
|
//
|
||||||
// MutableContext context = new MutableMapContext();
|
// MutableContext context = new MutableMapContext();
|
||||||
// InitializerContext initializerContext = new InitializerContext(context);
|
// InitializerContext initializerContext = new InitializerContext(context);
|
||||||
@ -222,8 +222,8 @@ public class TestExportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.tableName = schemalessTableName;
|
// jobConf.fromTable.tableName = schemalessTableName;
|
||||||
// jobConf.table.stageTableName = stageTableName;
|
// jobConf.fromTable.stageTableName = stageTableName;
|
||||||
//
|
//
|
||||||
// MutableContext context = new MutableMapContext();
|
// MutableContext context = new MutableMapContext();
|
||||||
// InitializerContext initializerContext = new InitializerContext(context);
|
// InitializerContext initializerContext = new InitializerContext(context);
|
||||||
@ -247,8 +247,8 @@ public class TestExportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.tableName = schemalessTableName;
|
// jobConf.fromTable.tableName = schemalessTableName;
|
||||||
// jobConf.table.stageTableName = stageTableName;
|
// jobConf.fromTable.stageTableName = stageTableName;
|
||||||
// createTable(fullStageTableName);
|
// createTable(fullStageTableName);
|
||||||
// executor.executeUpdate("INSERT INTO " + fullStageTableName +
|
// executor.executeUpdate("INSERT INTO " + fullStageTableName +
|
||||||
// " VALUES(1, 1.1, 'one')");
|
// " VALUES(1, 1.1, 'one')");
|
||||||
@ -274,8 +274,8 @@ public class TestExportInitializer extends TestCase {
|
|||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// //specifying clear stage table flag without specifying name of
|
// //specifying clear stage table flag without specifying name of
|
||||||
// // the stage table
|
// // the stage table
|
||||||
// jobConf.table.tableName = schemalessTableName;
|
// jobConf.fromTable.tableName = schemalessTableName;
|
||||||
// jobConf.table.clearStageTable = false;
|
// jobConf.fromTable.clearStageTable = false;
|
||||||
// GenericJdbcValidator validator = new GenericJdbcValidator();
|
// GenericJdbcValidator validator = new GenericJdbcValidator();
|
||||||
// Validation validation = validator.validateJob(MJob.Type.EXPORT, jobConf);
|
// Validation validation = validator.validateJob(MJob.Type.EXPORT, jobConf);
|
||||||
// assertEquals("User should not specify clear stage table flag without " +
|
// assertEquals("User should not specify clear stage table flag without " +
|
||||||
@ -283,16 +283,16 @@ public class TestExportInitializer extends TestCase {
|
|||||||
// Status.UNACCEPTABLE,
|
// Status.UNACCEPTABLE,
|
||||||
// validation.getStatus());
|
// validation.getStatus());
|
||||||
// assertTrue(validation.getMessages().containsKey(
|
// assertTrue(validation.getMessages().containsKey(
|
||||||
// new Validation.FormInput("table")));
|
// new Validation.FormInput("fromTable")));
|
||||||
//
|
//
|
||||||
// jobConf.table.clearStageTable = true;
|
// jobConf.fromTable.clearStageTable = true;
|
||||||
// validation = validator.validateJob(MJob.Type.EXPORT, jobConf);
|
// validation = validator.validateJob(MJob.Type.EXPORT, jobConf);
|
||||||
// assertEquals("User should not specify clear stage table flag without " +
|
// assertEquals("User should not specify clear stage table flag without " +
|
||||||
// "specifying name of the stage table",
|
// "specifying name of the stage table",
|
||||||
// Status.UNACCEPTABLE,
|
// Status.UNACCEPTABLE,
|
||||||
// validation.getStatus());
|
// validation.getStatus());
|
||||||
// assertTrue(validation.getMessages().containsKey(
|
// assertTrue(validation.getMessages().containsKey(
|
||||||
// new Validation.FormInput("table")));
|
// new Validation.FormInput("fromTable")));
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// @SuppressWarnings("unchecked")
|
// @SuppressWarnings("unchecked")
|
||||||
@ -303,15 +303,15 @@ public class TestExportInitializer extends TestCase {
|
|||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// //specifying stage table without specifying table name
|
// //specifying stage table without specifying table name
|
||||||
// jobConf.table.stageTableName = stageTableName;
|
// jobConf.fromTable.stageTableName = stageTableName;
|
||||||
// jobConf.table.sql = "";
|
// jobConf.fromTable.sql = "";
|
||||||
//
|
//
|
||||||
// GenericJdbcValidator validator = new GenericJdbcValidator();
|
// GenericJdbcValidator validator = new GenericJdbcValidator();
|
||||||
// Validation validation = validator.validateJob(MJob.Type.EXPORT, jobConf);
|
// Validation validation = validator.validateJob(MJob.Type.EXPORT, jobConf);
|
||||||
// assertEquals("Stage table name cannot be specified without specifying " +
|
// assertEquals("Stage table name cannot be specified without specifying " +
|
||||||
// "table name", Status.UNACCEPTABLE, validation.getStatus());
|
// "table name", Status.UNACCEPTABLE, validation.getStatus());
|
||||||
// assertTrue(validation.getMessages().containsKey(
|
// assertTrue(validation.getMessages().containsKey(
|
||||||
// new Validation.FormInput("table")));
|
// new Validation.FormInput("fromTable")));
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// @SuppressWarnings("unchecked")
|
// @SuppressWarnings("unchecked")
|
||||||
@ -323,9 +323,9 @@ public class TestExportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.tableName = schemalessTableName;
|
// jobConf.fromTable.tableName = schemalessTableName;
|
||||||
// jobConf.table.stageTableName = stageTableName;
|
// jobConf.fromTable.stageTableName = stageTableName;
|
||||||
// jobConf.table.clearStageTable = true;
|
// jobConf.fromTable.clearStageTable = true;
|
||||||
// createTable(fullStageTableName);
|
// createTable(fullStageTableName);
|
||||||
// executor.executeUpdate("INSERT INTO " + fullStageTableName +
|
// executor.executeUpdate("INSERT INTO " + fullStageTableName +
|
||||||
// " VALUES(1, 1.1, 'one')");
|
// " VALUES(1, 1.1, 'one')");
|
||||||
@ -348,8 +348,8 @@ public class TestExportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.tableName = schemalessTableName;
|
// jobConf.fromTable.tableName = schemalessTableName;
|
||||||
// jobConf.table.stageTableName = stageTableName;
|
// jobConf.fromTable.stageTableName = stageTableName;
|
||||||
// createTable(fullStageTableName);
|
// createTable(fullStageTableName);
|
||||||
// MutableContext context = new MutableMapContext();
|
// MutableContext context = new MutableMapContext();
|
||||||
// InitializerContext initializerContext = new InitializerContext(context);
|
// InitializerContext initializerContext = new InitializerContext(context);
|
||||||
|
@ -92,7 +92,7 @@ public class TestImportInitializer extends TestCase {
|
|||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// /**
|
// /**
|
||||||
// * Return Schema representation for the testing table.
|
// * Return Schema representation for the testing fromTable.
|
||||||
// *
|
// *
|
||||||
// * @param name Name that should be used for the generated schema.
|
// * @param name Name that should be used for the generated schema.
|
||||||
// * @return
|
// * @return
|
||||||
@ -117,7 +117,7 @@ public class TestImportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.tableName = schemalessTableName;
|
// jobConf.fromTable.tableName = schemalessTableName;
|
||||||
//
|
//
|
||||||
// MutableContext context = new MutableMapContext();
|
// MutableContext context = new MutableMapContext();
|
||||||
// InitializerContext initializerContext = new InitializerContext(context);
|
// InitializerContext initializerContext = new InitializerContext(context);
|
||||||
@ -143,8 +143,8 @@ public class TestImportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.tableName = schemalessTableName;
|
// jobConf.fromTable.tableName = schemalessTableName;
|
||||||
// jobConf.table.columns = tableColumns;
|
// jobConf.fromTable.columns = tableColumns;
|
||||||
//
|
//
|
||||||
// MutableContext context = new MutableMapContext();
|
// MutableContext context = new MutableMapContext();
|
||||||
// InitializerContext initializerContext = new InitializerContext(context);
|
// InitializerContext initializerContext = new InitializerContext(context);
|
||||||
@ -170,8 +170,8 @@ public class TestImportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.sql = schemalessTableSql;
|
// jobConf.fromTable.sql = schemalessTableSql;
|
||||||
// jobConf.table.partitionColumn = "DCOL";
|
// jobConf.fromTable.partitionColumn = "DCOL";
|
||||||
//
|
//
|
||||||
// MutableContext context = new MutableMapContext();
|
// MutableContext context = new MutableMapContext();
|
||||||
// InitializerContext initializerContext = new InitializerContext(context);
|
// InitializerContext initializerContext = new InitializerContext(context);
|
||||||
@ -197,9 +197,9 @@ public class TestImportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.sql = schemalessTableSql;
|
// jobConf.fromTable.sql = schemalessTableSql;
|
||||||
// jobConf.table.columns = tableColumns;
|
// jobConf.fromTable.columns = tableColumns;
|
||||||
// jobConf.table.partitionColumn = "DCOL";
|
// jobConf.fromTable.partitionColumn = "DCOL";
|
||||||
//
|
//
|
||||||
// MutableContext context = new MutableMapContext();
|
// MutableContext context = new MutableMapContext();
|
||||||
// InitializerContext initializerContext = new InitializerContext(context);
|
// InitializerContext initializerContext = new InitializerContext(context);
|
||||||
@ -228,8 +228,8 @@ public class TestImportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.schemaName = schemaName;
|
// jobConf.fromTable.schemaName = schemaName;
|
||||||
// jobConf.table.tableName = tableName;
|
// jobConf.fromTable.tableName = tableName;
|
||||||
//
|
//
|
||||||
// MutableContext context = new MutableMapContext();
|
// MutableContext context = new MutableMapContext();
|
||||||
// InitializerContext initializerContext = new InitializerContext(context);
|
// InitializerContext initializerContext = new InitializerContext(context);
|
||||||
@ -257,9 +257,9 @@ public class TestImportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.schemaName = schemaName;
|
// jobConf.fromTable.schemaName = schemaName;
|
||||||
// jobConf.table.tableName = tableName;
|
// jobConf.fromTable.tableName = tableName;
|
||||||
// jobConf.table.columns = tableColumns;
|
// jobConf.fromTable.columns = tableColumns;
|
||||||
//
|
//
|
||||||
// MutableContext context = new MutableMapContext();
|
// MutableContext context = new MutableMapContext();
|
||||||
// InitializerContext initializerContext = new InitializerContext(context);
|
// InitializerContext initializerContext = new InitializerContext(context);
|
||||||
@ -287,9 +287,9 @@ public class TestImportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.schemaName = schemaName;
|
// jobConf.fromTable.schemaName = schemaName;
|
||||||
// jobConf.table.sql = tableSql;
|
// jobConf.fromTable.sql = tableSql;
|
||||||
// jobConf.table.partitionColumn = "DCOL";
|
// jobConf.fromTable.partitionColumn = "DCOL";
|
||||||
//
|
//
|
||||||
// MutableContext context = new MutableMapContext();
|
// MutableContext context = new MutableMapContext();
|
||||||
// InitializerContext initializerContext = new InitializerContext(context);
|
// InitializerContext initializerContext = new InitializerContext(context);
|
||||||
@ -316,9 +316,9 @@ public class TestImportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.schemaName = schemaName;
|
// jobConf.fromTable.schemaName = schemaName;
|
||||||
// jobConf.table.tableName = tableName;
|
// jobConf.fromTable.tableName = tableName;
|
||||||
// jobConf.table.partitionColumn = "DCOL";
|
// jobConf.fromTable.partitionColumn = "DCOL";
|
||||||
//
|
//
|
||||||
// MutableContext context = new MutableMapContext();
|
// MutableContext context = new MutableMapContext();
|
||||||
// InitializerContext initializerContext = new InitializerContext(context);
|
// InitializerContext initializerContext = new InitializerContext(context);
|
||||||
@ -337,9 +337,9 @@ public class TestImportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.schemaName = schemaName;
|
// jobConf.fromTable.schemaName = schemaName;
|
||||||
// jobConf.table.sql = tableSql;
|
// jobConf.fromTable.sql = tableSql;
|
||||||
// jobConf.table.partitionColumn = "DCOL";
|
// jobConf.fromTable.partitionColumn = "DCOL";
|
||||||
//
|
//
|
||||||
// MutableContext context = new MutableMapContext();
|
// MutableContext context = new MutableMapContext();
|
||||||
// InitializerContext initializerContext = new InitializerContext(context);
|
// InitializerContext initializerContext = new InitializerContext(context);
|
||||||
@ -360,10 +360,10 @@ public class TestImportInitializer extends TestCase {
|
|||||||
//
|
//
|
||||||
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
// connConf.connection.connectionString = GenericJdbcTestConstants.URL;
|
||||||
// jobConf.table.schemaName = schemaName;
|
// jobConf.fromTable.schemaName = schemaName;
|
||||||
// jobConf.table.sql = tableSql;
|
// jobConf.fromTable.sql = tableSql;
|
||||||
// jobConf.table.columns = tableColumns;
|
// jobConf.fromTable.columns = tableColumns;
|
||||||
// jobConf.table.partitionColumn = "DCOL";
|
// jobConf.fromTable.partitionColumn = "DCOL";
|
||||||
//
|
//
|
||||||
// MutableContext context = new MutableMapContext();
|
// MutableContext context = new MutableMapContext();
|
||||||
// InitializerContext initializerContext = new InitializerContext(context);
|
// InitializerContext initializerContext = new InitializerContext(context);
|
||||||
|
@ -475,7 +475,7 @@ public class TestImportPartitioner extends TestCase {
|
|||||||
//
|
//
|
||||||
// ConnectionConfiguration connConf = new ConnectionConfiguration();
|
// ConnectionConfiguration connConf = new ConnectionConfiguration();
|
||||||
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
|
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
|
||||||
// jobConf.table.partitionColumnNull = true;
|
// jobConf.fromTable.partitionColumnNull = true;
|
||||||
//
|
//
|
||||||
// Partitioner partitioner = new GenericJdbcImportPartitioner();
|
// Partitioner partitioner = new GenericJdbcImportPartitioner();
|
||||||
// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
|
// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
|
||||||
|
@ -160,7 +160,7 @@ public static void setFrameworkConnectionConfig(ConnectorType type, Job job, Obj
|
|||||||
* @param job MapReduce job object
|
* @param job MapReduce job object
|
||||||
* @param obj Configuration object
|
* @param obj Configuration object
|
||||||
*/
|
*/
|
||||||
public static void setConfigFrameworkJob(Job job, Object obj) {
|
public static void setFrameworkJobConfig(Job job, Object obj) {
|
||||||
job.getConfiguration().set(JOB_CONFIG_CLASS_FRAMEWORK_JOB, obj.getClass().getName());
|
job.getConfiguration().set(JOB_CONFIG_CLASS_FRAMEWORK_JOB, obj.getClass().getName());
|
||||||
job.getCredentials().addSecretKey(JOB_CONFIG_FRAMEWORK_JOB_KEY, FormUtils.toJson(obj).getBytes());
|
job.getCredentials().addSecretKey(JOB_CONFIG_FRAMEWORK_JOB_KEY, FormUtils.toJson(obj).getBytes());
|
||||||
}
|
}
|
||||||
@ -225,7 +225,7 @@ public static Object getFrameworkConnectionConfig(ConnectorType type, Configurat
|
|||||||
* @param configuration MapReduce configuration object
|
* @param configuration MapReduce configuration object
|
||||||
* @return Configuration object
|
* @return Configuration object
|
||||||
*/
|
*/
|
||||||
public static Object getConfigFrameworkJob(Configuration configuration) {
|
public static Object getFrameworkJobConfig(Configuration configuration) {
|
||||||
return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FRAMEWORK_JOB, JOB_CONFIG_FRAMEWORK_JOB_KEY);
|
return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FRAMEWORK_JOB, JOB_CONFIG_FRAMEWORK_JOB_KEY);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -248,22 +248,10 @@ public static void setConnectorSchema(ConnectorType type, Job job, Schema schema
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Persist Framework generated schema.
|
* Retrieve Connector generated schema.
|
||||||
*
|
|
||||||
* @param job MapReduce Job object
|
|
||||||
* @param schema Schema
|
|
||||||
*/
|
|
||||||
public static void setHioSchema(Job job, Schema schema) {
|
|
||||||
if(schema != null) {
|
|
||||||
job.getCredentials().addSecretKey(SCHEMA_HIO_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Retrieve From Connector generated schema.
|
|
||||||
*
|
*
|
||||||
|
* @param type The FROM or TO connector
|
||||||
* @param configuration MapReduce configuration object
|
* @param configuration MapReduce configuration object
|
||||||
* @return Schema
|
|
||||||
*/
|
*/
|
||||||
public static Schema getConnectorSchema(ConnectorType type, Configuration configuration) {
|
public static Schema getConnectorSchema(ConnectorType type, Configuration configuration) {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
|
@ -68,7 +68,7 @@ public class MapreduceExecutionEngineTest {
|
|||||||
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
|
// ImportJobConfiguration jobConf = new ImportJobConfiguration();
|
||||||
// jobConf.output.outputFormat = OutputFormat.TEXT_FILE;
|
// jobConf.output.outputFormat = OutputFormat.TEXT_FILE;
|
||||||
// jobConf.output.compression = comprssionFormat;
|
// jobConf.output.compression = comprssionFormat;
|
||||||
// request.setConfigFrameworkJob(jobConf);
|
// request.setFrameworkJobConfig(jobConf);
|
||||||
// request.setConnectorCallbacks(new Importer(Initializer.class,
|
// request.setConnectorCallbacks(new Importer(Initializer.class,
|
||||||
// Partitioner.class, Extractor.class, Destroyer.class) {
|
// Partitioner.class, Extractor.class, Destroyer.class) {
|
||||||
// });
|
// });
|
||||||
@ -95,7 +95,7 @@ public class MapreduceExecutionEngineTest {
|
|||||||
// jobConf.output.outputFormat = OutputFormat.TEXT_FILE;
|
// jobConf.output.outputFormat = OutputFormat.TEXT_FILE;
|
||||||
// jobConf.output.compression = OutputCompression.CUSTOM;
|
// jobConf.output.compression = OutputCompression.CUSTOM;
|
||||||
// jobConf.output.customCompression = customCodecName;
|
// jobConf.output.customCompression = customCodecName;
|
||||||
// request.setConfigFrameworkJob(jobConf);
|
// request.setFrameworkJobConfig(jobConf);
|
||||||
// request.setConnectorCallbacks(new Importer(Initializer.class,
|
// request.setConnectorCallbacks(new Importer(Initializer.class,
|
||||||
// Partitioner.class, Extractor.class, Destroyer.class) {
|
// Partitioner.class, Extractor.class, Destroyer.class) {
|
||||||
// });
|
// });
|
||||||
|
@ -89,9 +89,9 @@ public class TestConfigurationUtils {
|
|||||||
//
|
//
|
||||||
// @Test
|
// @Test
|
||||||
// public void testConfigFrameworkJob() throws Exception {
|
// public void testConfigFrameworkJob() throws Exception {
|
||||||
// ConfigurationUtils.setConfigFrameworkJob(job, getConfig());
|
// ConfigurationUtils.setFrameworkJobConfig(job, getConfig());
|
||||||
// setUpJobConf();
|
// setUpJobConf();
|
||||||
// assertEquals(getConfig(), ConfigurationUtils.getConfigFrameworkJob(jobConf));
|
// assertEquals(getConfig(), ConfigurationUtils.getFrameworkJobConfig(jobConf));
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// @Test
|
// @Test
|
||||||
|
@ -206,7 +206,7 @@ public boolean submit(SubmissionRequest generalRequest) {
|
|||||||
ConfigurationUtils.setConnectorJobConfig(ConnectorType.TO, job, request.getConnectorJobConfig(ConnectorType.TO));
|
ConfigurationUtils.setConnectorJobConfig(ConnectorType.TO, job, request.getConnectorJobConfig(ConnectorType.TO));
|
||||||
ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.FROM, job, request.getFrameworkConnectionConfig(ConnectorType.FROM));
|
ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.FROM, job, request.getFrameworkConnectionConfig(ConnectorType.FROM));
|
||||||
ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.TO, job, request.getFrameworkConnectionConfig(ConnectorType.TO));
|
ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.TO, job, request.getFrameworkConnectionConfig(ConnectorType.TO));
|
||||||
ConfigurationUtils.setConfigFrameworkJob(job, request.getConfigFrameworkJob());
|
ConfigurationUtils.setFrameworkJobConfig(job, request.getConfigFrameworkJob());
|
||||||
// @TODO(Abe): Persist TO schema.
|
// @TODO(Abe): Persist TO schema.
|
||||||
ConfigurationUtils.setConnectorSchema(ConnectorType.FROM, job, request.getSummary().getConnectorSchema());
|
ConfigurationUtils.setConnectorSchema(ConnectorType.FROM, job, request.getSummary().getConnectorSchema());
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user