From 94ade1d8c8ed02ea42e18d5e3aad99e85fba5b2b Mon Sep 17 00:00:00 2001 From: Venkat Ranganathan Date: Fri, 24 Oct 2014 11:06:00 -0700 Subject: [PATCH] SQOOP-1403 Upsert export for SQL Server (Keegan Witt via Venkat Ranganathan) --- COMPILING.txt | 2 +- .../sqoop/manager/SQLServerManager.java | 35 ++++ .../SqlServerUpsertOutputFormat.java | 163 ++++++++++++++++++ .../SQLServerManagerExportManualTest.java | 15 ++ .../SQLServerManagerImportManualTest.java | 2 +- .../SqlServerUpsertOutputFormatTest.java | 44 +++++ 6 files changed, 259 insertions(+), 2 deletions(-) create mode 100644 src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormat.java create mode 100644 src/test/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormatTest.java diff --git a/COMPILING.txt b/COMPILING.txt index 138dc0ad..eb69b220 100644 --- a/COMPILING.txt +++ b/COMPILING.txt @@ -119,7 +119,7 @@ jdbc:postgresql://localhost/ === SQL Server -Install SQL Server Express 2008 R2 and create a database instance and +Install SQL Server Express 2012 and create a database instance and download the appropriate JDBC driver. Instructions for configuring the database can be found in SQLServerManagerImportManualTest. diff --git a/src/java/org/apache/sqoop/manager/SQLServerManager.java b/src/java/org/apache/sqoop/manager/SQLServerManager.java index fdd4e919..60d380e1 100644 --- a/src/java/org/apache/sqoop/manager/SQLServerManager.java +++ b/src/java/org/apache/sqoop/manager/SQLServerManager.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.sqoop.mapreduce.JdbcUpsertExportJob; import org.apache.sqoop.mapreduce.SQLServerResilientExportOutputFormat; import org.apache.sqoop.mapreduce.SQLServerResilientUpdateOutputFormat; import org.apache.sqoop.mapreduce.db.SQLServerDBInputFormat; @@ -42,6 +43,7 @@ import org.apache.sqoop.cli.RelatedOptions; import org.apache.sqoop.mapreduce.sqlserver.SqlServerExportBatchOutputFormat; import org.apache.sqoop.mapreduce.sqlserver.SqlServerInputFormat; +import org.apache.sqoop.mapreduce.sqlserver.SqlServerUpsertOutputFormat; /** * Manages connections to SQLServer databases. Requires the SQLServer JDBC @@ -204,6 +206,39 @@ public void updateTable( } } + @Override + /** + * {@inheritDoc} + */ + public void upsertTable(com.cloudera.sqoop.manager.ExportJobContext context) + throws IOException, ExportException { + context.setConnManager(this); + + // Propagate table hints to job + Configuration configuration = context.getOptions().getConf(); + if (tableHints != null) { + configuration.set(TABLE_HINTS_PROP, tableHints); + } + + JdbcUpsertExportJob exportJob = + new JdbcUpsertExportJob(context, SqlServerUpsertOutputFormat.class); + exportJob.runExport(); + } + + @Override + /** + * {@inheritDoc} + */ + public void configureDbOutputColumns(SqoopOptions options) { + if (options.getUpdateMode() == SqoopOptions.UpdateMode.UpdateOnly) { + super.configureDbOutputColumns(options); + } else { + // We're in upsert mode. We need to explicitly set + // the database output column ordering in the codeGenerator. + options.setDbOutputColumns(getColumnNames(options.getTableName())); + } + } + /** * SQLServer does not support the CURRENT_TIMESTAMP() function. Instead * it has the notion of keyword CURRENT_TIMESTAMP that resolves to the diff --git a/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormat.java new file mode 100644 index 00000000..0cb2c78b --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormat.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.sqlserver; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.sqoop.manager.SQLServerManager; + +import com.cloudera.sqoop.lib.SqoopRecord; +import com.cloudera.sqoop.mapreduce.UpdateOutputFormat; + +/** + * Update an existing table with new value if the table already + * contains the row, or insert the data into the table if the table + * does not contain the row yet. + */ +public class SqlServerUpsertOutputFormat + extends UpdateOutputFormat { + + private static final Log LOG = + LogFactory.getLog(SqlServerUpsertOutputFormat.class); + + @Override + /** {@inheritDoc} */ + public RecordWriter getRecordWriter(TaskAttemptContext context) + throws IOException { + try { + return new SqlServerUpsertRecordWriter(context); + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * RecordWriter to write the output to UPDATE/INSERT statements. + */ + public class SqlServerUpsertRecordWriter extends UpdateRecordWriter { + + public SqlServerUpsertRecordWriter(TaskAttemptContext context) + throws ClassNotFoundException, SQLException { + super(context); + } + + @Override + /** + * @return an UPDATE/INSERT statement that modifies/inserts a row + * depending on whether the row already exist in the table or not. + */ + protected String getUpdateStatement() { + boolean first; + List updateKeyLookup = Arrays.asList(updateCols); + StringBuilder sb = new StringBuilder(); + + if (getConf().getBoolean(SQLServerManager.IDENTITY_INSERT_PROP, false)) { + LOG.info("Enabling identity inserts"); + sb.append("SET IDENTITY_INSERT ").append(tableName).append(" ON "); + } + + sb.append("MERGE INTO ").append(tableName).append(" AS _target"); + sb.append(" USING ( VALUES ( "); + first = true; + for (String col : columnNames) { + if (first) { + first = false; + } else { + sb.append(", "); + } + sb.append("?"); + } + sb.append(" ) )").append(" AS _source ( "); + first = true; + for (String col : columnNames) { + if (first) { + first = false; + } else { + sb.append(", "); + } + sb.append(col); + } + sb.append(" )"); + + sb.append(" ON "); + first = true; + for (String updateCol : updateCols) { + if (updateKeyLookup.contains(updateCol)) { + if (first) { + first = false; + } else { + sb.append(" AND "); + } + sb.append("_source.").append(updateCol).append(" = _target.").append(updateCol); + } + } + + sb.append(" WHEN MATCHED THEN UPDATE SET "); + first = true; + for (String col : columnNames) { + if (!updateKeyLookup.contains(col)) { + if (first) { + first = false; + } else { + sb.append(", "); + } + sb.append("_target.").append(col).append(" = _source.").append(col); + } + } + + sb.append(" WHEN NOT MATCHED THEN ").append("INSERT ( "); + first = true; + for (String col : columnNames) { + if (first) { + first = false; + } else { + sb.append(", "); + } + sb.append(col); + } + sb.append(" ) VALUES ( "); + first = true; + for (String col : columnNames) { + if (first) { + first = false; + } else { + sb.append(", "); + } + sb.append("_source.").append(col); + } + sb.append(" )"); + + String tableHints = getConf().get(org.apache.sqoop.manager.SQLServerManager.TABLE_HINTS_PROP); + if (tableHints != null) { + LOG.info("Using table hints for query hints: " + tableHints); + sb.append(" OPTION (").append(tableHints).append(")"); + } + + sb.append(";"); + + return sb.toString(); + } + } +} diff --git a/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java b/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java index 1d4534b5..5f934c3a 100644 --- a/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java +++ b/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java @@ -364,6 +364,21 @@ public void testSQLServerBinaryType() throws IOException, SQLException { checkSQLBinaryTableContent(expectedContent, escapeObjectName(DBO_BINARY_TABLE_NAME), conn); } + /** Make sure mixed update/insert export work correctly. */ + public void testUpsertTextExport() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,400,sales", + "3,Fred,15,marketing", + }); + // first time will be insert. + runExport(getArgv(SCH_TABLE_NAME, "--update-key", "id", + "--update-mode", "allowinsert")); + // second time will be update. + runExport(getArgv(SCH_TABLE_NAME, "--update-key", "id", + "--update-mode", "allowinsert")); + assertRowCount(2, escapeObjectName(SCH_TABLE_NAME), conn); + } + public static void checkSQLBinaryTableContent(String[] expected, String tableName, Connection connection){ Statement stmt = null; ResultSet rs = null; diff --git a/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java b/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java index 27860c20..09f1e6b3 100644 --- a/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java +++ b/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java @@ -58,7 +58,7 @@ * into Apache's tree for licensing reasons). * * To set up your test environment: - * Install SQL Server Express 2008 R2 + * Install SQL Server Express 2012 * Create a database SQOOPTEST * Create a login SQOOPUSER with password PASSWORD and grant all * access for SQOOPTEST to SQOOPUSER. diff --git a/src/test/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormatTest.java b/src/test/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormatTest.java new file mode 100644 index 00000000..b8c4538e --- /dev/null +++ b/src/test/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormatTest.java @@ -0,0 +1,44 @@ +package org.apache.sqoop.mapreduce.sqlserver; + +import static org.junit.Assert.assertEquals; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.lib.db.DBConfiguration; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.sqoop.manager.SQLServerManager; +import org.apache.sqoop.mapreduce.ExportJobBase; +import org.apache.sqoop.mapreduce.sqlserver.SqlServerUpsertOutputFormat.SqlServerUpsertRecordWriter; +import org.junit.Test; + +public class SqlServerUpsertOutputFormatTest { + + @SuppressWarnings("unchecked") + @Test + public void Merge_statement_is_parameterized_correctly() throws Exception { + Configuration conf = new Configuration(); + conf.set(DBConfiguration.DRIVER_CLASS_PROPERTY, org.hsqldb.jdbcDriver.class.getName()); + conf.set(DBConfiguration.URL_PROPERTY, "jdbc:hsqldb:."); + conf.set(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY, ""); + conf.set(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, ""); + String tableName = "#myTable"; + String[] columnNames = { "FirstColumn", "SecondColumn", "ThirdColumn" }; + String[] updateKeyColumns = { "FirstColumn" }; + conf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName); + conf.set(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, StringUtils.join(columnNames, ',')); + conf.set(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY, StringUtils.join(updateKeyColumns, ',')); + conf.set(SQLServerManager.TABLE_HINTS_PROP, "NOLOCK"); + conf.set(SQLServerManager.IDENTITY_INSERT_PROP, "true"); + TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); + SqlServerUpsertOutputFormat outputFormat = new SqlServerUpsertOutputFormat(); + SqlServerUpsertRecordWriter recordWriter = outputFormat.new SqlServerUpsertRecordWriter(context); + assertEquals("SET IDENTITY_INSERT #myTable ON " + + "MERGE INTO #myTable AS _target USING ( VALUES ( ?, ?, ? ) ) AS _source ( FirstColumn, SecondColumn, ThirdColumn ) ON _source.FirstColumn = _target.FirstColumn" + + " WHEN MATCHED THEN UPDATE SET _target.SecondColumn = _source.SecondColumn, _target.ThirdColumn = _source.ThirdColumn" + + " WHEN NOT MATCHED THEN INSERT ( FirstColumn, SecondColumn, ThirdColumn ) VALUES " + + "( _source.FirstColumn, _source.SecondColumn, _source.ThirdColumn ) " + + "OPTION (NOLOCK);", recordWriter.getUpdateStatement()); + } +}