diff --git a/src/docs/man/sqoop-export.txt b/src/docs/man/sqoop-export.txt index 50052cc1..d53cf97f 100644 --- a/src/docs/man/sqoop-export.txt +++ b/src/docs/man/sqoop-export.txt @@ -43,6 +43,12 @@ Export control options --update-key (col-name):: Anchor column to use for updates +--update-mode (mode):: + Specify how updates are performed when new rows are found with non-matching keys + in database. By default, "mode" is +updateonly+, in which case new rows are + silently ignored. Alternatively, "mode" can be +allowinsert+, in which case + new rows are inserted instead. + --input-null-string:: The string to be interpreted as null for string columns diff --git a/src/docs/user/export.txt b/src/docs/user/export.txt index 4401c265..987b0cda 100644 --- a/src/docs/user/export.txt +++ b/src/docs/user/export.txt @@ -52,6 +52,12 @@ Argument Description parallel +\--table + Table to populate +\--update-key + Anchor column to use for updates ++\--update-mode + Specify how updates are performed\ + when new rows are found with\ + non-matching keys in database. + Legal values for +mode+ include\ + +updateonly+ (default) and\ + +allowinsert+. +\--input-null-string + The string to be interpreted as\ null for string columns +\--input-null-non-string + The string to be interpreted as\ @@ -169,6 +175,10 @@ Likewise, if the column specified with +\--update-key+ does not uniquely identify rows and multiple rows are updated by a single statement, this condition is also undetected. +Depending on the target database, you may also specify the +\--update-mode+ +argument with +allowinsert+ mode if you want to update rows if they exist +in the database already or insert rows if they do not exist yet. 
+ include::input-args.txt[] include::output-args.txt[] diff --git a/src/java/com/cloudera/sqoop/SqoopOptions.java b/src/java/com/cloudera/sqoop/SqoopOptions.java index d07aecc6..3893ff31 100644 --- a/src/java/com/cloudera/sqoop/SqoopOptions.java +++ b/src/java/com/cloudera/sqoop/SqoopOptions.java @@ -194,6 +194,22 @@ public enum IncrementalMode { // Column to use for the WHERE clause in an UPDATE-based export. @StoredAsProperty("export.update.col") private String updateKeyCol; + /** + * Update mode option specifies how updates are performed when + * new rows are found with non-matching keys in database. + * It supports two modes: + *
<ul>
+ * <li>UpdateOnly: This is the default. New rows are silently ignored.</li>
+ * <li>AllowInsert: New rows are inserted into the database.</li>
+ * </ul>
+ */ + public enum UpdateMode { + UpdateOnly, + AllowInsert + } + + @StoredAsProperty("export.new.update") private UpdateMode updateMode; + private DelimiterSet inputDelimiters; // codegen.input.delimiters. private DelimiterSet outputDelimiters; // codegen.output.delimiters. private boolean areDelimsManuallySet; @@ -797,6 +813,8 @@ private void initDefaults(Configuration baseConfiguration) { this.dbOutColumns = null; this.incrementalMode = IncrementalMode.None; + + this.updateMode = UpdateMode.UpdateOnly; } /** @@ -1585,6 +1603,21 @@ public String getUpdateKeyCol() { return this.updateKeyCol; } + /** + * Set "UpdateOnly" to silently ignore new rows during update export. + * Set "AllowInsert" to insert new rows during update export. + */ + public void setUpdateMode(UpdateMode mode) { + this.updateMode = mode; + } + + /** + * @return how to handle new rows found in update export. + */ + public UpdateMode getUpdateMode() { + return updateMode; + } + /** * @return an ordered list of column names. The code generator should * generate the DBWritable.write(PreparedStatement) method with columns diff --git a/src/java/com/cloudera/sqoop/manager/ConnManager.java b/src/java/com/cloudera/sqoop/manager/ConnManager.java index f5f5a4bc..30a3f6b1 100644 --- a/src/java/com/cloudera/sqoop/manager/ConnManager.java +++ b/src/java/com/cloudera/sqoop/manager/ConnManager.java @@ -23,11 +23,14 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import com.cloudera.sqoop.SqoopOptions; import com.cloudera.sqoop.util.ExportException; import com.cloudera.sqoop.util.ImportException; @@ -273,6 +276,44 @@ public void updateTable(ExportJobContext context) throw new ExportException("This database does not support updates"); } + /** + * Export data stored in HDFS into a table in a database. 
+ * This may update or insert rows into the target table depending on + * whether rows already exist in the target table or not. + */ + public void upsertTable(ExportJobContext context) + throws IOException, ExportException { + throw new ExportException("Mixed update/insert is not supported" + + " against the target database yet"); + } + + /** + * Configure database output column ordering explicitly for code generator. + * The code generator should generate the DBWritable.write(PreparedStatement) + * method with columns exporting in this order. + */ + public void configureDbOutputColumns(SqoopOptions options) { + // We're in update mode. We need to explicitly set the database output + // column ordering in the codeGenerator. The UpdateKeyCol must come + // last, because the UPDATE-based OutputFormat will generate the SET + // clause followed by the WHERE clause, and the SqoopRecord needs to + // serialize to this layout. + String updateKeyCol = options.getUpdateKeyCol(); + String [] allColNames = getColumnNames(options.getTableName()); + List dbOutCols = new ArrayList(); + String upperCaseKeyCol = updateKeyCol.toUpperCase(); + for (String col : allColNames) { + if (!upperCaseKeyCol.equals(col.toUpperCase())) { + dbOutCols.add(col); // add non-key columns to the output order list. + } + } + + // Then add the update key column last. 
+ dbOutCols.add(updateKeyCol); + options.setDbOutputColumns(dbOutCols.toArray( + new String[dbOutCols.size()])); + } + /** * If a method of this ConnManager has returned a ResultSet to you, * you are responsible for calling release() after you close the diff --git a/src/java/com/cloudera/sqoop/manager/OracleManager.java b/src/java/com/cloudera/sqoop/manager/OracleManager.java index 1d08c4d1..c8a0431f 100644 --- a/src/java/com/cloudera/sqoop/manager/OracleManager.java +++ b/src/java/com/cloudera/sqoop/manager/OracleManager.java @@ -38,8 +38,11 @@ import org.apache.commons.logging.LogFactory; import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.SqoopOptions.UpdateMode; import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat; import com.cloudera.sqoop.mapreduce.JdbcExportJob; +import com.cloudera.sqoop.mapreduce.JdbcUpsertExportJob; +import com.cloudera.sqoop.mapreduce.OracleUpsertOutputFormat; import com.cloudera.sqoop.mapreduce.db.OracleDataDrivenDBInputFormat; import com.cloudera.sqoop.util.ExportException; import com.cloudera.sqoop.util.ImportException; @@ -382,6 +385,46 @@ public void exportTable(ExportJobContext context) exportJob.runExport(); } + @Override + /** + * {@inheritDoc} + */ + public void upsertTable(ExportJobContext context) + throws IOException, ExportException { + context.setConnManager(this); + JdbcUpsertExportJob exportJob = + new JdbcUpsertExportJob(context, OracleUpsertOutputFormat.class); + exportJob.runExport(); + } + + @Override + /** + * {@inheritDoc} + */ + public void configureDbOutputColumns(SqoopOptions options) { + if (options.getUpdateMode() == UpdateMode.UpdateOnly) { + super.configureDbOutputColumns(options); + } else { + // We're in upsert mode. We need to explicitly set + // the database output column ordering in the codeGenerator. 
+ String updateKeyCol = options.getUpdateKeyCol(); + String [] allColNames = getColumnNames(options.getTableName()); + List dbOutCols = new ArrayList(); + dbOutCols.add(updateKeyCol); + String upperCaseKeyCol = updateKeyCol.toUpperCase(); + for (String col : allColNames) { + if (!upperCaseKeyCol.equals(col.toUpperCase())) { + dbOutCols.add(col); // add update columns to the output order list. + } + } + for (String col : allColNames) { + dbOutCols.add(col); // add insert columns to the output order list. + } + options.setDbOutputColumns(dbOutCols.toArray( + new String[dbOutCols.size()])); + } + } + @Override public ResultSet readTable(String tableName, String[] columns) throws SQLException { diff --git a/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java b/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java new file mode 100644 index 00000000..4dc246ce --- /dev/null +++ b/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java @@ -0,0 +1,89 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.cloudera.sqoop.mapreduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.OutputFormat; +import com.cloudera.sqoop.mapreduce.db.DBConfiguration; +import com.cloudera.sqoop.mapreduce.db.DBOutputFormat; + +import com.cloudera.sqoop.manager.ConnManager; +import com.cloudera.sqoop.manager.ExportJobContext; + +/** + * Run an update/insert export using JDBC (JDBC-based UpsertOutputFormat). + */ +public class JdbcUpsertExportJob extends JdbcUpdateExportJob { + + public static final Log LOG = LogFactory.getLog( + JdbcUpsertExportJob.class.getName()); + + public JdbcUpsertExportJob(final ExportJobContext context, + final Class outputFormatClass) + throws IOException { + super(context, null, null, outputFormatClass); + } + + @Override + protected void configureOutputFormat(Job job, String tableName, + String tableClassName) throws IOException { + + ConnManager mgr = context.getConnManager(); + try { + String username = options.getUsername(); + if (null == username || username.length() == 0) { + DBConfiguration.configureDB(job.getConfiguration(), + mgr.getDriverClass(), + options.getConnectString()); + } else { + DBConfiguration.configureDB(job.getConfiguration(), + mgr.getDriverClass(), + options.getConnectString(), + username, options.getPassword()); + } + + String [] colNames = options.getColumns(); + if (null == colNames) { + colNames = mgr.getColumnNames(tableName); + } + if (null == colNames) { + throw new IOException( + "Export column names could not be determined for " + tableName); + } + DBOutputFormat.setOutput(job, tableName, colNames); + + String updateKeyCol = options.getUpdateKeyCol(); + if (null == updateKeyCol) { + throw new IOException("Update key column not set in export job"); + } + + job.setOutputFormatClass(getOutputFormatClass()); + 
job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName); + job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyCol); + } catch (ClassNotFoundException cnfe) { + throw new IOException("Could not load OutputFormat", cnfe); + } + } +} + diff --git a/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java b/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java new file mode 100644 index 00000000..ebd13b94 --- /dev/null +++ b/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java @@ -0,0 +1,116 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.mapreduce; + +import java.io.IOException; +import java.sql.SQLException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import com.cloudera.sqoop.lib.SqoopRecord; + +/** + * Update an existing table with new value if the table already + * contains the row, or insert the data into the table if the table + * does not contain the row yet. 
+ */ +public class OracleUpsertOutputFormat + extends UpdateOutputFormat { + + private static final Log LOG = + LogFactory.getLog(OracleUpsertOutputFormat.class); + + @Override + /** {@inheritDoc} */ + public RecordWriter getRecordWriter(TaskAttemptContext context) + throws IOException { + try { + return new OracleUpsertRecordWriter(context); + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * RecordWriter to write the output to UPDATE/INSERT statements. + */ + public class OracleUpsertRecordWriter extends UpdateRecordWriter { + + public OracleUpsertRecordWriter(TaskAttemptContext context) + throws ClassNotFoundException, SQLException { + super(context); + } + + /** + * @return an UPDATE/INSERT statement that modifies/inserts a row + * depending on whether the row already exist in the table or not. + */ + protected String getUpdateStatement() { + boolean first; + + StringBuilder sb = new StringBuilder(); + sb.append("MERGE INTO "); + sb.append(tableName); + sb.append(" USING dual ON ( "); + sb.append(updateCol); + sb.append(" = ? 
)"); + + sb.append(" WHEN MATCHED THEN UPDATE SET "); + first = true; + for (String col : columnNames) { + if (!col.equals(updateCol)) { + if (first) { + first = false; + } else { + sb.append(", "); + } + sb.append(col); + sb.append(" = ?"); + } + } + + sb.append(" WHEN NOT MATCHED THEN INSERT ( "); + first = true; + for (String col : columnNames) { + if (first) { + first = false; + } else { + sb.append(", "); + } + sb.append(col); + } + sb.append(" ) VALUES ( "); + first = true; + for (String col : columnNames) { + if (first) { + first = false; + } else { + sb.append(", "); + } + sb.append("?"); + } + sb.append(" )"); + + return sb.toString(); + } + } +} diff --git a/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java b/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java index d5339d9c..8800caff 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java +++ b/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java @@ -86,9 +86,9 @@ public RecordWriter getRecordWriter(TaskAttemptContext context) */ public class UpdateRecordWriter extends AsyncSqlRecordWriter { - private String tableName; - private String [] columnNames; // The columns to update. - private String updateCol; // The column containing the fixed key. + protected String tableName; + protected String [] columnNames; // The columns to update. + protected String updateCol; // The column containing the fixed key. 
public UpdateRecordWriter(TaskAttemptContext context) throws ClassNotFoundException, SQLException { diff --git a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java index 879c7c8c..2a07beed 100644 --- a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java +++ b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java @@ -138,6 +138,7 @@ public abstract class BaseSqoopTool extends SqoopTool { public static final String VERBOSE_ARG = "verbose"; public static final String HELP_ARG = "help"; public static final String UPDATE_KEY_ARG = "update-key"; + public static final String UPDATE_MODE_ARG = "update-mode"; // Arguments for incremental imports. public static final String INCREMENT_TYPE_ARG = "incremental"; diff --git a/src/java/com/cloudera/sqoop/tool/ExportTool.java b/src/java/com/cloudera/sqoop/tool/ExportTool.java index d156eeb7..fc47cbab 100644 --- a/src/java/com/cloudera/sqoop/tool/ExportTool.java +++ b/src/java/com/cloudera/sqoop/tool/ExportTool.java @@ -19,7 +19,6 @@ package com.cloudera.sqoop.tool; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import org.apache.commons.cli.CommandLine; @@ -30,6 +29,7 @@ import com.cloudera.sqoop.Sqoop; import com.cloudera.sqoop.SqoopOptions; import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException; +import com.cloudera.sqoop.SqoopOptions.UpdateMode; import com.cloudera.sqoop.cli.RelatedOptions; import com.cloudera.sqoop.cli.ToolOptions; import com.cloudera.sqoop.manager.ExportJobContext; @@ -66,8 +66,13 @@ private void exportTable(SqoopOptions options, String tableName) ExportJobContext context = new ExportJobContext(tableName, jarFile, options); if (options.getUpdateKeyCol() != null) { - // UPDATE-based export. - manager.updateTable(context); + if (options.getUpdateMode() == UpdateMode.UpdateOnly) { + // UPDATE-based export. 
+ manager.updateTable(context); + } else { + // Mixed update/insert export + manager.upsertTable(context); + } } else { // INSERT-based export. manager.exportTable(context); @@ -84,26 +89,8 @@ public int run(SqoopOptions options) { codeGenerator.setManager(manager); - String updateKeyCol = options.getUpdateKeyCol(); - if (updateKeyCol != null) { - // We're in update mode. We need to explicitly set the database output - // column ordering in the codeGenerator. The UpdateKeyCol must come - // last, because the UPDATE-based OutputFormat will generate the SET - // clause followed by the WHERE clause, and the SqoopRecord needs to - // serialize to this layout. - String [] allColNames = manager.getColumnNames(options.getTableName()); - List dbOutCols = new ArrayList(); - String upperCaseKeyCol = updateKeyCol.toUpperCase(); - for (String col : allColNames) { - if (!upperCaseKeyCol.equals(col.toUpperCase())) { - dbOutCols.add(col); // add non-key columns to the output order list. - } - } - - // Then add the update key column last. 
- dbOutCols.add(updateKeyCol); - options.setDbOutputColumns(dbOutCols.toArray( - new String[dbOutCols.size()])); + if (options.getUpdateKeyCol() != null) { + manager.configureDbOutputColumns(options); } try { @@ -174,6 +161,13 @@ protected RelatedOptions getExportOptions() { + "to be executed in batch mode") .withLongOpt(BATCH_ARG) .create()); + exportOpts.addOption(OptionBuilder + .withArgName("mode") + .hasArg() + .withDescription("Specifies how updates are performed when " + + "new rows are found with non-matching keys in database") + .withLongOpt(UPDATE_MODE_ARG) + .create()); return exportOpts; } @@ -257,6 +251,7 @@ public void applyOptions(CommandLine in, SqoopOptions out) out.setClearStagingTable(true); } + applyNewUpdateOptions(in, out); applyInputFormatOptions(in, out); applyOutputFormatOptions(in, out); applyOutputFormatOptions(in, out); @@ -335,5 +330,21 @@ public void validateOptions(SqoopOptions options) validateCommonOptions(options); validateCodeGenOptions(options); } + + private void applyNewUpdateOptions(CommandLine in, SqoopOptions out) + throws InvalidOptionsException { + if (in.hasOption(UPDATE_MODE_ARG)) { + String updateTypeStr = in.getOptionValue(UPDATE_MODE_ARG); + if ("updateonly".equals(updateTypeStr)) { + out.setUpdateMode(UpdateMode.UpdateOnly); + } else if ("allowinsert".equals(updateTypeStr)) { + out.setUpdateMode(UpdateMode.AllowInsert); + } else { + throw new InvalidOptionsException("Unknown new update mode: " + + updateTypeStr + ". Use 'updateonly' or 'allowinsert'." 
+ + HELP_STR); + } + } + } } diff --git a/src/test/com/cloudera/sqoop/manager/OracleExportTest.java b/src/test/com/cloudera/sqoop/manager/OracleExportTest.java index 12858d72..5e54f0a8 100644 --- a/src/test/com/cloudera/sqoop/manager/OracleExportTest.java +++ b/src/test/com/cloudera/sqoop/manager/OracleExportTest.java @@ -263,4 +263,18 @@ public void testDatesAndTimes() throws IOException, SQLException { assertColMinAndMax(forIdx(1), genTime); } } + + /** Make sure mixed update/insert export work correctly. */ + public void testUpsertTextExport() throws IOException, SQLException { + final int TOTAL_RECORDS = 10; + createTextFile(0, TOTAL_RECORDS, false); + createTable(); + // first time will be insert. + runExport(getArgv(true, 10, 10, newStrArray(null, + "--update-key", "ID", "--update-mode", "allowinsert"))); + // second time will be update. + runExport(getArgv(true, 10, 10, newStrArray(null, + "--update-key", "ID", "--update-mode", "allowinsert"))); + verifyExport(TOTAL_RECORDS); + } }