From d03faf3544b1ef07c9496fa7641ed6a42f58cb1d Mon Sep 17 00:00:00 2001 From: Jarek Jarcec Cecho Date: Sat, 28 Jun 2014 16:42:34 -0700 Subject: [PATCH] SQOOP-1341: Sqoop Export Upsert for MySQL lacks batch support (Andy Skelton via Jarek Jarcec Cecho) --- .../mysql/MySQLUpsertOutputFormat.java | 47 +++++++++++++++---- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java index e6c758bc..72fffc43 100644 --- a/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java +++ b/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java @@ -25,7 +25,10 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.List; /** * Output format for MySQL Update/insert functionality. We will use MySQL @@ -63,7 +66,33 @@ public MySQLUpsertRecordWriter(TaskAttemptContext context) * {@inheritDoc} */ @Override - protected String getUpdateStatement() { + protected PreparedStatement getPreparedStatement( + List userRecords) throws SQLException { + + PreparedStatement stmt = null; + + // Synchronize on connection to ensure this does not conflict + // with the operations in the update thread. + Connection conn = getConnection(); + synchronized (conn) { + stmt = conn.prepareStatement(getUpdateStatement(userRecords.size())); + } + + // Inject the record parameters into the UPDATE and WHERE clauses. This + // assumes that the update key column is the last column serialized in + // by the underlying record. Our code auto-gen process for exports was + // responsible for taking care of this constraint. + int i = 0; + for (SqoopRecord record : userRecords) { + record.write(stmt, i); + i += columnNames.length; + } + stmt.addBatch(); + + return stmt; + } + + protected String getUpdateStatement(int numRows) { boolean first; StringBuilder sb = new StringBuilder(); sb.append("INSERT INTO "); @@ -80,14 +109,16 @@ protected String getUpdateStatement() { } sb.append(") VALUES("); - first = true; - for (int i = 0; i < columnNames.length; i++) { - if (first) { - first = false; - } else { - sb.append(", "); + for (int i = 0; i < numRows; i++) { + if (i > 0) { + sb.append("),("); + } + for (int j = 0; j < columnNames.length; j++) { + if (j > 0) { + sb.append(", "); + } + sb.append("?"); } - sb.append("?"); } sb.append(") ON DUPLICATE KEY UPDATE ");