5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-04 03:11:00 +08:00

SQOOP-621 Requesting support for upsert export with MySQL

This commit is contained in:
Cheolsoo Park 2012-10-17 20:52:24 -07:00
parent 3aed031676
commit 2750df90f9
7 changed files with 237 additions and 1 deletions

View File

@ -21,6 +21,24 @@
Notes for specific connectors Notes for specific connectors
----------------------------- -----------------------------
MySQL JDBC Connector
~~~~~~~~~~~~~~~~~~~~
This section contains information specific to MySQL JDBC Connector.
Upsert functionality
^^^^^^^^^^^^^^^^^^^^
The MySQL JDBC Connector supports upsert functionality through the argument
+\--update-mode allowinsert+. To achieve this, Sqoop uses the MySQL clause INSERT INTO
... ON DUPLICATE KEY UPDATE. This clause does not allow the user to specify which columns
should be used to decide whether to update an existing row or add a new row. Instead,
the clause relies on the table's unique keys (the primary key belongs to this set). MySQL
will try to insert the new row and, if the insertion fails with a duplicate unique key error,
it will update the appropriate row instead. As a result, Sqoop ignores the values specified
in the parameter +\--update-key+; however, the user needs to specify at least one valid column
to turn on update mode itself.
PostgreSQL Connector PostgreSQL Connector
~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~

View File

@ -104,6 +104,12 @@ public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context)
exportJob.runExport(); exportJob.runExport();
} }
/**
 * Rejects upsert exports for the MySQL direct connector.
 *
 * <p>This implementation performs no export work: it unconditionally fails
 * with a message telling the user to switch to the JDBC based connector by
 * removing the {@code --direct} parameter.
 *
 * @param context the export job context; not used by this implementation
 * @throws IOException declared by the signature; not thrown by this body
 * @throws ExportException always, because upsert mode is not supported here
 */
@Override
public void upsertTable(com.cloudera.sqoop.manager.ExportJobContext context)
    throws IOException, ExportException {
  throw new ExportException("MySQL direct connector does not support upsert"
      + " mode. Please use JDBC based connector (remove --direct parameter)");
}
@Override @Override
public boolean supportsStagingForExport() { public boolean supportsStagingForExport() {
return false; return false;

View File

@ -34,6 +34,9 @@
import com.cloudera.sqoop.SqoopOptions; import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.util.ImportException; import com.cloudera.sqoop.util.ImportException;
import com.cloudera.sqoop.util.ExportException;
import com.cloudera.sqoop.mapreduce.JdbcUpsertExportJob;
import org.apache.sqoop.mapreduce.mysql.MySQLUpsertOutputFormat;
/** /**
* Manages connections to MySQL databases. * Manages connections to MySQL databases.
@ -108,6 +111,39 @@ public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
super.importTable(context); super.importTable(context);
} }
/**
 * {@inheritDoc}
 *
 * <p>Registers this manager on the context, warns the user that the
 * insert/update decision is driven by the table's unique keys rather than by
 * the {@code --update-key} columns, and then runs a
 * {@link JdbcUpsertExportJob} configured with
 * {@link MySQLUpsertOutputFormat}.
 */
@Override
public void upsertTable(com.cloudera.sqoop.manager.ExportJobContext context)
    throws IOException, ExportException {
  context.setConnManager(this);

  // Each entry is logged as its own WARN record, in this exact order.
  final String[] upsertCaveat = {
    "MySQL Connector upsert functionality is using INSERT ON",
    "DUPLICATE KEY UPDATE clause that relies on table's unique key.",
    "Insert/update distinction is therefore independent on column",
    "names specified in --update-key parameter. Please see MySQL",
    "documentation for additional limitations.",
  };
  for (String caveatLine : upsertCaveat) {
    LOG.warn(caveatLine);
  }

  new JdbcUpsertExportJob(context, MySQLUpsertOutputFormat.class).runExport();
}
/**
 * {@inheritDoc}
 *
 * <p>When the export runs in upsert mode
 * ({@link SqoopOptions.UpdateMode#AllowInsert}) the column configuration is
 * left untouched: the statement used is INSERT INTO ... ON DUPLICATE KEY
 * UPDATE, so the column order must not be changed. In every other mode the
 * call is forwarded to the superclass.
 */
@Override
public void configureDbOutputColumns(SqoopOptions options) {
  // Only non-upsert exports need the superclass column handling.
  if (options.getUpdateMode() != SqoopOptions.UpdateMode.AllowInsert) {
    super.configureDbOutputColumns(options);
  }
}
/** /**
* Set a flag to prevent printing the --direct warning twice. * Set a flag to prevent printing the --direct warning twice.
*/ */

View File

@ -121,7 +121,7 @@ protected void configureOutputFormat(Job job, String tableName,
} }
if (updateKeys.size() == 0) { if (updateKeys.size() == 0) {
throw new IOException("Unpdate key columns not valid in export job"); throw new IOException("Update key columns not valid in export job");
} }
// Make sure we strip out the key column from this list. // Make sure we strip out the key column from this list.

View File

@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.mysql;
import com.cloudera.sqoop.lib.SqoopRecord;
import com.cloudera.sqoop.mapreduce.UpdateOutputFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
import java.sql.SQLException;
/**
 * Output format implementing update-or-insert ("upsert") exports for MySQL.
 * Records are written with the MySQL clause
 * INSERT INTO ... ON DUPLICATE KEY UPDATE; see the official MySQL
 * documentation for the semantics of that clause.
 */
public class MySQLUpsertOutputFormat<K extends SqoopRecord, V>
    extends UpdateOutputFormat<K, V> {

  private final Log log =
      LogFactory.getLog(getClass());

  /**
   * {@inheritDoc}
   *
   * <p>Returns a {@link MySQLUpsertRecordWriter}. Any exception raised while
   * constructing the writer is rethrown wrapped in an {@link IOException}.
   */
  @Override
  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
      throws IOException {
    try {
      return new MySQLUpsertRecordWriter(context);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  /**
   * RecordWriter that emits its output through a single
   * INSERT ... ON DUPLICATE KEY UPDATE statement.
   */
  public class MySQLUpsertRecordWriter extends UpdateRecordWriter {

    /**
     * Creates the writer by delegating to the parent record writer.
     *
     * @param context the task attempt context passed to the parent constructor
     * @throws ClassNotFoundException as declared for the parent constructor call
     * @throws SQLException as declared for the parent constructor call
     */
    public MySQLUpsertRecordWriter(TaskAttemptContext context)
        throws ClassNotFoundException, SQLException {
      super(context);
    }

    /**
     * {@inheritDoc}
     *
     * <p>Builds a statement of the form
     * {@code INSERT INTO t(a, b) VALUES(?, ?) ON DUPLICATE KEY UPDATE
     * a=VALUES(a), b=VALUES(b)} from {@code tableName} and
     * {@code columnNames}, logs it at debug level and returns it.
     */
    @Override
    protected String getUpdateStatement() {
      StringBuilder columnList = new StringBuilder();
      StringBuilder placeholders = new StringBuilder();
      StringBuilder assignments = new StringBuilder();

      // One pass fills all three comma-separated lists; the separator is
      // empty for the first column and ", " for every later one.
      String separator = "";
      for (String name : columnNames) {
        columnList.append(separator).append(name);
        placeholders.append(separator).append("?");
        assignments.append(separator)
            .append(name).append("=VALUES(").append(name).append(")");
        separator = ", ";
      }

      String statement = "INSERT INTO " + tableName
          + "(" + columnList
          + ") VALUES(" + placeholders
          + ") ON DUPLICATE KEY UPDATE " + assignments;

      log.debug("Using upsert query: " + statement);
      return statement;
    }
  }
}

View File

@ -166,4 +166,27 @@ public String getType() {
verifyExport(TOTAL_RECORDS); verifyExport(TOTAL_RECORDS);
assertColMinAndMax(forIdx(0), gen); assertColMinAndMax(forIdx(0), gen);
} }
/**
 * Exercises upsert export in three consecutive runs against one table:
 * a first run into the freshly created table (insert only), a second run
 * with the same input (update only), and a third run with twice as many
 * records (insert and update). The exported row count is verified after
 * each run.
 */
public void testUpsert() throws IOException, SQLException {
  final int recordCount = 10;

  createTextFile(0, recordCount, false);
  createTable();

  // Insert only
  exportWithUpsert();
  verifyExport(recordCount);

  // Update only
  exportWithUpsert();
  verifyExport(recordCount);

  // Insert & update
  createTextFile(0, recordCount * 2, false);
  exportWithUpsert();
  verifyExport(recordCount * 2);
}

/**
 * Runs one export with {@code --update-key id --update-mode allowinsert}.
 * The argument vector is rebuilt on every call.
 */
private void exportWithUpsert() throws IOException, SQLException {
  runExport(getArgv(true, 10, 10, "--update-key", "id",
      "--update-mode", "allowinsert"));
}
} }

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.manager;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
/**
 * Aggregates all MySQL related test cases into a single manually run suite.
 * The class only serves as a holder for {@link #suite()} and is not meant to
 * be instantiated.
 */
public final class ManualMySQLTests extends TestCase {

  /** Not instantiable; use {@link #suite()}. */
  private ManualMySQLTests() { }

  /**
   * Builds the suite named "All MySQL test cases".
   *
   * @return a suite containing the direct, direct-export, JDBC-export,
   *         authentication and compatibility MySQL tests, in that order
   */
  public static Test suite() {
    TestSuite mysqlSuite = new TestSuite("All MySQL test cases");

    mysqlSuite.addTestSuite(DirectMySQLTest.class);
    mysqlSuite.addTestSuite(DirectMySQLExportTest.class);
    mysqlSuite.addTestSuite(JdbcMySQLExportTest.class);
    mysqlSuite.addTestSuite(MySQLAuthTest.class);
    mysqlSuite.addTestSuite(MySQLCompatTest.class);

    return mysqlSuite;
  }
}