From 6efcec0da22fc737e1aa30b65b380b5c497deeb1 Mon Sep 17 00:00:00 2001 From: Andrew Bayer Date: Fri, 22 Jul 2011 20:04:07 +0000 Subject: [PATCH] SQOOP-41. Add support for incremental imports. Modify ImportTool, SessionTool, to support incremental imports. Add TestIncrementalImport to unit test incremental imports. SqoopOptions now implements Cloneable. SQOOP-44. Bugfix in ClassWriter: fix NPE if the case of column names specified with --columns do not match the case reported by the database. From: Aaron Kimball git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149944 13f79535-47bb-0310-9956-ffa450edef68 --- build.xml | 2 +- src/java/com/cloudera/sqoop/SqoopOptions.java | 200 ++++- .../cloudera/sqoop/manager/ConnManager.java | 18 + .../cloudera/sqoop/manager/HsqldbManager.java | 10 + .../cloudera/sqoop/manager/OracleManager.java | 11 + .../cloudera/sqoop/manager/SqlManager.java | 52 +- .../com/cloudera/sqoop/mapreduce/JobBase.java | 2 - .../metastore/hsqldb/AutoHsqldbStorage.java | 1 + .../hsqldb/HsqldbSessionStorage.java | 13 + .../com/cloudera/sqoop/orm/ClassWriter.java | 17 + .../sqoop/orm/CompilationManager.java | 4 - .../cloudera/sqoop/tool/BaseSqoopTool.java | 5 + .../com/cloudera/sqoop/tool/ImportTool.java | 340 +++++++- .../com/cloudera/sqoop/tool/SessionTool.java | 10 +- src/test/com/cloudera/sqoop/AllTests.java | 1 + .../cloudera/sqoop/TestIncrementalImport.java | 790 ++++++++++++++++++ .../sqoop/metastore/TestSessions.java | 10 +- 17 files changed, 1463 insertions(+), 23 deletions(-) create mode 100644 src/test/com/cloudera/sqoop/TestIncrementalImport.java diff --git a/build.xml b/build.xml index 2181586e..9d668ad9 100644 --- a/build.xml +++ b/build.xml @@ -843,7 +843,7 @@ outputFile="${findbugs.output.xml.file}" effort="max" excludeFilter="${findbugs.excludes}"> - + diff --git a/src/java/com/cloudera/sqoop/SqoopOptions.java b/src/java/com/cloudera/sqoop/SqoopOptions.java index 37e18337..9b471cd9 100644 --- a/src/java/com/cloudera/sqoop/SqoopOptions.java +++ b/src/java/com/cloudera/sqoop/SqoopOptions.java @@ -22,6 +22,7 @@ import java.io.File; import java.util.ArrayList; import java.util.Arrays; +import java.util.Map; import java.util.Properties; import org.apache.commons.logging.Log; @@ -31,9 +32,9 @@ import com.cloudera.sqoop.lib.LargeObjectLoader; /** - * Command-line arguments used by Sqoop. + * Configurable state used by Sqoop tools. */ -public class SqoopOptions { +public class SqoopOptions implements Cloneable { public static final Log LOG = LogFactory.getLog(SqoopOptions.class.getName()); @@ -73,6 +74,22 @@ public enum FileLayout { SequenceFile } + /** + * Incremental imports support two modes: + *
+   * <ul>
+   * <li>new rows being appended to the end of a table with an
+   * incrementing id</li>
+   * <li>new data results in a date-last-modified column being
+   * updated to NOW(); Sqoop will pull all dirty rows in the next
+   * incremental import.</li>
+   * </ul>
+ */ + public enum IncrementalMode { + None, + AppendRows, + DateLastModified, + } + // TODO(aaron): Adding something here? Add a setter and a getter. // Add a default value in initDefaults() if you need one. If this value @@ -149,6 +166,26 @@ public enum FileLayout { private String hbaseRowKeyCol; // Column of the input to use as the row key. private boolean hbaseCreateTable; // if true, create tables/col families. + // col to filter on for incremental imports. + private String incrementalTestCol; + // incremental import mode we're using. + private IncrementalMode incrementalMode; + // What was the last-imported value of incrementalTestCol? + private String incrementalLastValue; + + + // These next two fields are not serialized to the metastore. + // If this SqoopOptions is created by reading a saved session, these will + // be populated by the SessionStorage to facilitate updating the same + // session. + private String sessionName; + private Map sessionStorageDescriptor; + + // If we restore a session and then allow the user to apply arguments on + // top, we retain the version without the arguments in a reference to the + // 'parent' SqoopOptions instance, here. + private SqoopOptions parent; + public SqoopOptions() { initDefaults(null); } @@ -356,10 +393,12 @@ public void loadProperties(Properties props) { this.targetDir); this.append = getBooleanProperty(props, "hdfs.append.dir", this.append); - String fileFmtStr = props.getProperty("hdfs.file.format", "text"); - if (fileFmtStr.equals("seq")) { - this.layout = FileLayout.SequenceFile; - } else { + try { + this.layout = FileLayout.valueOf( + props.getProperty("hdfs.file.format", this.layout.toString())); + } catch (IllegalArgumentException iae) { + LOG.warn("Unsupported file format: " + + props.getProperty("hdfs.file.format", null) + "; setting to text"); this.layout = FileLayout.TextFile; } @@ -410,6 +449,20 @@ public void loadProperties(Properties props) { this.hbaseRowKeyCol); this.hbaseCreateTable = getBooleanProperty(props, "hbase.create.table", this.hbaseCreateTable); + + try { + this.incrementalMode = IncrementalMode.valueOf(props.getProperty( + "incremental.mode", this.incrementalMode.toString())); + } catch (IllegalArgumentException iae) { + LOG.warn("Invalid incremental import type: " + + props.getProperty("incremental.mode", null) + "; setting to None"); + this.incrementalMode = IncrementalMode.None; + } + + this.incrementalTestCol = props.getProperty("incremental.col", + this.incrementalTestCol); + this.incrementalLastValue = props.getProperty("incremental.last.value", + this.incrementalLastValue); } /** @@ -448,11 +501,7 @@ public Properties writeProperties() { putProperty(props, "hdfs.warehouse.dir", this.warehouseDir); putProperty(props, "hdfs.target.dir", this.targetDir); putProperty(props, "hdfs.append.dir", Boolean.toString(this.append)); - if (this.layout == FileLayout.SequenceFile) { - putProperty(props, "hdfs.file.format", "seq"); - } else { - putProperty(props, "hdfs.file.format", "text"); - } + putProperty(props, "hdfs.file.format", this.layout.toString()); putProperty(props, "direct.import", Boolean.toString(this.direct)); putProperty(props, "hive.import", Boolean.toString(this.hiveImport)); putProperty(props, "hive.overwrite.table", @@ -482,9 +531,48 @@ public Properties writeProperties() { putProperty(props, "hbase.create.table", Boolean.toString(this.hbaseCreateTable)); + putProperty(props, "incremental.mode", this.incrementalMode.toString()); + putProperty(props, "incremental.col", this.incrementalTestCol); 
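    // Illustration (not part of this change): after an append import whose
    // check column "id" reached 100, these keys serialize as
    //   incremental.mode=AppendRows
    //   incremental.col=id
    //   incremental.last.value=100
    // and loadProperties() above restores them symmetrically; an
    // unrecognized mode name is logged and coerced to IncrementalMode.None.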
+ putProperty(props, "incremental.last.value", this.incrementalLastValue); + return props; } + @Override + public Object clone() { + try { + SqoopOptions other = (SqoopOptions) super.clone(); + if (null != columns) { + other.columns = Arrays.copyOf(columns, columns.length); + } + + if (null != dbOutColumns) { + other.dbOutColumns = Arrays.copyOf(dbOutColumns, dbOutColumns.length); + } + + if (null != inputDelimiters) { + other.inputDelimiters = (DelimiterSet) inputDelimiters.clone(); + } + + if (null != outputDelimiters) { + other.outputDelimiters = (DelimiterSet) outputDelimiters.clone(); + } + + if (null != conf) { + other.conf = new Configuration(conf); + } + + if (null != extraArgs) { + other.extraArgs = Arrays.copyOf(extraArgs, extraArgs.length); + } + + return other; + } catch (CloneNotSupportedException cnse) { + // Shouldn't happen. + return null; + } + } + /** * @return the temp directory to use; this is guaranteed to end with * the file separator character (e.g., '/'). @@ -536,6 +624,8 @@ private void initDefaults(Configuration baseConfiguration) { this.extraArgs = null; this.dbOutColumns = null; + + this.incrementalMode = IncrementalMode.None; } /** @@ -1312,5 +1402,93 @@ public String getHBaseTable() { public void setHBaseTable(String table) { this.hbaseTable = table; } + + /** + * Set the column of the import source table to check for incremental import + * state. + */ + public void setIncrementalTestColumn(String colName) { + this.incrementalTestCol = colName; + } + + /** + * Return the name of the column of the import source table + * to check for incremental import state. + */ + public String getIncrementalTestColumn() { + return this.incrementalTestCol; + } + + /** + * Set the incremental import mode to use. + */ + public void setIncrementalMode(IncrementalMode mode) { + this.incrementalMode = mode; + } + + /** + * Get the incremental import mode to use. + */ + public IncrementalMode getIncrementalMode() { + return this.incrementalMode; + } + + /** + * Set the last imported value of the incremental import test column. + */ + public void setIncrementalLastValue(String lastVal) { + this.incrementalLastValue = lastVal; + } + + /** + * Get the last imported value of the incremental import test column. + */ + public String getIncrementalLastValue() { + return this.incrementalLastValue; + } + + /** + * Set the name of the saved session this SqoopOptions belongs to. + */ + public void setSessionName(String session) { + this.sessionName = session; + } + + /** + * Get the name of the saved session this SqoopOptions belongs to. + */ + public String getSessionName() { + return this.sessionName; + } + + /** + * Set the SessionStorage descriptor used to open the saved session + * this SqoopOptions belongs to. + */ + public void setStorageDescriptor(Map descriptor) { + this.sessionStorageDescriptor = descriptor; + } + + /** + * Get the SessionStorage descriptor used to open the saved session + * this SqoopOptions belongs to. + */ + public Map getStorageDescriptor() { + return this.sessionStorageDescriptor; + } + + /** + * Return the parent instance this SqoopOptions is derived from. + */ + public SqoopOptions getParent() { + return this.parent; + } + + /** + * Set the parent instance this SqoopOptions is derived from. 
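   * When a saved session is executed, the parent is the pristine,
   * pre-override copy of the session's options; it is the instance that
   * an incremental import writes back to the metastore.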
+ */ + public void setParent(SqoopOptions options) { + this.parent = options; + } } diff --git a/src/java/com/cloudera/sqoop/manager/ConnManager.java b/src/java/com/cloudera/sqoop/manager/ConnManager.java index 750118ae..bfa11a23 100644 --- a/src/java/com/cloudera/sqoop/manager/ConnManager.java +++ b/src/java/com/cloudera/sqoop/manager/ConnManager.java @@ -22,6 +22,7 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Timestamp; import java.util.Map; import org.apache.commons.logging.Log; @@ -200,5 +201,22 @@ public void updateTable(ExportJobContext context) * to close. */ public abstract void release(); + + /** + * Return the current time from the perspective of the database server. + * Return null if this cannot be accessed. + */ + public Timestamp getCurrentDbTimestamp() { + LOG.warn("getCurrentDbTimestamp(): Using local system timestamp."); + return new Timestamp(System.currentTimeMillis()); + } + + /** + * Given a non-null Timestamp, return the quoted string that can + * be inserted into a SQL statement, representing that timestamp. + */ + public String timestampToQueryString(Timestamp ts) { + return "'" + ts + "'"; + } } diff --git a/src/java/com/cloudera/sqoop/manager/HsqldbManager.java b/src/java/com/cloudera/sqoop/manager/HsqldbManager.java index 66210571..3d04e876 100644 --- a/src/java/com/cloudera/sqoop/manager/HsqldbManager.java +++ b/src/java/com/cloudera/sqoop/manager/HsqldbManager.java @@ -52,4 +52,14 @@ public String[] listDatabases() { String [] databases = {HSQL_SCHEMA_NAME}; return databases; } + + @Override + /** + * {@inheritDoc} + */ + protected String getCurTimestampQuery() { + // HSQLDB requires that you select from a table; this table is + // guaranteed to exist. + return "SELECT CURRENT_TIMESTAMP FROM INFORMATION_SCHEMA.SYSTEM_TABLES"; + } } diff --git a/src/java/com/cloudera/sqoop/manager/OracleManager.java b/src/java/com/cloudera/sqoop/manager/OracleManager.java index 4824ed73..285c8c00 100644 --- a/src/java/com/cloudera/sqoop/manager/OracleManager.java +++ b/src/java/com/cloudera/sqoop/manager/OracleManager.java @@ -23,6 +23,7 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Timestamp; import java.util.HashMap; import java.util.Map; import java.lang.reflect.Method; @@ -456,5 +457,15 @@ protected void finalize() throws Throwable { close(); super.finalize(); } + + @Override + protected String getCurTimestampQuery() { + return "SELECT SYSDATE FROM dual"; + } + + @Override + public String timestampToQueryString(Timestamp ts) { + return "TO_TIMESTAMP('" + ts + "', 'YYYY-MM-DD HH24:MI:SS.FF')"; + } } diff --git a/src/java/com/cloudera/sqoop/manager/SqlManager.java b/src/java/com/cloudera/sqoop/manager/SqlManager.java index c07b3a0a..8ba8efff 100644 --- a/src/java/com/cloudera/sqoop/manager/SqlManager.java +++ b/src/java/com/cloudera/sqoop/manager/SqlManager.java @@ -18,6 +18,8 @@ package com.cloudera.sqoop.manager; +import java.sql.Timestamp; + import com.cloudera.sqoop.SqoopOptions; import com.cloudera.sqoop.hive.HiveTypes; import com.cloudera.sqoop.lib.BlobRef; @@ -616,8 +618,9 @@ public void release() { } } + @Override /** - * @{inheritDoc} + * {@inheritDoc} */ public void updateTable(ExportJobContext context) throws IOException, ExportException { @@ -625,4 +628,51 @@ public void updateTable(ExportJobContext context) JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context); exportJob.runExport(); } + + /** + * @return a SQL query to retrieve the 
current timestamp from the db. + */ + protected String getCurTimestampQuery() { + return "SELECT CURRENT_TIMESTAMP()"; + } + + @Override + /** + * {@inheritDoc} + */ + public Timestamp getCurrentDbTimestamp() { + release(); // Release any previous ResultSet. + + Statement s = null; + ResultSet rs = null; + try { + Connection c = getConnection(); + s = c.createStatement(); + rs = s.executeQuery(getCurTimestampQuery()); + if (rs == null || !rs.next()) { + return null; // empty ResultSet. + } + + return rs.getTimestamp(1); + } catch (SQLException sqlE) { + LOG.warn("SQL exception accessing current timestamp: " + sqlE); + return null; + } finally { + try { + if (null != rs) { + rs.close(); + } + } catch (SQLException sqlE) { + LOG.warn("SQL Exception closing resultset: " + sqlE); + } + + try { + if (null != s) { + s.close(); + } + } catch (SQLException sqlE) { + LOG.warn("SQL Exception closing statement: " + sqlE); + } + } + } } diff --git a/src/java/com/cloudera/sqoop/mapreduce/JobBase.java b/src/java/com/cloudera/sqoop/mapreduce/JobBase.java index 0613506d..37483d04 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/JobBase.java +++ b/src/java/com/cloudera/sqoop/mapreduce/JobBase.java @@ -29,8 +29,6 @@ import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.filecache.DistributedCache; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; diff --git a/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java b/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java index ffcde79e..e51190fd 100644 --- a/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java +++ b/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java @@ -106,6 +106,7 @@ public void open(Map descriptor) throws IOException { setMetastoreUser(conf.get(AUTO_STORAGE_USER_KEY, DEFAULT_AUTO_USER)); setMetastorePassword(conf.get(AUTO_STORAGE_PASS_KEY, DEFAULT_AUTO_PASSWORD)); + setConnectedDescriptor(descriptor); init(); } diff --git a/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbSessionStorage.java b/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbSessionStorage.java index 09d9da9f..5009dc3a 100644 --- a/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbSessionStorage.java +++ b/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbSessionStorage.java @@ -117,6 +117,7 @@ public class HsqldbSessionStorage extends SessionStorage { private static final String SQOOP_TOOL_KEY = "sqoop.tool"; + private Map connectedDescriptor; private String metastoreConnectStr; private String metastoreUser; private String metastorePassword; @@ -144,6 +145,13 @@ protected void setMetastorePassword(String pass) { private static final String DB_DRIVER_CLASS = "org.hsqldb.jdbcDriver"; + /** + * Set the descriptor used to open() this storage. + */ + protected void setConnectedDescriptor(Map descriptor) { + this.connectedDescriptor = descriptor; + } + @Override /** * Initialize the connection to the database. @@ -152,6 +160,7 @@ public void open(Map descriptor) throws IOException { setMetastoreConnectStr(descriptor.get(META_CONNECT_KEY)); setMetastoreUser(descriptor.get(META_USERNAME_KEY)); setMetastorePassword(descriptor.get(META_PASSWORD_KEY)); + setConnectedDescriptor(descriptor); init(); } @@ -293,6 +302,10 @@ public SessionData read(String sessionName) throws IOException { opts.setConf(conf); opts.loadProperties(sqoopOptProps); + // Set the session connection information for this session. 
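    // These two fields are deliberately not serialized with the other
    // properties; they let ImportTool.saveIncrementalState() update this
    // same session record once the import completes.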
+ opts.setSessionName(sessionName); + opts.setStorageDescriptor(connectedDescriptor); + return new SessionData(opts, tool); } catch (SQLException sqlE) { throw new IOException("Error communicating with database", sqlE); diff --git a/src/java/com/cloudera/sqoop/orm/ClassWriter.java b/src/java/com/cloudera/sqoop/orm/ClassWriter.java index a9d4ced1..52bcb6e0 100644 --- a/src/java/com/cloudera/sqoop/orm/ClassWriter.java +++ b/src/java/com/cloudera/sqoop/orm/ClassWriter.java @@ -909,6 +909,23 @@ public void generate() throws IOException { colNames = connManager.getColumnNamesForQuery( this.options.getSqlQuery()); } + } else { + // These column names were provided by the user. They may not be in + // the same case as the keys in the columnTypes map. So make sure + // we add the appropriate aliases in that map. + for (String userColName : colNames) { + for (Map.Entry typeEntry : columnTypes.entrySet()) { + String typeColName = typeEntry.getKey(); + if (typeColName.equalsIgnoreCase(userColName) + && !typeColName.equals(userColName)) { + // We found the correct-case equivalent. + columnTypes.put(userColName, typeEntry.getValue()); + // No need to continue iteration; only one could match. + // Also, the use of put() just invalidated the iterator. + break; + } + } + } } // Translate all the column names into names that are safe to diff --git a/src/java/com/cloudera/sqoop/orm/CompilationManager.java b/src/java/com/cloudera/sqoop/orm/CompilationManager.java index d6a6ccf4..4e46c2a1 100644 --- a/src/java/com/cloudera/sqoop/orm/CompilationManager.java +++ b/src/java/com/cloudera/sqoop/orm/CompilationManager.java @@ -23,10 +23,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.net.URL; -import java.net.URLDecoder; import java.util.ArrayList; -import java.util.Enumeration; import java.util.List; import java.util.jar.JarOutputStream; import java.util.zip.ZipEntry; @@ -42,7 +39,6 @@ import com.cloudera.sqoop.SqoopOptions; import com.cloudera.sqoop.util.FileListing; -import com.cloudera.sqoop.shims.HadoopShim; import com.cloudera.sqoop.util.Jars; diff --git a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java index 250317db..0320fe3b 100644 --- a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java +++ b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java @@ -114,6 +114,11 @@ public abstract class BaseSqoopTool extends SqoopTool { public static final String HELP_ARG = "help"; public static final String UPDATE_KEY_ARG = "update-key"; + // Arguments for incremental imports. + public static final String INCREMENT_TYPE_ARG = "incremental"; + public static final String INCREMENT_COL_ARG = "check-column"; + public static final String INCREMENT_LAST_VAL_ARG = "last-value"; + // HBase arguments. 
public static final String HBASE_TABLE_ARG = "hbase-table"; public static final String HBASE_COL_FAM_ARG = "column-family"; diff --git a/src/java/com/cloudera/sqoop/tool/ImportTool.java b/src/java/com/cloudera/sqoop/tool/ImportTool.java index 0ec5310d..6b298acc 100644 --- a/src/java/com/cloudera/sqoop/tool/ImportTool.java +++ b/src/java/com/cloudera/sqoop/tool/ImportTool.java @@ -19,7 +19,16 @@ package com.cloudera.sqoop.tool; import java.io.IOException; + +import java.math.BigDecimal; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; import java.util.List; +import java.util.Map; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.OptionBuilder; @@ -35,6 +44,10 @@ import com.cloudera.sqoop.cli.ToolOptions; import com.cloudera.sqoop.hive.HiveImport; import com.cloudera.sqoop.manager.ImportJobContext; + +import com.cloudera.sqoop.metastore.SessionData; +import com.cloudera.sqoop.metastore.SessionStorage; +import com.cloudera.sqoop.metastore.SessionStorageFactory; import com.cloudera.sqoop.util.AppendUtils; import com.cloudera.sqoop.util.ImportException; import org.apache.hadoop.fs.Path; @@ -76,8 +89,239 @@ protected boolean init(SqoopOptions sqoopOpts) { public List getGeneratedJarFiles() { return this.codeGenerator.getGeneratedJarFiles(); } + + /** + * @return true if the supplied options specify an incremental import. + */ + private boolean isIncremental(SqoopOptions options) { + return !options.getIncrementalMode().equals( + SqoopOptions.IncrementalMode.None); + } - protected void importTable(SqoopOptions options, String tableName, + /** + * If this is an incremental import, then we should save the + * user's state back to the metastore (if this session was run + * from the metastore). Otherwise, log to the user what data + * they need to supply next time. + */ + private void saveIncrementalState(SqoopOptions options) + throws IOException { + if (!isIncremental(options)) { + return; + } + + Map descriptor = options.getStorageDescriptor(); + String sessionName = options.getSessionName(); + + if (null != sessionName && null != descriptor) { + // Actually save it back to the metastore. + LOG.info("Saving incremental import state to the metastore"); + SessionStorageFactory ssf = new SessionStorageFactory(options.getConf()); + SessionStorage storage = ssf.getSessionStorage(descriptor); + storage.open(descriptor); + try { + // Save the 'parent' SqoopOptions; this does not contain the mutations + // to the SqoopOptions state that occurred over the course of this + // execution, except for the one we specifically want to memorize: + // the latest value of the check column. + SessionData data = new SessionData(options.getParent(), this); + storage.update(sessionName, data); + LOG.info("Updated data for session: " + sessionName); + } finally { + storage.close(); + } + } else { + // If there wasn't a parent SqoopOptions, then the incremental + // state data was stored in the current SqoopOptions. + LOG.info("Incremental import complete! 
To run another incremental " + + "import of all data following this import, supply the " + + "following arguments:"); + SqoopOptions.IncrementalMode incrementalMode = + options.getIncrementalMode(); + switch (incrementalMode) { + case AppendRows: + LOG.info(" --incremental append"); + break; + case DateLastModified: + LOG.info(" --incremental lastmodified"); + break; + default: + LOG.warn("Undefined incremental mode: " + incrementalMode); + break; + } + LOG.info(" --check-column " + options.getIncrementalTestColumn()); + LOG.info(" --last-value " + options.getIncrementalLastValue()); + LOG.info("(Consider saving this with 'sqoop session --create')"); + } + } + + /** + * Return the max value in the incremental-import test column. This + * value must be numeric. + */ + private BigDecimal getMaxColumnId(SqoopOptions options) throws SQLException { + StringBuilder sb = new StringBuilder(); + sb.append("SELECT MAX("); + sb.append(options.getIncrementalTestColumn()); + sb.append(") FROM "); + sb.append(options.getTableName()); + + String where = options.getWhereClause(); + if (null != where) { + sb.append(" WHERE "); + sb.append(where); + } + + Connection conn = manager.getConnection(); + Statement s = null; + ResultSet rs = null; + try { + s = conn.createStatement(); + rs = s.executeQuery(sb.toString()); + if (!rs.next()) { + // This probably means the table is empty. + LOG.warn("Unexpected: empty results for max value query?"); + return null; + } + + return rs.getBigDecimal(1); + } finally { + try { + if (null != rs) { + rs.close(); + } + } catch (SQLException sqlE) { + LOG.warn("SQL Exception closing resultset: " + sqlE); + } + + try { + if (null != s) { + s.close(); + } + } catch (SQLException sqlE) { + LOG.warn("SQL Exception closing statement: " + sqlE); + } + } + } + + /** + * Initialize the constraints which set the incremental import range. + * @return false if an import is not necessary, because the dataset has not + * changed. + */ + private boolean initIncrementalConstraints(SqoopOptions options, + ImportJobContext context) throws ImportException, IOException { + + // If this is an incremental import, determine the constraints + // to inject in the WHERE clause or $CONDITIONS for a query. + // Also modify the 'last value' field of the SqoopOptions to + // specify the current job start time / start row. + + if (!isIncremental(options)) { + return true; + } + + SqoopOptions.IncrementalMode incrementalMode = options.getIncrementalMode(); + String nextIncrementalValue = null; + + switch (incrementalMode) { + case AppendRows: + try { + BigDecimal nextVal = getMaxColumnId(options); + if (null != nextVal) { + nextIncrementalValue = nextVal.toString(); + } + } catch (SQLException sqlE) { + throw new IOException(sqlE); + } + break; + case DateLastModified: + Timestamp dbTimestamp = manager.getCurrentDbTimestamp(); + if (null == dbTimestamp) { + throw new IOException("Could not get current time from database"); + } + + nextIncrementalValue = manager.timestampToQueryString(dbTimestamp); + break; + default: + throw new ImportException("Undefined incremental import type: " + + incrementalMode); + } + + // Build the WHERE clause components that are used to import + // only this incremental section. 
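    // Illustration (values are made up): an append import with
    // --check-column id --last-value 100, where MAX(id) is now 200,
    // builds:
    //   id > 100 AND id <= 200
    // A lastmodified import whose saved last value is
    // '2011-07-22 20:00:00.0', read against a database clock of
    // '2011-07-22 21:00:00.0', builds:
    //   last_modified >= '2011-07-22 20:00:00.0'
    //     AND last_modified < '2011-07-22 21:00:00.0'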
+ StringBuilder sb = new StringBuilder(); + String prevEndpoint = options.getIncrementalLastValue(); + + String checkColName = manager.escapeColName( + options.getIncrementalTestColumn()); + LOG.info("Incremental import based on column " + checkColName); + if (null != prevEndpoint) { + if (prevEndpoint.equals(nextIncrementalValue)) { + LOG.info("No new rows detected since last import."); + return false; + } + LOG.info("Lower bound value: " + prevEndpoint); + sb.append(checkColName); + switch (incrementalMode) { + case AppendRows: + sb.append(" > "); + break; + case DateLastModified: + sb.append(" >= "); + break; + default: + throw new ImportException("Undefined comparison"); + } + sb.append(prevEndpoint); + sb.append(" AND "); + } + + if (null != nextIncrementalValue) { + sb.append(checkColName); + switch (incrementalMode) { + case AppendRows: + sb.append(" <= "); + break; + case DateLastModified: + sb.append(" < "); + break; + default: + throw new ImportException("Undefined comparison"); + } + sb.append(nextIncrementalValue); + } else { + sb.append(checkColName); + sb.append(" IS NULL "); + } + + LOG.info("Upper bound value: " + nextIncrementalValue); + + String prevWhereClause = options.getWhereClause(); + if (null != prevWhereClause) { + sb.append(" AND ("); + sb.append(prevWhereClause); + sb.append(")"); + } + + String newConstraints = sb.toString(); + options.setWhereClause(newConstraints); + + // Save this state for next time. + SqoopOptions recordOptions = options.getParent(); + if (null == recordOptions) { + recordOptions = options; + } + recordOptions.setIncrementalLastValue(nextIncrementalValue); + + return true; + } + + /** + * Import a table or query. + * @return true if an import was performed, false otherwise. + */ + protected boolean importTable(SqoopOptions options, String tableName, HiveImport hiveImport) throws IOException, ImportException { String jarFile = null; @@ -88,6 +332,12 @@ protected void importTable(SqoopOptions options, String tableName, ImportJobContext context = new ImportJobContext(tableName, jarFile, options, getOutputPath(options, tableName)); + // If we're doing an incremental import, set up the + // filtering conditions used to get the latest records. + if (!initIncrementalConstraints(options, context)) { + return false; + } + if (null != tableName) { manager.importTable(context); } else { @@ -103,6 +353,10 @@ protected void importTable(SqoopOptions options, String tableName, if (options.doHiveImport()) { hiveImport.importTable(tableName, options.getHiveTableName(), false); } + + saveIncrementalState(options); + + return true; } /** @@ -264,12 +518,42 @@ protected RelatedOptions getImportOptions() { return importOpts; } + /** + * Return options for incremental import. 
+ */ + protected RelatedOptions getIncrementalOptions() { + RelatedOptions incrementalOpts = + new RelatedOptions("Incremental import arguments"); + + incrementalOpts.addOption(OptionBuilder.withArgName("import-type") + .hasArg() + .withDescription( + "Define an incremental import of type 'append' or 'lastmodified'") + .withLongOpt(INCREMENT_TYPE_ARG) + .create()); + incrementalOpts.addOption(OptionBuilder.withArgName("column") + .hasArg() + .withDescription("Source column to check for incremental change") + .withLongOpt(INCREMENT_COL_ARG) + .create()); + incrementalOpts.addOption(OptionBuilder.withArgName("value") + .hasArg() + .withDescription("Last imported value in the incremental check column") + .withLongOpt(INCREMENT_LAST_VAL_ARG) + .create()); + + return incrementalOpts; + } + @Override /** Configure the command-line arguments we expect to receive */ public void configureOptions(ToolOptions toolOptions) { toolOptions.addUniqueOptions(getCommonOptions()); toolOptions.addUniqueOptions(getImportOptions()); + if (!allTables) { + toolOptions.addUniqueOptions(getIncrementalOptions()); + } toolOptions.addUniqueOptions(getOutputFormatOptions()); toolOptions.addUniqueOptions(getInputFormatOptions()); toolOptions.addUniqueOptions(getHiveOptions(true)); @@ -306,6 +590,32 @@ public void printHelp(ToolOptions toolOptions) { "after a '--' on the command line."); } + private void applyIncrementalOptions(CommandLine in, SqoopOptions out) + throws InvalidOptionsException { + if (in.hasOption(INCREMENT_TYPE_ARG)) { + String incrementalTypeStr = in.getOptionValue(INCREMENT_TYPE_ARG); + if ("append".equals(incrementalTypeStr)) { + out.setIncrementalMode(SqoopOptions.IncrementalMode.AppendRows); + // This argument implies ability to append to the same directory. + out.setAppendMode(true); + } else if ("lastmodified".equals(incrementalTypeStr)) { + out.setIncrementalMode(SqoopOptions.IncrementalMode.DateLastModified); + } else { + throw new InvalidOptionsException("Unknown incremental import mode: " + + incrementalTypeStr + ". Use 'append' or 'lastmodified'." + + HELP_STR); + } + } + + if (in.hasOption(INCREMENT_COL_ARG)) { + out.setIncrementalTestColumn(in.getOptionValue(INCREMENT_COL_ARG)); + } + + if (in.hasOption(INCREMENT_LAST_VAL_ARG)) { + out.setIncrementalLastValue(in.getOptionValue(INCREMENT_LAST_VAL_ARG)); + } + } + @Override /** {@inheritDoc} */ public void applyOptions(CommandLine in, SqoopOptions out) @@ -382,6 +692,7 @@ public void applyOptions(CommandLine in, SqoopOptions out) out.setExistingJarName(in.getOptionValue(JAR_FILE_NAME_ARG)); } + applyIncrementalOptions(in, out); applyHiveOptions(in, out); applyOutputFormatOptions(in, out); applyInputFormatOptions(in, out); @@ -437,6 +748,32 @@ protected void validateImportOptions(SqoopOptions options) } } + /** + * Validate the incremental import options. + */ + private void validateIncrementalOptions(SqoopOptions options) + throws InvalidOptionsException { + if (options.getIncrementalMode() != SqoopOptions.IncrementalMode.None + && options.getIncrementalTestColumn() == null) { + throw new InvalidOptionsException( + "For an incremental import, the check column must be specified " + + "with --" + INCREMENT_COL_ARG + ". " + HELP_STR); + } + + if (options.getIncrementalMode() == SqoopOptions.IncrementalMode.None + && options.getIncrementalTestColumn() != null) { + throw new InvalidOptionsException( + "You must specify an incremental import mode with --" + + INCREMENT_TYPE_ARG + ". 
" + HELP_STR); + } + + if (options.getIncrementalMode() != SqoopOptions.IncrementalMode.None + && options.getTableName() == null) { + throw new InvalidOptionsException("Incremental imports require a table." + + HELP_STR); + } + } + @Override /** {@inheritDoc} */ public void validateOptions(SqoopOptions options) @@ -452,6 +789,7 @@ public void validateOptions(SqoopOptions options) } validateImportOptions(options); + validateIncrementalOptions(options); validateCommonOptions(options); validateCodeGenOptions(options); validateOutputFormatOptions(options); diff --git a/src/java/com/cloudera/sqoop/tool/SessionTool.java b/src/java/com/cloudera/sqoop/tool/SessionTool.java index af6acc4a..d395a0b1 100644 --- a/src/java/com/cloudera/sqoop/tool/SessionTool.java +++ b/src/java/com/cloudera/sqoop/tool/SessionTool.java @@ -209,6 +209,12 @@ private int execSession(SqoopOptions opts) throws IOException { SqoopOptions childOpts = data.getSqoopOptions(); SqoopTool childTool = data.getSqoopTool(); + // Don't overwrite the original SqoopOptions with the + // arguments; make a child options. + + SqoopOptions clonedOpts = (SqoopOptions) childOpts.clone(); + clonedOpts.setParent(childOpts); + int dashPos = getDashPosition(extraArguments); String [] childArgv; if (dashPos >= extraArguments.length) { @@ -218,13 +224,13 @@ private int execSession(SqoopOptions opts) throws IOException { extraArguments.length); } - int confRet = configureChildTool(childOpts, childTool, childArgv); + int confRet = configureChildTool(clonedOpts, childTool, childArgv); if (0 != confRet) { // Error. return confRet; } - return childTool.run(childOpts); + return childTool.run(clonedOpts); } private int showSession(SqoopOptions opts) throws IOException { diff --git a/src/test/com/cloudera/sqoop/AllTests.java b/src/test/com/cloudera/sqoop/AllTests.java index 3d82a242..7bff1e0f 100644 --- a/src/test/com/cloudera/sqoop/AllTests.java +++ b/src/test/com/cloudera/sqoop/AllTests.java @@ -38,6 +38,7 @@ public static Test suite() { suite.addTest(ThirdPartyTests.suite()); suite.addTestSuite(TestHBaseImport.class); suite.addTestSuite(TestHBaseQueryImport.class); + suite.addTestSuite(TestIncrementalImport.class); return suite; } diff --git a/src/test/com/cloudera/sqoop/TestIncrementalImport.java b/src/test/com/cloudera/sqoop/TestIncrementalImport.java new file mode 100644 index 00000000..10a7231f --- /dev/null +++ b/src/test/com/cloudera/sqoop/TestIncrementalImport.java @@ -0,0 +1,790 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.cloudera.sqoop; + +import java.io.BufferedReader; +import java.io.InputStreamReader; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.StringUtils; + +import com.cloudera.sqoop.manager.ConnManager; +import com.cloudera.sqoop.manager.HsqldbManager; +import com.cloudera.sqoop.manager.ManagerFactory; +import com.cloudera.sqoop.metastore.TestSessions; +import com.cloudera.sqoop.testutil.BaseSqoopTestCase; +import com.cloudera.sqoop.testutil.CommonArgs; +import com.cloudera.sqoop.tool.ImportTool; +import com.cloudera.sqoop.tool.SessionTool; + +import junit.framework.TestCase; + +import java.sql.Connection; + +/** + * Test the incremental import functionality. + * + * These all make use of the auto-connect hsqldb-based metastore. + * The metastore URL is configured to be in-memory, and drop all + * state between individual tests. + */ +public class TestIncrementalImport extends TestCase { + + public static final Log LOG = LogFactory.getLog( + TestIncrementalImport.class.getName()); + + // What database do we read from. + public static final String SOURCE_DB_URL = "jdbc:hsqldb:mem:incremental"; + + @Override + public void setUp() throws Exception { + // Delete db state between tests. + TestSessions.resetSessionSchema(); + resetSourceDataSchema(); + } + + public static void resetSourceDataSchema() throws SQLException { + SqoopOptions options = new SqoopOptions(); + options.setConnectString(SOURCE_DB_URL); + TestSessions.resetSchema(options); + } + + public static Configuration newConf() { + return TestSessions.newConf(); + } + + /** + * Assert that a table has a specified number of rows. + */ + private void assertRowCount(String table, int numRows) throws SQLException { + SqoopOptions options = new SqoopOptions(); + options.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options); + Connection c = manager.getConnection(); + PreparedStatement s = null; + ResultSet rs = null; + try { + s = c.prepareStatement("SELECT COUNT(*) FROM " + table); + rs = s.executeQuery(); + if (!rs.next()) { + fail("No resultset"); + } + int realNumRows = rs.getInt(1); + assertEquals(numRows, realNumRows); + LOG.info("Expected " + numRows + " rows -- ok."); + } finally { + if (null != s) { + try { + s.close(); + } catch (SQLException sqlE) { + LOG.warn("exception: " + sqlE); + } + } + + if (null != rs) { + try { + rs.close(); + } catch (SQLException sqlE) { + LOG.warn("exception: " + sqlE); + } + } + } + } + + /** + * Insert rows with id = [low, hi) into tableName. 
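   * The interval is half-open: insertIdRows("foo", 0, 10) inserts the
   * ids 0 through 9.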
+ */ + private void insertIdRows(String tableName, int low, int hi) + throws SQLException { + SqoopOptions options = new SqoopOptions(); + options.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options); + Connection c = manager.getConnection(); + PreparedStatement s = null; + try { + s = c.prepareStatement("INSERT INTO " + tableName + " VALUES(?)"); + for (int i = low; i < hi; i++) { + s.setInt(1, i); + s.executeUpdate(); + } + + c.commit(); + } finally { + s.close(); + } + } + + /** + * Insert rows with id = [low, hi) into tableName with + * the timestamp column set to the specified ts. + */ + private void insertIdTimestampRows(String tableName, int low, int hi, + Timestamp ts) throws SQLException { + LOG.info("Inserting id rows in [" + low + ", " + hi + ") @ " + ts); + SqoopOptions options = new SqoopOptions(); + options.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options); + Connection c = manager.getConnection(); + PreparedStatement s = null; + try { + s = c.prepareStatement("INSERT INTO " + tableName + " VALUES(?,?)"); + for (int i = low; i < hi; i++) { + s.setInt(1, i); + s.setTimestamp(2, ts); + s.executeUpdate(); + } + + c.commit(); + } finally { + s.close(); + } + } + + /** + * Create a table with an 'id' column full of integers. + */ + private void createIdTable(String tableName, int insertRows) + throws SQLException { + SqoopOptions options = new SqoopOptions(); + options.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options); + Connection c = manager.getConnection(); + PreparedStatement s = null; + try { + s = c.prepareStatement("CREATE TABLE " + tableName + "(id INT NOT NULL)"); + s.executeUpdate(); + c.commit(); + insertIdRows(tableName, 0, insertRows); + } finally { + s.close(); + } + } + + /** + * Create a table with an 'id' column full of integers and a + * last_modified column with timestamps. + */ + private void createTimestampTable(String tableName, int insertRows, + Timestamp baseTime) throws SQLException { + SqoopOptions options = new SqoopOptions(); + options.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options); + Connection c = manager.getConnection(); + PreparedStatement s = null; + try { + s = c.prepareStatement("CREATE TABLE " + tableName + "(id INT NOT NULL, " + + "last_modified TIMESTAMP)"); + s.executeUpdate(); + c.commit(); + insertIdTimestampRows(tableName, 0, insertRows, baseTime); + } finally { + s.close(); + } + } + + /** + * Delete all files in a directory for a table. + */ + public void clearDir(String tableName) { + try { + FileSystem fs = FileSystem.getLocal(new Configuration()); + Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); + Path tableDir = new Path(warehouse, tableName); + fs.delete(tableDir, true); + } catch (Exception e) { + fail("Got unexpected exception: " + StringUtils.stringifyException(e)); + } + } + + /** + * Look at a directory that should contain files full of an imported 'id' + * column. Assert that all numbers in [0, expectedNums) are present + * in order. 
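   * For example, assertDirOfNumbers("foo", 3) passes only if the
   * non-hidden files under the table's directory contain exactly the
   * lines "0", "1", and "2".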
+ */ + public void assertDirOfNumbers(String tableName, int expectedNums) { + try { + FileSystem fs = FileSystem.getLocal(new Configuration()); + Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); + Path tableDir = new Path(warehouse, tableName); + FileStatus [] stats = fs.listStatus(tableDir); + String [] fileNames = new String[stats.length]; + for (int i = 0; i < stats.length; i++) { + fileNames[i] = stats[i].getPath().toString(); + } + + Arrays.sort(fileNames); + + // Read all the files in sorted order, adding the value lines to the list. + List receivedNums = new ArrayList(); + for (String fileName : fileNames) { + if (fileName.startsWith("_") || fileName.startsWith(".")) { + continue; + } + + BufferedReader r = new BufferedReader( + new InputStreamReader(fs.open(new Path(fileName)))); + try { + while (true) { + String s = r.readLine(); + if (null == s) { + break; + } + + receivedNums.add(s.trim()); + } + } finally { + r.close(); + } + } + + assertEquals(expectedNums, receivedNums.size()); + + // Compare the received values with the expected set. + for (int i = 0; i < expectedNums; i++) { + assertEquals((int) i, (int) Integer.valueOf(receivedNums.get(i))); + } + } catch (Exception e) { + fail("Got unexpected exception: " + StringUtils.stringifyException(e)); + } + } + + /** + * Assert that a directory contains a file with exactly one line + * in it, containing the prescribed number 'val'. + */ + public void assertSpecificNumber(String tableName, int val) { + try { + FileSystem fs = FileSystem.getLocal(new Configuration()); + Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); + Path tableDir = new Path(warehouse, tableName); + FileStatus [] stats = fs.listStatus(tableDir); + String [] fileNames = new String[stats.length]; + for (int i = 0; i < stats.length; i++) { + fileNames[i] = stats[i].getPath().toString(); + } + + // Read the first file that is not a hidden file. + boolean foundVal = false; + for (String fileName : fileNames) { + if (fileName.startsWith("_") || fileName.startsWith(".")) { + continue; + } + + if (foundVal) { + // Make sure we don't have two or more "real" files in the dir. + fail("Got an extra data-containing file in this directory."); + } + + BufferedReader r = new BufferedReader( + new InputStreamReader(fs.open(new Path(fileName)))); + try { + String s = r.readLine(); + if (null == s) { + fail("Unexpected empty file " + fileName + "."); + } + assertEquals(val, (int) Integer.valueOf(s.trim())); + + String nextLine = r.readLine(); + if (nextLine != null) { + fail("Expected only one result, but got another line: " + nextLine); + } + + // Successfully got the value we were looking for. + foundVal = true; + } finally { + r.close(); + } + } + } catch (Exception e) { + fail("Got unexpected exception: " + StringUtils.stringifyException(e)); + } + } + + public void runImport(SqoopOptions options, List args) { + try { + Sqoop importer = new Sqoop(new ImportTool(), options.getConf(), options); + int ret = Sqoop.runSqoop(importer, args.toArray(new String[0])); + assertEquals("Failure during job", 0, ret); + } catch (Exception e) { + LOG.error("Got exception running Sqoop: " + + StringUtils.stringifyException(e)); + throw new RuntimeException(e); + } + } + + /** + * Return a list of arguments to import the specified table. 
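   * With isAppend set, the arguments request an append import checked
   * against the "id" column; otherwise a lastmodified import checked
   * against "last_modified".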
+ */ + private List getArgListForTable(String tableName, boolean commonArgs, + boolean isAppend) { + List args = new ArrayList(); + if (commonArgs) { + CommonArgs.addHadoopFlags(args); + } + args.add("--connect"); + args.add(SOURCE_DB_URL); + args.add("--table"); + args.add(tableName); + args.add("--warehouse-dir"); + args.add(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); + if (isAppend) { + args.add("--incremental"); + args.add("append"); + args.add("--check-column"); + args.add("id"); + } else { + args.add("--incremental"); + args.add("lastmodified"); + args.add("--check-column"); + args.add("last_modified"); + } + args.add("--columns"); + args.add("id"); + args.add("-m"); + args.add("1"); + + return args; + } + + /** + * Create a session with the specified name, where the session performs + * an import configured with 'sessionArgs'. + */ + private void createSession(String sessionName, List sessionArgs) { + createSession(sessionName, sessionArgs, newConf()); + } + + /** + * Create a session with the specified name, where the session performs + * an import configured with 'sessionArgs', using the provided configuration + * as defaults. + */ + private void createSession(String sessionName, List sessionArgs, + Configuration conf) { + try { + SqoopOptions options = new SqoopOptions(); + options.setConf(conf); + Sqoop makeSession = new Sqoop(new SessionTool(), conf, options); + + List args = new ArrayList(); + args.add("--create"); + args.add(sessionName); + args.add("--"); + args.add("import"); + args.addAll(sessionArgs); + + int ret = Sqoop.runSqoop(makeSession, args.toArray(new String[0])); + assertEquals("Failure during job to create session", 0, ret); + } catch (Exception e) { + LOG.error("Got exception running Sqoop to create session: " + + StringUtils.stringifyException(e)); + throw new RuntimeException(e); + } + } + + /** + * Run the specified session. + */ + private void runSession(String sessionName) { + runSession(sessionName, newConf()); + } + + /** + * Run the specified session. + */ + private void runSession(String sessionName, Configuration conf) { + try { + SqoopOptions options = new SqoopOptions(); + options.setConf(conf); + Sqoop runSession = new Sqoop(new SessionTool(), conf, options); + + List args = new ArrayList(); + args.add("--exec"); + args.add(sessionName); + + int ret = Sqoop.runSqoop(runSession, args.toArray(new String[0])); + assertEquals("Failure during job to run session", 0, ret); + } catch (Exception e) { + LOG.error("Got exception running Sqoop to run session: " + + StringUtils.stringifyException(e)); + throw new RuntimeException(e); + } + } + + // Incremental import of an empty table, no metastore. + public void testEmptyAppendImport() throws Exception { + final String TABLE_NAME = "emptyAppend1"; + createIdTable(TABLE_NAME, 0); + List args = getArgListForTable(TABLE_NAME, true, true); + + Configuration conf = newConf(); + SqoopOptions options = new SqoopOptions(); + options.setConf(conf); + runImport(options, args); + + assertDirOfNumbers(TABLE_NAME, 0); + } + + // Incremental import of a filled table, no metastore. 
+ public void testFullAppendImport() throws Exception { + final String TABLE_NAME = "fullAppend1"; + createIdTable(TABLE_NAME, 10); + List args = getArgListForTable(TABLE_NAME, true, true); + + Configuration conf = newConf(); + SqoopOptions options = new SqoopOptions(); + options.setConf(conf); + runImport(options, args); + + assertDirOfNumbers(TABLE_NAME, 10); + } + + public void testEmptySessionAppend() throws Exception { + // Create a session and run an import on an empty table. + // Nothing should happen. + + final String TABLE_NAME = "emptySession"; + createIdTable(TABLE_NAME, 0); + + List args = getArgListForTable(TABLE_NAME, false, true); + createSession("emptySession", args); + runSession("emptySession"); + assertDirOfNumbers(TABLE_NAME, 0); + + // Running the session a second time should result in + // nothing happening, it's still empty. + runSession("emptySession"); + assertDirOfNumbers(TABLE_NAME, 0); + } + + public void testEmptyThenFullSessionAppend() throws Exception { + // Create an empty table. Import it; nothing happens. + // Add some rows. Verify they are appended. + + final String TABLE_NAME = "emptyThenFull"; + createIdTable(TABLE_NAME, 0); + + List args = getArgListForTable(TABLE_NAME, false, true); + createSession(TABLE_NAME, args); + runSession(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 0); + + // Now add some rows. + insertIdRows(TABLE_NAME, 0, 10); + + // Running the session a second time should import 10 rows. + runSession(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 10); + + // Add some more rows. + insertIdRows(TABLE_NAME, 10, 20); + + // Import only those rows. + runSession(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 20); + } + + public void testAppend() throws Exception { + // Create a table with data in it; import it. + // Then add more data, verify that only the incremental data is pulled. + + final String TABLE_NAME = "append"; + createIdTable(TABLE_NAME, 10); + + List args = getArgListForTable(TABLE_NAME, false, true); + createSession(TABLE_NAME, args); + runSession(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 10); + + // Add some more rows. + insertIdRows(TABLE_NAME, 10, 20); + + // Import only those rows. + runSession(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 20); + } + + public void testEmptyLastModified() throws Exception { + final String TABLE_NAME = "emptyLastModified"; + createTimestampTable(TABLE_NAME, 0, null); + List args = getArgListForTable(TABLE_NAME, true, false); + + Configuration conf = newConf(); + SqoopOptions options = new SqoopOptions(); + options.setConf(conf); + runImport(options, args); + + assertDirOfNumbers(TABLE_NAME, 0); + } + + public void testFullLastModifiedImport() throws Exception { + // Given a table of rows imported in the past, + // see that they are imported. + final String TABLE_NAME = "fullLastModified"; + Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100); + createTimestampTable(TABLE_NAME, 10, thePast); + + List args = getArgListForTable(TABLE_NAME, true, false); + + Configuration conf = newConf(); + SqoopOptions options = new SqoopOptions(); + options.setConf(conf); + runImport(options, args); + + assertDirOfNumbers(TABLE_NAME, 10); + } + + public void testNoImportFromTheFuture() throws Exception { + // If last-modified dates for writes are serialized to be in the + // future w.r.t. an import, do not import these rows. 
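    // (The window's upper bound is the database's current timestamp,
    // compared with '<', so rows stamped in the future fall outside the
    // window and are left for a later run to pick up.)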
+ + final String TABLE_NAME = "futureLastModified"; + Timestamp theFuture = new Timestamp(System.currentTimeMillis() + 1000000); + createTimestampTable(TABLE_NAME, 10, theFuture); + + List args = getArgListForTable(TABLE_NAME, true, false); + + Configuration conf = newConf(); + SqoopOptions options = new SqoopOptions(); + options.setConf(conf); + runImport(options, args); + + assertDirOfNumbers(TABLE_NAME, 0); + } + + public void testEmptySessionLastMod() throws Exception { + // Create a session and run an import on an empty table. + // Nothing should happen. + + final String TABLE_NAME = "emptySessionLastMod"; + createTimestampTable(TABLE_NAME, 0, null); + + List args = getArgListForTable(TABLE_NAME, false, false); + args.add("--append"); + createSession("emptySessionLastMod", args); + runSession("emptySessionLastMod"); + assertDirOfNumbers(TABLE_NAME, 0); + + // Running the session a second time should result in + // nothing happening, it's still empty. + runSession("emptySessionLastMod"); + assertDirOfNumbers(TABLE_NAME, 0); + } + + public void testEmptyThenFullSessionLastMod() throws Exception { + // Create an empty table. Import it; nothing happens. + // Add some rows. Verify they are appended. + + final String TABLE_NAME = "emptyThenFullTimestamp"; + createTimestampTable(TABLE_NAME, 0, null); + + List args = getArgListForTable(TABLE_NAME, false, false); + args.add("--append"); + createSession(TABLE_NAME, args); + runSession(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 0); + + long importWasBefore = System.currentTimeMillis(); + + // Let some time elapse. + Thread.sleep(50); + + long rowsAddedTime = System.currentTimeMillis() - 5; + + // Check: we are adding rows after the previous import time + // and before the current time. + assertTrue(rowsAddedTime > importWasBefore); + assertTrue(rowsAddedTime < System.currentTimeMillis()); + + insertIdTimestampRows(TABLE_NAME, 0, 10, new Timestamp(rowsAddedTime)); + + // Running the session a second time should import 10 rows. + runSession(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 10); + + // Add some more rows. + importWasBefore = System.currentTimeMillis(); + Thread.sleep(50); + rowsAddedTime = System.currentTimeMillis() - 5; + assertTrue(rowsAddedTime > importWasBefore); + assertTrue(rowsAddedTime < System.currentTimeMillis()); + insertIdTimestampRows(TABLE_NAME, 10, 20, new Timestamp(rowsAddedTime)); + + // Import only those rows. + runSession(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 20); + } + + public void testAppendWithTimestamp() throws Exception { + // Create a table with data in it; import it. + // Then add more data, verify that only the incremental data is pulled. + + final String TABLE_NAME = "appendTimestamp"; + Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100); + createTimestampTable(TABLE_NAME, 10, thePast); + + List args = getArgListForTable(TABLE_NAME, false, false); + args.add("--append"); + createSession(TABLE_NAME, args); + runSession(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 10); + + // Add some more rows. + long importWasBefore = System.currentTimeMillis(); + Thread.sleep(50); + long rowsAddedTime = System.currentTimeMillis() - 5; + assertTrue(rowsAddedTime > importWasBefore); + assertTrue(rowsAddedTime < System.currentTimeMillis()); + insertIdTimestampRows(TABLE_NAME, 10, 20, new Timestamp(rowsAddedTime)); + + // Import only those rows. 
+ runSession(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 20); + } + + public void testModifyWithTimestamp() throws Exception { + // Create a table with data in it; import it. + // Then modify some existing rows, and verify that we only grab + // those rows. + + final String TABLE_NAME = "modifyTimestamp"; + Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100); + createTimestampTable(TABLE_NAME, 10, thePast); + + List args = getArgListForTable(TABLE_NAME, false, false); + createSession(TABLE_NAME, args); + runSession(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 10); + + // Modify a row. + long importWasBefore = System.currentTimeMillis(); + Thread.sleep(50); + long rowsAddedTime = System.currentTimeMillis() - 5; + assertTrue(rowsAddedTime > importWasBefore); + assertTrue(rowsAddedTime < System.currentTimeMillis()); + SqoopOptions options = new SqoopOptions(); + options.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options); + Connection c = manager.getConnection(); + PreparedStatement s = null; + try { + s = c.prepareStatement("UPDATE " + TABLE_NAME + + " SET id=?, last_modified=? WHERE id=?"); + s.setInt(1, 4000); // the first row should have '4000' in it now. + s.setTimestamp(2, new Timestamp(rowsAddedTime)); + s.setInt(3, 0); + s.executeUpdate(); + c.commit(); + } finally { + s.close(); + } + + // Import only the new row. + clearDir(TABLE_NAME); + runSession(TABLE_NAME); + assertSpecificNumber(TABLE_NAME, 4000); + } + + /** + * ManagerFactory returning an HSQLDB ConnManager which allows you to + * specify the current database timestamp. + */ + public static class InstrumentHsqldbManagerFactory extends ManagerFactory { + @Override + public ConnManager accept(SqoopOptions options) { + LOG.info("Using instrumented manager"); + return new InstrumentHsqldbManager(options); + } + } + + /** + * Hsqldb ConnManager that lets you set the current reported timestamp + * from the database, to allow testing of boundary conditions for imports. + */ + public static class InstrumentHsqldbManager extends HsqldbManager { + private static Timestamp curTimestamp; + + public InstrumentHsqldbManager(SqoopOptions options) { + super(options); + } + + @Override + public Timestamp getCurrentDbTimestamp() { + return InstrumentHsqldbManager.curTimestamp; + } + + public static void setCurrentDbTimestamp(Timestamp t) { + InstrumentHsqldbManager.curTimestamp = t; + } + } + + public void testTimestampBoundary() throws Exception { + // Run an import, and then insert rows with the last-modified timestamp + // set to the exact time when the first import runs. Run a second import + // and ensure that we pick up the new data. + + long now = System.currentTimeMillis(); + + final String TABLE_NAME = "boundaryTimestamp"; + Timestamp thePast = new Timestamp(now - 100); + createTimestampTable(TABLE_NAME, 10, thePast); + + Timestamp firstJobTime = new Timestamp(now); + InstrumentHsqldbManager.setCurrentDbTimestamp(firstJobTime); + + // Configure the job to use the instrumented Hsqldb manager. + Configuration conf = newConf(); + conf.set(ConnFactory.FACTORY_CLASS_NAMES_KEY, + InstrumentHsqldbManagerFactory.class.getName()); + + List args = getArgListForTable(TABLE_NAME, false, false); + args.add("--append"); + createSession(TABLE_NAME, args, conf); + runSession(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 10); + + // Add some more rows with the timestamp equal to the job run timestamp. 
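    // The first run's window ended at last_modified < firstJobTime, so
    // rows stamped exactly at firstJobTime were excluded then; the second
    // run's lower bound (last_modified >= firstJobTime) captures them.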
+ insertIdTimestampRows(TABLE_NAME, 10, 20, firstJobTime); + assertRowCount(TABLE_NAME, 20); + + // Run a second job with the clock advanced by 100 ms. + Timestamp secondJobTime = new Timestamp(now + 100); + InstrumentHsqldbManager.setCurrentDbTimestamp(secondJobTime); + + // Import only those rows. + runSession(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 20); + } +} + diff --git a/src/test/com/cloudera/sqoop/metastore/TestSessions.java b/src/test/com/cloudera/sqoop/metastore/TestSessions.java index b4c3fac4..8d1e853f 100644 --- a/src/test/com/cloudera/sqoop/metastore/TestSessions.java +++ b/src/test/com/cloudera/sqoop/metastore/TestSessions.java @@ -46,7 +46,8 @@ */ public class TestSessions extends TestCase { - public static final String TEST_AUTOCONNECT_URL = "jdbc:hsqldb:mem:testdb"; + public static final String TEST_AUTOCONNECT_URL = + "jdbc:hsqldb:mem:sqoopmetastore"; public static final String TEST_AUTOCONNECT_USER = "SA"; public static final String TEST_AUTOCONNECT_PASS = ""; @@ -62,6 +63,13 @@ public static void resetSessionSchema() throws SQLException { options.setUsername(TEST_AUTOCONNECT_USER); options.setPassword(TEST_AUTOCONNECT_PASS); + resetSchema(options); + } + + /** + * Drop all tables in the configured HSQLDB-based schema/user/pass. + */ + public static void resetSchema(SqoopOptions options) throws SQLException { HsqldbManager manager = new HsqldbManager(options); Connection c = manager.getConnection(); Statement s = c.createStatement();
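Taken together, the new options support invocations like the following (the connect string and table name are hypothetical; the flags are those added by this patch):

  sqoop import --connect jdbc:hsqldb:hsql://db/sales --table orders \
      --incremental append --check-column id --last-value 100

Saving the configuration as a session lets each execution record its new last value in the metastore automatically, mirroring the session-tool usage in TestIncrementalImport:

  sqoop session --create nightly-orders -- import \
      --connect jdbc:hsqldb:hsql://db/sales --table orders \
      --incremental append --check-column id
  sqoop session --exec nightly-orders

On the first execution the lower bound is absent, so all existing rows up to the current maximum are imported; subsequent executions import only rows beyond the saved last value.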