From dc4a82102ca54c37482852f1a31e365885d3539e Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Wed, 28 Nov 2012 14:15:08 -0800 Subject: [PATCH] SQOOP-724 Support Table hints in Microsoft SQL Server (Jarek Jarcec Cecho via Cheolsoo Park) --- src/docs/user/connectors.txt | 45 ++++++- .../sqoop/manager/SQLServerManager.java | 57 +++++++- .../SqlServerExportBatchOutputFormat.java | 111 ++++++++++++++++ .../sqlserver/SqlServerInputFormat.java | 54 ++++++++ .../sqlserver/SqlServerRecordReader.java | 122 ++++++++++++++++++ .../SQLServerManagerExportManualTest.java | 26 ++++ .../SQLServerManagerImportManualTest.java | 24 ++++ 7 files changed, 435 insertions(+), 4 deletions(-) create mode 100644 src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerExportBatchOutputFormat.java create mode 100644 src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerInputFormat.java create mode 100644 src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerRecordReader.java diff --git a/src/docs/user/connectors.txt b/src/docs/user/connectors.txt index d912840a..ff424f74 100644 --- a/src/docs/user/connectors.txt +++ b/src/docs/user/connectors.txt @@ -39,14 +39,55 @@ it will update appropriate row instead. As a result, Sqoop is ignoring values sp in parameter +\--update-key+, however user needs to specify at least one valid column to turn on update mode itself. +Microsoft SQL Connector +~~~~~~~~~~~~~~~~~~~~~~~ + +Extra arguments +^^^^^^^^^^^^^^^ + +List of all extra arguments supported by Microsoft SQL Connector is shown below: + +.Supported Microsoft SQL Connector extra arguments: +[grid="all"] +`----------------------------------------`--------------------------------------- +Argument Description +--------------------------------------------------------------------------------- ++\--schema + Scheme name that sqoop should use. \ + Default is "dbo". ++\--table-hints + Table hints that Sqoop should use for \ + data movement. +--------------------------------------------------------------------------------- + +Schema support +^^^^^^^^^^^^^^ +If you need to work with tables that are located in non-default schemas, you can +specify schema names via the +\--schema+ argument. Custom schemas are supported for +both import and export jobs. For example: + +---- +$ sqoop import ... --table custom_table -- --schema custom_schema +---- + +Table hints +^^^^^^^^^^^ + +Sqoop supports table hints in both import and export jobs. Table hints are used only +for queries that move data from/to Microsoft SQL Server, but they cannot be used for +meta data queries. You can specify a comma-separated list of table hints in the ++\--table-hints+ argument. For example: + +---- +$ sqoop import ... --table custom_table -- --table-hints NOLOCK +---- + + PostgreSQL Connector ~~~~~~~~~~~~~~~~~~~~~ Extra arguments ^^^^^^^^^^^^^^^ -List of all extra arguments supported by PostgreSQL Connector is shown on table -below: +List of all extra arguments supported by PostgreSQL Connector is shown below: .Supported PostgreSQL extra arguments: [grid="all"] diff --git a/src/java/org/apache/sqoop/manager/SQLServerManager.java b/src/java/org/apache/sqoop/manager/SQLServerManager.java index 51f86795..0c395992 100644 --- a/src/java/org/apache/sqoop/manager/SQLServerManager.java +++ b/src/java/org/apache/sqoop/manager/SQLServerManager.java @@ -29,10 +29,13 @@ import org.apache.commons.logging.LogFactory; import com.cloudera.sqoop.SqoopOptions; -import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat; import com.cloudera.sqoop.mapreduce.JdbcExportJob; import com.cloudera.sqoop.util.ExportException; +import com.cloudera.sqoop.util.ImportException; +import org.apache.hadoop.conf.Configuration; import org.apache.sqoop.cli.RelatedOptions; +import org.apache.sqoop.mapreduce.sqlserver.SqlServerExportBatchOutputFormat; +import org.apache.sqoop.mapreduce.sqlserver.SqlServerInputFormat; /** * Manages connections to SQLServer databases. Requires the SQLServer JDBC @@ -42,6 +45,9 @@ public class SQLServerManager extends com.cloudera.sqoop.manager.InformationSchemaManager { public static final String SCHEMA = "schema"; + public static final String TABLE_HINTS = "table-hints"; + public static final String TABLE_HINTS_PROP + = "org.apache.sqoop.manager.sqlserver.table.hints"; public static final Log LOG = LogFactory.getLog( SQLServerManager.class.getName()); @@ -55,6 +61,11 @@ public class SQLServerManager */ private String schema; + /** + * Optional table hints to use. + */ + private String tableHints; + public SQLServerManager(final SqoopOptions opts) { super(DRIVER_CLASS, opts); @@ -66,6 +77,28 @@ public SQLServerManager(final SqoopOptions opts) { } } + + /** + * {@inheritDoc} + */ + @Override + public void importTable( + com.cloudera.sqoop.manager.ImportJobContext context) + throws IOException, ImportException { + // We're the correct connection manager + context.setConnManager(this); + + // Propagate table hints to job + Configuration configuration = context.getOptions().getConf(); + if (tableHints != null) { + configuration.set(TABLE_HINTS_PROP, tableHints); + } + + // Set our own input format + context.setInputFormat(SqlServerInputFormat.class); + super.importTable(context); + } + /** * Export data stored in HDFS into a table in a database. */ @@ -73,8 +106,15 @@ public SQLServerManager(final SqoopOptions opts) { public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context) throws IOException, ExportException { context.setConnManager(this); + + // Propagate table hints to job + Configuration configuration = context.getOptions().getConf(); + if (tableHints != null) { + configuration.set(TABLE_HINTS_PROP, tableHints); + } + JdbcExportJob exportJob = new JdbcExportJob(context, null, null, - ExportBatchOutputFormat.class); + SqlServerExportBatchOutputFormat.class); exportJob.runExport(); } @@ -154,6 +194,15 @@ void parseExtraArgs(String[] args) throws ParseException { this.schema = schemaName; } + + // Apply table hints + if (cmdLine.hasOption(TABLE_HINTS)) { + String hints = cmdLine.getOptionValue(TABLE_HINTS); + LOG.info("Sqoop will use following table hints for data transfer: " + + hints); + + this.tableHints = hints; + } } /** @@ -171,6 +220,10 @@ private RelatedOptions getExtraOptions() { .withDescription("Optional schema name") .withLongOpt(SCHEMA).create()); + extraOptions.addOption(OptionBuilder.withArgName("string").hasArg() + .withDescription("Optional table hints to use") + .withLongOpt(TABLE_HINTS).create()); + return extraOptions; } } diff --git a/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerExportBatchOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerExportBatchOutputFormat.java new file mode 100644 index 00000000..f47d475e --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerExportBatchOutputFormat.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.mapreduce.sqlserver; + +import com.cloudera.sqoop.lib.SqoopRecord; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.sqoop.manager.SQLServerManager; +import org.apache.sqoop.mapreduce.ExportBatchOutputFormat; + +import java.io.IOException; +import java.sql.SQLException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Output format specific for Microsoft SQL Connector. + */ +public class SqlServerExportBatchOutputFormat + extends ExportBatchOutputFormat { + + private static final Log LOG = + LogFactory.getLog(SqlServerExportBatchOutputFormat.class); + + /** {@inheritDoc} */ + @Override + public RecordWriter getRecordWriter(TaskAttemptContext context) + throws IOException { + try { + return new SqlServerExportBatchRecordWriter(context); + } catch (Exception e) { + throw new IOException(e); + } + } + + /** {@inheritDoc}. */ + public class SqlServerExportBatchRecordWriter extends ExportBatchRecordWriter{ + + public SqlServerExportBatchRecordWriter(TaskAttemptContext context) + throws ClassNotFoundException, SQLException { + super(context); + } + + /** {@inheritDoc} */ + @Override + protected String getInsertStatement(int numRows) { + StringBuilder sb = new StringBuilder(); + + sb.append("INSERT INTO " + tableName + " "); + + String tableHints = getConf().get(SQLServerManager.TABLE_HINTS_PROP); + if (tableHints != null) { + LOG.info("Using table hints: " + tableHints); + sb.append(" WITH (").append(tableHints).append(") "); + } + + int numSlots; + if (this.columnNames != null) { + numSlots = this.columnNames.length; + + sb.append("("); + boolean first = true; + for (String col : columnNames) { + if (!first) { + sb.append(", "); + } + + sb.append(col); + first = false; + } + + sb.append(") "); + } else { + numSlots = this.columnCount; // set if columnNames is null. + } + + sb.append("VALUES "); + + // generates the (?, ?, ?...). + sb.append("("); + for (int i = 0; i < numSlots; i++) { + if (i != 0) { + sb.append(", "); + } + + sb.append("?"); + } + sb.append(")"); + + String query = sb.toString(); + LOG.info("Using query " + query); + + return query; + } + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerInputFormat.java b/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerInputFormat.java new file mode 100644 index 00000000..9996d1ba --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerInputFormat.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.mapreduce.sqlserver; + +import com.cloudera.sqoop.mapreduce.db.DBConfiguration; +import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.sqoop.mapreduce.DBWritable; + +import java.io.IOException; +import java.sql.SQLException; + +/** + * Input format specific for Microsoft SQL Server. + */ +public class SqlServerInputFormat + extends DataDrivenDBInputFormat { + + /** {@inheritDoc} */ + @Override + protected RecordReader createDBRecordReader( + DBInputSplit split, Configuration conf) throws IOException { + + DBConfiguration dbConf = getDBConf(); + @SuppressWarnings("unchecked") + Class inputClass = (Class) (dbConf.getInputClass()); + + try { + // Use Microsoft SQL Server specific db reader + return new SqlServerRecordReader(split, inputClass, + conf, getConnection(), dbConf, dbConf.getInputConditions(), + dbConf.getInputFieldNames(), dbConf.getInputTableName()); + } catch (SQLException ex) { + throw new IOException(ex); + } + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerRecordReader.java b/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerRecordReader.java new file mode 100644 index 00000000..2c08f122 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerRecordReader.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.mapreduce.sqlserver; + +import com.cloudera.sqoop.mapreduce.db.DBConfiguration; +import com.cloudera.sqoop.mapreduce.db.DBInputFormat; +import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat; +import com.cloudera.sqoop.mapreduce.db.DataDrivenDBRecordReader; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.sqoop.manager.SQLServerManager; +import org.apache.sqoop.mapreduce.DBWritable; + +import java.sql.Connection; +import java.sql.SQLException; + +/** + * Microsoft SQL Server specific Record Reader. + */ +public class SqlServerRecordReader + extends DataDrivenDBRecordReader { + + private static final Log LOG = + LogFactory.getLog(SqlServerRecordReader.class); + + // CHECKSTYLE:OFF + public SqlServerRecordReader(DBInputFormat.DBInputSplit split, + Class inputClass, Configuration conf, Connection conn, + DBConfiguration dbConfig, String cond, String [] fields, + String table) throws SQLException { + + super(split, inputClass, conf, conn, dbConfig, cond, fields, table, + "MICROSOFT SQL SERVER"); + } + // CHECKSTYLE:ON + + + /** + * {@inheritDoc} + */ + @Override + protected String getSelectQuery() { + StringBuilder query = new StringBuilder(); + + DataDrivenDBInputFormat.DataDrivenDBInputSplit dataSplit = + (DataDrivenDBInputFormat.DataDrivenDBInputSplit) getSplit(); + + DBConfiguration dbConf = getDBConf(); + String [] fieldNames = getFieldNames(); + String tableName = getTableName(); + String conditions = getConditions(); + + // Build the WHERE clauses associated with the data split first. + // We need them in both branches of this function. + StringBuilder conditionClauses = new StringBuilder(); + conditionClauses.append("( ").append(dataSplit.getLowerClause()); + conditionClauses.append(" ) AND ( ").append(dataSplit.getUpperClause()); + conditionClauses.append(" )"); + + if (dbConf.getInputQuery() == null) { + // We need to generate the entire query. + query.append("SELECT "); + + for (int i = 0; i < fieldNames.length; i++) { + query.append(fieldNames[i]); + if (i != fieldNames.length -1) { + query.append(", "); + } + } + + query.append(" FROM ").append(tableName); + + String tableHints = + dbConf.getConf().get(SQLServerManager.TABLE_HINTS_PROP); + if (tableHints != null) { + LOG.info("Using table hints: " + tableHints); + query.append(" WITH (").append(tableHints).append(")"); + } + + query.append(" WHERE "); + if (conditions != null && conditions.length() > 0) { + // Put the user's conditions first. + query.append("( ").append(conditions).append(" ) AND "); + } + + // Now append the conditions associated with our split. + query.append(conditionClauses.toString()); + + } else { + // User provided the query. We replace the special token with + // our WHERE clause. + String inputQuery = dbConf.getInputQuery(); + if (inputQuery.indexOf(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN) == -1) { + LOG.error("Could not find the clause substitution token " + + DataDrivenDBInputFormat.SUBSTITUTE_TOKEN + " in the query: [" + + inputQuery + "]. Parallel splits may not work correctly."); + } + + query.append(inputQuery.replace(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN, + conditionClauses.toString())); + } + + LOG.info("Using query: " + query.toString()); + return query.toString(); + } +} diff --git a/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java b/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java index ac7a934a..7800944e 100644 --- a/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java +++ b/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java @@ -257,6 +257,32 @@ public void testExportCustomSchema() throws IOException, SQLException { ); } + public void testExportTableHints() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,400,sales", + "3,Fred,15,marketing", + }); + + String []extra = new String[] {"--", "--table-hints", + "ROWLOCK", + }; + runExport(getArgv(DBO_TABLE_NAME, extra)); + assertRowCount(2, escapeObjectName(DBO_TABLE_NAME), conn); + } + + public void testExportTableHintsMultiple() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,400,sales", + "3,Fred,15,marketing", + }); + + String []extra = new String[] {"--", "--table-hints", + "ROWLOCK,NOWAIT", + }; + runExport(getArgv(DBO_TABLE_NAME, extra)); + assertRowCount(2, escapeObjectName(DBO_TABLE_NAME), conn); + } + public static void assertRowCount(long expected, String tableName, Connection connection) { diff --git a/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java b/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java index bf889d00..27860c20 100644 --- a/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java +++ b/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java @@ -250,6 +250,30 @@ public void testImportDifferentSchema() throws IOException { doImportAndVerify(SCH_TABLE_NAME, expectedResults, extraArgs); } + @Test + public void testImportTableHints() throws IOException { + String [] expectedResults = { + "1,Aaron,1000000.0,engineering", + "2,Bob,400.0,sales", + "3,Fred,15.0,marketing", + }; + + String[] extraArgs = new String[] {"--table-hints", "NOLOCK"}; + doImportAndVerify(DBO_TABLE_NAME, expectedResults, extraArgs); + } + + @Test + public void testImportTableHintsMultiple() throws IOException { + String [] expectedResults = { + "1,Aaron,1000000.0,engineering", + "2,Bob,400.0,sales", + "3,Fred,15.0,marketing", + }; + + String[] extraArgs = new String[] {"--table-hints", "NOLOCK,NOWAIT"}; + doImportAndVerify(DBO_TABLE_NAME, expectedResults, extraArgs); + } + private String [] getArgv(String tableName, String ... extraArgs) { ArrayList args = new ArrayList();