5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-07 00:52:47 +08:00

SQOOP-327. Mixed update/insert export for Oracle.

(Bilung Lee via Arvind Prabhakar)


git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1166930 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arvind Prabhakar 2011-09-08 22:25:39 +00:00
parent 7d2939b3a1
commit e1e6e5c009
11 changed files with 390 additions and 26 deletions

View File

@ -43,6 +43,12 @@ Export control options
--update-key (col-name)::
Anchor column to use for updates
--update-mode (mode)::
Specify how updates are performed when new rows are found with non-matching keys
in the database. By default, "mode" is +updateonly+, in which case new rows are
silently ignored. Alternatively, "mode" can be +allowinsert+, in which case
new rows are inserted instead.
--input-null-string::
The string to be interpreted as null for string columns

View File

@ -52,6 +52,12 @@ Argument Description
parallel
+\--table <table-name>+ Table to populate
+\--update-key <col-name>+ Anchor column to use for updates
+\--update-mode <mode>+ Specify how updates are performed\
when new rows are found with\
non-matching keys in the database.
Legal values for +mode+ include\
+updateonly+ (default) and\
+allowinsert+.
+\--input-null-string <null-string>+ The string to be interpreted as\
null for string columns
+\--input-null-non-string <null-string>+ The string to be interpreted as\
@ -169,6 +175,10 @@ Likewise, if the column specified with +\--update-key+ does not
uniquely identify rows and multiple rows are updated by a single
statement, this condition is also undetected.
Depending on the target database, you may also specify the +\--update-mode+
argument with +allowinsert+ mode if you want to update rows that already exist
in the database and insert rows that do not exist yet.
include::input-args.txt[]
include::output-args.txt[]

View File

@ -194,6 +194,22 @@ public enum IncrementalMode {
// Column to use for the WHERE clause in an UPDATE-based export.
@StoredAsProperty("export.update.col") private String updateKeyCol;
/**
 * How an update-based export treats input rows whose key has no
 * matching row in the database.
 * <ul>
 * <li>{@link #UpdateOnly}: the default; such rows are silently ignored.</li>
 * <li>{@link #AllowInsert}: such rows are inserted into the database.</li>
 * </ul>
 */
public enum UpdateMode {
  /** Only update rows that already exist; new rows are silently ignored. */
  UpdateOnly,

  /** Update rows that already exist and insert the new ones. */
  AllowInsert,
}
@StoredAsProperty("export.new.update") private UpdateMode updateMode;
private DelimiterSet inputDelimiters; // codegen.input.delimiters.
private DelimiterSet outputDelimiters; // codegen.output.delimiters.
private boolean areDelimsManuallySet;
@ -797,6 +813,8 @@ private void initDefaults(Configuration baseConfiguration) {
this.dbOutColumns = null;
this.incrementalMode = IncrementalMode.None;
this.updateMode = UpdateMode.UpdateOnly;
}
/**
@ -1585,6 +1603,21 @@ public String getUpdateKeyCol() {
return this.updateKeyCol;
}
/**
 * Choose how an update export handles rows that are new to the database.
 *
 * @param mode "UpdateOnly" to silently ignore new rows, or
 *     "AllowInsert" to insert them.
 */
public void setUpdateMode(UpdateMode mode) {
  updateMode = mode;
}
/**
 * @return the mode describing how new rows found during an update
 * export are handled.
 */
public UpdateMode getUpdateMode() {
  return this.updateMode;
}
/**
* @return an ordered list of column names. The code generator should
* generate the DBWritable.write(PreparedStatement) method with columns

View File

@ -23,11 +23,14 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.util.ExportException;
import com.cloudera.sqoop.util.ImportException;
@ -273,6 +276,44 @@ public void updateTable(ExportJobContext context)
throw new ExportException("This database does not support updates");
}
/**
 * Export data stored in HDFS into a database table, updating the rows
 * that already exist in the target table and inserting the ones that
 * do not.
 *
 * <p>This implementation does not perform any export; it always fails
 * with an ExportException.</p>
 *
 * @param context the export job context (unused by this implementation).
 * @throws ExportException always.
 */
public void upsertTable(ExportJobContext context)
    throws IOException, ExportException {
  final String reason = "Mixed update/insert is not supported"
      + " against the target database yet";
  throw new ExportException(reason);
}
/**
 * Configure database output column ordering explicitly for code generator.
 * The code generator should generate the DBWritable.write(PreparedStatement)
 * method with columns exporting in this order: every non-key column of the
 * table first, then the update key column.
 *
 * <p>Column names are matched against the update key without regard to
 * case, and independently of the JVM's default locale.</p>
 *
 * @param options export options supplying the table name and the update
 *     key column; the computed ordering is stored back into it through
 *     setDbOutputColumns().
 */
public void configureDbOutputColumns(SqoopOptions options) {
  // We're in update mode. The UpdateKeyCol must come last, because the
  // UPDATE-based OutputFormat will generate the SET clause followed by the
  // WHERE clause, and the SqoopRecord needs to serialize to this layout.
  String updateKeyCol = options.getUpdateKeyCol();
  String [] allColNames = getColumnNames(options.getTableName());

  List<String> dbOutCols = new ArrayList<String>();
  for (String col : allColNames) {
    // equalsIgnoreCase() instead of comparing toUpperCase() results:
    // toUpperCase() follows the default locale (e.g. under a Turkish locale
    // "id" does not upper-case to "ID"), which would let the key column
    // slip through here and be emitted twice.
    if (!updateKeyCol.equalsIgnoreCase(col)) {
      dbOutCols.add(col); // add non-key columns to the output order list.
    }
  }

  // Then add the update key column last.
  dbOutCols.add(updateKeyCol);
  options.setDbOutputColumns(dbOutCols.toArray(
      new String[dbOutCols.size()]));
}
/**
* If a method of this ConnManager has returned a ResultSet to you,
* you are responsible for calling release() after you close the

View File

@ -38,8 +38,11 @@
import org.apache.commons.logging.LogFactory;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.SqoopOptions.UpdateMode;
import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat;
import com.cloudera.sqoop.mapreduce.JdbcExportJob;
import com.cloudera.sqoop.mapreduce.JdbcUpsertExportJob;
import com.cloudera.sqoop.mapreduce.OracleUpsertOutputFormat;
import com.cloudera.sqoop.mapreduce.db.OracleDataDrivenDBInputFormat;
import com.cloudera.sqoop.util.ExportException;
import com.cloudera.sqoop.util.ImportException;
@ -382,6 +385,46 @@ public void exportTable(ExportJobContext context)
exportJob.runExport();
}
/**
 * {@inheritDoc}
 *
 * <p>Runs a JdbcUpsertExportJob that writes through
 * OracleUpsertOutputFormat.</p>
 */
@Override
public void upsertTable(ExportJobContext context)
    throws IOException, ExportException {
  // Register this manager on the context before the job is constructed.
  context.setConnManager(this);
  new JdbcUpsertExportJob(context, OracleUpsertOutputFormat.class)
      .runExport();
}
/**
 * {@inheritDoc}
 *
 * <p>For update-only exports the inherited ordering is used unchanged.
 * For mixed update/insert exports the columns are laid out as: the update
 * key, then every non-key column, then every column of the table again.
 * This mirrors the placeholder order of the MERGE statement built by
 * OracleUpsertOutputFormat (ON key, UPDATE SET columns, INSERT VALUES).</p>
 *
 * <p>Column names are matched against the update key without regard to
 * case, and independently of the JVM's default locale.</p>
 */
@Override
public void configureDbOutputColumns(SqoopOptions options) {
  if (options.getUpdateMode() == UpdateMode.UpdateOnly) {
    super.configureDbOutputColumns(options);
    return;
  }

  // We're in upsert mode. We need to explicitly set
  // the database output column ordering in the codeGenerator.
  String updateKeyCol = options.getUpdateKeyCol();
  String [] allColNames = getColumnNames(options.getTableName());
  List<String> dbOutCols = new ArrayList<String>();

  // The key comes first.
  dbOutCols.add(updateKeyCol);

  for (String col : allColNames) {
    // equalsIgnoreCase() instead of comparing toUpperCase() results:
    // toUpperCase() follows the default locale (e.g. under a Turkish locale
    // "id" does not upper-case to "ID"), which would wrongly list the key
    // among the update columns.
    if (!updateKeyCol.equalsIgnoreCase(col)) {
      dbOutCols.add(col); // add update columns to the output order list.
    }
  }

  for (String col : allColNames) {
    dbOutCols.add(col); // add insert columns to the output order list.
  }

  options.setDbOutputColumns(dbOutCols.toArray(
      new String[dbOutCols.size()]));
}
@Override
public ResultSet readTable(String tableName, String[] columns)
throws SQLException {

View File

@ -0,0 +1,89 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.mapreduce;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.ExportJobContext;
/**
 * Export job that performs a mixed update/insert export over JDBC,
 * writing through the upsert OutputFormat class handed to the constructor.
 */
public class JdbcUpsertExportJob extends JdbcUpdateExportJob {

  public static final Log LOG = LogFactory.getLog(
      JdbcUpsertExportJob.class.getName());

  /**
   * @param context the export job context.
   * @param outputFormatClass the OutputFormat class passed on to the
   *     parent job.
   */
  public JdbcUpsertExportJob(final ExportJobContext context,
      final Class<? extends OutputFormat> outputFormatClass)
      throws IOException {
    super(context, null, null, outputFormatClass);
  }

  /**
   * Configure the JDBC connection, the output table and columns, the
   * OutputFormat class, and the table-class and update-key settings
   * of the job.
   *
   * @throws IOException if the export columns cannot be determined, the
   *     update key column is not set, or a required class cannot be loaded.
   */
  @Override
  protected void configureOutputFormat(Job job, String tableName,
      String tableClassName) throws IOException {
    final ConnManager connManager = context.getConnManager();

    try {
      // Connect with credentials only when a non-empty username is set.
      final String user = options.getUsername();
      final boolean noUser = (user == null) || (user.length() == 0);
      if (noUser) {
        DBConfiguration.configureDB(job.getConfiguration(),
            connManager.getDriverClass(),
            options.getConnectString());
      } else {
        DBConfiguration.configureDB(job.getConfiguration(),
            connManager.getDriverClass(),
            options.getConnectString(),
            user, options.getPassword());
      }

      // Explicitly requested columns win; otherwise ask the manager.
      String [] exportCols = options.getColumns();
      if (exportCols == null) {
        exportCols = connManager.getColumnNames(tableName);
        if (exportCols == null) {
          throw new IOException(
              "Export column names could not be determined for " + tableName);
        }
      }
      DBOutputFormat.setOutput(job, tableName, exportCols);

      final String keyCol = options.getUpdateKeyCol();
      if (keyCol == null) {
        throw new IOException("Update key column not set in export job");
      }

      job.setOutputFormatClass(getOutputFormatClass());
      job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
      job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, keyCol);
    } catch (ClassNotFoundException cnfe) {
      throw new IOException("Could not load OutputFormat", cnfe);
    }
  }
}

View File

@ -0,0 +1,116 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.mapreduce;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.cloudera.sqoop.lib.SqoopRecord;
/**
 * Update an existing table with new value if the table already
 * contains the row, or insert the data into the table if the table
 * does not contain the row yet.
 */
public class OracleUpsertOutputFormat<K extends SqoopRecord, V>
    extends UpdateOutputFormat<K, V> {

  private static final Log LOG =
      LogFactory.getLog(OracleUpsertOutputFormat.class);

  /** {@inheritDoc} */
  @Override
  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
      throws IOException {
    try {
      return new OracleUpsertRecordWriter(context);
    } catch (Exception e) {
      // Any failure while creating the writer is reported as an IOException
      // with the original exception kept as the cause.
      throw new IOException(e);
    }
  }

  /**
   * RecordWriter to write the output to UPDATE/INSERT statements.
   */
  public class OracleUpsertRecordWriter extends UpdateRecordWriter {

    public OracleUpsertRecordWriter(TaskAttemptContext context)
        throws ClassNotFoundException, SQLException {
      super(context);
    }

    /**
     * Builds a MERGE statement whose placeholders appear in this order:
     * one for the key in the ON clause, one per non-key column in the
     * UPDATE SET clause, and one per column in the INSERT VALUES clause.
     *
     * @return an UPDATE/INSERT statement that modifies/inserts a row
     * depending on whether the row already exist in the table or not.
     */
    protected String getUpdateStatement() {
      StringBuilder sb = new StringBuilder();
      sb.append("MERGE INTO ");
      sb.append(tableName);
      sb.append(" USING dual ON ( ");
      sb.append(updateCol);
      sb.append(" = ? )");

      sb.append(" WHEN MATCHED THEN UPDATE SET ");
      boolean first = true;
      for (String col : columnNames) {
        // Skip the key column regardless of letter case. The export tool
        // orders the record's fields using a case-insensitive match on the
        // key, so a case-sensitive test here would keep the key in the SET
        // clause whenever its case differs from the column name, yielding
        // one more placeholder than the record supplies.
        if (col.equalsIgnoreCase(updateCol)) {
          continue;
        }
        if (first) {
          first = false;
        } else {
          sb.append(", ");
        }
        sb.append(col);
        sb.append(" = ?");
      }

      sb.append(" WHEN NOT MATCHED THEN INSERT ( ");
      for (int i = 0; i < columnNames.length; i++) {
        if (i > 0) {
          sb.append(", ");
        }
        sb.append(columnNames[i]);
      }

      // One placeholder per inserted column.
      sb.append(" ) VALUES ( ");
      for (int i = 0; i < columnNames.length; i++) {
        if (i > 0) {
          sb.append(", ");
        }
        sb.append("?");
      }
      sb.append(" )");

      return sb.toString();
    }
  }
}

View File

@ -86,9 +86,9 @@ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
*/
public class UpdateRecordWriter extends AsyncSqlRecordWriter<K, V> {
private String tableName;
private String [] columnNames; // The columns to update.
private String updateCol; // The column containing the fixed key.
protected String tableName;
protected String [] columnNames; // The columns to update.
protected String updateCol; // The column containing the fixed key.
public UpdateRecordWriter(TaskAttemptContext context)
throws ClassNotFoundException, SQLException {

View File

@ -138,6 +138,7 @@ public abstract class BaseSqoopTool extends SqoopTool {
public static final String VERBOSE_ARG = "verbose";
public static final String HELP_ARG = "help";
public static final String UPDATE_KEY_ARG = "update-key";
public static final String UPDATE_MODE_ARG = "update-mode";
// Arguments for incremental imports.
public static final String INCREMENT_TYPE_ARG = "incremental";

View File

@ -19,7 +19,6 @@
package com.cloudera.sqoop.tool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.cli.CommandLine;
@ -30,6 +29,7 @@
import com.cloudera.sqoop.Sqoop;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
import com.cloudera.sqoop.SqoopOptions.UpdateMode;
import com.cloudera.sqoop.cli.RelatedOptions;
import com.cloudera.sqoop.cli.ToolOptions;
import com.cloudera.sqoop.manager.ExportJobContext;
@ -66,8 +66,13 @@ private void exportTable(SqoopOptions options, String tableName)
ExportJobContext context = new ExportJobContext(tableName, jarFile,
options);
if (options.getUpdateKeyCol() != null) {
// UPDATE-based export.
manager.updateTable(context);
if (options.getUpdateMode() == UpdateMode.UpdateOnly) {
// UPDATE-based export.
manager.updateTable(context);
} else {
// Mixed update/insert export
manager.upsertTable(context);
}
} else {
// INSERT-based export.
manager.exportTable(context);
@ -84,26 +89,8 @@ public int run(SqoopOptions options) {
codeGenerator.setManager(manager);
String updateKeyCol = options.getUpdateKeyCol();
if (updateKeyCol != null) {
// We're in update mode. We need to explicitly set the database output
// column ordering in the codeGenerator. The UpdateKeyCol must come
// last, because the UPDATE-based OutputFormat will generate the SET
// clause followed by the WHERE clause, and the SqoopRecord needs to
// serialize to this layout.
String [] allColNames = manager.getColumnNames(options.getTableName());
List<String> dbOutCols = new ArrayList<String>();
String upperCaseKeyCol = updateKeyCol.toUpperCase();
for (String col : allColNames) {
if (!upperCaseKeyCol.equals(col.toUpperCase())) {
dbOutCols.add(col); // add non-key columns to the output order list.
}
}
// Then add the update key column last.
dbOutCols.add(updateKeyCol);
options.setDbOutputColumns(dbOutCols.toArray(
new String[dbOutCols.size()]));
if (options.getUpdateKeyCol() != null) {
manager.configureDbOutputColumns(options);
}
try {
@ -174,6 +161,13 @@ protected RelatedOptions getExportOptions() {
+ "to be executed in batch mode")
.withLongOpt(BATCH_ARG)
.create());
exportOpts.addOption(OptionBuilder
.withArgName("mode")
.hasArg()
.withDescription("Specifies how updates are performed when "
+ "new rows are found with non-matching keys in database")
.withLongOpt(UPDATE_MODE_ARG)
.create());
return exportOpts;
}
@ -257,6 +251,7 @@ public void applyOptions(CommandLine in, SqoopOptions out)
out.setClearStagingTable(true);
}
applyNewUpdateOptions(in, out);
applyInputFormatOptions(in, out);
applyOutputFormatOptions(in, out);
applyOutputFormatOptions(in, out);
@ -335,5 +330,21 @@ public void validateOptions(SqoopOptions options)
validateCommonOptions(options);
validateCodeGenOptions(options);
}
/**
 * Copy the update-mode argument, when present on the command line, into
 * the options. Nothing is changed when the argument is absent.
 *
 * @param in the parsed command line.
 * @param out the options to update.
 * @throws InvalidOptionsException if the argument value is neither
 *     'updateonly' nor 'allowinsert'.
 */
private void applyNewUpdateOptions(CommandLine in, SqoopOptions out)
    throws InvalidOptionsException {
  if (!in.hasOption(UPDATE_MODE_ARG)) {
    return;
  }

  final String requested = in.getOptionValue(UPDATE_MODE_ARG);
  final UpdateMode mode;
  if ("updateonly".equals(requested)) {
    mode = UpdateMode.UpdateOnly;
  } else if ("allowinsert".equals(requested)) {
    mode = UpdateMode.AllowInsert;
  } else {
    throw new InvalidOptionsException("Unknown new update mode: "
        + requested + ". Use 'updateonly' or 'allowinsert'."
        + HELP_STR);
  }
  out.setUpdateMode(mode);
}
}

View File

@ -263,4 +263,18 @@ public void testDatesAndTimes() throws IOException, SQLException {
assertColMinAndMax(forIdx(1), genTime);
}
}
/** Make sure a mixed update/insert export works correctly. */
public void testUpsertTextExport() throws IOException, SQLException {
  final int recordCount = 10;
  createTextFile(0, recordCount, false);
  createTable();

  // Run the same export twice: the first pass will be insert,
  // the second pass will be update.
  for (int pass = 0; pass < 2; pass++) {
    runExport(getArgv(true, 10, 10, newStrArray(null,
        "--update-key", "ID", "--update-mode", "allowinsert")));
  }

  verifyExport(recordCount);
}
}