SQOOP-4. Add update-mode export process.
Add --update-key argument to the sqoop-export tool. Refactor
ExportOutputFormat into AsyncSqlOutputFormat and add UpdateOutputFormat.
ClassWriter now allows an alternate serialization order in the generated
database write() method; SqoopOptions holds the column list for that
alternate order. AsyncSqlRecordWriter now allows a "batch" execution mode.
Added the TestExportUpdate unit test battery and updated the documentation
for export updates.

From: Aaron Kimball <aaron@cloudera.com>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149933 13f79535-47bb-0310-9956-ffa450edef68
Parent: b0cd632b58
Commit: 6ea1665716
@ -40,6 +40,9 @@ Export control options
--table (table-name)::
The table to read (required)

--update-key (col-name)::
Anchor column to use for updates

include::input-args.txt[]

include::output-args.txt[]
@ -19,6 +19,11 @@
The +export+ tool exports a set of files from HDFS back to an RDBMS.
The target table must already exist in the database. The input files
are read and parsed into a set of records according to the
user-specified delimiters. These are then transformed into a set of
+INSERT+ statements that inject the records into the database.
user-specified delimiters.

The default operation is to transform these into a set of +INSERT+
statements that inject the records into the database. In "update mode,"
Sqoop will generate +UPDATE+ statements that replace existing records
in the database.
@ -43,14 +43,15 @@ include::common-args.txt[]

.Export control arguments:
[grid="all"]
`-------------------------`------------------------------------------
Argument                    Description
---------------------------------------------------------------------
+\--direct+                 Use direct export fast path
+\--export-dir <dir>+       HDFS source path for the export
+-m,\--num-mappers <n>+     Use 'n' map tasks to export in parallel
+\--table <table-name>+     Table to populate
---------------------------------------------------------------------
`---------------------------`------------------------------------------
Argument                    Description
-----------------------------------------------------------------------
+\--direct+                 Use direct export fast path
+\--export-dir <dir>+       HDFS source path for the export
+-m,\--num-mappers <n>+     Use 'n' map tasks to export in parallel
+\--table <table-name>+     Table to populate
+\--update-key <col-name>+  Anchor column to use for updates
-----------------------------------------------------------------------

The +\--table+ and +\--export-dir+ arguments are required. These
specify the table to populate in the database, and the
@ -72,6 +73,58 @@ MySQL provides a direct mode for exports as well, using the
to specify this codepath. This may be
higher-performance than the standard JDBC codepath.

Inserts vs. Updates
~~~~~~~~~~~~~~~~~~~

By default, +sqoop-export+ appends new rows to a table; each input
record is transformed into an +INSERT+ statement that adds a row to the
target database table. If your table has constraints (e.g., a primary
key column whose values must be unique) and already contains data, you
must take care to avoid inserting records that violate these
constraints. The export process will fail if an +INSERT+ statement
fails. This mode is primarily intended for exporting records to a new,
empty table intended to receive these results.

If you specify the +\--update-key+ argument, Sqoop will instead modify
an existing dataset in the database. Each input record is treated as
an +UPDATE+ statement that modifies an existing row. The row a
statement modifies is determined by the column name specified with
+\--update-key+. For example, consider the following table
definition:

----
CREATE TABLE foo(
    id INT NOT NULL PRIMARY KEY,
    msg VARCHAR(32),
    bar INT);
----

Consider also a dataset in HDFS containing records like these:

----
0,this is a test,42
1,some more data,100
...
----

Running +sqoop-export \--table foo \--update-key id \--export-dir
/path/to/data \--connect ...+ will run an export job that executes SQL
statements based on the data like so:

----
UPDATE foo SET msg='this is a test', bar=42 WHERE id=0;
UPDATE foo SET msg='some more data', bar=100 WHERE id=1;
...
----

If an +UPDATE+ statement modifies no rows, this is not considered an
error; the export will silently continue. (In effect, this means that
an update-based export will not insert new rows into the database.)
Likewise, if the column specified with +\--update-key+ does not
uniquely identify rows and multiple rows are updated by a single
statement, this condition is also undetected.
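
To make the shape of these statements concrete, here is a minimal Java sketch (not part of this patch) that assembles the same kind of UPDATE text from a key column and the remaining columns; the real export builds a parameterized PreparedStatement in the new UpdateOutputFormat rather than literal SQL text:

----
// Illustrative sketch only: shows how an UPDATE statement for the "foo"
// example above could be assembled from the non-key columns plus the
// --update-key column.
public class UpdateSqlSketch {
  static String buildUpdateSql(String table, String[] nonKeyCols, String keyCol) {
    StringBuilder sb = new StringBuilder("UPDATE ").append(table).append(" SET ");
    for (int i = 0; i < nonKeyCols.length; i++) {
      if (i > 0) {
        sb.append(", ");
      }
      sb.append(nonKeyCols[i]).append("=?"); // one placeholder per SET column
    }
    sb.append(" WHERE ").append(keyCol).append("=?"); // key column comes last
    return sb.toString();
  }

  public static void main(String[] args) {
    // Prints: UPDATE foo SET msg=?, bar=? WHERE id=?
    System.out.println(buildUpdateSql("foo",
        new String[] { "msg", "bar" }, "id"));
  }
}
----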

include::input-args.txt[]

include::output-args.txt[]

@ -93,6 +146,11 @@ previous import, then the original generated class can be used to read
the data back. Specifying +\--jar-file+ and +\--class-name+ obviate
the need to specify delimiters in this case.

The use of existing generated code is incompatible with
+\--update-key+; an update-mode export requires new code generation to
perform the update. You cannot use +\--jar-file+, and must fully specify
any non-default delimiters.

Exports and Transactions
~~~~~~~~~~~~~~~~~~~~~~~~
@ -97,6 +97,10 @@ public enum FileLayout {
  private String hiveTableName;
  private String packageName; // package to prepend to auto-named classes.

  // An ordered list of column names denoting what order columns are
  // serialized to a PreparedStatement from a generated record type.
  private String [] dbOutColumns;

  // package+class to apply to individual table import.
  // also used as an *input* class with existingJarFile.
  private String className;
@ -118,6 +122,9 @@ public enum FileLayout {
  // HDFS path to read from when performing an export
  private String exportDir;

  // Column to use for the WHERE clause in an UPDATE-based export.
  private String updateKeyCol;

  private DelimiterSet inputDelimiters;
  private DelimiterSet outputDelimiters;
  private boolean areDelimsManuallySet;
@ -278,6 +285,8 @@ private void initDefaults(Configuration baseConfiguration) {

    this.extraArgs = null;

    this.dbOutColumns = null;

    loadFromProperties();
  }

@ -951,5 +960,51 @@ public void setExtraArgs(String [] args) {
      this.extraArgs[i] = args[i];
    }
  }

  /**
   * Set the name of the column to be used in the WHERE clause of an
   * UPDATE-based export process.
   */
  public void setUpdateKeyCol(String colName) {
    this.updateKeyCol = colName;
  }

  /**
   * @return the column which is the key column in a table to be exported
   * in update mode.
   */
  public String getUpdateKeyCol() {
    return this.updateKeyCol;
  }

  /**
   * @return an ordered list of column names. The code generator should
   * generate the DBWritable.write(PreparedStatement) method with columns
   * exporting in this order, if it is non-null.
   */
  public String [] getDbOutputColumns() {
    if (null != dbOutColumns) {
      return Arrays.copyOf(this.dbOutColumns, dbOutColumns.length);
    } else {
      return null;
    }
  }

  /**
   * Set the order in which columns should be serialized by the generated
   * DBWritable.write(PreparedStatement) method. Setting this to null will use
   * the "natural order" of the database table.
   *
   * TODO: Expose this setter via the command-line arguments for the codegen
   * module. That would allow users to export to tables with columns in a
   * different physical order than the file layout in HDFS.
   */
  public void setDbOutputColumns(String [] outCols) {
    if (null == outCols) {
      this.dbOutColumns = null;
    } else {
      this.dbOutColumns = Arrays.copyOf(outCols, outCols.length);
    }
  }
}
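
As a rough usage sketch (hypothetical calling code, not from this patch), the new SqoopOptions accessors combine like this when preparing an update-mode export:

----
// Hypothetical fragment; column names ("id", "msg", "bar") echo the
// documentation example above.
SqoopOptions opts = new SqoopOptions();
opts.setUpdateKeyCol("id"); // column used in the WHERE clause
opts.setDbOutputColumns(new String[] { "msg", "bar", "id" }); // key column last

// getDbOutputColumns() returns a defensive copy, so callers cannot
// mutate the stored ordering.
String[] order = opts.getDbOutputColumns();
assert order[order.length - 1].equals("id");
----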
@ -173,12 +173,23 @@ public String escapeTableName(String tableName) {

  /**
   * Export data stored in HDFS into a table in a database.
   * This inserts new rows into the target table.
   */
  public void exportTable(ExportJobContext context)
      throws IOException, ExportException {
    throw new ExportException("This database does not support exports");
  }

  /**
   * Export updated data stored in HDFS into a database table.
   * This updates existing rows in the target table, based on the
   * updateKeyCol specified in the context's SqoopOptions.
   */
  public void updateTable(ExportJobContext context)
      throws IOException, ExportException {
    throw new ExportException("This database does not support updates");
  }

  /**
   * If a method of this ConnManager has returned a ResultSet to you,
   * you are responsible for calling release() after you close the
@ -24,6 +24,7 @@
import com.cloudera.sqoop.lib.ClobRef;
import com.cloudera.sqoop.mapreduce.DataDrivenImportJob;
import com.cloudera.sqoop.mapreduce.JdbcExportJob;
import com.cloudera.sqoop.mapreduce.JdbcUpdateExportJob;
import com.cloudera.sqoop.util.ExportException;
import com.cloudera.sqoop.util.ImportException;
import com.cloudera.sqoop.util.ResultSetPrinter;
@ -593,4 +594,13 @@ public void release() {
      this.lastStatement = null;
    }
  }

  /**
   * {@inheritDoc}
   */
  public void updateTable(ExportJobContext context)
      throws IOException, ExportException {
    JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context);
    exportJob.runExport();
  }
}
@ -57,6 +57,13 @@ public class ExportJobBase extends JobBase {
  public static final String SQOOP_EXPORT_TABLE_CLASS_KEY =
      "sqoop.mapreduce.export.table.class";

  /**
   * What column of the table to use for the WHERE clause of
   * an updating export.
   */
  public static final String SQOOP_EXPORT_UPDATE_COL_KEY =
      "sqoop.mapreduce.export.update.col";

  /** Number of map tasks to use for an export. */
  public static final String EXPORT_MAP_TASKS_KEY =
      "sqoop.mapreduce.export.map.tasks";
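
For illustration (assumed consumer code, not shown in this hunk), an output format on the map-reduce side would recover the key column from the job configuration roughly like so:

----
// Sketch of how a consumer such as the new UpdateOutputFormat is expected
// to read back the column stored under SQOOP_EXPORT_UPDATE_COL_KEY.
Configuration conf = context.getConfiguration(); // e.g., from a TaskAttemptContext
String updateKeyCol = conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY);
if (null == updateKeyCol) {
  throw new IOException("Update key column not set in job configuration");
}
----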
src/java/com/cloudera/sqoop/mapreduce/JdbcUpdateExportJob.java (new file, 143 lines)
@ -0,0 +1,143 @@
|
||||
/**
|
||||
* Licensed to Cloudera, Inc. under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Cloudera, Inc. licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.cloudera.sqoop.mapreduce;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.SQLException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.InputFormat;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.Mapper;
|
||||
import org.apache.hadoop.mapreduce.OutputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
|
||||
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
|
||||
|
||||
import com.cloudera.sqoop.ConnFactory;
|
||||
import com.cloudera.sqoop.manager.ConnManager;
|
||||
import com.cloudera.sqoop.manager.ExportJobContext;
|
||||
import com.cloudera.sqoop.shims.ShimLoader;
|
||||
|
||||
/**
|
||||
* Run an update-based export using JDBC (JDBC-based UpdateOutputFormat).
|
||||
*/
|
||||
public class JdbcUpdateExportJob extends ExportJobBase {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(
|
||||
JdbcUpdateExportJob.class.getName());
|
||||
|
||||
/**
|
||||
* Return an instance of the UpdateOutputFormat class object loaded
|
||||
* from the shim jar.
|
||||
*/
|
||||
private static Class<? extends OutputFormat> getUpdateOutputFormat()
|
||||
throws IOException {
|
||||
try {
|
||||
return (Class<? extends OutputFormat>) ShimLoader.getShimClass(
|
||||
"com.cloudera.sqoop.mapreduce.UpdateOutputFormat");
|
||||
} catch (ClassNotFoundException cnfe) {
|
||||
throw new IOException("Could not load updating export OutputFormat",
|
||||
cnfe);
|
||||
}
|
||||
}
|
||||
|
||||
public JdbcUpdateExportJob(final ExportJobContext context)
|
||||
throws IOException {
|
||||
super(context, null, null, getUpdateOutputFormat());
|
||||
}
|
||||
|
||||
public JdbcUpdateExportJob(final ExportJobContext ctxt,
|
||||
final Class<? extends Mapper> mapperClass,
|
||||
final Class<? extends InputFormat> inputFormatClass,
|
||||
final Class<? extends OutputFormat> outputFormatClass) {
|
||||
super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<? extends Mapper> getMapperClass() {
|
||||
if (inputIsSequenceFiles()) {
|
||||
return SequenceFileExportMapper.class;
|
||||
} else {
|
||||
return TextExportMapper.class;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configureOutputFormat(Job job, String tableName,
|
||||
String tableClassName) throws IOException {
|
||||
|
||||
Configuration conf = options.getConf();
|
||||
ConnManager mgr = new ConnFactory(conf).getManager(options);
|
||||
try {
|
||||
String username = options.getUsername();
|
||||
if (null == username || username.length() == 0) {
|
||||
DBConfiguration.configureDB(job.getConfiguration(),
|
||||
mgr.getDriverClass(),
|
||||
options.getConnectString());
|
||||
} else {
|
||||
DBConfiguration.configureDB(job.getConfiguration(),
|
||||
mgr.getDriverClass(),
|
||||
options.getConnectString(),
|
||||
username, options.getPassword());
|
||||
}
|
||||
|
||||
String [] colNames = options.getColumns();
|
||||
if (null == colNames) {
|
||||
colNames = mgr.getColumnNames(tableName);
|
||||
}
|
||||
|
||||
if (null == colNames) {
|
||||
throw new IOException(
|
||||
"Export column names could not be determined for " + tableName);
|
||||
}
|
||||
|
||||
String updateKeyCol = options.getUpdateKeyCol();
|
||||
if (null == updateKeyCol) {
|
||||
throw new IOException("Update key column not set in export job");
|
||||
}
|
||||
|
||||
// Make sure we strip out the key column from this list.
|
||||
String [] outColNames = new String[colNames.length - 1];
|
||||
int j = 0;
|
||||
String upperCaseKeyCol = updateKeyCol.toUpperCase();
|
||||
for (int i = 0; i < colNames.length; i++) {
|
||||
if (!colNames[i].toUpperCase().equals(upperCaseKeyCol)) {
|
||||
outColNames[j++] = colNames[i];
|
||||
}
|
||||
}
|
||||
DBOutputFormat.setOutput(job, tableName, outColNames);
|
||||
|
||||
job.setOutputFormatClass(getOutputFormatClass());
|
||||
job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
|
||||
job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyCol);
|
||||
} catch (ClassNotFoundException cnfe) {
|
||||
throw new IOException("Could not load OutputFormat", cnfe);
|
||||
} finally {
|
||||
try {
|
||||
mgr.close();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.warn("Error closing connection: " + sqlE);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
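
The column filtering in configureOutputFormat() above can be pictured with the documentation's foo table; a standalone fragment (assuming exactly one column matches the key, which is what the sizing of outColNames relies on) looks like this:

----
// Standalone illustration of the key-column stripping done in
// configureOutputFormat(): the key column is matched case-insensitively
// and removed from the list handed to DBOutputFormat.setOutput().
String[] colNames = { "id", "msg", "bar" };
String updateKeyCol = "ID";
String[] outColNames = new String[colNames.length - 1];
int j = 0;
for (String col : colNames) {
  if (!col.toUpperCase().equals(updateKeyCol.toUpperCase())) {
    outColNames[j++] = col;
  }
}
// outColNames is now { "msg", "bar" }; the key column is handled by the
// WHERE clause instead of the SET clause.
----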
@ -842,6 +842,26 @@ private void generateHadoopWrite(Map<String, Integer> columnTypes,
|
||||
|
||||
sb.append(" }\n");
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a list of identifiers to use based on the true column names
|
||||
* of the table.
|
||||
* @param colNames the actual column names of the table.
|
||||
* @return a list of column names in the same order which are
|
||||
* cleaned up to be used as identifiers in the generated Java class.
|
||||
*/
|
||||
private String [] cleanColNames(String [] colNames) {
|
||||
String [] cleanedColNames = new String[colNames.length];
|
||||
for (int i = 0; i < colNames.length; i++) {
|
||||
String col = colNames[i];
|
||||
String identifier = toIdentifier(col);
|
||||
cleanedColNames[i] = identifier;
|
||||
}
|
||||
|
||||
return cleanedColNames;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Generate the ORM code for the class.
|
||||
*/
|
||||
@ -876,19 +896,47 @@ public void generate() throws IOException {
|
||||
|
||||
// Translate all the column names into names that are safe to
|
||||
// use as identifiers.
|
||||
String [] cleanedColNames = new String[colNames.length];
|
||||
for (int i = 0; i < colNames.length; i++) {
|
||||
String col = colNames[i];
|
||||
String identifier = toIdentifier(col);
|
||||
cleanedColNames[i] = identifier;
|
||||
String [] cleanedColNames = cleanColNames(colNames);
|
||||
|
||||
for (int i = 0; i < colNames.length; i++) {
|
||||
// Make sure the col->type mapping holds for the
|
||||
// new identifier name, too.
|
||||
String identifier = cleanedColNames[i];
|
||||
String col = colNames[i];
|
||||
columnTypes.put(identifier, columnTypes.get(col));
|
||||
}
|
||||
|
||||
// The db write() method may use column names in a different
|
||||
// order. If this is set in the options, pull it out here and
|
||||
// make sure we format the column names to identifiers in the same way
|
||||
// as we do for the ordinary column list.
|
||||
String [] dbWriteColNames = options.getDbOutputColumns();
|
||||
String [] cleanedDbWriteColNames = null;
|
||||
if (null == dbWriteColNames) {
|
||||
cleanedDbWriteColNames = cleanedColNames;
|
||||
} else {
|
||||
cleanedDbWriteColNames = cleanColNames(dbWriteColNames);
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("selected columns:");
|
||||
for (String col : cleanedColNames) {
|
||||
LOG.debug(" " + col);
|
||||
}
|
||||
|
||||
if (cleanedDbWriteColNames != cleanedColNames) {
|
||||
// dbWrite() has a different set of columns than the rest of the
|
||||
// generators.
|
||||
LOG.debug("db write column order:");
|
||||
for (String dbCol : cleanedDbWriteColNames) {
|
||||
LOG.debug(" " + dbCol);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Generate the Java code.
|
||||
StringBuilder sb = generateClassForColumns(columnTypes, cleanedColNames);
|
||||
StringBuilder sb = generateClassForColumns(columnTypes,
|
||||
cleanedColNames, cleanedDbWriteColNames);
|
||||
|
||||
// Write this out to a file.
|
||||
String codeOutDir = options.getCodeOutputDir();
|
||||
@ -954,10 +1002,13 @@ public void generate() throws IOException {
|
||||
* Generate the ORM code for a table object containing the named columns.
|
||||
* @param columnTypes - mapping from column names to sql types
|
||||
* @param colNames - ordered list of column names for table.
|
||||
* @param dbWriteColNames - ordered list of column names for the db
|
||||
* write() method of the class.
|
||||
* @return - A StringBuilder that contains the text of the class code.
|
||||
*/
|
||||
public StringBuilder generateClassForColumns(Map<String, Integer> columnTypes,
|
||||
String [] colNames) {
|
||||
private StringBuilder generateClassForColumns(
|
||||
Map<String, Integer> columnTypes,
|
||||
String [] colNames, String [] dbWriteColNames) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("// ORM class for " + tableName + "\n");
|
||||
sb.append("// WARNING: This class is AUTO-GENERATED. "
|
||||
@ -1011,7 +1062,7 @@ public StringBuilder generateClassForColumns(Map<String, Integer> columnTypes,
|
||||
generateFields(columnTypes, colNames, sb);
|
||||
generateDbRead(columnTypes, colNames, sb);
|
||||
generateLoadLargeObjects(columnTypes, colNames, sb);
|
||||
generateDbWrite(columnTypes, colNames, sb);
|
||||
generateDbWrite(columnTypes, dbWriteColNames, sb);
|
||||
generateHadoopRead(columnTypes, colNames, sb);
|
||||
generateHadoopWrite(columnTypes, colNames, sb);
|
||||
generateToString(columnTypes, colNames, sb);
|
||||
|
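
To see why generateDbWrite() now receives dbWriteColNames, here is a hedged sketch of what the generated write(PreparedStatement) method might look like for the example table when the db output order is (msg, bar, id); the exact generated code is produced by ClassWriter and is not reproduced in this diff:

----
// Hypothetical shape of the generated DBWritable.write() method: SET
// columns first, the --update-key column last, matching the UPDATE
// statement layout produced by the update-mode OutputFormat.
public int write(PreparedStatement stmt, int offset) throws SQLException {
  JdbcWritableBridge.writeString(msg, offset + 1, 12, stmt);
  JdbcWritableBridge.writeInteger(bar, offset + 2, 4, stmt);
  JdbcWritableBridge.writeInteger(id, offset + 3, 4, stmt);
  return 3; // number of fields written
}
----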
@ -110,6 +110,7 @@ public abstract class BaseSqoopTool extends SqoopTool {
|
||||
public static final String SQL_QUERY_SHORT_ARG = "e";
|
||||
public static final String VERBOSE_ARG = "verbose";
|
||||
public static final String HELP_ARG = "help";
|
||||
public static final String UPDATE_KEY_ARG = "update-key";
|
||||
|
||||
|
||||
public BaseSqoopTool() {
|
||||
|
@ -19,6 +19,7 @@
|
||||
package com.cloudera.sqoop.tool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
@ -64,7 +65,13 @@ private void exportTable(SqoopOptions options, String tableName)
|
||||
|
||||
ExportJobContext context = new ExportJobContext(tableName, jarFile,
|
||||
options);
|
||||
manager.exportTable(context);
|
||||
if (options.getUpdateKeyCol() != null) {
|
||||
// UPDATE-based export.
|
||||
manager.updateTable(context);
|
||||
} else {
|
||||
// INSERT-based export.
|
||||
manager.exportTable(context);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -77,6 +84,28 @@ public int run(SqoopOptions options) {
|
||||
|
||||
codeGenerator.setManager(manager);
|
||||
|
||||
String updateKeyCol = options.getUpdateKeyCol();
|
||||
if (updateKeyCol != null) {
|
||||
// We're in update mode. We need to explicitly set the database output
|
||||
// column ordering in the codeGenerator. The UpdateKeyCol must come
|
||||
// last, because the UPDATE-based OutputFormat will generate the SET
|
||||
// clause followed by the WHERE clause, and the SqoopRecord needs to
|
||||
// serialize to this layout.
|
||||
String [] allColNames = manager.getColumnNames(options.getTableName());
|
||||
List<String> dbOutCols = new ArrayList<String>();
|
||||
String upperCaseKeyCol = updateKeyCol.toUpperCase();
|
||||
for (String col : allColNames) {
|
||||
if (!upperCaseKeyCol.equals(col.toUpperCase())) {
|
||||
dbOutCols.add(col); // add non-key columns to the output order list.
|
||||
}
|
||||
}
|
||||
|
||||
// Then add the update key column last.
|
||||
dbOutCols.add(updateKeyCol);
|
||||
options.setDbOutputColumns(dbOutCols.toArray(
|
||||
new String[dbOutCols.size()]));
|
||||
}
|
||||
|
||||
try {
|
||||
exportTable(options, options.getTableName());
|
||||
} catch (IOException ioe) {
|
||||
@ -126,6 +155,11 @@ protected RelatedOptions getExportOptions() {
|
||||
.withDescription("HDFS source path for the export")
|
||||
.withLongOpt(EXPORT_PATH_ARG)
|
||||
.create());
|
||||
exportOpts.addOption(OptionBuilder.withArgName("key")
|
||||
.hasArg()
|
||||
.withDescription("Update records by specified key column")
|
||||
.withLongOpt(UPDATE_KEY_ARG)
|
||||
.create());
|
||||
|
||||
return exportOpts;
|
||||
}
|
||||
@ -193,6 +227,10 @@ public void applyOptions(CommandLine in, SqoopOptions out)
|
||||
out.setExistingJarName(in.getOptionValue(JAR_FILE_NAME_ARG));
|
||||
}
|
||||
|
||||
if (in.hasOption(UPDATE_KEY_ARG)) {
|
||||
out.setUpdateKeyCol(in.getOptionValue(UPDATE_KEY_ARG));
|
||||
}
|
||||
|
||||
applyInputFormatOptions(in, out);
|
||||
applyOutputFormatOptions(in, out);
|
||||
applyOutputFormatOptions(in, out);
|
||||
@ -220,6 +258,13 @@ protected void validateExportOptions(SqoopOptions options)
|
||||
&& options.getClassName() == null) {
|
||||
throw new InvalidOptionsException("Jar specified with --jar-file, but no "
|
||||
+ "class specified with --class-name." + HELP_STR);
|
||||
} else if (options.getExistingJarName() != null
|
||||
&& options.getUpdateKeyCol() != null) {
|
||||
// We need to regenerate the class with the output column order set
|
||||
// correctly for the update-based export. So we can't use a premade
|
||||
// class.
|
||||
throw new InvalidOptionsException("Jar cannot be specified with "
|
||||
+ "--jar-file when export is running in update mode.");
|
||||
}
|
||||
}
|
||||
|
||||
|
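
Following the ExportTool.run() logic above, the db output column order for the documentation's foo table would be assembled roughly like this (hypothetical values shown for clarity):

----
// Illustration of the ordering built in ExportTool.run() for table foo
// with --update-key id: non-key columns first, then the key column.
String[] allColNames = { "id", "msg", "bar" };   // from manager.getColumnNames()
String updateKeyCol = "id";
List<String> dbOutCols = new ArrayList<String>();
for (String col : allColNames) {
  if (!updateKeyCol.equalsIgnoreCase(col)) {
    dbOutCols.add(col);
  }
}
dbOutCols.add(updateKeyCol);
// dbOutCols is now [msg, bar, id]; it is passed to
// options.setDbOutputColumns() so code generation uses the same order.
----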
@ -0,0 +1,308 @@
|
||||
/**
|
||||
* Licensed to Cloudera, Inc. under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Cloudera, Inc. licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.cloudera.sqoop.mapreduce;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.mapreduce.JobContext;
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.OutputFormat;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import com.cloudera.sqoop.lib.SqoopRecord;
|
||||
|
||||
/**
|
||||
* Abstract OutputFormat class that allows the RecordWriter to buffer
|
||||
* up SQL commands which should be executed in a separate thread after
|
||||
* enough commands are created.
|
||||
*
|
||||
* This supports a configurable "spill threshold" at which
|
||||
* point intermediate transactions are committed.
|
||||
*
|
||||
* Uses DBOutputFormat/DBConfiguration for configuring the output.
|
||||
* This is used in conjunction with the abstract AsyncSqlRecordWriter
|
||||
* class.
|
||||
*
|
||||
* Clients of this OutputFormat must implement getRecordWriter(); the
|
||||
* returned RecordWriter is intended to subclass AsyncSqlRecordWriter.
|
||||
*/
|
||||
public abstract class AsyncSqlOutputFormat<K extends SqoopRecord, V>
|
||||
extends OutputFormat<K, V> {
|
||||
|
||||
/** conf key: number of rows to export per INSERT statement. */
|
||||
public static final String RECORDS_PER_STATEMENT_KEY =
|
||||
"sqoop.export.records.per.statement";
|
||||
|
||||
/** conf key: number of INSERT statements to bundle per tx.
|
||||
* If this is set to -1, then a single transaction will be used
|
||||
* per task. Note that each statement may encompass multiple
|
||||
* rows, depending on the value of sqoop.export.records.per.statement.
|
||||
*/
|
||||
public static final String STATEMENTS_PER_TRANSACTION_KEY =
|
||||
"sqoop.export.statements.per.transaction";
|
||||
|
||||
/**
|
||||
* Default number of records to put in an INSERT statement or
|
||||
* other batched update statement.
|
||||
*/
|
||||
public static final int DEFAULT_RECORDS_PER_STATEMENT = 100;
|
||||
|
||||
/**
|
||||
* Default number of statements to execute before committing the
|
||||
* current transaction.
|
||||
*/
|
||||
public static final int DEFAULT_STATEMENTS_PER_TRANSACTION = 100;
|
||||
|
||||
/**
|
||||
* Value for STATEMENTS_PER_TRANSACTION_KEY signifying that we should
|
||||
* not commit until the RecordWriter is being closed, regardless of
|
||||
* the number of statements we execute.
|
||||
*/
|
||||
public static final int UNLIMITED_STATEMENTS_PER_TRANSACTION = -1;
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(AsyncSqlOutputFormat.class);
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public void checkOutputSpecs(JobContext context)
|
||||
throws IOException, InterruptedException {
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
|
||||
throws IOException, InterruptedException {
|
||||
return new OutputCommitter() {
|
||||
public void abortTask(TaskAttemptContext taskContext) { }
|
||||
public void cleanupJob(JobContext jobContext) { }
|
||||
public void commitTask(TaskAttemptContext taskContext) { }
|
||||
public boolean needsTaskCommit(TaskAttemptContext taskContext) {
|
||||
return false;
|
||||
}
|
||||
public void setupJob(JobContext jobContext) { }
|
||||
public void setupTask(TaskAttemptContext taskContext) { }
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a database update operation that should be performed
|
||||
* by an asynchronous background thread.
|
||||
* AsyncDBOperation objects are immutable.
|
||||
* They MAY contain a statement which should be executed. The
|
||||
* statement may also be null.
|
||||
*
|
||||
* They may also set 'commitAndClose' to true. If true, then the
|
||||
* executor of this operation should commit the current
|
||||
* transaction, even if stmt is null, and then stop the executor
|
||||
* thread.
|
||||
*/
|
||||
public static class AsyncDBOperation {
|
||||
private final PreparedStatement stmt;
|
||||
private final boolean commitAndClose;
|
||||
private final boolean isBatch;
|
||||
|
||||
/**
|
||||
* Create an asynchronous database operation.
|
||||
* @param s the statement, if any, to execute.
|
||||
* @param commitAndClose if true, the current transaction should be
|
||||
* committed, and the executor thread should stop after this operation.
|
||||
* @param batch is true if this is a batch PreparedStatement, or false
|
||||
* if it's a normal singleton statement.
|
||||
*/
|
||||
public AsyncDBOperation(PreparedStatement s, boolean commitAndClose,
|
||||
boolean batch) {
|
||||
this.stmt = s;
|
||||
this.commitAndClose = commitAndClose;
|
||||
this.isBatch = batch;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a statement to run as an update.
|
||||
*/
|
||||
public PreparedStatement getStatement() {
|
||||
return stmt;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the executor should commit the current transaction.
|
||||
* If getStatement() is non-null, the statement is run first.
|
||||
*/
|
||||
public boolean requiresCommit() {
|
||||
return this.commitAndClose;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the executor should stop after this command.
|
||||
*/
|
||||
public boolean stop() {
|
||||
return this.commitAndClose;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if this is a batch SQL statement.
|
||||
*/
|
||||
public boolean execAsBatch() {
|
||||
return this.isBatch;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A thread that runs the database interactions asynchronously
|
||||
* from the OutputCollector.
|
||||
*/
|
||||
public static class AsyncSqlExecThread extends Thread {
|
||||
|
||||
private final Connection conn; // The connection to the database.
|
||||
private SQLException err; // Error from a previously-run statement.
|
||||
|
||||
// How we receive database operations from the RecordWriter.
|
||||
private SynchronousQueue<AsyncDBOperation> opsQueue;
|
||||
|
||||
protected int curNumStatements; // statements executed thus far in the tx.
|
||||
protected final int stmtsPerTx; // statements per transaction.
|
||||
|
||||
/**
|
||||
* Create a new update thread that interacts with the database.
|
||||
* @param conn the connection to use. This must only be used by this
|
||||
* thread.
|
||||
* @param stmtsPerTx the number of statements to execute before committing
|
||||
* the current transaction.
|
||||
*/
|
||||
public AsyncSqlExecThread(Connection conn, int stmtsPerTx) {
|
||||
this.conn = conn;
|
||||
this.err = null;
|
||||
this.opsQueue = new SynchronousQueue<AsyncDBOperation>();
|
||||
this.stmtsPerTx = stmtsPerTx;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
while (true) {
|
||||
AsyncDBOperation op = null;
|
||||
try {
|
||||
op = opsQueue.take();
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.warn("Interrupted retrieving from operation queue: "
|
||||
+ StringUtils.stringifyException(ie));
|
||||
continue;
|
||||
}
|
||||
|
||||
if (null == op) {
|
||||
// This shouldn't be allowed to happen.
|
||||
LOG.warn("Null operation in queue; illegal state.");
|
||||
continue;
|
||||
}
|
||||
|
||||
PreparedStatement stmt = op.getStatement();
|
||||
// Synchronize on the connection to ensure it does not conflict
|
||||
// with the prepareStatement() call in the main thread.
|
||||
synchronized (conn) {
|
||||
try {
|
||||
if (null != stmt) {
|
||||
if (op.execAsBatch()) {
|
||||
stmt.executeBatch();
|
||||
} else {
|
||||
// Normal update.
|
||||
stmt.executeUpdate();
|
||||
}
|
||||
stmt.close();
|
||||
stmt = null;
|
||||
this.curNumStatements++;
|
||||
}
|
||||
|
||||
if (op.requiresCommit() || (curNumStatements >= stmtsPerTx
|
||||
&& stmtsPerTx != UNLIMITED_STATEMENTS_PER_TRANSACTION)) {
|
||||
LOG.debug("Committing transaction of " + curNumStatements
|
||||
+ " statements");
|
||||
this.conn.commit();
|
||||
this.curNumStatements = 0;
|
||||
}
|
||||
} catch (SQLException sqlE) {
|
||||
setLastError(sqlE);
|
||||
} finally {
|
||||
// Close the statement on our way out if that didn't happen
|
||||
// via the normal execution path.
|
||||
if (null != stmt) {
|
||||
try {
|
||||
stmt.close();
|
||||
} catch (SQLException sqlE) {
|
||||
setLastError(sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
// Always check whether we should end the loop, regardless
|
||||
// of the presence of an exception.
|
||||
if (op.stop()) {
|
||||
// Don't continue processing after this operation.
|
||||
try {
|
||||
conn.close();
|
||||
} catch (SQLException sqlE) {
|
||||
setLastError(sqlE);
|
||||
}
|
||||
return;
|
||||
}
|
||||
} // try .. catch .. finally.
|
||||
} // synchronized (conn)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows a user to enqueue the next database operation to run.
|
||||
* Since the connection can only execute a single operation at a time,
|
||||
* the put() method may block if another operation is already underway.
|
||||
* @param op the database operation to perform.
|
||||
*/
|
||||
public void put(AsyncDBOperation op) throws InterruptedException {
|
||||
opsQueue.put(op);
|
||||
}
|
||||
|
||||
/**
|
||||
* If a previously-executed statement resulted in an error, post it here.
|
||||
* If the error slot was already filled, then subsequent errors are
|
||||
* squashed until the user calls this method (which clears the error
|
||||
* slot).
|
||||
* @return any SQLException that occurred due to a previously-run
|
||||
* statement.
|
||||
*/
|
||||
public synchronized SQLException getLastError() {
|
||||
SQLException e = this.err;
|
||||
this.err = null;
|
||||
return e;
|
||||
}
|
||||
|
||||
private synchronized void setLastError(SQLException e) {
|
||||
if (this.err == null) {
|
||||
// Just set it.
|
||||
LOG.error("Got exception in update thread: "
|
||||
+ StringUtils.stringifyException(e));
|
||||
this.err = e;
|
||||
} else {
|
||||
// Slot is full. Log it and discard.
|
||||
LOG.error("SQLException in update thread but error slot full: "
|
||||
+ StringUtils.stringifyException(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
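
A condensed sketch (hypothetical caller, mirroring what AsyncSqlRecordWriter does below) of how the exec thread and AsyncDBOperation cooperate:

----
// Hypothetical driver fragment: start the exec thread, enqueue a
// statement, then enqueue a final commit-and-close operation. put()
// blocks while a previous operation is still executing, which is the
// back-pressure described in the javadoc above.
AsyncSqlOutputFormat.AsyncSqlExecThread execThread =
    new AsyncSqlOutputFormat.AsyncSqlExecThread(connection, 100 /* stmtsPerTx */);
execThread.start();

PreparedStatement stmt = connection.prepareStatement(
    "UPDATE foo SET msg=?, bar=? WHERE id=?");
// ... bind parameters on stmt ...
execThread.put(new AsyncSqlOutputFormat.AsyncDBOperation(
    stmt, false /* commitAndClose */, false /* batch */));

// Final operation: no statement, just commit and stop the thread.
execThread.put(new AsyncSqlOutputFormat.AsyncDBOperation(null, true, false));
execThread.join();
----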
@ -0,0 +1,207 @@
|
||||
/**
|
||||
* Licensed to Cloudera, Inc. under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Cloudera, Inc. licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.cloudera.sqoop.mapreduce;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.RecordWriter;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
|
||||
|
||||
import com.cloudera.sqoop.lib.SqoopRecord;
|
||||
|
||||
/**
|
||||
* Abstract RecordWriter base class that buffers SqoopRecords to be injected
|
||||
* into JDBC SQL PreparedStatements to be executed by the
|
||||
* AsyncSqlOutputFormat's background thread.
|
||||
*
|
||||
* Record objects are buffered before actually performing the INSERT
|
||||
* statements; this requires that the key implement the SqoopRecord interface.
|
||||
*
|
||||
* Uses DBOutputFormat/DBConfiguration for configuring the output.
|
||||
*/
|
||||
public abstract class AsyncSqlRecordWriter<K extends SqoopRecord, V>
|
||||
extends RecordWriter<K, V> {
|
||||
|
||||
private Connection connection;
|
||||
|
||||
private Configuration conf;
|
||||
|
||||
protected final int rowsPerStmt; // rows to insert per statement.
|
||||
|
||||
// Buffer for records to be put into export SQL statements.
|
||||
private List<SqoopRecord> records;
|
||||
|
||||
// Background thread to actually perform the updates.
|
||||
private AsyncSqlOutputFormat.AsyncSqlExecThread execThread;
|
||||
private boolean startedExecThread;
|
||||
|
||||
public AsyncSqlRecordWriter(TaskAttemptContext context)
|
||||
throws ClassNotFoundException, SQLException {
|
||||
this.conf = context.getConfiguration();
|
||||
|
||||
this.rowsPerStmt = conf.getInt(
|
||||
AsyncSqlOutputFormat.RECORDS_PER_STATEMENT_KEY,
|
||||
AsyncSqlOutputFormat.DEFAULT_RECORDS_PER_STATEMENT);
|
||||
int stmtsPerTx = conf.getInt(
|
||||
AsyncSqlOutputFormat.STATEMENTS_PER_TRANSACTION_KEY,
|
||||
AsyncSqlOutputFormat.DEFAULT_STATEMENTS_PER_TRANSACTION);
|
||||
|
||||
DBConfiguration dbConf = new DBConfiguration(conf);
|
||||
this.connection = dbConf.getConnection();
|
||||
this.connection.setAutoCommit(false);
|
||||
|
||||
this.records = new ArrayList<SqoopRecord>(this.rowsPerStmt);
|
||||
|
||||
this.execThread = new AsyncSqlOutputFormat.AsyncSqlExecThread(
|
||||
connection, stmtsPerTx);
|
||||
this.execThread.setDaemon(true);
|
||||
this.startedExecThread = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Allow subclasses access to the Connection instance we hold.
|
||||
* This Connection is shared with the asynchronous SQL exec thread.
|
||||
* Any uses of the Connection must be synchronized on it.
|
||||
* @return the Connection object used for this SQL transaction.
|
||||
*/
|
||||
protected final Connection getConnection() {
|
||||
return this.connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Allow subclasses access to the Configuration.
|
||||
* @return the Configuration for this MapReduce task.
|
||||
*/
|
||||
protected final Configuration getConf() {
|
||||
return this.conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Should return 'true' if the PreparedStatements generated by the
|
||||
* RecordWriter are intended to be executed in "batch" mode, or false
|
||||
* if it's just one big statement.
|
||||
*/
|
||||
protected boolean isBatchExec() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate the PreparedStatement object that will be fed into the execution
|
||||
* thread. All parameterized fields of the PreparedStatement must be set in
|
||||
* this method as well; this is usually based on the records collected from
|
||||
* the user in the userRecords list.
|
||||
*
|
||||
* Note that any uses of the Connection object here must be synchronized on
|
||||
* the Connection.
|
||||
*
|
||||
* @param userRecords a list of records that should be injected into SQL
|
||||
* statements.
|
||||
* @return a PreparedStatement to be populated with rows
|
||||
* from the collected record list.
|
||||
*/
|
||||
protected abstract PreparedStatement getPreparedStatement(
|
||||
List<SqoopRecord> userRecords) throws SQLException;
|
||||
|
||||
/**
|
||||
* Takes the current contents of 'records' and formats and executes the
|
||||
* INSERT statement.
|
||||
* @param closeConn if true, commits the transaction and closes the
|
||||
* connection.
|
||||
*/
|
||||
private void execUpdate(boolean closeConn)
|
||||
throws InterruptedException, SQLException {
|
||||
|
||||
if (!startedExecThread) {
|
||||
this.execThread.start();
|
||||
this.startedExecThread = true;
|
||||
}
|
||||
|
||||
PreparedStatement stmt = null;
|
||||
boolean successfulPut = false;
|
||||
try {
|
||||
if (records.size() > 0) {
|
||||
stmt = getPreparedStatement(records);
|
||||
this.records.clear();
|
||||
}
|
||||
|
||||
// Pass this operation off to the update thread. This will block if
|
||||
// the update thread is already performing an update.
|
||||
AsyncSqlOutputFormat.AsyncDBOperation op =
|
||||
new AsyncSqlOutputFormat.AsyncDBOperation(stmt, closeConn,
|
||||
isBatchExec());
|
||||
execThread.put(op);
|
||||
successfulPut = true; // op has been posted to the other thread.
|
||||
} finally {
|
||||
if (!successfulPut && null != stmt) {
|
||||
// We created a statement but failed to enqueue it. Close it.
|
||||
stmt.close();
|
||||
}
|
||||
}
|
||||
|
||||
// Check for any previous SQLException. If one happened, rethrow it here.
|
||||
SQLException lastException = execThread.getLastError();
|
||||
if (null != lastException) {
|
||||
throw lastException;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public void close(TaskAttemptContext context)
|
||||
throws IOException, InterruptedException {
|
||||
try {
|
||||
execUpdate(true);
|
||||
} catch (SQLException sqle) {
|
||||
throw new IOException(sqle);
|
||||
} finally {
|
||||
execThread.join();
|
||||
}
|
||||
|
||||
// If we're not leaving on an error return path already,
|
||||
// now that execThread is definitely stopped, check that the
|
||||
// error slot remains empty.
|
||||
SQLException lastErr = execThread.getLastError();
|
||||
if (null != lastErr) {
|
||||
throw new IOException(lastErr);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public void write(K key, V value)
|
||||
throws InterruptedException, IOException {
|
||||
try {
|
||||
records.add((SqoopRecord) key.clone());
|
||||
if (records.size() >= this.rowsPerStmt) {
|
||||
execUpdate(false);
|
||||
}
|
||||
} catch (CloneNotSupportedException cnse) {
|
||||
throw new IOException("Could not buffer record", cnse);
|
||||
} catch (SQLException sqlException) {
|
||||
throw new IOException(sqlException);
|
||||
}
|
||||
}
|
||||
}
|
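
As a hedged sketch of the extension point this class defines (a toy subclass, not one from the patch), a concrete writer only needs to supply the PreparedStatement and, optionally, opt into batch execution:

----
// Toy subclass illustrating the AsyncSqlRecordWriter contract; the table
// and SQL text are hypothetical, and imports are omitted.
public class SingleRowUpdateWriter<K extends SqoopRecord, V>
    extends AsyncSqlRecordWriter<K, V> {

  public SingleRowUpdateWriter(TaskAttemptContext context)
      throws ClassNotFoundException, SQLException {
    super(context);
  }

  @Override
  protected boolean isBatchExec() {
    return true; // run the statement with executeBatch()
  }

  @Override
  protected PreparedStatement getPreparedStatement(
      List<SqoopRecord> userRecords) throws SQLException {
    Connection conn = getConnection();
    PreparedStatement stmt;
    synchronized (conn) { // the connection is shared with the exec thread
      stmt = conn.prepareStatement("UPDATE foo SET msg=?, bar=? WHERE id=?");
    }
    for (SqoopRecord record : userRecords) {
      record.write(stmt, 0); // fill the placeholders for this record
      stmt.addBatch();
    }
    return stmt;
  }
}
----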
@ -22,20 +22,16 @@
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.JobContext;
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.OutputFormat;
|
||||
import org.apache.hadoop.mapreduce.RecordWriter;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import com.cloudera.sqoop.lib.SqoopRecord;
|
||||
|
||||
@ -51,23 +47,7 @@
|
||||
* Uses DBOutputFormat/DBConfiguration for configuring the output.
|
||||
*/
|
||||
public class ExportOutputFormat<K extends SqoopRecord, V>
|
||||
extends OutputFormat<K, V> {
|
||||
|
||||
/** conf key: number of rows to export per INSERT statement. */
|
||||
public static final String RECORDS_PER_STATEMENT_KEY =
|
||||
"sqoop.export.records.per.statement";
|
||||
|
||||
/** conf key: number of INSERT statements to bundle per tx.
|
||||
* If this is set to -1, then a single transaction will be used
|
||||
* per task. Note that each statement may encompass multiple
|
||||
* rows, depending on the value of sqoop.export.records.per.statement.
|
||||
*/
|
||||
public static final String STATEMENTS_PER_TRANSACTION_KEY =
|
||||
"sqoop.export.statements.per.transaction";
|
||||
|
||||
private static final int DEFAULT_RECORDS_PER_STATEMENT = 100;
|
||||
private static final int DEFAULT_STATEMENTS_PER_TRANSACTION = 100;
|
||||
private static final int UNLIMITED_STATEMENTS_PER_TRANSACTION = -1;
|
||||
extends AsyncSqlOutputFormat<K, V> {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ExportOutputFormat.class);
|
||||
|
||||
@ -90,22 +70,6 @@ public void checkOutputSpecs(JobContext context)
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
|
||||
throws IOException, InterruptedException {
|
||||
return new OutputCommitter() {
|
||||
public void abortTask(TaskAttemptContext taskContext) { }
|
||||
public void cleanupJob(JobContext jobContext) { }
|
||||
public void commitTask(TaskAttemptContext taskContext) { }
|
||||
public boolean needsTaskCommit(TaskAttemptContext taskContext) {
|
||||
return false;
|
||||
}
|
||||
public void setupJob(JobContext jobContext) { }
|
||||
public void setupTask(TaskAttemptContext taskContext) { }
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
|
||||
@ -117,236 +81,74 @@ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a database update operation that should be performed
|
||||
* by an asynchronous background thread.
|
||||
* AsyncDBOperation objects are immutable.
|
||||
* They MAY contain a statement which should be executed. The
|
||||
* statement may also be null.
|
||||
*
|
||||
* They may also set 'forceCommit' to true. If true, then the
|
||||
* executor of this operation should commit the current
|
||||
* transaction, even if stmt is null.
|
||||
*/
|
||||
private static class AsyncDBOperation {
|
||||
private final PreparedStatement stmt;
|
||||
private final boolean forceCommit;
|
||||
private final boolean close;
|
||||
|
||||
/**
|
||||
* Create an asynchronous database operation.
|
||||
* @param s the statement, if any, to execute.
|
||||
* @param forceCommit if true, the current transaction should be committed.
|
||||
* @param close if true, the executor thread should stop after processing
|
||||
* this operation.
|
||||
*/
|
||||
public AsyncDBOperation(PreparedStatement s, boolean forceCommit,
|
||||
boolean close) {
|
||||
this.stmt = s;
|
||||
this.forceCommit = forceCommit;
|
||||
this.close = close;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a statement to run as an update.
|
||||
*/
|
||||
public PreparedStatement getStatement() {
|
||||
return stmt;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the executor should commit the current transaction.
|
||||
* If getStatement() is non-null, the statement is run first.
|
||||
*/
|
||||
public boolean requiresCommit() {
|
||||
return forceCommit;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the executor should stop after this command.
|
||||
*/
|
||||
public boolean stop() {
|
||||
return this.close;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A thread that runs the database interactions asynchronously
|
||||
* from the OutputCollector.
|
||||
*/
|
||||
private static class ExportUpdateThread extends Thread {
|
||||
|
||||
private final Connection conn; // The connection to the database.
|
||||
private SQLException err; // Error from a previously-run statement.
|
||||
|
||||
// How we receive database operations from the RecordWriter.
|
||||
private SynchronousQueue<AsyncDBOperation> opsQueue;
|
||||
|
||||
protected int curNumStatements; // statements executed thus far in the tx.
|
||||
protected final int stmtsPerTx; // statements per transaction.
|
||||
|
||||
/**
|
||||
* Create a new update thread that interacts with the database.
|
||||
* @param conn the connection to use. This must only be used by this
|
||||
* thread.
|
||||
* @param stmtsPerTx the number of statements to execute before committing
|
||||
* the current transaction.
|
||||
*/
|
||||
public ExportUpdateThread(Connection conn, int stmtsPerTx) {
|
||||
this.conn = conn;
|
||||
this.err = null;
|
||||
this.opsQueue = new SynchronousQueue<AsyncDBOperation>();
|
||||
this.stmtsPerTx = stmtsPerTx;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
while (true) {
|
||||
AsyncDBOperation op = null;
|
||||
try {
|
||||
op = opsQueue.take();
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.warn("Interrupted retrieving from operation queue: "
|
||||
+ StringUtils.stringifyException(ie));
|
||||
continue;
|
||||
}
|
||||
|
||||
if (null == op) {
|
||||
// This shouldn't be allowed to happen.
|
||||
LOG.warn("Null operation in queue; illegal state.");
|
||||
continue;
|
||||
}
|
||||
|
||||
PreparedStatement stmt = op.getStatement();
|
||||
// Synchronize on the connection to ensure it does not conflict
|
||||
// with the prepareStatement() call in the main thread.
|
||||
synchronized (conn) {
|
||||
try {
|
||||
if (null != stmt) {
|
||||
stmt.executeUpdate();
|
||||
stmt.close();
|
||||
stmt = null;
|
||||
this.curNumStatements++;
|
||||
}
|
||||
|
||||
if (op.requiresCommit() || (curNumStatements >= stmtsPerTx
|
||||
&& stmtsPerTx != UNLIMITED_STATEMENTS_PER_TRANSACTION)) {
|
||||
LOG.debug("Committing transaction of " + curNumStatements
|
||||
+ " statements");
|
||||
this.conn.commit();
|
||||
this.curNumStatements = 0;
|
||||
}
|
||||
} catch (SQLException sqlE) {
|
||||
setLastError(sqlE);
|
||||
} finally {
|
||||
// Close the statement on our way out if that didn't happen
|
||||
// via the normal execution path.
|
||||
if (null != stmt) {
|
||||
try {
|
||||
stmt.close();
|
||||
} catch (SQLException sqlE) {
|
||||
setLastError(sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
// Always check whether we should end the loop, regardless
|
||||
// of the presence of an exception.
|
||||
if (op.stop()) {
|
||||
// Don't continue processing after this operation.
|
||||
try {
|
||||
conn.close();
|
||||
} catch (SQLException sqlE) {
|
||||
setLastError(sqlE);
|
||||
}
|
||||
return;
|
||||
}
|
||||
} // try .. catch .. finally.
|
||||
} // synchronized (conn)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows a user to enqueue the next database operation to run.
|
||||
* Since the connection can only execute a single operation at a time,
|
||||
* the put() method may block if another operation is already underway.
|
||||
* @param op the database operation to perform.
|
||||
*/
|
||||
public void put(AsyncDBOperation op) throws InterruptedException {
|
||||
opsQueue.put(op);
|
||||
}
|
||||
|
||||
/**
|
||||
* If a previously-executed statement resulted in an error, post it here.
|
||||
* If the error slot was already filled, then subsequent errors are
|
||||
* squashed until the user calls this method (which clears the error
|
||||
* slot).
|
||||
* @return any SQLException that occurred due to a previously-run
|
||||
* statement.
|
||||
*/
|
||||
public synchronized SQLException getLastError() {
|
||||
SQLException e = this.err;
|
||||
this.err = null;
|
||||
return e;
|
||||
}
|
||||
|
||||
private synchronized void setLastError(SQLException e) {
|
||||
if (this.err == null) {
|
||||
// Just set it.
|
||||
LOG.error("Got exception in update thread: "
|
||||
+ StringUtils.stringifyException(e));
|
||||
this.err = e;
|
||||
} else {
|
||||
// Slot is full. Log it and discard.
|
||||
LOG.error("SQLException in update thread but error slot full: "
|
||||
+ StringUtils.stringifyException(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
 /**
  * RecordWriter to write the output to a row in a database table.
  * The actual database updates are executed in a second thread.
  */
-public class ExportRecordWriter extends RecordWriter<K, V> {
+public class ExportRecordWriter extends AsyncSqlRecordWriter<K, V> {

-  protected Connection connection;
-  protected Configuration conf;
-  protected int rowsPerStmt; // rows to insert per statement.
-
-  // Buffer for records to be put in an INSERT statement.
-  protected List<SqoopRecord> records;
-
-  protected String tableName;
-  protected String [] columnNames; // The columns to insert into.
-  protected int columnCount; // If columnNames is null, tells ## of cols.
-
-  // Background thread to actually perform the updates.
-  private ExportUpdateThread updateThread;
-  private boolean startedUpdateThread;
+  private String tableName;
+  private String [] columnNames; // The columns to insert into.
+  private int columnCount; // If columnNames is null, tells ## of cols.

   public ExportRecordWriter(TaskAttemptContext context)
       throws ClassNotFoundException, SQLException {
-    this.conf = context.getConfiguration();
+    super(context);

-    this.rowsPerStmt = conf.getInt(RECORDS_PER_STATEMENT_KEY,
-        DEFAULT_RECORDS_PER_STATEMENT);
-    int stmtsPerTx = conf.getInt(STATEMENTS_PER_TRANSACTION_KEY,
-        DEFAULT_STATEMENTS_PER_TRANSACTION);
+    Configuration conf = getConf();

     DBConfiguration dbConf = new DBConfiguration(conf);
-    this.connection = dbConf.getConnection();
     this.tableName = dbConf.getOutputTableName();
     this.columnNames = dbConf.getOutputFieldNames();
     this.columnCount = dbConf.getOutputFieldCount();
   }

-    this.connection.setAutoCommit(false);
+  /**
+   * @return the name of the table we are inserting into.
+   */
+  protected final String getTableName() {
+    return tableName;
+  }

-    this.records = new ArrayList<SqoopRecord>(this.rowsPerStmt);
+  /**
+   * @return the list of columns we are updating.
+   */
+  protected final String [] getColumnNames() {
+    if (null == columnNames) {
+      return null;
+    } else {
+      return Arrays.copyOf(columnNames, columnNames.length);
+    }
+  }

-    this.updateThread = new ExportUpdateThread(connection, stmtsPerTx);
-    this.updateThread.setDaemon(true);
-    this.startedUpdateThread = false;
+  /**
+   * @return the number of columns we are updating.
+   */
+  protected final int getColumnCount() {
+    return columnCount;
+  }

+  @Override
+  /** {@inheritDoc} */
+  protected PreparedStatement getPreparedStatement(
+      List<SqoopRecord> userRecords) throws SQLException {
+
+    PreparedStatement stmt = null;
+
+    // Synchronize on connection to ensure this does not conflict
+    // with the operations in the update thread.
+    Connection conn = getConnection();
+    synchronized (conn) {
+      stmt = conn.prepareStatement(getInsertStatement(userRecords.size()));
+    }
+
+    // Inject the record parameters into the VALUES clauses.
+    int position = 0;
+    for (SqoopRecord record : userRecords) {
+      position += record.write(stmt, position);
+    }
+
+    return stmt;
+  }

   /**
@@ -402,95 +204,5 @@ protected String getInsertStatement(int numRows) {

     return sb.toString();
   }

-  /**
-   * Takes the current contents of 'records' and formats and executes the
-   * INSERT statement.
-   * @param closeConn if true, commits the transaction and closes the
-   * connection.
-   */
-  private void insertRows(boolean closeConn)
-      throws InterruptedException, SQLException {
-
-    if (!startedUpdateThread) {
-      this.updateThread.start();
-      this.startedUpdateThread = true;
-    }
-
-    PreparedStatement stmt = null;
-    boolean successfulPut = false;
-    try {
-      if (records.size() > 0) {
-        // Synchronize on connection to ensure this does not conflict
-        // with the operations in the update thread.
-        synchronized (connection) {
-          stmt = connection.prepareStatement(
-              getInsertStatement(records.size()));
-        }
-
-        // Inject the record parameters into the VALUES clauses.
-        int position = 0;
-        for (SqoopRecord record : records) {
-          position += record.write(stmt, position);
-        }
-
-        this.records.clear();
-      }
-
-      // Pass this operation off to the update thread. This will block if
-      // the update thread is already performing an update.
-      AsyncDBOperation op = new AsyncDBOperation(stmt, closeConn, closeConn);
-      updateThread.put(op);
-      successfulPut = true; // op has been posted to the other thread.
-    } finally {
-      if (!successfulPut && null != stmt) {
-        // We created a statement but failed to enqueue it. Close it.
-        stmt.close();
-      }
-    }
-
-    // Check for any previous SQLException. If one happened, rethrow it here.
-    SQLException lastException = updateThread.getLastError();
-    if (null != lastException) {
-      throw lastException;
-    }
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public void close(TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    try {
-      insertRows(true);
-    } catch (SQLException sqle) {
-      throw new IOException(sqle);
-    } finally {
-      updateThread.join();
-    }
-
-    // If we're not leaving on an error return path already,
-    // now that updateThread is definitely stopped, check that the
-    // error slot remains empty.
-    SQLException lastErr = updateThread.getLastError();
-    if (null != lastErr) {
-      throw new IOException(lastErr);
-    }
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public void write(K key, V value)
-      throws InterruptedException, IOException {
-    try {
-      records.add((SqoopRecord) key.clone());
-      if (records.size() >= this.rowsPerStmt) {
-        insertRows(false);
-      }
-    } catch (CloneNotSupportedException cnse) {
-      throw new IOException("Could not buffer record", cnse);
-    } catch (SQLException sqlException) {
-      throw new IOException(sqlException);
-    }
-  }
 }
 }

@@ -61,15 +61,16 @@ public OracleExportRecordWriter(TaskAttemptContext context)
   protected String getInsertStatement(int numRows) {
     StringBuilder sb = new StringBuilder();

-    sb.append("INSERT INTO " + tableName + " ");
+    sb.append("INSERT INTO " + getTableName() + " ");

     int numSlots;
-    if (this.columnNames != null) {
-      numSlots = this.columnNames.length;
+    String [] colNames = getColumnNames();
+    if (colNames != null) {
+      numSlots = colNames.length;

       sb.append("(");
       boolean first = true;
-      for (String col : columnNames) {
+      for (String col : colNames) {
         if (!first) {
           sb.append(", ");
         }
@@ -80,7 +81,7 @@ protected String getInsertStatement(int numRows) {

       sb.append(") ");
     } else {
-      numSlots = this.columnCount; // set if columnNames is null.
+      numSlots = getColumnCount(); // set if columnNames is null.
     }

     // generates the (?, ?, ?...) used for each row.

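For reference, a sketch of the statement shape that getInsertStatement() builds for a buffer of three records, assuming a three-column table; the table and column names are placeholders and the exact multi-row syntax is database-specific:

    INSERT INTO <tablename> (A, B, C) VALUES (?, ?, ?), (?, ?, ?), (?, ?, ?)

getPreparedStatement() then walks the buffered SqoopRecords and binds each record's fields into the next run of parameter slots via record.write(stmt, position).
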
@@ -0,0 +1,188 @@
/**
 * Licensed to Cloudera, Inc. under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Cloudera, Inc. licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.cloudera.sqoop.mapreduce;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;

import com.cloudera.sqoop.lib.SqoopRecord;

/**
 * Update an existing table of data with new value data.
 * This requires a designated 'key column' for the WHERE clause
 * of an UPDATE statement.
 *
 * Updates are executed en batch in the PreparedStatement.
 *
 * Uses DBOutputFormat/DBConfiguration for configuring the output.
 */
public class UpdateOutputFormat<K extends SqoopRecord, V>
    extends AsyncSqlOutputFormat<K, V> {

  private static final Log LOG = LogFactory.getLog(UpdateOutputFormat.class);

  @Override
  /** {@inheritDoc} */
  public void checkOutputSpecs(JobContext context)
      throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    DBConfiguration dbConf = new DBConfiguration(conf);

    // Sanity check all the configuration values we need.
    if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
      throw new IOException("Database connection URL is not set.");
    } else if (null == dbConf.getOutputTableName()) {
      throw new IOException("Table name is not set for export.");
    } else if (null == dbConf.getOutputFieldNames()) {
      throw new IOException(
          "Output field names are null.");
    } else if (null == conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY)) {
      throw new IOException("Update key column is not set for export.");
    }
  }

  @Override
  /** {@inheritDoc} */
  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
      throws IOException {
    try {
      return new UpdateRecordWriter(context);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  /**
   * RecordWriter to write the output to UPDATE statements modifying rows
   * in the database.
   */
  public class UpdateRecordWriter extends AsyncSqlRecordWriter<K, V> {

    private String tableName;
    private String [] columnNames; // The columns to update.
    private String updateCol; // The column containing the fixed key.

    public UpdateRecordWriter(TaskAttemptContext context)
        throws ClassNotFoundException, SQLException {
      super(context);

      Configuration conf = getConf();

      DBConfiguration dbConf = new DBConfiguration(conf);
      this.tableName = dbConf.getOutputTableName();
      this.columnNames = dbConf.getOutputFieldNames();
      this.updateCol = conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY);
    }

    @Override
    /** {@inheritDoc} */
    protected boolean isBatchExec() {
      // We use batches here.
      return true;
    }

    /**
     * @return the name of the table we are inserting into.
     */
    protected final String getTableName() {
      return tableName;
    }

    /**
     * @return the list of columns we are updating.
     */
    protected final String [] getColumnNames() {
      if (null == columnNames) {
        return null;
      } else {
        return Arrays.copyOf(columnNames, columnNames.length);
      }
    }

    /**
     * @return the column we are using to determine the row to update.
     */
    protected final String getUpdateCol() {
      return updateCol;
    }

    @Override
    /** {@inheritDoc} */
    protected PreparedStatement getPreparedStatement(
        List<SqoopRecord> userRecords) throws SQLException {

      PreparedStatement stmt = null;

      // Synchronize on connection to ensure this does not conflict
      // with the operations in the update thread.
      Connection conn = getConnection();
      synchronized (conn) {
        stmt = conn.prepareStatement(getUpdateStatement());
      }

      // Inject the record parameters into the UPDATE and WHERE clauses. This
      // assumes that the update key column is the last column serialized in
      // by the underlying record. Our code auto-gen process for exports was
      // responsible for taking care of this constraint.
      for (SqoopRecord record : userRecords) {
        record.write(stmt, 0);
        stmt.addBatch();
      }

      return stmt;
    }

    /**
     * @return an UPDATE statement that modifies rows based on a single key
     * column (with the intent of modifying a single row).
     */
    protected String getUpdateStatement() {
      StringBuilder sb = new StringBuilder();
      sb.append("UPDATE " + this.tableName + " SET ");

      boolean first = true;
      for (String col : this.columnNames) {
        if (!first) {
          sb.append(", ");
        }

        sb.append(col);
        sb.append("=?");
        first = false;
      }

      sb.append(" WHERE ");
      sb.append(this.updateCol);
      sb.append("=?");
      return sb.toString();
    }
  }
}

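To make the generated SQL concrete, here is a minimal sketch of what getUpdateStatement() assembles, assuming the output field names are B and C and the --update-key column is A (placeholder names; the real values come from DBConfiguration):

    UPDATE <tablename> SET B=?, C=? WHERE A=?

Each buffered record is bound with record.write(stmt, 0) and queued via stmt.addBatch(); per the comment in getPreparedStatement(), the generated record class serializes the update key column last, so its value lands in the trailing WHERE parameter. Since isBatchExec() returns true, AsyncSqlRecordWriter is expected to execute the queued statements as a single JDBC batch rather than one at a time.
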
@@ -73,6 +73,7 @@ public static Test suite() {
     suite.addTestSuite(TestLargeObjectLoader.class);
     suite.addTestSuite(TestDirectImportUtils.class);
     suite.addTestSuite(TestLobFile.class);
+    suite.addTestSuite(TestExportUpdate.class);
     suite.addTest(MapreduceTests.suite());

     return suite;

src/test/com/cloudera/sqoop/TestExportUpdate.java (new file, 371 lines)
@@ -0,0 +1,371 @@
/**
 * Licensed to Cloudera, Inc. under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Cloudera, Inc. licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.cloudera.sqoop;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;

import com.cloudera.sqoop.testutil.ExportJobTestCase;

import org.junit.Before;

/**
 * Test that we can update a copy of data in the database,
 * based on newer data in HDFS.
 */
public class TestExportUpdate extends ExportJobTestCase {

  @Before
  public void setUp() {
    // start the server
    super.setUp();

    if (useHsqldbTestServer()) {
      // throw away any existing data that might be in the database.
      try {
        this.getTestServer().dropExistingSchema();
      } catch (SQLException sqlE) {
        fail(sqlE.toString());
      }
    }
  }

  @Override
  protected String getTablePrefix() {
    return "UPDATE_TABLE_";
  }

  /**
   * Create the underlying table to update.
   */
  private void populateDatabase(int numRows) throws SQLException {
    Connection conn = getConnection();

    PreparedStatement statement = conn.prepareStatement(
        "CREATE TABLE " + getTableName()
        + " (A INT NOT NULL, B VARCHAR(32), C INT)");
    try {
      statement.executeUpdate();
      conn.commit();
    } finally {
      statement.close();
      statement = null;
    }

    try {
      for (int i = 0; i < numRows; i++) {
        statement = conn.prepareStatement("INSERT INTO " + getTableName()
            + " VALUES (" + i + ", 'foo" + i + "', " + i + ")");
        statement.executeUpdate();
        statement.close();
        statement = null;
      }
    } finally {
      if (null != statement) {
        statement.close();
      }
    }

    conn.commit();
  }

  /**
   * Create a set of files that will be used as the input to the update
   * process.
   * @param numFiles the number of files to generate
   * @param updatesPerFile the number of rows to create in each file
   * @param keyCol a value between 0 and 2 specifying whether 'a',
   * 'b', or 'c' ({@see populateDatabase()}) is the key column to keep
   * the same.
   * @param startOffsets is an optional list of row ids/values for a/c
   * which are the record ids at which the update files begin.
   * For instance, if numFiles=3, updatesPerFile=2, and keyCol=0 then
   * if startOffsets is {5, 10, 12}, files will be generated to update
   * rows with A=5,6; A=10,11; A=12,13.
   *
   * If startOffsets is empty or underspecified (given numFiles), then
   * subsequent files will start immediately after the previous file.
   */
  private void createUpdateFiles(int numFiles, int updatesPerFile,
      int keyCol, int... startOffsets) throws IOException {

    FileSystem fs = FileSystem.getLocal(new Configuration());

    int rowId = 0;
    for (int i = 0; i < numFiles; i++) {
      OutputStream os = fs.create(new Path(getTablePath(), "" + i + ".txt"));
      BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));

      if (null != startOffsets && startOffsets.length > i) {
        // If a start offset has been specified for this file, go there.
        // Otherwise, just carry over from the previous file iteration.
        rowId = startOffsets[i];
      }

      for (int j = 0; j < updatesPerFile; j++) {
        w.write(getUpdateStringForRow(keyCol, rowId++));
      }

      w.close();
      os.close();
    }
  }

  /**
   * Generate a string of text representing an update for one row
   * of the database. keyCol is a value in [0, 2] representing which
   * column is kept fixed. rowId specifies the row being updated.
   */
  private String getUpdateStringForRow(int keyCol, int rowId) {
    StringBuilder sb = new StringBuilder();

    int [] rowInts = new int[3]; // There are 3 columns in the table.
    for (int i = 0; i < 3; i++) {
      if (keyCol == i) {
        // Keep this column fixed.
        rowInts[i] = rowId;
      } else {
        // Update the int in this column.
        rowInts[i] = rowId * 2;
      }
    }

    sb.append(rowInts[0]);
    sb.append("\tfoo");
    sb.append(rowInts[1]);
    sb.append("\t");
    sb.append(rowInts[2]);
    sb.append("\n");

    return sb.toString();
  }

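As a worked example of the helper above: with keyCol=0 and rowId=3, the key column A stays at 3 while the other two values double, so getUpdateStringForRow() returns the tab-separated line

    3\tfoo6\t6\n

which matches the verifyRow("A", "3", "3", "foo6", "6") check in testSubsetUpdate() below.
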
  /**
   * Verifies the number of rows in the table.
   */
  private void verifyRowCount(int expectedCount) throws SQLException {
    String query = "SELECT COUNT(*) FROM " + getTableName();
    PreparedStatement statement = null;
    ResultSet rs = null;

    try {
      Connection conn = getConnection();
      statement = conn.prepareStatement(query);
      rs = statement.executeQuery();

      boolean success = rs.next();
      assertTrue("Expected at least one result", success);

      int trueCount = rs.getInt(1);
      assertEquals(expectedCount, trueCount);

      // This query should have returned exactly one row.
      success = rs.next();
      assertFalse("Expected no more than one output record", success);
    } finally {
      if (null != rs) {
        try {
          rs.close();
        } catch (SQLException sqle) {
          LOG.error("Error closing result set: "
              + StringUtils.stringifyException(sqle));
        }
      }

      if (null != statement) {
        statement.close();
      }
    }
  }

  /**
   * Verify that a particular row has the expected values.
   */
  private void verifyRow(String keyColName, String keyVal,
      String... expectedVals) throws SQLException {
    String query = "SELECT A, B, C FROM " + getTableName() + " WHERE "
        + keyColName + " = " + keyVal;
    PreparedStatement statement = null;
    ResultSet rs = null;

    try {
      Connection conn = getConnection();
      statement = conn.prepareStatement(query);
      rs = statement.executeQuery();

      boolean success = rs.next();
      assertTrue("Expected at least one output record", success);

      // Assert that all three columns have the correct values.
      for (int i = 0; i < expectedVals.length; i++) {
        String expected = expectedVals[i];
        String result = rs.getString(i + 1);
        assertEquals("Invalid response for column " + i + "; got " + result
            + " when expected " + expected, expected, result);
      }

      // This query should have returned exactly one row.
      success = rs.next();
      assertFalse("Expected no more than one output record", success);
    } finally {
      if (null != rs) {
        try {
          rs.close();
        } catch (SQLException sqle) {
          LOG.error("Error closing result set: "
              + StringUtils.stringifyException(sqle));
        }
      }

      if (null != statement) {
        statement.close();
      }
    }
  }

  private void runUpdate(int numMappers, String updateCol) throws IOException {
    runExport(getArgv(true, 2, 2, "-m", "" + numMappers,
        "--update-key", updateCol));
  }

  public void testBasicUpdate() throws Exception {
    // Test that we can do a single-task single-file update.
    // This updates the entire database.

    populateDatabase(10);
    createUpdateFiles(1, 10, 0, 0);
    runUpdate(1, "A");
    verifyRowCount(10);
    // Check a few rows...
    verifyRow("A", "0", "0", "foo0", "0");
    verifyRow("A", "1", "1", "foo2", "2");
    verifyRow("A", "9", "9", "foo18", "18");
  }

  public void testEmptyTable() throws Exception {
    // Test that an empty table will "accept" updates that modify
    // no rows; no new data is injected into the database.
    populateDatabase(0);
    createUpdateFiles(1, 10, 0, 0);
    runUpdate(1, "A");
    verifyRowCount(0);
  }

  public void testEmptyFiles() throws Exception {
    // An empty input file results in no changes to a db table.
    populateDatabase(10);
    createUpdateFiles(1, 0, 0);
    runUpdate(1, "A");
    verifyRowCount(10);
    // Check that a few rows have not changed at all.
    verifyRow("A", "0", "0", "foo0", "0");
    verifyRow("A", "1", "1", "foo1", "1");
    verifyRow("A", "9", "9", "foo9", "9");
  }

  public void testStringCol() throws Exception {
    // Test that we can do modifications based on the string "B" column.
    populateDatabase(10);
    createUpdateFiles(1, 10, 1);
    runUpdate(1, "B");
    verifyRowCount(10);
    verifyRow("B", "'foo0'", "0", "foo0", "0");
    verifyRow("B", "'foo1'", "2", "foo1", "2");
    verifyRow("B", "'foo9'", "18", "foo9", "18");
  }

  public void testLastCol() throws Exception {
    // Test that we can do modifications based on the third int column.
    populateDatabase(10);
    createUpdateFiles(1, 10, 2);
    runUpdate(1, "C");
    verifyRowCount(10);
    verifyRow("C", "0", "0", "foo0", "0");
    verifyRow("C", "1", "2", "foo2", "1");
    verifyRow("C", "9", "18", "foo18", "9");
  }

  public void testMultiMaps() throws Exception {
    // Test that we can handle multiple map tasks.
    populateDatabase(20);
    createUpdateFiles(2, 10, 0);
    runUpdate(1, "A");
    verifyRowCount(20);
    verifyRow("A", "0", "0", "foo0", "0");
    verifyRow("A", "1", "1", "foo2", "2");
    verifyRow("A", "9", "9", "foo18", "18");
    verifyRow("A", "10", "10", "foo20", "20");
    verifyRow("A", "15", "15", "foo30", "30");
    verifyRow("A", "19", "19", "foo38", "38");
  }

  public void testSubsetUpdate() throws Exception {
    // Update only a few rows in the middle of the table.
    populateDatabase(10);
    createUpdateFiles(1, 5, 0, 3); // only rows A=3..7 change.
    runUpdate(1, "A");
    verifyRowCount(10);

    // Verify these rows are unchanged.
    verifyRow("A", "0", "0", "foo0", "0");
    verifyRow("A", "2", "2", "foo2", "2");
    verifyRow("A", "8", "8", "foo8", "8");
    verifyRow("A", "9", "9", "foo9", "9");

    // Verify these rows have been updated.
    verifyRow("A", "3", "3", "foo6", "6");
    verifyRow("A", "5", "5", "foo10", "10");
    verifyRow("A", "7", "7", "foo14", "14");
  }

  public void testSubsetUpdate2() throws Exception {
    // Update only some of the rows in the db. Also include some
    // updates that do not affect actual rows in the table.
    // These should just be ignored.

    populateDatabase(10);
    // Create two files that update four rows each.
    // File0 updates A=-2..1 (-2 and -1 don't exist).
    // File1 updates A=8..11 (10 and 11 don't exist).
    createUpdateFiles(2, 4, 0, -2, 8);
    runUpdate(2, "A");
    verifyRowCount(10);

    // Verify these rows are unchanged.
    verifyRow("A", "4", "4", "foo4", "4");
    verifyRow("A", "7", "7", "foo7", "7");

    // Verify these updates succeeded.
    verifyRow("A", "1", "1", "foo2", "2");
    verifyRow("A", "8", "8", "foo16", "16");
    verifyRow("A", "9", "9", "foo18", "18");
  }

}

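For context, the update jobs driven by runUpdate() above correspond to a sqoop-export invocation along these lines; the connect string, table name, and export directory are placeholders, only --update-key and -m mirror the test arguments:

    sqoop-export --connect jdbc:hsqldb:hsql://localhost/sqoop \
        --table UPDATE_TABLE_0 --export-dir /user/test/update-files \
        --update-key A -m 1
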