5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-09 01:50:07 +08:00

SQOOP-681 Split configuration in Generic connector

(Jarek Jarcec Cecho)
This commit is contained in:
Bilung Lee 2012-11-15 10:54:23 -08:00
parent c39c7c9f1a
commit ae23cb26dc
10 changed files with 88 additions and 154 deletions

View File

@ -26,32 +26,6 @@ public final class GenericJdbcConnectorConstants {
public static final String RESOURCE_BUNDLE_NAME =
"generic-jdbc-connector-resources";
// Metadata constants
// Connection form
public static final String FORM_CONNECTION = "form-connection";
// Connection form inputs
public static final String INPUT_CONN_JDBCDRIVER = "inp-conn-jdbcdriver";
public static final String INPUT_CONN_CONNECTSTRING =
"inp-conn-connectstring";
public static final String INPUT_CONN_USERNAME = "inp-conn-username";
public static final String INPUT_CONN_PASSWORD = "inp-conn-password";
public static final String INPUT_CONN_JDBCPROPS = "inp-conn-jdbc-properties";
// Table form
public static final String FORM_TABLE = "form-table";
// Table form inputs
public static final String INPUT_TBL_NAME = "inp-tbl-name";
public static final String INPUT_TBL_SQL = "inp-tbl-sql";
public static final String INPUT_TBL_COLUMNS = "inp-tbl-columns";
public static final String INPUT_TBL_WAREHOUSE = "inp-tbl-warehouse";
public static final String INPUT_TBL_DATADIR = "inp-tbl-datadir";
public static final String INPUT_TBL_PCOL = "inp-tbl-pcol";
public static final String INPUT_TBL_BOUNDARY = "inp-tbl-boundary";
/*
* All jdbc connector related configuration is prefixed with this:
* <tt>org.apache.sqoop.jdbc.</tt>
@ -80,12 +54,6 @@ public final class GenericJdbcConnectorConstants {
public static final String CONNECTOR_JDBC_DATA_SQL =
PREFIX_CONNECTOR_JDBC_CONFIG + "data.sql";
public static final String FILE_SEPARATOR = System.getProperty("file.separator");
public static final String DEFAULT_WAREHOUSE = "/tmp/sqoop/warehouse/";
public static final String DEFAULT_DATADIR = "DataStore";
public static final String SQL_CONDITIONS_TOKEN = "${CONDITIONS}";
public static final String SQL_PARAMETER_MARKER = "?";

View File

@ -67,7 +67,7 @@ private void configureJdbcProperties(MutableContext context, ConnectionConfigura
if (driver == null) {
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0012,
GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER);
"JDBC Driver");
}
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER,
@ -76,7 +76,7 @@ private void configureJdbcProperties(MutableContext context, ConnectionConfigura
if (url == null) {
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0012,
GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING);
"Connection string");
}
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL,
@ -99,19 +99,10 @@ private void configureJdbcProperties(MutableContext context, ConnectionConfigura
private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) {
String dataSql;
String inputDirectory;
String tableName = connectionConfig.table.tableName;
String tableSql = connectionConfig.table.sql;
String tableColumns = connectionConfig.table.columns;
String datadir = connectionConfig.table.dataDirectory;
String warehouse = connectionConfig.table.warehouse;
if (warehouse == null) {
warehouse = GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE;
} else if (!warehouse.endsWith(GenericJdbcConnectorConstants.FILE_SEPARATOR)) {
warehouse += GenericJdbcConnectorConstants.FILE_SEPARATOR;
}
String tableName = jobConfig.table.tableName;
String tableSql = jobConfig.table.sql;
String tableColumns = jobConfig.table.columns;
if (tableName != null && tableSql != null) {
// when both table name and table sql are specified:
@ -148,13 +139,6 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur
builder.append(")");
dataSql = builder.toString();
}
if (datadir == null) {
inputDirectory = warehouse + tableName;
} else {
inputDirectory = warehouse + datadir;
}
} else if (tableSql != null) {
// when table sql is specified:
@ -171,14 +155,6 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0014);
}
if (datadir == null) {
inputDirectory =
warehouse + GenericJdbcConnectorConstants.DEFAULT_DATADIR;
} else {
inputDirectory = warehouse + datadir;
}
} else {
// when neither are specified:
throw new SqoopException(
@ -187,7 +163,5 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL,
dataSql.toString());
context.setString(Constants.JOB_ETL_INPUT_DIRECTORY, inputDirectory);
}
}

View File

@ -77,7 +77,7 @@ private void configureJdbcProperties(MutableContext context, ConnectionConfigura
if (driver == null) {
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0012,
GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER);
"JDBC Driver");
}
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER,
@ -86,7 +86,7 @@ private void configureJdbcProperties(MutableContext context, ConnectionConfigura
if (url == null) {
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0012,
GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING);
"Connection string");
}
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL,
@ -110,12 +110,12 @@ private void configureJdbcProperties(MutableContext context, ConnectionConfigura
private void configurePartitionProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
// ----- configure column name -----
String partitionColumnName = connectionConfig.table.partitionColumn;
String partitionColumnName = jobConfig.table.partitionColumn;
if (partitionColumnName == null) {
// if column is not specified by the user,
// find the primary key of the table (when there is a table).
String tableName = connectionConfig.table.tableName;
String tableName = jobConfig.table.tableName;
if (tableName != null) {
partitionColumnName = executor.getPrimaryKey(tableName);
}
@ -133,13 +133,13 @@ private void configurePartitionProperties(MutableContext context, ConnectionConf
// ----- configure column type, min value, and max value -----
String minMaxQuery = connectionConfig.table.boundaryQuery;
String minMaxQuery = jobConfig.table.boundaryQuery;
if (minMaxQuery == null) {
StringBuilder builder = new StringBuilder();
String tableName = connectionConfig.table.tableName;
String tableSql = connectionConfig.table.sql;
String tableName = jobConfig.table.tableName;
String tableSql = jobConfig.table.sql;
if (tableName != null && tableSql != null) {
// when both table name and table sql are specified:
@ -210,20 +210,10 @@ private void configurePartitionProperties(MutableContext context, ConnectionConf
private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
String dataSql;
String fieldNames;
String outputDirectory;
String tableName = connectionConfig.table.tableName;
String tableSql = connectionConfig.table.sql;
String tableColumns = connectionConfig.table.columns;
//TODO(jarcec): Why is connector concerned with data directory? It should not need it at all!
String datadir = connectionConfig.table.dataDirectory;
String warehouse = connectionConfig.table.warehouse;
if (warehouse == null) {
warehouse = GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE;
} else if (!warehouse.endsWith(GenericJdbcConnectorConstants.FILE_SEPARATOR)) {
warehouse += GenericJdbcConnectorConstants.FILE_SEPARATOR;
}
String tableName = jobConfig.table.tableName;
String tableSql = jobConfig.table.sql;
String tableColumns = jobConfig.table.columns;
if (tableName != null && tableSql != null) {
// when both table name and table sql are specified:
@ -257,13 +247,6 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur
fieldNames = tableColumns;
}
if (datadir == null) {
outputDirectory = warehouse + tableName;
} else {
outputDirectory = warehouse + datadir;
}
} else if (tableSql != null) {
// when table sql is specified:
@ -301,14 +284,6 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur
fieldNames = tableColumns;
}
if (datadir == null) {
outputDirectory =
warehouse + GenericJdbcConnectorConstants.DEFAULT_DATADIR;
} else {
outputDirectory = warehouse + datadir;
}
} else {
// when neither are specified:
throw new SqoopException(
@ -318,7 +293,5 @@ private void configureTableProperties(MutableContext context, ConnectionConfigur
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL,
dataSql.toString());
context.setString(Constants.JOB_ETL_FIELD_NAMES, fieldNames);
context.setString(Constants.JOB_ETL_OUTPUT_DIRECTORY, outputDirectory);
}
}

View File

@ -28,10 +28,7 @@ public class ConnectionConfiguration {
@Form public ConnectionForm connection;
@Form public TableForm table;
public ConnectionConfiguration() {
connection = new ConnectionForm();
table = new TableForm();
}
}

View File

@ -25,5 +25,9 @@
*/
@ConfigurationClass
public class ExportJobConfiguration {
@Form IgnoredForm ignored;
@Form public ExportTableForm table;
public ExportJobConfiguration() {
table = new ExportTableForm();
}
}

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc.configuration;
import org.apache.sqoop.model.FormClass;
import org.apache.sqoop.model.Input;
/**
*
*/
@FormClass
public class ExportTableForm {
@Input(size = 50) public String tableName;
@Input(size = 50) public String sql;
@Input(size = 50) public String columns;
}

View File

@ -25,5 +25,9 @@
*/
@ConfigurationClass
public class ImportJobConfiguration {
@Form IgnoredForm ignored;
@Form public ImportTableForm table;
public ImportJobConfiguration() {
table = new ImportTableForm();
}
}

View File

@ -24,12 +24,10 @@
*
*/
@FormClass
public class TableForm {
public class ImportTableForm {
@Input(size = 50) public String tableName;
@Input(size = 50) public String sql;
@Input(size = 50) public String columns;
@Input(size = 50) public String warehouse;
@Input(size = 50) public String dataDirectory;
@Input(size = 50) public String partitionColumn;
@Input(size = 50) public String boundaryQuery;
}

View File

@ -59,11 +59,11 @@ public void tearDown() {
public void testTableName() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ExportJobConfiguration jobConf = new ExportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
connConf.table.tableName = tableName;
ExportJobConfiguration jobConf = new ExportJobConfiguration();
jobConf.table.tableName = tableName;
MutableContext context = new MutableMapContext();
@ -72,18 +72,17 @@ public void testTableName() throws Exception {
verifyResult(context,
"INSERT INTO " + executor.delimitIdentifier(tableName)
+ " VALUES (?,?,?)",
GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE + tableName);
+ " VALUES (?,?,?)");
}
public void testTableNameWithTableColumns() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ExportJobConfiguration jobConf = new ExportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
connConf.table.tableName = tableName;
connConf.table.columns = tableColumns;
ExportJobConfiguration jobConf = new ExportJobConfiguration();
jobConf.table.tableName = tableName;
jobConf.table.columns = tableColumns;
MutableContext context = new MutableMapContext();
@ -92,17 +91,16 @@ public void testTableNameWithTableColumns() throws Exception {
verifyResult(context,
"INSERT INTO " + executor.delimitIdentifier(tableName)
+ " (" + tableColumns + ") VALUES (?,?)",
GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE + tableName);
+ " (" + tableColumns + ") VALUES (?,?)");
}
public void testTableSql() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ExportJobConfiguration jobConf = new ExportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
connConf.table.sql = tableSql;
ExportJobConfiguration jobConf = new ExportJobConfiguration();
jobConf.table.sql = tableSql;
MutableContext context = new MutableMapContext();
@ -111,16 +109,11 @@ public void testTableSql() throws Exception {
verifyResult(context,
"INSERT INTO " + executor.delimitIdentifier(tableName)
+ " VALUES (?,?,?)",
GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE
+ GenericJdbcConnectorConstants.DEFAULT_DATADIR);
+ " VALUES (?,?,?)");
}
private void verifyResult(MutableContext context,
String dataSql, String inputDirectory) {
private void verifyResult(MutableContext context, String dataSql) {
assertEquals(dataSql, context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL));
assertEquals(inputDirectory, context.getString(
Constants.JOB_ETL_INPUT_DIRECTORY));
}
}

View File

@ -71,11 +71,11 @@ public void tearDown() {
public void testTableName() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
connConf.table.tableName = tableName;
ImportJobConfiguration jobConf = new ImportJobConfiguration();
jobConf.table.tableName = tableName;
MutableContext context = new MutableMapContext();
@ -86,7 +86,6 @@ public void testTableName() throws Exception {
"SELECT * FROM " + executor.delimitIdentifier(tableName)
+ " WHERE ${CONDITIONS}",
"ICOL,DCOL,VCOL",
GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE + tableName,
"ICOL",
String.valueOf(Types.INTEGER),
String.valueOf(START),
@ -95,12 +94,12 @@ public void testTableName() throws Exception {
public void testTableNameWithTableColumns() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
connConf.table.tableName = tableName;
connConf.table.columns = tableColumns;
ImportJobConfiguration jobConf = new ImportJobConfiguration();
jobConf.table.tableName = tableName;
jobConf.table.columns = tableColumns;
MutableContext context = new MutableMapContext();
@ -111,7 +110,6 @@ public void testTableNameWithTableColumns() throws Exception {
"SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(tableName)
+ " WHERE ${CONDITIONS}",
tableColumns,
GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE + tableName,
"ICOL",
String.valueOf(Types.INTEGER),
String.valueOf(START),
@ -120,12 +118,12 @@ public void testTableNameWithTableColumns() throws Exception {
public void testTableSql() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
connConf.table.sql = tableSql;
connConf.table.partitionColumn = "DCOL";
ImportJobConfiguration jobConf = new ImportJobConfiguration();
jobConf.table.sql = tableSql;
jobConf.table.partitionColumn = "DCOL";
MutableContext context = new MutableMapContext();
@ -136,8 +134,6 @@ public void testTableSql() throws Exception {
"SELECT * FROM " + executor.delimitIdentifier(tableName)
+ " WHERE ${CONDITIONS}",
"ICOL,DCOL,VCOL",
GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE
+ GenericJdbcConnectorConstants.DEFAULT_DATADIR,
"DCOL",
String.valueOf(Types.DOUBLE),
String.valueOf((double)START),
@ -146,13 +142,13 @@ public void testTableSql() throws Exception {
public void testTableSqlWithTableColumns() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
connConf.table.sql = tableSql;
connConf.table.columns = tableColumns;
connConf.table.partitionColumn = "DCOL";
ImportJobConfiguration jobConf = new ImportJobConfiguration();
jobConf.table.sql = tableSql;
jobConf.table.columns = tableColumns;
jobConf.table.partitionColumn = "DCOL";
MutableContext context = new MutableMapContext();
@ -164,8 +160,6 @@ public void testTableSqlWithTableColumns() throws Exception {
+ "(SELECT * FROM " + executor.delimitIdentifier(tableName)
+ " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS",
tableColumns,
GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE
+ GenericJdbcConnectorConstants.DEFAULT_DATADIR,
"DCOL",
String.valueOf(Types.DOUBLE),
String.valueOf((double)START),
@ -173,15 +167,13 @@ public void testTableSqlWithTableColumns() throws Exception {
}
private void verifyResult(MutableContext context,
String dataSql, String fieldNames, String outputDirectory,
String dataSql, String fieldNames,
String partitionColumnName, String partitionColumnType,
String partitionMinValue, String partitionMaxValue) {
assertEquals(dataSql, context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL));
assertEquals(fieldNames, context.getString(
Constants.JOB_ETL_FIELD_NAMES));
assertEquals(outputDirectory, context.getString(
Constants.JOB_ETL_OUTPUT_DIRECTORY));
assertEquals(partitionColumnName, context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME));