From e926bf1fe01e02399240490832b9c892f1ea2e0f Mon Sep 17 00:00:00 2001 From: Andrew Bayer Date: Fri, 22 Jul 2011 20:03:46 +0000 Subject: [PATCH] New ExportOutputFormat with improved JDBC performance. Added ExportOutputFormat. JdbcExportJob uses ExportOutputFormat instead of DBOutputFormat. SqoopRecord now supports write() to multi-row INSERT statement, clone(). ClassWriter generates methods to fulfill expanded SqoopRecord contract. Added Jdbc MySQL tests to test multi-row statements. BlobRef, ClobRef classes now support Cloneable interface. Added ExportStressTest. Added Oracle-specific ExportOutputFormat subclass. From: Aaron Kimball git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149898 13f79535-47bb-0310-9956-ffa450edef68 --- build.xml | 7 +- .../sqoop/lib/BigDecimalSerializer.java | 3 - .../org/apache/hadoop/sqoop/lib/BlobRef.java | 6 + .../org/apache/hadoop/sqoop/lib/ClobRef.java | 5 + .../org/apache/hadoop/sqoop/lib/LobRef.java | 23 + .../apache/hadoop/sqoop/lib/SqoopRecord.java | 13 +- .../hadoop/sqoop/manager/OracleManager.java | 21 + .../hadoop/sqoop/mapreduce/ExportJobBase.java | 13 +- .../hadoop/sqoop/mapreduce/JdbcExportJob.java | 17 +- .../sqoop/mapreduce/MySQLExportJob.java | 8 +- .../mapreduce/SequenceFileExportMapper.java | 2 +- .../sqoop/mapreduce/TextExportMapper.java | 2 +- .../apache/hadoop/sqoop/orm/ClassWriter.java | 53 +- src/perftest/ExportStressTest.java | 148 ++++++ .../sqoop/mapreduce/ExportOutputFormat.java | 492 ++++++++++++++++++ .../mapreduce/OracleExportOutputFormat.java | 109 ++++ .../org/apache/hadoop/sqoop/TestExport.java | 92 +++- .../apache/hadoop/sqoop/ThirdPartyTests.java | 4 + .../sqoop/manager/DirectMySQLExportTest.java | 26 +- .../sqoop/manager/JdbcMySQLExportTest.java | 149 ++++++ .../sqoop/manager/OracleExportTest.java | 198 +++++++ .../hadoop/sqoop/manager/OracleUtils.java | 8 +- .../sqoop/testutil/ExportJobTestCase.java | 32 +- 23 files changed, 1365 insertions(+), 66 deletions(-) create mode 100644 src/perftest/ExportStressTest.java create mode 100644 src/shims/common/org/apache/hadoop/sqoop/mapreduce/ExportOutputFormat.java create mode 100644 src/shims/common/org/apache/hadoop/sqoop/mapreduce/OracleExportOutputFormat.java create mode 100644 src/test/org/apache/hadoop/sqoop/manager/JdbcMySQLExportTest.java create mode 100644 src/test/org/apache/hadoop/sqoop/manager/OracleExportTest.java diff --git a/build.xml b/build.xml index 61e8aa53..0adfe00c 100644 --- a/build.xml +++ b/build.xml @@ -132,8 +132,9 @@ - - + + + @@ -197,7 +198,7 @@ r = + (LobRef) super.clone(); + + r.reader = null; // Reference to opened reader is not duplicated. + if (null != data) { + r.data = deepCopyData(); + } + + return r; + } + @Override protected synchronized void finalize() throws Throwable { close(); @@ -202,6 +220,11 @@ protected abstract ACCESSORTYPE getExternalSource(LobFile.Reader reader) */ protected abstract DATATYPE getInternalData(CONTAINERTYPE data); + /** + * Make a copy of the materialized data. 
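+   * Implementations must return a new container that shares no mutable
+   * state with this one. A minimal sketch, assuming a BytesWritable
+   * container as in BlobRef:
+   *
+   *   protected BytesWritable deepCopyData() {
+   *     return new BytesWritable(
+   *         Arrays.copyOf(data.getBytes(), data.getLength()));
+   *   }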
+ */ + protected abstract CONTAINERTYPE deepCopyData(); + public DATATYPE getData() { if (isExternal()) { throw new RuntimeException( diff --git a/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java b/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java index a3aa0ce0..a4cdf881 100644 --- a/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java +++ b/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java @@ -21,16 +21,17 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.CharBuffer; +import java.sql.PreparedStatement; import java.sql.SQLException; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.lib.db.DBWritable; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; /** * Interface implemented by the classes generated by sqoop's orm.ClassWriter. */ -public interface SqoopRecord extends DBWritable, Writable { +public interface SqoopRecord extends Cloneable, DBWritable, Writable { public void parse(CharSequence s) throws RecordParser.ParseError; public void parse(Text s) throws RecordParser.ParseError; public void parse(byte [] s) throws RecordParser.ParseError; @@ -39,5 +40,13 @@ public interface SqoopRecord extends DBWritable, Writable { public void parse(CharBuffer s) throws RecordParser.ParseError; public void loadLargeObjects(LargeObjectLoader objLoader) throws SQLException, IOException, InterruptedException; + public Object clone() throws CloneNotSupportedException; + + /** + * Inserts the data in this object into the PreparedStatement, starting + * at parameter 'offset'. + * @return the number of fields written to the statement. + */ + public int write(PreparedStatement stmt, int offset) throws SQLException; } diff --git a/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java b/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java index 4d69872e..26b65f75 100644 --- a/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java +++ b/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java @@ -33,8 +33,12 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.lib.db.OracleDataDrivenDBInputFormat; import org.apache.hadoop.sqoop.SqoopOptions; +import org.apache.hadoop.sqoop.mapreduce.JdbcExportJob; +import org.apache.hadoop.sqoop.shims.ShimLoader; +import org.apache.hadoop.sqoop.util.ExportException; import org.apache.hadoop.sqoop.util.ImportException; /** @@ -281,6 +285,7 @@ private void setSessionTimeZone(Connection conn) throws SQLException { } } + @Override public void importTable(ImportJobContext context) throws IOException, ImportException { // Specify the Oracle-specific DBInputFormat for import. 
@@ -288,6 +293,22 @@ public void importTable(ImportJobContext context) super.importTable(context); } + /** + * Export data stored in HDFS into a table in a database + */ + public void exportTable(ExportJobContext context) + throws IOException, ExportException { + try { + JdbcExportJob exportJob = new JdbcExportJob(context, null, null, + (Class) ShimLoader.getShimClass( + "org.apache.hadoop.sqoop.mapreduce.OracleExportOutputFormat")); + exportJob.runExport(); + } catch (ClassNotFoundException cnfe) { + throw new ExportException("Could not start export; could not find class", + cnfe); + } + } + @Override public ResultSet readTable(String tableName, String[] columns) throws SQLException { if (columns == null) { diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJobBase.java index 25286c21..e716a889 100644 --- a/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJobBase.java +++ b/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJobBase.java @@ -37,7 +37,6 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; -import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; @@ -188,6 +187,18 @@ protected Class getInputFormatClass() } } + @Override + protected Class getOutputFormatClass() + throws ClassNotFoundException { + Class configuredOF = super.getOutputFormatClass(); + if (null == configuredOF) { + return (Class) ShimLoader.getShimClass( + "org.apache.hadoop.sqoop.mapreduce.ExportOutputFormat"); + } else { + return configuredOF; + } + } + @Override protected void configureMapper(Job job, String tableName, String tableClassName) throws ClassNotFoundException, IOException { diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/hadoop/sqoop/mapreduce/JdbcExportJob.java index cea778d1..3f75d947 100644 --- a/src/java/org/apache/hadoop/sqoop/mapreduce/JdbcExportJob.java +++ b/src/java/org/apache/hadoop/sqoop/mapreduce/JdbcExportJob.java @@ -25,18 +25,20 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; import org.apache.hadoop.sqoop.ConnFactory; import org.apache.hadoop.sqoop.manager.ConnManager; import org.apache.hadoop.sqoop.manager.ExportJobContext; - +import org.apache.hadoop.sqoop.shims.ShimLoader; /** - * Run an export using JDBC (DBOutputFormat) + * Run an export using JDBC (JDBC-based ExportOutputFormat) */ public class JdbcExportJob extends ExportJobBase { @@ -47,6 +49,13 @@ public JdbcExportJob(final ExportJobContext context) { super(context); } + public JdbcExportJob(final ExportJobContext ctxt, + final Class mapperClass, + final Class inputFormatClass, + final Class outputFormatClass) { + super(ctxt, mapperClass, inputFormatClass, outputFormatClass); + } + @Override protected Class getMapperClass() { if (inputIsSequenceFiles()) { @@ -81,8 +90,10 @@ protected void configureOutputFormat(Job job, String tableName, } DBOutputFormat.setOutput(job, 
tableName, colNames); - job.setOutputFormatClass(DBOutputFormat.class); + job.setOutputFormatClass(getOutputFormatClass()); job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName); + } catch (ClassNotFoundException cnfe) { + throw new IOException("Could not load OutputFormat", cnfe); } finally { try { mgr.close(); diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/MySQLExportJob.java b/src/java/org/apache/hadoop/sqoop/mapreduce/MySQLExportJob.java index 564a3f38..3cecce2c 100644 --- a/src/java/org/apache/hadoop/sqoop/mapreduce/MySQLExportJob.java +++ b/src/java/org/apache/hadoop/sqoop/mapreduce/MySQLExportJob.java @@ -47,13 +47,7 @@ public class MySQLExportJob extends ExportJobBase { LogFactory.getLog(MySQLExportJob.class.getName()); public MySQLExportJob(final ExportJobContext context) { - super(context); - } - - @Override - protected Class getOutputFormatClass() { - // This job does not write to the OutputCollector. Disable it. - return NullOutputFormat.class; + super(context, null, null, NullOutputFormat.class); } @Override diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/SequenceFileExportMapper.java b/src/java/org/apache/hadoop/sqoop/mapreduce/SequenceFileExportMapper.java index b04bac89..ef50bbaa 100644 --- a/src/java/org/apache/hadoop/sqoop/mapreduce/SequenceFileExportMapper.java +++ b/src/java/org/apache/hadoop/sqoop/mapreduce/SequenceFileExportMapper.java @@ -28,7 +28,7 @@ /** * Reads a SqoopRecord from the SequenceFile in which it's packed and emits - * that DBWritable to the DBOutputFormat for writeback to the database. + * that DBWritable to the OutputFormat for writeback to the database. */ public class SequenceFileExportMapper extends AutoProgressMapper { diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/TextExportMapper.java b/src/java/org/apache/hadoop/sqoop/mapreduce/TextExportMapper.java index b70c8eea..511bdcef 100644 --- a/src/java/org/apache/hadoop/sqoop/mapreduce/TextExportMapper.java +++ b/src/java/org/apache/hadoop/sqoop/mapreduce/TextExportMapper.java @@ -33,7 +33,7 @@ /** * Converts an input record from a string representation to a parsed Sqoop record - * and emits that DBWritable to the DBOutputFormat for writeback to the database. + * and emits that DBWritable to the OutputFormat for writeback to the database. */ public class TextExportMapper extends AutoProgressMapper { diff --git a/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java b/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java index b51d8f4e..46512d75 100644 --- a/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java +++ b/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java @@ -500,6 +500,10 @@ private void generateDbWrite(Map columnTypes, String [] colName StringBuilder sb) { sb.append(" public void write(PreparedStatement __dbStmt) throws SQLException {\n"); + sb.append(" write(__dbStmt, 0);\n"); + sb.append(" }\n\n"); + + sb.append(" public int write(PreparedStatement __dbStmt, int __off) throws SQLException {\n"); int fieldNum = 0; @@ -520,9 +524,10 @@ private void generateDbWrite(Map columnTypes, String [] colName } sb.append(" JdbcWritableBridge." + setterMethod + "(" + col + ", " - + fieldNum + ", " + sqlType + ", __dbStmt);\n"); + + fieldNum + " + __off, " + sqlType + ", __dbStmt);\n"); } + sb.append(" return " + fieldNum + ";\n"); sb.append(" }\n"); } @@ -559,7 +564,48 @@ private void generateHadoopRead(Map columnTypes, String [] colN } /** - * Generate the toString() method + * Generate the clone() method. 
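+   * ExportOutputFormat buffers records between INSERT statements, and
+   * MapReduce reuses the object passed to write(), so the OutputFormat
+   * clones each key before buffering it; that clone must deep-copy all
+   * mutable fields. For a java.sql.Date column named "start_date" (a
+   * hypothetical column), the emitted code is roughly:
+   *
+   *   o.start_date = (java.sql.Date) o.start_date.clone();
+   *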
+ * @param columnTypes - mapping from column names to sql types + * @param colNames - ordered list of column names for table. + * @param sb - StringBuilder to append code to + */ + private void generateCloneMethod(Map columnTypes, + String [] colNames, StringBuilder sb) { + + TableClassName tableNameInfo = new TableClassName(options); + String className = tableNameInfo.getShortClassForTable(tableName); + + sb.append(" public Object clone() throws CloneNotSupportedException {\n"); + sb.append(" " + className + " o = (" + className + ") super.clone();\n"); + + // For each field that is mutable, we need to perform the deep copy. + for (String colName : colNames) { + int sqlType = columnTypes.get(colName); + String javaType = connManager.toJavaType(sqlType); + if (null == javaType) { + continue; + } else if (javaType.equals("java.sql.Date") + || javaType.equals("java.sql.Time") + || javaType.equals("java.sql.Timestamp") + || javaType.equals(ClobRef.class.getName()) + || javaType.equals(BlobRef.class.getName())) { + sb.append(" o." + colName + " = (" + javaType + + ") o." + colName + ".clone();\n"); + } else if (javaType.equals(BytesWritable.class.getName())) { + sb.append(" o." + colName + " = new BytesWritable(" + + "Arrays.copyOf(" + colName + ".getBytes(), " + + colName + ".getLength()));\n"); + } + } + + sb.append(" return o;\n"); + sb.append(" }\n"); + + + } + + /** + * Generate the toString() method. * @param columnTypes - mapping from column names to sql types * @param colNames - ordered list of column names for table. * @param sb - StringBuilder to append code to @@ -904,6 +950,7 @@ public StringBuilder generateClassForColumns(Map columnTypes, sb.append("import java.sql.Date;\n"); sb.append("import java.sql.Time;\n"); sb.append("import java.sql.Timestamp;\n"); + sb.append("import java.util.Arrays;\n"); sb.append("import java.util.Iterator;\n"); sb.append("import java.util.List;\n"); sb.append("\n"); @@ -921,6 +968,8 @@ public StringBuilder generateClassForColumns(Map columnTypes, generateHadoopWrite(columnTypes, colNames, sb); generateToString(columnTypes, colNames, sb); generateParser(columnTypes, colNames, sb); + generateCloneMethod(columnTypes, colNames, sb); + // TODO(aaron): Generate hashCode(), compareTo(), equals() so it can be a WritableComparable sb.append("}\n"); diff --git a/src/perftest/ExportStressTest.java b/src/perftest/ExportStressTest.java new file mode 100644 index 00000000..d95365f7 --- /dev/null +++ b/src/perftest/ExportStressTest.java @@ -0,0 +1,148 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.*; +import java.sql.*; +import java.util.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.conf.*; +import org.apache.hadoop.util.*; + +import org.apache.hadoop.sqoop.Sqoop; +import org.apache.hadoop.sqoop.SqoopOptions; +import org.apache.hadoop.sqoop.tool.ExportTool; +import org.apache.hadoop.sqoop.tool.SqoopTool; + +/** + * Stress test export procedure by running a large-scale export to MySQL. + * This requires MySQL be configured with a database that can be accessed + * by the specified username without a password. The user must be able to + * create and drop tables in the database. + * + * Run with: src/scripts/run-perftest.sh ExportStressTest (connect-str) (username) + */ +public class ExportStressTest extends Configured implements Tool { + + // Export 10 GB of data. Each record is ~100 bytes. + public final static int NUM_FILES = 10; + public final static int RECORDS_PER_FILE = 10 * 1024 * 1024; + + public final static String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + public ExportStressTest() { + } + + public void createFile(int fileId) throws IOException { + Configuration conf = getConf(); + FileSystem fs = FileSystem.get(conf); + Path dirPath = new Path("ExportStressTest"); + fs.mkdirs(dirPath); + Path filePath = new Path(dirPath, "input-" + fileId); + + OutputStream os = fs.create(filePath); + Writer w = new BufferedWriter(new OutputStreamWriter(os)); + for (int i = 0; i < RECORDS_PER_FILE; i++) { + long v = (long) i + ((long) RECORDS_PER_FILE * (long) fileId); + w.write("" + v + "," + ALPHABET + ALPHABET + ALPHABET + ALPHABET + "\n"); + + } + w.close(); + os.close(); + } + + /** Create a set of data files to export. */ + public void createData() throws IOException { + Configuration conf = getConf(); + FileSystem fs = FileSystem.get(conf); + Path dirPath = new Path("ExportStressTest"); + if (fs.exists(dirPath)) { + System.out.println( + "Export directory appears to already exist. Skipping data-gen."); + return; + } + + for (int i = 0; i < NUM_FILES; i++) { + createFile(i); + } + } + + /** Create a table to hold our results. Drop any existing definition. */ + public void createTable(String connectStr, String username) throws Exception { + Class.forName("com.mysql.jdbc.Driver"); // Load mysql driver. + + Connection conn = DriverManager.getConnection(connectStr, username, null); + conn.setAutoCommit(false); + PreparedStatement stmt = conn.prepareStatement( + "DROP TABLE IF EXISTS ExportStressTestTable", ResultSet.TYPE_FORWARD_ONLY, + ResultSet.CONCUR_READ_ONLY); + stmt.executeUpdate(); + stmt.close(); + + stmt = conn.prepareStatement( + "CREATE TABLE ExportStressTestTable(id INT NOT NULL PRIMARY KEY, " + + "msg VARCHAR(110)) Engine=InnoDB", ResultSet.TYPE_FORWARD_ONLY, + ResultSet.CONCUR_READ_ONLY); + stmt.executeUpdate(); + stmt.close(); + conn.commit(); + conn.close(); + } + + /** Actually run the export of the generated data to the user-created table. 
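+   * This pushes NUM_FILES * RECORDS_PER_FILE (about 105 million) rows,
+   * roughly 10 GB in total, through the JDBC export path with four
+   * mappers.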
*/ + public void runExport(String connectStr, String username) throws Exception { + SqoopOptions options = new SqoopOptions(getConf()); + options.setConnectString(connectStr); + options.setTableName("ExportStressTestTable"); + options.setUsername(username); + options.setExportDir("ExportStressTest"); + options.setNumMappers(4); + options.setLinesTerminatedBy('\n'); + options.setFieldsTerminatedBy(','); + options.setExplicitDelims(true); + + SqoopTool exportTool = new ExportTool(); + Sqoop sqoop = new Sqoop(exportTool, getConf(), options); + int ret = Sqoop.runSqoop(sqoop, new String[0]); + if (0 != ret) { + throw new Exception("Error doing export; ret=" + ret); + } + } + + @Override + public int run(String [] args) { + String connectStr = args[0]; + String username = args[1]; + + try { + createData(); + createTable(connectStr, username); + runExport(connectStr, username); + } catch (Exception e) { + System.err.println("Error: " + StringUtils.stringifyException(e)); + return 1; + } + + return 0; + } + + public static void main(String [] args) throws Exception { + ExportStressTest test = new ExportStressTest(); + int ret = ToolRunner.run(test, args); + System.exit(ret); + } +} diff --git a/src/shims/common/org/apache/hadoop/sqoop/mapreduce/ExportOutputFormat.java b/src/shims/common/org/apache/hadoop/sqoop/mapreduce/ExportOutputFormat.java new file mode 100644 index 00000000..b3831ea7 --- /dev/null +++ b/src/shims/common/org/apache/hadoop/sqoop/mapreduce/ExportOutputFormat.java @@ -0,0 +1,492 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.mapreduce; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.SynchronousQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; +import org.apache.hadoop.util.StringUtils; + +import org.apache.hadoop.sqoop.lib.SqoopRecord; + +/** + * Insert the emitted keys as records into a database table. + * This supports a configurable "spill threshold" at which + * point intermediate transactions are committed. + * + * Record objects are buffered before actually performing the INSERT + * statements; this requires that the key implement the + * SqoopRecord interface. 
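+ *
+ * Batching is controlled by the two configuration keys defined below;
+ * an explicit configuration might look like:
+ *
+ *   conf.setInt("sqoop.export.records.per.statement", 100);
+ *   conf.setInt("sqoop.export.statements.per.transaction", 100);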
+ * + * Uses DBOutputFormat/DBConfiguration for configuring the output. + */ +public class ExportOutputFormat + extends OutputFormat { + + /** conf key: number of rows to export per INSERT statement. */ + public static final String RECORDS_PER_STATEMENT_KEY = + "sqoop.export.records.per.statement"; + + /** conf key: number of INSERT statements to bundle per tx. + * If this is set to -1, then a single transaction will be used + * per task. Note that each statement may encompass multiple + * rows, depending on the value of sqoop.export.records.per.statement. + */ + public static final String STATEMENTS_PER_TRANSACTION_KEY = + "sqoop.export.statements.per.transaction"; + + private static final int DEFAULT_RECORDS_PER_STATEMENT = 100; + private static final int DEFAULT_STATEMENTS_PER_TRANSACTION = 100; + private static final int UNLIMITED_STATEMENTS_PER_TRANSACTION = -1; + + private static final Log LOG = LogFactory.getLog(ExportOutputFormat.class); + + /** {@inheritDoc} */ + public void checkOutputSpecs(JobContext context) + throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + DBConfiguration dbConf = new DBConfiguration(conf); + + // Sanity check all the configuration values we need. + if (null == conf.get(DBConfiguration.URL_PROPERTY)) { + throw new IOException("Database connection URL is not set."); + } else if (null == dbConf.getOutputTableName()) { + throw new IOException("Table name is not set for export"); + } else if (null == dbConf.getOutputFieldNames() + && 0 == dbConf.getOutputFieldCount()) { + throw new IOException( + "Output field names are null and zero output field count set."); + } + } + + /** {@inheritDoc} */ + public OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException, InterruptedException { + return new OutputCommitter() { + public void abortTask(TaskAttemptContext taskContext) { } + public void cleanupJob(JobContext jobContext) { } + public void commitTask(TaskAttemptContext taskContext) { } + public boolean needsTaskCommit(TaskAttemptContext taskContext) { + return false; + } + public void setupJob(JobContext jobContext) { } + public void setupTask(TaskAttemptContext taskContext) { } + }; + } + + /** {@inheritDoc} */ + public RecordWriter getRecordWriter(TaskAttemptContext context) + throws IOException { + try { + return new ExportRecordWriter(context); + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * Represents a database update operation that should be performed + * by an asynchronous background thread. + * AsyncDBOperation objects are immutable. + * They MAY contain a statement which should be executed. The + * statement may also be null. + * + * They may also set 'forceCommit' to true. If true, then the + * executor of this operation should commit the current + * transaction, even if stmt is null. + */ + private static class AsyncDBOperation { + private final PreparedStatement stmt; + private final boolean forceCommit; + private final boolean close; + + /** + * Create an asynchronous database operation. + * @param s the statement, if any, to execute. + * @param forceCommit if true, the current transaction should be committed. + * @param close if true, the executor thread should stop after processing + * this operation. + */ + public AsyncDBOperation(PreparedStatement s, boolean forceCommit, + boolean close) { + this.stmt = s; + this.forceCommit = forceCommit; + this.close = close; + } + + /** + * @return a statement to run as an update. 
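+     * May be null when this operation only forces a commit or signals
+     * the executor thread to stop.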
+ */ + public PreparedStatement getStatement() { + return stmt; + } + + /** + * @return true if the executor should commit the current transaction. + * If getStatement() is non-null, the statement is run first. + */ + public boolean requiresCommit() { + return forceCommit; + } + + /** + * @return true if the executor should stop after this command. + */ + public boolean stop() { + return this.close; + } + } + + /** + * A thread that runs the database interactions asynchronously + * from the OutputCollector. + */ + private static class ExportUpdateThread extends Thread { + + private final Connection conn; // The connection to the database. + private SQLException err; // Error from a previously-run statement. + + // How we receive database operations from the RecordWriter. + private SynchronousQueue opsQueue; + + protected int curNumStatements; // statements executed thus far in the tx. + protected final int stmtsPerTx; // statements per transaction. + + /** + * Create a new update thread that interacts with the database. + * @param conn the connection to use. This must only be used by this + * thread. + * @param stmtsPerTx the number of statements to execute before committing + * the current transaction. + */ + public ExportUpdateThread(Connection conn, int stmtsPerTx) { + this.conn = conn; + this.err = null; + this.opsQueue = new SynchronousQueue(); + this.stmtsPerTx = stmtsPerTx; + } + + public void run() { + while (true) { + AsyncDBOperation op = null; + try { + op = opsQueue.take(); + } catch (InterruptedException ie) { + LOG.warn("Interrupted retrieving from operation queue: " + + StringUtils.stringifyException(ie)); + continue; + } + + if (null == op) { + // This shouldn't be allowed to happen. + LOG.warn("Null operation in queue; illegal state."); + continue; + } + + PreparedStatement stmt = op.getStatement(); + // Synchronize on the connection to ensure it does not conflict + // with the prepareStatement() call in the main thread. + synchronized (conn) { + try { + if (null != stmt) { + stmt.executeUpdate(); + stmt.close(); + stmt = null; + this.curNumStatements++; + } + + if (op.requiresCommit() || (curNumStatements >= stmtsPerTx + && stmtsPerTx != UNLIMITED_STATEMENTS_PER_TRANSACTION)) { + LOG.debug("Committing transaction of " + curNumStatements + + " statements"); + this.conn.commit(); + this.curNumStatements = 0; + } + } catch (SQLException sqlE) { + setLastError(sqlE); + } finally { + // Close the statement on our way out if that didn't happen + // via the normal execution path. + if (null != stmt) { + try { + stmt.close(); + } catch (SQLException sqlE) { + setLastError(sqlE); + } + } + + // Always check whether we should end the loop, regardless + // of the presence of an exception. + if (op.stop()) { + // Don't continue processing after this operation. + try { + conn.close(); + } catch (SQLException sqlE) { + setLastError(sqlE); + } + return; + } + } // try .. catch .. finally. + } // synchronized (conn) + } + } + + /** + * Allows a user to enqueue the next database operation to run. + * Since the connection can only execute a single operation at a time, + * the put() method may block if another operation is already underway. + * @param op the database operation to perform. + */ + public void put(AsyncDBOperation op) throws InterruptedException { + opsQueue.put(op); + } + + /** + * If a previously-executed statement resulted in an error, post it here. 
+ * If the error slot was already filled, then subsequent errors are + * squashed until the user calls this method (which clears the error + * slot). + * @return any SQLException that occurred due to a previously-run + * statement. + */ + public synchronized SQLException getLastError() { + SQLException e = this.err; + this.err = null; + return e; + } + + private synchronized void setLastError(SQLException e) { + if (this.err == null) { + // Just set it. + LOG.error("Got exception in update thread: " + + StringUtils.stringifyException(e)); + this.err = e; + } else { + // Slot is full. Log it and discard. + LOG.error("SQLException in update thread but error slot full: " + + StringUtils.stringifyException(e)); + } + } + } + + /** + * RecordWriter to write the output to a row in a database table. + * The actual database updates are executed in a second thread. + */ + public class ExportRecordWriter extends RecordWriter { + + protected Connection connection; + + protected Configuration conf; + + protected int rowsPerStmt; // rows to insert per statement. + + // Buffer for records to be put in an INSERT statement. + protected List records; + + protected String tableName; + protected String [] columnNames; // The columns to insert into. + protected int columnCount; // If columnNames is null, tells ## of cols. + + // Background thread to actually perform the updates. + private ExportUpdateThread updateThread; + private boolean startedUpdateThread; + + public ExportRecordWriter(TaskAttemptContext context) + throws ClassNotFoundException, SQLException { + this.conf = context.getConfiguration(); + + this.rowsPerStmt = conf.getInt(RECORDS_PER_STATEMENT_KEY, + DEFAULT_RECORDS_PER_STATEMENT); + int stmtsPerTx = conf.getInt(STATEMENTS_PER_TRANSACTION_KEY, + DEFAULT_STATEMENTS_PER_TRANSACTION); + + DBConfiguration dbConf = new DBConfiguration(conf); + this.connection = dbConf.getConnection(); + this.tableName = dbConf.getOutputTableName(); + this.columnNames = dbConf.getOutputFieldNames(); + this.columnCount = dbConf.getOutputFieldCount(); + + this.connection.setAutoCommit(false); + + this.records = new ArrayList(this.rowsPerStmt); + + this.updateThread = new ExportUpdateThread(connection, stmtsPerTx); + this.updateThread.setDaemon(true); + this.startedUpdateThread = false; + } + + /** + * @return an INSERT statement suitable for inserting 'numRows' rows. + */ + protected String getInsertStatement(int numRows) { + StringBuilder sb = new StringBuilder(); + + sb.append("INSERT INTO " + tableName + " "); + + int numSlots; + if (this.columnNames != null) { + numSlots = this.columnNames.length; + + sb.append("("); + boolean first = true; + for (String col : columnNames) { + if (!first) { + sb.append(", "); + } + + sb.append(col); + first = false; + } + + sb.append(") "); + } else { + numSlots = this.columnCount; // set if columnNames is null. + } + + sb.append("VALUES "); + + // generates the (?, ?, ?...) used for each row. + StringBuilder sbRow = new StringBuilder(); + sbRow.append("("); + for (int i = 0; i < numSlots; i++) { + if (i != 0) { + sbRow.append(", "); + } + + sbRow.append("?"); + } + sbRow.append(")"); + + // Now append that numRows times. + for (int i = 0; i < numRows; i++) { + if (i != 0) { + sb.append(", "); + } + + sb.append(sbRow); + } + + return sb.toString(); + } + + /** + * Takes the current contents of 'records' and formats and executes the + * INSERT statement. + * @param closeConn if true, commits the transaction and closes the + * connection. 
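+     * For two buffered records on a hypothetical two-column table
+     * foo(id, msg), the generated statement has the form
+     * INSERT INTO foo (id, msg) VALUES (?, ?), (?, ?); each record's
+     * write(stmt, offset) binds its fields and advances the offset by
+     * the number of fields written. The Oracle subclass instead emits
+     * SELECT ... FROM DUAL clauses joined by UNION ALL, since Oracle
+     * does not accept the multi-row VALUES syntax.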
+ */ + private void insertRows(boolean closeConn) + throws InterruptedException, SQLException { + + if (!startedUpdateThread) { + this.updateThread.start(); + this.startedUpdateThread = true; + } + + PreparedStatement stmt = null; + boolean successfulPut = false; + try { + if (records.size() > 0) { + // Synchronize on connection to ensure this does not conflict + // with the operations in the update thread. + synchronized (connection) { + stmt = connection.prepareStatement( + getInsertStatement(records.size())); + } + + // Inject the record parameters into the VALUES clauses. + int position = 0; + for (SqoopRecord record : records) { + position += record.write(stmt, position); + } + + this.records.clear(); + } + + // Pass this operation off to the update thread. This will block if + // the update thread is already performing an update. + AsyncDBOperation op = new AsyncDBOperation(stmt, closeConn, closeConn); + updateThread.put(op); + successfulPut = true; // op has been posted to the other thread. + } finally { + if (!successfulPut && null != stmt) { + // We created a statement but failed to enqueue it. Close it. + stmt.close(); + } + } + + // Check for any previous SQLException. If one happened, rethrow it here. + SQLException lastException = updateThread.getLastError(); + if (null != lastException) { + throw lastException; + } + } + + /** {@inheritDoc} */ + public void close(TaskAttemptContext context) + throws IOException, InterruptedException { + try { + insertRows(true); + } catch (SQLException sqle) { + throw new IOException(sqle); + } finally { + updateThread.join(); + } + + // If we're not leaving on an error return path already, + // now that updateThread is definitely stopped, check that the + // error slot remains empty. + SQLException lastErr = updateThread.getLastError(); + if (null != lastErr) { + throw new IOException(lastErr); + } + } + + /** {@inheritDoc} */ + public void write(K key, V value) + throws InterruptedException, IOException { + try { + records.add((SqoopRecord) key.clone()); + if (records.size() >= this.rowsPerStmt) { + insertRows(false); + } + } catch (CloneNotSupportedException cnse) { + throw new IOException("Could not buffer record", cnse); + } catch (SQLException sqlException) { + throw new IOException(sqlException); + } + } + } +} diff --git a/src/shims/common/org/apache/hadoop/sqoop/mapreduce/OracleExportOutputFormat.java b/src/shims/common/org/apache/hadoop/sqoop/mapreduce/OracleExportOutputFormat.java new file mode 100644 index 00000000..dd2d0830 --- /dev/null +++ b/src/shims/common/org/apache/hadoop/sqoop/mapreduce/OracleExportOutputFormat.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.sqoop.mapreduce; + +import java.io.IOException; +import java.sql.SQLException; + +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import org.apache.hadoop.sqoop.lib.SqoopRecord; + +/** + * Oracle-specific SQL formatting overrides default ExportOutputFormat's. + */ +public class OracleExportOutputFormat + extends ExportOutputFormat { + + /** {@inheritDoc} */ + public RecordWriter getRecordWriter(TaskAttemptContext context) + throws IOException { + try { + return new OracleExportRecordWriter(context); + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * RecordWriter to write the output to a row in a database table. + * The actual database updates are executed in a second thread. + */ + public class OracleExportRecordWriter extends ExportRecordWriter { + + public OracleExportRecordWriter(TaskAttemptContext context) + throws ClassNotFoundException, SQLException { + super(context); + } + + @Override + /** + * @return an INSERT statement suitable for inserting 'numRows' rows. + */ + protected String getInsertStatement(int numRows) { + StringBuilder sb = new StringBuilder(); + + sb.append("INSERT INTO " + tableName + " "); + + int numSlots; + if (this.columnNames != null) { + numSlots = this.columnNames.length; + + sb.append("("); + boolean first = true; + for (String col : columnNames) { + if (!first) { + sb.append(", "); + } + + sb.append(col); + first = false; + } + + sb.append(") "); + } else { + numSlots = this.columnCount; // set if columnNames is null. + } + + // generates the (?, ?, ?...) used for each row. + StringBuilder sbRow = new StringBuilder(); + sbRow.append("SELECT "); + for (int i = 0; i < numSlots; i++) { + if (i != 0) { + sbRow.append(", "); + } + + sbRow.append("?"); + } + sbRow.append(" FROM DUAL "); + + // Now append that numRows times. + for (int i = 0; i < numRows; i++) { + if (i != 0) { + sb.append("UNION ALL "); + } + + sb.append(sbRow); + } + + return sb.toString(); + } + } +} diff --git a/src/test/org/apache/hadoop/sqoop/TestExport.java b/src/test/org/apache/hadoop/sqoop/TestExport.java index 76e0edf7..ec0e4e70 100644 --- a/src/test/org/apache/hadoop/sqoop/TestExport.java +++ b/src/test/org/apache/hadoop/sqoop/TestExport.java @@ -119,7 +119,7 @@ private String getRecordLine(int recordNum, ColumnGenerator... extraCols) { as what the string representation of the column as returned by the database should look like. */ - interface ColumnGenerator { + public interface ColumnGenerator { /** for a row with id rowNum, what should we write into that line of the text file to export? */ @@ -400,13 +400,20 @@ protected void multiFileTest(int numFiles, int recordsPerMap, int numMaps, } createTable(); - runExport(getArgv(true, newStrArray(argv, "-m", "" + numMaps))); + runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + numMaps))); verifyExport(TOTAL_RECORDS); } finally { LOG.info("multi-file test complete"); } } + /** + * Run an "export" on an empty file. 
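+   * The job should still succeed, and verifyExport(0) should find an
+   * empty table.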
+ */ + public void testEmptyExport() throws IOException, SQLException { + multiFileTest(1, 0, 1); + } + /** Export 10 rows, make sure they load in correctly */ public void testTextExport() throws IOException, SQLException { multiFileTest(1, 10, 1); @@ -435,11 +442,45 @@ public void testGzipExport() throws IOException, SQLException { createTextFile(0, TOTAL_RECORDS, true); createTable(); - runExport(getArgv(true)); + runExport(getArgv(true, 10, 10)); verifyExport(TOTAL_RECORDS); LOG.info("Complete gzip export test"); } + /** + * Ensure that we use multiple statements in a transaction. + */ + public void testMultiStatement() throws IOException, SQLException { + final int TOTAL_RECORDS = 20; + createTextFile(0, TOTAL_RECORDS, true); + createTable(); + runExport(getArgv(true, 10, 10)); + verifyExport(TOTAL_RECORDS); + } + + /** + * Ensure that we use multiple transactions in a single mapper. + */ + public void testMultiTransaction() throws IOException, SQLException { + final int TOTAL_RECORDS = 20; + createTextFile(0, TOTAL_RECORDS, true); + createTable(); + runExport(getArgv(true, 5, 2)); + verifyExport(TOTAL_RECORDS); + } + + /** + * Ensure that when we don't force a commit with a statement cap, + * it happens anyway. + */ + public void testUnlimitedTransactionSize() throws IOException, SQLException { + final int TOTAL_RECORDS = 20; + createTextFile(0, TOTAL_RECORDS, true); + createTable(); + runExport(getArgv(true, 5, -1)); + verifyExport(TOTAL_RECORDS); + } + /** Run 2 mappers, make sure all records load in correctly */ public void testMultiMapTextExport() throws IOException, SQLException { @@ -451,7 +492,7 @@ public void testMultiMapTextExport() throws IOException, SQLException { } createTable(); - runExport(getArgv(true)); + runExport(getArgv(true, 10, 10)); verifyExport(RECORDS_PER_MAP * NUM_FILES); } @@ -508,7 +549,8 @@ public void testSequenceFileExport() throws Exception { // Now run and verify the export. 
LOG.info("Exporting SequenceFile-based data"); - runExport(getArgv(true, "--class-name", className, "--jar-file", jarFileName)); + runExport(getArgv(true, 10, 10, "--class-name", className, + "--jar-file", jarFileName)); verifyExport(TOTAL_RECORDS); } finally { if (null != prevClassLoader) { @@ -535,11 +577,16 @@ public String getType() { createTextFile(0, TOTAL_RECORDS, false, gen); createTable(gen); - runExport(getArgv(true)); + runExport(getArgv(true, 10, 10)); verifyExport(TOTAL_RECORDS); assertColMinAndMax(forIdx(0), gen); } + /** @return the column type for a large integer */ + protected String getBigIntType() { + return "BIGINT"; + } + public void testBigIntCol() throws IOException, SQLException { final int TOTAL_RECORDS = 10; @@ -554,18 +601,18 @@ public String getVerifyText(int rowNum) { return "" + val; } public String getType() { - return "BIGINT"; + return getBigIntType(); } }; createTextFile(0, TOTAL_RECORDS, false, gen); createTable(gen); - runExport(getArgv(true)); + runExport(getArgv(true, 10, 10)); verifyExport(TOTAL_RECORDS); assertColMinAndMax(forIdx(0), gen); } - private String pad(int n) { + protected String pad(int n) { if (n <= 9) { return "0" + n; } else { @@ -573,10 +620,11 @@ private String pad(int n) { } } - public void testDatesAndTimes() throws IOException, SQLException { - final int TOTAL_RECORDS = 10; - - ColumnGenerator genDate = new ColumnGenerator() { + /** + * Get a column generator for DATE columns + */ + protected ColumnGenerator getDateColumnGenerator() { + return new ColumnGenerator() { public String getExportText(int rowNum) { int day = rowNum + 1; return "2009-10-" + day; @@ -589,8 +637,13 @@ public String getType() { return "DATE"; } }; + } - ColumnGenerator genTime = new ColumnGenerator() { + /** + * Get a column generator for TIME columns. 
+ */ + protected ColumnGenerator getTimeColumnGenerator() { + return new ColumnGenerator() { public String getExportText(int rowNum) { return "10:01:" + rowNum; } @@ -601,10 +654,17 @@ public String getType() { return "TIME"; } }; + } + + public void testDatesAndTimes() throws IOException, SQLException { + final int TOTAL_RECORDS = 10; + + ColumnGenerator genDate = getDateColumnGenerator(); + ColumnGenerator genTime = getTimeColumnGenerator(); createTextFile(0, TOTAL_RECORDS, false, genDate, genTime); createTable(genDate, genTime); - runExport(getArgv(true)); + runExport(getArgv(true, 10, 10)); verifyExport(TOTAL_RECORDS); assertColMinAndMax(forIdx(0), genDate); assertColMinAndMax(forIdx(1), genTime); @@ -647,7 +707,7 @@ public String getType() { createTextFile(0, TOTAL_RECORDS, false, genFloat, genNumeric); createTable(genFloat, genNumeric); - runExport(getArgv(true)); + runExport(getArgv(true, 10, 10)); verifyExport(TOTAL_RECORDS); assertColMinAndMax(forIdx(0), genFloat); assertColMinAndMax(forIdx(1), genNumeric); diff --git a/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java b/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java index 05cf3546..c51deed1 100644 --- a/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java +++ b/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java @@ -24,8 +24,10 @@ import org.apache.hadoop.sqoop.manager.DirectMySQLTest; import org.apache.hadoop.sqoop.manager.DirectMySQLExportTest; +import org.apache.hadoop.sqoop.manager.JdbcMySQLExportTest; import org.apache.hadoop.sqoop.manager.MySQLAuthTest; import org.apache.hadoop.sqoop.manager.MySQLCompatTest; +import org.apache.hadoop.sqoop.manager.OracleExportTest; import org.apache.hadoop.sqoop.manager.OracleManagerTest; import org.apache.hadoop.sqoop.manager.OracleCompatTest; import org.apache.hadoop.sqoop.manager.PostgresqlTest; @@ -44,8 +46,10 @@ public static Test suite() { + "implementations in Sqoop"); suite.addTestSuite(DirectMySQLTest.class); suite.addTestSuite(DirectMySQLExportTest.class); + suite.addTestSuite(JdbcMySQLExportTest.class); suite.addTestSuite(MySQLAuthTest.class); suite.addTestSuite(MySQLCompatTest.class); + suite.addTestSuite(OracleExportTest.class); suite.addTestSuite(OracleManagerTest.class); suite.addTestSuite(OracleCompatTest.class); suite.addTestSuite(PostgresqlTest.class); diff --git a/src/test/org/apache/hadoop/sqoop/manager/DirectMySQLExportTest.java b/src/test/org/apache/hadoop/sqoop/manager/DirectMySQLExportTest.java index ad976c45..047769b6 100644 --- a/src/test/org/apache/hadoop/sqoop/manager/DirectMySQLExportTest.java +++ b/src/test/org/apache/hadoop/sqoop/manager/DirectMySQLExportTest.java @@ -81,34 +81,13 @@ public void setUp() { options.setUsername(MySQLTestUtils.getCurrentUser()); this.manager = new DirectMySQLManager(options); - Statement st = null; - try { this.conn = manager.getConnection(); this.conn.setAutoCommit(false); - st = this.conn.createStatement(); - - // create the database table. 
- st.executeUpdate("DROP TABLE IF EXISTS " + getTableName()); - st.executeUpdate("CREATE TABLE " + getTableName() + " (" - + "id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, " - + "name VARCHAR(24) NOT NULL, " - + "start_date DATE, " - + "salary FLOAT, " - + "dept VARCHAR(32))"); - this.conn.commit(); } catch (SQLException sqlE) { LOG.error("Encountered SQL Exception: " + sqlE); sqlE.printStackTrace(); fail("SQLException when running test setUp(): " + sqlE); - } finally { - try { - if (null != st) { - st.close(); - } - } catch (SQLException sqlE) { - LOG.warn("Got SQLException when closing connection: " + sqlE); - } } } @@ -153,11 +132,12 @@ public void tearDown() { @Override protected String [] getArgv(boolean includeHadoopFlags, - String... additionalArgv) { + int rowsPerStatement, int statementsPerTx, String... additionalArgv) { String [] subArgv = newStrArray(additionalArgv, "--direct", "--username", MySQLTestUtils.getCurrentUser()); - return super.getArgv(includeHadoopFlags, subArgv); + return super.getArgv(includeHadoopFlags, rowsPerStatement, + statementsPerTx, subArgv); } /** diff --git a/src/test/org/apache/hadoop/sqoop/manager/JdbcMySQLExportTest.java b/src/test/org/apache/hadoop/sqoop/manager/JdbcMySQLExportTest.java new file mode 100644 index 00000000..5783abb4 --- /dev/null +++ b/src/test/org/apache/hadoop/sqoop/manager/JdbcMySQLExportTest.java @@ -0,0 +1,149 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.manager; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.sqoop.SqoopOptions; +import org.apache.hadoop.sqoop.TestExport; +import org.apache.hadoop.sqoop.mapreduce.MySQLExportMapper; + +/** + * Test the MySQLManager implementation's exportJob() functionality. + * This does a better test of ExportOutputFormat than TestExport does, + * because it supports multi-row INSERT statements. + */ +public class JdbcMySQLExportTest extends TestExport { + + public static final Log LOG = LogFactory.getLog( + JdbcMySQLExportTest.class.getName()); + + static final String TABLE_PREFIX = "EXPORT_MYSQL_J_"; + + // instance variables populated during setUp, used during tests. + private MySQLManager manager; + private Connection conn; + + @Override + protected Connection getConnection() { + return conn; + } + + // MySQL allows multi-row INSERT statements. 
+ @Override + protected int getMaxRowsPerStatement() { + return 1000; + } + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + @Override + protected String getConnectString() { + return MySQLTestUtils.CONNECT_STRING; + } + + @Override + protected String getTablePrefix() { + return TABLE_PREFIX; + } + + @Override + protected String getDropTableStatement(String tableName) { + return "DROP TABLE IF EXISTS " + tableName; + } + + @Before + public void setUp() { + super.setUp(); + + SqoopOptions options = new SqoopOptions(MySQLTestUtils.CONNECT_STRING, + getTableName()); + options.setUsername(MySQLTestUtils.getCurrentUser()); + this.manager = new MySQLManager(options); + try { + this.conn = manager.getConnection(); + this.conn.setAutoCommit(false); + } catch (SQLException sqlE) { + LOG.error(StringUtils.stringifyException(sqlE)); + fail("Failed with sql exception in setup: " + sqlE); + } + } + + @After + public void tearDown() { + super.tearDown(); + + if (null != this.conn) { + try { + this.conn.close(); + } catch (SQLException sqlE) { + LOG.error("Got SQLException closing conn: " + sqlE.toString()); + } + } + + if (null != manager) { + try { + manager.close(); + manager = null; + } catch (SQLException sqlE) { + LOG.error("Got SQLException: " + sqlE.toString()); + fail("Got SQLException: " + sqlE.toString()); + } + } + } + + @Override + protected String [] getCodeGenArgv(String... extraArgs) { + + String [] moreArgs = new String[extraArgs.length + 2]; + int i = 0; + for (i = 0; i < extraArgs.length; i++) { + moreArgs[i] = extraArgs[i]; + } + + // Add username argument for mysql. + moreArgs[i++] = "--username"; + moreArgs[i++] = MySQLTestUtils.getCurrentUser(); + + return super.getCodeGenArgv(moreArgs); + } + + @Override + protected String [] getArgv(boolean includeHadoopFlags, + int rowsPerStatement, int statementsPerTx, String... additionalArgv) { + + String [] subArgv = newStrArray(additionalArgv, + "--username", MySQLTestUtils.getCurrentUser()); + return super.getArgv(includeHadoopFlags, rowsPerStatement, + statementsPerTx, subArgv); + } +} diff --git a/src/test/org/apache/hadoop/sqoop/manager/OracleExportTest.java b/src/test/org/apache/hadoop/sqoop/manager/OracleExportTest.java new file mode 100644 index 00000000..29220058 --- /dev/null +++ b/src/test/org/apache/hadoop/sqoop/manager/OracleExportTest.java @@ -0,0 +1,198 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.sqoop.manager; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.sqoop.SqoopOptions; +import org.apache.hadoop.sqoop.TestExport; + +/** + * Test the OracleManager implementation's exportJob() functionality. + * This tests the OracleExportOutputFormat (which subclasses + * ExportOutputFormat with Oracle-specific SQL statements). + */ +public class OracleExportTest extends TestExport { + + public static final Log LOG = LogFactory.getLog( + OracleExportTest.class.getName()); + + static final String TABLE_PREFIX = "EXPORT_ORACLE_"; + + // instance variables populated during setUp, used during tests. + private OracleManager manager; + private Connection conn; + + @Override + protected Connection getConnection() { + return conn; + } + + // Oracle allows multi-row inserts (with its own syntax). + @Override + protected int getMaxRowsPerStatement() { + return 1000; + } + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + @Override + protected String getConnectString() { + return OracleUtils.CONNECT_STRING; + } + + @Override + protected String getTablePrefix() { + return TABLE_PREFIX; + } + + @Override + protected String getDropTableStatement(String tableName) { + return OracleUtils.getDropTableStatement(tableName); + } + + @Before + public void setUp() { + super.setUp(); + + SqoopOptions options = new SqoopOptions(OracleUtils.CONNECT_STRING, + getTableName()); + OracleUtils.setOracleAuth(options); + this.manager = new OracleManager(options); + try { + this.conn = manager.getConnection(); + this.conn.setAutoCommit(false); + } catch (SQLException sqlE) { + LOG.error(StringUtils.stringifyException(sqlE)); + fail("Failed with sql exception in setup: " + sqlE); + } + } + + @After + public void tearDown() { + super.tearDown(); + + if (null != this.conn) { + try { + this.conn.close(); + } catch (SQLException sqlE) { + LOG.error("Got SQLException closing conn: " + sqlE.toString()); + } + } + + if (null != manager) { + try { + manager.close(); + manager = null; + } catch (SQLException sqlE) { + LOG.error("Got SQLException: " + sqlE.toString()); + fail("Got SQLException: " + sqlE.toString()); + } + } + } + + @Override + protected String [] getCodeGenArgv(String... extraArgs) { + String [] moreArgs = new String[extraArgs.length + 4]; + int i = 0; + for (i = 0; i < extraArgs.length; i++) { + moreArgs[i] = extraArgs[i]; + } + + // Add username and password args. + moreArgs[i++] = "--username"; + moreArgs[i++] = OracleUtils.ORACLE_USER_NAME; + moreArgs[i++] = "--password"; + moreArgs[i++] = OracleUtils.ORACLE_USER_PASS; + + return super.getCodeGenArgv(moreArgs); + } + + @Override + protected String [] getArgv(boolean includeHadoopFlags, + int rowsPerStatement, int statementsPerTx, String... additionalArgv) { + + String [] subArgv = newStrArray(additionalArgv, + "--username", OracleUtils.ORACLE_USER_NAME, + "--password", OracleUtils.ORACLE_USER_PASS); + return super.getArgv(includeHadoopFlags, rowsPerStatement, + statementsPerTx, subArgv); + } + + @Override + protected ColumnGenerator getDateColumnGenerator() { + // Return a TIMESTAMP generator that has increasing date values. 
+ // We currently do not support export of DATE columns since + // Oracle informs us that they are actually TIMESTAMP; this messes + // up Sqoop's parsing of values as we have no way of knowing they're + // really supposed to be dates based on the Oracle Jdbc Metadata. + return new ColumnGenerator() { + public String getExportText(int rowNum) { + int day = rowNum + 1; + return "2009-10-" + pad(day) + " 00:00:00.0"; + } + public String getVerifyText(int rowNum) { + int day = rowNum + 1; + return "2009-10-" + day + " 0:0:0. 0"; + } + public String getType() { + return "TIMESTAMP"; // TODO: Support DATE more intelligently. + } + }; + } + + @Override + protected ColumnGenerator getTimeColumnGenerator() { + // Return a TIMESTAMP generator that has increasing time values. + // We currently do not support the export of DATE columns with + // only a time component set (because Oracle reports these column + // types to Sqoop as TIMESTAMP, and we parse the user's text + // incorrectly based on this result). + return new ColumnGenerator() { + public String getExportText(int rowNum) { + return "1970-01-01 10:01:" + pad(rowNum) + ".0"; + } + public String getVerifyText(int rowNum) { + return "1970-1-1 10:1:" + rowNum + ". 0"; + } + public String getType() { + return "TIMESTAMP"; + } + }; + } + + @Override + protected String getBigIntType() { + // Oracle stores everything in NUMERIC columns. + return "NUMERIC(12,0)"; + } +} diff --git a/src/test/org/apache/hadoop/sqoop/manager/OracleUtils.java b/src/test/org/apache/hadoop/sqoop/manager/OracleUtils.java index 3c91fbe4..9caf8eec 100644 --- a/src/test/org/apache/hadoop/sqoop/manager/OracleUtils.java +++ b/src/test/org/apache/hadoop/sqoop/manager/OracleUtils.java @@ -80,8 +80,7 @@ public static void dropTable(String tableName, ConnManager manager) st = connection.createStatement(); // create the database table and populate it with data. - st.executeUpdate("BEGIN EXECUTE IMMEDIATE 'DROP TABLE " + tableName + "'; " - + "exception when others then null; end;"); + st.executeUpdate(getDropTableStatement(tableName)); connection.commit(); } finally { @@ -94,4 +93,9 @@ public static void dropTable(String tableName, ConnManager manager) } } } + + public static String getDropTableStatement(String tableName) { + return "BEGIN EXECUTE IMMEDIATE 'DROP TABLE " + tableName + "'; " + + "exception when others then null; end;"; + } } diff --git a/src/test/org/apache/hadoop/sqoop/testutil/ExportJobTestCase.java b/src/test/org/apache/hadoop/sqoop/testutil/ExportJobTestCase.java index 6599f1c9..8a1834b9 100644 --- a/src/test/org/apache/hadoop/sqoop/testutil/ExportJobTestCase.java +++ b/src/test/org/apache/hadoop/sqoop/testutil/ExportJobTestCase.java @@ -31,11 +31,12 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.sqoop.Sqoop; +import org.apache.hadoop.sqoop.mapreduce.ExportOutputFormat; import org.apache.hadoop.sqoop.tool.ExportTool; /** * Class that implements common methods required for tests which export data - * from HDFS to databases, to verify correct export + * from HDFS to databases, to verify correct export. */ public class ExportJobTestCase extends BaseSqoopTestCase { @@ -45,6 +46,15 @@ protected String getTablePrefix() { return "EXPORT_TABLE_"; } + /** + * @return the maximum rows to fold into an INSERT statement. + * HSQLDB can only support the single-row INSERT syntax. Other databases + * can support greater numbers of rows per statement. 
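+   * getArgv() clamps the requested rows-per-statement setting to this
+   * value, so subclasses such as JdbcMySQLExportTest and
+   * OracleExportTest raise it to 1000.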
+ */ + protected int getMaxRowsPerStatement() { + return 1; + } + /** * @return a connection to the database under test. */ @@ -60,13 +70,27 @@ protected Connection getConnection() { /** * Create the argv to pass to Sqoop * @param includeHadoopFlags if true, then include -D various.settings=values + * @param rowsPerStmt number of rows to export in a single INSERT statement. + * @param statementsPerTx ## of statements to use in a transaction. * @return the argv as an array of strings. */ - protected String [] getArgv(boolean includeHadoopFlags, String... additionalArgv) { + protected String [] getArgv(boolean includeHadoopFlags, + int rowsPerStmt, int statementsPerTx, String... additionalArgv) { ArrayList args = new ArrayList(); if (includeHadoopFlags) { CommonArgs.addHadoopFlags(args); + args.add("-D"); + int realRowsPerStmt = Math.min(rowsPerStmt, getMaxRowsPerStatement()); + if (realRowsPerStmt != rowsPerStmt) { + LOG.warn("Rows per statement set to " + realRowsPerStmt + + " by getMaxRowsPerStatement() limit."); + } + args.add(ExportOutputFormat.RECORDS_PER_STATEMENT_KEY + "=" + + realRowsPerStmt); + args.add("-D"); + args.add(ExportOutputFormat.STATEMENTS_PER_TRANSACTION_KEY + "=" + + statementsPerTx); } // Any additional Hadoop flags (-D foo=bar) are prepended. @@ -201,6 +225,10 @@ protected void verifyExport(int expectedNumRecords) throws IOException, SQLExcep assertEquals("Got back unexpected row count", expectedNumRecords, actualNumRecords); + if (expectedNumRecords == 0) { + return; // Nothing more to verify. + } + // Check that we start with row 0. int minVal = getMinRowId(); assertEquals("Minimum row was not zero", 0, minVal);