
SQOOP-41. Add support for incremental imports.

Modify ImportTool and SessionTool to support incremental imports.
Add TestIncrementalImport to unit test incremental imports.
SqoopOptions now implements Cloneable.
SQOOP-44. Bugfix in ClassWriter: fix an NPE when the case of column names
specified with --columns does not match the case reported by the database.

From: Aaron Kimball <aaron@cloudera.com>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149944 13f79535-47bb-0310-9956-ffa450edef68
Andrew Bayer 2011-07-22 20:04:07 +00:00
parent 41cee93c40
commit 6efcec0da2
17 changed files with 1463 additions and 23 deletions

View File

@ -843,7 +843,7 @@
outputFile="${findbugs.output.xml.file}" effort="max"
excludeFilter="${findbugs.excludes}">
<auxClasspath>
<path refid="compile.classpath"/>
<path refid="test.classpath"/>
</auxClasspath>
<sourcePath path="${src.dir}" />
<sourcePath path="${test.dir}" />

View File

@ -22,6 +22,7 @@
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
@ -31,9 +32,9 @@
import com.cloudera.sqoop.lib.LargeObjectLoader;
/**
* Command-line arguments used by Sqoop.
* Configurable state used by Sqoop tools.
*/
public class SqoopOptions {
public class SqoopOptions implements Cloneable {
public static final Log LOG = LogFactory.getLog(SqoopOptions.class.getName());
@ -73,6 +74,22 @@ public enum FileLayout {
SequenceFile
}
/**
* Incremental imports support two modes:
* <ul>
* <li>new rows being appended to the end of a table with an
* incrementing id</li>
* <li>new data results in a date-last-modified column being
* updated to NOW(); Sqoop will pull all dirty rows in the next
* incremental import.</li>
* </ul>
*/
public enum IncrementalMode {
None,
AppendRows,
DateLastModified,
}
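A hedged example of the command-line usage this commit enables (the connect string and table/column names here are hypothetical; the flag names are the INCREMENT_*_ARG constants added to BaseSqoopTool below):

    sqoop import --connect jdbc:hsqldb:mem:db --table foo \
        --incremental append --check-column id --last-value 100

AppendRows then imports only rows with id greater than 100; DateLastModified uses --incremental lastmodified with a timestamp check column.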
// TODO(aaron): Adding something here? Add a setter and a getter.
// Add a default value in initDefaults() if you need one. If this value
@ -149,6 +166,26 @@ public enum FileLayout {
private String hbaseRowKeyCol; // Column of the input to use as the row key.
private boolean hbaseCreateTable; // if true, create tables/col families.
// col to filter on for incremental imports.
private String incrementalTestCol;
// incremental import mode we're using.
private IncrementalMode incrementalMode;
// What was the last-imported value of incrementalTestCol?
private String incrementalLastValue;
// These next two fields are not serialized to the metastore.
// If this SqoopOptions is created by reading a saved session, these will
// be populated by the SessionStorage to facilitate updating the same
// session.
private String sessionName;
private Map<String, String> sessionStorageDescriptor;
// If we restore a session and then allow the user to apply arguments on
// top, we retain the version without the arguments in a reference to the
// 'parent' SqoopOptions instance, here.
private SqoopOptions parent;
public SqoopOptions() {
initDefaults(null);
}
@ -356,10 +393,12 @@ public void loadProperties(Properties props) {
this.targetDir);
this.append = getBooleanProperty(props, "hdfs.append.dir", this.append);
String fileFmtStr = props.getProperty("hdfs.file.format", "text");
if (fileFmtStr.equals("seq")) {
this.layout = FileLayout.SequenceFile;
} else {
try {
this.layout = FileLayout.valueOf(
props.getProperty("hdfs.file.format", this.layout.toString()));
} catch (IllegalArgumentException iae) {
LOG.warn("Unsupported file format: "
+ props.getProperty("hdfs.file.format", null) + "; setting to text");
this.layout = FileLayout.TextFile;
}
@ -410,6 +449,20 @@ public void loadProperties(Properties props) {
this.hbaseRowKeyCol);
this.hbaseCreateTable = getBooleanProperty(props, "hbase.create.table",
this.hbaseCreateTable);
try {
this.incrementalMode = IncrementalMode.valueOf(props.getProperty(
"incremental.mode", this.incrementalMode.toString()));
} catch (IllegalArgumentException iae) {
LOG.warn("Invalid incremental import type: "
+ props.getProperty("incremental.mode", null) + "; setting to None");
this.incrementalMode = IncrementalMode.None;
}
this.incrementalTestCol = props.getProperty("incremental.col",
this.incrementalTestCol);
this.incrementalLastValue = props.getProperty("incremental.last.value",
this.incrementalLastValue);
}
/**
@ -448,11 +501,7 @@ public Properties writeProperties() {
putProperty(props, "hdfs.warehouse.dir", this.warehouseDir);
putProperty(props, "hdfs.target.dir", this.targetDir);
putProperty(props, "hdfs.append.dir", Boolean.toString(this.append));
if (this.layout == FileLayout.SequenceFile) {
putProperty(props, "hdfs.file.format", "seq");
} else {
putProperty(props, "hdfs.file.format", "text");
}
putProperty(props, "hdfs.file.format", this.layout.toString());
putProperty(props, "direct.import", Boolean.toString(this.direct));
putProperty(props, "hive.import", Boolean.toString(this.hiveImport));
putProperty(props, "hive.overwrite.table",
@ -482,9 +531,48 @@ public Properties writeProperties() {
putProperty(props, "hbase.create.table",
Boolean.toString(this.hbaseCreateTable));
putProperty(props, "incremental.mode", this.incrementalMode.toString());
putProperty(props, "incremental.col", this.incrementalTestCol);
putProperty(props, "incremental.last.value", this.incrementalLastValue);
return props;
}
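A hedged sketch of the incremental keys a saved session now round-trips through writeProperties()/loadProperties() (values hypothetical):

    incremental.mode = AppendRows
    incremental.col = id
    incremental.last.value = 100
    hdfs.file.format = TextFile

Note that hdfs.file.format now stores the FileLayout enum name directly rather than the old 'seq'/'text' strings; an unrecognized value falls back to TextFile with a warning.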
@Override
public Object clone() {
try {
SqoopOptions other = (SqoopOptions) super.clone();
if (null != columns) {
other.columns = Arrays.copyOf(columns, columns.length);
}
if (null != dbOutColumns) {
other.dbOutColumns = Arrays.copyOf(dbOutColumns, dbOutColumns.length);
}
if (null != inputDelimiters) {
other.inputDelimiters = (DelimiterSet) inputDelimiters.clone();
}
if (null != outputDelimiters) {
other.outputDelimiters = (DelimiterSet) outputDelimiters.clone();
}
if (null != conf) {
other.conf = new Configuration(conf);
}
if (null != extraArgs) {
other.extraArgs = Arrays.copyOf(extraArgs, extraArgs.length);
}
return other;
} catch (CloneNotSupportedException cnse) {
// Shouldn't happen.
return null;
}
}
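A minimal sketch of how this clone is consumed, mirroring the SessionTool.execSession() change later in this commit (assumes the Sqoop classes are on the classpath; 'data' is a SessionData read from the metastore):

    SqoopOptions saved = data.getSqoopOptions();
    SqoopOptions child = (SqoopOptions) saved.clone();
    child.setParent(saved);  // keep the pristine copy for the metastore
    // Command-line overrides mutate only the child; the deep-copied
    // arrays, DelimiterSets, and Configuration leave 'saved' untouched.

Deep-copying the mutable fields is what lets saveIncrementalState() write the parent options back to the metastore without the per-run argument overrides.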
/**
* @return the temp directory to use; this is guaranteed to end with
* the file separator character (e.g., '/').
@ -536,6 +624,8 @@ private void initDefaults(Configuration baseConfiguration) {
this.extraArgs = null;
this.dbOutColumns = null;
this.incrementalMode = IncrementalMode.None;
}
/**
@ -1312,5 +1402,93 @@ public String getHBaseTable() {
public void setHBaseTable(String table) {
this.hbaseTable = table;
}
/**
* Set the column of the import source table to check for incremental import
* state.
*/
public void setIncrementalTestColumn(String colName) {
this.incrementalTestCol = colName;
}
/**
* Return the name of the column of the import source table
* to check for incremental import state.
*/
public String getIncrementalTestColumn() {
return this.incrementalTestCol;
}
/**
* Set the incremental import mode to use.
*/
public void setIncrementalMode(IncrementalMode mode) {
this.incrementalMode = mode;
}
/**
* Get the incremental import mode to use.
*/
public IncrementalMode getIncrementalMode() {
return this.incrementalMode;
}
/**
* Set the last imported value of the incremental import test column.
*/
public void setIncrementalLastValue(String lastVal) {
this.incrementalLastValue = lastVal;
}
/**
* Get the last imported value of the incremental import test column.
*/
public String getIncrementalLastValue() {
return this.incrementalLastValue;
}
/**
* Set the name of the saved session this SqoopOptions belongs to.
*/
public void setSessionName(String session) {
this.sessionName = session;
}
/**
* Get the name of the saved session this SqoopOptions belongs to.
*/
public String getSessionName() {
return this.sessionName;
}
/**
* Set the SessionStorage descriptor used to open the saved session
* this SqoopOptions belongs to.
*/
public void setStorageDescriptor(Map<String, String> descriptor) {
this.sessionStorageDescriptor = descriptor;
}
/**
* Get the SessionStorage descriptor used to open the saved session
* this SqoopOptions belongs to.
*/
public Map<String, String> getStorageDescriptor() {
return this.sessionStorageDescriptor;
}
/**
* Return the parent instance this SqoopOptions is derived from.
*/
public SqoopOptions getParent() {
return this.parent;
}
/**
* Set the parent instance this SqoopOptions is derived from.
*/
public void setParent(SqoopOptions options) {
this.parent = options;
}
}

View File

@ -22,6 +22,7 @@
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Map;
import org.apache.commons.logging.Log;
@ -200,5 +201,22 @@ public void updateTable(ExportJobContext context)
* to close.
*/
public abstract void release();
/**
* Return the current time from the perspective of the database server.
* Return null if this cannot be accessed.
*/
public Timestamp getCurrentDbTimestamp() {
LOG.warn("getCurrentDbTimestamp(): Using local system timestamp.");
return new Timestamp(System.currentTimeMillis());
}
/**
* Given a non-null Timestamp, return the quoted string that can
* be inserted into a SQL statement, representing that timestamp.
*/
public String timestampToQueryString(Timestamp ts) {
return "'" + ts + "'";
}
}
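A hedged example of the default quoting; java.sql.Timestamp.toString() emits yyyy-mm-dd hh:mm:ss.f, so for a hypothetical manager instance:

    Timestamp ts = Timestamp.valueOf("2011-07-22 20:04:07.5");
    manager.timestampToQueryString(ts);  // => '2011-07-22 20:04:07.5'

Databases with stricter timestamp-literal syntax override this; see the Oracle TO_TIMESTAMP variant below.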

View File

@ -52,4 +52,14 @@ public String[] listDatabases() {
String [] databases = {HSQL_SCHEMA_NAME};
return databases;
}
@Override
/**
* {@inheritDoc}
*/
protected String getCurTimestampQuery() {
// HSQLDB requires that you select from a table; this table is
// guaranteed to exist.
return "SELECT CURRENT_TIMESTAMP FROM INFORMATION_SCHEMA.SYSTEM_TABLES";
}
}

View File

@ -23,6 +23,7 @@
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.lang.reflect.Method;
@ -456,5 +457,15 @@ protected void finalize() throws Throwable {
close();
super.finalize();
}
@Override
protected String getCurTimestampQuery() {
return "SELECT SYSDATE FROM dual";
}
@Override
public String timestampToQueryString(Timestamp ts) {
return "TO_TIMESTAMP('" + ts + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
}
}

View File

@ -18,6 +18,8 @@
package com.cloudera.sqoop.manager;
import java.sql.Timestamp;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.hive.HiveTypes;
import com.cloudera.sqoop.lib.BlobRef;
@ -616,8 +618,9 @@ public void release() {
}
}
@Override
/**
* @{inheritDoc}
* {@inheritDoc}
*/
public void updateTable(ExportJobContext context)
throws IOException, ExportException {
@ -625,4 +628,51 @@ public void updateTable(ExportJobContext context)
JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context);
exportJob.runExport();
}
/**
* @return a SQL query to retrieve the current timestamp from the db.
*/
protected String getCurTimestampQuery() {
return "SELECT CURRENT_TIMESTAMP()";
}
@Override
/**
* {@inheritDoc}
*/
public Timestamp getCurrentDbTimestamp() {
release(); // Release any previous ResultSet.
Statement s = null;
ResultSet rs = null;
try {
Connection c = getConnection();
s = c.createStatement();
rs = s.executeQuery(getCurTimestampQuery());
if (rs == null || !rs.next()) {
return null; // empty ResultSet.
}
return rs.getTimestamp(1);
} catch (SQLException sqlE) {
LOG.warn("SQL exception accessing current timestamp: " + sqlE);
return null;
} finally {
try {
if (null != rs) {
rs.close();
}
} catch (SQLException sqlE) {
LOG.warn("SQL Exception closing resultset: " + sqlE);
}
try {
if (null != s) {
s.close();
}
} catch (SQLException sqlE) {
LOG.warn("SQL Exception closing statement: " + sqlE);
}
}
}
}

View File

@ -29,8 +29,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;

View File

@ -106,6 +106,7 @@ public void open(Map<String, String> descriptor) throws IOException {
setMetastoreUser(conf.get(AUTO_STORAGE_USER_KEY, DEFAULT_AUTO_USER));
setMetastorePassword(conf.get(AUTO_STORAGE_PASS_KEY,
DEFAULT_AUTO_PASSWORD));
setConnectedDescriptor(descriptor);
init();
}

View File

@ -117,6 +117,7 @@ public class HsqldbSessionStorage extends SessionStorage {
private static final String SQOOP_TOOL_KEY = "sqoop.tool";
private Map<String, String> connectedDescriptor;
private String metastoreConnectStr;
private String metastoreUser;
private String metastorePassword;
@ -144,6 +145,13 @@ protected void setMetastorePassword(String pass) {
private static final String DB_DRIVER_CLASS = "org.hsqldb.jdbcDriver";
/**
* Set the descriptor used to open() this storage.
*/
protected void setConnectedDescriptor(Map<String, String> descriptor) {
this.connectedDescriptor = descriptor;
}
@Override
/**
* Initialize the connection to the database.
@ -152,6 +160,7 @@ public void open(Map<String, String> descriptor) throws IOException {
setMetastoreConnectStr(descriptor.get(META_CONNECT_KEY));
setMetastoreUser(descriptor.get(META_USERNAME_KEY));
setMetastorePassword(descriptor.get(META_PASSWORD_KEY));
setConnectedDescriptor(descriptor);
init();
}
@ -293,6 +302,10 @@ public SessionData read(String sessionName) throws IOException {
opts.setConf(conf);
opts.loadProperties(sqoopOptProps);
// Set the session connection information for this session.
opts.setSessionName(sessionName);
opts.setStorageDescriptor(connectedDescriptor);
return new SessionData(opts, tool);
} catch (SQLException sqlE) {
throw new IOException("Error communicating with database", sqlE);

View File

@ -909,6 +909,23 @@ public void generate() throws IOException {
colNames = connManager.getColumnNamesForQuery(
this.options.getSqlQuery());
}
} else {
// These column names were provided by the user. They may not be in
// the same case as the keys in the columnTypes map. So make sure
// we add the appropriate aliases in that map.
for (String userColName : colNames) {
for (Map.Entry<String, Integer> typeEntry : columnTypes.entrySet()) {
String typeColName = typeEntry.getKey();
if (typeColName.equalsIgnoreCase(userColName)
&& !typeColName.equals(userColName)) {
// We found the correct-case equivalent.
columnTypes.put(userColName, typeEntry.getValue());
// No need to continue iteration; only one could match.
// Also, the use of put() just invalidated the iterator.
break;
}
}
}
}
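A hedged illustration of the aliasing above, with hypothetical names: if the database reports a column as ID but the user passed --columns Id, the map gains a same-typed alias so later lookups by the user's spelling succeed:

    before: { "ID" -> Types.INTEGER }
    after:  { "ID" -> Types.INTEGER, "Id" -> Types.INTEGER }

This is the SQOOP-44 NPE fix described in the commit message.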
// Translate all the column names into names that are safe to

View File

@ -23,10 +23,7 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
@ -42,7 +39,6 @@
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.util.FileListing;
import com.cloudera.sqoop.shims.HadoopShim;
import com.cloudera.sqoop.util.Jars;

View File

@ -114,6 +114,11 @@ public abstract class BaseSqoopTool extends SqoopTool {
public static final String HELP_ARG = "help";
public static final String UPDATE_KEY_ARG = "update-key";
// Arguments for incremental imports.
public static final String INCREMENT_TYPE_ARG = "incremental";
public static final String INCREMENT_COL_ARG = "check-column";
public static final String INCREMENT_LAST_VAL_ARG = "last-value";
// HBase arguments.
public static final String HBASE_TABLE_ARG = "hbase-table";
public static final String HBASE_COL_FAM_ARG = "column-family";

View File

@ -19,7 +19,16 @@
package com.cloudera.sqoop.tool;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.OptionBuilder;
@ -35,6 +44,10 @@
import com.cloudera.sqoop.cli.ToolOptions;
import com.cloudera.sqoop.hive.HiveImport;
import com.cloudera.sqoop.manager.ImportJobContext;
import com.cloudera.sqoop.metastore.SessionData;
import com.cloudera.sqoop.metastore.SessionStorage;
import com.cloudera.sqoop.metastore.SessionStorageFactory;
import com.cloudera.sqoop.util.AppendUtils;
import com.cloudera.sqoop.util.ImportException;
import org.apache.hadoop.fs.Path;
@ -76,8 +89,239 @@ protected boolean init(SqoopOptions sqoopOpts) {
public List<String> getGeneratedJarFiles() {
return this.codeGenerator.getGeneratedJarFiles();
}
/**
* @return true if the supplied options specify an incremental import.
*/
private boolean isIncremental(SqoopOptions options) {
return !options.getIncrementalMode().equals(
SqoopOptions.IncrementalMode.None);
}
protected void importTable(SqoopOptions options, String tableName,
/**
* If this is an incremental import, then we should save the
* user's state back to the metastore (if this session was run
* from the metastore). Otherwise, log to the user what data
* they need to supply next time.
*/
private void saveIncrementalState(SqoopOptions options)
throws IOException {
if (!isIncremental(options)) {
return;
}
Map<String, String> descriptor = options.getStorageDescriptor();
String sessionName = options.getSessionName();
if (null != sessionName && null != descriptor) {
// Actually save it back to the metastore.
LOG.info("Saving incremental import state to the metastore");
SessionStorageFactory ssf = new SessionStorageFactory(options.getConf());
SessionStorage storage = ssf.getSessionStorage(descriptor);
storage.open(descriptor);
try {
// Save the 'parent' SqoopOptions; this does not contain the mutations
// to the SqoopOptions state that occurred over the course of this
// execution, except for the one we specifically want to memorize:
// the latest value of the check column.
SessionData data = new SessionData(options.getParent(), this);
storage.update(sessionName, data);
LOG.info("Updated data for session: " + sessionName);
} finally {
storage.close();
}
} else {
// If there wasn't a parent SqoopOptions, then the incremental
// state data was stored in the current SqoopOptions.
LOG.info("Incremental import complete! To run another incremental "
+ "import of all data following this import, supply the "
+ "following arguments:");
SqoopOptions.IncrementalMode incrementalMode =
options.getIncrementalMode();
switch (incrementalMode) {
case AppendRows:
LOG.info(" --incremental append");
break;
case DateLastModified:
LOG.info(" --incremental lastmodified");
break;
default:
LOG.warn("Undefined incremental mode: " + incrementalMode);
break;
}
LOG.info(" --check-column " + options.getIncrementalTestColumn());
LOG.info(" --last-value " + options.getIncrementalLastValue());
LOG.info("(Consider saving this with 'sqoop session --create')");
}
}
/**
* Return the max value in the incremental-import test column. This
* value must be numeric.
*/
private BigDecimal getMaxColumnId(SqoopOptions options) throws SQLException {
StringBuilder sb = new StringBuilder();
sb.append("SELECT MAX(");
sb.append(options.getIncrementalTestColumn());
sb.append(") FROM ");
sb.append(options.getTableName());
String where = options.getWhereClause();
if (null != where) {
sb.append(" WHERE ");
sb.append(where);
}
Connection conn = manager.getConnection();
Statement s = null;
ResultSet rs = null;
try {
s = conn.createStatement();
rs = s.executeQuery(sb.toString());
if (!rs.next()) {
// This probably means the table is empty.
LOG.warn("Unexpected: empty results for max value query?");
return null;
}
return rs.getBigDecimal(1);
} finally {
try {
if (null != rs) {
rs.close();
}
} catch (SQLException sqlE) {
LOG.warn("SQL Exception closing resultset: " + sqlE);
}
try {
if (null != s) {
s.close();
}
} catch (SQLException sqlE) {
LOG.warn("SQL Exception closing statement: " + sqlE);
}
}
}
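For hypothetical options --table foo --check-column id --where "region = 3", the statement built above is:

    SELECT MAX(id) FROM foo WHERE region = 3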
/**
* Initialize the constraints which set the incremental import range.
* @return false if an import is not necessary, because the dataset has not
* changed.
*/
private boolean initIncrementalConstraints(SqoopOptions options,
ImportJobContext context) throws ImportException, IOException {
// If this is an incremental import, determine the constraints
// to inject in the WHERE clause or $CONDITIONS for a query.
// Also modify the 'last value' field of the SqoopOptions to
// specify the current job start time / start row.
if (!isIncremental(options)) {
return true;
}
SqoopOptions.IncrementalMode incrementalMode = options.getIncrementalMode();
String nextIncrementalValue = null;
switch (incrementalMode) {
case AppendRows:
try {
BigDecimal nextVal = getMaxColumnId(options);
if (null != nextVal) {
nextIncrementalValue = nextVal.toString();
}
} catch (SQLException sqlE) {
throw new IOException(sqlE);
}
break;
case DateLastModified:
Timestamp dbTimestamp = manager.getCurrentDbTimestamp();
if (null == dbTimestamp) {
throw new IOException("Could not get current time from database");
}
nextIncrementalValue = manager.timestampToQueryString(dbTimestamp);
break;
default:
throw new ImportException("Undefined incremental import type: "
+ incrementalMode);
}
// Build the WHERE clause components that are used to import
// only this incremental section.
StringBuilder sb = new StringBuilder();
String prevEndpoint = options.getIncrementalLastValue();
String checkColName = manager.escapeColName(
options.getIncrementalTestColumn());
LOG.info("Incremental import based on column " + checkColName);
if (null != prevEndpoint) {
if (prevEndpoint.equals(nextIncrementalValue)) {
LOG.info("No new rows detected since last import.");
return false;
}
LOG.info("Lower bound value: " + prevEndpoint);
sb.append(checkColName);
switch (incrementalMode) {
case AppendRows:
sb.append(" > ");
break;
case DateLastModified:
sb.append(" >= ");
break;
default:
throw new ImportException("Undefined comparison");
}
sb.append(prevEndpoint);
sb.append(" AND ");
}
if (null != nextIncrementalValue) {
sb.append(checkColName);
switch (incrementalMode) {
case AppendRows:
sb.append(" <= ");
break;
case DateLastModified:
sb.append(" < ");
break;
default:
throw new ImportException("Undefined comparison");
}
sb.append(nextIncrementalValue);
} else {
sb.append(checkColName);
sb.append(" IS NULL ");
}
LOG.info("Upper bound value: " + nextIncrementalValue);
String prevWhereClause = options.getWhereClause();
if (null != prevWhereClause) {
sb.append(" AND (");
sb.append(prevWhereClause);
sb.append(")");
}
String newConstraints = sb.toString();
options.setWhereClause(newConstraints);
// Save this state for next time.
SqoopOptions recordOptions = options.getParent();
if (null == recordOptions) {
recordOptions = options;
}
recordOptions.setIncrementalLastValue(nextIncrementalValue);
return true;
}
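A hedged example of the constraints this method injects into the WHERE clause (values hypothetical). For an append import whose previous --last-value was 100 and whose current MAX(id) is 500:

    id > 100 AND id <= 500

For a lastmodified import bounded by the previous run and the database's current timestamp:

    last_modified >= '2011-07-22 20:03:00.0' AND last_modified < '2011-07-22 20:04:07.0'

Any pre-existing --where clause is appended with AND ( ... ), and the upper bound becomes the recorded last value for the next run.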
/**
* Import a table or query.
* @return true if an import was performed, false otherwise.
*/
protected boolean importTable(SqoopOptions options, String tableName,
HiveImport hiveImport) throws IOException, ImportException {
String jarFile = null;
@ -88,6 +332,12 @@ protected void importTable(SqoopOptions options, String tableName,
ImportJobContext context = new ImportJobContext(tableName, jarFile,
options, getOutputPath(options, tableName));
// If we're doing an incremental import, set up the
// filtering conditions used to get the latest records.
if (!initIncrementalConstraints(options, context)) {
return false;
}
if (null != tableName) {
manager.importTable(context);
} else {
@ -103,6 +353,10 @@ protected void importTable(SqoopOptions options, String tableName,
if (options.doHiveImport()) {
hiveImport.importTable(tableName, options.getHiveTableName(), false);
}
saveIncrementalState(options);
return true;
}
/**
@ -264,12 +518,42 @@ protected RelatedOptions getImportOptions() {
return importOpts;
}
/**
* Return options for incremental import.
*/
protected RelatedOptions getIncrementalOptions() {
RelatedOptions incrementalOpts =
new RelatedOptions("Incremental import arguments");
incrementalOpts.addOption(OptionBuilder.withArgName("import-type")
.hasArg()
.withDescription(
"Define an incremental import of type 'append' or 'lastmodified'")
.withLongOpt(INCREMENT_TYPE_ARG)
.create());
incrementalOpts.addOption(OptionBuilder.withArgName("column")
.hasArg()
.withDescription("Source column to check for incremental change")
.withLongOpt(INCREMENT_COL_ARG)
.create());
incrementalOpts.addOption(OptionBuilder.withArgName("value")
.hasArg()
.withDescription("Last imported value in the incremental check column")
.withLongOpt(INCREMENT_LAST_VAL_ARG)
.create());
return incrementalOpts;
}
@Override
/** Configure the command-line arguments we expect to receive */
public void configureOptions(ToolOptions toolOptions) {
toolOptions.addUniqueOptions(getCommonOptions());
toolOptions.addUniqueOptions(getImportOptions());
if (!allTables) {
toolOptions.addUniqueOptions(getIncrementalOptions());
}
toolOptions.addUniqueOptions(getOutputFormatOptions());
toolOptions.addUniqueOptions(getInputFormatOptions());
toolOptions.addUniqueOptions(getHiveOptions(true));
@ -306,6 +590,32 @@ public void printHelp(ToolOptions toolOptions) {
"after a '--' on the command line.");
}
private void applyIncrementalOptions(CommandLine in, SqoopOptions out)
throws InvalidOptionsException {
if (in.hasOption(INCREMENT_TYPE_ARG)) {
String incrementalTypeStr = in.getOptionValue(INCREMENT_TYPE_ARG);
if ("append".equals(incrementalTypeStr)) {
out.setIncrementalMode(SqoopOptions.IncrementalMode.AppendRows);
// This argument implies ability to append to the same directory.
out.setAppendMode(true);
} else if ("lastmodified".equals(incrementalTypeStr)) {
out.setIncrementalMode(SqoopOptions.IncrementalMode.DateLastModified);
} else {
throw new InvalidOptionsException("Unknown incremental import mode: "
+ incrementalTypeStr + ". Use 'append' or 'lastmodified'."
+ HELP_STR);
}
}
if (in.hasOption(INCREMENT_COL_ARG)) {
out.setIncrementalTestColumn(in.getOptionValue(INCREMENT_COL_ARG));
}
if (in.hasOption(INCREMENT_LAST_VAL_ARG)) {
out.setIncrementalLastValue(in.getOptionValue(INCREMENT_LAST_VAL_ARG));
}
}
@Override
/** {@inheritDoc} */
public void applyOptions(CommandLine in, SqoopOptions out)
@ -382,6 +692,7 @@ public void applyOptions(CommandLine in, SqoopOptions out)
out.setExistingJarName(in.getOptionValue(JAR_FILE_NAME_ARG));
}
applyIncrementalOptions(in, out);
applyHiveOptions(in, out);
applyOutputFormatOptions(in, out);
applyInputFormatOptions(in, out);
@ -437,6 +748,32 @@ protected void validateImportOptions(SqoopOptions options)
}
}
/**
* Validate the incremental import options.
*/
private void validateIncrementalOptions(SqoopOptions options)
throws InvalidOptionsException {
if (options.getIncrementalMode() != SqoopOptions.IncrementalMode.None
&& options.getIncrementalTestColumn() == null) {
throw new InvalidOptionsException(
"For an incremental import, the check column must be specified "
+ "with --" + INCREMENT_COL_ARG + ". " + HELP_STR);
}
if (options.getIncrementalMode() == SqoopOptions.IncrementalMode.None
&& options.getIncrementalTestColumn() != null) {
throw new InvalidOptionsException(
"You must specify an incremental import mode with --"
+ INCREMENT_TYPE_ARG + ". " + HELP_STR);
}
if (options.getIncrementalMode() != SqoopOptions.IncrementalMode.None
&& options.getTableName() == null) {
throw new InvalidOptionsException("Incremental imports require a table."
+ HELP_STR);
}
}
@Override
/** {@inheritDoc} */
public void validateOptions(SqoopOptions options)
@ -452,6 +789,7 @@ public void validateOptions(SqoopOptions options)
}
validateImportOptions(options);
validateIncrementalOptions(options);
validateCommonOptions(options);
validateCodeGenOptions(options);
validateOutputFormatOptions(options);

View File

@ -209,6 +209,12 @@ private int execSession(SqoopOptions opts) throws IOException {
SqoopOptions childOpts = data.getSqoopOptions();
SqoopTool childTool = data.getSqoopTool();
// Don't overwrite the original SqoopOptions with the
// arguments; make a child copy instead.
SqoopOptions clonedOpts = (SqoopOptions) childOpts.clone();
clonedOpts.setParent(childOpts);
int dashPos = getDashPosition(extraArguments);
String [] childArgv;
if (dashPos >= extraArguments.length) {
@ -218,13 +224,13 @@ private int execSession(SqoopOptions opts) throws IOException {
extraArguments.length);
}
int confRet = configureChildTool(childOpts, childTool, childArgv);
int confRet = configureChildTool(clonedOpts, childTool, childArgv);
if (0 != confRet) {
// Error.
return confRet;
}
return childTool.run(childOpts);
return childTool.run(clonedOpts);
}
private int showSession(SqoopOptions opts) throws IOException {

View File

@ -38,6 +38,7 @@ public static Test suite() {
suite.addTest(ThirdPartyTests.suite());
suite.addTestSuite(TestHBaseImport.class);
suite.addTestSuite(TestHBaseQueryImport.class);
suite.addTestSuite(TestIncrementalImport.class);
return suite;
}

View File

@ -0,0 +1,790 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.HsqldbManager;
import com.cloudera.sqoop.manager.ManagerFactory;
import com.cloudera.sqoop.metastore.TestSessions;
import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
import com.cloudera.sqoop.testutil.CommonArgs;
import com.cloudera.sqoop.tool.ImportTool;
import com.cloudera.sqoop.tool.SessionTool;
import junit.framework.TestCase;
import java.sql.Connection;
/**
* Test the incremental import functionality.
*
* These all make use of the auto-connect hsqldb-based metastore.
* The metastore URL is configured to be in-memory; all state is dropped
* between individual tests.
*/
public class TestIncrementalImport extends TestCase {
public static final Log LOG = LogFactory.getLog(
TestIncrementalImport.class.getName());
// The database we read from.
public static final String SOURCE_DB_URL = "jdbc:hsqldb:mem:incremental";
@Override
public void setUp() throws Exception {
// Delete db state between tests.
TestSessions.resetSessionSchema();
resetSourceDataSchema();
}
public static void resetSourceDataSchema() throws SQLException {
SqoopOptions options = new SqoopOptions();
options.setConnectString(SOURCE_DB_URL);
TestSessions.resetSchema(options);
}
public static Configuration newConf() {
return TestSessions.newConf();
}
/**
* Assert that a table has a specified number of rows.
*/
private void assertRowCount(String table, int numRows) throws SQLException {
SqoopOptions options = new SqoopOptions();
options.setConnectString(SOURCE_DB_URL);
HsqldbManager manager = new HsqldbManager(options);
Connection c = manager.getConnection();
PreparedStatement s = null;
ResultSet rs = null;
try {
s = c.prepareStatement("SELECT COUNT(*) FROM " + table);
rs = s.executeQuery();
if (!rs.next()) {
fail("No resultset");
}
int realNumRows = rs.getInt(1);
assertEquals(numRows, realNumRows);
LOG.info("Expected " + numRows + " rows -- ok.");
} finally {
if (null != s) {
try {
s.close();
} catch (SQLException sqlE) {
LOG.warn("exception: " + sqlE);
}
}
if (null != rs) {
try {
rs.close();
} catch (SQLException sqlE) {
LOG.warn("exception: " + sqlE);
}
}
}
}
/**
* Insert rows with id = [low, hi) into tableName.
*/
private void insertIdRows(String tableName, int low, int hi)
throws SQLException {
SqoopOptions options = new SqoopOptions();
options.setConnectString(SOURCE_DB_URL);
HsqldbManager manager = new HsqldbManager(options);
Connection c = manager.getConnection();
PreparedStatement s = null;
try {
s = c.prepareStatement("INSERT INTO " + tableName + " VALUES(?)");
for (int i = low; i < hi; i++) {
s.setInt(1, i);
s.executeUpdate();
}
c.commit();
} finally {
s.close();
}
}
/**
* Insert rows with id = [low, hi) into tableName with
* the timestamp column set to the specified ts.
*/
private void insertIdTimestampRows(String tableName, int low, int hi,
Timestamp ts) throws SQLException {
LOG.info("Inserting id rows in [" + low + ", " + hi + ") @ " + ts);
SqoopOptions options = new SqoopOptions();
options.setConnectString(SOURCE_DB_URL);
HsqldbManager manager = new HsqldbManager(options);
Connection c = manager.getConnection();
PreparedStatement s = null;
try {
s = c.prepareStatement("INSERT INTO " + tableName + " VALUES(?,?)");
for (int i = low; i < hi; i++) {
s.setInt(1, i);
s.setTimestamp(2, ts);
s.executeUpdate();
}
c.commit();
} finally {
s.close();
}
}
/**
* Create a table with an 'id' column full of integers.
*/
private void createIdTable(String tableName, int insertRows)
throws SQLException {
SqoopOptions options = new SqoopOptions();
options.setConnectString(SOURCE_DB_URL);
HsqldbManager manager = new HsqldbManager(options);
Connection c = manager.getConnection();
PreparedStatement s = null;
try {
s = c.prepareStatement("CREATE TABLE " + tableName + "(id INT NOT NULL)");
s.executeUpdate();
c.commit();
insertIdRows(tableName, 0, insertRows);
} finally {
s.close();
}
}
/**
* Create a table with an 'id' column full of integers and a
* last_modified column with timestamps.
*/
private void createTimestampTable(String tableName, int insertRows,
Timestamp baseTime) throws SQLException {
SqoopOptions options = new SqoopOptions();
options.setConnectString(SOURCE_DB_URL);
HsqldbManager manager = new HsqldbManager(options);
Connection c = manager.getConnection();
PreparedStatement s = null;
try {
s = c.prepareStatement("CREATE TABLE " + tableName + "(id INT NOT NULL, "
+ "last_modified TIMESTAMP)");
s.executeUpdate();
c.commit();
insertIdTimestampRows(tableName, 0, insertRows, baseTime);
} finally {
s.close();
}
}
/**
* Delete all files in a directory for a table.
*/
public void clearDir(String tableName) {
try {
FileSystem fs = FileSystem.getLocal(new Configuration());
Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
Path tableDir = new Path(warehouse, tableName);
fs.delete(tableDir, true);
} catch (Exception e) {
fail("Got unexpected exception: " + StringUtils.stringifyException(e));
}
}
/**
* Look at a directory that should contain files full of an imported 'id'
* column. Assert that all numbers in [0, expectedNums) are present
* in order.
*/
public void assertDirOfNumbers(String tableName, int expectedNums) {
try {
FileSystem fs = FileSystem.getLocal(new Configuration());
Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
Path tableDir = new Path(warehouse, tableName);
FileStatus [] stats = fs.listStatus(tableDir);
String [] fileNames = new String[stats.length];
for (int i = 0; i < stats.length; i++) {
fileNames[i] = stats[i].getPath().toString();
}
Arrays.sort(fileNames);
// Read all the files in sorted order, adding the value lines to the list.
List<String> receivedNums = new ArrayList<String>();
for (String fileName : fileNames) {
if (fileName.startsWith("_") || fileName.startsWith(".")) {
continue;
}
BufferedReader r = new BufferedReader(
new InputStreamReader(fs.open(new Path(fileName))));
try {
while (true) {
String s = r.readLine();
if (null == s) {
break;
}
receivedNums.add(s.trim());
}
} finally {
r.close();
}
}
assertEquals(expectedNums, receivedNums.size());
// Compare the received values with the expected set.
for (int i = 0; i < expectedNums; i++) {
assertEquals((int) i, (int) Integer.valueOf(receivedNums.get(i)));
}
} catch (Exception e) {
fail("Got unexpected exception: " + StringUtils.stringifyException(e));
}
}
/**
* Assert that a directory contains a file with exactly one line
* in it, containing the prescribed number 'val'.
*/
public void assertSpecificNumber(String tableName, int val) {
try {
FileSystem fs = FileSystem.getLocal(new Configuration());
Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
Path tableDir = new Path(warehouse, tableName);
FileStatus [] stats = fs.listStatus(tableDir);
String [] fileNames = new String[stats.length];
for (int i = 0; i < stats.length; i++) {
fileNames[i] = stats[i].getPath().toString();
}
// Read the first file that is not a hidden file.
boolean foundVal = false;
for (String fileName : fileNames) {
if (fileName.startsWith("_") || fileName.startsWith(".")) {
continue;
}
if (foundVal) {
// Make sure we don't have two or more "real" files in the dir.
fail("Got an extra data-containing file in this directory.");
}
BufferedReader r = new BufferedReader(
new InputStreamReader(fs.open(new Path(fileName))));
try {
String s = r.readLine();
if (null == s) {
fail("Unexpected empty file " + fileName + ".");
}
assertEquals(val, (int) Integer.valueOf(s.trim()));
String nextLine = r.readLine();
if (nextLine != null) {
fail("Expected only one result, but got another line: " + nextLine);
}
// Successfully got the value we were looking for.
foundVal = true;
} finally {
r.close();
}
}
} catch (Exception e) {
fail("Got unexpected exception: " + StringUtils.stringifyException(e));
}
}
public void runImport(SqoopOptions options, List<String> args) {
try {
Sqoop importer = new Sqoop(new ImportTool(), options.getConf(), options);
int ret = Sqoop.runSqoop(importer, args.toArray(new String[0]));
assertEquals("Failure during job", 0, ret);
} catch (Exception e) {
LOG.error("Got exception running Sqoop: "
+ StringUtils.stringifyException(e));
throw new RuntimeException(e);
}
}
/**
* Return a list of arguments to import the specified table.
*/
private List<String> getArgListForTable(String tableName, boolean commonArgs,
boolean isAppend) {
List<String> args = new ArrayList<String>();
if (commonArgs) {
CommonArgs.addHadoopFlags(args);
}
args.add("--connect");
args.add(SOURCE_DB_URL);
args.add("--table");
args.add(tableName);
args.add("--warehouse-dir");
args.add(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
if (isAppend) {
args.add("--incremental");
args.add("append");
args.add("--check-column");
args.add("id");
} else {
args.add("--incremental");
args.add("lastmodified");
args.add("--check-column");
args.add("last_modified");
}
args.add("--columns");
args.add("id");
args.add("-m");
args.add("1");
return args;
}
/**
* Create a session with the specified name, where the session performs
* an import configured with 'sessionArgs'.
*/
private void createSession(String sessionName, List<String> sessionArgs) {
createSession(sessionName, sessionArgs, newConf());
}
/**
* Create a session with the specified name, where the session performs
* an import configured with 'sessionArgs', using the provided configuration
* as defaults.
*/
private void createSession(String sessionName, List<String> sessionArgs,
Configuration conf) {
try {
SqoopOptions options = new SqoopOptions();
options.setConf(conf);
Sqoop makeSession = new Sqoop(new SessionTool(), conf, options);
List<String> args = new ArrayList<String>();
args.add("--create");
args.add(sessionName);
args.add("--");
args.add("import");
args.addAll(sessionArgs);
int ret = Sqoop.runSqoop(makeSession, args.toArray(new String[0]));
assertEquals("Failure during job to create session", 0, ret);
} catch (Exception e) {
LOG.error("Got exception running Sqoop to create session: "
+ StringUtils.stringifyException(e));
throw new RuntimeException(e);
}
}
/**
* Run the specified session.
*/
private void runSession(String sessionName) {
runSession(sessionName, newConf());
}
/**
* Run the specified session.
*/
private void runSession(String sessionName, Configuration conf) {
try {
SqoopOptions options = new SqoopOptions();
options.setConf(conf);
Sqoop runSession = new Sqoop(new SessionTool(), conf, options);
List<String> args = new ArrayList<String>();
args.add("--exec");
args.add(sessionName);
int ret = Sqoop.runSqoop(runSession, args.toArray(new String[0]));
assertEquals("Failure during job to run session", 0, ret);
} catch (Exception e) {
LOG.error("Got exception running Sqoop to run session: "
+ StringUtils.stringifyException(e));
throw new RuntimeException(e);
}
}
// Incremental import of an empty table, no metastore.
public void testEmptyAppendImport() throws Exception {
final String TABLE_NAME = "emptyAppend1";
createIdTable(TABLE_NAME, 0);
List<String> args = getArgListForTable(TABLE_NAME, true, true);
Configuration conf = newConf();
SqoopOptions options = new SqoopOptions();
options.setConf(conf);
runImport(options, args);
assertDirOfNumbers(TABLE_NAME, 0);
}
// Incremental import of a filled table, no metastore.
public void testFullAppendImport() throws Exception {
final String TABLE_NAME = "fullAppend1";
createIdTable(TABLE_NAME, 10);
List<String> args = getArgListForTable(TABLE_NAME, true, true);
Configuration conf = newConf();
SqoopOptions options = new SqoopOptions();
options.setConf(conf);
runImport(options, args);
assertDirOfNumbers(TABLE_NAME, 10);
}
public void testEmptySessionAppend() throws Exception {
// Create a session and run an import on an empty table.
// Nothing should happen.
final String TABLE_NAME = "emptySession";
createIdTable(TABLE_NAME, 0);
List<String> args = getArgListForTable(TABLE_NAME, false, true);
createSession("emptySession", args);
runSession("emptySession");
assertDirOfNumbers(TABLE_NAME, 0);
// Running the session a second time should result in
// nothing happening; the table is still empty.
runSession("emptySession");
assertDirOfNumbers(TABLE_NAME, 0);
}
public void testEmptyThenFullSessionAppend() throws Exception {
// Create an empty table. Import it; nothing happens.
// Add some rows. Verify they are appended.
final String TABLE_NAME = "emptyThenFull";
createIdTable(TABLE_NAME, 0);
List<String> args = getArgListForTable(TABLE_NAME, false, true);
createSession(TABLE_NAME, args);
runSession(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 0);
// Now add some rows.
insertIdRows(TABLE_NAME, 0, 10);
// Running the session a second time should import 10 rows.
runSession(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 10);
// Add some more rows.
insertIdRows(TABLE_NAME, 10, 20);
// Import only those rows.
runSession(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 20);
}
public void testAppend() throws Exception {
// Create a table with data in it; import it.
// Then add more data, verify that only the incremental data is pulled.
final String TABLE_NAME = "append";
createIdTable(TABLE_NAME, 10);
List<String> args = getArgListForTable(TABLE_NAME, false, true);
createSession(TABLE_NAME, args);
runSession(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 10);
// Add some more rows.
insertIdRows(TABLE_NAME, 10, 20);
// Import only those rows.
runSession(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 20);
}
public void testEmptyLastModified() throws Exception {
final String TABLE_NAME = "emptyLastModified";
createTimestampTable(TABLE_NAME, 0, null);
List<String> args = getArgListForTable(TABLE_NAME, true, false);
Configuration conf = newConf();
SqoopOptions options = new SqoopOptions();
options.setConf(conf);
runImport(options, args);
assertDirOfNumbers(TABLE_NAME, 0);
}
public void testFullLastModifiedImport() throws Exception {
// Given a table of rows imported in the past,
// see that they are imported.
final String TABLE_NAME = "fullLastModified";
Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
createTimestampTable(TABLE_NAME, 10, thePast);
List<String> args = getArgListForTable(TABLE_NAME, true, false);
Configuration conf = newConf();
SqoopOptions options = new SqoopOptions();
options.setConf(conf);
runImport(options, args);
assertDirOfNumbers(TABLE_NAME, 10);
}
public void testNoImportFromTheFuture() throws Exception {
// If last-modified dates for writes are serialized to be in the
// future w.r.t. an import, do not import these rows.
final String TABLE_NAME = "futureLastModified";
Timestamp theFuture = new Timestamp(System.currentTimeMillis() + 1000000);
createTimestampTable(TABLE_NAME, 10, theFuture);
List<String> args = getArgListForTable(TABLE_NAME, true, false);
Configuration conf = newConf();
SqoopOptions options = new SqoopOptions();
options.setConf(conf);
runImport(options, args);
assertDirOfNumbers(TABLE_NAME, 0);
}
public void testEmptySessionLastMod() throws Exception {
// Create a session and run an import on an empty table.
// Nothing should happen.
final String TABLE_NAME = "emptySessionLastMod";
createTimestampTable(TABLE_NAME, 0, null);
List<String> args = getArgListForTable(TABLE_NAME, false, false);
args.add("--append");
createSession("emptySessionLastMod", args);
runSession("emptySessionLastMod");
assertDirOfNumbers(TABLE_NAME, 0);
// Running the session a second time should result in
// nothing happening; the table is still empty.
runSession("emptySessionLastMod");
assertDirOfNumbers(TABLE_NAME, 0);
}
public void testEmptyThenFullSessionLastMod() throws Exception {
// Create an empty table. Import it; nothing happens.
// Add some rows. Verify they are appended.
final String TABLE_NAME = "emptyThenFullTimestamp";
createTimestampTable(TABLE_NAME, 0, null);
List<String> args = getArgListForTable(TABLE_NAME, false, false);
args.add("--append");
createSession(TABLE_NAME, args);
runSession(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 0);
long importWasBefore = System.currentTimeMillis();
// Let some time elapse.
Thread.sleep(50);
long rowsAddedTime = System.currentTimeMillis() - 5;
// Check: we are adding rows after the previous import time
// and before the current time.
assertTrue(rowsAddedTime > importWasBefore);
assertTrue(rowsAddedTime < System.currentTimeMillis());
insertIdTimestampRows(TABLE_NAME, 0, 10, new Timestamp(rowsAddedTime));
// Running the session a second time should import 10 rows.
runSession(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 10);
// Add some more rows.
importWasBefore = System.currentTimeMillis();
Thread.sleep(50);
rowsAddedTime = System.currentTimeMillis() - 5;
assertTrue(rowsAddedTime > importWasBefore);
assertTrue(rowsAddedTime < System.currentTimeMillis());
insertIdTimestampRows(TABLE_NAME, 10, 20, new Timestamp(rowsAddedTime));
// Import only those rows.
runSession(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 20);
}
public void testAppendWithTimestamp() throws Exception {
// Create a table with data in it; import it.
// Then add more data, verify that only the incremental data is pulled.
final String TABLE_NAME = "appendTimestamp";
Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
createTimestampTable(TABLE_NAME, 10, thePast);
List<String> args = getArgListForTable(TABLE_NAME, false, false);
args.add("--append");
createSession(TABLE_NAME, args);
runSession(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 10);
// Add some more rows.
long importWasBefore = System.currentTimeMillis();
Thread.sleep(50);
long rowsAddedTime = System.currentTimeMillis() - 5;
assertTrue(rowsAddedTime > importWasBefore);
assertTrue(rowsAddedTime < System.currentTimeMillis());
insertIdTimestampRows(TABLE_NAME, 10, 20, new Timestamp(rowsAddedTime));
// Import only those rows.
runSession(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 20);
}
public void testModifyWithTimestamp() throws Exception {
// Create a table with data in it; import it.
// Then modify some existing rows, and verify that we only grab
// those rows.
final String TABLE_NAME = "modifyTimestamp";
Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
createTimestampTable(TABLE_NAME, 10, thePast);
List<String> args = getArgListForTable(TABLE_NAME, false, false);
createSession(TABLE_NAME, args);
runSession(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 10);
// Modify a row.
long importWasBefore = System.currentTimeMillis();
Thread.sleep(50);
long rowsAddedTime = System.currentTimeMillis() - 5;
assertTrue(rowsAddedTime > importWasBefore);
assertTrue(rowsAddedTime < System.currentTimeMillis());
SqoopOptions options = new SqoopOptions();
options.setConnectString(SOURCE_DB_URL);
HsqldbManager manager = new HsqldbManager(options);
Connection c = manager.getConnection();
PreparedStatement s = null;
try {
s = c.prepareStatement("UPDATE " + TABLE_NAME
+ " SET id=?, last_modified=? WHERE id=?");
s.setInt(1, 4000); // the first row should have '4000' in it now.
s.setTimestamp(2, new Timestamp(rowsAddedTime));
s.setInt(3, 0);
s.executeUpdate();
c.commit();
} finally {
s.close();
}
// Import only the new row.
clearDir(TABLE_NAME);
runSession(TABLE_NAME);
assertSpecificNumber(TABLE_NAME, 4000);
}
/**
* ManagerFactory returning an HSQLDB ConnManager which allows you to
* specify the current database timestamp.
*/
public static class InstrumentHsqldbManagerFactory extends ManagerFactory {
@Override
public ConnManager accept(SqoopOptions options) {
LOG.info("Using instrumented manager");
return new InstrumentHsqldbManager(options);
}
}
/**
* Hsqldb ConnManager that lets you set the current reported timestamp
* from the database, to allow testing of boundary conditions for imports.
*/
public static class InstrumentHsqldbManager extends HsqldbManager {
private static Timestamp curTimestamp;
public InstrumentHsqldbManager(SqoopOptions options) {
super(options);
}
@Override
public Timestamp getCurrentDbTimestamp() {
return InstrumentHsqldbManager.curTimestamp;
}
public static void setCurrentDbTimestamp(Timestamp t) {
InstrumentHsqldbManager.curTimestamp = t;
}
}
public void testTimestampBoundary() throws Exception {
// Run an import, and then insert rows with the last-modified timestamp
// set to the exact time when the first import runs. Run a second import
// and ensure that we pick up the new data.
long now = System.currentTimeMillis();
final String TABLE_NAME = "boundaryTimestamp";
Timestamp thePast = new Timestamp(now - 100);
createTimestampTable(TABLE_NAME, 10, thePast);
Timestamp firstJobTime = new Timestamp(now);
InstrumentHsqldbManager.setCurrentDbTimestamp(firstJobTime);
// Configure the job to use the instrumented Hsqldb manager.
Configuration conf = newConf();
conf.set(ConnFactory.FACTORY_CLASS_NAMES_KEY,
InstrumentHsqldbManagerFactory.class.getName());
List<String> args = getArgListForTable(TABLE_NAME, false, false);
args.add("--append");
createSession(TABLE_NAME, args, conf);
runSession(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 10);
// Add some more rows with the timestamp equal to the job run timestamp.
insertIdTimestampRows(TABLE_NAME, 10, 20, firstJobTime);
assertRowCount(TABLE_NAME, 20);
// Run a second job with the clock advanced by 100 ms.
Timestamp secondJobTime = new Timestamp(now + 100);
InstrumentHsqldbManager.setCurrentDbTimestamp(secondJobTime);
// Import only those rows.
runSession(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 20);
}
}

View File

@ -46,7 +46,8 @@
*/
public class TestSessions extends TestCase {
public static final String TEST_AUTOCONNECT_URL = "jdbc:hsqldb:mem:testdb";
public static final String TEST_AUTOCONNECT_URL =
"jdbc:hsqldb:mem:sqoopmetastore";
public static final String TEST_AUTOCONNECT_USER = "SA";
public static final String TEST_AUTOCONNECT_PASS = "";
@ -62,6 +63,13 @@ public static void resetSessionSchema() throws SQLException {
options.setUsername(TEST_AUTOCONNECT_USER);
options.setPassword(TEST_AUTOCONNECT_PASS);
resetSchema(options);
}
/**
* Drop all tables in the configured HSQLDB-based schema/user/pass.
*/
public static void resetSchema(SqoopOptions options) throws SQLException {
HsqldbManager manager = new HsqldbManager(options);
Connection c = manager.getConnection();
Statement s = c.createStatement();