5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-03 03:59:18 +08:00

SQOOP-1341: Sqoop Export Upsert for MySQL lacks batch support

(Andy Skelton via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2014-06-28 16:42:34 -07:00
parent 462bd9170c
commit d03faf3544

View File

@ -25,7 +25,10 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException; import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.List;
/** /**
* Output format for MySQL Update/insert functionality. We will use MySQL * Output format for MySQL Update/insert functionality. We will use MySQL
@ -63,7 +66,33 @@ public MySQLUpsertRecordWriter(TaskAttemptContext context)
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
protected String getUpdateStatement() { protected PreparedStatement getPreparedStatement(
List<SqoopRecord> userRecords) throws SQLException {
PreparedStatement stmt = null;
// Synchronize on connection to ensure this does not conflict
// with the operations in the update thread.
Connection conn = getConnection();
synchronized (conn) {
stmt = conn.prepareStatement(getUpdateStatement(userRecords.size()));
}
// Inject the record parameters into the UPDATE and WHERE clauses. This
// assumes that the update key column is the last column serialized in
// by the underlying record. Our code auto-gen process for exports was
// responsible for taking care of this constraint.
int i = 0;
for (SqoopRecord record : userRecords) {
record.write(stmt, i);
i += columnNames.length;
}
stmt.addBatch();
return stmt;
}
protected String getUpdateStatement(int numRows) {
boolean first; boolean first;
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO "); sb.append("INSERT INTO ");
@ -80,14 +109,16 @@ protected String getUpdateStatement() {
} }
sb.append(") VALUES("); sb.append(") VALUES(");
first = true; for (int i = 0; i < numRows; i++) {
for (int i = 0; i < columnNames.length; i++) { if (i > 0) {
if (first) { sb.append("),(");
first = false; }
} else { for (int j = 0; j < columnNames.length; j++) {
sb.append(", "); if (j > 0) {
sb.append(", ");
}
sb.append("?");
} }
sb.append("?");
} }
sb.append(") ON DUPLICATE KEY UPDATE "); sb.append(") ON DUPLICATE KEY UPDATE ");