mirror of
https://github.com/apache/sqoop.git
synced 2025-05-16 00:41:23 +08:00
SQOOP-2660. Sqoop2: Generic JDBC: Convert input Columns from String to List
(Jarcec via Hari)
This commit is contained in:
parent
9b9c494df5
commit
43c478df5c
@ -21,15 +21,16 @@
|
|||||||
import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
|
import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
|
||||||
import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
|
import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
|
||||||
import org.apache.sqoop.model.MConfig;
|
import org.apache.sqoop.model.MConfig;
|
||||||
|
import org.apache.sqoop.model.MConfigList;
|
||||||
import org.apache.sqoop.model.MFromConfig;
|
import org.apache.sqoop.model.MFromConfig;
|
||||||
import org.apache.sqoop.model.MInput;
|
import org.apache.sqoop.model.MInput;
|
||||||
import org.apache.sqoop.model.MLinkConfig;
|
import org.apache.sqoop.model.MLinkConfig;
|
||||||
import org.apache.sqoop.model.MToConfig;
|
import org.apache.sqoop.model.MToConfig;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
// NOTE: All config types have the similar upgrade path at this point
|
|
||||||
public class GenericJdbcConnectorUpgrader extends ConnectorConfigurableUpgrader {
|
public class GenericJdbcConnectorUpgrader extends ConnectorConfigurableUpgrader {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -39,11 +40,30 @@ public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void upgradeFromJobConfig(MFromConfig original, MFromConfig upgradeTarget) {
|
public void upgradeFromJobConfig(MFromConfig original, MFromConfig upgradeTarget) {
|
||||||
|
// Move all configuration options that did not change
|
||||||
ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
|
ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
|
||||||
|
|
||||||
|
// We've changed "String columns" to "List<String> columnList" as it better represents the type
|
||||||
|
migrateColumnsToColumnList(original, upgradeTarget, "fromJobConfig");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) {
|
public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) {
|
||||||
|
// Move all configuration options that did not change
|
||||||
ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
|
ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
|
||||||
|
|
||||||
|
// We've changed "String columns" to "List<String> columnList" as it better represents the type
|
||||||
|
migrateColumnsToColumnList(original, upgradeTarget, "toJobConfig");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void migrateColumnsToColumnList(MConfigList original, MConfigList upgradeTarget, String configName) {
|
||||||
|
String oldInputName = configName + ".columns";
|
||||||
|
String newInputName = configName + ".columnList";
|
||||||
|
|
||||||
|
if(original.getConfig(configName).getInputNames().contains(oldInputName)) {
|
||||||
|
String columnString = original.getStringInput(oldInputName).getValue();
|
||||||
|
String[] columns = columnString.split(","); // Our code has expected comma separated list in the past
|
||||||
|
upgradeTarget.getListInput(newInputName).setValue(Arrays.asList(columns));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
import java.sql.ResultSetMetaData;
|
import java.sql.ResultSetMetaData;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.sql.Statement;
|
import java.sql.Statement;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
@ -250,7 +251,7 @@ private void configureTableProperties(MutableContext context, LinkConfiguration
|
|||||||
String schemaName = fromJobConfig.fromJobConfig.schemaName;
|
String schemaName = fromJobConfig.fromJobConfig.schemaName;
|
||||||
String tableName = fromJobConfig.fromJobConfig.tableName;
|
String tableName = fromJobConfig.fromJobConfig.tableName;
|
||||||
String tableSql = fromJobConfig.fromJobConfig.sql;
|
String tableSql = fromJobConfig.fromJobConfig.sql;
|
||||||
String tableColumns = fromJobConfig.fromJobConfig.columns;
|
List<String> tableColumns = fromJobConfig.fromJobConfig.columnList;
|
||||||
|
|
||||||
// Assertion that should be true based on our validations
|
// Assertion that should be true based on our validations
|
||||||
assert (tableName != null && tableSql == null) || (tableName == null && tableSql != null);
|
assert (tableName != null && tableSql == null) || (tableName == null && tableSql != null);
|
||||||
@ -259,7 +260,7 @@ private void configureTableProperties(MutableContext context, LinkConfiguration
|
|||||||
// For databases that support schemas (IE: postgresql).
|
// For databases that support schemas (IE: postgresql).
|
||||||
String fullTableName = executor.encloseIdentifiers(schemaName, tableName);
|
String fullTableName = executor.encloseIdentifiers(schemaName, tableName);
|
||||||
|
|
||||||
if (tableColumns == null) {
|
if (tableColumns == null || tableColumns.size() == 0) {
|
||||||
StringBuilder builder = new StringBuilder();
|
StringBuilder builder = new StringBuilder();
|
||||||
builder.append("SELECT * FROM ");
|
builder.append("SELECT * FROM ");
|
||||||
builder.append(fullTableName);
|
builder.append(fullTableName);
|
||||||
@ -270,16 +271,17 @@ private void configureTableProperties(MutableContext context, LinkConfiguration
|
|||||||
String[] queryColumns = executor.getQueryColumns(dataSql.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0"));
|
String[] queryColumns = executor.getQueryColumns(dataSql.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0"));
|
||||||
fieldNames = executor.columnList(queryColumns);
|
fieldNames = executor.columnList(queryColumns);
|
||||||
} else {
|
} else {
|
||||||
|
fieldNames = executor.columnList(tableColumns.toArray(new String[tableColumns.size()]));
|
||||||
|
|
||||||
StringBuilder builder = new StringBuilder();
|
StringBuilder builder = new StringBuilder();
|
||||||
builder.append("SELECT ");
|
builder.append("SELECT ");
|
||||||
builder.append(tableColumns);
|
builder.append(fieldNames);
|
||||||
builder.append(" FROM ");
|
builder.append(" FROM ");
|
||||||
builder.append(fullTableName);
|
builder.append(fullTableName);
|
||||||
builder.append(" WHERE ");
|
builder.append(" WHERE ");
|
||||||
builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
|
builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
|
||||||
dataSql = builder.toString();
|
dataSql = builder.toString();
|
||||||
|
|
||||||
fieldNames = tableColumns;
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
assert tableSql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
|
assert tableSql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
|
||||||
|
@ -21,6 +21,7 @@
|
|||||||
import java.sql.ResultSetMetaData;
|
import java.sql.ResultSetMetaData;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.sql.Statement;
|
import java.sql.Statement;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
@ -99,11 +100,9 @@ private void configureTableProperties(MutableContext context, LinkConfiguration
|
|||||||
String schemaName = toJobConfig.toJobConfig.schemaName;
|
String schemaName = toJobConfig.toJobConfig.schemaName;
|
||||||
String tableName = toJobConfig.toJobConfig.tableName;
|
String tableName = toJobConfig.toJobConfig.tableName;
|
||||||
String stageTableName = toJobConfig.toJobConfig.stageTableName;
|
String stageTableName = toJobConfig.toJobConfig.stageTableName;
|
||||||
boolean clearStageTable = toJobConfig.toJobConfig.shouldClearStageTable == null ?
|
boolean clearStageTable = toJobConfig.toJobConfig.shouldClearStageTable == null ? false : toJobConfig.toJobConfig.shouldClearStageTable;
|
||||||
false : toJobConfig.toJobConfig.shouldClearStageTable;
|
final boolean stageEnabled = stageTableName != null && stageTableName.length() > 0;
|
||||||
final boolean stageEnabled =
|
List<String> tableColumns = toJobConfig.toJobConfig.columnList;
|
||||||
stageTableName != null && stageTableName.length() > 0;
|
|
||||||
String tableColumns = toJobConfig.toJobConfig.columns;
|
|
||||||
|
|
||||||
// when fromTable name is specified:
|
// when fromTable name is specified:
|
||||||
if(stageEnabled) {
|
if(stageEnabled) {
|
||||||
@ -124,7 +123,7 @@ private void configureTableProperties(MutableContext context, LinkConfiguration
|
|||||||
final String tableInUse = stageEnabled ? stageTableName : tableName;
|
final String tableInUse = stageEnabled ? stageTableName : tableName;
|
||||||
String fullTableName = executor.encloseIdentifiers(schemaName, tableInUse);
|
String fullTableName = executor.encloseIdentifiers(schemaName, tableInUse);
|
||||||
|
|
||||||
if (tableColumns == null) {
|
if (tableColumns == null || tableColumns.size() == 0) {
|
||||||
String[] columns = executor.getQueryColumns("SELECT * FROM " + fullTableName + " WHERE 1 = 0");
|
String[] columns = executor.getQueryColumns("SELECT * FROM " + fullTableName + " WHERE 1 = 0");
|
||||||
StringBuilder builder = new StringBuilder();
|
StringBuilder builder = new StringBuilder();
|
||||||
builder.append("INSERT INTO ");
|
builder.append("INSERT INTO ");
|
||||||
@ -137,14 +136,13 @@ private void configureTableProperties(MutableContext context, LinkConfiguration
|
|||||||
dataSql = builder.toString();
|
dataSql = builder.toString();
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
String[] columns = StringUtils.split(tableColumns, ',');
|
|
||||||
StringBuilder builder = new StringBuilder();
|
StringBuilder builder = new StringBuilder();
|
||||||
builder.append("INSERT INTO ");
|
builder.append("INSERT INTO ");
|
||||||
builder.append(fullTableName);
|
builder.append(fullTableName);
|
||||||
builder.append(" (");
|
builder.append(" (");
|
||||||
builder.append(tableColumns);
|
builder.append(tableColumns);
|
||||||
builder.append(") VALUES (?");
|
builder.append(") VALUES (?");
|
||||||
for (int i = 1; i < columns.length; i++) {
|
for (int i = 1; i < tableColumns.size(); i++) {
|
||||||
builder.append(",?");
|
builder.append(",?");
|
||||||
}
|
}
|
||||||
builder.append(")");
|
builder.append(")");
|
||||||
|
@ -25,6 +25,9 @@
|
|||||||
import org.apache.sqoop.validation.validators.AbstractValidator;
|
import org.apache.sqoop.validation.validators.AbstractValidator;
|
||||||
import org.apache.sqoop.validation.validators.NullOrContains;
|
import org.apache.sqoop.validation.validators.NullOrContains;
|
||||||
|
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
@ -39,8 +42,8 @@ public class FromJobConfig {
|
|||||||
@Input(size = 2000, validators = { @Validator(value = NullOrContains.class, strArg = GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN) })
|
@Input(size = 2000, validators = { @Validator(value = NullOrContains.class, strArg = GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN) })
|
||||||
public String sql;
|
public String sql;
|
||||||
|
|
||||||
@Input(size = 50)
|
@Input
|
||||||
public String columns;
|
public List<String> columnList;
|
||||||
|
|
||||||
@Input(size = 50)
|
@Input(size = 50)
|
||||||
public String partitionColumn;
|
public String partitionColumn;
|
||||||
@ -51,6 +54,10 @@ public class FromJobConfig {
|
|||||||
@Input(size = 50)
|
@Input(size = 50)
|
||||||
public String boundaryQuery;
|
public String boundaryQuery;
|
||||||
|
|
||||||
|
public FromJobConfig() {
|
||||||
|
columnList = new LinkedList<>();
|
||||||
|
}
|
||||||
|
|
||||||
public static class ConfigValidator extends AbstractValidator<FromJobConfig> {
|
public static class ConfigValidator extends AbstractValidator<FromJobConfig> {
|
||||||
@Override
|
@Override
|
||||||
public void validate(FromJobConfig config) {
|
public void validate(FromJobConfig config) {
|
||||||
@ -66,8 +73,8 @@ public void validate(FromJobConfig config) {
|
|||||||
if (config.sql != null && config.partitionColumn == null) {
|
if (config.sql != null && config.partitionColumn == null) {
|
||||||
addMessage(Status.ERROR, "Partition column is required on query based import");
|
addMessage(Status.ERROR, "Partition column is required on query based import");
|
||||||
}
|
}
|
||||||
if(config.sql != null && config.columns != null) {
|
if(config.sql != null && (config.columnList != null && !config.columnList.isEmpty())) {
|
||||||
addMessage(Status.ERROR, "Can't use sql import and specify columns at the same time");
|
addMessage(Status.ERROR, "Can't use sql import and specify columnList at the same time");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,9 @@
|
|||||||
import org.apache.sqoop.validation.validators.AbstractValidator;
|
import org.apache.sqoop.validation.validators.AbstractValidator;
|
||||||
import org.apache.sqoop.validation.validators.NotEmpty;
|
import org.apache.sqoop.validation.validators.NotEmpty;
|
||||||
|
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
@ -35,8 +38,8 @@ public class ToJobConfig {
|
|||||||
@Input(size = 2000, validators = { @Validator(NotEmpty.class)})
|
@Input(size = 2000, validators = { @Validator(NotEmpty.class)})
|
||||||
public String tableName;
|
public String tableName;
|
||||||
|
|
||||||
@Input(size = 50)
|
@Input
|
||||||
public String columns;
|
public List<String> columnList;
|
||||||
|
|
||||||
@Input(size = 2000)
|
@Input(size = 2000)
|
||||||
public String stageTableName;
|
public String stageTableName;
|
||||||
@ -44,6 +47,10 @@ public class ToJobConfig {
|
|||||||
@Input
|
@Input
|
||||||
public Boolean shouldClearStageTable;
|
public Boolean shouldClearStageTable;
|
||||||
|
|
||||||
|
public ToJobConfig() {
|
||||||
|
columnList = new LinkedList<>();
|
||||||
|
}
|
||||||
|
|
||||||
public static class ConfigValidator extends AbstractValidator<ToJobConfig> {
|
public static class ConfigValidator extends AbstractValidator<ToJobConfig> {
|
||||||
@Override
|
@Override
|
||||||
public void validate(ToJobConfig config) {
|
public void validate(ToJobConfig config) {
|
||||||
|
@ -71,9 +71,9 @@ fromJobConfig.tableName.help = Table name to read data from
|
|||||||
fromJobConfig.sql.label = Table SQL statement
|
fromJobConfig.sql.label = Table SQL statement
|
||||||
fromJobConfig.sql.help = SQL statement to read data from (Optional if table name is already given)
|
fromJobConfig.sql.help = SQL statement to read data from (Optional if table name is already given)
|
||||||
|
|
||||||
# From table columns
|
# From table columnList
|
||||||
fromJobConfig.columns.label = Table column names
|
fromJobConfig.columnList.label = Table column names
|
||||||
fromJobConfig.columns.help = Specific columns in the given table name or the SQL query (Optional)
|
fromJobConfig.columnList.help = Specific columns in the given table name or the SQL query (Optional)
|
||||||
|
|
||||||
# From table partition column
|
# From table partition column
|
||||||
fromJobConfig.partitionColumn.label = Partition column name
|
fromJobConfig.partitionColumn.label = Partition column name
|
||||||
@ -105,9 +105,9 @@ toJobConfig.tableName.help = Table name to write data into
|
|||||||
toJobConfig.sql.label = Table SQL statement
|
toJobConfig.sql.label = Table SQL statement
|
||||||
toJobConfig.sql.help = SQL statement to use to write data into (Optional if table name is already given)
|
toJobConfig.sql.help = SQL statement to use to write data into (Optional if table name is already given)
|
||||||
|
|
||||||
# To table columns
|
# To table columnList
|
||||||
toJobConfig.columns.label = Table column names
|
toJobConfig.columnList.label = Table column names
|
||||||
toJobConfig.columns.help = Specific columns to use in the given table name or the table SQL (Optional)
|
toJobConfig.columnList.help = Specific columns to use in the given table name or the table SQL (Optional)
|
||||||
|
|
||||||
# To stage table name
|
# To stage table name
|
||||||
toJobConfig.stageTableName.label = Stage table name
|
toJobConfig.stageTableName.label = Stage table name
|
||||||
|
@ -18,7 +18,10 @@
|
|||||||
package org.apache.sqoop.connector.jdbc;
|
package org.apache.sqoop.connector.jdbc;
|
||||||
|
|
||||||
import java.sql.Types;
|
import java.sql.Types;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.sqoop.common.MutableContext;
|
import org.apache.sqoop.common.MutableContext;
|
||||||
import org.apache.sqoop.common.MutableMapContext;
|
import org.apache.sqoop.common.MutableMapContext;
|
||||||
import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration;
|
import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration;
|
||||||
@ -43,7 +46,7 @@ public class TestFromInitializer {
|
|||||||
private final String schemalessTableName;
|
private final String schemalessTableName;
|
||||||
private final String tableSql;
|
private final String tableSql;
|
||||||
private final String schemalessTableSql;
|
private final String schemalessTableSql;
|
||||||
private final String tableColumns;
|
private final List<String> tableColumns;
|
||||||
private final String testUser;
|
private final String testUser;
|
||||||
|
|
||||||
private GenericJdbcExecutor executor;
|
private GenericJdbcExecutor executor;
|
||||||
@ -57,8 +60,10 @@ public TestFromInitializer() {
|
|||||||
schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE";
|
schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE";
|
||||||
tableSql = "SELECT * FROM " + schemaName + "." + tableName + " WHERE ${CONDITIONS}";
|
tableSql = "SELECT * FROM " + schemaName + "." + tableName + " WHERE ${CONDITIONS}";
|
||||||
schemalessTableSql = "SELECT * FROM " + schemalessTableName + " WHERE ${CONDITIONS}";
|
schemalessTableSql = "SELECT * FROM " + schemalessTableName + " WHERE ${CONDITIONS}";
|
||||||
tableColumns = "ICOL,VCOL";
|
|
||||||
testUser = "test_user";
|
testUser = "test_user";
|
||||||
|
tableColumns = new LinkedList<>();
|
||||||
|
tableColumns.add("ICOL");
|
||||||
|
tableColumns.add("VCOL");
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeMethod(alwaysRun = true)
|
@BeforeMethod(alwaysRun = true)
|
||||||
@ -209,7 +214,7 @@ public void testTableNameWithTableColumns() throws Exception {
|
|||||||
linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
|
linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
|
||||||
jobConfig.fromJobConfig.tableName = schemalessTableName;
|
jobConfig.fromJobConfig.tableName = schemalessTableName;
|
||||||
jobConfig.fromJobConfig.columns = tableColumns;
|
jobConfig.fromJobConfig.columnList = tableColumns;
|
||||||
|
|
||||||
MutableContext context = new MutableMapContext();
|
MutableContext context = new MutableMapContext();
|
||||||
InitializerContext initializerContext = new InitializerContext(context, testUser);
|
InitializerContext initializerContext = new InitializerContext(context, testUser);
|
||||||
@ -219,8 +224,8 @@ public void testTableNameWithTableColumns() throws Exception {
|
|||||||
initializer.initialize(initializerContext, linkConfig, jobConfig);
|
initializer.initialize(initializerContext, linkConfig, jobConfig);
|
||||||
|
|
||||||
verifyResult(context,
|
verifyResult(context,
|
||||||
"SELECT ICOL,VCOL FROM " + executor.encloseIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
|
"SELECT \"ICOL\", \"VCOL\" FROM " + executor.encloseIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
|
||||||
tableColumns,
|
"\"" + StringUtils.join(tableColumns, "\", \"") + "\"",
|
||||||
"\"ICOL\"",
|
"\"ICOL\"",
|
||||||
String.valueOf(Types.INTEGER),
|
String.valueOf(Types.INTEGER),
|
||||||
String.valueOf(START),
|
String.valueOf(START),
|
||||||
@ -355,7 +360,7 @@ public void testTableNameWithTableColumnsWithSchema() throws Exception {
|
|||||||
linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
|
linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
|
||||||
jobConfig.fromJobConfig.schemaName = schemaName;
|
jobConfig.fromJobConfig.schemaName = schemaName;
|
||||||
jobConfig.fromJobConfig.tableName = tableName;
|
jobConfig.fromJobConfig.tableName = tableName;
|
||||||
jobConfig.fromJobConfig.columns = tableColumns;
|
jobConfig.fromJobConfig.columnList = tableColumns;
|
||||||
|
|
||||||
MutableContext context = new MutableMapContext();
|
MutableContext context = new MutableMapContext();
|
||||||
InitializerContext initializerContext = new InitializerContext(context, testUser);
|
InitializerContext initializerContext = new InitializerContext(context, testUser);
|
||||||
@ -365,8 +370,8 @@ public void testTableNameWithTableColumnsWithSchema() throws Exception {
|
|||||||
initializer.initialize(initializerContext, linkConfig, jobConfig);
|
initializer.initialize(initializerContext, linkConfig, jobConfig);
|
||||||
|
|
||||||
verifyResult(context,
|
verifyResult(context,
|
||||||
"SELECT ICOL,VCOL FROM " + fullTableName + " WHERE ${CONDITIONS}",
|
"SELECT \"ICOL\", \"VCOL\" FROM " + fullTableName + " WHERE ${CONDITIONS}",
|
||||||
tableColumns,
|
"\"" + StringUtils.join(tableColumns, "\", \"") + "\"",
|
||||||
"\"ICOL\"",
|
"\"ICOL\"",
|
||||||
String.valueOf(Types.INTEGER),
|
String.valueOf(Types.INTEGER),
|
||||||
String.valueOf(START),
|
String.valueOf(START),
|
||||||
|
@ -21,13 +21,23 @@
|
|||||||
import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration;
|
import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration;
|
||||||
import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
|
import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
|
||||||
import org.apache.sqoop.model.ConfigUtils;
|
import org.apache.sqoop.model.ConfigUtils;
|
||||||
|
import org.apache.sqoop.model.InputEditable;
|
||||||
|
import org.apache.sqoop.model.MConfig;
|
||||||
import org.apache.sqoop.model.MFromConfig;
|
import org.apache.sqoop.model.MFromConfig;
|
||||||
|
import org.apache.sqoop.model.MInput;
|
||||||
import org.apache.sqoop.model.MLinkConfig;
|
import org.apache.sqoop.model.MLinkConfig;
|
||||||
|
import org.apache.sqoop.model.MStringInput;
|
||||||
import org.apache.sqoop.model.MToConfig;
|
import org.apache.sqoop.model.MToConfig;
|
||||||
|
import org.apache.sqoop.model.MValidator;
|
||||||
import org.testng.annotations.BeforeMethod;
|
import org.testng.annotations.BeforeMethod;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import static org.testng.Assert.assertEquals;
|
import static org.testng.Assert.assertEquals;
|
||||||
|
import static org.testng.Assert.assertNotNull;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test upgrader.
|
* Test upgrader.
|
||||||
@ -48,14 +58,12 @@ public void testFromConfig() {
|
|||||||
MFromConfig newConfigs = new MFromConfig(ConfigUtils.toConfigs(FromJobConfiguration.class), ConfigUtils.getMValidatorsFromConfigurationClass(FromJobConfiguration.class));
|
MFromConfig newConfigs = new MFromConfig(ConfigUtils.toConfigs(FromJobConfiguration.class), ConfigUtils.getMValidatorsFromConfigurationClass(FromJobConfiguration.class));
|
||||||
originalConfigs.getInput("fromJobConfig.schemaName").setValue("test-schema");
|
originalConfigs.getInput("fromJobConfig.schemaName").setValue("test-schema");
|
||||||
originalConfigs.getInput("fromJobConfig.tableName").setValue("test-tableName");
|
originalConfigs.getInput("fromJobConfig.tableName").setValue("test-tableName");
|
||||||
originalConfigs.getInput("fromJobConfig.columns").setValue("test-columns");
|
|
||||||
originalConfigs.getInput("fromJobConfig.partitionColumn").setValue("test-partitionColumn");
|
originalConfigs.getInput("fromJobConfig.partitionColumn").setValue("test-partitionColumn");
|
||||||
originalConfigs.getInput("fromJobConfig.allowNullValueInPartitionColumn").setValue("test-allowNullValueInPartitionColumn");
|
originalConfigs.getInput("fromJobConfig.allowNullValueInPartitionColumn").setValue("test-allowNullValueInPartitionColumn");
|
||||||
upgrader.upgradeFromJobConfig(originalConfigs, newConfigs);
|
upgrader.upgradeFromJobConfig(originalConfigs, newConfigs);
|
||||||
assertEquals(originalConfigs, newConfigs);
|
assertEquals(originalConfigs, newConfigs);
|
||||||
assertEquals("test-schema", newConfigs.getInput("fromJobConfig.schemaName").getValue());
|
assertEquals("test-schema", newConfigs.getInput("fromJobConfig.schemaName").getValue());
|
||||||
assertEquals("test-tableName", newConfigs.getInput("fromJobConfig.tableName").getValue());
|
assertEquals("test-tableName", newConfigs.getInput("fromJobConfig.tableName").getValue());
|
||||||
assertEquals("test-columns", newConfigs.getInput("fromJobConfig.columns").getValue());
|
|
||||||
assertEquals("test-partitionColumn", newConfigs.getInput("fromJobConfig.partitionColumn").getValue());
|
assertEquals("test-partitionColumn", newConfigs.getInput("fromJobConfig.partitionColumn").getValue());
|
||||||
assertEquals("test-allowNullValueInPartitionColumn", newConfigs.getInput("fromJobConfig.allowNullValueInPartitionColumn").getValue());
|
assertEquals("test-allowNullValueInPartitionColumn", newConfigs.getInput("fromJobConfig.allowNullValueInPartitionColumn").getValue());
|
||||||
}
|
}
|
||||||
@ -67,14 +75,12 @@ public void testToConfig() {
|
|||||||
MToConfig newConfigs = new MToConfig(ConfigUtils.toConfigs(ToJobConfiguration.class), ConfigUtils.getMValidatorsFromConfigurationClass(ToJobConfiguration.class));
|
MToConfig newConfigs = new MToConfig(ConfigUtils.toConfigs(ToJobConfiguration.class), ConfigUtils.getMValidatorsFromConfigurationClass(ToJobConfiguration.class));
|
||||||
originalConfigs.getInput("toJobConfig.schemaName").setValue("test-schema");
|
originalConfigs.getInput("toJobConfig.schemaName").setValue("test-schema");
|
||||||
originalConfigs.getInput("toJobConfig.tableName").setValue("test-tableName");
|
originalConfigs.getInput("toJobConfig.tableName").setValue("test-tableName");
|
||||||
originalConfigs.getInput("toJobConfig.columns").setValue("test-columns");
|
|
||||||
originalConfigs.getInput("toJobConfig.stageTableName").setValue("test-stageTableName");
|
originalConfigs.getInput("toJobConfig.stageTableName").setValue("test-stageTableName");
|
||||||
originalConfigs.getInput("toJobConfig.shouldClearStageTable").setValue("test-shouldClearStageTable");
|
originalConfigs.getInput("toJobConfig.shouldClearStageTable").setValue("test-shouldClearStageTable");
|
||||||
upgrader.upgradeToJobConfig(originalConfigs, newConfigs);
|
upgrader.upgradeToJobConfig(originalConfigs, newConfigs);
|
||||||
assertEquals(originalConfigs, newConfigs);
|
assertEquals(originalConfigs, newConfigs);
|
||||||
assertEquals("test-schema", newConfigs.getInput("toJobConfig.schemaName").getValue());
|
assertEquals("test-schema", newConfigs.getInput("toJobConfig.schemaName").getValue());
|
||||||
assertEquals("test-tableName", newConfigs.getInput("toJobConfig.tableName").getValue());
|
assertEquals("test-tableName", newConfigs.getInput("toJobConfig.tableName").getValue());
|
||||||
assertEquals("test-columns", newConfigs.getInput("toJobConfig.columns").getValue());
|
|
||||||
assertEquals("test-stageTableName", newConfigs.getInput("toJobConfig.stageTableName").getValue());
|
assertEquals("test-stageTableName", newConfigs.getInput("toJobConfig.stageTableName").getValue());
|
||||||
assertEquals("test-shouldClearStageTable", newConfigs.getInput("toJobConfig.shouldClearStageTable").getValue());
|
assertEquals("test-shouldClearStageTable", newConfigs.getInput("toJobConfig.shouldClearStageTable").getValue());
|
||||||
}
|
}
|
||||||
@ -97,4 +103,43 @@ public void testLinkConfig() {
|
|||||||
assertEquals("test-password", newConfigs.getInput("linkConfig.password").getValue());
|
assertEquals("test-password", newConfigs.getInput("linkConfig.password").getValue());
|
||||||
assertEquals("test-jdbcProperties", newConfigs.getInput("linkConfig.jdbcProperties").getValue());
|
assertEquals("test-jdbcProperties", newConfigs.getInput("linkConfig.jdbcProperties").getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testColumnsToColumnListFrom() {
|
||||||
|
MFromConfig originalConfigs = new MFromConfig(columnsConfigs("fromJobConfig"), Collections.<MValidator>emptyList());
|
||||||
|
MFromConfig newConfigs = new MFromConfig(ConfigUtils.toConfigs(FromJobConfiguration.class), ConfigUtils.getMValidatorsFromConfigurationClass(FromJobConfiguration.class));
|
||||||
|
originalConfigs.getStringInput("fromJobConfig.columns").setValue("id,first,second");
|
||||||
|
upgrader.upgradeFromJobConfig(originalConfigs, newConfigs);
|
||||||
|
|
||||||
|
List<String> columns = newConfigs.getListInput("fromJobConfig.columnList").getValue();
|
||||||
|
assertNotNull(columns);
|
||||||
|
assertEquals(3, columns.size());
|
||||||
|
assertEquals("id", columns.get(0));
|
||||||
|
assertEquals("first", columns.get(1));
|
||||||
|
assertEquals("second", columns.get(2));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testColumnsToColumnListTo() {
|
||||||
|
MToConfig originalConfigs = new MToConfig(columnsConfigs("toJobConfig"), Collections.<MValidator>emptyList());
|
||||||
|
MToConfig newConfigs = new MToConfig(ConfigUtils.toConfigs(ToJobConfiguration.class), ConfigUtils.getMValidatorsFromConfigurationClass(ToJobConfiguration.class));
|
||||||
|
originalConfigs.getStringInput("toJobConfig.columns").setValue("id,first,second");
|
||||||
|
upgrader.upgradeToJobConfig(originalConfigs, newConfigs);
|
||||||
|
|
||||||
|
List<String> columns = newConfigs.getListInput("toJobConfig.columnList").getValue();
|
||||||
|
assertNotNull(columns);
|
||||||
|
assertEquals(3, columns.size());
|
||||||
|
assertEquals("id", columns.get(0));
|
||||||
|
assertEquals("first", columns.get(1));
|
||||||
|
assertEquals("second", columns.get(2));
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<MConfig> columnsConfigs(String configName) {
|
||||||
|
List<MInput<?>> inputs = new LinkedList<>();
|
||||||
|
inputs.add(new MStringInput(configName + ".columns", false, InputEditable.ANY, "", (short)50, Collections.<MValidator>emptyList()));
|
||||||
|
|
||||||
|
List<MConfig> configs = new LinkedList<>();
|
||||||
|
configs.add(new MConfig(configName, inputs, Collections.<MValidator>emptyList()));
|
||||||
|
return configs;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -35,12 +35,15 @@
|
|||||||
import org.testng.annotations.BeforeMethod;
|
import org.testng.annotations.BeforeMethod;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
public class TestToInitializer {
|
public class TestToInitializer {
|
||||||
private final String schemaName;
|
private final String schemaName;
|
||||||
private final String tableName;
|
private final String tableName;
|
||||||
private final String schemalessTableName;
|
private final String schemalessTableName;
|
||||||
private final String stageTableName;
|
private final String stageTableName;
|
||||||
private final String tableColumns;
|
private final List<String> tableColumns;
|
||||||
private final String testUser;
|
private final String testUser;
|
||||||
|
|
||||||
private GenericJdbcExecutor executor;
|
private GenericJdbcExecutor executor;
|
||||||
@ -50,8 +53,10 @@ public TestToInitializer() {
|
|||||||
tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA";
|
tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA";
|
||||||
schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE";
|
schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE";
|
||||||
stageTableName = getClass().getSimpleName().toUpperCase() + "_STAGE_TABLE";
|
stageTableName = getClass().getSimpleName().toUpperCase() + "_STAGE_TABLE";
|
||||||
tableColumns = "ICOL,VCOL";
|
|
||||||
testUser = "test_user";
|
testUser = "test_user";
|
||||||
|
tableColumns = new LinkedList<>();
|
||||||
|
tableColumns.add("ICOL");
|
||||||
|
tableColumns.add("VCOL");
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeMethod(alwaysRun = true)
|
@BeforeMethod(alwaysRun = true)
|
||||||
@ -108,7 +113,7 @@ public void testTableNameWithTableColumns() throws Exception {
|
|||||||
linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
|
||||||
linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
|
linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
|
||||||
jobConfig.toJobConfig.tableName = schemalessTableName;
|
jobConfig.toJobConfig.tableName = schemalessTableName;
|
||||||
jobConfig.toJobConfig.columns = tableColumns;
|
jobConfig.toJobConfig.columnList = tableColumns;
|
||||||
|
|
||||||
MutableContext context = new MutableMapContext();
|
MutableContext context = new MutableMapContext();
|
||||||
InitializerContext initializerContext = new InitializerContext(context, testUser);
|
InitializerContext initializerContext = new InitializerContext(context, testUser);
|
||||||
@ -155,7 +160,7 @@ public void testTableNameWithTableColumnsWithSchema() throws Exception {
|
|||||||
linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
|
linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
|
||||||
jobConfig.toJobConfig.schemaName = schemaName;
|
jobConfig.toJobConfig.schemaName = schemaName;
|
||||||
jobConfig.toJobConfig.tableName = tableName;
|
jobConfig.toJobConfig.tableName = tableName;
|
||||||
jobConfig.toJobConfig.columns = tableColumns;
|
jobConfig.toJobConfig.columnList = tableColumns;
|
||||||
|
|
||||||
MutableContext context = new MutableMapContext();
|
MutableContext context = new MutableMapContext();
|
||||||
InitializerContext initializerContext = new InitializerContext(context, testUser);
|
InitializerContext initializerContext = new InitializerContext(context, testUser);
|
||||||
|
@ -35,6 +35,7 @@
|
|||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
@Test(groups = "slow")
|
@Test(groups = "slow")
|
||||||
public class FromRDBMSToKiteHiveTest extends HiveConnectorTestCase implements ITest {
|
public class FromRDBMSToKiteHiveTest extends HiveConnectorTestCase implements ITest {
|
||||||
@ -109,7 +110,9 @@ public void testCities() throws Exception {
|
|||||||
|
|
||||||
// Set rdbms "FROM" config
|
// Set rdbms "FROM" config
|
||||||
fillRdbmsFromConfig(job, "id");
|
fillRdbmsFromConfig(job, "id");
|
||||||
job.getFromJobConfig().getStringInput("fromJobConfig.columns").setValue(provider.escapeColumnName("id"));
|
List<String> columns = new java.util.LinkedList<>();
|
||||||
|
columns.add("id");
|
||||||
|
job.getFromJobConfig().getListInput("fromJobConfig.columnList").setValue(columns);
|
||||||
|
|
||||||
// Fill the Kite "TO" config
|
// Fill the Kite "TO" config
|
||||||
MConfigList toConfig = job.getToJobConfig();
|
MConfigList toConfig = job.getToJobConfig();
|
||||||
|
@ -35,6 +35,8 @@
|
|||||||
import org.testng.annotations.Factory;
|
import org.testng.annotations.Factory;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import static org.testng.Assert.assertEquals;
|
import static org.testng.Assert.assertEquals;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -97,7 +99,9 @@ public void testFrom() throws Exception {
|
|||||||
// Fill rdbms "FROM" config
|
// Fill rdbms "FROM" config
|
||||||
fillRdbmsFromConfig(job, "id");
|
fillRdbmsFromConfig(job, "id");
|
||||||
MConfigList fromConfig = job.getFromJobConfig();
|
MConfigList fromConfig = job.getFromJobConfig();
|
||||||
fromConfig.getStringInput("fromJobConfig.columns").setValue(provider.escapeColumnName("value"));
|
List<String> columns = new java.util.LinkedList<>();
|
||||||
|
columns.add("value");
|
||||||
|
fromConfig.getListInput("fromJobConfig.columnList").setValue(columns);
|
||||||
|
|
||||||
// Fill the hdfs "TO" config
|
// Fill the hdfs "TO" config
|
||||||
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
|
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
|
||||||
|
@ -25,6 +25,8 @@
|
|||||||
import org.apache.sqoop.test.testcases.ConnectorTestCase;
|
import org.apache.sqoop.test.testcases.ConnectorTestCase;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Import simple table with various configurations.
|
* Import simple table with various configurations.
|
||||||
*/
|
*/
|
||||||
@ -92,7 +94,11 @@ public void testStories() throws Exception {
|
|||||||
// Connector values
|
// Connector values
|
||||||
fillRdbmsFromConfig(job, "id");
|
fillRdbmsFromConfig(job, "id");
|
||||||
MConfigList configs = job.getFromJobConfig();
|
MConfigList configs = job.getFromJobConfig();
|
||||||
configs.getStringInput("fromJobConfig.columns").setValue(provider.escapeColumnName("id") + "," + provider.escapeColumnName("name") + "," + provider.escapeColumnName("story"));
|
List<String> columns = new java.util.LinkedList<>();
|
||||||
|
columns.add("id");
|
||||||
|
columns.add("name");
|
||||||
|
columns.add("story");
|
||||||
|
configs.getListInput("fromJobConfig.columnList").setValue(columns);
|
||||||
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
|
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
|
||||||
|
|
||||||
saveJob(job);
|
saveJob(job);
|
||||||
@ -130,7 +136,10 @@ public void testColumns() throws Exception {
|
|||||||
// Connector values
|
// Connector values
|
||||||
fillRdbmsFromConfig(job, "id");
|
fillRdbmsFromConfig(job, "id");
|
||||||
MConfigList configs = job.getFromJobConfig();
|
MConfigList configs = job.getFromJobConfig();
|
||||||
configs.getStringInput("fromJobConfig.columns").setValue(provider.escapeColumnName("id") + "," + provider.escapeColumnName("country"));
|
List<String> columns = new java.util.LinkedList<>();
|
||||||
|
columns.add("id");
|
||||||
|
columns.add("country");
|
||||||
|
configs.getListInput("fromJobConfig.columnList").setValue(columns);
|
||||||
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
|
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
|
||||||
|
|
||||||
saveJob(job);
|
saveJob(job);
|
||||||
|
@ -28,6 +28,8 @@
|
|||||||
import org.testng.annotations.BeforeMethod;
|
import org.testng.annotations.BeforeMethod;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public class FromRDBMSToKiteTest extends ConnectorTestCase {
|
public class FromRDBMSToKiteTest extends ConnectorTestCase {
|
||||||
@BeforeMethod(alwaysRun = true)
|
@BeforeMethod(alwaysRun = true)
|
||||||
@ -69,7 +71,9 @@ public void testCities() throws Exception {
|
|||||||
|
|
||||||
// Set rdbms "FROM" config
|
// Set rdbms "FROM" config
|
||||||
fillRdbmsFromConfig(job, "id");
|
fillRdbmsFromConfig(job, "id");
|
||||||
job.getFromJobConfig().getStringInput("fromJobConfig.columns").setValue(provider.escapeColumnName("id"));
|
List<String> columns = new java.util.LinkedList<>();
|
||||||
|
columns.add("id");
|
||||||
|
job.getFromJobConfig().getListInput("fromJobConfig.columnList").setValue(columns);
|
||||||
|
|
||||||
// Fill the Kite "TO" config
|
// Fill the Kite "TO" config
|
||||||
MConfigList toConfig = job.getToJobConfig();
|
MConfigList toConfig = job.getToJobConfig();
|
||||||
|
Loading…
Reference in New Issue
Block a user