diff --git a/src/docs/user/connectors.txt b/src/docs/user/connectors.txt
index 930a4996..d912840a 100644
--- a/src/docs/user/connectors.txt
+++ b/src/docs/user/connectors.txt
@@ -21,6 +21,24 @@
 Notes for specific connectors
 -----------------------------
 
+MySQL JDBC Connector
+~~~~~~~~~~~~~~~~~~~~
+
+This section contains information specific to the MySQL JDBC Connector.
+
+Upsert functionality
+^^^^^^^^^^^^^^^^^^^^
+
+The MySQL JDBC Connector supports upsert functionality through the argument
++\--update-mode allowinsert+. To achieve this, Sqoop uses the MySQL clause INSERT INTO
+... ON DUPLICATE KEY UPDATE. This clause does not allow the user to specify which columns
+should be used to decide whether to update an existing row or add a new row. Instead,
+the clause relies on the table's unique keys (the primary key belongs to this set). MySQL
+will try to insert a new row and, if the insertion fails with a duplicate unique key error,
+will update the matching row instead. As a result, Sqoop ignores the values specified
+in the parameter +\--update-key+; however, the user must still specify at least one valid
+column to turn on update mode itself.
+
 PostgreSQL Connector
 ~~~~~~~~~~~~~~~~~~~~~
 
diff --git a/src/java/org/apache/sqoop/manager/DirectMySQLManager.java b/src/java/org/apache/sqoop/manager/DirectMySQLManager.java
index 2e8d63e5..c984a32f 100644
--- a/src/java/org/apache/sqoop/manager/DirectMySQLManager.java
+++ b/src/java/org/apache/sqoop/manager/DirectMySQLManager.java
@@ -104,6 +104,19 @@ public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context)
     exportJob.runExport();
   }
 
+  /**
+   * Always fails: the direct connector provides no upsert implementation.
+   *
+   * @param context the export job context (not used)
+   * @throws ExportException always, pointing the user at the JDBC connector
+   */
+  @Override
+  public void upsertTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    throw new ExportException("MySQL direct connector does not support upsert"
+      + " mode. Please use JDBC based connector (remove --direct parameter)");
+  }
+
   @Override
   public boolean supportsStagingForExport() {
     return false;
diff --git a/src/java/org/apache/sqoop/manager/MySQLManager.java b/src/java/org/apache/sqoop/manager/MySQLManager.java
index a817aa41..a3f586ac 100644
--- a/src/java/org/apache/sqoop/manager/MySQLManager.java
+++ b/src/java/org/apache/sqoop/manager/MySQLManager.java
@@ -34,6 +34,9 @@
 
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.util.ImportException;
+import com.cloudera.sqoop.util.ExportException;
+import com.cloudera.sqoop.mapreduce.JdbcUpsertExportJob;
+import org.apache.sqoop.mapreduce.mysql.MySQLUpsertOutputFormat;
 
 /**
  * Manages connections to MySQL databases.
@@ -108,6 +111,39 @@ public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
     super.importTable(context);
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void upsertTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    context.setConnManager(this);
+    LOG.warn("MySQL Connector upsert functionality is using INSERT ON");
+    LOG.warn("DUPLICATE KEY UPDATE clause that relies on table's unique key.");
+    LOG.warn("Insert/update distinction is therefore independent on column");
+    LOG.warn("names specified in --update-key parameter. Please see MySQL");
+    LOG.warn("documentation for additional limitations.");
+
+    JdbcUpsertExportJob exportJob =
+      new JdbcUpsertExportJob(context, MySQLUpsertOutputFormat.class);
+    exportJob.runExport();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void configureDbOutputColumns(SqoopOptions options) {
+    // In case that we're running upsert, we do not want to change column order
+    // as we're actually going to use INSERT INTO ... ON DUPLICATE KEY UPDATE
+    // clause.
+    if (options.getUpdateMode() == SqoopOptions.UpdateMode.AllowInsert) {
+      return;
+    }
+
+    super.configureDbOutputColumns(options);
+  }
+
   /**
    * Set a flag to prevent printing the --direct warning twice.
    */
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
index c8e17c23..21cb128b 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
@@ -121,7 +121,7 @@ protected void configureOutputFormat(Job job, String tableName,
       }
 
       if (updateKeys.size() == 0) {
-        throw new IOException("Unpdate key columns not valid in export job");
+        throw new IOException("Update key columns not valid in export job");
       }
 
       // Make sure we strip out the key column from this list.
diff --git a/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
new file mode 100644
index 00000000..e6c758bc
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.mysql;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.UpdateOutputFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+/**
+ * Output format for MySQL Update/insert functionality. We will use MySQL
+ * clause INSERT INTO ... ON DUPLICATE KEY UPDATE, for more info please
+ * see official MySQL documentation.
+ */
+public class MySQLUpsertOutputFormat
+    extends UpdateOutputFormat {
+
+  private static final Log LOG =
+      LogFactory.getLog(MySQLUpsertOutputFormat.class);
+
+  /** {@inheritDoc} */
+  @Override
+  public RecordWriter getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    try {
+      return new MySQLUpsertRecordWriter(context);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * RecordWriter to write the output to UPDATE/INSERT statements.
+   */
+  public class MySQLUpsertRecordWriter extends UpdateRecordWriter {
+
+    public MySQLUpsertRecordWriter(TaskAttemptContext context)
+        throws ClassNotFoundException, SQLException {
+      super(context);
+    }
+
+    /**
+     * Builds the parameterized upsert statement, of the form
+     * INSERT INTO t(c1, c2) VALUES(?, ?)
+     * ON DUPLICATE KEY UPDATE c1=VALUES(c1), c2=VALUES(c2).
+     *
+     * @return the SQL text, with one placeholder per entry of columnNames
+     */
+    @Override
+    protected String getUpdateStatement() {
+      boolean first;
+      StringBuilder sb = new StringBuilder();
+      sb.append("INSERT INTO ");
+      sb.append(tableName);
+      sb.append("(");
+      first = true;
+      for (String column : columnNames) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+        sb.append(column);
+      }
+
+      sb.append(") VALUES(");
+      first = true;
+      for (int i = 0; i < columnNames.length; i++) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+        sb.append("?");
+      }
+
+      sb.append(") ON DUPLICATE KEY UPDATE ");
+
+      first = true;
+      for (String column : columnNames) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+
+        sb.append(column).append("=VALUES(").append(column).append(")");
+      }
+
+      String query = sb.toString();
+      LOG.debug("Using upsert query: " + query);
+      return query;
+    }
+  }
+}
diff --git a/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java b/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java
index f00cac4e..f7cc297c 100644
--- a/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java
+++ b/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java
@@ -166,4 +166,27 @@ public String getType() {
     verifyExport(TOTAL_RECORDS);
     assertColMinAndMax(forIdx(0), gen);
   }
+
+  public void testUpsert() throws IOException, SQLException {
+    final int TOTAL_RECORDS = 10;
+
+    createTextFile(0, TOTAL_RECORDS, false);
+    createTable();
+
+    // Insert only
+    runExport(getArgv(true, 10, 10, "--update-key", "id",
+      "--update-mode", "allowinsert"));
+    verifyExport(TOTAL_RECORDS);
+
+    // Update only
+    runExport(getArgv(true, 10, 10, "--update-key", "id",
+      "--update-mode", "allowinsert"));
+    verifyExport(TOTAL_RECORDS);
+
+    // Insert & update
+    createTextFile(0, TOTAL_RECORDS * 2, false);
+    runExport(getArgv(true, 10, 10, "--update-key", "id",
+      "--update-mode", "allowinsert"));
+    verifyExport(TOTAL_RECORDS * 2);
+  }
 }
diff --git a/src/test/com/cloudera/sqoop/manager/ManualMySQLTests.java b/src/test/com/cloudera/sqoop/manager/ManualMySQLTests.java
new file mode 100644
index 00000000..4d06dd96
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/manager/ManualMySQLTests.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudera.sqoop.manager;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Manual test case with all MySQL related tests.
+ */
+public final class ManualMySQLTests extends TestCase {
+
+  private ManualMySQLTests() { }
+
+  /**
+   * Assembles the MySQL test classes listed below into a single suite.
+   *
+   * @return a suite named "All MySQL test cases"
+   */
+  public static Test suite() {
+    TestSuite suite = new TestSuite("All MySQL test cases");
+    suite.addTestSuite(DirectMySQLTest.class);
+    suite.addTestSuite(DirectMySQLExportTest.class);
+    suite.addTestSuite(JdbcMySQLExportTest.class);
+    suite.addTestSuite(MySQLAuthTest.class);
+    suite.addTestSuite(MySQLCompatTest.class);
+
+    return suite;
+  }
+
+}