From d4b4df88257fe78ef3936d021d3c713f181cedbd Mon Sep 17 00:00:00 2001 From: Jarek Jarcec Cecho Date: Mon, 8 Oct 2012 16:28:18 -0700 Subject: [PATCH] SQOOP-605: Generic JDBC connector for import (Bilung Lee via Jarek Jarcec Cecho) --- connector/connector-generic-jdbc/pom.xml | 35 ++- .../connector/jdbc/GenericJdbcConnector.java | 8 +- .../jdbc/GenericJdbcConnectorConstants.java | 49 ++- .../jdbc/GenericJdbcConnectorError.java | 83 +++++ .../connector/jdbc/GenericJdbcExecutor.java | 172 +++++++++++ .../jdbc/GenericJdbcExportInitializer.java | 3 +- .../jdbc/GenericJdbcImportExtractor.java | 43 ++- .../jdbc/GenericJdbcImportInitializer.java | 291 +++++++++++++++++- .../jdbc/GenericJdbcImportPartition.java | 14 +- .../jdbc/GenericJdbcImportPartitioner.java | 151 ++++++++- ...eneric-jdbc-connector-resources.properties | 28 +- .../jdbc/GenericJdbcTestConstants.java | 25 ++ .../connector/jdbc/TestImportExtractor.java | 171 ++++++++++ .../connector/jdbc/TestImportInitializer.java | 235 ++++++++++++++ .../connector/jdbc/TestImportPartitioner.java | 209 +++++++++++++ core/pom.xml | 22 +- .../java/org/apache/sqoop/core/CoreError.java | 12 +- .../org/apache/sqoop/job/JobConstants.java | 2 +- pom.xml | 95 +++++- .../java/org/apache/sqoop/job/Constants.java | 41 +++ .../org/apache/sqoop/job/etl/Initializer.java | 2 +- .../org/apache/sqoop/job/etl/Options.java | 27 ++ 22 files changed, 1677 insertions(+), 41 deletions(-) create mode 100644 connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java create mode 100644 connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java create mode 100644 connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcTestConstants.java create mode 100644 connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java create mode 100644 
connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java create mode 100644 connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java create mode 100644 spi/src/main/java/org/apache/sqoop/job/Constants.java create mode 100644 spi/src/main/java/org/apache/sqoop/job/etl/Options.java diff --git a/connector/connector-generic-jdbc/pom.xml b/connector/connector-generic-jdbc/pom.xml index 382c6691..be4cedd2 100644 --- a/connector/connector-generic-jdbc/pom.xml +++ b/connector/connector-generic-jdbc/pom.xml @@ -35,8 +35,41 @@ limitations under the License. org.apache.sqoop sqoop-spi - 2.0.0-SNAPSHOT + + + commons-lang + commons-lang + + + + org.apache.derby + derby + + + + junit + junit + test + + + + sqoop + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java index 4363e0a4..b1367edf 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java @@ -86,7 +86,13 @@ public class GenericJdbcConnector implements SqoopConnector { forms = new ArrayList(); inputs = new ArrayList>(); - inputs.add(new MStringInput(INPUT_TBL_TABLE, false, (short) 50)); + inputs.add(new MStringInput(INPUT_TBL_NAME, false, (short) 50)); + inputs.add(new MStringInput(INPUT_TBL_SQL, false, (short) 50)); + inputs.add(new MStringInput(INPUT_TBL_COLUMNS, false, (short) 50)); + inputs.add(new MStringInput(INPUT_TBL_WAREHOUSE, false, (short) 50)); + inputs.add(new MStringInput(INPUT_TBL_DATADIR, false, (short) 50)); + inputs.add(new MStringInput(INPUT_TBL_PCOL, false, (short) 50)); + inputs.add(new 
MStringInput(INPUT_TBL_BOUNDARY, false, (short) 50)); forms.add(new MForm(FORM_TABLE, inputs)); JOB_FORMS = new ArrayList(); diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java index 0e452418..e991734c 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java @@ -43,7 +43,54 @@ public final class GenericJdbcConnectorConstants { public static final String FORM_TABLE = "form-table"; // Table form inputs - public static final String INPUT_TBL_TABLE = "inp-tbl-table"; + public static final String INPUT_TBL_NAME = "inp-tbl-name"; + public static final String INPUT_TBL_SQL = "inp-tbl-sql"; + public static final String INPUT_TBL_COLUMNS = "inp-tbl-columns"; + public static final String INPUT_TBL_WAREHOUSE = "inp-tbl-warehouse"; + public static final String INPUT_TBL_DATADIR = "inp-tbl-datadir"; + public static final String INPUT_TBL_PCOL = "inp-tbl-pcol"; + public static final String INPUT_TBL_BOUNDARY = "inp-tbl-boundary"; + + + /* + * All jdbc connector related configuration is prefixed with this: + * org.apache.sqoop.jdbc. 
+ */ + public static final String PREFIX_CONNECTOR_JDBC_CONFIG = + "org.apache.sqoop.connector.jdbc."; + + public static final String CONNECTOR_JDBC_DRIVER = + PREFIX_CONNECTOR_JDBC_CONFIG + "driver"; + public static final String CONNECTOR_JDBC_URL = + PREFIX_CONNECTOR_JDBC_CONFIG + "url"; + public static final String CONNECTOR_JDBC_USERNAME = + PREFIX_CONNECTOR_JDBC_CONFIG + "username"; + public static final String CONNECTOR_JDBC_PASSWORD = + PREFIX_CONNECTOR_JDBC_CONFIG + "password"; + + public static final String CONNECTOR_JDBC_PARTITION_COLUMNNAME = + PREFIX_CONNECTOR_JDBC_CONFIG + "partition.columnname"; + public static final String CONNECTOR_JDBC_PARTITION_COLUMNTYPE = + PREFIX_CONNECTOR_JDBC_CONFIG + "partition.columntype"; + public static final String CONNECTOR_JDBC_PARTITION_MINVALUE = + PREFIX_CONNECTOR_JDBC_CONFIG + "partition.minvalue"; + public static final String CONNECTOR_JDBC_PARTITION_MAXVALUE = + PREFIX_CONNECTOR_JDBC_CONFIG + "partition.maxvalue"; + + public static final String CONNECTOR_JDBC_DATA_SQL = + PREFIX_CONNECTOR_JDBC_CONFIG + "data.sql"; + + public static final String FILE_SEPARATOR = System.getProperty("file.separator"); + + public static final String DEFAULT_WAREHOUSE = "/tmp/sqoop/warehouse/"; + + public static final String DEFAULT_DATADIR = "DataStore"; + + public static final String SQL_CONDITIONS_TOKEN = "${CONDITIONS}"; + + public static final String SQL_PARAMETER_MARKER = "?"; + + public static final String SUBQUERY_ALIAS = "SQOOP_SUBQUERY_ALIAS"; private GenericJdbcConnectorConstants() { // Disable explicit object creation diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java new file mode 100644 index 00000000..1fcea5f3 --- /dev/null +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java 
@@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import org.apache.sqoop.common.ErrorCode; + +public enum GenericJdbcConnectorError implements ErrorCode { + + /** Unable to load the driver class. */ + GENERIC_JDBC_CONNECTOR_0000("Unable to load the driver class"), + + /** Unable to get a connection. */ + GENERIC_JDBC_CONNECTOR_0001("Unable to get a connection"), + + /** Unable to execute the SQL statement. */ + GENERIC_JDBC_CONNECTOR_0002("Unable to execute the SQL statement"), + + /** Unable to access meta data. */ + GENERIC_JDBC_CONNECTOR_0003("Unable to access meta data"), + + /** Error occurs while retrieving data from result. */ + GENERIC_JDBC_CONNECTOR_0004("Error occurs while retrieving data from result"), + + /** No column is found to partition data. */ + GENERIC_JDBC_CONNECTOR_0005("No column is found to partition data"), + + /** No boundaries are found for partition column. */ + GENERIC_JDBC_CONNECTOR_0006("No boundaries are found for partition column"), + + /** The table name and the table sql cannot be specify together. 
*/ + GENERIC_JDBC_CONNECTOR_0007("The table name and the table sql " + + "cannot be specified together"), + + /** Neither the table name nor the table sql are specified. */ + GENERIC_JDBC_CONNECTOR_0008("Neither the table name nor the table sql " + + "are specified"), + + /** No substitute token in the specified sql. */ + GENERIC_JDBC_CONNECTOR_0010("No substitute token in the specified sql"), + + /** The type is not supported. */ + GENERIC_JDBC_CONNECTOR_0011("The type is not supported"), + + /** The required option has not been set yet. */ + GENERIC_JDBC_CONNECTOR_0012("The required option has not been set yet"), + + /** No parameter marker in the specified sql. */ + GENERIC_JDBC_CONNECTOR_0013("No parameter marker in the specified sql"), + + /** The table columns cannot be specified when + * the table sql is specified during export. */ + GENERIC_JDBC_CONNECTOR_0014("The table columns cannot be specified " + + "when the table sql is specified during export"); + + private final String message; + + private GenericJdbcConnectorError(String message) { + this.message = message; + } + + public String getCode() { + return name(); + } + + public String getMessage() { + return message; + } +} diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java new file mode 100644 index 00000000..702dd7e3 --- /dev/null +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; + +import org.apache.sqoop.common.SqoopException; + +public class GenericJdbcExecutor { + + private Connection connection; + + public GenericJdbcExecutor(String driver, String url, + String username, String password) { + try { + Class.forName(driver); + connection = DriverManager.getConnection(url, username, password); + + } catch (ClassNotFoundException e) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0000, driver, e); + + } catch (SQLException e) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0001, e); + } + } + + public ResultSet executeQuery(String sql) { + try { + Statement statement = connection.createStatement( + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + return statement.executeQuery(sql); + + } catch (SQLException e) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e); + } + } + + public void executeUpdate(String sql) { + try { + Statement statement = connection.createStatement( + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + statement.executeUpdate(sql); + + } catch (SQLException 
e) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e); + } + } + + public String getPrimaryKey(String table) { + try { + String[] splitNames = dequalify(table); + DatabaseMetaData dbmd = connection.getMetaData(); + ResultSet rs = dbmd.getPrimaryKeys(null, splitNames[0], splitNames[1]); + + if (rs != null && rs.next()) { + return rs.getString("COLUMN_NAME"); + + } else { + return null; + } + + } catch (SQLException e) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0003, e); + } + } + + public String[] getQueryColumns(String query) { + try { + Statement statement = connection.createStatement( + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + ResultSet rs = statement.executeQuery(query); + + ResultSetMetaData rsmd = rs.getMetaData(); + int count = rsmd.getColumnCount(); + String[] columns = new String[count]; + for (int i = 0; i < count; i++) { + columns[i] = rsmd.getColumnName(i+1); + } + + return columns; + + } catch (SQLException e) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0003, e); + } + } + + public boolean existTable(String table) { + try { + String[] splitNames = dequalify(table); + + DatabaseMetaData dbmd = connection.getMetaData(); + ResultSet rs = dbmd.getTables(null, splitNames[0], splitNames[1], null); + + if (rs.next()) { + return true; + } else { + return false; + } + + } catch (SQLException e) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0003, e); + } + } + + /* + * If not qualified already, the name will be added with the qualifier. + * If qualified already, old qualifier will be replaced. + */ + public String qualify(String name, String qualifier) { + String[] splits = dequalify(name); + return qualifier + "." + splits[1]; + } + + /* + * Split the name into a qualifier (element 0) and a base (element 1). 
+ */ + public String[] dequalify(String name) { + String qualifier; + String base; + int dot = name.indexOf("."); + if (dot != -1) { + qualifier = name.substring(0, dot); + base = name.substring(dot + 1); + } else { + qualifier = null; + base = name; + } + return new String[] {qualifier, base}; + } + + public String delimitIdentifier(String name) { + return "\"" + name + "\""; + } + + public void close() { + try { + connection.close(); + + } catch (SQLException e) { + // TODO: Log the exception + } + } + +} \ No newline at end of file diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java index 2b0b6214..a21dc764 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java @@ -19,11 +19,12 @@ import org.apache.sqoop.job.etl.MutableContext; import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.Options; public class GenericJdbcExportInitializer extends Initializer { @Override - public void run(MutableContext context) { + public void run(MutableContext context, Options options) { // TODO Auto-generated method stub } diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java index aa7359e6..4499fda0 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java @@ -17,6 +17,11 @@ */ package org.apache.sqoop.connector.jdbc; +import 
java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.job.etl.Context; import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.job.etl.Extractor; @@ -26,7 +31,43 @@ public class GenericJdbcImportExtractor extends Extractor { @Override public void run(Context context, Partition partition, DataWriter writer) { - // TODO Auto-generated method stub + String driver = context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER); + String url = context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL); + String username = context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_USERNAME); + String password = context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PASSWORD); + GenericJdbcExecutor executor = new GenericJdbcExecutor( + driver, url, username, password); + + String query = context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL); + String conditions = + ((GenericJdbcImportPartition)partition).getConditions(); + query = query.replace( + GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions); + ResultSet resultSet = executor.executeQuery(query); + + try { + ResultSetMetaData metaData = resultSet.getMetaData(); + int column = metaData.getColumnCount(); + Object[] array = new Object[column]; + while (resultSet.next()) { + for (int i = 0; i< column; i++) { + array[i] = resultSet.getObject(i+1); + } + writer.writeArrayRecord(array); + } + + } catch (SQLException e) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0004, e); + + } finally { + executor.close(); + } } } diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java index da730e4e..75f3e56a 100644 --- 
a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java @@ -17,14 +17,301 @@ */ package org.apache.sqoop.connector.jdbc; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +import org.apache.commons.lang.StringUtils; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.job.Constants; import org.apache.sqoop.job.etl.MutableContext; import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.Options; public class GenericJdbcImportInitializer extends Initializer { + private MutableContext context; + private Options options; + + private GenericJdbcExecutor executor; + @Override - public void run(MutableContext context) { - // TODO Auto-generated method stub + public void run(MutableContext context, Options options) { + this.context = context; + this.options = options; + + configureJdbcProperties(); + try { + configurePartitionProperties(); + configureTableProperties(); + + } finally { + executor.close(); + } + } + + private void configureJdbcProperties() { + String driver = options.getOption( + GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER); + String url = options.getOption( + GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING); + String username = options.getOption( + GenericJdbcConnectorConstants.INPUT_CONN_USERNAME); + String password = options.getOption( + GenericJdbcConnectorConstants.INPUT_CONN_PASSWORD); + + if (driver == null) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0012, + GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER); + } + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER, + driver); + + if (url == null) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0012, + 
GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING); + } + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL, + url); + + if (username != null) { + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_USERNAME, + username); + } + + if (password != null) { + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PASSWORD, + password); + } + + executor = new GenericJdbcExecutor(driver, url, username, password); + } + + private void configurePartitionProperties() { + // ----- configure column name ----- + + String partitionColumnName = options.getOption( + GenericJdbcConnectorConstants.INPUT_TBL_PCOL); + + if (partitionColumnName == null) { + // if column is not specified by the user, + // find the primary key of the table (when there is a table). + String tableName = options.getOption( + GenericJdbcConnectorConstants.INPUT_TBL_NAME); + if (tableName != null) { + partitionColumnName = executor.getPrimaryKey(tableName); + } + } + + if (partitionColumnName != null) { + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, + partitionColumnName); + + } else { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0005); + } + + // ----- configure column type, min value, and max value ----- + + String minMaxQuery = options.getOption( + GenericJdbcConnectorConstants.INPUT_TBL_BOUNDARY); + + if (minMaxQuery == null) { + StringBuilder builder = new StringBuilder(); + + String tableName = options.getOption( + GenericJdbcConnectorConstants.INPUT_TBL_NAME); + String tableSql = options.getOption( + GenericJdbcConnectorConstants.INPUT_TBL_SQL); + + if (tableName != null && tableSql != null) { + // when both table name and table sql are specified: + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007); + + } else if (tableName != null) { + // when table name is specified: + String column = partitionColumnName; + builder.append("SELECT MIN("); + 
builder.append(column); + builder.append("), MAX("); + builder.append(column); + builder.append(") FROM "); + builder.append(executor.delimitIdentifier(tableName)); + + } else if (tableSql != null) { + String column = executor.qualify( + partitionColumnName, GenericJdbcConnectorConstants.SUBQUERY_ALIAS); + builder.append("SELECT MIN("); + builder.append(column); + builder.append("), MAX("); + builder.append(column); + builder.append(") FROM "); + builder.append("("); + builder.append(tableSql.replace( + GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 1")); + builder.append(") "); + builder.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS); + + } else { + // when neither are specified: + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008); + } + + minMaxQuery = builder.toString(); + } + + ResultSet rs = executor.executeQuery(minMaxQuery); + try { + ResultSetMetaData rsmd = rs.getMetaData(); + if (rsmd.getColumnCount() != 2) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006); + } + + rs.next(); + + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, + String.valueOf(rsmd.getColumnType(1))); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + rs.getString(1)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + rs.getString(2)); + + } catch (SQLException e) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006, e); + } + } + + private void configureTableProperties() { + String dataSql; + String fieldNames; + String outputDirectory; + + String tableName = options.getOption( + GenericJdbcConnectorConstants.INPUT_TBL_NAME); + String tableSql = options.getOption( + GenericJdbcConnectorConstants.INPUT_TBL_SQL); + String tableColumns = options.getOption( + GenericJdbcConnectorConstants.INPUT_TBL_COLUMNS); + + String datadir = options.getOption( + 
GenericJdbcConnectorConstants.INPUT_TBL_DATADIR); + String warehouse = options.getOption( + GenericJdbcConnectorConstants.INPUT_TBL_WAREHOUSE); + if (warehouse == null) { + warehouse = GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE; + } else if (!warehouse.endsWith(GenericJdbcConnectorConstants.FILE_SEPARATOR)) { + warehouse += GenericJdbcConnectorConstants.FILE_SEPARATOR; + } + + if (tableName != null && tableSql != null) { + // when both table name and table sql are specified: + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007); + + } else if (tableName != null) { + // when table name is specified: + + if (tableColumns == null) { + StringBuilder builder = new StringBuilder(); + builder.append("SELECT * FROM "); + builder.append(executor.delimitIdentifier(tableName)); + builder.append(" WHERE "); + builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN); + dataSql = builder.toString(); + + String[] queryColumns = executor.getQueryColumns(dataSql.replace( + GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0")); + fieldNames = StringUtils.join(queryColumns, ','); + + } else { + StringBuilder builder = new StringBuilder(); + builder.append("SELECT "); + builder.append(tableColumns); + builder.append(" FROM "); + builder.append(executor.delimitIdentifier(tableName)); + builder.append(" WHERE "); + builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN); + dataSql = builder.toString(); + + fieldNames = tableColumns; + } + + if (datadir == null) { + outputDirectory = warehouse + tableName; + } else { + outputDirectory = warehouse + datadir; + } + + } else if (tableSql != null) { + // when table sql is specified: + + if (tableSql.indexOf( + GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN) == -1) { + // make sure substitute token for conditions is in the specified sql + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0010); + } + + if (tableColumns == null) { + dataSql = 
tableSql; + + String[] queryColumns = executor.getQueryColumns(dataSql.replace( + GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0")); + fieldNames = StringUtils.join(queryColumns, ','); + + } else { + String[] columns = StringUtils.split(tableColumns, ','); + StringBuilder builder = new StringBuilder(); + builder.append("SELECT "); + builder.append(executor.qualify( + columns[0], GenericJdbcConnectorConstants.SUBQUERY_ALIAS)); + for (int i = 1; i < columns.length; i++) { + builder.append(","); + builder.append(executor.qualify( + columns[i], GenericJdbcConnectorConstants.SUBQUERY_ALIAS)); + } + builder.append(" FROM "); + builder.append("("); + builder.append(tableSql); + builder.append(") "); + builder.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS); + dataSql = builder.toString(); + + fieldNames = tableColumns; + } + + if (datadir == null) { + outputDirectory = + warehouse + GenericJdbcConnectorConstants.DEFAULT_DATADIR; + } else { + outputDirectory = warehouse + datadir; + } + + } else { + // when neither are specified: + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008); + } + + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL, + dataSql.toString()); + context.setString(Constants.JOB_ETL_FIELD_NAMES, fieldNames); + context.setString(Constants.JOB_ETL_OUTPUT_DIRECTORY, outputDirectory); } } diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java index 2623f159..cba313b4 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java @@ -25,14 +25,24 @@ public class GenericJdbcImportPartition extends Partition { + private String conditions; 
+ + public void setConditions(String conditions) { + this.conditions = conditions; + } + + public String getConditions() { + return conditions; + } + @Override public void readFields(DataInput in) throws IOException { - // TODO Auto-generated method stub + conditions = in.readUTF(); } @Override public void write(DataOutput out) throws IOException { - // TODO Auto-generated method stub + out.writeUTF(conditions); } } diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java index 0540729a..b741b741 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java @@ -17,19 +17,166 @@ */ package org.apache.sqoop.connector.jdbc; +import java.sql.Types; import java.util.LinkedList; import java.util.List; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.job.Constants; import org.apache.sqoop.job.etl.Context; import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.job.etl.Partitioner; public class GenericJdbcImportPartitioner extends Partitioner { + private int numberPartitions; + private String partitionColumnName; + private int partitionColumnType; + private String partitionMinValue; + private String partitionMaxValue; + @Override public List run(Context context) { - // TODO Auto-generated method stub - return new LinkedList(); + numberPartitions = Integer.parseInt(context.getString( + Constants.JOB_ETL_NUMBER_PARTITIONS)); + partitionColumnName = context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME); + partitionColumnType = Integer.parseInt(context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE)); + 
partitionMinValue = context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE); + partitionMaxValue = context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE); + + switch (partitionColumnType) { + case Types.TINYINT: + case Types.SMALLINT: + case Types.INTEGER: + case Types.BIGINT: + // Integer column + return partitionIntegerColumn(); + + case Types.REAL: + case Types.FLOAT: + case Types.DOUBLE: + // Floating point column + return partitionFloatingPointColumn(); + + case Types.NUMERIC: + case Types.DECIMAL: + // Decimal column + // TODO: Add partition function + + case Types.BIT: + case Types.BOOLEAN: + // Boolean column + // TODO: Add partition function + + case Types.DATE: + case Types.TIME: + case Types.TIMESTAMP: + // Date time column + // TODO: Add partition function + + case Types.CHAR: + case Types.VARCHAR: + case Types.LONGVARCHAR: + // Text column + // TODO: Add partition function + + default: + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011, + String.valueOf(partitionColumnType)); + } } + protected List partitionIntegerColumn() { + List partitions = new LinkedList(); + + if (partitionMinValue == null && partitionMaxValue == null) { + GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); + partition.setConditions(partitionColumnName + "IS NULL"); + partitions.add(partition); + return partitions; + } + + long minValue = Long.parseLong(partitionMinValue); + long maxValue = Long.parseLong(partitionMaxValue); + + long interval = (maxValue - minValue) / numberPartitions; + long remainder = (maxValue - minValue) % numberPartitions; + + if (interval == 0) { + numberPartitions = (int)remainder; + } + + long lowerBound; + long upperBound = minValue; + for (int i = 1; i < numberPartitions; i++) { + lowerBound = upperBound; + upperBound = lowerBound + interval; + upperBound += (i <= remainder) ? 
1 : 0; + + GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); + partition.setConditions( + constructConditions(lowerBound, upperBound, false)); + partitions.add(partition); + } + + GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); + partition.setConditions( + constructConditions(upperBound, maxValue, true)); + partitions.add(partition); + + return partitions; + } + + protected List partitionFloatingPointColumn() { + List partitions = new LinkedList(); + + if (partitionMinValue == null && partitionMaxValue == null) { + GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); + partition.setConditions(partitionColumnName + " IS NULL"); + partitions.add(partition); + return partitions; + } + + double minValue = Double.parseDouble(partitionMinValue); + double maxValue = Double.parseDouble(partitionMaxValue); + + double interval = (maxValue - minValue) / numberPartitions; + + double lowerBound; + double upperBound = minValue; + for (int i = 1; i < numberPartitions; i++) { + lowerBound = upperBound; + upperBound = lowerBound + interval; + + GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); + partition.setConditions( + constructConditions(lowerBound, upperBound, false)); + partitions.add(partition); + } + + GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); + partition.setConditions( + constructConditions(upperBound, maxValue, true)); + partitions.add(partition); + + return partitions; + } + + protected String constructConditions( + Object lowerBound, Object upperBound, boolean lastOne) { + StringBuilder conditions = new StringBuilder(); + conditions.append(lowerBound); + conditions.append(" <= "); + conditions.append(partitionColumnName); + conditions.append(" AND "); + conditions.append(partitionColumnName); + conditions.append(lastOne ? 
" <= " : " < "); + conditions.append(upperBound); + return conditions.toString(); + } } diff --git a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties index c589339f..8f9aa606 100644 --- a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties +++ b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties @@ -54,5 +54,29 @@ form-table-help = You must supply the information requested in order to create \ a connection object. # Table name -inp-tbl-table-label = Table name -inp-tbl-table-help = Name of the table in remote database +inp-tbl-name-label = Table name +inp-tbl-name-help = Table name to process data in the remote database + +# Table SQL +inp-tbl-sql-label = Table SQL statement +inp-tbl-sql-help = SQL statement to process data in the remote database + +# Table columns +inp-tbl-columns-label = Table column names +inp-tbl-columns-help = Specific columns of a table name or a table SQL + +# Table warehouse +inp-tbl-warehouse-label = Data warehouse +inp-tbl-warehouse-help = The root directory for data + +# Table datadir +inp-tbl-datadir-label = Data directory +inp-tbl-datadir-help = The sub-directory under warehouse for data + +# Table pcol +inp-tbl-pcol-label = Partition column name +inp-tbl-pcol-help = A specific column for data partition + +# Table boundary +inp-tbl-boundary-label = Boundary query +inp-tbl-boundary-help = The boundary query for data partition diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcTestConstants.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcTestConstants.java new file mode 100644 index 00000000..67ba5bfe --- /dev/null +++ 
b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcTestConstants.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +public class GenericJdbcTestConstants { + + public static final String DRIVER = "org.apache.derby.jdbc.EmbeddedDriver"; + public static final String URL = "jdbc:derby:memory:TESTDB;create=true"; + +} diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java new file mode 100644 index 00000000..519286b2 --- /dev/null +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import java.util.HashMap; + +import junit.framework.TestCase; + +import org.apache.sqoop.job.etl.Extractor; +import org.apache.sqoop.job.etl.MutableContext; +import org.apache.sqoop.job.io.DataWriter; +import org.junit.Test; + +public class TestImportExtractor extends TestCase { + + private final String tableName; + + private GenericJdbcExecutor executor; + + private static final int START = -50; + private static final int NUMBER_OF_ROWS = 101; + + public TestImportExtractor() { + tableName = getClass().getSimpleName(); + } + + @Override + public void setUp() { + executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, + GenericJdbcTestConstants.URL, null, null); + + if (!executor.existTable(tableName)) { + executor.executeUpdate("CREATE TABLE " + + executor.delimitIdentifier(tableName) + + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + + for (int i = 0; i < NUMBER_OF_ROWS; i++) { + int value = START + i; + String sql = "INSERT INTO " + executor.delimitIdentifier(tableName) + + " VALUES(" + value + ", " + value + ", '" + value + "')"; + executor.executeUpdate(sql); + } + } + } + + @Override + public void tearDown() { + executor.close(); + } + + @Test + public void testQuery() throws Exception { + DummyContext context = new DummyContext(); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER, + GenericJdbcTestConstants.DRIVER); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL, + GenericJdbcTestConstants.URL); + 
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL, + "SELECT * FROM " + executor.delimitIdentifier(tableName) + + " WHERE ${CONDITIONS}"); + + GenericJdbcImportPartition partition; + + Extractor extractor = new GenericJdbcImportExtractor(); + DummyWriter writer = new DummyWriter(); + + partition = new GenericJdbcImportPartition(); + partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665"); + extractor.run(context, partition, writer); + + partition = new GenericJdbcImportPartition(); + partition.setConditions("-16.6666666666666665 <= DCOL AND DCOL < 16.666666666666667"); + extractor.run(context, partition, writer); + + partition = new GenericJdbcImportPartition(); + partition.setConditions("16.666666666666667 <= DCOL AND DCOL <= 50.0"); + extractor.run(context, partition, writer); + } + + @Test + public void testSubquery() throws Exception { + DummyContext context = new DummyContext(); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER, + GenericJdbcTestConstants.DRIVER); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL, + GenericJdbcTestConstants.URL); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL, + "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " + + "(SELECT * FROM " + executor.delimitIdentifier(tableName) + + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS"); + + GenericJdbcImportPartition partition; + + Extractor extractor = new GenericJdbcImportExtractor(); + DummyWriter writer = new DummyWriter(); + + partition = new GenericJdbcImportPartition(); + partition.setConditions("-50 <= ICOL AND ICOL < -16"); + extractor.run(context, partition, writer); + + partition = new GenericJdbcImportPartition(); + partition.setConditions("-16 <= ICOL AND ICOL < 17"); + extractor.run(context, partition, writer); + + partition = new GenericJdbcImportPartition(); + partition.setConditions("17 <= ICOL AND ICOL < 50"); + extractor.run(context, partition, 
writer); + } + + public class DummyContext implements MutableContext { + HashMap store = new HashMap(); + + @Override + public String getString(String key) { + return store.get(key); + } + + @Override + public void setString(String key, String value) { + store.put(key, value); + } + } + + public class DummyWriter extends DataWriter { + int indx = START; + + @Override + public void writeArrayRecord(Object[] array) { + for (int i = 0; i < array.length; i++) { + if (array[i] instanceof Integer) { + assertEquals(indx, ((Integer)array[i]).intValue()); + } else if (array[i] instanceof Double) { + assertEquals((double)indx, ((Double)array[i]).doubleValue()); + } else { + assertEquals(String.valueOf(indx), array[i].toString()); + } + } + indx++; + } + + @Override + public void writeCsvRecord(String csv) { + fail("This method should not be invoked."); + } + + @Override + public void writeRecord(Object record) { + fail("This method should not be invoked."); + } + } + +} diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java new file mode 100644 index 00000000..54655934 --- /dev/null +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import java.sql.Types; +import java.util.Hashtable; + +import junit.framework.TestCase; + +import org.apache.sqoop.job.Constants; +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.MutableContext; +import org.apache.sqoop.job.etl.Options; +import org.junit.Test; + +public class TestImportInitializer extends TestCase { + + private final String tableName; + private final String tableSql; + private final String tableColumns; + + private GenericJdbcExecutor executor; + + private static final int START = -50; + private static final int NUMBER_OF_ROWS = 101; + + public TestImportInitializer() { + tableName = getClass().getSimpleName(); + tableSql = "SELECT * FROM \"" + tableName + "\" WHERE ${CONDITIONS}"; + tableColumns = "ICOL,VCOL"; + } + + @Override + public void setUp() { + executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, + GenericJdbcTestConstants.URL, null, null); + + if (!executor.existTable(tableName)) { + executor.executeUpdate("CREATE TABLE " + + executor.delimitIdentifier(tableName) + + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + + for (int i = 0; i < NUMBER_OF_ROWS; i++) { + int value = START + i; + String sql = "INSERT INTO " + executor.delimitIdentifier(tableName) + + " VALUES(" + value + ", " + value + ", '" + value + "')"; + executor.executeUpdate(sql); + } + } + } + + @Override + public void tearDown() { + executor.close(); + } + + @Test + public void testTableName() throws Exception { + DummyOptions options = new 
DummyOptions(); + options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER, + GenericJdbcTestConstants.DRIVER); + options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING, + GenericJdbcTestConstants.URL); + options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_NAME, + tableName); + + DummyContext context = new DummyContext(); + + Initializer initializer = new GenericJdbcImportInitializer(); + initializer.run(context, options); + + verifyResult(context, + "SELECT * FROM " + executor.delimitIdentifier(tableName) + + " WHERE ${CONDITIONS}", + "ICOL,DCOL,VCOL", + GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE + tableName, + "ICOL", + String.valueOf(Types.INTEGER), + String.valueOf(START), + String.valueOf(START+NUMBER_OF_ROWS-1)); + } + + @Test + public void testTableNameWithTableColumns() throws Exception { + DummyOptions options = new DummyOptions(); + options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER, + GenericJdbcTestConstants.DRIVER); + options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING, + GenericJdbcTestConstants.URL); + options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_NAME, + tableName); + options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_COLUMNS, + tableColumns); + + DummyContext context = new DummyContext(); + + Initializer initializer = new GenericJdbcImportInitializer(); + initializer.run(context, options); + + verifyResult(context, + "SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(tableName) + + " WHERE ${CONDITIONS}", + tableColumns, + GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE + tableName, + "ICOL", + String.valueOf(Types.INTEGER), + String.valueOf(START), + String.valueOf(START+NUMBER_OF_ROWS-1)); + } + + @Test + public void testTableSql() throws Exception { + DummyOptions options = new DummyOptions(); + options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER, + GenericJdbcTestConstants.DRIVER); + 
options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING, + GenericJdbcTestConstants.URL); + options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_SQL, + tableSql); + options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_PCOL, + "DCOL"); + + DummyContext context = new DummyContext(); + + Initializer initializer = new GenericJdbcImportInitializer(); + initializer.run(context, options); + + verifyResult(context, + "SELECT * FROM " + executor.delimitIdentifier(tableName) + + " WHERE ${CONDITIONS}", + "ICOL,DCOL,VCOL", + GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE + + GenericJdbcConnectorConstants.DEFAULT_DATADIR, + "DCOL", + String.valueOf(Types.DOUBLE), + String.valueOf((double)START), + String.valueOf((double)(START+NUMBER_OF_ROWS-1))); + } + + @Test + public void testTableSqlWithTableColumns() throws Exception { + DummyOptions options = new DummyOptions(); + options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER, + GenericJdbcTestConstants.DRIVER); + options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING, + GenericJdbcTestConstants.URL); + options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_SQL, + tableSql); + options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_COLUMNS, + tableColumns); + options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_PCOL, + "DCOL"); + + DummyContext context = new DummyContext(); + + Initializer initializer = new GenericJdbcImportInitializer(); + initializer.run(context, options); + + verifyResult(context, + "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " + + "(SELECT * FROM " + executor.delimitIdentifier(tableName) + + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS", + tableColumns, + GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE + + GenericJdbcConnectorConstants.DEFAULT_DATADIR, + "DCOL", + String.valueOf(Types.DOUBLE), + String.valueOf((double)START), + String.valueOf((double)(START+NUMBER_OF_ROWS-1))); + } + + private void 
verifyResult(DummyContext context, + String dataSql, String fieldNames, String outputDirectory, + String partitionColumnName, String partitionColumnType, + String partitionMinValue, String partitionMaxValue) { + assertEquals(dataSql, context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL)); + assertEquals(fieldNames, context.getString( + Constants.JOB_ETL_FIELD_NAMES)); + assertEquals(outputDirectory, context.getString( + Constants.JOB_ETL_OUTPUT_DIRECTORY)); + + assertEquals(partitionColumnName, context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME)); + assertEquals(partitionColumnType, context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE)); + assertEquals(partitionMinValue, context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE)); + assertEquals(partitionMaxValue, context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE)); + } + + public class DummyOptions implements Options { + Hashtable store = new Hashtable(); + + public void setOption(String key, String value) { + store.put(key, value); + } + + @Override + public String getOption(String key) { + return store.get(key); + } + } + + public class DummyContext implements MutableContext { + Hashtable store = new Hashtable(); + + @Override + public String getString(String key) { + return store.get(key); + } + + @Override + public void setString(String key, String value) { + store.put(key, value); + } + } + +} diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java new file mode 100644 index 00000000..0e95a43e --- /dev/null +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import java.sql.Types; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.sqoop.job.Constants; +import org.apache.sqoop.job.etl.MutableContext; +import org.apache.sqoop.job.etl.Partition; +import org.apache.sqoop.job.etl.Partitioner; +import org.junit.Test; + +public class TestImportPartitioner extends TestCase { + + private static final int START = -5; + private static final int NUMBER_OF_ROWS = 11; + + @Test + public void testIntegerEvenPartition() throws Exception { + DummyContext context = new DummyContext(); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, + "ICOL"); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, + String.valueOf(Types.INTEGER)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + String.valueOf(START)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + String.valueOf(START + NUMBER_OF_ROWS - 1)); + context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "5"); + + Partitioner partitioner = new 
GenericJdbcImportPartitioner(); + List partitions = partitioner.run(context); + + verifyResult(partitions, new String[] { + "-5 <= ICOL AND ICOL < -3", + "-3 <= ICOL AND ICOL < -1", + "-1 <= ICOL AND ICOL < 1", + "1 <= ICOL AND ICOL < 3", + "3 <= ICOL AND ICOL <= 5" + }); + } + + @Test + public void testIntegerUnevenPartition() throws Exception { + DummyContext context = new DummyContext(); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, + "ICOL"); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, + String.valueOf(Types.INTEGER)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + String.valueOf(START)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + String.valueOf(START + NUMBER_OF_ROWS - 1)); + context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "3"); + + Partitioner partitioner = new GenericJdbcImportPartitioner(); + List partitions = partitioner.run(context); + + verifyResult(partitions, new String[] { + "-5 <= ICOL AND ICOL < -1", + "-1 <= ICOL AND ICOL < 2", + "2 <= ICOL AND ICOL <= 5" + }); + } + + @Test + public void testIntegerOverPartition() throws Exception { + DummyContext context = new DummyContext(); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, + "ICOL"); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, + String.valueOf(Types.INTEGER)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + String.valueOf(START)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + String.valueOf(START + NUMBER_OF_ROWS - 1)); + context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "13"); + + Partitioner partitioner = new GenericJdbcImportPartitioner(); + List partitions = partitioner.run(context); + + verifyResult(partitions, new String[] { + "-5 
<= ICOL AND ICOL < -4", + "-4 <= ICOL AND ICOL < -3", + "-3 <= ICOL AND ICOL < -2", + "-2 <= ICOL AND ICOL < -1", + "-1 <= ICOL AND ICOL < 0", + "0 <= ICOL AND ICOL < 1", + "1 <= ICOL AND ICOL < 2", + "2 <= ICOL AND ICOL < 3", + "3 <= ICOL AND ICOL < 4", + "4 <= ICOL AND ICOL <= 5" + }); + } + + @Test + public void testFloatingPointEvenPartition() throws Exception { + DummyContext context = new DummyContext(); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, + "DCOL"); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, + String.valueOf(Types.DOUBLE)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + String.valueOf((double)START)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + String.valueOf((double)(START + NUMBER_OF_ROWS - 1))); + context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "5"); + + Partitioner partitioner = new GenericJdbcImportPartitioner(); + List partitions = partitioner.run(context); + + verifyResult(partitions, new String[] { + "-5.0 <= DCOL AND DCOL < -3.0", + "-3.0 <= DCOL AND DCOL < -1.0", + "-1.0 <= DCOL AND DCOL < 1.0", + "1.0 <= DCOL AND DCOL < 3.0", + "3.0 <= DCOL AND DCOL <= 5.0" + }); + } + + @Test + public void testFloatingPointUnevenPartition() throws Exception { + DummyContext context = new DummyContext(); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, + "DCOL"); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, + String.valueOf(Types.DOUBLE)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + String.valueOf((double)START)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + String.valueOf((double)(START + NUMBER_OF_ROWS - 1))); + context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "3"); + + Partitioner 
partitioner = new GenericJdbcImportPartitioner(); + List partitions = partitioner.run(context); + + verifyResult(partitions, new String[] { + "-5.0 <= DCOL AND DCOL < -1.6666666666666665", + "-1.6666666666666665 <= DCOL AND DCOL < 1.666666666666667", + "1.666666666666667 <= DCOL AND DCOL <= 5.0" + }); + } + + private void verifyResult(List partitions, + String[] expected) { + assertEquals(expected.length, partitions.size()); + + Iterator iterator = partitions.iterator(); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], + ((GenericJdbcImportPartition)iterator.next()).getConditions()); + } + } + + public class DummyContext implements MutableContext { + HashMap store = new HashMap(); + + @Override + public String getString(String key) { + return store.get(key); + } + + @Override + public void setString(String key, String value) { + store.put(key, value); + } + } + +} diff --git a/core/pom.xml b/core/pom.xml index fc5bc2e0..028c2406 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -35,27 +35,25 @@ limitations under the License. 
org.apache.sqoop sqoop-spi - 2.0.0-SNAPSHOT org.apache.sqoop sqoop-common - 2.0.0-SNAPSHOT - - - org.apache.hadoop - hadoop-common - 2.0.0-SNAPSHOT - - - org.apache.hadoop - hadoop-mapreduce-client-jobclient - 2.0.0-SNAPSHOT commons-dbcp commons-dbcp + + org.apache.hadoop + hadoop-common + provided + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + provided + junit junit diff --git a/core/src/main/java/org/apache/sqoop/core/CoreError.java b/core/src/main/java/org/apache/sqoop/core/CoreError.java index 2697eef6..29c0809c 100644 --- a/core/src/main/java/org/apache/sqoop/core/CoreError.java +++ b/core/src/main/java/org/apache/sqoop/core/CoreError.java @@ -84,8 +84,16 @@ public enum CoreError implements ErrorCode { /** Error occurs during loader run */ CORE_0018("Error occurs during loader run"), - /** Data have not been completely consumed yet */ - CORE_0019("Data have not been completely consumed yet"); + CORE_0019("Data have not been completely consumed yet"), + + /** The required option has not been set yet */ + CORE_0020("The required option has not been set yet"), + + /** Error occurs during partitioner run */ + CORE_0021("Error occurs during partitioner run"), + + /** Unable to parse because it is not properly delimited */ + CORE_0022("Unable to parse because it is not properly delimited"); private final String message; diff --git a/core/src/main/java/org/apache/sqoop/job/JobConstants.java b/core/src/main/java/org/apache/sqoop/job/JobConstants.java index 54fc5435..a032c724 100644 --- a/core/src/main/java/org/apache/sqoop/job/JobConstants.java +++ b/core/src/main/java/org/apache/sqoop/job/JobConstants.java @@ -19,7 +19,7 @@ import org.apache.sqoop.core.ConfigurationConstants; -public final class JobConstants { +public final class JobConstants extends Constants { /** * All job related configuration is prefixed with this: diff --git a/pom.xml b/pom.xml index 7549ea86..6206f2e1 100644 --- a/pom.xml +++ b/pom.xml @@ -91,11 +91,14 @@ limitations under the 
License. UTF-8 1.6 1.6 - 1.2.16 - 1.1 1.4 + 2.5 10.8.2.2 + 2.0.0-SNAPSHOT + 1.1 4.9 + 1.2.16 + 2.5 @@ -108,14 +111,57 @@ limitations under the License. - log4j - log4j - ${log4j.version} + org.apache.sqoop + sqoop-client + ${project.version} - javax.servlet - servlet-api - 2.5 + org.apache.sqoop + sqoop-common + ${project.version} + + + org.apache.sqoop + sqoop-core + ${project.version} + + + org.apache.sqoop + sqoop-core + test-jar + ${project.version} + + + org.apache.sqoop + sqoop-server + war + ${project.version} + + + org.apache.sqoop + sqoop-spi + ${project.version} + + + org.apache.sqoop.repository + sqoop-repository-derby + ${project.version} + + + org.apache.sqoop.connector + sqoop-connector-generic-jdbc + ${project.version} + + + org.apache.sqoop.connector + sqoop-connector-generic-jdbc + test-jar + ${project.version} + + + org.apache.sqoop.connector + sqoop-connector-mysql-jdbc + ${project.version} com.googlecode.json-simple @@ -128,16 +174,41 @@ limitations under the License. ${commons-dbcp.version} - org.apache.derby - derby - ${derby.version} + commons-lang + commons-lang + ${commons-lang.version} + + + javax.servlet + servlet-api + ${servlet.version} junit junit ${junit.version} - + + log4j + log4j + ${log4j.version} + + + org.apache.derby + derby + ${derby.version} + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + ${hadoop.version} + + diff --git a/spi/src/main/java/org/apache/sqoop/job/Constants.java b/spi/src/main/java/org/apache/sqoop/job/Constants.java new file mode 100644 index 00000000..927950d9 --- /dev/null +++ b/spi/src/main/java/org/apache/sqoop/job/Constants.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job; + +public class Constants { + + /** + * All job related configuration is prefixed with this: + * org.apache.sqoop.job. + */ + public static final String PREFIX_CONFIG = "org.apache.sqoop.job."; + + public static final String JOB_ETL_NUMBER_PARTITIONS = PREFIX_CONFIG + + "etl.number.partitions"; + + public static final String JOB_ETL_FIELD_NAMES = PREFIX_CONFIG + + "etl.field.names"; + + public static final String JOB_ETL_OUTPUT_DIRECTORY = PREFIX_CONFIG + + "etl.output.directory"; + + protected Constants() { + // Disable explicit object creation + } + +} diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java index 00f1a6c5..75bd42e7 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java @@ -23,6 +23,6 @@ */ public abstract class Initializer { - public abstract void run(MutableContext context); + public abstract void run(MutableContext context, Options options); } diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Options.java b/spi/src/main/java/org/apache/sqoop/job/etl/Options.java new file mode 100644 index 00000000..2dc46718 --- /dev/null +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Options.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job.etl; + +/** + * The options provided from user input. + */ +public interface Options { + + public String getOption(String key); + +}