5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-02 17:40:39 +08:00

SQOOP-1403 Upsert export for SQL Server

(Keegan Witt via Venkat Ranganathan)
This commit is contained in:
Venkat Ranganathan 2014-10-24 11:06:00 -07:00
parent 0d35911416
commit 94ade1d8c8
6 changed files with 259 additions and 2 deletions

View File

@ -119,7 +119,7 @@ jdbc:postgresql://localhost/
=== SQL Server
Install SQL Server Express 2008 R2 and create a database instance and
Install SQL Server Express 2012 and create a database instance and
download the appropriate JDBC driver. Instructions for configuring the
database can be found in SQLServerManagerImportManualTest.

View File

@ -28,6 +28,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.mapreduce.JdbcUpsertExportJob;
import org.apache.sqoop.mapreduce.SQLServerResilientExportOutputFormat;
import org.apache.sqoop.mapreduce.SQLServerResilientUpdateOutputFormat;
import org.apache.sqoop.mapreduce.db.SQLServerDBInputFormat;
@ -42,6 +43,7 @@
import org.apache.sqoop.cli.RelatedOptions;
import org.apache.sqoop.mapreduce.sqlserver.SqlServerExportBatchOutputFormat;
import org.apache.sqoop.mapreduce.sqlserver.SqlServerInputFormat;
import org.apache.sqoop.mapreduce.sqlserver.SqlServerUpsertOutputFormat;
/**
* Manages connections to SQLServer databases. Requires the SQLServer JDBC
@ -204,6 +206,39 @@ public void updateTable(
}
}
@Override
/**
* {@inheritDoc}
*/
public void upsertTable(com.cloudera.sqoop.manager.ExportJobContext context)
throws IOException, ExportException {
context.setConnManager(this);
// Propagate table hints to job
Configuration configuration = context.getOptions().getConf();
if (tableHints != null) {
configuration.set(TABLE_HINTS_PROP, tableHints);
}
JdbcUpsertExportJob exportJob =
new JdbcUpsertExportJob(context, SqlServerUpsertOutputFormat.class);
exportJob.runExport();
}
@Override
/**
* {@inheritDoc}
*/
public void configureDbOutputColumns(SqoopOptions options) {
if (options.getUpdateMode() == SqoopOptions.UpdateMode.UpdateOnly) {
super.configureDbOutputColumns(options);
} else {
// We're in upsert mode. We need to explicitly set
// the database output column ordering in the codeGenerator.
options.setDbOutputColumns(getColumnNames(options.getTableName()));
}
}
/**
* SQLServer does not support the CURRENT_TIMESTAMP() function. Instead
* it has the notion of keyword CURRENT_TIMESTAMP that resolves to the

View File

@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.sqlserver;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.sqoop.manager.SQLServerManager;
import com.cloudera.sqoop.lib.SqoopRecord;
import com.cloudera.sqoop.mapreduce.UpdateOutputFormat;
/**
* Update an existing table with new value if the table already
* contains the row, or insert the data into the table if the table
* does not contain the row yet.
*/
public class SqlServerUpsertOutputFormat<K extends SqoopRecord, V>
extends UpdateOutputFormat<K, V> {
private static final Log LOG =
LogFactory.getLog(SqlServerUpsertOutputFormat.class);
@Override
/** {@inheritDoc} */
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
throws IOException {
try {
return new SqlServerUpsertRecordWriter(context);
} catch (Exception e) {
throw new IOException(e);
}
}
/**
* RecordWriter to write the output to UPDATE/INSERT statements.
*/
public class SqlServerUpsertRecordWriter extends UpdateRecordWriter {
public SqlServerUpsertRecordWriter(TaskAttemptContext context)
throws ClassNotFoundException, SQLException {
super(context);
}
@Override
/**
* @return an UPDATE/INSERT statement that modifies/inserts a row
* depending on whether the row already exist in the table or not.
*/
protected String getUpdateStatement() {
boolean first;
List<String> updateKeyLookup = Arrays.asList(updateCols);
StringBuilder sb = new StringBuilder();
if (getConf().getBoolean(SQLServerManager.IDENTITY_INSERT_PROP, false)) {
LOG.info("Enabling identity inserts");
sb.append("SET IDENTITY_INSERT ").append(tableName).append(" ON ");
}
sb.append("MERGE INTO ").append(tableName).append(" AS _target");
sb.append(" USING ( VALUES ( ");
first = true;
for (String col : columnNames) {
if (first) {
first = false;
} else {
sb.append(", ");
}
sb.append("?");
}
sb.append(" ) )").append(" AS _source ( ");
first = true;
for (String col : columnNames) {
if (first) {
first = false;
} else {
sb.append(", ");
}
sb.append(col);
}
sb.append(" )");
sb.append(" ON ");
first = true;
for (String updateCol : updateCols) {
if (updateKeyLookup.contains(updateCol)) {
if (first) {
first = false;
} else {
sb.append(" AND ");
}
sb.append("_source.").append(updateCol).append(" = _target.").append(updateCol);
}
}
sb.append(" WHEN MATCHED THEN UPDATE SET ");
first = true;
for (String col : columnNames) {
if (!updateKeyLookup.contains(col)) {
if (first) {
first = false;
} else {
sb.append(", ");
}
sb.append("_target.").append(col).append(" = _source.").append(col);
}
}
sb.append(" WHEN NOT MATCHED THEN ").append("INSERT ( ");
first = true;
for (String col : columnNames) {
if (first) {
first = false;
} else {
sb.append(", ");
}
sb.append(col);
}
sb.append(" ) VALUES ( ");
first = true;
for (String col : columnNames) {
if (first) {
first = false;
} else {
sb.append(", ");
}
sb.append("_source.").append(col);
}
sb.append(" )");
String tableHints = getConf().get(org.apache.sqoop.manager.SQLServerManager.TABLE_HINTS_PROP);
if (tableHints != null) {
LOG.info("Using table hints for query hints: " + tableHints);
sb.append(" OPTION (").append(tableHints).append(")");
}
sb.append(";");
return sb.toString();
}
}
}

View File

@ -364,6 +364,21 @@ public void testSQLServerBinaryType() throws IOException, SQLException {
checkSQLBinaryTableContent(expectedContent, escapeObjectName(DBO_BINARY_TABLE_NAME), conn);
}
/** Make sure mixed update/insert export work correctly. */
public void testUpsertTextExport() throws IOException, SQLException {
createTestFile("inputFile", new String[] {
"2,Bob,400,sales",
"3,Fred,15,marketing",
});
// first time will be insert.
runExport(getArgv(SCH_TABLE_NAME, "--update-key", "id",
"--update-mode", "allowinsert"));
// second time will be update.
runExport(getArgv(SCH_TABLE_NAME, "--update-key", "id",
"--update-mode", "allowinsert"));
assertRowCount(2, escapeObjectName(SCH_TABLE_NAME), conn);
}
public static void checkSQLBinaryTableContent(String[] expected, String tableName, Connection connection){
Statement stmt = null;
ResultSet rs = null;

View File

@ -58,7 +58,7 @@
* into Apache's tree for licensing reasons).
*
* To set up your test environment:
* Install SQL Server Express 2008 R2
* Install SQL Server Express 2012
* Create a database SQOOPTEST
* Create a login SQOOPUSER with password PASSWORD and grant all
* access for SQOOPTEST to SQOOPUSER.

View File

@ -0,0 +1,44 @@
package org.apache.sqoop.mapreduce.sqlserver;
import static org.junit.Assert.assertEquals;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.sqoop.manager.SQLServerManager;
import org.apache.sqoop.mapreduce.ExportJobBase;
import org.apache.sqoop.mapreduce.sqlserver.SqlServerUpsertOutputFormat.SqlServerUpsertRecordWriter;
import org.junit.Test;
public class SqlServerUpsertOutputFormatTest {
@SuppressWarnings("unchecked")
@Test
public void Merge_statement_is_parameterized_correctly() throws Exception {
Configuration conf = new Configuration();
conf.set(DBConfiguration.DRIVER_CLASS_PROPERTY, org.hsqldb.jdbcDriver.class.getName());
conf.set(DBConfiguration.URL_PROPERTY, "jdbc:hsqldb:.");
conf.set(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY, "");
conf.set(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, "");
String tableName = "#myTable";
String[] columnNames = { "FirstColumn", "SecondColumn", "ThirdColumn" };
String[] updateKeyColumns = { "FirstColumn" };
conf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
conf.set(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, StringUtils.join(columnNames, ','));
conf.set(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY, StringUtils.join(updateKeyColumns, ','));
conf.set(SQLServerManager.TABLE_HINTS_PROP, "NOLOCK");
conf.set(SQLServerManager.IDENTITY_INSERT_PROP, "true");
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
SqlServerUpsertOutputFormat outputFormat = new SqlServerUpsertOutputFormat();
SqlServerUpsertRecordWriter recordWriter = outputFormat.new SqlServerUpsertRecordWriter(context);
assertEquals("SET IDENTITY_INSERT #myTable ON " +
"MERGE INTO #myTable AS _target USING ( VALUES ( ?, ?, ? ) ) AS _source ( FirstColumn, SecondColumn, ThirdColumn ) ON _source.FirstColumn = _target.FirstColumn" +
" WHEN MATCHED THEN UPDATE SET _target.SecondColumn = _source.SecondColumn, _target.ThirdColumn = _source.ThirdColumn" +
" WHEN NOT MATCHED THEN INSERT ( FirstColumn, SecondColumn, ThirdColumn ) VALUES " +
"( _source.FirstColumn, _source.SecondColumn, _source.ThirdColumn ) " +
"OPTION (NOLOCK);", recordWriter.getUpdateStatement());
}
}