diff --git a/src/docs/man/sqoop-export.txt b/src/docs/man/sqoop-export.txt index c7c76b4f..3ee36a4b 100644 --- a/src/docs/man/sqoop-export.txt +++ b/src/docs/man/sqoop-export.txt @@ -40,6 +40,9 @@ Export control options --table (table-name):: The table to read (required) +--update-key (col-name):: + Anchor column to use for updates + include::input-args.txt[] include::output-args.txt[] diff --git a/src/docs/user/export-purpose.txt b/src/docs/user/export-purpose.txt index a8b3002a..13345081 100644 --- a/src/docs/user/export-purpose.txt +++ b/src/docs/user/export-purpose.txt @@ -19,6 +19,11 @@ The +export+ tool exports a set of files from HDFS back to an RDBMS. The target table must already exist in the database. The input files are read and parsed into a set of records according to the -user-specified delimiters. These are then transformed into a set of -+INSERT+ statements that inject the records into the database. +user-specified delimiters. + +The default operation is to transform these into a set of +INSERT+ +statements that inject the records into the database. In "update mode," +Sqoop will generate +UPDATE+ statements that replace existing records +in the database. + diff --git a/src/docs/user/export.txt b/src/docs/user/export.txt index 71b9f76c..f621b6db 100644 --- a/src/docs/user/export.txt +++ b/src/docs/user/export.txt @@ -43,14 +43,15 @@ include::common-args.txt[] .Export control arguments: [grid="all"] -`-------------------------`------------------------------------------ -Argument Description ---------------------------------------------------------------------- -+\--direct+ Use direct export fast path -+\--export-dir + HDFS source path for the export -+-m,\--num-mappers + Use 'n' map tasks to export in parallel -+\--table + Table to populate ---------------------------------------------------------------------- +`---------------------------`------------------------------------------ +Argument Description +----------------------------------------------------------------------- ++\--direct+ Use direct export fast path ++\--export-dir + HDFS source path for the export ++-m,\--num-mappers + Use 'n' map tasks to export in parallel ++\--table + Table to populate ++\--update-key + Anchor column to use for updates +----------------------------------------------------------------------- The +\--table+ and +\--export-dir+ arguments are required. These specify the table to populate in the database, and the @@ -72,6 +73,58 @@ MySQL provides a direct mode for exports as well, using the to specify this codepath. This may be higher-performance than the standard JDBC codepath. + +Inserts vs. Updates +~~~~~~~~~~~~~~~~~~~ + +By default, +sqoop-export+ appends new rows to a table; each input +record is transformed into an +INSERT+ statement that adds a row to the +target database table. If your table has constraints (e.g., a primary +key column whose values must be unique) and already contains data, you +must take care to avoid inserting records that violate these +constraints. The export process will fail if an +INSERT+ statement +fails. This mode is primarily intended for exporting records to a new, +empty table intended to receive these results. + +If you specify the +\--update-key+ argument, Sqoop will instead modify +an existing dataset in the database. Each input record is treated as +an +UPDATE+ statement that modifies an existing row. The row a +statement modifies is determined by the column name specified with ++\--update-key+. 
For example, consider the following table +definition: + +---- +CREATE TABLE foo( + id INT NOT NULL PRIMARY KEY, + msg VARCHAR(32), + bar INT); +---- + +Consider also a dataset in HDFS containing records like these: + +---- +0,this is a test,42 +1,some more data,100 +... +---- + +Running +sqoop-export \--table foo \--update-key id \--export-dir +/path/to/data \--connect ...+ will run an export job that executes SQL +statements based on the data like so: + +---- +UPDATE foo SET msg='this is a test', bar=42 WHERE id=0; +UPDATE foo SET msg='some more data', bar=100 WHERE id=1; +... +---- + +If an +UPDATE+ statement modifies no rows, this is not considered an +error; the export will silently continue. (In effect, this means that +an update-based export will not insert new rows into the database.) +Likewise, if the column specified with +\--update-key+ does not +uniquely identify rows and multiple rows are updated by a single +statement, this condition is also undetected. + include::input-args.txt[] include::output-args.txt[] @@ -93,6 +146,11 @@ previous import, then the original generated class can be used to read the data back. Specifying +\--jar-file+ and +\--class-name+ obviate the need to specify delimiters in this case. +The use of existing generated code is incompatible with ++\--update-key+; an update-mode export requires new code generation to +perform the update. You cannot use +\--jar-file+, and must fully specify +any non-default delimiters. + Exports and Transactions ~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/src/java/com/cloudera/sqoop/SqoopOptions.java b/src/java/com/cloudera/sqoop/SqoopOptions.java index 945b461e..8b37e979 100644 --- a/src/java/com/cloudera/sqoop/SqoopOptions.java +++ b/src/java/com/cloudera/sqoop/SqoopOptions.java @@ -97,6 +97,10 @@ public enum FileLayout { private String hiveTableName; private String packageName; // package to prepend to auto-named classes. + // An ordered list of column names denoting what order columns are + // serialized to a PreparedStatement from a generated record type. + private String [] dbOutColumns; + // package+class to apply to individual table import. // also used as an *input* class with existingJarFile. private String className; @@ -118,6 +122,9 @@ public enum FileLayout { // HDFS path to read from when performing an export private String exportDir; + // Column to use for the WHERE clause in an UPDATE-based export. + private String updateKeyCol; + private DelimiterSet inputDelimiters; private DelimiterSet outputDelimiters; private boolean areDelimsManuallySet; @@ -278,6 +285,8 @@ private void initDefaults(Configuration baseConfiguration) { this.extraArgs = null; + this.dbOutColumns = null; + loadFromProperties(); } @@ -951,5 +960,51 @@ public void setExtraArgs(String [] args) { this.extraArgs[i] = args[i]; } } + + /** + * Set the name of the column to be used in the WHERE clause of an + * UPDATE-based export process. + */ + public void setUpdateKeyCol(String colName) { + this.updateKeyCol = colName; + } + + /** + * @return the column which is the key column in a table to be exported + * in update mode. + */ + public String getUpdateKeyCol() { + return this.updateKeyCol; + } + + /** + * @return an ordered list of column names. The code generator should + * generate the DBWritable.write(PreparedStatement) method with columns + * exporting in this order, if it is non-null. 
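+   * For example, for an update-mode export of the table foo(id, msg, bar)
+   * keyed on "id", this is expected to be [msg, bar, id], so that the SET
+   * columns are bound before the WHERE key column.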
+ */ + public String [] getDbOutputColumns() { + if (null != dbOutColumns) { + return Arrays.copyOf(this.dbOutColumns, dbOutColumns.length); + } else { + return null; + } + } + + /** + * Set the order in which columns should be serialized by the generated + * DBWritable.write(PreparedStatement) method. Setting this to null will use + * the "natural order" of the database table. + * + * TODO: Expose this setter via the command-line arguments for the codegen + * module. That would allow users to export to tables with columns in a + * different physical order than the file layout in HDFS. + */ + public void setDbOutputColumns(String [] outCols) { + if (null == outCols) { + this.dbOutColumns = null; + } else { + this.dbOutColumns = Arrays.copyOf(outCols, outCols.length); + } + } } diff --git a/src/java/com/cloudera/sqoop/manager/ConnManager.java b/src/java/com/cloudera/sqoop/manager/ConnManager.java index effd00a5..750118ae 100644 --- a/src/java/com/cloudera/sqoop/manager/ConnManager.java +++ b/src/java/com/cloudera/sqoop/manager/ConnManager.java @@ -173,12 +173,23 @@ public String escapeTableName(String tableName) { /** * Export data stored in HDFS into a table in a database. + * This inserts new rows into the target table. */ public void exportTable(ExportJobContext context) throws IOException, ExportException { throw new ExportException("This database does not support exports"); } + /** + * Export updated data stored in HDFS into a database table. + * This updates existing rows in the target table, based on the + * updateKeyCol specified in the context's SqoopOptions. + */ + public void updateTable(ExportJobContext context) + throws IOException, ExportException { + throw new ExportException("This database does not support updates"); + } + /** * If a method of this ConnManager has returned a ResultSet to you, * you are responsible for calling release() after you close the diff --git a/src/java/com/cloudera/sqoop/manager/SqlManager.java b/src/java/com/cloudera/sqoop/manager/SqlManager.java index 74c87744..9e204346 100644 --- a/src/java/com/cloudera/sqoop/manager/SqlManager.java +++ b/src/java/com/cloudera/sqoop/manager/SqlManager.java @@ -24,6 +24,7 @@ import com.cloudera.sqoop.lib.ClobRef; import com.cloudera.sqoop.mapreduce.DataDrivenImportJob; import com.cloudera.sqoop.mapreduce.JdbcExportJob; +import com.cloudera.sqoop.mapreduce.JdbcUpdateExportJob; import com.cloudera.sqoop.util.ExportException; import com.cloudera.sqoop.util.ImportException; import com.cloudera.sqoop.util.ResultSetPrinter; @@ -593,4 +594,13 @@ public void release() { this.lastStatement = null; } } + + /** + * @{inheritDoc} + */ + public void updateTable(ExportJobContext context) + throws IOException, ExportException { + JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context); + exportJob.runExport(); + } } diff --git a/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java b/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java index 30c13db3..31079e41 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java +++ b/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java @@ -57,6 +57,13 @@ public class ExportJobBase extends JobBase { public static final String SQOOP_EXPORT_TABLE_CLASS_KEY = "sqoop.mapreduce.export.table.class"; + /** + * What column of the table to use for the WHERE clause of + * an updating export. + */ + public static final String SQOOP_EXPORT_UPDATE_COL_KEY = + "sqoop.mapreduce.export.update.col"; + /** Number of map tasks to use for an export. 
*/ public static final String EXPORT_MAP_TASKS_KEY = "sqoop.mapreduce.export.map.tasks"; diff --git a/src/java/com/cloudera/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/com/cloudera/sqoop/mapreduce/JdbcUpdateExportJob.java new file mode 100644 index 00000000..bbcb6977 --- /dev/null +++ b/src/java/com/cloudera/sqoop/mapreduce/JdbcUpdateExportJob.java @@ -0,0 +1,143 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.mapreduce; + +import java.io.IOException; +import java.sql.SQLException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; +import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; + +import com.cloudera.sqoop.ConnFactory; +import com.cloudera.sqoop.manager.ConnManager; +import com.cloudera.sqoop.manager.ExportJobContext; +import com.cloudera.sqoop.shims.ShimLoader; + +/** + * Run an update-based export using JDBC (JDBC-based UpdateOutputFormat). + */ +public class JdbcUpdateExportJob extends ExportJobBase { + + public static final Log LOG = LogFactory.getLog( + JdbcUpdateExportJob.class.getName()); + + /** + * Return an instance of the UpdateOutputFormat class object loaded + * from the shim jar. 
+ */ + private static Class getUpdateOutputFormat() + throws IOException { + try { + return (Class) ShimLoader.getShimClass( + "com.cloudera.sqoop.mapreduce.UpdateOutputFormat"); + } catch (ClassNotFoundException cnfe) { + throw new IOException("Could not load updating export OutputFormat", + cnfe); + } + } + + public JdbcUpdateExportJob(final ExportJobContext context) + throws IOException { + super(context, null, null, getUpdateOutputFormat()); + } + + public JdbcUpdateExportJob(final ExportJobContext ctxt, + final Class mapperClass, + final Class inputFormatClass, + final Class outputFormatClass) { + super(ctxt, mapperClass, inputFormatClass, outputFormatClass); + } + + @Override + protected Class getMapperClass() { + if (inputIsSequenceFiles()) { + return SequenceFileExportMapper.class; + } else { + return TextExportMapper.class; + } + } + + @Override + protected void configureOutputFormat(Job job, String tableName, + String tableClassName) throws IOException { + + Configuration conf = options.getConf(); + ConnManager mgr = new ConnFactory(conf).getManager(options); + try { + String username = options.getUsername(); + if (null == username || username.length() == 0) { + DBConfiguration.configureDB(job.getConfiguration(), + mgr.getDriverClass(), + options.getConnectString()); + } else { + DBConfiguration.configureDB(job.getConfiguration(), + mgr.getDriverClass(), + options.getConnectString(), + username, options.getPassword()); + } + + String [] colNames = options.getColumns(); + if (null == colNames) { + colNames = mgr.getColumnNames(tableName); + } + + if (null == colNames) { + throw new IOException( + "Export column names could not be determined for " + tableName); + } + + String updateKeyCol = options.getUpdateKeyCol(); + if (null == updateKeyCol) { + throw new IOException("Update key column not set in export job"); + } + + // Make sure we strip out the key column from this list. + String [] outColNames = new String[colNames.length - 1]; + int j = 0; + String upperCaseKeyCol = updateKeyCol.toUpperCase(); + for (int i = 0; i < colNames.length; i++) { + if (!colNames[i].toUpperCase().equals(upperCaseKeyCol)) { + outColNames[j++] = colNames[i]; + } + } + DBOutputFormat.setOutput(job, tableName, outColNames); + + job.setOutputFormatClass(getOutputFormatClass()); + job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName); + job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyCol); + } catch (ClassNotFoundException cnfe) { + throw new IOException("Could not load OutputFormat", cnfe); + } finally { + try { + mgr.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing connection: " + sqlE); + } + } + } +} + diff --git a/src/java/com/cloudera/sqoop/orm/ClassWriter.java b/src/java/com/cloudera/sqoop/orm/ClassWriter.java index dbbac640..4f447b80 100644 --- a/src/java/com/cloudera/sqoop/orm/ClassWriter.java +++ b/src/java/com/cloudera/sqoop/orm/ClassWriter.java @@ -842,6 +842,26 @@ private void generateHadoopWrite(Map columnTypes, sb.append(" }\n"); } + + /** + * Create a list of identifiers to use based on the true column names + * of the table. + * @param colNames the actual column names of the table. + * @return a list of column names in the same order which are + * cleaned up to be used as identifiers in the generated Java class. 
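+   * For instance, a column name containing characters that are illegal in
+   * Java identifiers is rewritten by toIdentifier() into a legal one
+   * (e.g., a name such as "zip-code" might become "zip_code").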
+ */ + private String [] cleanColNames(String [] colNames) { + String [] cleanedColNames = new String[colNames.length]; + for (int i = 0; i < colNames.length; i++) { + String col = colNames[i]; + String identifier = toIdentifier(col); + cleanedColNames[i] = identifier; + } + + return cleanedColNames; + } + + /** * Generate the ORM code for the class. */ @@ -876,19 +896,47 @@ public void generate() throws IOException { // Translate all the column names into names that are safe to // use as identifiers. - String [] cleanedColNames = new String[colNames.length]; - for (int i = 0; i < colNames.length; i++) { - String col = colNames[i]; - String identifier = toIdentifier(col); - cleanedColNames[i] = identifier; + String [] cleanedColNames = cleanColNames(colNames); + for (int i = 0; i < colNames.length; i++) { // Make sure the col->type mapping holds for the // new identifier name, too. + String identifier = cleanedColNames[i]; + String col = colNames[i]; columnTypes.put(identifier, columnTypes.get(col)); } + // The db write() method may use column names in a different + // order. If this is set in the options, pull it out here and + // make sure we format the column names to identifiers in the same way + // as we do for the ordinary column list. + String [] dbWriteColNames = options.getDbOutputColumns(); + String [] cleanedDbWriteColNames = null; + if (null == dbWriteColNames) { + cleanedDbWriteColNames = cleanedColNames; + } else { + cleanedDbWriteColNames = cleanColNames(dbWriteColNames); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("selected columns:"); + for (String col : cleanedColNames) { + LOG.debug(" " + col); + } + + if (cleanedDbWriteColNames != cleanedColNames) { + // dbWrite() has a different set of columns than the rest of the + // generators. + LOG.debug("db write column order:"); + for (String dbCol : cleanedDbWriteColNames) { + LOG.debug(" " + dbCol); + } + } + } + // Generate the Java code. - StringBuilder sb = generateClassForColumns(columnTypes, cleanedColNames); + StringBuilder sb = generateClassForColumns(columnTypes, + cleanedColNames, cleanedDbWriteColNames); // Write this out to a file. String codeOutDir = options.getCodeOutputDir(); @@ -954,10 +1002,13 @@ public void generate() throws IOException { * Generate the ORM code for a table object containing the named columns. * @param columnTypes - mapping from column names to sql types * @param colNames - ordered list of column names for table. + * @param dbWriteColNames - ordered list of column names for the db + * write() method of the class. * @return - A StringBuilder that contains the text of the class code. */ - public StringBuilder generateClassForColumns(Map columnTypes, - String [] colNames) { + private StringBuilder generateClassForColumns( + Map columnTypes, + String [] colNames, String [] dbWriteColNames) { StringBuilder sb = new StringBuilder(); sb.append("// ORM class for " + tableName + "\n"); sb.append("// WARNING: This class is AUTO-GENERATED. 
" @@ -1011,7 +1062,7 @@ public StringBuilder generateClassForColumns(Map columnTypes, generateFields(columnTypes, colNames, sb); generateDbRead(columnTypes, colNames, sb); generateLoadLargeObjects(columnTypes, colNames, sb); - generateDbWrite(columnTypes, colNames, sb); + generateDbWrite(columnTypes, dbWriteColNames, sb); generateHadoopRead(columnTypes, colNames, sb); generateHadoopWrite(columnTypes, colNames, sb); generateToString(columnTypes, colNames, sb); diff --git a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java index 8d454c2e..852bb0f9 100644 --- a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java +++ b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java @@ -110,6 +110,7 @@ public abstract class BaseSqoopTool extends SqoopTool { public static final String SQL_QUERY_SHORT_ARG = "e"; public static final String VERBOSE_ARG = "verbose"; public static final String HELP_ARG = "help"; + public static final String UPDATE_KEY_ARG = "update-key"; public BaseSqoopTool() { diff --git a/src/java/com/cloudera/sqoop/tool/ExportTool.java b/src/java/com/cloudera/sqoop/tool/ExportTool.java index a8cb92f3..3c98d882 100644 --- a/src/java/com/cloudera/sqoop/tool/ExportTool.java +++ b/src/java/com/cloudera/sqoop/tool/ExportTool.java @@ -19,6 +19,7 @@ package com.cloudera.sqoop.tool; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import org.apache.commons.cli.CommandLine; @@ -64,7 +65,13 @@ private void exportTable(SqoopOptions options, String tableName) ExportJobContext context = new ExportJobContext(tableName, jarFile, options); - manager.exportTable(context); + if (options.getUpdateKeyCol() != null) { + // UPDATE-based export. + manager.updateTable(context); + } else { + // INSERT-based export. + manager.exportTable(context); + } } @Override @@ -77,6 +84,28 @@ public int run(SqoopOptions options) { codeGenerator.setManager(manager); + String updateKeyCol = options.getUpdateKeyCol(); + if (updateKeyCol != null) { + // We're in update mode. We need to explicitly set the database output + // column ordering in the codeGenerator. The UpdateKeyCol must come + // last, because the UPDATE-based OutputFormat will generate the SET + // clause followed by the WHERE clause, and the SqoopRecord needs to + // serialize to this layout. + String [] allColNames = manager.getColumnNames(options.getTableName()); + List dbOutCols = new ArrayList(); + String upperCaseKeyCol = updateKeyCol.toUpperCase(); + for (String col : allColNames) { + if (!upperCaseKeyCol.equals(col.toUpperCase())) { + dbOutCols.add(col); // add non-key columns to the output order list. + } + } + + // Then add the update key column last. 
+ dbOutCols.add(updateKeyCol); + options.setDbOutputColumns(dbOutCols.toArray( + new String[dbOutCols.size()])); + } + try { exportTable(options, options.getTableName()); } catch (IOException ioe) { @@ -126,6 +155,11 @@ protected RelatedOptions getExportOptions() { .withDescription("HDFS source path for the export") .withLongOpt(EXPORT_PATH_ARG) .create()); + exportOpts.addOption(OptionBuilder.withArgName("key") + .hasArg() + .withDescription("Update records by specified key column") + .withLongOpt(UPDATE_KEY_ARG) + .create()); return exportOpts; } @@ -193,6 +227,10 @@ public void applyOptions(CommandLine in, SqoopOptions out) out.setExistingJarName(in.getOptionValue(JAR_FILE_NAME_ARG)); } + if (in.hasOption(UPDATE_KEY_ARG)) { + out.setUpdateKeyCol(in.getOptionValue(UPDATE_KEY_ARG)); + } + applyInputFormatOptions(in, out); applyOutputFormatOptions(in, out); applyOutputFormatOptions(in, out); @@ -220,6 +258,13 @@ protected void validateExportOptions(SqoopOptions options) && options.getClassName() == null) { throw new InvalidOptionsException("Jar specified with --jar-file, but no " + "class specified with --class-name." + HELP_STR); + } else if (options.getExistingJarName() != null + && options.getUpdateKeyCol() != null) { + // We need to regenerate the class with the output column order set + // correctly for the update-based export. So we can't use a premade + // class. + throw new InvalidOptionsException("Jar cannot be specified with " + + "--jar-file when export is running in update mode."); } } diff --git a/src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlOutputFormat.java b/src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlOutputFormat.java new file mode 100644 index 00000000..9013c55e --- /dev/null +++ b/src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlOutputFormat.java @@ -0,0 +1,308 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.mapreduce; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.concurrent.SynchronousQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.StringUtils; + +import com.cloudera.sqoop.lib.SqoopRecord; + +/** + * Abstract OutputFormat class that allows the RecordWriter to buffer + * up SQL commands which should be executed in a separate thread after + * enough commands are created. + * + * This supports a configurable "spill threshold" at which + * point intermediate transactions are committed. 
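+ * For example, with the default values of 100 records per statement and
+ * 100 statements per transaction, a commit is issued after roughly
+ * 10,000 buffered records. Both values can be overridden through the
+ * configuration keys defined below (hypothetically, e.g. by passing
+ * -Dsqoop.export.records.per.statement=50 on the sqoop command line).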
+ * + * Uses DBOutputFormat/DBConfiguration for configuring the output. + * This is used in conjunction with the abstract AsyncSqlRecordWriter + * class. + * + * Clients of this OutputFormat must implement getRecordWriter(); the + * returned RecordWriter is intended to subclass AsyncSqlRecordWriter. + */ +public abstract class AsyncSqlOutputFormat + extends OutputFormat { + + /** conf key: number of rows to export per INSERT statement. */ + public static final String RECORDS_PER_STATEMENT_KEY = + "sqoop.export.records.per.statement"; + + /** conf key: number of INSERT statements to bundle per tx. + * If this is set to -1, then a single transaction will be used + * per task. Note that each statement may encompass multiple + * rows, depending on the value of sqoop.export.records.per.statement. + */ + public static final String STATEMENTS_PER_TRANSACTION_KEY = + "sqoop.export.statements.per.transaction"; + + /** + * Default number of records to put in an INSERT statement or + * other batched update statement. + */ + public static final int DEFAULT_RECORDS_PER_STATEMENT = 100; + + /** + * Default number of statements to execute before committing the + * current transaction. + */ + public static final int DEFAULT_STATEMENTS_PER_TRANSACTION = 100; + + /** + * Value for STATEMENTS_PER_TRANSACTION_KEY signifying that we should + * not commit until the RecordWriter is being closed, regardless of + * the number of statements we execute. + */ + public static final int UNLIMITED_STATEMENTS_PER_TRANSACTION = -1; + + private static final Log LOG = LogFactory.getLog(AsyncSqlOutputFormat.class); + + @Override + /** {@inheritDoc} */ + public void checkOutputSpecs(JobContext context) + throws IOException, InterruptedException { + } + + @Override + /** {@inheritDoc} */ + public OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException, InterruptedException { + return new OutputCommitter() { + public void abortTask(TaskAttemptContext taskContext) { } + public void cleanupJob(JobContext jobContext) { } + public void commitTask(TaskAttemptContext taskContext) { } + public boolean needsTaskCommit(TaskAttemptContext taskContext) { + return false; + } + public void setupJob(JobContext jobContext) { } + public void setupTask(TaskAttemptContext taskContext) { } + }; + } + + /** + * Represents a database update operation that should be performed + * by an asynchronous background thread. + * AsyncDBOperation objects are immutable. + * They MAY contain a statement which should be executed. The + * statement may also be null. + * + * They may also set 'commitAndClose' to true. If true, then the + * executor of this operation should commit the current + * transaction, even if stmt is null, and then stop the executor + * thread. + */ + public static class AsyncDBOperation { + private final PreparedStatement stmt; + private final boolean commitAndClose; + private final boolean isBatch; + + /** + * Create an asynchronous database operation. + * @param s the statement, if any, to execute. + * @param commitAndClose if true, the current transaction should be + * committed, and the executor thread should stop after this operation. + * @param batch is true if this is a batch PreparedStatement, or false + * if it's a normal singleton statement. + */ + public AsyncDBOperation(PreparedStatement s, boolean commitAndClose, + boolean batch) { + this.stmt = s; + this.commitAndClose = commitAndClose; + this.isBatch = batch; + } + + /** + * @return a statement to run as an update. 
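+     * This may be null when the operation only signals that the current
+     * transaction should be committed and the executor thread stopped.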
+ */ + public PreparedStatement getStatement() { + return stmt; + } + + /** + * @return true if the executor should commit the current transaction. + * If getStatement() is non-null, the statement is run first. + */ + public boolean requiresCommit() { + return this.commitAndClose; + } + + /** + * @return true if the executor should stop after this command. + */ + public boolean stop() { + return this.commitAndClose; + } + + /** + * @return true if this is a batch SQL statement. + */ + public boolean execAsBatch() { + return this.isBatch; + } + } + + /** + * A thread that runs the database interactions asynchronously + * from the OutputCollector. + */ + public static class AsyncSqlExecThread extends Thread { + + private final Connection conn; // The connection to the database. + private SQLException err; // Error from a previously-run statement. + + // How we receive database operations from the RecordWriter. + private SynchronousQueue opsQueue; + + protected int curNumStatements; // statements executed thus far in the tx. + protected final int stmtsPerTx; // statements per transaction. + + /** + * Create a new update thread that interacts with the database. + * @param conn the connection to use. This must only be used by this + * thread. + * @param stmtsPerTx the number of statements to execute before committing + * the current transaction. + */ + public AsyncSqlExecThread(Connection conn, int stmtsPerTx) { + this.conn = conn; + this.err = null; + this.opsQueue = new SynchronousQueue(); + this.stmtsPerTx = stmtsPerTx; + } + + public void run() { + while (true) { + AsyncDBOperation op = null; + try { + op = opsQueue.take(); + } catch (InterruptedException ie) { + LOG.warn("Interrupted retrieving from operation queue: " + + StringUtils.stringifyException(ie)); + continue; + } + + if (null == op) { + // This shouldn't be allowed to happen. + LOG.warn("Null operation in queue; illegal state."); + continue; + } + + PreparedStatement stmt = op.getStatement(); + // Synchronize on the connection to ensure it does not conflict + // with the prepareStatement() call in the main thread. + synchronized (conn) { + try { + if (null != stmt) { + if (op.execAsBatch()) { + stmt.executeBatch(); + } else { + // Normal update. + stmt.executeUpdate(); + } + stmt.close(); + stmt = null; + this.curNumStatements++; + } + + if (op.requiresCommit() || (curNumStatements >= stmtsPerTx + && stmtsPerTx != UNLIMITED_STATEMENTS_PER_TRANSACTION)) { + LOG.debug("Committing transaction of " + curNumStatements + + " statements"); + this.conn.commit(); + this.curNumStatements = 0; + } + } catch (SQLException sqlE) { + setLastError(sqlE); + } finally { + // Close the statement on our way out if that didn't happen + // via the normal execution path. + if (null != stmt) { + try { + stmt.close(); + } catch (SQLException sqlE) { + setLastError(sqlE); + } + } + + // Always check whether we should end the loop, regardless + // of the presence of an exception. + if (op.stop()) { + // Don't continue processing after this operation. + try { + conn.close(); + } catch (SQLException sqlE) { + setLastError(sqlE); + } + return; + } + } // try .. catch .. finally. + } // synchronized (conn) + } + } + + /** + * Allows a user to enqueue the next database operation to run. + * Since the connection can only execute a single operation at a time, + * the put() method may block if another operation is already underway. + * @param op the database operation to perform. 
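+     * @throws InterruptedException if the calling thread is interrupted
+     * while waiting to hand the operation to the executor thread.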
+ */ + public void put(AsyncDBOperation op) throws InterruptedException { + opsQueue.put(op); + } + + /** + * If a previously-executed statement resulted in an error, post it here. + * If the error slot was already filled, then subsequent errors are + * squashed until the user calls this method (which clears the error + * slot). + * @return any SQLException that occurred due to a previously-run + * statement. + */ + public synchronized SQLException getLastError() { + SQLException e = this.err; + this.err = null; + return e; + } + + private synchronized void setLastError(SQLException e) { + if (this.err == null) { + // Just set it. + LOG.error("Got exception in update thread: " + + StringUtils.stringifyException(e)); + this.err = e; + } else { + // Slot is full. Log it and discard. + LOG.error("SQLException in update thread but error slot full: " + + StringUtils.stringifyException(e)); + } + } + } +} diff --git a/src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java b/src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java new file mode 100644 index 00000000..b7720674 --- /dev/null +++ b/src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java @@ -0,0 +1,207 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.mapreduce; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; + +import com.cloudera.sqoop.lib.SqoopRecord; + +/** + * Abstract RecordWriter base class that buffers SqoopRecords to be injected + * into JDBC SQL PreparedStatements to be executed by the + * AsyncSqlOutputFormat's background thread. + * + * Record objects are buffered before actually performing the INSERT + * statements; this requires that the key implement the SqoopRecord interface. + * + * Uses DBOutputFormat/DBConfiguration for configuring the output. + */ +public abstract class AsyncSqlRecordWriter + extends RecordWriter { + + private Connection connection; + + private Configuration conf; + + protected final int rowsPerStmt; // rows to insert per statement. + + // Buffer for records to be put into export SQL statements. + private List records; + + // Background thread to actually perform the updates. 
+ private AsyncSqlOutputFormat.AsyncSqlExecThread execThread; + private boolean startedExecThread; + + public AsyncSqlRecordWriter(TaskAttemptContext context) + throws ClassNotFoundException, SQLException { + this.conf = context.getConfiguration(); + + this.rowsPerStmt = conf.getInt( + AsyncSqlOutputFormat.RECORDS_PER_STATEMENT_KEY, + AsyncSqlOutputFormat.DEFAULT_RECORDS_PER_STATEMENT); + int stmtsPerTx = conf.getInt( + AsyncSqlOutputFormat.STATEMENTS_PER_TRANSACTION_KEY, + AsyncSqlOutputFormat.DEFAULT_STATEMENTS_PER_TRANSACTION); + + DBConfiguration dbConf = new DBConfiguration(conf); + this.connection = dbConf.getConnection(); + this.connection.setAutoCommit(false); + + this.records = new ArrayList(this.rowsPerStmt); + + this.execThread = new AsyncSqlOutputFormat.AsyncSqlExecThread( + connection, stmtsPerTx); + this.execThread.setDaemon(true); + this.startedExecThread = false; + } + + /** + * Allow subclasses access to the Connection instance we hold. + * This Connection is shared with the asynchronous SQL exec thread. + * Any uses of the Connection must be synchronized on it. + * @return the Connection object used for this SQL transaction. + */ + protected final Connection getConnection() { + return this.connection; + } + + /** + * Allow subclasses access to the Configuration. + * @return the Configuration for this MapReduc task. + */ + protected final Configuration getConf() { + return this.conf; + } + + /** + * Should return 'true' if the PreparedStatements generated by the + * RecordWriter are intended to be executed in "batch" mode, or false + * if it's just one big statement. + */ + protected boolean isBatchExec() { + return false; + } + + /** + * Generate the PreparedStatement object that will be fed into the execution + * thread. All parameterized fields of the PreparedStatement must be set in + * this method as well; this is usually based on the records collected from + * the user in the userRecords list. + * + * Note that any uses of the Connection object here must be synchronized on + * the Connection. + * + * @param userRecords a list of records that should be injected into SQL + * statements. + * @return a PreparedStatement to be populated with rows + * from the collected record list. + */ + protected abstract PreparedStatement getPreparedStatement( + List userRecords) throws SQLException; + + /** + * Takes the current contents of 'records' and formats and executes the + * INSERT statement. + * @param closeConn if true, commits the transaction and closes the + * connection. + */ + private void execUpdate(boolean closeConn) + throws InterruptedException, SQLException { + + if (!startedExecThread) { + this.execThread.start(); + this.startedExecThread = true; + } + + PreparedStatement stmt = null; + boolean successfulPut = false; + try { + if (records.size() > 0) { + stmt = getPreparedStatement(records); + this.records.clear(); + } + + // Pass this operation off to the update thread. This will block if + // the update thread is already performing an update. + AsyncSqlOutputFormat.AsyncDBOperation op = + new AsyncSqlOutputFormat.AsyncDBOperation(stmt, closeConn, + isBatchExec()); + execThread.put(op); + successfulPut = true; // op has been posted to the other thread. + } finally { + if (!successfulPut && null != stmt) { + // We created a statement but failed to enqueue it. Close it. + stmt.close(); + } + } + + // Check for any previous SQLException. If one happened, rethrow it here. 
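+    // Note that an error raised by the background thread therefore
+    // surfaces on a later call to execUpdate() or on close(), not at the
+    // moment the failing statement runs.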
+ SQLException lastException = execThread.getLastError(); + if (null != lastException) { + throw lastException; + } + } + + @Override + /** {@inheritDoc} */ + public void close(TaskAttemptContext context) + throws IOException, InterruptedException { + try { + execUpdate(true); + } catch (SQLException sqle) { + throw new IOException(sqle); + } finally { + execThread.join(); + } + + // If we're not leaving on an error return path already, + // now that execThread is definitely stopped, check that the + // error slot remains empty. + SQLException lastErr = execThread.getLastError(); + if (null != lastErr) { + throw new IOException(lastErr); + } + } + + @Override + /** {@inheritDoc} */ + public void write(K key, V value) + throws InterruptedException, IOException { + try { + records.add((SqoopRecord) key.clone()); + if (records.size() >= this.rowsPerStmt) { + execUpdate(false); + } + } catch (CloneNotSupportedException cnse) { + throw new IOException("Could not buffer record", cnse); + } catch (SQLException sqlException) { + throw new IOException(sqlException); + } + } +} diff --git a/src/shims/common/com/cloudera/sqoop/mapreduce/ExportOutputFormat.java b/src/shims/common/com/cloudera/sqoop/mapreduce/ExportOutputFormat.java index 8e517da2..7f494d34 100644 --- a/src/shims/common/com/cloudera/sqoop/mapreduce/ExportOutputFormat.java +++ b/src/shims/common/com/cloudera/sqoop/mapreduce/ExportOutputFormat.java @@ -22,20 +22,16 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; -import java.util.ArrayList; +import java.util.Arrays; import java.util.List; -import java.util.concurrent.SynchronousQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; -import org.apache.hadoop.util.StringUtils; import com.cloudera.sqoop.lib.SqoopRecord; @@ -51,23 +47,7 @@ * Uses DBOutputFormat/DBConfiguration for configuring the output. */ public class ExportOutputFormat - extends OutputFormat { - - /** conf key: number of rows to export per INSERT statement. */ - public static final String RECORDS_PER_STATEMENT_KEY = - "sqoop.export.records.per.statement"; - - /** conf key: number of INSERT statements to bundle per tx. - * If this is set to -1, then a single transaction will be used - * per task. Note that each statement may encompass multiple - * rows, depending on the value of sqoop.export.records.per.statement. 
- */ - public static final String STATEMENTS_PER_TRANSACTION_KEY = - "sqoop.export.statements.per.transaction"; - - private static final int DEFAULT_RECORDS_PER_STATEMENT = 100; - private static final int DEFAULT_STATEMENTS_PER_TRANSACTION = 100; - private static final int UNLIMITED_STATEMENTS_PER_TRANSACTION = -1; + extends AsyncSqlOutputFormat { private static final Log LOG = LogFactory.getLog(ExportOutputFormat.class); @@ -90,22 +70,6 @@ public void checkOutputSpecs(JobContext context) } } - @Override - /** {@inheritDoc} */ - public OutputCommitter getOutputCommitter(TaskAttemptContext context) - throws IOException, InterruptedException { - return new OutputCommitter() { - public void abortTask(TaskAttemptContext taskContext) { } - public void cleanupJob(JobContext jobContext) { } - public void commitTask(TaskAttemptContext taskContext) { } - public boolean needsTaskCommit(TaskAttemptContext taskContext) { - return false; - } - public void setupJob(JobContext jobContext) { } - public void setupTask(TaskAttemptContext taskContext) { } - }; - } - @Override /** {@inheritDoc} */ public RecordWriter getRecordWriter(TaskAttemptContext context) @@ -117,236 +81,74 @@ public RecordWriter getRecordWriter(TaskAttemptContext context) } } - /** - * Represents a database update operation that should be performed - * by an asynchronous background thread. - * AsyncDBOperation objects are immutable. - * They MAY contain a statement which should be executed. The - * statement may also be null. - * - * They may also set 'forceCommit' to true. If true, then the - * executor of this operation should commit the current - * transaction, even if stmt is null. - */ - private static class AsyncDBOperation { - private final PreparedStatement stmt; - private final boolean forceCommit; - private final boolean close; - - /** - * Create an asynchronous database operation. - * @param s the statement, if any, to execute. - * @param forceCommit if true, the current transaction should be committed. - * @param close if true, the executor thread should stop after processing - * this operation. - */ - public AsyncDBOperation(PreparedStatement s, boolean forceCommit, - boolean close) { - this.stmt = s; - this.forceCommit = forceCommit; - this.close = close; - } - - /** - * @return a statement to run as an update. - */ - public PreparedStatement getStatement() { - return stmt; - } - - /** - * @return true if the executor should commit the current transaction. - * If getStatement() is non-null, the statement is run first. - */ - public boolean requiresCommit() { - return forceCommit; - } - - /** - * @return true if the executor should stop after this command. - */ - public boolean stop() { - return this.close; - } - } - - /** - * A thread that runs the database interactions asynchronously - * from the OutputCollector. - */ - private static class ExportUpdateThread extends Thread { - - private final Connection conn; // The connection to the database. - private SQLException err; // Error from a previously-run statement. - - // How we receive database operations from the RecordWriter. - private SynchronousQueue opsQueue; - - protected int curNumStatements; // statements executed thus far in the tx. - protected final int stmtsPerTx; // statements per transaction. - - /** - * Create a new update thread that interacts with the database. - * @param conn the connection to use. This must only be used by this - * thread. - * @param stmtsPerTx the number of statements to execute before committing - * the current transaction. 
- */ - public ExportUpdateThread(Connection conn, int stmtsPerTx) { - this.conn = conn; - this.err = null; - this.opsQueue = new SynchronousQueue(); - this.stmtsPerTx = stmtsPerTx; - } - - public void run() { - while (true) { - AsyncDBOperation op = null; - try { - op = opsQueue.take(); - } catch (InterruptedException ie) { - LOG.warn("Interrupted retrieving from operation queue: " - + StringUtils.stringifyException(ie)); - continue; - } - - if (null == op) { - // This shouldn't be allowed to happen. - LOG.warn("Null operation in queue; illegal state."); - continue; - } - - PreparedStatement stmt = op.getStatement(); - // Synchronize on the connection to ensure it does not conflict - // with the prepareStatement() call in the main thread. - synchronized (conn) { - try { - if (null != stmt) { - stmt.executeUpdate(); - stmt.close(); - stmt = null; - this.curNumStatements++; - } - - if (op.requiresCommit() || (curNumStatements >= stmtsPerTx - && stmtsPerTx != UNLIMITED_STATEMENTS_PER_TRANSACTION)) { - LOG.debug("Committing transaction of " + curNumStatements - + " statements"); - this.conn.commit(); - this.curNumStatements = 0; - } - } catch (SQLException sqlE) { - setLastError(sqlE); - } finally { - // Close the statement on our way out if that didn't happen - // via the normal execution path. - if (null != stmt) { - try { - stmt.close(); - } catch (SQLException sqlE) { - setLastError(sqlE); - } - } - - // Always check whether we should end the loop, regardless - // of the presence of an exception. - if (op.stop()) { - // Don't continue processing after this operation. - try { - conn.close(); - } catch (SQLException sqlE) { - setLastError(sqlE); - } - return; - } - } // try .. catch .. finally. - } // synchronized (conn) - } - } - - /** - * Allows a user to enqueue the next database operation to run. - * Since the connection can only execute a single operation at a time, - * the put() method may block if another operation is already underway. - * @param op the database operation to perform. - */ - public void put(AsyncDBOperation op) throws InterruptedException { - opsQueue.put(op); - } - - /** - * If a previously-executed statement resulted in an error, post it here. - * If the error slot was already filled, then subsequent errors are - * squashed until the user calls this method (which clears the error - * slot). - * @return any SQLException that occurred due to a previously-run - * statement. - */ - public synchronized SQLException getLastError() { - SQLException e = this.err; - this.err = null; - return e; - } - - private synchronized void setLastError(SQLException e) { - if (this.err == null) { - // Just set it. - LOG.error("Got exception in update thread: " - + StringUtils.stringifyException(e)); - this.err = e; - } else { - // Slot is full. Log it and discard. - LOG.error("SQLException in update thread but error slot full: " - + StringUtils.stringifyException(e)); - } - } - } - /** * RecordWriter to write the output to a row in a database table. * The actual database updates are executed in a second thread. */ - public class ExportRecordWriter extends RecordWriter { + public class ExportRecordWriter extends AsyncSqlRecordWriter { - protected Connection connection; - - protected Configuration conf; - - protected int rowsPerStmt; // rows to insert per statement. - - // Buffer for records to be put in an INSERT statement. - protected List records; - - protected String tableName; - protected String [] columnNames; // The columns to insert into. 
- protected int columnCount; // If columnNames is null, tells ## of cols. - - // Background thread to actually perform the updates. - private ExportUpdateThread updateThread; - private boolean startedUpdateThread; + private String tableName; + private String [] columnNames; // The columns to insert into. + private int columnCount; // If columnNames is null, tells ## of cols. public ExportRecordWriter(TaskAttemptContext context) throws ClassNotFoundException, SQLException { - this.conf = context.getConfiguration(); + super(context); - this.rowsPerStmt = conf.getInt(RECORDS_PER_STATEMENT_KEY, - DEFAULT_RECORDS_PER_STATEMENT); - int stmtsPerTx = conf.getInt(STATEMENTS_PER_TRANSACTION_KEY, - DEFAULT_STATEMENTS_PER_TRANSACTION); + Configuration conf = getConf(); DBConfiguration dbConf = new DBConfiguration(conf); - this.connection = dbConf.getConnection(); this.tableName = dbConf.getOutputTableName(); this.columnNames = dbConf.getOutputFieldNames(); this.columnCount = dbConf.getOutputFieldCount(); + } - this.connection.setAutoCommit(false); + /** + * @return the name of the table we are inserting into. + */ + protected final String getTableName() { + return tableName; + } - this.records = new ArrayList(this.rowsPerStmt); + /** + * @return the list of columns we are updating. + */ + protected final String [] getColumnNames() { + if (null == columnNames) { + return null; + } else { + return Arrays.copyOf(columnNames, columnNames.length); + } + } - this.updateThread = new ExportUpdateThread(connection, stmtsPerTx); - this.updateThread.setDaemon(true); - this.startedUpdateThread = false; + /** + * @return the number of columns we are updating. + */ + protected final int getColumnCount() { + return columnCount; + } + + @Override + /** {@inheritDoc} */ + protected PreparedStatement getPreparedStatement( + List userRecords) throws SQLException { + + PreparedStatement stmt = null; + + // Synchronize on connection to ensure this does not conflict + // with the operations in the update thread. + Connection conn = getConnection(); + synchronized (conn) { + stmt = conn.prepareStatement(getInsertStatement(userRecords.size())); + } + + // Inject the record parameters into the VALUES clauses. + int position = 0; + for (SqoopRecord record : userRecords) { + position += record.write(stmt, position); + } + + return stmt; } /** @@ -402,95 +204,5 @@ protected String getInsertStatement(int numRows) { return sb.toString(); } - - /** - * Takes the current contents of 'records' and formats and executes the - * INSERT statement. - * @param closeConn if true, commits the transaction and closes the - * connection. - */ - private void insertRows(boolean closeConn) - throws InterruptedException, SQLException { - - if (!startedUpdateThread) { - this.updateThread.start(); - this.startedUpdateThread = true; - } - - PreparedStatement stmt = null; - boolean successfulPut = false; - try { - if (records.size() > 0) { - // Synchronize on connection to ensure this does not conflict - // with the operations in the update thread. - synchronized (connection) { - stmt = connection.prepareStatement( - getInsertStatement(records.size())); - } - - // Inject the record parameters into the VALUES clauses. - int position = 0; - for (SqoopRecord record : records) { - position += record.write(stmt, position); - } - - this.records.clear(); - } - - // Pass this operation off to the update thread. This will block if - // the update thread is already performing an update. 
- AsyncDBOperation op = new AsyncDBOperation(stmt, closeConn, closeConn); - updateThread.put(op); - successfulPut = true; // op has been posted to the other thread. - } finally { - if (!successfulPut && null != stmt) { - // We created a statement but failed to enqueue it. Close it. - stmt.close(); - } - } - - // Check for any previous SQLException. If one happened, rethrow it here. - SQLException lastException = updateThread.getLastError(); - if (null != lastException) { - throw lastException; - } - } - - @Override - /** {@inheritDoc} */ - public void close(TaskAttemptContext context) - throws IOException, InterruptedException { - try { - insertRows(true); - } catch (SQLException sqle) { - throw new IOException(sqle); - } finally { - updateThread.join(); - } - - // If we're not leaving on an error return path already, - // now that updateThread is definitely stopped, check that the - // error slot remains empty. - SQLException lastErr = updateThread.getLastError(); - if (null != lastErr) { - throw new IOException(lastErr); - } - } - - @Override - /** {@inheritDoc} */ - public void write(K key, V value) - throws InterruptedException, IOException { - try { - records.add((SqoopRecord) key.clone()); - if (records.size() >= this.rowsPerStmt) { - insertRows(false); - } - } catch (CloneNotSupportedException cnse) { - throw new IOException("Could not buffer record", cnse); - } catch (SQLException sqlException) { - throw new IOException(sqlException); - } - } } } diff --git a/src/shims/common/com/cloudera/sqoop/mapreduce/OracleExportOutputFormat.java b/src/shims/common/com/cloudera/sqoop/mapreduce/OracleExportOutputFormat.java index 3e70ae78..38fb1ef2 100644 --- a/src/shims/common/com/cloudera/sqoop/mapreduce/OracleExportOutputFormat.java +++ b/src/shims/common/com/cloudera/sqoop/mapreduce/OracleExportOutputFormat.java @@ -61,15 +61,16 @@ public OracleExportRecordWriter(TaskAttemptContext context) protected String getInsertStatement(int numRows) { StringBuilder sb = new StringBuilder(); - sb.append("INSERT INTO " + tableName + " "); + sb.append("INSERT INTO " + getTableName() + " "); int numSlots; - if (this.columnNames != null) { - numSlots = this.columnNames.length; + String [] colNames = getColumnNames(); + if (colNames != null) { + numSlots = colNames.length; sb.append("("); boolean first = true; - for (String col : columnNames) { + for (String col : colNames) { if (!first) { sb.append(", "); } @@ -80,7 +81,7 @@ protected String getInsertStatement(int numRows) { sb.append(") "); } else { - numSlots = this.columnCount; // set if columnNames is null. + numSlots = getColumnCount(); // set if columnNames is null. } // generates the (?, ?, ?...) used for each row. diff --git a/src/shims/common/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java b/src/shims/common/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java new file mode 100644 index 00000000..793c8370 --- /dev/null +++ b/src/shims/common/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java @@ -0,0 +1,188 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.mapreduce; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; + +import com.cloudera.sqoop.lib.SqoopRecord; + +/** + * Update an existing table of data with new value data. + * This requires a designated 'key column' for the WHERE clause + * of an UPDATE statement. + * + * Updates are executed en batch in the PreparedStatement. + * + * Uses DBOutputFormat/DBConfiguration for configuring the output. + */ +public class UpdateOutputFormat + extends AsyncSqlOutputFormat { + + private static final Log LOG = LogFactory.getLog(UpdateOutputFormat.class); + + @Override + /** {@inheritDoc} */ + public void checkOutputSpecs(JobContext context) + throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + DBConfiguration dbConf = new DBConfiguration(conf); + + // Sanity check all the configuration values we need. + if (null == conf.get(DBConfiguration.URL_PROPERTY)) { + throw new IOException("Database connection URL is not set."); + } else if (null == dbConf.getOutputTableName()) { + throw new IOException("Table name is not set for export."); + } else if (null == dbConf.getOutputFieldNames()) { + throw new IOException( + "Output field names are null."); + } else if (null == conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY)) { + throw new IOException("Update key column is not set for export."); + } + } + + @Override + /** {@inheritDoc} */ + public RecordWriter getRecordWriter(TaskAttemptContext context) + throws IOException { + try { + return new UpdateRecordWriter(context); + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * RecordWriter to write the output to UPDATE statements modifying rows + * in the database. + */ + public class UpdateRecordWriter extends AsyncSqlRecordWriter { + + private String tableName; + private String [] columnNames; // The columns to update. + private String updateCol; // The column containing the fixed key. + + public UpdateRecordWriter(TaskAttemptContext context) + throws ClassNotFoundException, SQLException { + super(context); + + Configuration conf = getConf(); + + DBConfiguration dbConf = new DBConfiguration(conf); + this.tableName = dbConf.getOutputTableName(); + this.columnNames = dbConf.getOutputFieldNames(); + this.updateCol = conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY); + } + + @Override + /** {@inheritDoc} */ + protected boolean isBatchExec() { + // We use batches here. + return true; + } + + /** + * @return the name of the table we are inserting into. + */ + protected final String getTableName() { + return tableName; + } + + /** + * @return the list of columns we are updating. 
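+     * The update key column is not part of this list; it is bound
+     * separately into the WHERE clause by getUpdateStatement().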
+ */ + protected final String [] getColumnNames() { + if (null == columnNames) { + return null; + } else { + return Arrays.copyOf(columnNames, columnNames.length); + } + } + + /** + * @return the column we are using to determine the row to update. + */ + protected final String getUpdateCol() { + return updateCol; + } + + @Override + /** {@inheritDoc} */ + protected PreparedStatement getPreparedStatement( + List userRecords) throws SQLException { + + PreparedStatement stmt = null; + + // Synchronize on connection to ensure this does not conflict + // with the operations in the update thread. + Connection conn = getConnection(); + synchronized (conn) { + stmt = conn.prepareStatement(getUpdateStatement()); + } + + // Inject the record parameters into the UPDATE and WHERE clauses. This + // assumes that the update key column is the last column serialized in + // by the underlying record. Our code auto-gen process for exports was + // responsible for taking care of this constraint. + for (SqoopRecord record : userRecords) { + record.write(stmt, 0); + stmt.addBatch(); + } + + return stmt; + } + + /** + * @return an UPDATE statement that modifies rows based on a single key + * column (with the intent of modifying a single row). + */ + protected String getUpdateStatement() { + StringBuilder sb = new StringBuilder(); + sb.append("UPDATE " + this.tableName + " SET "); + + boolean first = true; + for (String col : this.columnNames) { + if (!first) { + sb.append(", "); + } + + sb.append(col); + sb.append("=?"); + first = false; + } + + sb.append(" WHERE "); + sb.append(this.updateCol); + sb.append("=?"); + return sb.toString(); + } + } +} diff --git a/src/test/com/cloudera/sqoop/SmokeTests.java b/src/test/com/cloudera/sqoop/SmokeTests.java index 4e052208..e3b48ae5 100644 --- a/src/test/com/cloudera/sqoop/SmokeTests.java +++ b/src/test/com/cloudera/sqoop/SmokeTests.java @@ -73,6 +73,7 @@ public static Test suite() { suite.addTestSuite(TestLargeObjectLoader.class); suite.addTestSuite(TestDirectImportUtils.class); suite.addTestSuite(TestLobFile.class); + suite.addTestSuite(TestExportUpdate.class); suite.addTest(MapreduceTests.suite()); return suite; diff --git a/src/test/com/cloudera/sqoop/TestExportUpdate.java b/src/test/com/cloudera/sqoop/TestExportUpdate.java new file mode 100644 index 00000000..531c5390 --- /dev/null +++ b/src/test/com/cloudera/sqoop/TestExportUpdate.java @@ -0,0 +1,371 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.cloudera.sqoop; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.StringUtils; + +import com.cloudera.sqoop.testutil.ExportJobTestCase; + +import org.junit.Before; + +/** + * Test that we can update a copy of data in the database, + * based on newer data in HDFS. + */ +public class TestExportUpdate extends ExportJobTestCase { + + @Before + public void setUp() { + // start the server + super.setUp(); + + if (useHsqldbTestServer()) { + // throw away any existing data that might be in the database. + try { + this.getTestServer().dropExistingSchema(); + } catch (SQLException sqlE) { + fail(sqlE.toString()); + } + } + } + + @Override + protected String getTablePrefix() { + return "UPDATE_TABLE_"; + } + + /** + * Create the underlying table to update. + */ + private void populateDatabase(int numRows) throws SQLException { + Connection conn = getConnection(); + + PreparedStatement statement = conn.prepareStatement( + "CREATE TABLE " + getTableName() + + " (A INT NOT NULL, B VARCHAR(32), C INT)"); + try { + statement.executeUpdate(); + conn.commit(); + } finally { + statement.close(); + statement = null; + } + + try { + for (int i = 0; i < numRows; i++) { + statement = conn.prepareStatement("INSERT INTO " + getTableName() + + " VALUES (" + i + ", 'foo" + i + "', " + i + ")"); + statement.executeUpdate(); + statement.close(); + statement = null; + } + } finally { + if (null != statement) { + statement.close(); + } + } + + conn.commit(); + } + + /** + * Create a set of files that will be used as the input to the update + * process. + * @param numFiles the number of files to generate + * @param updatesPerFile the number of rows to create in each file + * @param keyCol a value between 0 and 2 specifying whether 'a', + * 'b', or 'c' ({@see populateDatabase()}) is the key column to keep + * the same. + * @param startOffsets is an optional list of row ids/values for a/c + * which are the record ids at which the update files begin. + * For instance, if numFiles=3, updatesPerFile=2, and keyCol=0 then + * if startOffsets is {5, 10, 12}, files will be generated to update + * rows with A=5,6; A=10,11; A=12,13. + * + * If startOffsets is empty or underspecified (given numFiles), then + * subsequent files will start immediately after the previous file. + */ + private void createUpdateFiles(int numFiles, int updatesPerFile, + int keyCol, int... startOffsets) throws IOException { + + FileSystem fs = FileSystem.getLocal(new Configuration()); + + int rowId = 0; + for (int i = 0; i < numFiles; i++) { + OutputStream os = fs.create(new Path(getTablePath(), "" + i + ".txt")); + BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os)); + + if (null != startOffsets && startOffsets.length > i) { + // If a start offset has been specified for this file, go there. + // Otherwise, just carry over from the previous file iteration. + rowId = startOffsets[i]; + } + + for (int j = 0; j < updatesPerFile; j++) { + w.write(getUpdateStringForRow(keyCol, rowId++)); + } + + w.close(); + os.close(); + } + } + + /** + * Generate a string of text representing an update for one row + * of the database. 
keyCol is a value in [0, 2] representing which + * column is kept fixed. rowId specifies the row being updated. + */ + private String getUpdateStringForRow(int keyCol, int rowId) { + StringBuilder sb = new StringBuilder(); + + int [] rowInts = new int[3]; // There are 3 columns in the table. + for (int i = 0; i < 3; i++) { + if (keyCol == i) { + // Keep this column fixed. + rowInts[i] = rowId; + } else { + // Update the int in this column. + rowInts[i] = rowId * 2; + } + } + + sb.append(rowInts[0]); + sb.append("\tfoo"); + sb.append(rowInts[1]); + sb.append("\t"); + sb.append(rowInts[2]); + sb.append("\n"); + + return sb.toString(); + } + + /** + * Verifies the number of rows in the table. + */ + private void verifyRowCount(int expectedCount) throws SQLException { + String query = "SELECT COUNT(*) FROM " + getTableName(); + PreparedStatement statement = null; + ResultSet rs = null; + + try { + Connection conn = getConnection(); + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + + boolean success = rs.next(); + assertTrue("Expected at least one result", success); + + int trueCount = rs.getInt(1); + assertEquals(expectedCount, trueCount); + + // This query should have returned exactly one row. + success = rs.next(); + assertFalse("Expected no more than one output record", success); + } finally { + if (null != rs) { + try { + rs.close(); + } catch (SQLException sqle) { + LOG.error("Error closing result set: " + + StringUtils.stringifyException(sqle)); + } + } + + if (null != statement) { + statement.close(); + } + } + } + + /** + * Verify that a particular row has the expected values. + */ + private void verifyRow(String keyColName, String keyVal, + String... expectedVals) throws SQLException { + String query = "SELECT A, B, C FROM " + getTableName() + " WHERE " + + keyColName + " = " + keyVal; + PreparedStatement statement = null; + ResultSet rs = null; + + try { + Connection conn = getConnection(); + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + + boolean success = rs.next(); + assertTrue("Expected at least one output record", success); + + // Assert that all three columns have the correct values. + for (int i = 0; i < expectedVals.length; i++) { + String expected = expectedVals[i]; + String result = rs.getString(i + 1); + assertEquals("Invalid response for column " + i + "; got " + result + + " when expected " + expected, expected, result); + } + + // This query should have returned exactly one row. + success = rs.next(); + assertFalse("Expected no more than one output record", success); + } finally { + if (null != rs) { + try { + rs.close(); + } catch (SQLException sqle) { + LOG.error("Error closing result set: " + + StringUtils.stringifyException(sqle)); + } + } + + if (null != statement) { + statement.close(); + } + } + } + + private void runUpdate(int numMappers, String updateCol) throws IOException { + runExport(getArgv(true, 2, 2, "-m", "" + numMappers, + "--update-key", updateCol)); + } + + public void testBasicUpdate() throws Exception { + // Test that we can do a single-task single-file update. + // This updates the entire database. + + populateDatabase(10); + createUpdateFiles(1, 10, 0, 0); + runUpdate(1, "A"); + verifyRowCount(10); + // Check a few rows... 
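+    // getUpdateStringForRow() doubles the non-key columns, so row A=1
+    // should now read (1, 'foo2', 2).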
+ verifyRow("A", "0", "0", "foo0", "0"); + verifyRow("A", "1", "1", "foo2", "2"); + verifyRow("A", "9", "9", "foo18", "18"); + } + + public void testEmptyTable() throws Exception { + // Test that an empty table will "accept" updates that modify + // no rows; no new data is injected into the database. + populateDatabase(0); + createUpdateFiles(1, 10, 0, 0); + runUpdate(1, "A"); + verifyRowCount(0); + } + + public void testEmptyFiles() throws Exception { + // An empty input file results in no changes to a db table. + populateDatabase(10); + createUpdateFiles(1, 0, 0); + runUpdate(1, "A"); + verifyRowCount(10); + // Check that a few rows have not changed at all. + verifyRow("A", "0", "0", "foo0", "0"); + verifyRow("A", "1", "1", "foo1", "1"); + verifyRow("A", "9", "9", "foo9", "9"); + } + + public void testStringCol() throws Exception { + // Test that we can do modifications based on the string "B" column. + populateDatabase(10); + createUpdateFiles(1, 10, 1); + runUpdate(1, "B"); + verifyRowCount(10); + verifyRow("B", "'foo0'", "0", "foo0", "0"); + verifyRow("B", "'foo1'", "2", "foo1", "2"); + verifyRow("B", "'foo9'", "18", "foo9", "18"); + } + + public void testLastCol() throws Exception { + // Test that we can do modifications based on the third int column. + populateDatabase(10); + createUpdateFiles(1, 10, 2); + runUpdate(1, "C"); + verifyRowCount(10); + verifyRow("C", "0", "0", "foo0", "0"); + verifyRow("C", "1", "2", "foo2", "1"); + verifyRow("C", "9", "18", "foo18", "9"); + } + + public void testMultiMaps() throws Exception { + // Test that we can handle multiple map tasks. + populateDatabase(20); + createUpdateFiles(2, 10, 0); + runUpdate(1, "A"); + verifyRowCount(20); + verifyRow("A", "0", "0", "foo0", "0"); + verifyRow("A", "1", "1", "foo2", "2"); + verifyRow("A", "9", "9", "foo18", "18"); + verifyRow("A", "10", "10", "foo20", "20"); + verifyRow("A", "15", "15", "foo30", "30"); + verifyRow("A", "19", "19", "foo38", "38"); + } + + public void testSubsetUpdate() throws Exception { + // Update only a few rows in the middle of the table. + populateDatabase(10); + createUpdateFiles(1, 5, 0, 3); // only rows A=3..7 change. + runUpdate(1, "A"); + verifyRowCount(10); + + // Verify these rows are unchanged. + verifyRow("A", "0", "0", "foo0", "0"); + verifyRow("A", "2", "2", "foo2", "2"); + verifyRow("A", "8", "8", "foo8", "8"); + verifyRow("A", "9", "9", "foo9", "9"); + + // Verify these rows have been updated. + verifyRow("A", "3", "3", "foo6", "6"); + verifyRow("A", "5", "5", "foo10", "10"); + verifyRow("A", "7", "7", "foo14", "14"); + } + + public void testSubsetUpdate2() throws Exception { + // Update only some of the rows in the db. Also include some + // updates that do not affect actual rows in the table. + // These should just be ignored. + + populateDatabase(10); + // Create two files that update four rows each. + // File0 updates A=-2..1 (-2 and -1 don't exist). + // File1 updates A=8..11 (10 and 11 don't exist). + createUpdateFiles(2, 4, 0, -2, 8); + runUpdate(2, "A"); + verifyRowCount(10); + + // Verify these rows are unchanged. + verifyRow("A", "4", "4", "foo4", "4"); + verifyRow("A", "7", "7", "foo7", "7"); + + // Verify these updates succeeded. + verifyRow("A", "1", "1", "foo2", "2"); + verifyRow("A", "8", "8", "foo16", "16"); + verifyRow("A", "9", "9", "foo18", "18"); + } + +}
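
For reference, given the three-column test table above (A, B, C) and an update
key of A, the statement built by UpdateRecordWriter.getUpdateStatement() would
be expected to take roughly this form (a sketch: the exact table name is
generated by the test harness, and the SET list assumes the configured output
field names exclude the key column):

    UPDATE UPDATE_TABLE_0 SET B=?, C=? WHERE A=?

Each buffered SqoopRecord then binds B, C, and finally A before addBatch() is
called, matching the column ordering described in getPreparedStatement().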