diff --git a/build.xml b/build.xml
index 2181586e..9d668ad9 100644
--- a/build.xml
+++ b/build.xml
@@ -843,7 +843,7 @@
outputFile="${findbugs.output.xml.file}" effort="max"
excludeFilter="${findbugs.excludes}">
-
+
diff --git a/src/java/com/cloudera/sqoop/SqoopOptions.java b/src/java/com/cloudera/sqoop/SqoopOptions.java
index 37e18337..9b471cd9 100644
--- a/src/java/com/cloudera/sqoop/SqoopOptions.java
+++ b/src/java/com/cloudera/sqoop/SqoopOptions.java
@@ -22,6 +22,7 @@
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
@@ -31,9 +32,9 @@
import com.cloudera.sqoop.lib.LargeObjectLoader;
/**
- * Command-line arguments used by Sqoop.
+ * Configurable state used by Sqoop tools.
*/
-public class SqoopOptions {
+public class SqoopOptions implements Cloneable {
public static final Log LOG = LogFactory.getLog(SqoopOptions.class.getName());
@@ -73,6 +74,22 @@ public enum FileLayout {
SequenceFile
}
+ /**
+ * Incremental imports support two modes:
+ *
+ * - New rows are appended to the end of a table with an
+ * ever-incrementing row id.
+ * - New data results in a date-last-modified column being
+ * updated to NOW(); Sqoop will pull all dirty rows in the next
+ * incremental import.
+ *
+ */
+ public enum IncrementalMode {
+ None,
+ AppendRows,
+ DateLastModified,
+ }
+
// TODO(aaron): Adding something here? Add a setter and a getter.
// Add a default value in initDefaults() if you need one. If this value
@@ -149,6 +166,26 @@ public enum FileLayout {
private String hbaseRowKeyCol; // Column of the input to use as the row key.
private boolean hbaseCreateTable; // if true, create tables/col families.
+ // Column to check against for incremental imports.
+ private String incrementalTestCol;
+ // incremental import mode we're using.
+ private IncrementalMode incrementalMode;
+ // What was the last-imported value of incrementalTestCol?
+ private String incrementalLastValue;
+
+ // These next two fields are not serialized to the metastore.
+ // If this SqoopOptions is created by reading a saved session, these will
+ // be populated by the SessionStorage to facilitate updating the same
+ // session.
+ private String sessionName;
+ private Map<String, String> sessionStorageDescriptor;
+
+ // If we restore a session and then allow the user to apply arguments on
+ // top, we retain the version without the arguments in a reference to the
+ // 'parent' SqoopOptions instance, here.
+ private SqoopOptions parent;
+
public SqoopOptions() {
initDefaults(null);
}
@@ -356,10 +393,12 @@ public void loadProperties(Properties props) {
this.targetDir);
this.append = getBooleanProperty(props, "hdfs.append.dir", this.append);
- String fileFmtStr = props.getProperty("hdfs.file.format", "text");
- if (fileFmtStr.equals("seq")) {
- this.layout = FileLayout.SequenceFile;
- } else {
+ try {
+ this.layout = FileLayout.valueOf(
+ props.getProperty("hdfs.file.format", this.layout.toString()));
+ } catch (IllegalArgumentException iae) {
+ LOG.warn("Unsupported file format: "
+ + props.getProperty("hdfs.file.format", null) + "; setting to text");
this.layout = FileLayout.TextFile;
}
@@ -410,6 +449,20 @@ public void loadProperties(Properties props) {
this.hbaseRowKeyCol);
this.hbaseCreateTable = getBooleanProperty(props, "hbase.create.table",
this.hbaseCreateTable);
+
+ try {
+ this.incrementalMode = IncrementalMode.valueOf(props.getProperty(
+ "incremental.mode", this.incrementalMode.toString()));
+ } catch (IllegalArgumentException iae) {
+ LOG.warn("Invalid incremental import type: "
+ + props.getProperty("incremental.mode", null) + "; setting to None");
+ this.incrementalMode = IncrementalMode.None;
+ }
+
+ this.incrementalTestCol = props.getProperty("incremental.col",
+ this.incrementalTestCol);
+ this.incrementalLastValue = props.getProperty("incremental.last.value",
+ this.incrementalLastValue);
}
/**
@@ -448,11 +501,7 @@ public Properties writeProperties() {
putProperty(props, "hdfs.warehouse.dir", this.warehouseDir);
putProperty(props, "hdfs.target.dir", this.targetDir);
putProperty(props, "hdfs.append.dir", Boolean.toString(this.append));
- if (this.layout == FileLayout.SequenceFile) {
- putProperty(props, "hdfs.file.format", "seq");
- } else {
- putProperty(props, "hdfs.file.format", "text");
- }
+ putProperty(props, "hdfs.file.format", this.layout.toString());
putProperty(props, "direct.import", Boolean.toString(this.direct));
putProperty(props, "hive.import", Boolean.toString(this.hiveImport));
putProperty(props, "hive.overwrite.table",
@@ -482,9 +531,48 @@ public Properties writeProperties() {
putProperty(props, "hbase.create.table",
Boolean.toString(this.hbaseCreateTable));
+ putProperty(props, "incremental.mode", this.incrementalMode.toString());
+ putProperty(props, "incremental.col", this.incrementalTestCol);
+ putProperty(props, "incremental.last.value", this.incrementalLastValue);
+
return props;
}
+ @Override
+ public Object clone() {
+ try {
+ SqoopOptions other = (SqoopOptions) super.clone();
+ if (null != columns) {
+ other.columns = Arrays.copyOf(columns, columns.length);
+ }
+
+ if (null != dbOutColumns) {
+ other.dbOutColumns = Arrays.copyOf(dbOutColumns, dbOutColumns.length);
+ }
+
+ if (null != inputDelimiters) {
+ other.inputDelimiters = (DelimiterSet) inputDelimiters.clone();
+ }
+
+ if (null != outputDelimiters) {
+ other.outputDelimiters = (DelimiterSet) outputDelimiters.clone();
+ }
+
+ if (null != conf) {
+ other.conf = new Configuration(conf);
+ }
+
+ if (null != extraArgs) {
+ other.extraArgs = Arrays.copyOf(extraArgs, extraArgs.length);
+ }
+
+ return other;
+ } catch (CloneNotSupportedException cnse) {
+ // Shouldn't happen.
+ return null;
+ }
+ }
+
/**
* @return the temp directory to use; this is guaranteed to end with
* the file separator character (e.g., '/').
@@ -536,6 +624,8 @@ private void initDefaults(Configuration baseConfiguration) {
this.extraArgs = null;
this.dbOutColumns = null;
+
+ this.incrementalMode = IncrementalMode.None;
}
/**
@@ -1312,5 +1402,93 @@ public String getHBaseTable() {
public void setHBaseTable(String table) {
this.hbaseTable = table;
}
+
+ /**
+ * Set the column of the import source table to check for incremental import
+ * state.
+ */
+ public void setIncrementalTestColumn(String colName) {
+ this.incrementalTestCol = colName;
+ }
+
+ /**
+ * Return the name of the column of the import source table
+ * to check for incremental import state.
+ */
+ public String getIncrementalTestColumn() {
+ return this.incrementalTestCol;
+ }
+
+ /**
+ * Set the incremental import mode to use.
+ */
+ public void setIncrementalMode(IncrementalMode mode) {
+ this.incrementalMode = mode;
+ }
+
+ /**
+ * Get the incremental import mode to use.
+ */
+ public IncrementalMode getIncrementalMode() {
+ return this.incrementalMode;
+ }
+
+ /**
+ * Set the last imported value of the incremental import test column.
+ */
+ public void setIncrementalLastValue(String lastVal) {
+ this.incrementalLastValue = lastVal;
+ }
+
+ /**
+ * Get the last imported value of the incremental import test column.
+ */
+ public String getIncrementalLastValue() {
+ return this.incrementalLastValue;
+ }
+
+ /**
+ * Set the name of the saved session this SqoopOptions belongs to.
+ */
+ public void setSessionName(String session) {
+ this.sessionName = session;
+ }
+
+ /**
+ * Get the name of the saved session this SqoopOptions belongs to.
+ */
+ public String getSessionName() {
+ return this.sessionName;
+ }
+
+ /**
+ * Set the SessionStorage descriptor used to open the saved session
+ * this SqoopOptions belongs to.
+ */
+ public void setStorageDescriptor(Map<String, String> descriptor) {
+ this.sessionStorageDescriptor = descriptor;
+ }
+
+ /**
+ * Get the SessionStorage descriptor used to open the saved session
+ * this SqoopOptions belongs to.
+ */
+ public Map<String, String> getStorageDescriptor() {
+ return this.sessionStorageDescriptor;
+ }
+
+ /**
+ * Return the parent instance this SqoopOptions is derived from.
+ */
+ public SqoopOptions getParent() {
+ return this.parent;
+ }
+
+ /**
+ * Set the parent instance this SqoopOptions is derived from.
+ */
+ public void setParent(SqoopOptions options) {
+ this.parent = options;
+ }
}
diff --git a/src/java/com/cloudera/sqoop/manager/ConnManager.java b/src/java/com/cloudera/sqoop/manager/ConnManager.java
index 750118ae..bfa11a23 100644
--- a/src/java/com/cloudera/sqoop/manager/ConnManager.java
+++ b/src/java/com/cloudera/sqoop/manager/ConnManager.java
@@ -22,6 +22,7 @@
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Timestamp;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -200,5 +201,22 @@ public void updateTable(ExportJobContext context)
* to close.
*/
public abstract void release();
+
+ /**
+ * Return the current time from the perspective of the database server.
+ * Return null if this cannot be accessed.
+ */
+ public Timestamp getCurrentDbTimestamp() {
+ LOG.warn("getCurrentDbTimestamp(): Using local system timestamp.");
+ return new Timestamp(System.currentTimeMillis());
+ }
+
+ /**
+ * Given a non-null Timestamp, return the quoted string that can
+ * be inserted into a SQL statement, representing that timestamp.
+ */
+ public String timestampToQueryString(Timestamp ts) {
+ return "'" + ts + "'";
+ }
}
diff --git a/src/java/com/cloudera/sqoop/manager/HsqldbManager.java b/src/java/com/cloudera/sqoop/manager/HsqldbManager.java
index 66210571..3d04e876 100644
--- a/src/java/com/cloudera/sqoop/manager/HsqldbManager.java
+++ b/src/java/com/cloudera/sqoop/manager/HsqldbManager.java
@@ -52,4 +52,14 @@ public String[] listDatabases() {
String [] databases = {HSQL_SCHEMA_NAME};
return databases;
}
+
+ @Override
+ /**
+ * {@inheritDoc}
+ */
+ protected String getCurTimestampQuery() {
+ // HSQLDB requires that you select from a table; this table is
+ // guaranteed to exist.
+ return "SELECT CURRENT_TIMESTAMP FROM INFORMATION_SCHEMA.SYSTEM_TABLES";
+ }
}
diff --git a/src/java/com/cloudera/sqoop/manager/OracleManager.java b/src/java/com/cloudera/sqoop/manager/OracleManager.java
index 4824ed73..285c8c00 100644
--- a/src/java/com/cloudera/sqoop/manager/OracleManager.java
+++ b/src/java/com/cloudera/sqoop/manager/OracleManager.java
@@ -23,6 +23,7 @@
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.lang.reflect.Method;
@@ -456,5 +457,15 @@ protected void finalize() throws Throwable {
close();
super.finalize();
}
+
+ @Override
+ protected String getCurTimestampQuery() {
+ return "SELECT SYSDATE FROM dual";
+ }
+
+ @Override
+ public String timestampToQueryString(Timestamp ts) {
+ return "TO_TIMESTAMP('" + ts + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
+ }
}
diff --git a/src/java/com/cloudera/sqoop/manager/SqlManager.java b/src/java/com/cloudera/sqoop/manager/SqlManager.java
index c07b3a0a..8ba8efff 100644
--- a/src/java/com/cloudera/sqoop/manager/SqlManager.java
+++ b/src/java/com/cloudera/sqoop/manager/SqlManager.java
@@ -18,6 +18,8 @@
package com.cloudera.sqoop.manager;
+import java.sql.Timestamp;
+
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.hive.HiveTypes;
import com.cloudera.sqoop.lib.BlobRef;
@@ -616,8 +618,9 @@ public void release() {
}
}
+ @Override
/**
- * @{inheritDoc}
+ * {@inheritDoc}
*/
public void updateTable(ExportJobContext context)
throws IOException, ExportException {
@@ -625,4 +628,51 @@ public void updateTable(ExportJobContext context)
JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context);
exportJob.runExport();
}
+
+ /**
+ * @return a SQL query to retrieve the current timestamp from the db.
+ */
+ protected String getCurTimestampQuery() {
+ return "SELECT CURRENT_TIMESTAMP()";
+ }
+
+ @Override
+ /**
+ * {@inheritDoc}
+ */
+ public Timestamp getCurrentDbTimestamp() {
+ release(); // Release any previous ResultSet.
+
+ Statement s = null;
+ ResultSet rs = null;
+ try {
+ Connection c = getConnection();
+ s = c.createStatement();
+ rs = s.executeQuery(getCurTimestampQuery());
+ if (rs == null || !rs.next()) {
+ return null; // empty ResultSet.
+ }
+
+ return rs.getTimestamp(1);
+ } catch (SQLException sqlE) {
+ LOG.warn("SQL exception accessing current timestamp: " + sqlE);
+ return null;
+ } finally {
+ try {
+ if (null != rs) {
+ rs.close();
+ }
+ } catch (SQLException sqlE) {
+ LOG.warn("SQL Exception closing resultset: " + sqlE);
+ }
+
+ try {
+ if (null != s) {
+ s.close();
+ }
+ } catch (SQLException sqlE) {
+ LOG.warn("SQL Exception closing statement: " + sqlE);
+ }
+ }
+ }
}
diff --git a/src/java/com/cloudera/sqoop/mapreduce/JobBase.java b/src/java/com/cloudera/sqoop/mapreduce/JobBase.java
index 0613506d..37483d04 100644
--- a/src/java/com/cloudera/sqoop/mapreduce/JobBase.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/JobBase.java
@@ -29,8 +29,6 @@
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
diff --git a/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java b/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java
index ffcde79e..e51190fd 100644
--- a/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java
+++ b/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java
@@ -106,6 +106,7 @@ public void open(Map descriptor) throws IOException {
setMetastoreUser(conf.get(AUTO_STORAGE_USER_KEY, DEFAULT_AUTO_USER));
setMetastorePassword(conf.get(AUTO_STORAGE_PASS_KEY,
DEFAULT_AUTO_PASSWORD));
+ setConnectedDescriptor(descriptor);
init();
}
diff --git a/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbSessionStorage.java b/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbSessionStorage.java
index 09d9da9f..5009dc3a 100644
--- a/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbSessionStorage.java
+++ b/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbSessionStorage.java
@@ -117,6 +117,7 @@ public class HsqldbSessionStorage extends SessionStorage {
private static final String SQOOP_TOOL_KEY = "sqoop.tool";
+ private Map<String, String> connectedDescriptor;
private String metastoreConnectStr;
private String metastoreUser;
private String metastorePassword;
@@ -144,6 +145,13 @@ protected void setMetastorePassword(String pass) {
private static final String DB_DRIVER_CLASS = "org.hsqldb.jdbcDriver";
+ /**
+ * Set the descriptor used to open() this storage.
+ */
+ protected void setConnectedDescriptor(Map<String, String> descriptor) {
+ this.connectedDescriptor = descriptor;
+ }
+
@Override
/**
* Initialize the connection to the database.
@@ -152,6 +160,7 @@ public void open(Map descriptor) throws IOException {
setMetastoreConnectStr(descriptor.get(META_CONNECT_KEY));
setMetastoreUser(descriptor.get(META_USERNAME_KEY));
setMetastorePassword(descriptor.get(META_PASSWORD_KEY));
+ setConnectedDescriptor(descriptor);
init();
}
@@ -293,6 +302,10 @@ public SessionData read(String sessionName) throws IOException {
opts.setConf(conf);
opts.loadProperties(sqoopOptProps);
+ // Set the session connection information for this session.
+ opts.setSessionName(sessionName);
+ opts.setStorageDescriptor(connectedDescriptor);
+
return new SessionData(opts, tool);
} catch (SQLException sqlE) {
throw new IOException("Error communicating with database", sqlE);
diff --git a/src/java/com/cloudera/sqoop/orm/ClassWriter.java b/src/java/com/cloudera/sqoop/orm/ClassWriter.java
index a9d4ced1..52bcb6e0 100644
--- a/src/java/com/cloudera/sqoop/orm/ClassWriter.java
+++ b/src/java/com/cloudera/sqoop/orm/ClassWriter.java
@@ -909,6 +909,23 @@ public void generate() throws IOException {
colNames = connManager.getColumnNamesForQuery(
this.options.getSqlQuery());
}
+ } else {
+ // These column names were provided by the user. They may not be in
+ // the same case as the keys in the columnTypes map. So make sure
+ // we add the appropriate aliases in that map.
+ for (String userColName : colNames) {
+ for (Map.Entry<String, Integer> typeEntry : columnTypes.entrySet()) {
+ String typeColName = typeEntry.getKey();
+ if (typeColName.equalsIgnoreCase(userColName)
+ && !typeColName.equals(userColName)) {
+ // We found the correct-case equivalent.
+ columnTypes.put(userColName, typeEntry.getValue());
+ // No need to continue iteration; only one could match.
+ // Also, the use of put() just invalidated the iterator.
+ break;
+ }
+ }
+ }
}
// Translate all the column names into names that are safe to
diff --git a/src/java/com/cloudera/sqoop/orm/CompilationManager.java b/src/java/com/cloudera/sqoop/orm/CompilationManager.java
index d6a6ccf4..4e46c2a1 100644
--- a/src/java/com/cloudera/sqoop/orm/CompilationManager.java
+++ b/src/java/com/cloudera/sqoop/orm/CompilationManager.java
@@ -23,10 +23,7 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.net.URL;
-import java.net.URLDecoder;
import java.util.ArrayList;
-import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
@@ -42,7 +39,6 @@
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.util.FileListing;
-import com.cloudera.sqoop.shims.HadoopShim;
import com.cloudera.sqoop.util.Jars;
diff --git a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
index 250317db..0320fe3b 100644
--- a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
@@ -114,6 +114,11 @@ public abstract class BaseSqoopTool extends SqoopTool {
public static final String HELP_ARG = "help";
public static final String UPDATE_KEY_ARG = "update-key";
+ // Arguments for incremental imports.
+ public static final String INCREMENT_TYPE_ARG = "incremental";
+ public static final String INCREMENT_COL_ARG = "check-column";
+ public static final String INCREMENT_LAST_VAL_ARG = "last-value";
+
// HBase arguments.
public static final String HBASE_TABLE_ARG = "hbase-table";
public static final String HBASE_COL_FAM_ARG = "column-family";
diff --git a/src/java/com/cloudera/sqoop/tool/ImportTool.java b/src/java/com/cloudera/sqoop/tool/ImportTool.java
index 0ec5310d..6b298acc 100644
--- a/src/java/com/cloudera/sqoop/tool/ImportTool.java
+++ b/src/java/com/cloudera/sqoop/tool/ImportTool.java
@@ -19,7 +19,16 @@
package com.cloudera.sqoop.tool;
import java.io.IOException;
+
+import java.math.BigDecimal;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
import java.util.List;
+import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.OptionBuilder;
@@ -35,6 +44,10 @@
import com.cloudera.sqoop.cli.ToolOptions;
import com.cloudera.sqoop.hive.HiveImport;
import com.cloudera.sqoop.manager.ImportJobContext;
+
+import com.cloudera.sqoop.metastore.SessionData;
+import com.cloudera.sqoop.metastore.SessionStorage;
+import com.cloudera.sqoop.metastore.SessionStorageFactory;
import com.cloudera.sqoop.util.AppendUtils;
import com.cloudera.sqoop.util.ImportException;
import org.apache.hadoop.fs.Path;
@@ -76,8 +89,239 @@ protected boolean init(SqoopOptions sqoopOpts) {
public List<String> getGeneratedJarFiles() {
return this.codeGenerator.getGeneratedJarFiles();
}
+
+ /**
+ * @return true if the supplied options specify an incremental import.
+ */
+ private boolean isIncremental(SqoopOptions options) {
+ return !options.getIncrementalMode().equals(
+ SqoopOptions.IncrementalMode.None);
+ }
- protected void importTable(SqoopOptions options, String tableName,
+ /**
+ * If this is an incremental import, then we should save the
+ * user's state back to the metastore (if this session was run
+ * from the metastore). Otherwise, log to the user what data
+ * they need to supply next time.
+ */
+ private void saveIncrementalState(SqoopOptions options)
+ throws IOException {
+ if (!isIncremental(options)) {
+ return;
+ }
+
+ Map<String, String> descriptor = options.getStorageDescriptor();
+ String sessionName = options.getSessionName();
+
+ if (null != sessionName && null != descriptor) {
+ // Actually save it back to the metastore.
+ LOG.info("Saving incremental import state to the metastore");
+ SessionStorageFactory ssf = new SessionStorageFactory(options.getConf());
+ SessionStorage storage = ssf.getSessionStorage(descriptor);
+ storage.open(descriptor);
+ try {
+ // Save the 'parent' SqoopOptions; this does not contain the mutations
+ // to the SqoopOptions state that occurred over the course of this
+ // execution, except for the one we specifically want to memorize:
+ // the latest value of the check column.
+ SessionData data = new SessionData(options.getParent(), this);
+ storage.update(sessionName, data);
+ LOG.info("Updated data for session: " + sessionName);
+ } finally {
+ storage.close();
+ }
+ } else {
+ // If there wasn't a parent SqoopOptions, then the incremental
+ // state data was stored in the current SqoopOptions.
+ LOG.info("Incremental import complete! To run another incremental "
+ + "import of all data following this import, supply the "
+ + "following arguments:");
+ SqoopOptions.IncrementalMode incrementalMode =
+ options.getIncrementalMode();
+ switch (incrementalMode) {
+ case AppendRows:
+ LOG.info(" --incremental append");
+ break;
+ case DateLastModified:
+ LOG.info(" --incremental lastmodified");
+ break;
+ default:
+ LOG.warn("Undefined incremental mode: " + incrementalMode);
+ break;
+ }
+ LOG.info(" --check-column " + options.getIncrementalTestColumn());
+ LOG.info(" --last-value " + options.getIncrementalLastValue());
+ LOG.info("(Consider saving this with 'sqoop session --create')");
+ }
+ }
+
+ /**
+ * Return the max value in the incremental-import test column. This
+ * value must be numeric.
+ */
+ private BigDecimal getMaxColumnId(SqoopOptions options) throws SQLException {
+ StringBuilder sb = new StringBuilder();
+ sb.append("SELECT MAX(");
+ sb.append(options.getIncrementalTestColumn());
+ sb.append(") FROM ");
+ sb.append(options.getTableName());
+
+ String where = options.getWhereClause();
+ if (null != where) {
+ sb.append(" WHERE ");
+ sb.append(where);
+ }
+
+ Connection conn = manager.getConnection();
+ Statement s = null;
+ ResultSet rs = null;
+ try {
+ s = conn.createStatement();
+ rs = s.executeQuery(sb.toString());
+ if (!rs.next()) {
+ // This probably means the table is empty.
+ LOG.warn("Unexpected: empty results for max value query?");
+ return null;
+ }
+
+ return rs.getBigDecimal(1);
+ } finally {
+ try {
+ if (null != rs) {
+ rs.close();
+ }
+ } catch (SQLException sqlE) {
+ LOG.warn("SQL Exception closing resultset: " + sqlE);
+ }
+
+ try {
+ if (null != s) {
+ s.close();
+ }
+ } catch (SQLException sqlE) {
+ LOG.warn("SQL Exception closing statement: " + sqlE);
+ }
+ }
+ }
+
+ /**
+ * Initialize the constraints which set the incremental import range.
+ * @return false if an import is not necessary, because the dataset has not
+ * changed.
+ */
+ private boolean initIncrementalConstraints(SqoopOptions options,
+ ImportJobContext context) throws ImportException, IOException {
+
+ // If this is an incremental import, determine the constraints
+ // to inject in the WHERE clause or $CONDITIONS for a query.
+ // Also modify the 'last value' field of the SqoopOptions to
+ // specify the current job start time / start row.
+
+ if (!isIncremental(options)) {
+ return true;
+ }
+
+ SqoopOptions.IncrementalMode incrementalMode = options.getIncrementalMode();
+ String nextIncrementalValue = null;
+
+ switch (incrementalMode) {
+ case AppendRows:
+ try {
+ BigDecimal nextVal = getMaxColumnId(options);
+ if (null != nextVal) {
+ nextIncrementalValue = nextVal.toString();
+ }
+ } catch (SQLException sqlE) {
+ throw new IOException(sqlE);
+ }
+ break;
+ case DateLastModified:
+ Timestamp dbTimestamp = manager.getCurrentDbTimestamp();
+ if (null == dbTimestamp) {
+ throw new IOException("Could not get current time from database");
+ }
+
+ nextIncrementalValue = manager.timestampToQueryString(dbTimestamp);
+ break;
+ default:
+ throw new ImportException("Undefined incremental import type: "
+ + incrementalMode);
+ }
+
+ // Build the WHERE clause components that are used to import
+ // only this incremental section.
+ StringBuilder sb = new StringBuilder();
+ String prevEndpoint = options.getIncrementalLastValue();
+
+ String checkColName = manager.escapeColName(
+ options.getIncrementalTestColumn());
+ LOG.info("Incremental import based on column " + checkColName);
+ if (null != prevEndpoint) {
+ if (prevEndpoint.equals(nextIncrementalValue)) {
+ LOG.info("No new rows detected since last import.");
+ return false;
+ }
+ LOG.info("Lower bound value: " + prevEndpoint);
+ sb.append(checkColName);
+ switch (incrementalMode) {
+ case AppendRows:
+ sb.append(" > ");
+ break;
+ case DateLastModified:
+ sb.append(" >= ");
+ break;
+ default:
+ throw new ImportException("Undefined comparison");
+ }
+ sb.append(prevEndpoint);
+ sb.append(" AND ");
+ }
+
+ if (null != nextIncrementalValue) {
+ sb.append(checkColName);
+ switch (incrementalMode) {
+ case AppendRows:
+ sb.append(" <= ");
+ break;
+ case DateLastModified:
+ sb.append(" < ");
+ break;
+ default:
+ throw new ImportException("Undefined comparison");
+ }
+ sb.append(nextIncrementalValue);
+ } else {
+ sb.append(checkColName);
+ sb.append(" IS NULL ");
+ }
+
+ LOG.info("Upper bound value: " + nextIncrementalValue);
+
+ String prevWhereClause = options.getWhereClause();
+ if (null != prevWhereClause) {
+ sb.append(" AND (");
+ sb.append(prevWhereClause);
+ sb.append(")");
+ }
+
+ String newConstraints = sb.toString();
+ options.setWhereClause(newConstraints);
+
+ // Save this state for next time.
+ SqoopOptions recordOptions = options.getParent();
+ if (null == recordOptions) {
+ recordOptions = options;
+ }
+ recordOptions.setIncrementalLastValue(nextIncrementalValue);
+
+ return true;
+ }
+
+ /**
+ * Import a table or query.
+ * @return true if an import was performed, false otherwise.
+ */
+ protected boolean importTable(SqoopOptions options, String tableName,
HiveImport hiveImport) throws IOException, ImportException {
String jarFile = null;
@@ -88,6 +332,12 @@ protected void importTable(SqoopOptions options, String tableName,
ImportJobContext context = new ImportJobContext(tableName, jarFile,
options, getOutputPath(options, tableName));
+ // If we're doing an incremental import, set up the
+ // filtering conditions used to get the latest records.
+ if (!initIncrementalConstraints(options, context)) {
+ return false;
+ }
+
if (null != tableName) {
manager.importTable(context);
} else {
@@ -103,6 +353,10 @@ protected void importTable(SqoopOptions options, String tableName,
if (options.doHiveImport()) {
hiveImport.importTable(tableName, options.getHiveTableName(), false);
}
+
+ saveIncrementalState(options);
+
+ return true;
}
/**
@@ -264,12 +518,42 @@ protected RelatedOptions getImportOptions() {
return importOpts;
}
+ /**
+ * Return options for incremental import.
+ */
+ protected RelatedOptions getIncrementalOptions() {
+ RelatedOptions incrementalOpts =
+ new RelatedOptions("Incremental import arguments");
+
+ incrementalOpts.addOption(OptionBuilder.withArgName("import-type")
+ .hasArg()
+ .withDescription(
+ "Define an incremental import of type 'append' or 'lastmodified'")
+ .withLongOpt(INCREMENT_TYPE_ARG)
+ .create());
+ incrementalOpts.addOption(OptionBuilder.withArgName("column")
+ .hasArg()
+ .withDescription("Source column to check for incremental change")
+ .withLongOpt(INCREMENT_COL_ARG)
+ .create());
+ incrementalOpts.addOption(OptionBuilder.withArgName("value")
+ .hasArg()
+ .withDescription("Last imported value in the incremental check column")
+ .withLongOpt(INCREMENT_LAST_VAL_ARG)
+ .create());
+
+ return incrementalOpts;
+ }
+
@Override
/** Configure the command-line arguments we expect to receive */
public void configureOptions(ToolOptions toolOptions) {
toolOptions.addUniqueOptions(getCommonOptions());
toolOptions.addUniqueOptions(getImportOptions());
+ if (!allTables) {
+ toolOptions.addUniqueOptions(getIncrementalOptions());
+ }
toolOptions.addUniqueOptions(getOutputFormatOptions());
toolOptions.addUniqueOptions(getInputFormatOptions());
toolOptions.addUniqueOptions(getHiveOptions(true));
@@ -306,6 +590,32 @@ public void printHelp(ToolOptions toolOptions) {
"after a '--' on the command line.");
}
+ private void applyIncrementalOptions(CommandLine in, SqoopOptions out)
+ throws InvalidOptionsException {
+ if (in.hasOption(INCREMENT_TYPE_ARG)) {
+ String incrementalTypeStr = in.getOptionValue(INCREMENT_TYPE_ARG);
+ if ("append".equals(incrementalTypeStr)) {
+ out.setIncrementalMode(SqoopOptions.IncrementalMode.AppendRows);
+ // This argument implies ability to append to the same directory.
+ out.setAppendMode(true);
+ } else if ("lastmodified".equals(incrementalTypeStr)) {
+ out.setIncrementalMode(SqoopOptions.IncrementalMode.DateLastModified);
+ } else {
+ throw new InvalidOptionsException("Unknown incremental import mode: "
+ + incrementalTypeStr + ". Use 'append' or 'lastmodified'."
+ + HELP_STR);
+ }
+ }
+
+ if (in.hasOption(INCREMENT_COL_ARG)) {
+ out.setIncrementalTestColumn(in.getOptionValue(INCREMENT_COL_ARG));
+ }
+
+ if (in.hasOption(INCREMENT_LAST_VAL_ARG)) {
+ out.setIncrementalLastValue(in.getOptionValue(INCREMENT_LAST_VAL_ARG));
+ }
+ }
+
@Override
/** {@inheritDoc} */
public void applyOptions(CommandLine in, SqoopOptions out)
@@ -382,6 +692,7 @@ public void applyOptions(CommandLine in, SqoopOptions out)
out.setExistingJarName(in.getOptionValue(JAR_FILE_NAME_ARG));
}
+ applyIncrementalOptions(in, out);
applyHiveOptions(in, out);
applyOutputFormatOptions(in, out);
applyInputFormatOptions(in, out);
@@ -437,6 +748,32 @@ protected void validateImportOptions(SqoopOptions options)
}
}
+ /**
+ * Validate the incremental import options.
+ */
+ private void validateIncrementalOptions(SqoopOptions options)
+ throws InvalidOptionsException {
+ if (options.getIncrementalMode() != SqoopOptions.IncrementalMode.None
+ && options.getIncrementalTestColumn() == null) {
+ throw new InvalidOptionsException(
+ "For an incremental import, the check column must be specified "
+ + "with --" + INCREMENT_COL_ARG + ". " + HELP_STR);
+ }
+
+ if (options.getIncrementalMode() == SqoopOptions.IncrementalMode.None
+ && options.getIncrementalTestColumn() != null) {
+ throw new InvalidOptionsException(
+ "You must specify an incremental import mode with --"
+ + INCREMENT_TYPE_ARG + ". " + HELP_STR);
+ }
+
+ if (options.getIncrementalMode() != SqoopOptions.IncrementalMode.None
+ && options.getTableName() == null) {
+ throw new InvalidOptionsException("Incremental imports require a table."
+ + HELP_STR);
+ }
+ }
+
@Override
/** {@inheritDoc} */
public void validateOptions(SqoopOptions options)
@@ -452,6 +789,7 @@ public void validateOptions(SqoopOptions options)
}
validateImportOptions(options);
+ validateIncrementalOptions(options);
validateCommonOptions(options);
validateCodeGenOptions(options);
validateOutputFormatOptions(options);
diff --git a/src/java/com/cloudera/sqoop/tool/SessionTool.java b/src/java/com/cloudera/sqoop/tool/SessionTool.java
index af6acc4a..d395a0b1 100644
--- a/src/java/com/cloudera/sqoop/tool/SessionTool.java
+++ b/src/java/com/cloudera/sqoop/tool/SessionTool.java
@@ -209,6 +209,12 @@ private int execSession(SqoopOptions opts) throws IOException {
SqoopOptions childOpts = data.getSqoopOptions();
SqoopTool childTool = data.getSqoopTool();
+ // Don't overwrite the original SqoopOptions with the
+ // arguments; make a child options.
+
+ SqoopOptions clonedOpts = (SqoopOptions) childOpts.clone();
+ clonedOpts.setParent(childOpts);
+
int dashPos = getDashPosition(extraArguments);
String [] childArgv;
if (dashPos >= extraArguments.length) {
@@ -218,13 +224,13 @@ private int execSession(SqoopOptions opts) throws IOException {
extraArguments.length);
}
- int confRet = configureChildTool(childOpts, childTool, childArgv);
+ int confRet = configureChildTool(clonedOpts, childTool, childArgv);
if (0 != confRet) {
// Error.
return confRet;
}
- return childTool.run(childOpts);
+ return childTool.run(clonedOpts);
}
private int showSession(SqoopOptions opts) throws IOException {
diff --git a/src/test/com/cloudera/sqoop/AllTests.java b/src/test/com/cloudera/sqoop/AllTests.java
index 3d82a242..7bff1e0f 100644
--- a/src/test/com/cloudera/sqoop/AllTests.java
+++ b/src/test/com/cloudera/sqoop/AllTests.java
@@ -38,6 +38,7 @@ public static Test suite() {
suite.addTest(ThirdPartyTests.suite());
suite.addTestSuite(TestHBaseImport.class);
suite.addTestSuite(TestHBaseQueryImport.class);
+ suite.addTestSuite(TestIncrementalImport.class);
return suite;
}
diff --git a/src/test/com/cloudera/sqoop/TestIncrementalImport.java b/src/test/com/cloudera/sqoop/TestIncrementalImport.java
new file mode 100644
index 00000000..10a7231f
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/TestIncrementalImport.java
@@ -0,0 +1,790 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.HsqldbManager;
+import com.cloudera.sqoop.manager.ManagerFactory;
+import com.cloudera.sqoop.metastore.TestSessions;
+import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.tool.ImportTool;
+import com.cloudera.sqoop.tool.SessionTool;
+
+import junit.framework.TestCase;
+
+/**
+ * Test the incremental import functionality.
+ *
+ * These all make use of the auto-connect hsqldb-based metastore.
+ * The metastore URL is configured to be in-memory, and all
+ * state is dropped between individual tests.
+ */
+public class TestIncrementalImport extends TestCase {
+
+ public static final Log LOG = LogFactory.getLog(
+ TestIncrementalImport.class.getName());
+
+ // Which database we read from.
+ public static final String SOURCE_DB_URL = "jdbc:hsqldb:mem:incremental";
+
+ @Override
+ public void setUp() throws Exception {
+ // Delete db state between tests.
+ TestSessions.resetSessionSchema();
+ resetSourceDataSchema();
+ }
+
+ public static void resetSourceDataSchema() throws SQLException {
+ SqoopOptions options = new SqoopOptions();
+ options.setConnectString(SOURCE_DB_URL);
+ TestSessions.resetSchema(options);
+ }
+
+ public static Configuration newConf() {
+ return TestSessions.newConf();
+ }
+
+ /**
+ * Assert that a table has a specified number of rows.
+ */
+ private void assertRowCount(String table, int numRows) throws SQLException {
+ SqoopOptions options = new SqoopOptions();
+ options.setConnectString(SOURCE_DB_URL);
+ HsqldbManager manager = new HsqldbManager(options);
+ Connection c = manager.getConnection();
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ try {
+ s = c.prepareStatement("SELECT COUNT(*) FROM " + table);
+ rs = s.executeQuery();
+ if (!rs.next()) {
+ fail("No resultset");
+ }
+ int realNumRows = rs.getInt(1);
+ assertEquals(numRows, realNumRows);
+ LOG.info("Expected " + numRows + " rows -- ok.");
+ } finally {
+ if (null != s) {
+ try {
+ s.close();
+ } catch (SQLException sqlE) {
+ LOG.warn("exception: " + sqlE);
+ }
+ }
+
+ if (null != rs) {
+ try {
+ rs.close();
+ } catch (SQLException sqlE) {
+ LOG.warn("exception: " + sqlE);
+ }
+ }
+ }
+ }
+
+ /**
+ * Insert rows with id = [low, hi) into tableName.
+ */
+ private void insertIdRows(String tableName, int low, int hi)
+ throws SQLException {
+ SqoopOptions options = new SqoopOptions();
+ options.setConnectString(SOURCE_DB_URL);
+ HsqldbManager manager = new HsqldbManager(options);
+ Connection c = manager.getConnection();
+ PreparedStatement s = null;
+ try {
+ s = c.prepareStatement("INSERT INTO " + tableName + " VALUES(?)");
+ for (int i = low; i < hi; i++) {
+ s.setInt(1, i);
+ s.executeUpdate();
+ }
+
+ c.commit();
+ } finally {
+ s.close();
+ }
+ }
+
+ /**
+ * Insert rows with id = [low, hi) into tableName with
+ * the timestamp column set to the specified ts.
+ */
+ private void insertIdTimestampRows(String tableName, int low, int hi,
+ Timestamp ts) throws SQLException {
+ LOG.info("Inserting id rows in [" + low + ", " + hi + ") @ " + ts);
+ SqoopOptions options = new SqoopOptions();
+ options.setConnectString(SOURCE_DB_URL);
+ HsqldbManager manager = new HsqldbManager(options);
+ Connection c = manager.getConnection();
+ PreparedStatement s = null;
+ try {
+ s = c.prepareStatement("INSERT INTO " + tableName + " VALUES(?,?)");
+ for (int i = low; i < hi; i++) {
+ s.setInt(1, i);
+ s.setTimestamp(2, ts);
+ s.executeUpdate();
+ }
+
+ c.commit();
+ } finally {
+ s.close();
+ }
+ }
+
+ /**
+ * Create a table with an 'id' column full of integers.
+ */
+ private void createIdTable(String tableName, int insertRows)
+ throws SQLException {
+ SqoopOptions options = new SqoopOptions();
+ options.setConnectString(SOURCE_DB_URL);
+ HsqldbManager manager = new HsqldbManager(options);
+ Connection c = manager.getConnection();
+ PreparedStatement s = null;
+ try {
+ s = c.prepareStatement("CREATE TABLE " + tableName + "(id INT NOT NULL)");
+ s.executeUpdate();
+ c.commit();
+ insertIdRows(tableName, 0, insertRows);
+ } finally {
+ s.close();
+ }
+ }
+
+ /**
+ * Create a table with an 'id' column full of integers and a
+ * last_modified column with timestamps.
+ */
+ private void createTimestampTable(String tableName, int insertRows,
+ Timestamp baseTime) throws SQLException {
+ SqoopOptions options = new SqoopOptions();
+ options.setConnectString(SOURCE_DB_URL);
+ HsqldbManager manager = new HsqldbManager(options);
+ Connection c = manager.getConnection();
+ PreparedStatement s = null;
+ try {
+ s = c.prepareStatement("CREATE TABLE " + tableName + "(id INT NOT NULL, "
+ + "last_modified TIMESTAMP)");
+ s.executeUpdate();
+ c.commit();
+ insertIdTimestampRows(tableName, 0, insertRows, baseTime);
+ } finally {
+ s.close();
+ }
+ }
+
+ /**
+ * Delete all files in a directory for a table.
+ */
+ public void clearDir(String tableName) {
+ try {
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+ Path tableDir = new Path(warehouse, tableName);
+ fs.delete(tableDir, true);
+ } catch (Exception e) {
+ fail("Got unexpected exception: " + StringUtils.stringifyException(e));
+ }
+ }
+
+ /**
+ * Look at a directory that should contain files full of an imported 'id'
+ * column. Assert that all numbers in [0, expectedNums) are present
+ * in order.
+ */
+ public void assertDirOfNumbers(String tableName, int expectedNums) {
+ try {
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+ Path tableDir = new Path(warehouse, tableName);
+ FileStatus [] stats = fs.listStatus(tableDir);
+ String [] fileNames = new String[stats.length];
+ for (int i = 0; i < stats.length; i++) {
+ fileNames[i] = stats[i].getPath().toString();
+ }
+
+ Arrays.sort(fileNames);
+
+ // Read all the files in sorted order, adding the value lines to the list.
+ List<String> receivedNums = new ArrayList<String>();
+ for (String fileName : fileNames) {
+ if (fileName.startsWith("_") || fileName.startsWith(".")) {
+ continue;
+ }
+
+ BufferedReader r = new BufferedReader(
+ new InputStreamReader(fs.open(new Path(fileName))));
+ try {
+ while (true) {
+ String s = r.readLine();
+ if (null == s) {
+ break;
+ }
+
+ receivedNums.add(s.trim());
+ }
+ } finally {
+ r.close();
+ }
+ }
+
+ assertEquals(expectedNums, receivedNums.size());
+
+ // Compare the received values with the expected set.
+ for (int i = 0; i < expectedNums; i++) {
+ assertEquals((int) i, (int) Integer.valueOf(receivedNums.get(i)));
+ }
+ } catch (Exception e) {
+ fail("Got unexpected exception: " + StringUtils.stringifyException(e));
+ }
+ }
+
+ /**
+ * Assert that a directory contains a file with exactly one line
+ * in it, containing the prescribed number 'val'.
+ */
+ public void assertSpecificNumber(String tableName, int val) {
+ try {
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+ Path tableDir = new Path(warehouse, tableName);
+ FileStatus [] stats = fs.listStatus(tableDir);
+ String [] fileNames = new String[stats.length];
+ for (int i = 0; i < stats.length; i++) {
+ fileNames[i] = stats[i].getPath().toString();
+ }
+
+ // Read the first file that is not a hidden file.
+ boolean foundVal = false;
+ for (String fileName : fileNames) {
+ if (fileName.startsWith("_") || fileName.startsWith(".")) {
+ continue;
+ }
+
+ if (foundVal) {
+ // Make sure we don't have two or more "real" files in the dir.
+ fail("Got an extra data-containing file in this directory.");
+ }
+
+ BufferedReader r = new BufferedReader(
+ new InputStreamReader(fs.open(new Path(fileName))));
+ try {
+ String s = r.readLine();
+ if (null == s) {
+ fail("Unexpected empty file " + fileName + ".");
+ }
+ assertEquals(val, (int) Integer.valueOf(s.trim()));
+
+ String nextLine = r.readLine();
+ if (nextLine != null) {
+ fail("Expected only one result, but got another line: " + nextLine);
+ }
+
+ // Successfully got the value we were looking for.
+ foundVal = true;
+ } finally {
+ r.close();
+ }
+ }
+ } catch (Exception e) {
+ fail("Got unexpected exception: " + StringUtils.stringifyException(e));
+ }
+ }
+
+ public void runImport(SqoopOptions options, List<String> args) {
+ try {
+ Sqoop importer = new Sqoop(new ImportTool(), options.getConf(), options);
+ int ret = Sqoop.runSqoop(importer, args.toArray(new String[0]));
+ assertEquals("Failure during job", 0, ret);
+ } catch (Exception e) {
+ LOG.error("Got exception running Sqoop: "
+ + StringUtils.stringifyException(e));
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Return a list of arguments to import the specified table.
+ */
+ private List<String> getArgListForTable(String tableName, boolean commonArgs,
+ boolean isAppend) {
+ List<String> args = new ArrayList<String>();
+ if (commonArgs) {
+ CommonArgs.addHadoopFlags(args);
+ }
+ args.add("--connect");
+ args.add(SOURCE_DB_URL);
+ args.add("--table");
+ args.add(tableName);
+ args.add("--warehouse-dir");
+ args.add(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+ if (isAppend) {
+ args.add("--incremental");
+ args.add("append");
+ args.add("--check-column");
+ args.add("id");
+ } else {
+ args.add("--incremental");
+ args.add("lastmodified");
+ args.add("--check-column");
+ args.add("last_modified");
+ }
+ args.add("--columns");
+ args.add("id");
+ args.add("-m");
+ args.add("1");
+
+ return args;
+ }
+
+ /**
+ * Create a session with the specified name, where the session performs
+ * an import configured with 'sessionArgs'.
+ */
+ private void createSession(String sessionName, List<String> sessionArgs) {
+ createSession(sessionName, sessionArgs, newConf());
+ }
+
+ /**
+ * Create a session with the specified name, where the session performs
+ * an import configured with 'sessionArgs', using the provided configuration
+ * as defaults.
+ */
+ private void createSession(String sessionName, List<String> sessionArgs,
+ Configuration conf) {
+ try {
+ SqoopOptions options = new SqoopOptions();
+ options.setConf(conf);
+ Sqoop makeSession = new Sqoop(new SessionTool(), conf, options);
+
+ List<String> args = new ArrayList<String>();
+ args.add("--create");
+ args.add(sessionName);
+ args.add("--");
+ args.add("import");
+ args.addAll(sessionArgs);
+
+ int ret = Sqoop.runSqoop(makeSession, args.toArray(new String[0]));
+ assertEquals("Failure during job to create session", 0, ret);
+ } catch (Exception e) {
+ LOG.error("Got exception running Sqoop to create session: "
+ + StringUtils.stringifyException(e));
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Run the specified session.
+ */
+ private void runSession(String sessionName) {
+ runSession(sessionName, newConf());
+ }
+
+ /**
+ * Run the specified session.
+ */
+ private void runSession(String sessionName, Configuration conf) {
+ try {
+ SqoopOptions options = new SqoopOptions();
+ options.setConf(conf);
+ Sqoop runSession = new Sqoop(new SessionTool(), conf, options);
+
+ List<String> args = new ArrayList<String>();
+ args.add("--exec");
+ args.add(sessionName);
+
+ int ret = Sqoop.runSqoop(runSession, args.toArray(new String[0]));
+ assertEquals("Failure during job to run session", 0, ret);
+ } catch (Exception e) {
+ LOG.error("Got exception running Sqoop to run session: "
+ + StringUtils.stringifyException(e));
+ throw new RuntimeException(e);
+ }
+ }
+
+ // Incremental import of an empty table, no metastore.
+ public void testEmptyAppendImport() throws Exception {
+ final String TABLE_NAME = "emptyAppend1";
+ createIdTable(TABLE_NAME, 0);
+ List<String> args = getArgListForTable(TABLE_NAME, true, true);
+
+ Configuration conf = newConf();
+ SqoopOptions options = new SqoopOptions();
+ options.setConf(conf);
+ runImport(options, args);
+
+ assertDirOfNumbers(TABLE_NAME, 0);
+ }
+
+ // Incremental import of a filled table, no metastore.
+ public void testFullAppendImport() throws Exception {
+ final String TABLE_NAME = "fullAppend1";
+ createIdTable(TABLE_NAME, 10);
+ List<String> args = getArgListForTable(TABLE_NAME, true, true);
+
+ Configuration conf = newConf();
+ SqoopOptions options = new SqoopOptions();
+ options.setConf(conf);
+ runImport(options, args);
+
+ assertDirOfNumbers(TABLE_NAME, 10);
+ }
+
+ public void testEmptySessionAppend() throws Exception {
+ // Create a session and run an import on an empty table.
+ // Nothing should happen.
+
+ final String TABLE_NAME = "emptySession";
+ createIdTable(TABLE_NAME, 0);
+
+ List<String> args = getArgListForTable(TABLE_NAME, false, true);
+ createSession("emptySession", args);
+ runSession("emptySession");
+ assertDirOfNumbers(TABLE_NAME, 0);
+
+ // Running the session a second time should result in
+ // nothing happening; the table is still empty.
+ runSession("emptySession");
+ assertDirOfNumbers(TABLE_NAME, 0);
+ }
+
+ public void testEmptyThenFullSessionAppend() throws Exception {
+ // Create an empty table. Import it; nothing happens.
+ // Add some rows. Verify they are appended.
+
+ final String TABLE_NAME = "emptyThenFull";
+ createIdTable(TABLE_NAME, 0);
+
+ List<String> args = getArgListForTable(TABLE_NAME, false, true);
+ createSession(TABLE_NAME, args);
+ runSession(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 0);
+
+ // Now add some rows.
+ insertIdRows(TABLE_NAME, 0, 10);
+
+ // Running the session a second time should import 10 rows.
+ runSession(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 10);
+
+ // Add some more rows.
+ insertIdRows(TABLE_NAME, 10, 20);
+
+ // Import only those rows.
+ runSession(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 20);
+ }
+
+ public void testAppend() throws Exception {
+ // Create a table with data in it; import it.
+ // Then add more data, verify that only the incremental data is pulled.
+
+ final String TABLE_NAME = "append";
+ createIdTable(TABLE_NAME, 10);
+
+ List<String> args = getArgListForTable(TABLE_NAME, false, true);
+ createSession(TABLE_NAME, args);
+ runSession(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 10);
+
+ // Add some more rows.
+ insertIdRows(TABLE_NAME, 10, 20);
+
+ // Import only those rows.
+ runSession(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 20);
+ }
+
+ public void testEmptyLastModified() throws Exception {
+ final String TABLE_NAME = "emptyLastModified";
+ createTimestampTable(TABLE_NAME, 0, null);
+ List<String> args = getArgListForTable(TABLE_NAME, true, false);
+
+ Configuration conf = newConf();
+ SqoopOptions options = new SqoopOptions();
+ options.setConf(conf);
+ runImport(options, args);
+
+ assertDirOfNumbers(TABLE_NAME, 0);
+ }
+
+ public void testFullLastModifiedImport() throws Exception {
+ // Given a table of rows imported in the past,
+ // see that they are imported.
+ final String TABLE_NAME = "fullLastModified";
+ Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
+ createTimestampTable(TABLE_NAME, 10, thePast);
+
+ List<String> args = getArgListForTable(TABLE_NAME, true, false);
+
+ Configuration conf = newConf();
+ SqoopOptions options = new SqoopOptions();
+ options.setConf(conf);
+ runImport(options, args);
+
+ assertDirOfNumbers(TABLE_NAME, 10);
+ }
+
+ public void testNoImportFromTheFuture() throws Exception {
+ // If last-modified dates for writes are serialized to be in the
+ // future w.r.t. an import, do not import these rows.
+
+ final String TABLE_NAME = "futureLastModified";
+ Timestamp theFuture = new Timestamp(System.currentTimeMillis() + 1000000);
+ createTimestampTable(TABLE_NAME, 10, theFuture);
+
+ List<String> args = getArgListForTable(TABLE_NAME, true, false);
+
+ Configuration conf = newConf();
+ SqoopOptions options = new SqoopOptions();
+ options.setConf(conf);
+ runImport(options, args);
+
+ assertDirOfNumbers(TABLE_NAME, 0);
+ }
+
+ public void testEmptySessionLastMod() throws Exception {
+ // Create a session and run an import on an empty table.
+ // Nothing should happen.
+
+ final String TABLE_NAME = "emptySessionLastMod";
+ createTimestampTable(TABLE_NAME, 0, null);
+
+ List<String> args = getArgListForTable(TABLE_NAME, false, false);
+ args.add("--append");
+ createSession("emptySessionLastMod", args);
+ runSession("emptySessionLastMod");
+ assertDirOfNumbers(TABLE_NAME, 0);
+
+ // Running the session a second time should result in
+ // nothing happening; the table is still empty.
+ runSession("emptySessionLastMod");
+ assertDirOfNumbers(TABLE_NAME, 0);
+ }
+
+ public void testEmptyThenFullSessionLastMod() throws Exception {
+ // Create an empty table. Import it; nothing happens.
+ // Add some rows. Verify they are appended.
+
+ final String TABLE_NAME = "emptyThenFullTimestamp";
+ createTimestampTable(TABLE_NAME, 0, null);
+
+ List<String> args = getArgListForTable(TABLE_NAME, false, false);
+ args.add("--append");
+ createSession(TABLE_NAME, args);
+ runSession(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 0);
+
+ long importWasBefore = System.currentTimeMillis();
+
+ // Let some time elapse.
+ Thread.sleep(50);
+
+ long rowsAddedTime = System.currentTimeMillis() - 5;
+
+ // Check: we are adding rows after the previous import time
+ // and before the current time.
+ assertTrue(rowsAddedTime > importWasBefore);
+ assertTrue(rowsAddedTime < System.currentTimeMillis());
+
+ insertIdTimestampRows(TABLE_NAME, 0, 10, new Timestamp(rowsAddedTime));
+
+ // Running the session a second time should import 10 rows.
+ runSession(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 10);
+
+ // Add some more rows.
+ importWasBefore = System.currentTimeMillis();
+ Thread.sleep(50);
+ rowsAddedTime = System.currentTimeMillis() - 5;
+ assertTrue(rowsAddedTime > importWasBefore);
+ assertTrue(rowsAddedTime < System.currentTimeMillis());
+ insertIdTimestampRows(TABLE_NAME, 10, 20, new Timestamp(rowsAddedTime));
+
+ // Import only those rows.
+ runSession(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 20);
+ }
+
+ public void testAppendWithTimestamp() throws Exception {
+ // Create a table with data in it; import it.
+ // Then add more data, verify that only the incremental data is pulled.
+
+ final String TABLE_NAME = "appendTimestamp";
+ Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
+ createTimestampTable(TABLE_NAME, 10, thePast);
+
+ List<String> args = getArgListForTable(TABLE_NAME, false, false);
+ args.add("--append");
+ createSession(TABLE_NAME, args);
+ runSession(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 10);
+
+ // Add some more rows.
+ long importWasBefore = System.currentTimeMillis();
+ Thread.sleep(50);
+ long rowsAddedTime = System.currentTimeMillis() - 5;
+ assertTrue(rowsAddedTime > importWasBefore);
+ assertTrue(rowsAddedTime < System.currentTimeMillis());
+ insertIdTimestampRows(TABLE_NAME, 10, 20, new Timestamp(rowsAddedTime));
+
+ // Import only those rows.
+ runSession(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 20);
+ }
+
+ public void testModifyWithTimestamp() throws Exception {
+ // Create a table with data in it; import it.
+ // Then modify some existing rows, and verify that we only grab
+ // those rows.
+
+ final String TABLE_NAME = "modifyTimestamp";
+ Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
+ createTimestampTable(TABLE_NAME, 10, thePast);
+
+ List<String> args = getArgListForTable(TABLE_NAME, false, false);
+ createSession(TABLE_NAME, args);
+ runSession(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 10);
+
+ // Modify a row.
+ long importWasBefore = System.currentTimeMillis();
+ Thread.sleep(50);
+ long rowsAddedTime = System.currentTimeMillis() - 5;
+ assertTrue(rowsAddedTime > importWasBefore);
+ assertTrue(rowsAddedTime < System.currentTimeMillis());
+ SqoopOptions options = new SqoopOptions();
+ options.setConnectString(SOURCE_DB_URL);
+ HsqldbManager manager = new HsqldbManager(options);
+ Connection c = manager.getConnection();
+ PreparedStatement s = null;
+ try {
+ s = c.prepareStatement("UPDATE " + TABLE_NAME
+ + " SET id=?, last_modified=? WHERE id=?");
+ s.setInt(1, 4000); // the first row should have '4000' in it now.
+ s.setTimestamp(2, new Timestamp(rowsAddedTime));
+ s.setInt(3, 0);
+ s.executeUpdate();
+ c.commit();
+ } finally {
+ s.close();
+ }
+
+ // Import only the new row.
+ clearDir(TABLE_NAME);
+ runSession(TABLE_NAME);
+ assertSpecificNumber(TABLE_NAME, 4000);
+ }
+
+ /**
+ * ManagerFactory returning an HSQLDB ConnManager which allows you to
+ * specify the current database timestamp.
+ */
+ public static class InstrumentHsqldbManagerFactory extends ManagerFactory {
+ @Override
+ public ConnManager accept(SqoopOptions options) {
+ LOG.info("Using instrumented manager");
+ return new InstrumentHsqldbManager(options);
+ }
+ }
+
+ /**
+ * Hsqldb ConnManager that lets you set the current reported timestamp
+ * from the database, to allow testing of boundary conditions for imports.
+ */
+ public static class InstrumentHsqldbManager extends HsqldbManager {
+ private static Timestamp curTimestamp;
+
+ public InstrumentHsqldbManager(SqoopOptions options) {
+ super(options);
+ }
+
+ @Override
+ public Timestamp getCurrentDbTimestamp() {
+ return InstrumentHsqldbManager.curTimestamp;
+ }
+
+ public static void setCurrentDbTimestamp(Timestamp t) {
+ InstrumentHsqldbManager.curTimestamp = t;
+ }
+ }
+
+ public void testTimestampBoundary() throws Exception {
+ // Run an import, and then insert rows with the last-modified timestamp
+ // set to the exact time when the first import runs. Run a second import
+ // and ensure that we pick up the new data.
+
+ long now = System.currentTimeMillis();
+
+ final String TABLE_NAME = "boundaryTimestamp";
+ Timestamp thePast = new Timestamp(now - 100);
+ createTimestampTable(TABLE_NAME, 10, thePast);
+
+ Timestamp firstJobTime = new Timestamp(now);
+ InstrumentHsqldbManager.setCurrentDbTimestamp(firstJobTime);
+
+ // Configure the job to use the instrumented Hsqldb manager.
+ Configuration conf = newConf();
+ conf.set(ConnFactory.FACTORY_CLASS_NAMES_KEY,
+ InstrumentHsqldbManagerFactory.class.getName());
+
+ List<String> args = getArgListForTable(TABLE_NAME, false, false);
+ args.add("--append");
+ createSession(TABLE_NAME, args, conf);
+ runSession(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 10);
+
+ // Add some more rows with the timestamp equal to the job run timestamp.
+ insertIdTimestampRows(TABLE_NAME, 10, 20, firstJobTime);
+ assertRowCount(TABLE_NAME, 20);
+
+ // Run a second job with the clock advanced by 100 ms.
+ Timestamp secondJobTime = new Timestamp(now + 100);
+ InstrumentHsqldbManager.setCurrentDbTimestamp(secondJobTime);
+
+ // Import only those rows.
+ runSession(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 20);
+ }
+}
diff --git a/src/test/com/cloudera/sqoop/metastore/TestSessions.java b/src/test/com/cloudera/sqoop/metastore/TestSessions.java
index b4c3fac4..8d1e853f 100644
--- a/src/test/com/cloudera/sqoop/metastore/TestSessions.java
+++ b/src/test/com/cloudera/sqoop/metastore/TestSessions.java
@@ -46,7 +46,8 @@
*/
public class TestSessions extends TestCase {
- public static final String TEST_AUTOCONNECT_URL = "jdbc:hsqldb:mem:testdb";
+ public static final String TEST_AUTOCONNECT_URL =
+ "jdbc:hsqldb:mem:sqoopmetastore";
public static final String TEST_AUTOCONNECT_USER = "SA";
public static final String TEST_AUTOCONNECT_PASS = "";
@@ -62,6 +63,13 @@ public static void resetSessionSchema() throws SQLException {
options.setUsername(TEST_AUTOCONNECT_USER);
options.setPassword(TEST_AUTOCONNECT_PASS);
+ resetSchema(options);
+ }
+
+ /**
+ * Drop all tables in the configured HSQLDB-based schema/user/pass.
+ */
+ public static void resetSchema(SqoopOptions options) throws SQLException {
HsqldbManager manager = new HsqldbManager(options);
Connection c = manager.getConnection();
Statement s = c.createStatement();