
SQOOP-1805: Sqoop2: GenericJdbcConnector: Delta read support

(Jarek Jarcec Cecho via Abraham Elmahrek)
Abraham Elmahrek 2015-03-16 10:13:05 -07:00
parent 3f618c9176
commit e6519c76c6
10 changed files with 561 additions and 81 deletions
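In short, the delta read works like this: the FROM job configuration gains an IncrementalRead section (check column + last value); at job start the initializer reads the check column's new maximum, stores it in the job context under CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE, and restricts the partition boundary query to rows in the interval (last value, new maximum]. A minimal sketch of the two generated statements, assuming an illustrative table TBL with check column ICOL (the real code delimits identifiers through the executor):

  // Sketch only; TBL and ICOL are placeholder names, not taken from this patch.
  String newMaxQuery = "SELECT MAX(ICOL) FROM TBL";               // new upper bound
  String minMaxQuery = "SELECT MIN(ICOL), MAX(ICOL) FROM TBL "
                     + "WHERE ICOL > ? AND ICOL <= ?";            // bound to last value and new max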

View File

@@ -85,6 +85,10 @@ public enum GenericJdbcConnectorError implements ErrorCode {
  GENERIC_JDBC_CONNECTOR_0021("Schema column size do not match the result set column size"),

  GENERIC_JDBC_CONNECTOR_0022("Can't find maximal value of column"),

  GENERIC_JDBC_CONNECTOR_0023("Received error from the database"),

  ;

  private final String message;

View File

@ -41,6 +41,8 @@ public final class GenericJdbcConnectorConstants {
PREFIX_CONNECTOR_JDBC_CONFIG + "partition.minvalue"; PREFIX_CONNECTOR_JDBC_CONFIG + "partition.minvalue";
public static final String CONNECTOR_JDBC_PARTITION_MAXVALUE = public static final String CONNECTOR_JDBC_PARTITION_MAXVALUE =
PREFIX_CONNECTOR_JDBC_CONFIG + "partition.maxvalue"; PREFIX_CONNECTOR_JDBC_CONFIG + "partition.maxvalue";
public static final String CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE =
PREFIX_CONNECTOR_JDBC_CONFIG + "incremental.last_value";
public static final String CONNECTOR_JDBC_FROM_DATA_SQL = public static final String CONNECTOR_JDBC_FROM_DATA_SQL =
PREFIX_CONNECTOR_JDBC_CONFIG + "from.data.sql"; PREFIX_CONNECTOR_JDBC_CONFIG + "from.data.sql";

View File

@@ -72,6 +72,15 @@ public ResultSet executeQuery(String sql) {
    }
  }

  public PreparedStatement createStatement(String sql) {
    try {
      return connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
    } catch (SQLException e) {
      logSQLException(e);
      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
    }
  }

  public void setAutoCommit(boolean autoCommit) {
    try {
      connection.setAutoCommit(autoCommit);
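The new createStatement() returns a forward-only, read-only PreparedStatement so callers can bind boundary values instead of concatenating them into SQL. A usage sketch that mirrors how the initializer below drives it; the executor instance, the SQL text, and the bound values are illustrative assumptions:

  String lastValue = "0";              // last value from the previous run
  String incrementalMaxValue = "19";   // new maximum read at job start
  PreparedStatement ps = null;
  ResultSet rs = null;
  try {
    ps = executor.createStatement("SELECT MIN(ICOL), MAX(ICOL) FROM TBL WHERE ICOL > ? AND ICOL <= ?");
    ps.setString(1, lastValue);
    ps.setString(2, incrementalMaxValue);
    rs = ps.executeQuery();
  } finally {
    if (rs != null) { rs.close(); }
    if (ps != null) { ps.close(); }
  }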

View File

@@ -17,6 +17,7 @@
 */
package org.apache.sqoop.connector.jdbc;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -50,6 +51,8 @@ public void initialize(InitializerContext context, LinkConfiguration linkConfig,
    try {
      configurePartitionProperties(context.getContext(), linkConfig, fromJobConfig);
      configureTableProperties(context.getContext(), linkConfig, fromJobConfig);
    } catch(SQLException e) {
      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e);
    } finally {
      executor.close();
    }
@@ -124,108 +127,141 @@ private void configureJdbcProperties(MutableContext context, LinkConfiguration l
    executor = new GenericJdbcExecutor(driver, url, username, password);
  }

  private void configurePartitionProperties(MutableContext context, LinkConfiguration linkConfig, FromJobConfiguration jobConf) throws SQLException {
    // Assertions that should be valid (verified via validator)
    assert (jobConf.fromJobConfig.tableName != null && jobConf.fromJobConfig.sql == null) ||
           (jobConf.fromJobConfig.tableName == null && jobConf.fromJobConfig.sql != null);
    assert (jobConf.fromJobConfig.boundaryQuery == null && jobConf.incrementalRead.checkColumn == null) ||
           (jobConf.fromJobConfig.boundaryQuery != null && jobConf.incrementalRead.checkColumn == null) ||
           (jobConf.fromJobConfig.boundaryQuery == null && jobConf.incrementalRead.checkColumn != null);

    // We have few if/else conditions based on import type
    boolean tableImport = jobConf.fromJobConfig.tableName != null;
    boolean incrementalImport = jobConf.incrementalRead.checkColumn != null;

    // For generating queries
    StringBuilder sb = new StringBuilder();

    // Partition column name
    String partitionColumnName = jobConf.fromJobConfig.partitionColumn;
    // If it's not specified, we can use primary key of given table (if it's table based import)
    if (StringUtils.isBlank(partitionColumnName) && tableImport) {
      partitionColumnName = executor.getPrimaryKey(jobConf.fromJobConfig.tableName);
    }
    // If we don't have partition column name, we will error out
    if (partitionColumnName != null) {
      context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, partitionColumnName);
    } else {
      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0005);
    }
    LOG.info("Using partition column: " + partitionColumnName);

    // From fragment for subsequent queries
    String fromFragment;
    if(tableImport) {
      String tableName = jobConf.fromJobConfig.tableName;
      String schemaName = jobConf.fromJobConfig.schemaName;

      fromFragment = executor.delimitIdentifier(tableName);
      if(schemaName != null) {
        fromFragment = executor.delimitIdentifier(schemaName) + "." + fromFragment;
      }
    } else {
      sb.setLength(0);
      sb.append("(");
      sb.append(jobConf.fromJobConfig.sql.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 1"));
      sb.append(") ");
      sb.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
      fromFragment = sb.toString();
    }

    // If this is incremental, then we need to get new maximal value and persist is a constant
    String incrementalMaxValue = null;
    if(incrementalImport) {
      sb.setLength(0);
      sb.append("SELECT ");
      sb.append("MAX(").append(jobConf.incrementalRead.checkColumn).append(") ");
      sb.append("FROM ");
      sb.append(fromFragment);

      String incrementalNewMaxValueQuery = sb.toString();
      LOG.info("Incremental new max value query: " + incrementalNewMaxValueQuery);

      ResultSet rs = null;
      try {
        rs = executor.executeQuery(incrementalNewMaxValueQuery);

        if (!rs.next()) {
          throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0022);
        }

        incrementalMaxValue = rs.getString(1);
        context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE, incrementalMaxValue);
        LOG.info("New maximal value for incremental import is " + incrementalMaxValue);
      } finally {
        if(rs != null) {
          rs.close();
        }
      }
    }

    // Retrieving min and max values for partition column
    String minMaxQuery = jobConf.fromJobConfig.boundaryQuery;
    if (minMaxQuery == null) {
      sb.setLength(0);
      sb.append("SELECT ");
      sb.append("MIN(").append(partitionColumnName).append("), ");
      sb.append("MAX(").append(partitionColumnName).append(") ");
      sb.append("FROM ").append(fromFragment).append(" ");

      if(incrementalImport) {
        sb.append("WHERE ");
        sb.append(jobConf.incrementalRead.checkColumn).append(" > ?");
        sb.append(" AND ");
        sb.append(jobConf.incrementalRead.checkColumn).append(" <= ?");
      }

      minMaxQuery = sb.toString();
    }
    LOG.info("Using min/max query: " + minMaxQuery);

    PreparedStatement ps = null;
    ResultSet rs = null;
    try {
      ps = executor.createStatement(minMaxQuery);
      if (incrementalImport) {
        ps.setString(1, jobConf.incrementalRead.lastValue);
        ps.setString(2, incrementalMaxValue);
      }

      rs = ps.executeQuery();
      if(!rs.next()) {
        throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006);
      }

      // Boundaries for the job
      String min = rs.getString(1);
      String max = rs.getString(2);

      // Type of the partition column
      ResultSetMetaData rsmd = rs.getMetaData();
      if (rsmd.getColumnCount() != 2) {
        throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006);
      }
      int columnType = rsmd.getColumnType(1);

      LOG.info("Boundaries for the job: min=" + min + ", max=" + max + ", columnType=" + columnType);

      context.setInteger(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, columnType);
      context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, min);
      context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, max);
    } finally {
      if(ps != null) {
        ps.close();
      }
      if(rs != null) {
        rs.close();
      }
    }
  }
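For a query-based import the FROM fragment built above becomes a derived table: the ${CONDITIONS} token is replaced with "1 = 1", and the query is wrapped in parentheses and aliased with SUBQUERY_ALIAS. A rough sketch of the resulting min/max statement for an incremental read; the free-form query, column name, and the alias literal are illustrative assumptions (the actual alias value is not shown in this patch):

  String fromFragment = "(SELECT * FROM TBL WHERE 1 = 1) SQOOP_SUBQUERY_ALIAS";
  String minMaxQuery  = "SELECT MIN(ICOL), MAX(ICOL) FROM " + fromFragment
                      + " WHERE ICOL > ? AND ICOL <= ?";   // ? = last value, ? = new max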

View File

@@ -27,7 +27,10 @@
public class FromJobConfiguration {

  @Config public FromJobConfig fromJobConfig;

  @Config public IncrementalRead incrementalRead;

  public FromJobConfiguration() {
    fromJobConfig = new FromJobConfig();
    incrementalRead = new IncrementalRead();
  }
}

View File

@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc.configuration;

import org.apache.sqoop.model.ConfigClass;
import org.apache.sqoop.model.Input;
import org.apache.sqoop.model.InputEditable;
import org.apache.sqoop.model.Validator;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.validators.AbstractValidator;

/**
 */
@ConfigClass(validators = {@Validator(IncrementalRead.ConfigValidator.class)})
public class IncrementalRead {
  @Input(size = 50)
  public String checkColumn;

  @Input(editable = InputEditable.ANY)
  public String lastValue;

  public static class ConfigValidator extends AbstractValidator<IncrementalRead> {
    @Override
    public void validate(IncrementalRead conf) {
      if(conf.checkColumn != null && conf.lastValue == null) {
        addMessage(Status.ERROR, "Last value is required during incremental read");
      }

      if(conf.checkColumn == null && conf.lastValue != null) {
        addMessage(Status.ERROR, "Last value can't be filled without check column.");
      }
    }
  }
}
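The validator enforces that checkColumn and lastValue are either both set or both empty. A minimal sketch of the failing case, exercised directly for illustration (in Sqoop 2 the validator normally runs as part of config validation rather than being called by hand):

  IncrementalRead conf = new IncrementalRead();
  conf.checkColumn = "ICOL";   // check column set...
  conf.lastValue = null;       // ...but no last value
  new IncrementalRead.ConfigValidator().validate(conf);
  // validate() records Status.ERROR: "Last value is required during incremental read"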

View File

@@ -112,9 +112,13 @@ toJobConfig.stageTableName.help = Name of the staging table to use (Optional)
toJobConfig.shouldClearStageTable.label = Should clear stage table
toJobConfig.shouldClearStageTable.help = Indicate if the stage table should be cleared (Defaults to false)

# Incremental related configuration
incrementalRead.label = Incremental read
incrementalRead.help = Configuration related to incremental read
incrementalRead.checkColumn.label = Check column
incrementalRead.checkColumn.help = Column that is checked during incremental read for new values
incrementalRead.lastValue.label = Last value
incrementalRead.lastValue.help = Last read value, fetch will resume with higher values

ignored.ignored.label = Ignored
ignored.ignored.help = This is completely ignored

View File

@@ -140,6 +140,66 @@ public void testTableName() throws Exception {
        String.valueOf(START+NUMBER_OF_ROWS-1));
  }
@Test
@SuppressWarnings("unchecked")
public void testIncrementalTableNameFullRange() throws Exception {
LinkConfiguration linkConfig = new LinkConfiguration();
FromJobConfiguration jobConfig = new FromJobConfiguration();
linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
jobConfig.fromJobConfig.tableName = schemalessTableName;
jobConfig.incrementalRead.checkColumn = "ICOL";
jobConfig.incrementalRead.lastValue = "-51";
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcFromInitializer();
initializer.initialize(initializerContext, linkConfig, jobConfig);
verifyResult(context,
"SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
"ICOL,DCOL,VCOL",
"ICOL",
String.valueOf(Types.INTEGER),
String.valueOf(START),
String.valueOf(START+NUMBER_OF_ROWS-1));
assertEquals(String.valueOf(START+NUMBER_OF_ROWS-1), context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE));
}
@Test
@SuppressWarnings("unchecked")
public void testIncrementalTableNameFromZero() throws Exception {
LinkConfiguration linkConfig = new LinkConfiguration();
FromJobConfiguration jobConfig = new FromJobConfiguration();
linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
jobConfig.fromJobConfig.tableName = schemalessTableName;
jobConfig.incrementalRead.checkColumn = "ICOL";
jobConfig.incrementalRead.lastValue = "0";
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcFromInitializer();
initializer.initialize(initializerContext, linkConfig, jobConfig);
verifyResult(context,
"SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
"ICOL,DCOL,VCOL",
"ICOL",
String.valueOf(Types.INTEGER),
String.valueOf(1),
String.valueOf(START+NUMBER_OF_ROWS-1));
assertEquals(String.valueOf(START+NUMBER_OF_ROWS-1), context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE));
}
  @Test
  @SuppressWarnings("unchecked")
  public void testTableNameWithTableColumns() throws Exception {
@@ -196,6 +256,66 @@ public void testTableSql() throws Exception {
        String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
  }
@Test
@SuppressWarnings("unchecked")
public void testIncrementalTableSqlFullRange() throws Exception {
LinkConfiguration linkConfig = new LinkConfiguration();
FromJobConfiguration jobConfig = new FromJobConfiguration();
linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
jobConfig.fromJobConfig.sql = schemalessTableSql;
jobConfig.fromJobConfig.partitionColumn = "ICOL";
jobConfig.incrementalRead.checkColumn = "ICOL";
jobConfig.incrementalRead.lastValue = "-51";
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcFromInitializer();
initializer.initialize(initializerContext, linkConfig, jobConfig);
verifyResult(context,
"SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
"ICOL,DCOL,VCOL",
"ICOL",
String.valueOf(Types.INTEGER),
String.valueOf(START),
String.valueOf((START+NUMBER_OF_ROWS-1)));
assertEquals(String.valueOf(START+NUMBER_OF_ROWS-1), context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE));
}
@Test
@SuppressWarnings("unchecked")
public void testIncrementalTableSqlFromZero() throws Exception {
LinkConfiguration linkConfig = new LinkConfiguration();
FromJobConfiguration jobConfig = new FromJobConfiguration();
linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
jobConfig.fromJobConfig.sql = schemalessTableSql;
jobConfig.fromJobConfig.partitionColumn = "ICOL";
jobConfig.incrementalRead.checkColumn = "ICOL";
jobConfig.incrementalRead.lastValue = "0";
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
@SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcFromInitializer();
initializer.initialize(initializerContext, linkConfig, jobConfig);
verifyResult(context,
"SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
"ICOL,DCOL,VCOL",
"ICOL",
String.valueOf(Types.INTEGER),
String.valueOf(1),
String.valueOf((START+NUMBER_OF_ROWS-1)));
assertEquals(String.valueOf(START+NUMBER_OF_ROWS-1), context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE));
}
  @Test
  @SuppressWarnings("unchecked")
  public void testTableSqlWithTableColumns() throws Exception {

View File

@@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.model.ConfigUtils;
import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MInput;
import org.testng.annotations.Test;
import java.util.List;
import java.util.Locale;
import java.util.ResourceBundle;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
/**
*/
public class TestGenericJdbcConnector {
@Test
public void testBundleForLink() {
GenericJdbcConnector connector = new GenericJdbcConnector();
verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getLinkConfigurationClass());
}
@Test
void testBundleForJobToDirection() {
GenericJdbcConnector connector = new GenericJdbcConnector();
verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getJobConfigurationClass(Direction.TO));
}
@Test
void testBundleForJobFromDirection() {
GenericJdbcConnector connector = new GenericJdbcConnector();
verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getJobConfigurationClass(Direction.FROM));
}
void verifyBundleForConfigClass(ResourceBundle bundle, Class klass) {
assertNotNull(bundle);
assertNotNull(klass);
List<MConfig> configs = ConfigUtils.toConfigs(klass);
for(MConfig config : configs) {
assertNotNull(config.getHelpKey());
assertNotNull(config.getLabelKey());
assertTrue(bundle.containsKey(config.getHelpKey()), "Can't find help for " + config.getName());
assertTrue(bundle.containsKey(config.getLabelKey()), "Can't find label for " + config.getName());
for(MInput input : config.getInputs()) {
assertNotNull(input.getHelpKey());
assertNotNull(input.getLabelKey());
assertTrue(bundle.containsKey(input.getHelpKey()), "Can't find help for " + input.getName());
assertTrue(bundle.containsKey(input.getLabelKey()), "Can't find label for " + input.getName());
}
}
}
}

View File

@@ -0,0 +1,176 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.integration.connector.jdbc.generic;
import com.google.common.collect.Iterables;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.utils.ParametrizedUtils;
import org.testng.ITest;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import java.lang.reflect.Method;
/**
*/
public class IncrementalReadTest extends ConnectorTestCase implements ITest {
public static Object[] COLUMNS = new Object [][] {
// column - last value - new max value
{ "id", "9", "19"},
{ "version", "8.10", "13.10"},
{"release_date", "2008-10-18", "2013-10-17"},
};
private String checkColumn;
private String lastValue;
private String newMaxValue;
@Factory(dataProvider="incremental-integration-test")
public IncrementalReadTest(String checkColumn, String lastValue, String newMaxValue) {
this.checkColumn = checkColumn;
this.lastValue = lastValue;
this.newMaxValue = newMaxValue;
}
@DataProvider(name="incremental-integration-test", parallel=true)
public static Object[][] data() {
return Iterables.toArray(ParametrizedUtils.toArrayOfArrays(COLUMNS), Object[].class);
}
@Test
public void testTable() throws Exception {
createAndLoadTableUbuntuReleases();
// RDBMS link
MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
fillRdbmsLinkConfig(rdbmsLink);
saveLink(rdbmsLink);
// HDFS link
MLink hdfsLink = getClient().createLink("hdfs-connector");
saveLink(hdfsLink);
// Job creation
MJob job = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());
// Set the rdbms "FROM" config
MConfigList fromConfig = job.getJobConfig(Direction.FROM);
fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName()));
fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
fromConfig.getStringInput("incrementalRead.checkColumn").setValue(provider.escapeColumnName(checkColumn));
fromConfig.getStringInput("incrementalRead.lastValue").setValue(lastValue);
// Fill hdfs "TO" config
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
saveJob(job);
executeJob(job);
// Assert correct output
assertTo(
"10,'Jaunty Jackalope',9.04,'2009-04-23',false",
"11,'Karmic Koala',9.10,'2009-10-29',false",
"12,'Lucid Lynx',10.04,'2010-04-29',true",
"13,'Maverick Meerkat',10.10,'2010-10-10',false",
"14,'Natty Narwhal',11.04,'2011-04-28',false",
"15,'Oneiric Ocelot',11.10,'2011-10-10',false",
"16,'Precise Pangolin',12.04,'2012-04-26',true",
"17,'Quantal Quetzal',12.10,'2012-10-18',false",
"18,'Raring Ringtail',13.04,'2013-04-25',false",
"19,'Saucy Salamander',13.10,'2013-10-17',false"
);
// TODO: After Sqoop will be properly updating configuration objects we need to verify new max value
// Clean up testing table
dropTable();
}
@Test
public void testQuery() throws Exception {
createAndLoadTableUbuntuReleases();
// RDBMS link
MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
fillRdbmsLinkConfig(rdbmsLink);
saveLink(rdbmsLink);
// HDFS link
MLink hdfsLink = getClient().createLink("hdfs-connector");
saveLink(hdfsLink);
// Job creation
MJob job = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());
String query = "SELECT * FROM " + provider.escapeTableName(getTableName()) + " WHERE ${CONDITIONS}";
// Set the rdbms "FROM" config
MConfigList fromConfig = job.getJobConfig(Direction.FROM);
fromConfig.getStringInput("fromJobConfig.sql").setValue(query);
fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
fromConfig.getStringInput("incrementalRead.checkColumn").setValue(provider.escapeColumnName(checkColumn));
fromConfig.getStringInput("incrementalRead.lastValue").setValue(lastValue);
// Fill hdfs "TO" config
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
saveJob(job);
executeJob(job);
// Assert correct output
assertTo(
"10,'Jaunty Jackalope',9.04,'2009-04-23',false",
"11,'Karmic Koala',9.10,'2009-10-29',false",
"12,'Lucid Lynx',10.04,'2010-04-29',true",
"13,'Maverick Meerkat',10.10,'2010-10-10',false",
"14,'Natty Narwhal',11.04,'2011-04-28',false",
"15,'Oneiric Ocelot',11.10,'2011-10-10',false",
"16,'Precise Pangolin',12.04,'2012-04-26',true",
"17,'Quantal Quetzal',12.10,'2012-10-18',false",
"18,'Raring Ringtail',13.04,'2013-04-25',false",
"19,'Saucy Salamander',13.10,'2013-10-17',false"
);
// TODO: After Sqoop will be properly updating configuration objects we need to verify new max value
// Clean up testing table
dropTable();
}
private String testName;
@BeforeMethod(alwaysRun = true)
public void beforeMethod(Method aMethod) {
this.testName = aMethod.getName();
}
@Override
public String getTestName() {
return testName + "[" + checkColumn + "]";
}
}