diff --git a/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java b/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java index 03bc1049..f18acbd7 100644 --- a/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java +++ b/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java @@ -85,6 +85,10 @@ public enum GenericJdbcConnectorError implements ErrorCode { GENERIC_JDBC_CONNECTOR_0021("Schema column size do not match the result set column size"), + GENERIC_JDBC_CONNECTOR_0022("Can't find maximal value of column"), + + GENERIC_JDBC_CONNECTOR_0023("Received error from the database"), + ; private final String message; diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java index 4369071a..dc86821a 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java @@ -41,6 +41,8 @@ public final class GenericJdbcConnectorConstants { PREFIX_CONNECTOR_JDBC_CONFIG + "partition.minvalue"; public static final String CONNECTOR_JDBC_PARTITION_MAXVALUE = PREFIX_CONNECTOR_JDBC_CONFIG + "partition.maxvalue"; + public static final String CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE = + PREFIX_CONNECTOR_JDBC_CONFIG + "incremental.last_value"; public static final String CONNECTOR_JDBC_FROM_DATA_SQL = PREFIX_CONNECTOR_JDBC_CONFIG + "from.data.sql"; diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java index 7a01992e..5af34a5a 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java @@ -72,6 +72,15 @@ public ResultSet executeQuery(String sql) { } } + public PreparedStatement createStatement(String sql) { + try { + return connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + } catch (SQLException e) { + logSQLException(e); + throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e); + } + } + public void setAutoCommit(boolean autoCommit) { try { connection.setAutoCommit(autoCommit); diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java index 1ecd152d..6ad2cab8 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java @@ -17,6 +17,7 @@ */ package org.apache.sqoop.connector.jdbc; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -50,6 +51,8 @@ public void initialize(InitializerContext context, LinkConfiguration linkConfig, try { configurePartitionProperties(context.getContext(), linkConfig, fromJobConfig); configureTableProperties(context.getContext(), linkConfig, fromJobConfig); + } catch(SQLException e) { + throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e); } finally { executor.close(); } @@ -124,108 +127,141 @@ private void configureJdbcProperties(MutableContext context, LinkConfiguration l executor = new GenericJdbcExecutor(driver, url, username, password); } - private void configurePartitionProperties(MutableContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) { - // ----- configure column name ----- + private void configurePartitionProperties(MutableContext context, LinkConfiguration linkConfig, FromJobConfiguration jobConf) throws SQLException { + // Assertions that should be valid (verified via validator) + assert (jobConf.fromJobConfig.tableName != null && jobConf.fromJobConfig.sql == null) || + (jobConf.fromJobConfig.tableName == null && jobConf.fromJobConfig.sql != null); + assert (jobConf.fromJobConfig.boundaryQuery == null && jobConf.incrementalRead.checkColumn == null) || + (jobConf.fromJobConfig.boundaryQuery != null && jobConf.incrementalRead.checkColumn == null) || + (jobConf.fromJobConfig.boundaryQuery == null && jobConf.incrementalRead.checkColumn != null); - String partitionColumnName = fromJobConfig.fromJobConfig.partitionColumn; + // We have few if/else conditions based on import type + boolean tableImport = jobConf.fromJobConfig.tableName != null; + boolean incrementalImport = jobConf.incrementalRead.checkColumn != null; - if (partitionColumnName == null) { - // if column is not specified by the user, - // find the primary key of the fromTable (when there is a fromTable). - String tableName = fromJobConfig.fromJobConfig.tableName; - if (tableName != null) { - partitionColumnName = executor.getPrimaryKey(tableName); - } + // For generating queries + StringBuilder sb = new StringBuilder(); + + // Partition column name + String partitionColumnName = jobConf.fromJobConfig.partitionColumn; + // If it's not specified, we can use primary key of given table (if it's table based import) + if (StringUtils.isBlank(partitionColumnName) && tableImport) { + partitionColumnName = executor.getPrimaryKey(jobConf.fromJobConfig.tableName); } - + // If we don't have partition column name, we will error out if (partitionColumnName != null) { - context.setString( - GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, - partitionColumnName); - + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, partitionColumnName); } else { - throw new SqoopException( - GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0005); + throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0005); + } + LOG.info("Using partition column: " + partitionColumnName); + + // From fragment for subsequent queries + String fromFragment; + if(tableImport) { + String tableName = jobConf.fromJobConfig.tableName; + String schemaName = jobConf.fromJobConfig.schemaName; + + fromFragment = executor.delimitIdentifier(tableName); + if(schemaName != null) { + fromFragment = executor.delimitIdentifier(schemaName) + "." + fromFragment; + } + } else { + sb.setLength(0); + sb.append("("); + sb.append(jobConf.fromJobConfig.sql.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 1")); + sb.append(") "); + sb.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS); + fromFragment = sb.toString(); } - // ----- configure column type, min value, and max value ----- + // If this is incremental, then we need to get new maximal value and persist is a constant + String incrementalMaxValue = null; + if(incrementalImport) { + sb.setLength(0); + sb.append("SELECT "); + sb.append("MAX(").append(jobConf.incrementalRead.checkColumn).append(") "); + sb.append("FROM "); + sb.append(fromFragment); - String minMaxQuery = fromJobConfig.fromJobConfig.boundaryQuery; + String incrementalNewMaxValueQuery = sb.toString(); + LOG.info("Incremental new max value query: " + incrementalNewMaxValueQuery); + ResultSet rs = null; + try { + rs = executor.executeQuery(incrementalNewMaxValueQuery); + + if (!rs.next()) { + throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0022); + } + + incrementalMaxValue = rs.getString(1); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE, incrementalMaxValue); + LOG.info("New maximal value for incremental import is " + incrementalMaxValue); + } finally { + if(rs != null) { + rs.close(); + } + } + } + + // Retrieving min and max values for partition column + String minMaxQuery = jobConf.fromJobConfig.boundaryQuery; if (minMaxQuery == null) { - StringBuilder builder = new StringBuilder(); + sb.setLength(0); + sb.append("SELECT "); + sb.append("MIN(").append(partitionColumnName).append("), "); + sb.append("MAX(").append(partitionColumnName).append(") "); + sb.append("FROM ").append(fromFragment).append(" "); - String schemaName = fromJobConfig.fromJobConfig.schemaName; - String tableName = fromJobConfig.fromJobConfig.tableName; - String tableSql = fromJobConfig.fromJobConfig.sql; - - if (tableName != null && tableSql != null) { - // when both fromTable name and fromTable sql are specified: - throw new SqoopException( - GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007); - - } else if (tableName != null) { - // when fromTable name is specified: - - // For databases that support schemas (IE: postgresql). - String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); - - String column = partitionColumnName; - builder.append("SELECT MIN("); - builder.append(column); - builder.append("), MAX("); - builder.append(column); - builder.append(") FROM "); - builder.append(fullTableName); - - } else if (tableSql != null) { - String column = executor.qualify( - partitionColumnName, GenericJdbcConnectorConstants.SUBQUERY_ALIAS); - builder.append("SELECT MIN("); - builder.append(column); - builder.append("), MAX("); - builder.append(column); - builder.append(") FROM "); - builder.append("("); - builder.append(tableSql.replace( - GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 1")); - builder.append(") "); - builder.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS); - - } else { - // when neither are specified: - throw new SqoopException( - GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008); + if(incrementalImport) { + sb.append("WHERE "); + sb.append(jobConf.incrementalRead.checkColumn).append(" > ?"); + sb.append(" AND "); + sb.append(jobConf.incrementalRead.checkColumn).append(" <= ?"); } - minMaxQuery = builder.toString(); + minMaxQuery = sb.toString(); } + LOG.info("Using min/max query: " + minMaxQuery); - - LOG.debug("Using minMaxQuery: " + minMaxQuery); - ResultSet rs = executor.executeQuery(minMaxQuery); + PreparedStatement ps = null; + ResultSet rs = null; try { - ResultSetMetaData rsmd = rs.getMetaData(); - if (rsmd.getColumnCount() != 2) { - throw new SqoopException( - GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006); + ps = executor.createStatement(minMaxQuery); + if (incrementalImport) { + ps.setString(1, jobConf.incrementalRead.lastValue); + ps.setString(2, incrementalMaxValue); } - rs.next(); + rs = ps.executeQuery(); + if(!rs.next()) { + throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006); + } - int columnType = rsmd.getColumnType(1); + // Boundaries for the job String min = rs.getString(1); String max = rs.getString(2); - LOG.info("Boundaries: min=" + min + ", max=" + max + ", columnType=" + columnType); + // Type of the partition column + ResultSetMetaData rsmd = rs.getMetaData(); + if (rsmd.getColumnCount() != 2) { + throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006); + } + int columnType = rsmd.getColumnType(1); + + LOG.info("Boundaries for the job: min=" + min + ", max=" + max + ", columnType=" + columnType); context.setInteger(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, columnType); context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, min); context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, max); - - } catch (SQLException e) { - throw new SqoopException( - GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006, e); + } finally { + if(ps != null) { + ps.close(); + } + if(rs != null) { + rs.close(); + } } } diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java index 39e8eddc..d11b3b14 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java @@ -27,7 +27,10 @@ public class FromJobConfiguration { @Config public FromJobConfig fromJobConfig; + @Config public IncrementalRead incrementalRead; + public FromJobConfiguration() { fromJobConfig = new FromJobConfig(); + incrementalRead = new IncrementalRead(); } } diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/IncrementalRead.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/IncrementalRead.java new file mode 100644 index 00000000..f226532d --- /dev/null +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/IncrementalRead.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc.configuration; + +import org.apache.sqoop.model.ConfigClass; +import org.apache.sqoop.model.Input; +import org.apache.sqoop.model.InputEditable; +import org.apache.sqoop.model.Validator; +import org.apache.sqoop.validation.Status; +import org.apache.sqoop.validation.validators.AbstractValidator; + +/** + */ +@ConfigClass(validators = {@Validator(IncrementalRead.ConfigValidator.class)}) +public class IncrementalRead { + @Input(size = 50) + public String checkColumn; + + @Input(editable = InputEditable.ANY) + public String lastValue; + + public static class ConfigValidator extends AbstractValidator { + @Override + public void validate(IncrementalRead conf) { + if(conf.checkColumn != null && conf.lastValue == null) { + addMessage(Status.ERROR, "Last value is required during incremental read"); + } + + if(conf.checkColumn == null && conf.lastValue != null) { + addMessage(Status.ERROR, "Last value can't be filled without check column."); + } + } + } +} diff --git a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties index 6a2159ba..52bf6312 100644 --- a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties +++ b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties @@ -112,9 +112,13 @@ toJobConfig.stageTableName.help = Name of the staging table to use (Optional) toJobConfig.shouldClearStageTable.label = Should clear stage table toJobConfig.shouldClearStageTable.help = Indicate if the stage table should be cleared (Defaults to false) -# Placeholders to have some entities created -ignored.label = Ignored -ignored.help = This is completely ignored +# Incremental related configuration +incrementalRead.label = Incremental read +incrementalRead.help = Configuration related to incremental read + +incrementalRead.checkColumn.label = Check column +incrementalRead.checkColumn.help = Column that is checked during incremental read for new values + +incrementalRead.lastValue.label = Last value +incrementalRead.lastValue.help = Last read value, fetch will resume with higher values -ignored.ignored.label = Ignored -ignored.ignored.help = This is completely ignored \ No newline at end of file diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java index 52003abb..e9c8d417 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java @@ -140,6 +140,66 @@ public void testTableName() throws Exception { String.valueOf(START+NUMBER_OF_ROWS-1)); } + @Test + @SuppressWarnings("unchecked") + public void testIncrementalTableNameFullRange() throws Exception { + LinkConfiguration linkConfig = new LinkConfiguration(); + FromJobConfiguration jobConfig = new FromJobConfiguration(); + + linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER; + linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL; + jobConfig.fromJobConfig.tableName = schemalessTableName; + jobConfig.incrementalRead.checkColumn = "ICOL"; + jobConfig.incrementalRead.lastValue = "-51"; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcFromInitializer(); + initializer.initialize(initializerContext, linkConfig, jobConfig); + + verifyResult(context, + "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}", + "ICOL,DCOL,VCOL", + "ICOL", + String.valueOf(Types.INTEGER), + String.valueOf(START), + String.valueOf(START+NUMBER_OF_ROWS-1)); + + assertEquals(String.valueOf(START+NUMBER_OF_ROWS-1), context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE)); + } + + @Test + @SuppressWarnings("unchecked") + public void testIncrementalTableNameFromZero() throws Exception { + LinkConfiguration linkConfig = new LinkConfiguration(); + FromJobConfiguration jobConfig = new FromJobConfiguration(); + + linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER; + linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL; + jobConfig.fromJobConfig.tableName = schemalessTableName; + jobConfig.incrementalRead.checkColumn = "ICOL"; + jobConfig.incrementalRead.lastValue = "0"; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcFromInitializer(); + initializer.initialize(initializerContext, linkConfig, jobConfig); + + verifyResult(context, + "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}", + "ICOL,DCOL,VCOL", + "ICOL", + String.valueOf(Types.INTEGER), + String.valueOf(1), + String.valueOf(START+NUMBER_OF_ROWS-1)); + + assertEquals(String.valueOf(START+NUMBER_OF_ROWS-1), context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE)); + } + @Test @SuppressWarnings("unchecked") public void testTableNameWithTableColumns() throws Exception { @@ -196,6 +256,66 @@ public void testTableSql() throws Exception { String.valueOf((double)(START+NUMBER_OF_ROWS-1))); } + @Test + @SuppressWarnings("unchecked") + public void testIncrementalTableSqlFullRange() throws Exception { + LinkConfiguration linkConfig = new LinkConfiguration(); + FromJobConfiguration jobConfig = new FromJobConfiguration(); + + linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER; + linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL; + jobConfig.fromJobConfig.sql = schemalessTableSql; + jobConfig.fromJobConfig.partitionColumn = "ICOL"; + jobConfig.incrementalRead.checkColumn = "ICOL"; + jobConfig.incrementalRead.lastValue = "-51"; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcFromInitializer(); + initializer.initialize(initializerContext, linkConfig, jobConfig); + + verifyResult(context, + "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}", + "ICOL,DCOL,VCOL", + "ICOL", + String.valueOf(Types.INTEGER), + String.valueOf(START), + String.valueOf((START+NUMBER_OF_ROWS-1))); + assertEquals(String.valueOf(START+NUMBER_OF_ROWS-1), context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE)); + } + + @Test + @SuppressWarnings("unchecked") + public void testIncrementalTableSqlFromZero() throws Exception { + LinkConfiguration linkConfig = new LinkConfiguration(); + FromJobConfiguration jobConfig = new FromJobConfiguration(); + + linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER; + linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL; + jobConfig.fromJobConfig.sql = schemalessTableSql; + jobConfig.fromJobConfig.partitionColumn = "ICOL"; + jobConfig.incrementalRead.checkColumn = "ICOL"; + jobConfig.incrementalRead.lastValue = "0"; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcFromInitializer(); + initializer.initialize(initializerContext, linkConfig, jobConfig); + + verifyResult(context, + "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}", + "ICOL,DCOL,VCOL", + "ICOL", + String.valueOf(Types.INTEGER), + String.valueOf(1), + String.valueOf((START+NUMBER_OF_ROWS-1))); + assertEquals(String.valueOf(START+NUMBER_OF_ROWS-1), context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE)); + } + @Test @SuppressWarnings("unchecked") public void testTableSqlWithTableColumns() throws Exception { diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcConnector.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcConnector.java new file mode 100644 index 00000000..cc1c58f3 --- /dev/null +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcConnector.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.model.ConfigUtils; +import org.apache.sqoop.model.MConfig; +import org.apache.sqoop.model.MInput; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Locale; +import java.util.ResourceBundle; + +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +/** + */ +public class TestGenericJdbcConnector { + + @Test + public void testBundleForLink() { + GenericJdbcConnector connector = new GenericJdbcConnector(); + verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getLinkConfigurationClass()); + } + + @Test + void testBundleForJobToDirection() { + GenericJdbcConnector connector = new GenericJdbcConnector(); + verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getJobConfigurationClass(Direction.TO)); + } + + @Test + void testBundleForJobFromDirection() { + GenericJdbcConnector connector = new GenericJdbcConnector(); + verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getJobConfigurationClass(Direction.FROM)); + } + + void verifyBundleForConfigClass(ResourceBundle bundle, Class klass) { + assertNotNull(bundle); + assertNotNull(klass); + + List configs = ConfigUtils.toConfigs(klass); + + for(MConfig config : configs) { + assertNotNull(config.getHelpKey()); + assertNotNull(config.getLabelKey()); + + assertTrue(bundle.containsKey(config.getHelpKey()), "Can't find help for " + config.getName()); + assertTrue(bundle.containsKey(config.getLabelKey()), "Can't find label for " + config.getName()); + + for(MInput input : config.getInputs()) { + assertNotNull(input.getHelpKey()); + assertNotNull(input.getLabelKey()); + + assertTrue(bundle.containsKey(input.getHelpKey()), "Can't find help for " + input.getName()); + assertTrue(bundle.containsKey(input.getLabelKey()), "Can't find label for " + input.getName()); + } + } + } +} diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java new file mode 100644 index 00000000..716de307 --- /dev/null +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.integration.connector.jdbc.generic; + +import com.google.common.collect.Iterables; +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.connector.hdfs.configuration.ToFormat; +import org.apache.sqoop.model.MConfigList; +import org.apache.sqoop.model.MJob; +import org.apache.sqoop.model.MLink; +import org.apache.sqoop.test.testcases.ConnectorTestCase; +import org.apache.sqoop.test.utils.ParametrizedUtils; +import org.testng.ITest; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.lang.reflect.Method; + +/** + */ +public class IncrementalReadTest extends ConnectorTestCase implements ITest { + + public static Object[] COLUMNS = new Object [][] { + // column - last value - new max value + { "id", "9", "19"}, + { "version", "8.10", "13.10"}, + {"release_date", "2008-10-18", "2013-10-17"}, + }; + + private String checkColumn; + private String lastValue; + private String newMaxValue; + + @Factory(dataProvider="incremental-integration-test") + public IncrementalReadTest(String checkColumn, String lastValue, String newMaxValue) { + this.checkColumn = checkColumn; + this.lastValue = lastValue; + this.newMaxValue = newMaxValue; + } + + @DataProvider(name="incremental-integration-test", parallel=true) + public static Object[][] data() { + return Iterables.toArray(ParametrizedUtils.toArrayOfArrays(COLUMNS), Object[].class); + } + + @Test + public void testTable() throws Exception { + createAndLoadTableUbuntuReleases(); + + // RDBMS link + MLink rdbmsLink = getClient().createLink("generic-jdbc-connector"); + fillRdbmsLinkConfig(rdbmsLink); + saveLink(rdbmsLink); + + // HDFS link + MLink hdfsLink = getClient().createLink("hdfs-connector"); + saveLink(hdfsLink); + + // Job creation + MJob job = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId()); + + // Set the rdbms "FROM" config + MConfigList fromConfig = job.getJobConfig(Direction.FROM); + fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName())); + fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id")); + fromConfig.getStringInput("incrementalRead.checkColumn").setValue(provider.escapeColumnName(checkColumn)); + fromConfig.getStringInput("incrementalRead.lastValue").setValue(lastValue); + + // Fill hdfs "TO" config + fillHdfsToConfig(job, ToFormat.TEXT_FILE); + + saveJob(job); + + executeJob(job); + + // Assert correct output + assertTo( + "10,'Jaunty Jackalope',9.04,'2009-04-23',false", + "11,'Karmic Koala',9.10,'2009-10-29',false", + "12,'Lucid Lynx',10.04,'2010-04-29',true", + "13,'Maverick Meerkat',10.10,'2010-10-10',false", + "14,'Natty Narwhal',11.04,'2011-04-28',false", + "15,'Oneiric Ocelot',11.10,'2011-10-10',false", + "16,'Precise Pangolin',12.04,'2012-04-26',true", + "17,'Quantal Quetzal',12.10,'2012-10-18',false", + "18,'Raring Ringtail',13.04,'2013-04-25',false", + "19,'Saucy Salamander',13.10,'2013-10-17',false" + ); + + // TODO: After Sqoop will be properly updating configuration objects we need to verify new max value + + // Clean up testing table + dropTable(); + } + + @Test + public void testQuery() throws Exception { + createAndLoadTableUbuntuReleases(); + + // RDBMS link + MLink rdbmsLink = getClient().createLink("generic-jdbc-connector"); + fillRdbmsLinkConfig(rdbmsLink); + saveLink(rdbmsLink); + + // HDFS link + MLink hdfsLink = getClient().createLink("hdfs-connector"); + saveLink(hdfsLink); + + // Job creation + MJob job = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId()); + + String query = "SELECT * FROM " + provider.escapeTableName(getTableName()) + " WHERE ${CONDITIONS}"; + + // Set the rdbms "FROM" config + MConfigList fromConfig = job.getJobConfig(Direction.FROM); + fromConfig.getStringInput("fromJobConfig.sql").setValue(query); + fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id")); + fromConfig.getStringInput("incrementalRead.checkColumn").setValue(provider.escapeColumnName(checkColumn)); + fromConfig.getStringInput("incrementalRead.lastValue").setValue(lastValue); + + // Fill hdfs "TO" config + fillHdfsToConfig(job, ToFormat.TEXT_FILE); + + saveJob(job); + + executeJob(job); + + // Assert correct output + assertTo( + "10,'Jaunty Jackalope',9.04,'2009-04-23',false", + "11,'Karmic Koala',9.10,'2009-10-29',false", + "12,'Lucid Lynx',10.04,'2010-04-29',true", + "13,'Maverick Meerkat',10.10,'2010-10-10',false", + "14,'Natty Narwhal',11.04,'2011-04-28',false", + "15,'Oneiric Ocelot',11.10,'2011-10-10',false", + "16,'Precise Pangolin',12.04,'2012-04-26',true", + "17,'Quantal Quetzal',12.10,'2012-10-18',false", + "18,'Raring Ringtail',13.04,'2013-04-25',false", + "19,'Saucy Salamander',13.10,'2013-10-17',false" + ); + + // TODO: After Sqoop will be properly updating configuration objects we need to verify new max value + + // Clean up testing table + dropTable(); + } + + private String testName; + + @BeforeMethod(alwaysRun = true) + public void beforeMethod(Method aMethod) { + this.testName = aMethod.getName(); + } + + @Override + public String getTestName() { + return testName + "[" + checkColumn + "]"; + } +}