diff --git a/src/docs/man/sqoop-export.txt b/src/docs/man/sqoop-export.txt
index 6090ea15..50052cc1 100644
--- a/src/docs/man/sqoop-export.txt
+++ b/src/docs/man/sqoop-export.txt
@@ -55,6 +55,10 @@ Export control options
 --clear-staging-table::
   Will result in deletion of any data that exists in the staging table.
 
+--batch::
+  Use batch mode for underlying statement execution. This is useful, for
+  example, for databases that do not support multirow INSERT statements.
+
 include::input-args.txt[]
 
 include::output-args.txt[]
diff --git a/src/docs/user/export.txt b/src/docs/user/export.txt
index 4f878865..4401c265 100644
--- a/src/docs/user/export.txt
+++ b/src/docs/user/export.txt
@@ -61,6 +61,8 @@ Argument                      Description
                               the destination table.
 +\--clear-staging-table+      Indicates that any data present in\
                               the staging table can be deleted.
++\--batch+                    Use batch mode for underlying\
+                              statement execution.
 ------------------------------------------------------------------------
 
 The +\--table+ and +\--export-dir+ arguments are required. These
diff --git a/src/java/com/cloudera/sqoop/SqoopOptions.java b/src/java/com/cloudera/sqoop/SqoopOptions.java
index d760d39b..52ece1db 100644
--- a/src/java/com/cloudera/sqoop/SqoopOptions.java
+++ b/src/java/com/cloudera/sqoop/SqoopOptions.java
@@ -143,6 +143,7 @@ public enum IncrementalMode {
   @StoredAsProperty("hdfs.append.dir") private boolean append;
   @StoredAsProperty("hdfs.file.format") private FileLayout layout;
   @StoredAsProperty("direct.import") private boolean direct; // "direct mode."
+  @StoredAsProperty("db.batch") private boolean batchMode;
   private String tmpDir; // where temp data goes; usually /tmp; not serialized.
   private String hiveHome; // not serialized to metastore.
   @StoredAsProperty("hive.import") private boolean hiveImport;
@@ -1014,6 +1015,18 @@ public void setDirectMode(boolean isDirect) {
     this.direct = isDirect;
   }
 
+  /**
+   * @return true if underlying statements are to be executed in batch mode,
+   * or false if they are to be executed in a single multirow statement.
+   */
+  public boolean isBatchMode() {
+    return batchMode;
+  }
+
+  public void setBatchMode(boolean mode) {
+    this.batchMode = mode;
+  }
+
   /**
    * @return the number of map tasks to use for import.
    */
diff --git a/src/java/com/cloudera/sqoop/manager/OracleManager.java b/src/java/com/cloudera/sqoop/manager/OracleManager.java
index 38554c09..bad45dc7 100644
--- a/src/java/com/cloudera/sqoop/manager/OracleManager.java
+++ b/src/java/com/cloudera/sqoop/manager/OracleManager.java
@@ -37,8 +37,8 @@ import org.apache.commons.logging.LogFactory;
 
 import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat;
 import com.cloudera.sqoop.mapreduce.JdbcExportJob;
-import com.cloudera.sqoop.mapreduce.OracleExportOutputFormat;
 import com.cloudera.sqoop.mapreduce.db.OracleDataDrivenDBInputFormat;
 import com.cloudera.sqoop.util.ExportException;
 import com.cloudera.sqoop.util.ImportException;
 
@@ -377,7 +377,7 @@ public void exportTable(ExportJobContext context)
       throws IOException, ExportException {
     context.setConnManager(this);
     JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
-        OracleExportOutputFormat.class);
+        ExportBatchOutputFormat.class);
     exportJob.runExport();
   }
 
diff --git a/src/java/com/cloudera/sqoop/manager/SQLServerManager.java b/src/java/com/cloudera/sqoop/manager/SQLServerManager.java
index e1ce2af7..52b55396 100644
--- a/src/java/com/cloudera/sqoop/manager/SQLServerManager.java
+++ b/src/java/com/cloudera/sqoop/manager/SQLServerManager.java
@@ -24,8 +24,8 @@ import org.apache.commons.logging.LogFactory;
 
 import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat;
 import com.cloudera.sqoop.mapreduce.JdbcExportJob;
-import com.cloudera.sqoop.mapreduce.SQLServerExportOutputFormat;
 import com.cloudera.sqoop.util.ExportException;
 
 /**
@@ -53,7 +53,7 @@ public void exportTable(ExportJobContext context)
       throws IOException, ExportException {
     context.setConnManager(this);
     JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
-        SQLServerExportOutputFormat.class);
+        ExportBatchOutputFormat.class);
     exportJob.runExport();
   }
 
diff --git a/src/java/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java b/src/java/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java
index 193cf413..eec27396 100755
--- a/src/java/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java
@@ -175,10 +175,9 @@ public void close(TaskAttemptContext context)
     try {
       try {
         execUpdate(true, true);
+        execThread.join();
       } catch (SQLException sqle) {
         throw new IOException(sqle);
-      } finally {
-        execThread.join();
       }
 
       // If we're not leaving on an error return path already,
diff --git a/src/java/com/cloudera/sqoop/mapreduce/ExportBatchOutputFormat.java b/src/java/com/cloudera/sqoop/mapreduce/ExportBatchOutputFormat.java
new file mode 100644
index 00000000..dd05d0f0
--- /dev/null
+++ b/src/java/com/cloudera/sqoop/mapreduce/ExportBatchOutputFormat.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * This class executes the underlying statements in batch mode, rather than
+ * using a single multirow INSERT statement as its superclass does.
+ */
+public class ExportBatchOutputFormat<K extends SqoopRecord, V>
+    extends ExportOutputFormat<K, V> {
+
+  private static final Log LOG =
+      LogFactory.getLog(ExportBatchOutputFormat.class);
+
+  @Override
+  /** {@inheritDoc} */
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    try {
+      return new ExportBatchRecordWriter(context);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * RecordWriter to write the output to a row in a database table.
+   * The actual database updates are executed in a second thread.
+   */
+  public class ExportBatchRecordWriter extends ExportRecordWriter {
+
+    public ExportBatchRecordWriter(TaskAttemptContext context)
+        throws ClassNotFoundException, SQLException {
+      super(context);
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    protected boolean isBatchExec() {
+      // We use batches here.
+      return true;
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    protected PreparedStatement getPreparedStatement(
+        List<SqoopRecord> userRecords) throws SQLException {
+
+      PreparedStatement stmt = null;
+
+      // Synchronize on connection to ensure this does not conflict
+      // with the operations in the update thread.
+      Connection conn = getConnection();
+      synchronized (conn) {
+        stmt = conn.prepareStatement(getInsertStatement(userRecords.size()));
+      }
+
+      // Inject the record parameters into the VALUES clauses.
+      for (SqoopRecord record : userRecords) {
+        record.write(stmt, 0);
+        stmt.addBatch();
+      }
+
+      return stmt;
+    }
+
+    /**
+     * @return an INSERT statement.
+     */
+    protected String getInsertStatement(int numRows) {
+      StringBuilder sb = new StringBuilder();
+
+      sb.append("INSERT INTO " + tableName + " ");
+
+      int numSlots;
+      if (this.columnNames != null) {
+        numSlots = this.columnNames.length;
+
+        sb.append("(");
+        boolean first = true;
+        for (String col : columnNames) {
+          if (!first) {
+            sb.append(", ");
+          }
+
+          sb.append(col);
+          first = false;
+        }
+
+        sb.append(") ");
+      } else {
+        numSlots = this.columnCount; // set if columnNames is null.
+      }
+
+      sb.append("VALUES ");
+
+      // generates the (?, ?, ?...).
+ sb.append("("); + for (int i = 0; i < numSlots; i++) { + if (i != 0) { + sb.append(", "); + } + + sb.append("?"); + } + sb.append(")"); + + return sb.toString(); + } + } +} diff --git a/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java b/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java index 45bd27eb..1af384ff 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java +++ b/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java @@ -224,7 +224,11 @@ protected Class getOutputFormatClass() throws ClassNotFoundException { Class configuredOF = super.getOutputFormatClass(); if (null == configuredOF) { - return ExportOutputFormat.class; + if (!options.isBatchMode()) { + return ExportOutputFormat.class; + } else { + return ExportBatchOutputFormat.class; + } } else { return configuredOF; } diff --git a/src/java/com/cloudera/sqoop/mapreduce/ExportOutputFormat.java b/src/java/com/cloudera/sqoop/mapreduce/ExportOutputFormat.java index d2a6cf6b..9d4ebe26 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/ExportOutputFormat.java +++ b/src/java/com/cloudera/sqoop/mapreduce/ExportOutputFormat.java @@ -87,9 +87,9 @@ public RecordWriter getRecordWriter(TaskAttemptContext context) */ public class ExportRecordWriter extends AsyncSqlRecordWriter { - private String tableName; - private String [] columnNames; // The columns to insert into. - private int columnCount; // If columnNames is null, tells ## of cols. + protected String tableName; + protected String [] columnNames; // The columns to insert into. + protected int columnCount; // If columnNames is null, tells ## of cols. public ExportRecordWriter(TaskAttemptContext context) throws ClassNotFoundException, SQLException { diff --git a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java index 8f629f10..c307907f 100644 --- a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java +++ b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java @@ -74,6 +74,7 @@ public abstract class BaseSqoopTool extends SqoopTool { public static final String PASSWORD_ARG = "password"; public static final String PASSWORD_PROMPT_ARG = "P"; public static final String DIRECT_ARG = "direct"; + public static final String BATCH_ARG = "batch"; public static final String TABLE_ARG = "table"; public static final String STAGING_TABLE_ARG = "staging-table"; public static final String CLEAR_STAGING_TABLE_ARG = "clear-staging-table"; diff --git a/src/java/com/cloudera/sqoop/tool/ExportTool.java b/src/java/com/cloudera/sqoop/tool/ExportTool.java index b4b10914..d156eeb7 100644 --- a/src/java/com/cloudera/sqoop/tool/ExportTool.java +++ b/src/java/com/cloudera/sqoop/tool/ExportTool.java @@ -169,6 +169,11 @@ protected RelatedOptions getExportOptions() { + "staging table can be deleted") .withLongOpt(CLEAR_STAGING_TABLE_ARG) .create()); + exportOpts.addOption(OptionBuilder + .withDescription("Indicates underlying statements " + + "to be executed in batch mode") + .withLongOpt(BATCH_ARG) + .create()); return exportOpts; } @@ -220,6 +225,10 @@ public void applyOptions(CommandLine in, SqoopOptions out) out.setDirectMode(true); } + if (in.hasOption(BATCH_ARG)) { + out.setBatchMode(true); + } + if (in.hasOption(TABLE_ARG)) { out.setTableName(in.getOptionValue(TABLE_ARG)); } diff --git a/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java b/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java index 8687b0ce..850b754f 100644 --- a/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java +++ 
@@ -18,6 +18,7 @@
 
 package com.cloudera.sqoop.manager;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
 
@@ -142,4 +143,27 @@ public void tearDown() {
     return super.getArgv(includeHadoopFlags, rowsPerStatement,
         statementsPerTx, subArgv);
   }
+
+  public void testIntColInBatchMode() throws IOException, SQLException {
+    final int TOTAL_RECORDS = 10;
+
+    // generate a column equivalent to rownum.
+    ColumnGenerator gen = new ColumnGenerator() {
+      public String getExportText(int rowNum) {
+        return "" + rowNum;
+      }
+      public String getVerifyText(int rowNum) {
+        return "" + rowNum;
+      }
+      public String getType() {
+        return "INTEGER";
+      }
+    };
+
+    createTextFile(0, TOTAL_RECORDS, false, gen);
+    createTable(gen);
+    runExport(getArgv(true, 10, 10, "--batch"));
+    verifyExport(TOTAL_RECORDS);
+    assertColMinAndMax(forIdx(0), gen);
+  }
 }
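
---

Reviewer note: a quick sketch of the new flag in use. The connect string,
table name, and export directory below are invented for illustration; only
the --batch option itself comes from this patch.

    $ sqoop export --connect jdbc:mysql://db.example.com/foo \
        --table bar --export-dir /results/bar_data --batch

Without --batch, ExportOutputFormat folds many rows into one multirow
INSERT ... VALUES (...), (...), ... statement. With --batch,
ExportBatchOutputFormat prepares a single-row INSERT once and queues one
batch entry per record with addBatch(), signalling via the isBatchExec()
hook that the async writer should execute the queued batch rather than a
single statement. A minimal standalone JDBC sketch of that execution
pattern, assuming a hypothetical table bar(id INTEGER):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    public class BatchInsertSketch {
      public static void main(String[] args) throws SQLException {
        // Connection details are placeholders, not part of the patch.
        Connection conn = DriverManager.getConnection(
            "jdbc:mysql://db.example.com/foo", "user", "passwd");
        conn.setAutoCommit(false);
        PreparedStatement stmt =
            conn.prepareStatement("INSERT INTO bar (id) VALUES (?)");
        for (int id = 0; id < 10; id++) {
          stmt.setInt(1, id); // bind this row's single column
          stmt.addBatch();    // queue one single-row INSERT
        }
        stmt.executeBatch();  // execute all queued statements together
        conn.commit();
        conn.close();
      }
    }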