mirror of
https://github.com/apache/sqoop.git
synced 2025-05-03 05:01:10 +08:00
SQOOP-313: Support for multiple column names for update keys
(Arvind Prabhakar via Bilung Lee) git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1177481 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6a7144a496
commit
f4a4fbb304
@ -40,8 +40,9 @@ Export control options
|
||||
--table (table-name)::
|
||||
The table to read (required)
|
||||
|
||||
--update-key (col-name)::
|
||||
Anchor column to use for updates
|
||||
--update-key (col-names)::
|
||||
Anchor column to use for updates. More than one column names may be specified
|
||||
as a comma separated list of column names.
|
||||
|
||||
--update-mode (mode)::
|
||||
Specify how updates are performed when new rows are found with non-matching keys
|
||||
@ -92,7 +93,7 @@ See 'sqoop(1)'
|
||||
|
||||
////
|
||||
Copyright 2011 The Apache Software Foundation
|
||||
|
||||
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
@ -100,9 +101,9 @@ See 'sqoop(1)'
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
@ -1,7 +1,7 @@
|
||||
|
||||
////
|
||||
Copyright 2011 The Apache Software Foundation
|
||||
|
||||
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
@ -9,9 +9,9 @@
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
@ -54,7 +54,9 @@ Argument Description
|
||||
+-m,\--num-mappers <n>+ Use 'n' map tasks to export in\
|
||||
parallel
|
||||
+\--table <table-name>+ Table to populate
|
||||
+\--update-key <col-name>+ Anchor column to use for updates
|
||||
+\--update-key <col-name>+ Anchor column to use for updates.\
|
||||
Use a comma separated list of columns\
|
||||
if there are more than one column.
|
||||
+\--update-mode <mode>+ Specify how updates are performed\
|
||||
when new rows are found with\
|
||||
non-matching keys in database.
|
||||
@ -142,7 +144,7 @@ empty table intended to receive these results.
|
||||
If you specify the +\--update-key+ argument, Sqoop will instead modify
|
||||
an existing dataset in the database. Each input record is treated as
|
||||
an +UPDATE+ statement that modifies an existing row. The row a
|
||||
statement modifies is determined by the column name specified with
|
||||
statement modifies is determined by the column name(s) specified with
|
||||
+\--update-key+. For example, consider the following table
|
||||
definition:
|
||||
|
||||
@ -178,6 +180,10 @@ Likewise, if the column specified with +\--update-key+ does not
|
||||
uniquely identify rows and multiple rows are updated by a single
|
||||
statement, this condition is also undetected.
|
||||
|
||||
The argument +\--update-key+ can also be given a comma separated list of
|
||||
column names. In which case, Sqoop will match all keys from this list before
|
||||
updating any existing record.
|
||||
|
||||
Depending on the target database, you may also specify the +\--update-mode+
|
||||
argument with +allowinsert+ mode if you want to update rows if they exist
|
||||
in the database already or insert rows if they do not exist yet.
|
||||
|
@ -26,8 +26,12 @@
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Timestamp;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.StringTokenizer;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -300,18 +304,30 @@ public void configureDbOutputColumns(SqoopOptions options) {
|
||||
// last, because the UPDATE-based OutputFormat will generate the SET
|
||||
// clause followed by the WHERE clause, and the SqoopRecord needs to
|
||||
// serialize to this layout.
|
||||
String updateKeyCol = options.getUpdateKeyCol();
|
||||
Set<String> updateKeys = new LinkedHashSet<String>();
|
||||
Set<String> updateKeysUppercase = new HashSet<String>();
|
||||
String updateKeyValue = options.getUpdateKeyCol();
|
||||
StringTokenizer stok = new StringTokenizer(updateKeyValue, ",");
|
||||
while (stok.hasMoreTokens()) {
|
||||
String nextUpdateColumn = stok.nextToken().trim();
|
||||
if (nextUpdateColumn.length() > 0) {
|
||||
updateKeys.add(nextUpdateColumn);
|
||||
updateKeysUppercase.add(nextUpdateColumn.toUpperCase());
|
||||
} else {
|
||||
throw new RuntimeException("Invalid update key column value specified"
|
||||
+ ": '" + updateKeyValue + "'");
|
||||
}
|
||||
}
|
||||
String [] allColNames = getColumnNames(options.getTableName());
|
||||
List<String> dbOutCols = new ArrayList<String>();
|
||||
String upperCaseKeyCol = updateKeyCol.toUpperCase();
|
||||
for (String col : allColNames) {
|
||||
if (!upperCaseKeyCol.equals(col.toUpperCase())) {
|
||||
if (!updateKeysUppercase.contains(col.toUpperCase())) {
|
||||
dbOutCols.add(col); // add non-key columns to the output order list.
|
||||
}
|
||||
}
|
||||
|
||||
// Then add the update key column last.
|
||||
dbOutCols.add(updateKeyCol);
|
||||
dbOutCols.addAll(updateKeys);
|
||||
options.setDbOutputColumns(dbOutCols.toArray(
|
||||
new String[dbOutCols.size()]));
|
||||
}
|
||||
|
@ -32,9 +32,13 @@
|
||||
import java.sql.Types;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.StringTokenizer;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -409,13 +413,26 @@ public void configureDbOutputColumns(SqoopOptions options) {
|
||||
} else {
|
||||
// We're in upsert mode. We need to explicitly set
|
||||
// the database output column ordering in the codeGenerator.
|
||||
String updateKeyCol = options.getUpdateKeyCol();
|
||||
Set<String> updateKeys = new LinkedHashSet<String>();
|
||||
Set<String> updateKeysUppercase = new HashSet<String>();
|
||||
String updateKeyValue = options.getUpdateKeyCol();
|
||||
StringTokenizer stok = new StringTokenizer(updateKeyValue, ",");
|
||||
while (stok.hasMoreTokens()) {
|
||||
String nextUpdateColumn = stok.nextToken().trim();
|
||||
if (nextUpdateColumn.length() > 0) {
|
||||
updateKeys.add(nextUpdateColumn);
|
||||
updateKeysUppercase.add(nextUpdateColumn.toUpperCase());
|
||||
} else {
|
||||
throw new RuntimeException("Invalid update key column value specified"
|
||||
+ ": '" + updateKeyValue + "'");
|
||||
}
|
||||
}
|
||||
|
||||
String [] allColNames = getColumnNames(options.getTableName());
|
||||
List<String> dbOutCols = new ArrayList<String>();
|
||||
dbOutCols.add(updateKeyCol);
|
||||
String upperCaseKeyCol = updateKeyCol.toUpperCase();
|
||||
dbOutCols.addAll(updateKeys);
|
||||
for (String col : allColNames) {
|
||||
if (!upperCaseKeyCol.equals(col.toUpperCase())) {
|
||||
if (!updateKeysUppercase.contains(col.toUpperCase())) {
|
||||
dbOutCols.add(col); // add update columns to the output order list.
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,6 @@
|
||||
/**
|
||||
* Copyright 2011 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -20,12 +20,6 @@
|
||||
|
||||
package com.cloudera.sqoop.mapreduce;
|
||||
|
||||
import com.cloudera.sqoop.manager.ConnManager;
|
||||
import com.cloudera.sqoop.manager.ExportJobContext;
|
||||
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
|
||||
import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
|
||||
import com.cloudera.sqoop.orm.ClassWriter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
@ -39,6 +33,11 @@
|
||||
import org.apache.hadoop.mapreduce.Mapper;
|
||||
import org.apache.hadoop.mapreduce.OutputFormat;
|
||||
|
||||
import com.cloudera.sqoop.manager.ConnManager;
|
||||
import com.cloudera.sqoop.manager.ExportJobContext;
|
||||
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
|
||||
import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
|
||||
|
||||
/**
|
||||
* Run an export using JDBC (JDBC-based ExportOutputFormat).
|
||||
*/
|
||||
|
@ -21,19 +21,22 @@
|
||||
package com.cloudera.sqoop.mapreduce;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Set;
|
||||
import java.util.StringTokenizer;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.apache.hadoop.mapreduce.InputFormat;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.Mapper;
|
||||
import org.apache.hadoop.mapreduce.OutputFormat;
|
||||
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
|
||||
import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
|
||||
|
||||
import com.cloudera.sqoop.manager.ConnManager;
|
||||
import com.cloudera.sqoop.manager.ExportJobContext;
|
||||
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
|
||||
import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
|
||||
|
||||
/**
|
||||
* Run an update-based export using JDBC (JDBC-based UpdateOutputFormat).
|
||||
@ -101,17 +104,34 @@ protected void configureOutputFormat(Job job, String tableName,
|
||||
"Export column names could not be determined for " + tableName);
|
||||
}
|
||||
|
||||
String updateKeyCol = options.getUpdateKeyCol();
|
||||
if (null == updateKeyCol) {
|
||||
String updateKeyColumns = options.getUpdateKeyCol();
|
||||
if (null == updateKeyColumns) {
|
||||
throw new IOException("Update key column not set in export job");
|
||||
}
|
||||
// Update key columns lookup and removal
|
||||
Set<String> updateKeys = new LinkedHashSet<String>();
|
||||
Set<String> updateKeysUppercase = new HashSet<String>();
|
||||
StringTokenizer stok = new StringTokenizer(updateKeyColumns, ",");
|
||||
while (stok.hasMoreTokens()) {
|
||||
String nextUpdateKey = stok.nextToken().trim();
|
||||
if (nextUpdateKey.length() > 0) {
|
||||
updateKeys.add(nextUpdateKey);
|
||||
updateKeysUppercase.add(nextUpdateKey.toUpperCase());
|
||||
} else {
|
||||
throw new RuntimeException("Invalid update key column value specified"
|
||||
+ ": '" + updateKeyColumns + "'");
|
||||
}
|
||||
}
|
||||
|
||||
if (updateKeys.size() == 0) {
|
||||
throw new IOException("Unpdate key columns not valid in export job");
|
||||
}
|
||||
|
||||
// Make sure we strip out the key column from this list.
|
||||
String [] outColNames = new String[colNames.length - 1];
|
||||
String [] outColNames = new String[colNames.length - updateKeys.size()];
|
||||
int j = 0;
|
||||
String upperCaseKeyCol = updateKeyCol.toUpperCase();
|
||||
for (int i = 0; i < colNames.length; i++) {
|
||||
if (!colNames[i].toUpperCase().equals(upperCaseKeyCol)) {
|
||||
if (!updateKeysUppercase.contains(colNames[i].toUpperCase())) {
|
||||
outColNames[j++] = colNames[i];
|
||||
}
|
||||
}
|
||||
@ -119,7 +139,7 @@ protected void configureOutputFormat(Job job, String tableName,
|
||||
|
||||
job.setOutputFormatClass(getOutputFormatClass());
|
||||
job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
|
||||
job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyCol);
|
||||
job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyColumns);
|
||||
} catch (ClassNotFoundException cnfe) {
|
||||
throw new IOException("Could not load OutputFormat", cnfe);
|
||||
}
|
||||
|
@ -21,17 +21,19 @@
|
||||
package com.cloudera.sqoop.mapreduce;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Set;
|
||||
import java.util.StringTokenizer;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.OutputFormat;
|
||||
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
|
||||
import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
|
||||
|
||||
import com.cloudera.sqoop.manager.ConnManager;
|
||||
import com.cloudera.sqoop.manager.ExportJobContext;
|
||||
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
|
||||
import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
|
||||
|
||||
/**
|
||||
* Run an update/insert export using JDBC (JDBC-based UpsertOutputFormat).
|
||||
@ -75,14 +77,30 @@ protected void configureOutputFormat(Job job, String tableName,
|
||||
}
|
||||
DBOutputFormat.setOutput(job, tableName, colNames);
|
||||
|
||||
String updateKeyCol = options.getUpdateKeyCol();
|
||||
if (null == updateKeyCol) {
|
||||
String updateKeyColumns = options.getUpdateKeyCol();
|
||||
if (null == updateKeyColumns) {
|
||||
throw new IOException("Update key column not set in export job");
|
||||
}
|
||||
// Update key columns lookup and removal
|
||||
Set<String> updateKeys = new LinkedHashSet<String>();
|
||||
StringTokenizer stok = new StringTokenizer(updateKeyColumns, ",");
|
||||
while (stok.hasMoreTokens()) {
|
||||
String nextUpdateKey = stok.nextToken().trim();
|
||||
if (nextUpdateKey.length() > 0) {
|
||||
updateKeys.add(nextUpdateKey);
|
||||
} else {
|
||||
throw new RuntimeException("Invalid update key column value specified"
|
||||
+ ": '" + updateKeyColumns + "'");
|
||||
}
|
||||
}
|
||||
|
||||
if (updateKeys.size() == 0) {
|
||||
throw new IOException("Unpdate key columns not valid in export job");
|
||||
}
|
||||
|
||||
job.setOutputFormatClass(getOutputFormatClass());
|
||||
job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
|
||||
job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyCol);
|
||||
job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyColumns);
|
||||
} catch (ClassNotFoundException cnfe) {
|
||||
throw new IOException("Could not load OutputFormat", cnfe);
|
||||
}
|
||||
|
@ -22,6 +22,8 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.SQLException;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -69,17 +71,31 @@ public OracleUpsertRecordWriter(TaskAttemptContext context)
|
||||
protected String getUpdateStatement() {
|
||||
boolean first;
|
||||
|
||||
// lookup table for update columns
|
||||
Set<String> updateKeyLookup = new LinkedHashSet<String>();
|
||||
for (String updateKey : updateCols) {
|
||||
updateKeyLookup.add(updateKey);
|
||||
}
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("MERGE INTO ");
|
||||
sb.append(tableName);
|
||||
sb.append(" USING dual ON ( ");
|
||||
sb.append(updateCol);
|
||||
sb.append(" = ? )");
|
||||
first = true;
|
||||
for (int i = 0; i < updateCols.length; i++) {
|
||||
if (first) {
|
||||
first = false;
|
||||
} else {
|
||||
sb.append(" AND ");
|
||||
}
|
||||
sb.append(updateCols[i]).append(" = ?");
|
||||
}
|
||||
sb.append(" )");
|
||||
|
||||
sb.append(" WHEN MATCHED THEN UPDATE SET ");
|
||||
first = true;
|
||||
for (String col : columnNames) {
|
||||
if (!col.equals(updateCol)) {
|
||||
if (!updateKeyLookup.contains(col)) {
|
||||
if (first) {
|
||||
first = false;
|
||||
} else {
|
||||
|
@ -25,7 +25,10 @@
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.StringTokenizer;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -33,9 +36,9 @@
|
||||
import org.apache.hadoop.mapreduce.JobContext;
|
||||
import org.apache.hadoop.mapreduce.RecordWriter;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
|
||||
|
||||
import com.cloudera.sqoop.lib.SqoopRecord;
|
||||
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
|
||||
|
||||
/**
|
||||
* Update an existing table of data with new value data.
|
||||
@ -90,7 +93,7 @@ public class UpdateRecordWriter extends AsyncSqlRecordWriter<K, V> {
|
||||
|
||||
protected String tableName;
|
||||
protected String [] columnNames; // The columns to update.
|
||||
protected String updateCol; // The column containing the fixed key.
|
||||
protected String [] updateCols; // The columns containing the fixed key.
|
||||
|
||||
public UpdateRecordWriter(TaskAttemptContext context)
|
||||
throws ClassNotFoundException, SQLException {
|
||||
@ -101,7 +104,22 @@ public UpdateRecordWriter(TaskAttemptContext context)
|
||||
DBConfiguration dbConf = new DBConfiguration(conf);
|
||||
this.tableName = dbConf.getOutputTableName();
|
||||
this.columnNames = dbConf.getOutputFieldNames();
|
||||
this.updateCol = conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY);
|
||||
String updateKeyColumns =
|
||||
conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY);
|
||||
|
||||
Set<String> updateKeys = new LinkedHashSet<String>();
|
||||
StringTokenizer stok = new StringTokenizer(updateKeyColumns, ",");
|
||||
while (stok.hasMoreTokens()) {
|
||||
String nextUpdateKey = stok.nextToken().trim();
|
||||
if (nextUpdateKey.length() > 0) {
|
||||
updateKeys.add(nextUpdateKey);
|
||||
} else {
|
||||
throw new RuntimeException("Invalid update key column value specified"
|
||||
+ ": '" + updateKeyColumns + "'");
|
||||
}
|
||||
}
|
||||
|
||||
updateCols = updateKeys.toArray(new String[updateKeys.size()]);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -132,8 +150,8 @@ protected final String getTableName() {
|
||||
/**
|
||||
* @return the column we are using to determine the row to update.
|
||||
*/
|
||||
protected final String getUpdateCol() {
|
||||
return updateCol;
|
||||
protected final String[] getUpdateColumns() {
|
||||
return updateCols;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -182,8 +200,15 @@ protected String getUpdateStatement() {
|
||||
}
|
||||
|
||||
sb.append(" WHERE ");
|
||||
sb.append(this.updateCol);
|
||||
sb.append("=?");
|
||||
first = true;
|
||||
for (int i = 0; i < updateCols.length; i++) {
|
||||
if (first) {
|
||||
first = false;
|
||||
} else {
|
||||
sb.append(" AND ");
|
||||
}
|
||||
sb.append(updateCols[i]).append("=?");
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
@ -51,7 +51,8 @@ public void testRoundtripQuery() throws IOException, SQLException {
|
||||
|
||||
runImport(getOutputArgvForQuery(true));
|
||||
deleteTableData();
|
||||
runExport(getExportArgvForQuery(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
|
||||
runExport(getExportArgvForQuery(true, 10, 10, newStrArray(argv, "-m",
|
||||
"" + 1)));
|
||||
|
||||
checkFirstColumnSum();
|
||||
}
|
||||
@ -104,7 +105,8 @@ protected String[] getOutputArgvForQuery(boolean includeHadoopFlags) {
|
||||
}
|
||||
|
||||
args.add("--query");
|
||||
args.add("select * from " + HsqldbTestServer.getTableName() + " where $CONDITIONS");
|
||||
args.add("select * from " + HsqldbTestServer.getTableName()
|
||||
+ " where $CONDITIONS");
|
||||
args.add("--connect");
|
||||
args.add(HsqldbTestServer.getUrl());
|
||||
args.add("--target-dir");
|
||||
|
@ -29,16 +29,14 @@
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
|
||||
import com.cloudera.sqoop.testutil.CommonArgs;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import com.cloudera.sqoop.testutil.CommonArgs;
|
||||
import com.cloudera.sqoop.testutil.ExportJobTestCase;
|
||||
|
||||
import org.junit.Before;
|
||||
|
||||
/**
|
||||
* Test that we can update a copy of data in the database,
|
||||
* based on newer data in HDFS.
|
||||
@ -84,6 +82,115 @@ private void populateDatabase(int numRows) throws SQLException {
|
||||
conn.commit();
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Creates a table with three columns - A INT, B INT and C VARCHAR(32).
|
||||
* This table is populated with records in a set of three with total records
|
||||
* with the total number of unique values of A equal to the specified aMax
|
||||
* value. For each value of A, there will be three records with value of
|
||||
* B ranging from 0-2, and a corresponding value of C.</p>
|
||||
* <p>For example if <tt>aMax = 2</tt>, the table will contain the
|
||||
* following records:
|
||||
* <pre>
|
||||
* A | B | C
|
||||
* ----------------------
|
||||
* 0 | 0 | 0foo0
|
||||
* 0 | 1 | 0foo1
|
||||
* 0 | 2 | 0foo2
|
||||
* 1 | 0 | 1foo0
|
||||
* 1 | 1 | 1foo1
|
||||
* 1 | 2 | 1foo2
|
||||
* </pre></p>
|
||||
* @param firstKeyRange the number of
|
||||
* @throws SQLException
|
||||
*/
|
||||
private void createMultiKeyTable(int aMax) throws SQLException {
|
||||
Connection conn = getConnection();
|
||||
|
||||
PreparedStatement statement = conn.prepareStatement(
|
||||
"CREATE TABLE " + getTableName()
|
||||
+ " (A INT NOT NULL, B INT NOT NULL, C VARCHAR(32))");
|
||||
try {
|
||||
statement.executeUpdate();
|
||||
conn.commit();
|
||||
} finally {
|
||||
statement.close();
|
||||
statement = null;
|
||||
}
|
||||
|
||||
try {
|
||||
for (int i = 0; i< aMax; i++) {
|
||||
for (int j = 0; j < 3; j++) {
|
||||
statement = conn.prepareStatement("INSERT INTO " + getTableName()
|
||||
+ " VALUES (" + i + ", " + j + ", '"
|
||||
+ i + "foo" + j + "')");
|
||||
statement.executeUpdate();
|
||||
statement.close();
|
||||
statement = null;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (null != statement) {
|
||||
statement.close();
|
||||
}
|
||||
}
|
||||
|
||||
conn.commit();
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Creates update files for multi-key update test. The total number of
|
||||
* update records will be number of files times the number of aKeysPerFile
|
||||
* times 3. Column A value will start with the specified <tt>startAtValue</tt>
|
||||
* and for each value there will be three records corresponding to Column
|
||||
* B values [0-2].</p>
|
||||
* @param numFiles number of files to create
|
||||
* @param aKeysPerFile number of records sets with different column A values
|
||||
* @param startAtValue the starting value of column A
|
||||
* @param bKeyValues the list of values for the column B
|
||||
* @throws IOException
|
||||
*/
|
||||
private void createMultiKeyUpdateFiles(int numFiles, int aKeysPerFile,
|
||||
int startAtValue, int[] bKeyValues)
|
||||
throws IOException {
|
||||
Configuration conf = getConf();
|
||||
if (!isOnPhysicalCluster()) {
|
||||
conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
|
||||
}
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
|
||||
int aValue = startAtValue;
|
||||
for (int i = 0; i < numFiles; i++) {
|
||||
OutputStream os = fs.create(new Path(getTablePath(), "" + i + ".txt"));
|
||||
BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
|
||||
|
||||
for (int j = 0; j < aKeysPerFile; j++) {
|
||||
for (int k = 0; k < bKeyValues.length; k++) {
|
||||
w.write(getUpdateStringForMultiKeyRow(aValue, bKeyValues[k]));
|
||||
}
|
||||
aValue++;
|
||||
}
|
||||
|
||||
w.close();
|
||||
os.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a string of text representing an update for one row
|
||||
* of the multi-key table. The values of columns A and B are given
|
||||
* and the value of column C is generated as <em>a</em>bar<em>b</em>.
|
||||
* @param a the value of column a
|
||||
* @param b the value of column b
|
||||
*/
|
||||
private String getUpdateStringForMultiKeyRow(int a, int b) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(a).append("\t").append(b).append("\t").append(a);
|
||||
sb.append("bar").append(b).append("\n");
|
||||
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create a set of files that will be used as the input to the update
|
||||
* process.
|
||||
@ -196,6 +303,60 @@ private void verifyRowCount(int expectedCount) throws SQLException {
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyMultiKeyRow(String[] keyColumnNames, int[] keyValues,
|
||||
Object ...expectedVals) throws SQLException {
|
||||
StringBuilder querySb = new StringBuilder("SELECT A, B, C FROM ");
|
||||
querySb.append(getTableName()).append(" WHERE ");
|
||||
boolean first = true;
|
||||
for (int i = 0; i< keyColumnNames.length; i++) {
|
||||
if (first) {
|
||||
first = false;
|
||||
} else {
|
||||
querySb.append(" AND ");
|
||||
}
|
||||
querySb.append(keyColumnNames[i]).append(" = ");
|
||||
querySb.append(keyValues[i]);
|
||||
}
|
||||
|
||||
String query = querySb.toString();
|
||||
PreparedStatement statement = null;
|
||||
ResultSet rs = null;
|
||||
|
||||
try {
|
||||
Connection conn = getConnection();
|
||||
statement = conn.prepareStatement(query);
|
||||
rs = statement.executeQuery();
|
||||
|
||||
boolean success = rs.next();
|
||||
assertTrue("Expected at least one output record", success);
|
||||
|
||||
// Assert that all three columns have the correct values.
|
||||
for (int i = 0; i < expectedVals.length; i++) {
|
||||
String expected = expectedVals[i].toString();
|
||||
String result = rs.getString(i + 1);
|
||||
assertEquals("Invalid response for column " + i + "; got " + result
|
||||
+ " when expected " + expected, expected, result);
|
||||
}
|
||||
|
||||
// This query should have returned exactly one row.
|
||||
success = rs.next();
|
||||
assertFalse("Expected no more than one output record", success);
|
||||
} finally {
|
||||
if (null != rs) {
|
||||
try {
|
||||
rs.close();
|
||||
} catch (SQLException sqle) {
|
||||
LOG.error("Error closing result set: "
|
||||
+ StringUtils.stringifyException(sqle));
|
||||
}
|
||||
}
|
||||
|
||||
if (null != statement) {
|
||||
statement.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that a particular row has the expected values.
|
||||
*/
|
||||
@ -260,6 +421,130 @@ public void testBasicUpdate() throws Exception {
|
||||
verifyRow("A", "9", "9", "foo18", "18");
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a table with two columns that together act as unique keys
|
||||
* and then modifies a subset of the rows via update.
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testMultiKeyUpdate() throws Exception {
|
||||
createMultiKeyTable(3);
|
||||
|
||||
createMultiKeyUpdateFiles(1, 1, 1, new int[] {0, 1, 3});
|
||||
|
||||
runExport(getArgv(true, 2, 2, "-m", "1",
|
||||
"--update-key", "A,B"));
|
||||
verifyRowCount(9);
|
||||
// Check a few rows...
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 0, 0 }, 0, 0, "0foo0");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 0, 1 }, 0, 1, "0foo1");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 0, 2 }, 0, 2, "0foo2");
|
||||
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 1, 0 }, 1, 0, "1bar0");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 1, 1 }, 1, 1, "1bar1");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 1, 2 }, 1, 2, "1foo2");
|
||||
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 2, 0 }, 2, 0, "2foo0");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 2, 1 }, 2, 1, "2foo1");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 2, 2 }, 2, 2, "2foo2");
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a table with two columns that together act as unique keys
|
||||
* and then modifies a subset of the rows via update.
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testMultiKeyUpdateMultipleFilesNoUpdate() throws Exception {
|
||||
createMultiKeyTable(4);
|
||||
|
||||
createMultiKeyUpdateFiles(2, 1, 1, new int[] {3, 4, 5});
|
||||
|
||||
runExport(getArgv(true, 2, 2, "-m", "1",
|
||||
"--update-key", "A,B"));
|
||||
verifyRowCount(12);
|
||||
// Check a few rows...
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 0, 0 }, 0, 0, "0foo0");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 0, 1 }, 0, 1, "0foo1");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 0, 2 }, 0, 2, "0foo2");
|
||||
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 1, 0 }, 1, 0, "1foo0");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 1, 1 }, 1, 1, "1foo1");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 1, 2 }, 1, 2, "1foo2");
|
||||
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 2, 0 }, 2, 0, "2foo0");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 2, 1 }, 2, 1, "2foo1");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 2, 2 }, 2, 2, "2foo2");
|
||||
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 3, 0 }, 3, 0, "3foo0");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 3, 1 }, 3, 1, "3foo1");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 3, 2 }, 3, 2, "3foo2");
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a table with two columns that together act as unique keys
|
||||
* and then modifies a subset of the rows via update.
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testMultiKeyUpdateMultipleFilesFullUpdate() throws Exception {
|
||||
createMultiKeyTable(4);
|
||||
|
||||
createMultiKeyUpdateFiles(2, 2, 0, new int[] {0, 1, 2});
|
||||
|
||||
runExport(getArgv(true, 2, 2, "-m", "1",
|
||||
"--update-key", "A,B"));
|
||||
verifyRowCount(12);
|
||||
// Check a few rows...
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 0, 0 }, 0, 0, "0bar0");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 0, 1 }, 0, 1, "0bar1");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 0, 2 }, 0, 2, "0bar2");
|
||||
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 1, 0 }, 1, 0, "1bar0");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 1, 1 }, 1, 1, "1bar1");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 1, 2 }, 1, 2, "1bar2");
|
||||
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 2, 0 }, 2, 0, "2bar0");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 2, 1 }, 2, 1, "2bar1");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 2, 2 }, 2, 2, "2bar2");
|
||||
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 3, 0 }, 3, 0, "3bar0");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 3, 1 }, 3, 1, "3bar1");
|
||||
verifyMultiKeyRow(new String[] { "A", "B"},
|
||||
new int[] { 3, 2 }, 3, 2, "3bar2");
|
||||
}
|
||||
|
||||
|
||||
public void testEmptyTable() throws Exception {
|
||||
// Test that an empty table will "accept" updates that modify
|
||||
// no rows; no new data is injected into the database.
|
||||
|
@ -135,7 +135,7 @@ protected HsqldbTestServer getTestServer() {
|
||||
protected ConnManager getManager() {
|
||||
return manager;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* @return a connection to the database under test.
|
||||
|
@ -45,7 +45,7 @@ public abstract class ExportJobTestCase extends BaseSqoopTestCase {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(
|
||||
ExportJobTestCase.class.getName());
|
||||
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
// start the server
|
||||
|
Loading…
Reference in New Issue
Block a user