5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-16 00:41:23 +08:00

SQOOP-903: Sqoop2: Add schema support to Generic JDBC Connector

(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2013-03-15 13:22:07 -07:00
parent daccebac51
commit 57d4d3aa02
8 changed files with 285 additions and 32 deletions

View File

@ -67,6 +67,7 @@ private void configureJdbcProperties(MutableContext context, ConnectionConfigura
private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) { private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) {
String dataSql; String dataSql;
String schemaName = jobConfig.table.schemaName;
String tableName = jobConfig.table.tableName; String tableName = jobConfig.table.tableName;
String tableSql = jobConfig.table.sql; String tableSql = jobConfig.table.sql;
String tableColumns = jobConfig.table.columns; String tableColumns = jobConfig.table.columns;
@ -79,12 +80,15 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur
} else if (tableName != null) { } else if (tableName != null) {
// when table name is specified: // when table name is specified:
// For databases that support schemas (IE: postgresql).
String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
if (tableColumns == null) { if (tableColumns == null) {
String[] columns = executor.getQueryColumns("SELECT * FROM " String[] columns = executor.getQueryColumns("SELECT * FROM "
+ executor.delimitIdentifier(tableName) + " WHERE 1 = 0"); + fullTableName + " WHERE 1 = 0");
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
builder.append("INSERT INTO "); builder.append("INSERT INTO ");
builder.append(executor.delimitIdentifier(tableName)); builder.append(fullTableName);
builder.append(" VALUES (?"); builder.append(" VALUES (?");
for (int i = 1; i < columns.length; i++) { for (int i = 1; i < columns.length; i++) {
builder.append(",?"); builder.append(",?");
@ -96,7 +100,7 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur
String[] columns = StringUtils.split(tableColumns, ','); String[] columns = StringUtils.split(tableColumns, ',');
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
builder.append("INSERT INTO "); builder.append("INSERT INTO ");
builder.append(executor.delimitIdentifier(tableName)); builder.append(fullTableName);
builder.append(" ("); builder.append(" (");
builder.append(tableColumns); builder.append(tableColumns);
builder.append(") VALUES (?"); builder.append(") VALUES (?");

View File

@ -104,6 +104,7 @@ private void configurePartitionProperties(MutableContext context, ConnectionConf
if (minMaxQuery == null) { if (minMaxQuery == null) {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
String schemaName = jobConfig.table.schemaName;
String tableName = jobConfig.table.tableName; String tableName = jobConfig.table.tableName;
String tableSql = jobConfig.table.sql; String tableSql = jobConfig.table.sql;
@ -114,13 +115,17 @@ private void configurePartitionProperties(MutableContext context, ConnectionConf
} else if (tableName != null) { } else if (tableName != null) {
// when table name is specified: // when table name is specified:
// For databases that support schemas (IE: postgresql).
String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
String column = partitionColumnName; String column = partitionColumnName;
builder.append("SELECT MIN("); builder.append("SELECT MIN(");
builder.append(column); builder.append(column);
builder.append("), MAX("); builder.append("), MAX(");
builder.append(column); builder.append(column);
builder.append(") FROM "); builder.append(") FROM ");
builder.append(executor.delimitIdentifier(tableName)); builder.append(fullTableName);
} else if (tableSql != null) { } else if (tableSql != null) {
String column = executor.qualify( String column = executor.qualify(
@ -177,6 +182,7 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur
String dataSql; String dataSql;
String fieldNames; String fieldNames;
String schemaName = jobConfig.table.schemaName;
String tableName = jobConfig.table.tableName; String tableName = jobConfig.table.tableName;
String tableSql = jobConfig.table.sql; String tableSql = jobConfig.table.sql;
String tableColumns = jobConfig.table.columns; String tableColumns = jobConfig.table.columns;
@ -189,10 +195,13 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur
} else if (tableName != null) { } else if (tableName != null) {
// when table name is specified: // when table name is specified:
// For databases that support schemas (IE: postgresql).
String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
if (tableColumns == null) { if (tableColumns == null) {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
builder.append("SELECT * FROM "); builder.append("SELECT * FROM ");
builder.append(executor.delimitIdentifier(tableName)); builder.append(fullTableName);
builder.append(" WHERE "); builder.append(" WHERE ");
builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN); builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
dataSql = builder.toString(); dataSql = builder.toString();
@ -206,7 +215,7 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur
builder.append("SELECT "); builder.append("SELECT ");
builder.append(tableColumns); builder.append(tableColumns);
builder.append(" FROM "); builder.append(" FROM ");
builder.append(executor.delimitIdentifier(tableName)); builder.append(fullTableName);
builder.append(" WHERE "); builder.append(" WHERE ");
builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN); builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
dataSql = builder.toString(); dataSql = builder.toString();

View File

@ -102,6 +102,9 @@ private Validation validateImportJob(Object jobConfiguration) {
if(configuration.table.tableName != null && configuration.table.sql != null) { if(configuration.table.tableName != null && configuration.table.sql != null) {
validation.addMessage(Status.UNACCEPTABLE, "table", "Both table name and SQL cannot be specified"); validation.addMessage(Status.UNACCEPTABLE, "table", "Both table name and SQL cannot be specified");
} }
if(configuration.table.schemaName != null && configuration.table.sql != null) {
validation.addMessage(Status.UNACCEPTABLE, "table", "Both schema name and SQL cannot be specified");
}
if(configuration.table.sql != null && !configuration.table.sql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN)) { if(configuration.table.sql != null && !configuration.table.sql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN)) {
validation.addMessage(Status.UNACCEPTABLE, "table", "sql", "SQL statement must contain placeholder for auto generated " validation.addMessage(Status.UNACCEPTABLE, "table", "sql", "SQL statement must contain placeholder for auto generated "

View File

@ -25,6 +25,7 @@
*/ */
@FormClass @FormClass
public class ExportTableForm { public class ExportTableForm {
@Input(size = 50) public String schemaName;
@Input(size = 50) public String tableName; @Input(size = 50) public String tableName;
@Input(size = 50) public String sql; @Input(size = 50) public String sql;
@Input(size = 50) public String columns; @Input(size = 50) public String columns;

View File

@ -25,6 +25,7 @@
*/ */
@FormClass @FormClass
public class ImportTableForm { public class ImportTableForm {
@Input(size = 50) public String schemaName;
@Input(size = 50) public String tableName; @Input(size = 50) public String tableName;
@Input(size = 50) public String sql; @Input(size = 50) public String sql;
@Input(size = 50) public String columns; @Input(size = 50) public String columns;

View File

@ -53,6 +53,10 @@ table.label = Database configuration
table.help = You must supply the information requested in order to create \ table.help = You must supply the information requested in order to create \
a job object. a job object.
# Schema name
table.schemaName.label = Schema name
table.schemaName.help = Schema name to process data in the remote database
# Table name # Table name
table.tableName.label = Table name table.tableName.label = Table name
table.tableName.help = Table name to process data in the remote database table.tableName.help = Table name to process data in the remote database

View File

@ -28,15 +28,21 @@
public class TestExportInitializer extends TestCase { public class TestExportInitializer extends TestCase {
private final String schemaName;
private final String tableName; private final String tableName;
private final String schemalessTableName;
private final String tableSql; private final String tableSql;
private final String schemalessTableSql;
private final String tableColumns; private final String tableColumns;
private GenericJdbcExecutor executor; private GenericJdbcExecutor executor;
public TestExportInitializer() { public TestExportInitializer() {
tableName = getClass().getSimpleName().toUpperCase(); schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA";
tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA";
schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE";
tableSql = "INSERT INTO " + tableName + " VALUES (?,?,?)"; tableSql = "INSERT INTO " + tableName + " VALUES (?,?,?)";
schemalessTableSql = "INSERT INTO " + schemalessTableName + " VALUES (?,?,?)";
tableColumns = "ICOL,VCOL"; tableColumns = "ICOL,VCOL";
} }
@ -45,10 +51,15 @@ public void setUp() {
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
GenericJdbcTestConstants.URL, null, null); GenericJdbcTestConstants.URL, null, null);
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
if (!executor.existTable(tableName)) { if (!executor.existTable(tableName)) {
executor.executeUpdate("CREATE TABLE " executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName));
+ executor.delimitIdentifier(tableName) executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
+ "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); }
fullTableName = executor.delimitIdentifier(schemalessTableName);
if (!executor.existTable(schemalessTableName)) {
executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
} }
} }
@ -57,62 +68,131 @@ public void tearDown() {
executor.close(); executor.close();
} }
@SuppressWarnings("unchecked")
public void testTableName() throws Exception { public void testTableName() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration(); ConnectionConfiguration connConf = new ConnectionConfiguration();
ExportJobConfiguration jobConf = new ExportJobConfiguration(); ExportJobConfiguration jobConf = new ExportJobConfiguration();
String fullTableName = executor.delimitIdentifier(schemalessTableName);
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL; connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.table.tableName = tableName; jobConf.table.tableName = schemalessTableName;
MutableContext context = new MutableMapContext(); MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context); InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcExportInitializer(); Initializer initializer = new GenericJdbcExportInitializer();
initializer.initialize(initializerContext, connConf, jobConf); initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context, verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)");
"INSERT INTO " + executor.delimitIdentifier(tableName)
+ " VALUES (?,?,?)");
} }
@SuppressWarnings("unchecked")
public void testTableNameWithTableColumns() throws Exception { public void testTableNameWithTableColumns() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration(); ConnectionConfiguration connConf = new ConnectionConfiguration();
ExportJobConfiguration jobConf = new ExportJobConfiguration(); ExportJobConfiguration jobConf = new ExportJobConfiguration();
String fullTableName = executor.delimitIdentifier(schemalessTableName);
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL; connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.table.tableName = tableName; jobConf.table.tableName = schemalessTableName;
jobConf.table.columns = tableColumns; jobConf.table.columns = tableColumns;
MutableContext context = new MutableMapContext(); MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context); InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcExportInitializer(); Initializer initializer = new GenericJdbcExportInitializer();
initializer.initialize(initializerContext, connConf, jobConf); initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context, verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)");
"INSERT INTO " + executor.delimitIdentifier(tableName)
+ " (" + tableColumns + ") VALUES (?,?)");
} }
@SuppressWarnings("unchecked")
public void testTableSql() throws Exception { public void testTableSql() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration(); ConnectionConfiguration connConf = new ConnectionConfiguration();
ExportJobConfiguration jobConf = new ExportJobConfiguration(); ExportJobConfiguration jobConf = new ExportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL; connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.table.sql = schemalessTableSql;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcExportInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(schemalessTableName) + " VALUES (?,?,?)");
}
@SuppressWarnings("unchecked")
public void testTableNameWithSchema() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ExportJobConfiguration jobConf = new ExportJobConfiguration();
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.table.schemaName = schemaName;
jobConf.table.tableName = tableName;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcExportInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)");
}
@SuppressWarnings("unchecked")
public void testTableNameWithTableColumnsWithSchema() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ExportJobConfiguration jobConf = new ExportJobConfiguration();
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.table.schemaName = schemaName;
jobConf.table.tableName = tableName;
jobConf.table.columns = tableColumns;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcExportInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)");
}
@SuppressWarnings("unchecked")
public void testTableSqlWithSchema() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ExportJobConfiguration jobConf = new ExportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.table.schemaName = schemaName;
jobConf.table.sql = tableSql; jobConf.table.sql = tableSql;
MutableContext context = new MutableMapContext(); MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context); InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcExportInitializer(); Initializer initializer = new GenericJdbcExportInitializer();
initializer.initialize(initializerContext, connConf, jobConf); initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context, verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?)");
"INSERT INTO " + executor.delimitIdentifier(tableName)
+ " VALUES (?,?,?)");
} }
private void verifyResult(MutableContext context, String dataSql) { private void verifyResult(MutableContext context, String dataSql) {

View File

@ -31,8 +31,11 @@
public class TestImportInitializer extends TestCase { public class TestImportInitializer extends TestCase {
private final String schemaName;
private final String tableName; private final String tableName;
private final String schemalessTableName;
private final String tableSql; private final String tableSql;
private final String schemalessTableSql;
private final String tableColumns; private final String tableColumns;
private GenericJdbcExecutor executor; private GenericJdbcExecutor executor;
@ -41,8 +44,11 @@ public class TestImportInitializer extends TestCase {
private static final int NUMBER_OF_ROWS = 101; private static final int NUMBER_OF_ROWS = 101;
public TestImportInitializer() { public TestImportInitializer() {
tableName = getClass().getSimpleName().toUpperCase(); schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA";
tableSql = "SELECT * FROM " + tableName + " WHERE ${CONDITIONS}"; tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA";
schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE";
tableSql = "SELECT * FROM " + schemaName + "." + tableName + " WHERE ${CONDITIONS}";
schemalessTableSql = "SELECT * FROM " + schemalessTableName + " WHERE ${CONDITIONS}";
tableColumns = "ICOL,VCOL"; tableColumns = "ICOL,VCOL";
} }
@ -51,14 +57,30 @@ public void setUp() {
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
GenericJdbcTestConstants.URL, null, null); GenericJdbcTestConstants.URL, null, null);
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
if (!executor.existTable(tableName)) { if (!executor.existTable(tableName)) {
executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName));
executor.executeUpdate("CREATE TABLE " executor.executeUpdate("CREATE TABLE "
+ executor.delimitIdentifier(tableName) + fullTableName
+ "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
for (int i = 0; i < NUMBER_OF_ROWS; i++) { for (int i = 0; i < NUMBER_OF_ROWS; i++) {
int value = START + i; int value = START + i;
String sql = "INSERT INTO " + executor.delimitIdentifier(tableName) String sql = "INSERT INTO " + fullTableName
+ " VALUES(" + value + ", " + value + ", '" + value + "')";
executor.executeUpdate(sql);
}
}
fullTableName = executor.delimitIdentifier(schemalessTableName);
if (!executor.existTable(schemalessTableName)) {
executor.executeUpdate("CREATE TABLE "
+ fullTableName
+ "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
for (int i = 0; i < NUMBER_OF_ROWS; i++) {
int value = START + i;
String sql = "INSERT INTO " + fullTableName
+ " VALUES(" + value + ", " + value + ", '" + value + "')"; + " VALUES(" + value + ", " + value + ", '" + value + "')";
executor.executeUpdate(sql); executor.executeUpdate(sql);
} }
@ -70,22 +92,24 @@ public void tearDown() {
executor.close(); executor.close();
} }
@SuppressWarnings("unchecked")
public void testTableName() throws Exception { public void testTableName() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration(); ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration(); ImportJobConfiguration jobConf = new ImportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL; connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.table.tableName = tableName; jobConf.table.tableName = schemalessTableName;
MutableContext context = new MutableMapContext(); MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context); InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcImportInitializer(); Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(initializerContext, connConf, jobConf); initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context, verifyResult(context,
"SELECT * FROM " + executor.delimitIdentifier(tableName) "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
+ " WHERE ${CONDITIONS}", + " WHERE ${CONDITIONS}",
"ICOL,DCOL,VCOL", "ICOL,DCOL,VCOL",
"ICOL", "ICOL",
@ -94,23 +118,25 @@ public void testTableName() throws Exception {
String.valueOf(START+NUMBER_OF_ROWS-1)); String.valueOf(START+NUMBER_OF_ROWS-1));
} }
@SuppressWarnings("unchecked")
public void testTableNameWithTableColumns() throws Exception { public void testTableNameWithTableColumns() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration(); ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration(); ImportJobConfiguration jobConf = new ImportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL; connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.table.tableName = tableName; jobConf.table.tableName = schemalessTableName;
jobConf.table.columns = tableColumns; jobConf.table.columns = tableColumns;
MutableContext context = new MutableMapContext(); MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context); InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcImportInitializer(); Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(initializerContext, connConf, jobConf); initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context, verifyResult(context,
"SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(tableName) "SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(schemalessTableName)
+ " WHERE ${CONDITIONS}", + " WHERE ${CONDITIONS}",
tableColumns, tableColumns,
"ICOL", "ICOL",
@ -119,23 +145,25 @@ public void testTableNameWithTableColumns() throws Exception {
String.valueOf(START+NUMBER_OF_ROWS-1)); String.valueOf(START+NUMBER_OF_ROWS-1));
} }
@SuppressWarnings("unchecked")
public void testTableSql() throws Exception { public void testTableSql() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration(); ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration(); ImportJobConfiguration jobConf = new ImportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL; connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.table.sql = tableSql; jobConf.table.sql = schemalessTableSql;
jobConf.table.partitionColumn = "DCOL"; jobConf.table.partitionColumn = "DCOL";
MutableContext context = new MutableMapContext(); MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context); InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcImportInitializer(); Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(initializerContext, connConf, jobConf); initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context, verifyResult(context,
"SELECT * FROM " + executor.delimitIdentifier(tableName) "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
+ " WHERE ${CONDITIONS}", + " WHERE ${CONDITIONS}",
"ICOL,DCOL,VCOL", "ICOL,DCOL,VCOL",
"DCOL", "DCOL",
@ -144,12 +172,134 @@ public void testTableSql() throws Exception {
String.valueOf((double)(START+NUMBER_OF_ROWS-1))); String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
} }
@SuppressWarnings("unchecked")
public void testTableSqlWithTableColumns() throws Exception { public void testTableSqlWithTableColumns() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration(); ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration(); ImportJobConfiguration jobConf = new ImportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL; connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.table.sql = schemalessTableSql;
jobConf.table.columns = tableColumns;
jobConf.table.partitionColumn = "DCOL";
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "
+ "(SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
+ " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS",
tableColumns,
"DCOL",
String.valueOf(Types.DOUBLE),
String.valueOf((double)START),
String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
}
@SuppressWarnings("unchecked")
public void testTableNameWithSchema() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration();
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.table.schemaName = schemaName;
jobConf.table.tableName = tableName;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"SELECT * FROM " + fullTableName
+ " WHERE ${CONDITIONS}",
"ICOL,DCOL,VCOL",
"ICOL",
String.valueOf(Types.INTEGER),
String.valueOf(START),
String.valueOf(START+NUMBER_OF_ROWS-1));
}
@SuppressWarnings("unchecked")
public void testTableNameWithTableColumnsWithSchema() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration();
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.table.schemaName = schemaName;
jobConf.table.tableName = tableName;
jobConf.table.columns = tableColumns;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"SELECT ICOL,VCOL FROM " + fullTableName
+ " WHERE ${CONDITIONS}",
tableColumns,
"ICOL",
String.valueOf(Types.INTEGER),
String.valueOf(START),
String.valueOf(START+NUMBER_OF_ROWS-1));
}
@SuppressWarnings("unchecked")
public void testTableSqlWithSchema() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration();
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.table.schemaName = schemaName;
jobConf.table.sql = tableSql;
jobConf.table.partitionColumn = "DCOL";
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"SELECT * FROM " + fullTableName
+ " WHERE ${CONDITIONS}",
"ICOL,DCOL,VCOL",
"DCOL",
String.valueOf(Types.DOUBLE),
String.valueOf((double)START),
String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
}
@SuppressWarnings("unchecked")
public void testTableSqlWithTableColumnsWithSchema() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration();
String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
jobConf.table.schemaName = schemaName;
jobConf.table.sql = tableSql; jobConf.table.sql = tableSql;
jobConf.table.columns = tableColumns; jobConf.table.columns = tableColumns;
jobConf.table.partitionColumn = "DCOL"; jobConf.table.partitionColumn = "DCOL";
@ -157,12 +307,13 @@ public void testTableSqlWithTableColumns() throws Exception {
MutableContext context = new MutableMapContext(); MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context); InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcImportInitializer(); Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(initializerContext, connConf, jobConf); initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context, verifyResult(context,
"SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "
+ "(SELECT * FROM " + executor.delimitIdentifier(tableName) + "(SELECT * FROM " + fullTableName
+ " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS", + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS",
tableColumns, tableColumns,
"DCOL", "DCOL",