SQOOP-605: Generic JDBC connector for import

(Bilung Lee via Jarek Jarcec Cecho)
Jarek Jarcec Cecho 2012-10-08 16:28:18 -07:00
parent f1893ab9bb
commit d4b4df8825
22 changed files with 1677 additions and 41 deletions
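
Taken together, the import path added in this commit has three ETL stages: an Initializer that validates user options and stores the resolved JDBC settings in the job context, a Partitioner that splits the partition column's value range into SQL conditions, and an Extractor that runs the data SQL once per partition. A minimal sketch of that flow, assuming the framework supplies the context, options, and writer (the test classes further below show simple map-backed stand-ins):

import java.util.List;

import org.apache.sqoop.connector.jdbc.GenericJdbcImportExtractor;
import org.apache.sqoop.connector.jdbc.GenericJdbcImportInitializer;
import org.apache.sqoop.connector.jdbc.GenericJdbcImportPartitioner;
import org.apache.sqoop.job.etl.MutableContext;
import org.apache.sqoop.job.etl.Options;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.io.DataWriter;

public class ImportFlowSketch {
  // The context, options, and writer are supplied by the job framework;
  // MutableContext is passed straight through, as in the tests below.
  static void runImport(MutableContext context, Options options, DataWriter writer) {
    // 1. Validate options and store JDBC settings under the context keys.
    new GenericJdbcImportInitializer().run(context, options);
    // 2. Split the partition column's [min, max] range into conditions.
    List<Partition> partitions = new GenericJdbcImportPartitioner().run(context);
    // 3. Run the data SQL once per partition, writing rows to the writer.
    GenericJdbcImportExtractor extractor = new GenericJdbcImportExtractor();
    for (Partition partition : partitions) {
      extractor.run(context, partition, writer);
    }
  }
}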

@@ -35,8 +35,41 @@ limitations under the License.
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-spi</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>sqoop</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@@ -86,7 +86,13 @@ public class GenericJdbcConnector implements SqoopConnector {
forms = new ArrayList<MForm>();
inputs = new ArrayList<MInput<?>>();
inputs.add(new MStringInput(INPUT_TBL_TABLE, false, (short) 50));
inputs.add(new MStringInput(INPUT_TBL_NAME, false, (short) 50));
inputs.add(new MStringInput(INPUT_TBL_SQL, false, (short) 50));
inputs.add(new MStringInput(INPUT_TBL_COLUMNS, false, (short) 50));
inputs.add(new MStringInput(INPUT_TBL_WAREHOUSE, false, (short) 50));
inputs.add(new MStringInput(INPUT_TBL_DATADIR, false, (short) 50));
inputs.add(new MStringInput(INPUT_TBL_PCOL, false, (short) 50));
inputs.add(new MStringInput(INPUT_TBL_BOUNDARY, false, (short) 50));
forms.add(new MForm(FORM_TABLE, inputs));
JOB_FORMS = new ArrayList<MJobForms>();

@@ -43,7 +43,54 @@ public final class GenericJdbcConnectorConstants {
public static final String FORM_TABLE = "form-table";
// Table form inputs
public static final String INPUT_TBL_TABLE = "inp-tbl-table";
public static final String INPUT_TBL_NAME = "inp-tbl-name";
public static final String INPUT_TBL_SQL = "inp-tbl-sql";
public static final String INPUT_TBL_COLUMNS = "inp-tbl-columns";
public static final String INPUT_TBL_WAREHOUSE = "inp-tbl-warehouse";
public static final String INPUT_TBL_DATADIR = "inp-tbl-datadir";
public static final String INPUT_TBL_PCOL = "inp-tbl-pcol";
public static final String INPUT_TBL_BOUNDARY = "inp-tbl-boundary";
/*
* All jdbc connector related configuration is prefixed with this:
* <tt>org.apache.sqoop.connector.jdbc.</tt>
*/
public static final String PREFIX_CONNECTOR_JDBC_CONFIG =
"org.apache.sqoop.connector.jdbc.";
public static final String CONNECTOR_JDBC_DRIVER =
PREFIX_CONNECTOR_JDBC_CONFIG + "driver";
public static final String CONNECTOR_JDBC_URL =
PREFIX_CONNECTOR_JDBC_CONFIG + "url";
public static final String CONNECTOR_JDBC_USERNAME =
PREFIX_CONNECTOR_JDBC_CONFIG + "username";
public static final String CONNECTOR_JDBC_PASSWORD =
PREFIX_CONNECTOR_JDBC_CONFIG + "password";
public static final String CONNECTOR_JDBC_PARTITION_COLUMNNAME =
PREFIX_CONNECTOR_JDBC_CONFIG + "partition.columnname";
public static final String CONNECTOR_JDBC_PARTITION_COLUMNTYPE =
PREFIX_CONNECTOR_JDBC_CONFIG + "partition.columntype";
public static final String CONNECTOR_JDBC_PARTITION_MINVALUE =
PREFIX_CONNECTOR_JDBC_CONFIG + "partition.minvalue";
public static final String CONNECTOR_JDBC_PARTITION_MAXVALUE =
PREFIX_CONNECTOR_JDBC_CONFIG + "partition.maxvalue";
public static final String CONNECTOR_JDBC_DATA_SQL =
PREFIX_CONNECTOR_JDBC_CONFIG + "data.sql";
public static final String FILE_SEPARATOR = System.getProperty("file.separator");
public static final String DEFAULT_WAREHOUSE = "/tmp/sqoop/warehouse/";
public static final String DEFAULT_DATADIR = "DataStore";
public static final String SQL_CONDITIONS_TOKEN = "${CONDITIONS}";
public static final String SQL_PARAMETER_MARKER = "?";
public static final String SUBQUERY_ALIAS = "SQOOP_SUBQUERY_ALIAS";
private GenericJdbcConnectorConstants() {
// Disable explicit object creation
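
For reference, the prefixed keys above resolve to plain strings such as org.apache.sqoop.connector.jdbc.driver. The initializer writes the resolved connection settings into the job context under these names and the extractor later reads them back; a small sketch of that round trip (the driver value is illustrative):

import org.apache.sqoop.connector.jdbc.GenericJdbcConnectorConstants;
import org.apache.sqoop.job.etl.MutableContext;

class ContextKeysSketch {
  static String roundTrip(MutableContext context) {
    // CONNECTOR_JDBC_DRIVER is the string "org.apache.sqoop.connector.jdbc.driver".
    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER,
        "org.apache.derby.jdbc.EmbeddedDriver");
    return context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER);
  }
}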

@@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc;
import org.apache.sqoop.common.ErrorCode;
public enum GenericJdbcConnectorError implements ErrorCode {
/** Unable to load the driver class. */
GENERIC_JDBC_CONNECTOR_0000("Unable to load the driver class"),
/** Unable to get a connection. */
GENERIC_JDBC_CONNECTOR_0001("Unable to get a connection"),
/** Unable to execute the SQL statement. */
GENERIC_JDBC_CONNECTOR_0002("Unable to execute the SQL statement"),
/** Unable to access meta data. */
GENERIC_JDBC_CONNECTOR_0003("Unable to access meta data"),
/** Error occurs while retrieving data from result. */
GENERIC_JDBC_CONNECTOR_0004("Error occurs while retrieving data from result"),
/** No column is found to partition data. */
GENERIC_JDBC_CONNECTOR_0005("No column is found to partition data"),
/** No boundaries are found for partition column. */
GENERIC_JDBC_CONNECTOR_0006("No boundaries are found for partition column"),
/** The table name and the table sql cannot be specified together. */
GENERIC_JDBC_CONNECTOR_0007("The table name and the table sql "
+ "cannot be specified together"),
/** Neither the table name nor the table sql are specified. */
GENERIC_JDBC_CONNECTOR_0008("Neither the table name nor the table sql "
+ "are specified"),
/** No substitute token in the specified sql. */
GENERIC_JDBC_CONNECTOR_0010("No substitute token in the specified sql"),
/** The type is not supported. */
GENERIC_JDBC_CONNECTOR_0011("The type is not supported"),
/** The required option has not been set yet. */
GENERIC_JDBC_CONNECTOR_0012("The required option has not been set yet"),
/** No parameter marker in the specified sql. */
GENERIC_JDBC_CONNECTOR_0013("No parameter marker in the specified sql"),
/** The table columns cannot be specified when
* the table sql is specified during export. */
GENERIC_JDBC_CONNECTOR_0014("The table columns cannot be specified "
+ "when the table sql is specified during export");
private final String message;
private GenericJdbcConnectorError(String message) {
this.message = message;
}
public String getCode() {
return name();
}
public String getMessage() {
return message;
}
}

@@ -0,0 +1,172 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.sqoop.common.SqoopException;
public class GenericJdbcExecutor {
private Connection connection;
public GenericJdbcExecutor(String driver, String url,
String username, String password) {
try {
Class.forName(driver);
connection = DriverManager.getConnection(url, username, password);
} catch (ClassNotFoundException e) {
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0000, driver, e);
} catch (SQLException e) {
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0001, e);
}
}
public ResultSet executeQuery(String sql) {
try {
Statement statement = connection.createStatement(
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
return statement.executeQuery(sql);
} catch (SQLException e) {
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
}
}
public void executeUpdate(String sql) {
try {
Statement statement = connection.createStatement(
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
statement.executeUpdate(sql);
} catch (SQLException e) {
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
}
}
public String getPrimaryKey(String table) {
try {
String[] splitNames = dequalify(table);
DatabaseMetaData dbmd = connection.getMetaData();
ResultSet rs = dbmd.getPrimaryKeys(null, splitNames[0], splitNames[1]);
if (rs != null && rs.next()) {
return rs.getString("COLUMN_NAME");
} else {
return null;
}
} catch (SQLException e) {
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0003, e);
}
}
public String[] getQueryColumns(String query) {
try {
Statement statement = connection.createStatement(
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ResultSet rs = statement.executeQuery(query);
ResultSetMetaData rsmd = rs.getMetaData();
int count = rsmd.getColumnCount();
String[] columns = new String[count];
for (int i = 0; i < count; i++) {
columns[i] = rsmd.getColumnName(i+1);
}
return columns;
} catch (SQLException e) {
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0003, e);
}
}
public boolean existTable(String table) {
try {
String[] splitNames = dequalify(table);
DatabaseMetaData dbmd = connection.getMetaData();
ResultSet rs = dbmd.getTables(null, splitNames[0], splitNames[1], null);
if (rs.next()) {
return true;
} else {
return false;
}
} catch (SQLException e) {
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0003, e);
}
}
/*
* If the name is not already qualified, the qualifier is prepended.
* If it is already qualified, the old qualifier is replaced.
*/
public String qualify(String name, String qualifier) {
String[] splits = dequalify(name);
return qualifier + "." + splits[1];
}
/*
* Split the name into a qualifier (element 0) and a base (element 1).
*/
public String[] dequalify(String name) {
String qualifier;
String base;
int dot = name.indexOf(".");
if (dot != -1) {
qualifier = name.substring(0, dot);
base = name.substring(dot + 1);
} else {
qualifier = null;
base = name;
}
return new String[] {qualifier, base};
}
public String delimitIdentifier(String name) {
return "\"" + name + "\"";
}
public void close() {
try {
connection.close();
} catch (SQLException e) {
// TODO: Log the exception
}
}
}
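
A short usage sketch of the executor against the same embedded Derby setup the tests use (the table name and values are illustrative):

import java.sql.ResultSet;

import org.apache.sqoop.connector.jdbc.GenericJdbcExecutor;

public class ExecutorUsageSketch {
  public static void main(String[] args) throws Exception {
    // Embedded-Derby driver and an in-memory URL, as in GenericJdbcTestConstants below.
    GenericJdbcExecutor executor = new GenericJdbcExecutor(
        "org.apache.derby.jdbc.EmbeddedDriver",
        "jdbc:derby:memory:SKETCHDB;create=true", null, null);
    executor.executeUpdate("CREATE TABLE "
        + executor.delimitIdentifier("DEMO")
        + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
    executor.executeUpdate("INSERT INTO \"DEMO\" VALUES(1, 'one')");
    System.out.println(executor.getPrimaryKey("DEMO"));   // prints ICOL
    ResultSet rs = executor.executeQuery("SELECT ICOL, VCOL FROM \"DEMO\"");
    while (rs.next()) {
      System.out.println(rs.getInt(1) + " " + rs.getString(2)); // prints 1 one
    }
    executor.close();
  }
}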

@@ -19,11 +19,12 @@
import org.apache.sqoop.job.etl.MutableContext;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.Options;
public class GenericJdbcExportInitializer extends Initializer {
@Override
public void run(MutableContext context) {
public void run(MutableContext context, Options options) {
// TODO Auto-generated method stub
}

@@ -17,6 +17,11 @@
*/
package org.apache.sqoop.connector.jdbc;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.etl.Context;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Extractor;
@@ -26,7 +31,43 @@ public class GenericJdbcImportExtractor extends Extractor {
@Override
public void run(Context context, Partition partition, DataWriter writer) {
// TODO Auto-generated method stub
String driver = context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER);
String url = context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL);
String username = context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_USERNAME);
String password = context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PASSWORD);
GenericJdbcExecutor executor = new GenericJdbcExecutor(
driver, url, username, password);
String query = context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL);
String conditions =
((GenericJdbcImportPartition)partition).getConditions();
query = query.replace(
GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions);
ResultSet resultSet = executor.executeQuery(query);
try {
ResultSetMetaData metaData = resultSet.getMetaData();
int column = metaData.getColumnCount();
Object[] array = new Object[column];
while (resultSet.next()) {
for (int i = 0; i < column; i++) {
array[i] = resultSet.getObject(i+1);
}
writer.writeArrayRecord(array);
}
} catch (SQLException e) {
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0004, e);
} finally {
executor.close();
}
}
}
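
The query construction in run() above is plain token substitution: the partition's conditions string replaces the ${CONDITIONS} placeholder in the data SQL that the initializer stored. For example (table and conditions illustrative):

import org.apache.sqoop.connector.jdbc.GenericJdbcConnectorConstants;

class ConditionsTokenSketch {
  public static void main(String[] args) {
    String dataSql = "SELECT * FROM \"EMPLOYEES\" WHERE ${CONDITIONS}";
    String conditions = "-5 <= ICOL AND ICOL < -1";
    String query = dataSql.replace(
        GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions);
    // Prints: SELECT * FROM "EMPLOYEES" WHERE -5 <= ICOL AND ICOL < -1
    System.out.println(query);
  }
}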

@@ -17,14 +17,301 @@
*/
package org.apache.sqoop.connector.jdbc;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import org.apache.commons.lang.StringUtils;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.Constants;
import org.apache.sqoop.job.etl.MutableContext;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.Options;
public class GenericJdbcImportInitializer extends Initializer {
private MutableContext context;
private Options options;
private GenericJdbcExecutor executor;
@Override
public void run(MutableContext context) {
// TODO Auto-generated method stub
public void run(MutableContext context, Options options) {
this.context = context;
this.options = options;
configureJdbcProperties();
try {
configurePartitionProperties();
configureTableProperties();
} finally {
executor.close();
}
}
private void configureJdbcProperties() {
String driver = options.getOption(
GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER);
String url = options.getOption(
GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING);
String username = options.getOption(
GenericJdbcConnectorConstants.INPUT_CONN_USERNAME);
String password = options.getOption(
GenericJdbcConnectorConstants.INPUT_CONN_PASSWORD);
if (driver == null) {
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0012,
GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER);
}
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER,
driver);
if (url == null) {
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0012,
GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING);
}
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL,
url);
if (username != null) {
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_USERNAME,
username);
}
if (password != null) {
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PASSWORD,
password);
}
executor = new GenericJdbcExecutor(driver, url, username, password);
}
private void configurePartitionProperties() {
// ----- configure column name -----
String partitionColumnName = options.getOption(
GenericJdbcConnectorConstants.INPUT_TBL_PCOL);
if (partitionColumnName == null) {
// if column is not specified by the user,
// find the primary key of the table (when there is a table).
String tableName = options.getOption(
GenericJdbcConnectorConstants.INPUT_TBL_NAME);
if (tableName != null) {
partitionColumnName = executor.getPrimaryKey(tableName);
}
}
if (partitionColumnName != null) {
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
partitionColumnName);
} else {
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0005);
}
// ----- configure column type, min value, and max value -----
String minMaxQuery = options.getOption(
GenericJdbcConnectorConstants.INPUT_TBL_BOUNDARY);
if (minMaxQuery == null) {
StringBuilder builder = new StringBuilder();
String tableName = options.getOption(
GenericJdbcConnectorConstants.INPUT_TBL_NAME);
String tableSql = options.getOption(
GenericJdbcConnectorConstants.INPUT_TBL_SQL);
if (tableName != null && tableSql != null) {
// when both table name and table sql are specified:
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
} else if (tableName != null) {
// when table name is specified:
String column = partitionColumnName;
builder.append("SELECT MIN(");
builder.append(column);
builder.append("), MAX(");
builder.append(column);
builder.append(") FROM ");
builder.append(executor.delimitIdentifier(tableName));
} else if (tableSql != null) {
String column = executor.qualify(
partitionColumnName, GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
builder.append("SELECT MIN(");
builder.append(column);
builder.append("), MAX(");
builder.append(column);
builder.append(") FROM ");
builder.append("(");
builder.append(tableSql.replace(
GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 1"));
builder.append(") ");
builder.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
} else {
// when neither are specified:
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
}
minMaxQuery = builder.toString();
}
ResultSet rs = executor.executeQuery(minMaxQuery);
try {
ResultSetMetaData rsmd = rs.getMetaData();
if (rsmd.getColumnCount() != 2) {
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006);
}
rs.next();
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
String.valueOf(rsmd.getColumnType(1)));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
rs.getString(1));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
rs.getString(2));
} catch (SQLException e) {
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006, e);
}
}
private void configureTableProperties() {
String dataSql;
String fieldNames;
String outputDirectory;
String tableName = options.getOption(
GenericJdbcConnectorConstants.INPUT_TBL_NAME);
String tableSql = options.getOption(
GenericJdbcConnectorConstants.INPUT_TBL_SQL);
String tableColumns = options.getOption(
GenericJdbcConnectorConstants.INPUT_TBL_COLUMNS);
String datadir = options.getOption(
GenericJdbcConnectorConstants.INPUT_TBL_DATADIR);
String warehouse = options.getOption(
GenericJdbcConnectorConstants.INPUT_TBL_WAREHOUSE);
if (warehouse == null) {
warehouse = GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE;
} else if (!warehouse.endsWith(GenericJdbcConnectorConstants.FILE_SEPARATOR)) {
warehouse += GenericJdbcConnectorConstants.FILE_SEPARATOR;
}
if (tableName != null && tableSql != null) {
// when both table name and table sql are specified:
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
} else if (tableName != null) {
// when table name is specified:
if (tableColumns == null) {
StringBuilder builder = new StringBuilder();
builder.append("SELECT * FROM ");
builder.append(executor.delimitIdentifier(tableName));
builder.append(" WHERE ");
builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
dataSql = builder.toString();
String[] queryColumns = executor.getQueryColumns(dataSql.replace(
GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0"));
fieldNames = StringUtils.join(queryColumns, ',');
} else {
StringBuilder builder = new StringBuilder();
builder.append("SELECT ");
builder.append(tableColumns);
builder.append(" FROM ");
builder.append(executor.delimitIdentifier(tableName));
builder.append(" WHERE ");
builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
dataSql = builder.toString();
fieldNames = tableColumns;
}
if (datadir == null) {
outputDirectory = warehouse + tableName;
} else {
outputDirectory = warehouse + datadir;
}
} else if (tableSql != null) {
// when table sql is specified:
if (tableSql.indexOf(
GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN) == -1) {
// make sure substitute token for conditions is in the specified sql
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0010);
}
if (tableColumns == null) {
dataSql = tableSql;
String[] queryColumns = executor.getQueryColumns(dataSql.replace(
GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0"));
fieldNames = StringUtils.join(queryColumns, ',');
} else {
String[] columns = StringUtils.split(tableColumns, ',');
StringBuilder builder = new StringBuilder();
builder.append("SELECT ");
builder.append(executor.qualify(
columns[0], GenericJdbcConnectorConstants.SUBQUERY_ALIAS));
for (int i = 1; i < columns.length; i++) {
builder.append(",");
builder.append(executor.qualify(
columns[i], GenericJdbcConnectorConstants.SUBQUERY_ALIAS));
}
builder.append(" FROM ");
builder.append("(");
builder.append(tableSql);
builder.append(") ");
builder.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
dataSql = builder.toString();
fieldNames = tableColumns;
}
if (datadir == null) {
outputDirectory =
warehouse + GenericJdbcConnectorConstants.DEFAULT_DATADIR;
} else {
outputDirectory = warehouse + datadir;
}
} else {
// when neither are specified:
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
}
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL,
dataSql);
context.setString(Constants.JOB_ETL_FIELD_NAMES, fieldNames);
context.setString(Constants.JOB_ETL_OUTPUT_DIRECTORY, outputDirectory);
}
}
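
Concretely, for a hypothetical table EMPLOYEES with primary key ICOL and no user-supplied boundary query, the builders above produce a boundary query and data SQL like the first two strings below; in the table-SQL case, ${CONDITIONS} is neutralized to 1 = 1 and the user's query is wrapped under SQOOP_SUBQUERY_ALIAS:

class GeneratedSqlSketch {
  public static void main(String[] args) {
    // Table-name case (table and column names are hypothetical):
    String minMaxQuery = "SELECT MIN(ICOL), MAX(ICOL) FROM \"EMPLOYEES\"";
    String dataSql = "SELECT * FROM \"EMPLOYEES\" WHERE ${CONDITIONS}";
    // Table-SQL case, with the same statement supplied as the table SQL:
    String subqueryMinMax =
        "SELECT MIN(SQOOP_SUBQUERY_ALIAS.ICOL), MAX(SQOOP_SUBQUERY_ALIAS.ICOL)"
        + " FROM (SELECT * FROM \"EMPLOYEES\" WHERE 1 = 1) SQOOP_SUBQUERY_ALIAS";
    System.out.println(minMaxQuery);
    System.out.println(dataSql);
    System.out.println(subqueryMinMax);
  }
}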

@@ -25,14 +25,24 @@
public class GenericJdbcImportPartition extends Partition {
private String conditions;
public void setConditions(String conditions) {
this.conditions = conditions;
}
public String getConditions() {
return conditions;
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
conditions = in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeUTF(conditions);
}
}
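
Because the partition now serializes its conditions through the write/readFields pair, it survives a round trip through a byte stream, so the framework can ship it between processes. A minimal sketch:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.sqoop.connector.jdbc.GenericJdbcImportPartition;

class PartitionRoundTripSketch {
  public static void main(String[] args) throws Exception {
    GenericJdbcImportPartition original = new GenericJdbcImportPartition();
    original.setConditions("-5 <= ICOL AND ICOL < -1");

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));   // writeUTF(conditions)

    GenericJdbcImportPartition copy = new GenericJdbcImportPartition();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy.getConditions());      // same conditions string
  }
}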

@@ -17,19 +17,166 @@
*/
package org.apache.sqoop.connector.jdbc;
import java.sql.Types;
import java.util.LinkedList;
import java.util.List;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.Constants;
import org.apache.sqoop.job.etl.Context;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
public class GenericJdbcImportPartitioner extends Partitioner {
private int numberPartitions;
private String partitionColumnName;
private int partitionColumnType;
private String partitionMinValue;
private String partitionMaxValue;
@Override
public List<Partition> run(Context context) {
// TODO Auto-generated method stub
return new LinkedList<Partition>();
numberPartitions = Integer.parseInt(context.getString(
Constants.JOB_ETL_NUMBER_PARTITIONS));
partitionColumnName = context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME);
partitionColumnType = Integer.parseInt(context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE));
partitionMinValue = context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE);
partitionMaxValue = context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE);
switch (partitionColumnType) {
case Types.TINYINT:
case Types.SMALLINT:
case Types.INTEGER:
case Types.BIGINT:
// Integer column
return partitionIntegerColumn();
case Types.REAL:
case Types.FLOAT:
case Types.DOUBLE:
// Floating point column
return partitionFloatingPointColumn();
case Types.NUMERIC:
case Types.DECIMAL:
// Decimal column
// TODO: Add partition function
case Types.BIT:
case Types.BOOLEAN:
// Boolean column
// TODO: Add partition function
case Types.DATE:
case Types.TIME:
case Types.TIMESTAMP:
// Date time column
// TODO: Add partition function
case Types.CHAR:
case Types.VARCHAR:
case Types.LONGVARCHAR:
// Text column
// TODO: Add partition function
default:
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
String.valueOf(partitionColumnType));
}
}
protected List<Partition> partitionIntegerColumn() {
List<Partition> partitions = new LinkedList<Partition>();
if (partitionMinValue == null && partitionMaxValue == null) {
GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
partition.setConditions(partitionColumnName + " IS NULL");
partitions.add(partition);
return partitions;
}
long minValue = Long.parseLong(partitionMinValue);
long maxValue = Long.parseLong(partitionMaxValue);
long interval = (maxValue - minValue) / numberPartitions;
long remainder = (maxValue - minValue) % numberPartitions;
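// When the range is smaller than the requested number of partitions,
// reduce the partition count so that each partition spans at least one value.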
if (interval == 0) {
numberPartitions = (int)remainder;
}
long lowerBound;
long upperBound = minValue;
for (int i = 1; i < numberPartitions; i++) {
lowerBound = upperBound;
upperBound = lowerBound + interval;
upperBound += (i <= remainder) ? 1 : 0;
GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
partition.setConditions(
constructConditions(lowerBound, upperBound, false));
partitions.add(partition);
}
GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
partition.setConditions(
constructConditions(upperBound, maxValue, true));
partitions.add(partition);
return partitions;
}
protected List<Partition> partitionFloatingPointColumn() {
List<Partition> partitions = new LinkedList<Partition>();
if (partitionMinValue == null && partitionMaxValue == null) {
GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
partition.setConditions(partitionColumnName + " IS NULL");
partitions.add(partition);
return partitions;
}
double minValue = Double.parseDouble(partitionMinValue);
double maxValue = Double.parseDouble(partitionMaxValue);
double interval = (maxValue - minValue) / numberPartitions;
double lowerBound;
double upperBound = minValue;
for (int i = 1; i < numberPartitions; i++) {
lowerBound = upperBound;
upperBound = lowerBound + interval;
GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
partition.setConditions(
constructConditions(lowerBound, upperBound, false));
partitions.add(partition);
}
GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
partition.setConditions(
constructConditions(upperBound, maxValue, true));
partitions.add(partition);
return partitions;
}
protected String constructConditions(
Object lowerBound, Object upperBound, boolean lastOne) {
StringBuilder conditions = new StringBuilder();
conditions.append(lowerBound);
conditions.append(" <= ");
conditions.append(partitionColumnName);
conditions.append(" AND ");
conditions.append(partitionColumnName);
conditions.append(lastOne ? " <= " : " < ");
conditions.append(upperBound);
return conditions.toString();
}
}
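
To make the uneven-split arithmetic concrete: for min = -5, max = 5 and 3 partitions, interval is 10 / 3 = 3 and remainder is 10 % 3 = 1, so the first partition absorbs the extra unit. The sketch below reproduces the expected conditions in TestImportPartitioner further down:

class PartitionArithmeticSketch {
  public static void main(String[] args) {
    // Worked example of partitionIntegerColumn() with min = -5, max = 5,
    // and 3 partitions (matches testIntegerUnevenPartition below).
    long minValue = -5, maxValue = 5;
    int numberPartitions = 3;
    long interval = (maxValue - minValue) / numberPartitions;   // 3
    long remainder = (maxValue - minValue) % numberPartitions;  // 1
    System.out.println(interval + ", " + remainder);
    // i = 1: upper = -5 + 3 + 1 = -1  ->  "-5 <= ICOL AND ICOL < -1"
    // i = 2: upper = -1 + 3     =  2  ->  "-1 <= ICOL AND ICOL < 2"
    // last:                           ->  "2 <= ICOL AND ICOL <= 5"
  }
}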

@@ -54,5 +54,29 @@ form-table-help = You must supply the information requested in order to create \
a connection object.
# Table name
inp-tbl-table-label = Table name
inp-tbl-table-help = Name of the table in remote database
inp-tbl-name-label = Table name
inp-tbl-name-help = Table name to process data in the remote database
# Table SQL
inp-tbl-sql-label = Table SQL statement
inp-tbl-sql-help = SQL statement to process data in the remote database
# Table columns
inp-tbl-columns-label = Table column names
inp-tbl-columns-help = Specific columns to use from the table name or table SQL
# Table warehouse
inp-tbl-warehouse-label = Data warehouse
inp-tbl-warehouse-help = The root directory for data
# Table datadir
inp-tbl-datadir-label = Data directory
inp-tbl-datadir-help = The sub-directory under warehouse for data
# Table pcol
inp-tbl-pcol-label = Partition column name
inp-tbl-pcol-help = A specific column for data partition
# Table boundary
inp-tbl-boundary-label = Boundary query
inp-tbl-boundary-help = The boundary query for data partition

@@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc;
public class GenericJdbcTestConstants {
public static final String DRIVER = "org.apache.derby.jdbc.EmbeddedDriver";
public static final String URL = "jdbc:derby:memory:TESTDB;create=true";
}

@@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc;
import java.util.HashMap;
import junit.framework.TestCase;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.MutableContext;
import org.apache.sqoop.job.io.DataWriter;
import org.junit.Test;
public class TestImportExtractor extends TestCase {
private final String tableName;
private GenericJdbcExecutor executor;
private static final int START = -50;
private static final int NUMBER_OF_ROWS = 101;
public TestImportExtractor() {
tableName = getClass().getSimpleName();
}
@Override
public void setUp() {
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
GenericJdbcTestConstants.URL, null, null);
if (!executor.existTable(tableName)) {
executor.executeUpdate("CREATE TABLE "
+ executor.delimitIdentifier(tableName)
+ "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
for (int i = 0; i < NUMBER_OF_ROWS; i++) {
int value = START + i;
String sql = "INSERT INTO " + executor.delimitIdentifier(tableName)
+ " VALUES(" + value + ", " + value + ", '" + value + "')";
executor.executeUpdate(sql);
}
}
}
@Override
public void tearDown() {
executor.close();
}
@Test
public void testQuery() throws Exception {
DummyContext context = new DummyContext();
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER,
GenericJdbcTestConstants.DRIVER);
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL,
GenericJdbcTestConstants.URL);
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL,
"SELECT * FROM " + executor.delimitIdentifier(tableName)
+ " WHERE ${CONDITIONS}");
GenericJdbcImportPartition partition;
Extractor extractor = new GenericJdbcImportExtractor();
DummyWriter writer = new DummyWriter();
partition = new GenericJdbcImportPartition();
partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665");
extractor.run(context, partition, writer);
partition = new GenericJdbcImportPartition();
partition.setConditions("-16.6666666666666665 <= DCOL AND DCOL < 16.666666666666667");
extractor.run(context, partition, writer);
partition = new GenericJdbcImportPartition();
partition.setConditions("16.666666666666667 <= DCOL AND DCOL <= 50.0");
extractor.run(context, partition, writer);
}
@Test
public void testSubquery() throws Exception {
DummyContext context = new DummyContext();
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER,
GenericJdbcTestConstants.DRIVER);
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL,
GenericJdbcTestConstants.URL);
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL,
"SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "
+ "(SELECT * FROM " + executor.delimitIdentifier(tableName)
+ " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
GenericJdbcImportPartition partition;
Extractor extractor = new GenericJdbcImportExtractor();
DummyWriter writer = new DummyWriter();
partition = new GenericJdbcImportPartition();
partition.setConditions("-50 <= ICOL AND ICOL < -16");
extractor.run(context, partition, writer);
partition = new GenericJdbcImportPartition();
partition.setConditions("-16 <= ICOL AND ICOL < 17");
extractor.run(context, partition, writer);
partition = new GenericJdbcImportPartition();
partition.setConditions("17 <= ICOL AND ICOL < 50");
extractor.run(context, partition, writer);
}
public class DummyContext implements MutableContext {
HashMap<String, String> store = new HashMap<String, String>();
@Override
public String getString(String key) {
return store.get(key);
}
@Override
public void setString(String key, String value) {
store.put(key, value);
}
}
public class DummyWriter extends DataWriter {
int indx = START;
@Override
public void writeArrayRecord(Object[] array) {
for (int i = 0; i < array.length; i++) {
if (array[i] instanceof Integer) {
assertEquals(indx, ((Integer)array[i]).intValue());
} else if (array[i] instanceof Double) {
assertEquals((double)indx, ((Double)array[i]).doubleValue());
} else {
assertEquals(String.valueOf(indx), array[i].toString());
}
}
indx++;
}
@Override
public void writeCsvRecord(String csv) {
fail("This method should not be invoked.");
}
@Override
public void writeRecord(Object record) {
fail("This method should not be invoked.");
}
}
}

@@ -0,0 +1,235 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc;
import java.sql.Types;
import java.util.Hashtable;
import junit.framework.TestCase;
import org.apache.sqoop.job.Constants;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.MutableContext;
import org.apache.sqoop.job.etl.Options;
import org.junit.Test;
public class TestImportInitializer extends TestCase {
private final String tableName;
private final String tableSql;
private final String tableColumns;
private GenericJdbcExecutor executor;
private static final int START = -50;
private static final int NUMBER_OF_ROWS = 101;
public TestImportInitializer() {
tableName = getClass().getSimpleName();
tableSql = "SELECT * FROM \"" + tableName + "\" WHERE ${CONDITIONS}";
tableColumns = "ICOL,VCOL";
}
@Override
public void setUp() {
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
GenericJdbcTestConstants.URL, null, null);
if (!executor.existTable(tableName)) {
executor.executeUpdate("CREATE TABLE "
+ executor.delimitIdentifier(tableName)
+ "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
for (int i = 0; i < NUMBER_OF_ROWS; i++) {
int value = START + i;
String sql = "INSERT INTO " + executor.delimitIdentifier(tableName)
+ " VALUES(" + value + ", " + value + ", '" + value + "')";
executor.executeUpdate(sql);
}
}
}
@Override
public void tearDown() {
executor.close();
}
@Test
public void testTableName() throws Exception {
DummyOptions options = new DummyOptions();
options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER,
GenericJdbcTestConstants.DRIVER);
options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING,
GenericJdbcTestConstants.URL);
options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_NAME,
tableName);
DummyContext context = new DummyContext();
Initializer initializer = new GenericJdbcImportInitializer();
initializer.run(context, options);
verifyResult(context,
"SELECT * FROM " + executor.delimitIdentifier(tableName)
+ " WHERE ${CONDITIONS}",
"ICOL,DCOL,VCOL",
GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE + tableName,
"ICOL",
String.valueOf(Types.INTEGER),
String.valueOf(START),
String.valueOf(START+NUMBER_OF_ROWS-1));
}
@Test
public void testTableNameWithTableColumns() throws Exception {
DummyOptions options = new DummyOptions();
options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER,
GenericJdbcTestConstants.DRIVER);
options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING,
GenericJdbcTestConstants.URL);
options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_NAME,
tableName);
options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_COLUMNS,
tableColumns);
DummyContext context = new DummyContext();
Initializer initializer = new GenericJdbcImportInitializer();
initializer.run(context, options);
verifyResult(context,
"SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(tableName)
+ " WHERE ${CONDITIONS}",
tableColumns,
GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE + tableName,
"ICOL",
String.valueOf(Types.INTEGER),
String.valueOf(START),
String.valueOf(START+NUMBER_OF_ROWS-1));
}
@Test
public void testTableSql() throws Exception {
DummyOptions options = new DummyOptions();
options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER,
GenericJdbcTestConstants.DRIVER);
options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING,
GenericJdbcTestConstants.URL);
options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_SQL,
tableSql);
options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_PCOL,
"DCOL");
DummyContext context = new DummyContext();
Initializer initializer = new GenericJdbcImportInitializer();
initializer.run(context, options);
verifyResult(context,
"SELECT * FROM " + executor.delimitIdentifier(tableName)
+ " WHERE ${CONDITIONS}",
"ICOL,DCOL,VCOL",
GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE
+ GenericJdbcConnectorConstants.DEFAULT_DATADIR,
"DCOL",
String.valueOf(Types.DOUBLE),
String.valueOf((double)START),
String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
}
@Test
public void testTableSqlWithTableColumns() throws Exception {
DummyOptions options = new DummyOptions();
options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER,
GenericJdbcTestConstants.DRIVER);
options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING,
GenericJdbcTestConstants.URL);
options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_SQL,
tableSql);
options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_COLUMNS,
tableColumns);
options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_PCOL,
"DCOL");
DummyContext context = new DummyContext();
Initializer initializer = new GenericJdbcImportInitializer();
initializer.run(context, options);
verifyResult(context,
"SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "
+ "(SELECT * FROM " + executor.delimitIdentifier(tableName)
+ " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS",
tableColumns,
GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE
+ GenericJdbcConnectorConstants.DEFAULT_DATADIR,
"DCOL",
String.valueOf(Types.DOUBLE),
String.valueOf((double)START),
String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
}
private void verifyResult(DummyContext context,
String dataSql, String fieldNames, String outputDirectory,
String partitionColumnName, String partitionColumnType,
String partitionMinValue, String partitionMaxValue) {
assertEquals(dataSql, context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL));
assertEquals(fieldNames, context.getString(
Constants.JOB_ETL_FIELD_NAMES));
assertEquals(outputDirectory, context.getString(
Constants.JOB_ETL_OUTPUT_DIRECTORY));
assertEquals(partitionColumnName, context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME));
assertEquals(partitionColumnType, context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE));
assertEquals(partitionMinValue, context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE));
assertEquals(partitionMaxValue, context.getString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE));
}
public class DummyOptions implements Options {
Hashtable<String, String> store = new Hashtable<String, String>();
public void setOption(String key, String value) {
store.put(key, value);
}
@Override
public String getOption(String key) {
return store.get(key);
}
}
public class DummyContext implements MutableContext {
Hashtable<String, String> store = new Hashtable<String, String>();
@Override
public String getString(String key) {
return store.get(key);
}
@Override
public void setString(String key, String value) {
store.put(key, value);
}
}
}

@@ -0,0 +1,209 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc;
import java.sql.Types;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import junit.framework.TestCase;
import org.apache.sqoop.job.Constants;
import org.apache.sqoop.job.etl.MutableContext;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.junit.Test;
public class TestImportPartitioner extends TestCase {
private static final int START = -5;
private static final int NUMBER_OF_ROWS = 11;
@Test
public void testIntegerEvenPartition() throws Exception {
DummyContext context = new DummyContext();
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
"ICOL");
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
String.valueOf(Types.INTEGER));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
String.valueOf(START));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
String.valueOf(START + NUMBER_OF_ROWS - 1));
context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "5");
Partitioner partitioner = new GenericJdbcImportPartitioner();
List<Partition> partitions = partitioner.run(context);
verifyResult(partitions, new String[] {
"-5 <= ICOL AND ICOL < -3",
"-3 <= ICOL AND ICOL < -1",
"-1 <= ICOL AND ICOL < 1",
"1 <= ICOL AND ICOL < 3",
"3 <= ICOL AND ICOL <= 5"
});
}
@Test
public void testIntegerUnevenPartition() throws Exception {
DummyContext context = new DummyContext();
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
"ICOL");
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
String.valueOf(Types.INTEGER));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
String.valueOf(START));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
String.valueOf(START + NUMBER_OF_ROWS - 1));
context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "3");
Partitioner partitioner = new GenericJdbcImportPartitioner();
List<Partition> partitions = partitioner.run(context);
verifyResult(partitions, new String[] {
"-5 <= ICOL AND ICOL < -1",
"-1 <= ICOL AND ICOL < 2",
"2 <= ICOL AND ICOL <= 5"
});
}
@Test
public void testIntegerOverPartition() throws Exception {
DummyContext context = new DummyContext();
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
"ICOL");
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
String.valueOf(Types.INTEGER));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
String.valueOf(START));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
String.valueOf(START + NUMBER_OF_ROWS - 1));
context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "13");
Partitioner partitioner = new GenericJdbcImportPartitioner();
List<Partition> partitions = partitioner.run(context);
verifyResult(partitions, new String[] {
"-5 <= ICOL AND ICOL < -4",
"-4 <= ICOL AND ICOL < -3",
"-3 <= ICOL AND ICOL < -2",
"-2 <= ICOL AND ICOL < -1",
"-1 <= ICOL AND ICOL < 0",
"0 <= ICOL AND ICOL < 1",
"1 <= ICOL AND ICOL < 2",
"2 <= ICOL AND ICOL < 3",
"3 <= ICOL AND ICOL < 4",
"4 <= ICOL AND ICOL <= 5"
});
}
@Test
public void testFloatingPointEvenPartition() throws Exception {
DummyContext context = new DummyContext();
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
"DCOL");
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
String.valueOf(Types.DOUBLE));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
String.valueOf((double)START));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
String.valueOf((double)(START + NUMBER_OF_ROWS - 1)));
context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "5");
Partitioner partitioner = new GenericJdbcImportPartitioner();
List<Partition> partitions = partitioner.run(context);
verifyResult(partitions, new String[] {
"-5.0 <= DCOL AND DCOL < -3.0",
"-3.0 <= DCOL AND DCOL < -1.0",
"-1.0 <= DCOL AND DCOL < 1.0",
"1.0 <= DCOL AND DCOL < 3.0",
"3.0 <= DCOL AND DCOL <= 5.0"
});
}
@Test
public void testFloatingPointUnevenPartition() throws Exception {
DummyContext context = new DummyContext();
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
"DCOL");
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
String.valueOf(Types.DOUBLE));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
String.valueOf((double)START));
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
String.valueOf((double)(START + NUMBER_OF_ROWS - 1)));
context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "3");
Partitioner partitioner = new GenericJdbcImportPartitioner();
List<Partition> partitions = partitioner.run(context);
verifyResult(partitions, new String[] {
"-5.0 <= DCOL AND DCOL < -1.6666666666666665",
"-1.6666666666666665 <= DCOL AND DCOL < 1.666666666666667",
"1.666666666666667 <= DCOL AND DCOL <= 5.0"
});
}
private void verifyResult(List<Partition> partitions,
String[] expected) {
assertEquals(expected.length, partitions.size());
Iterator<Partition> iterator = partitions.iterator();
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i],
((GenericJdbcImportPartition)iterator.next()).getConditions());
}
}
public class DummyContext implements MutableContext {
HashMap<String, String> store = new HashMap<String, String>();
@Override
public String getString(String key) {
return store.get(key);
}
@Override
public void setString(String key, String value) {
store.put(key, value);
}
}
}

@@ -35,27 +35,25 @@ limitations under the License.
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-spi</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-common</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

@@ -84,8 +84,16 @@ public enum CoreError implements ErrorCode {
/** Error occurs during loader run */
CORE_0018("Error occurs during loader run"),
/** Data have not been completely consumed yet */
CORE_0019("Data have not been completely consumed yet");
CORE_0019("Data have not been completely consumed yet"),
/** The required option has not been set yet */
CORE_0020("The required option has not been set yet"),
/** Error occurs during partitioner run */
CORE_0021("Error occurs during partitioner run"),
/** Unable to parse because it is not properly delimited */
CORE_0022("Unable to parse because it is not properly delimited");
private final String message;

@@ -19,7 +19,7 @@
import org.apache.sqoop.core.ConfigurationConstants;
public final class JobConstants {
public final class JobConstants extends Constants {
/**
* All job related configuration is prefixed with this:

pom.xml
@@ -91,11 +91,14 @@ limitations under the License.
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compile.source>1.6</maven.compile.source>
<maven.compile.target>1.6</maven.compile.target>
<log4j.version>1.2.16</log4j.version>
<json-simple.version>1.1</json-simple.version>
<commons-dbcp.version>1.4</commons-dbcp.version>
<commons-lang.version>2.5</commons-lang.version>
<derby.version>10.8.2.2</derby.version>
<hadoop.version>2.0.0-SNAPSHOT</hadoop.version>
<json-simple.version>1.1</json-simple.version>
<junit.version>4.9</junit.version>
<log4j.version>1.2.16</log4j.version>
<servlet.version>2.5</servlet.version>
</properties>
<dependencies>
@@ -108,14 +111,57 @@ limitations under the License.
<dependencyManagement>
<dependencies>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-core</artifactId>
<type>test-jar</type>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-server</artifactId>
<type>war</type>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-spi</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.sqoop.repository</groupId>
<artifactId>sqoop-repository-derby</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-generic-jdbc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-generic-jdbc</artifactId>
<type>test-jar</type>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-mysql-jdbc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
@@ -128,15 +174,40 @@ limitations under the License.
<version>${commons-dbcp.version}</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>${derby.version}</version>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>${commons-lang.version}</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>${servlet.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>${derby.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job;
public class Constants {
/**
* All job related configuration is prefixed with this:
* <tt>org.apache.sqoop.job.</tt>
*/
public static final String PREFIX_CONFIG = "org.apache.sqoop.job.";
public static final String JOB_ETL_NUMBER_PARTITIONS = PREFIX_CONFIG
+ "etl.number.partitions";
public static final String JOB_ETL_FIELD_NAMES = PREFIX_CONFIG
+ "etl.field.names";
public static final String JOB_ETL_OUTPUT_DIRECTORY = PREFIX_CONFIG
+ "etl.output.directory";
protected Constants() {
// Disable explicit object creation
}
}

@@ -23,6 +23,6 @@
*/
public abstract class Initializer {
public abstract void run(MutableContext context);
public abstract void run(MutableContext context, Options options);
}

@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.etl;
/**
* The options provided from user input.
*/
public interface Options {
public String getOption(String key);
}