diff --git a/build.xml b/build.xml
index 61e8aa53..0adfe00c 100644
--- a/build.xml
+++ b/build.xml
@@ -132,8 +132,9 @@
-
-
+
+
+
@@ -197,7 +198,7 @@
r =
+ (LobRef) super.clone();
+
+ r.reader = null; // Reference to opened reader is not duplicated.
+ if (null != data) {
+ r.data = deepCopyData();
+ }
+
+ return r;
+ }
+
@Override
protected synchronized void finalize() throws Throwable {
close();
@@ -202,6 +220,11 @@ protected abstract ACCESSORTYPE getExternalSource(LobFile.Reader reader)
*/
protected abstract DATATYPE getInternalData(CONTAINERTYPE data);
+ /**
+ * Make a copy of the materialized data.
+ */
+ protected abstract CONTAINERTYPE deepCopyData();
+
public DATATYPE getData() {
if (isExternal()) {
throw new RuntimeException(
diff --git a/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java b/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java
index a3aa0ce0..a4cdf881 100644
--- a/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java
+++ b/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java
@@ -21,16 +21,17 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
+import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.lib.db.DBWritable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
/**
* Interface implemented by the classes generated by sqoop's orm.ClassWriter.
*/
-public interface SqoopRecord extends DBWritable, Writable {
+public interface SqoopRecord extends Cloneable, DBWritable, Writable {
public void parse(CharSequence s) throws RecordParser.ParseError;
public void parse(Text s) throws RecordParser.ParseError;
public void parse(byte [] s) throws RecordParser.ParseError;
@@ -39,5 +40,13 @@ public interface SqoopRecord extends DBWritable, Writable {
public void parse(CharBuffer s) throws RecordParser.ParseError;
public void loadLargeObjects(LargeObjectLoader objLoader)
throws SQLException, IOException, InterruptedException;
+ public Object clone() throws CloneNotSupportedException;
+
+ /**
+ * Inserts the data in this object into the PreparedStatement, starting
+ * at parameter 'offset'.
+ * @return the number of fields written to the statement.
+ */
+ public int write(PreparedStatement stmt, int offset) throws SQLException;
}
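
The clone() and offset-based write() methods added above let an OutputFormat buffer copies of incoming records (the MapReduce framework reuses key objects between map() calls) and bind them all into a single multi-row INSERT. A minimal sketch of that calling pattern, assuming a hypothetical helper class; only SqoopRecord itself comes from this change:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

import org.apache.hadoop.sqoop.lib.SqoopRecord;

/** Hypothetical helper illustrating the offset-based write() contract. */
public class MultiRowInsertSketch {
  /** Binds each previously clone()d record into one statement and runs it. */
  public static void insertBuffered(Connection conn, String multiRowInsertSql,
      List<SqoopRecord> buffered) throws SQLException {
    PreparedStatement stmt = conn.prepareStatement(multiRowInsertSql);
    try {
      int offset = 0;
      for (SqoopRecord record : buffered) {
        // write() returns the number of fields it bound, so the next record
        // starts binding at the following JDBC parameter position.
        offset += record.write(stmt, offset);
      }
      stmt.executeUpdate();
    } finally {
      stmt.close();
    }
  }
}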
diff --git a/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java b/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
index 4d69872e..26b65f75 100644
--- a/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
+++ b/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
@@ -33,8 +33,12 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.db.OracleDataDrivenDBInputFormat;
import org.apache.hadoop.sqoop.SqoopOptions;
+import org.apache.hadoop.sqoop.mapreduce.JdbcExportJob;
+import org.apache.hadoop.sqoop.shims.ShimLoader;
+import org.apache.hadoop.sqoop.util.ExportException;
import org.apache.hadoop.sqoop.util.ImportException;
/**
@@ -281,6 +285,7 @@ private void setSessionTimeZone(Connection conn) throws SQLException {
}
}
+ @Override
public void importTable(ImportJobContext context)
throws IOException, ImportException {
// Specify the Oracle-specific DBInputFormat for import.
@@ -288,6 +293,22 @@ public void importTable(ImportJobContext context)
super.importTable(context);
}
+ /**
+ * Export data stored in HDFS into a table in a database.
+ */
+ public void exportTable(ExportJobContext context)
+ throws IOException, ExportException {
+ try {
+ JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
+ (Class<? extends OutputFormat>) ShimLoader.getShimClass(
+ "org.apache.hadoop.sqoop.mapreduce.OracleExportOutputFormat"));
+ exportJob.runExport();
+ } catch (ClassNotFoundException cnfe) {
+ throw new ExportException("Could not start export; could not find class",
+ cnfe);
+ }
+ }
+
@Override
public ResultSet readTable(String tableName, String[] columns) throws SQLException {
if (columns == null) {
diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJobBase.java
index 25286c21..e716a889 100644
--- a/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJobBase.java
+++ b/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJobBase.java
@@ -37,7 +37,6 @@
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
-import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@@ -188,6 +187,18 @@ protected Class<? extends InputFormat> getInputFormatClass()
}
}
+ @Override
+ protected Class<? extends OutputFormat> getOutputFormatClass()
+ throws ClassNotFoundException {
+ Class<? extends OutputFormat> configuredOF = super.getOutputFormatClass();
+ if (null == configuredOF) {
+ return (Class<? extends OutputFormat>) ShimLoader.getShimClass(
+ "org.apache.hadoop.sqoop.mapreduce.ExportOutputFormat");
+ } else {
+ return configuredOF;
+ }
+ }
+
@Override
protected void configureMapper(Job job, String tableName,
String tableClassName) throws ClassNotFoundException, IOException {
diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/hadoop/sqoop/mapreduce/JdbcExportJob.java
index cea778d1..3f75d947 100644
--- a/src/java/org/apache/hadoop/sqoop/mapreduce/JdbcExportJob.java
+++ b/src/java/org/apache/hadoop/sqoop/mapreduce/JdbcExportJob.java
@@ -25,18 +25,20 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.sqoop.ConnFactory;
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.manager.ExportJobContext;
-
+import org.apache.hadoop.sqoop.shims.ShimLoader;
/**
- * Run an export using JDBC (DBOutputFormat)
+ * Run an export using JDBC (JDBC-based ExportOutputFormat)
*/
public class JdbcExportJob extends ExportJobBase {
@@ -47,6 +49,13 @@ public JdbcExportJob(final ExportJobContext context) {
super(context);
}
+ public JdbcExportJob(final ExportJobContext ctxt,
+ final Class<? extends Mapper> mapperClass,
+ final Class<? extends InputFormat> inputFormatClass,
+ final Class<? extends OutputFormat> outputFormatClass) {
+ super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+ }
+
@Override
protected Class<? extends Mapper> getMapperClass() {
if (inputIsSequenceFiles()) {
@@ -81,8 +90,10 @@ protected void configureOutputFormat(Job job, String tableName,
}
DBOutputFormat.setOutput(job, tableName, colNames);
- job.setOutputFormatClass(DBOutputFormat.class);
+ job.setOutputFormatClass(getOutputFormatClass());
job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException("Could not load OutputFormat", cnfe);
} finally {
try {
mgr.close();
diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/MySQLExportJob.java b/src/java/org/apache/hadoop/sqoop/mapreduce/MySQLExportJob.java
index 564a3f38..3cecce2c 100644
--- a/src/java/org/apache/hadoop/sqoop/mapreduce/MySQLExportJob.java
+++ b/src/java/org/apache/hadoop/sqoop/mapreduce/MySQLExportJob.java
@@ -47,13 +47,7 @@ public class MySQLExportJob extends ExportJobBase {
LogFactory.getLog(MySQLExportJob.class.getName());
public MySQLExportJob(final ExportJobContext context) {
- super(context);
- }
-
- @Override
- protected Class<? extends OutputFormat> getOutputFormatClass() {
- // This job does not write to the OutputCollector. Disable it.
- return NullOutputFormat.class;
+ super(context, null, null, NullOutputFormat.class);
}
@Override
diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/SequenceFileExportMapper.java b/src/java/org/apache/hadoop/sqoop/mapreduce/SequenceFileExportMapper.java
index b04bac89..ef50bbaa 100644
--- a/src/java/org/apache/hadoop/sqoop/mapreduce/SequenceFileExportMapper.java
+++ b/src/java/org/apache/hadoop/sqoop/mapreduce/SequenceFileExportMapper.java
@@ -28,7 +28,7 @@
/**
* Reads a SqoopRecord from the SequenceFile in which it's packed and emits
- * that DBWritable to the DBOutputFormat for writeback to the database.
+ * that DBWritable to the OutputFormat for writeback to the database.
*/
public class SequenceFileExportMapper
extends AutoProgressMapper {
diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/TextExportMapper.java b/src/java/org/apache/hadoop/sqoop/mapreduce/TextExportMapper.java
index b70c8eea..511bdcef 100644
--- a/src/java/org/apache/hadoop/sqoop/mapreduce/TextExportMapper.java
+++ b/src/java/org/apache/hadoop/sqoop/mapreduce/TextExportMapper.java
@@ -33,7 +33,7 @@
/**
* Converts an input record from a string representation to a parsed Sqoop record
- * and emits that DBWritable to the DBOutputFormat for writeback to the database.
+ * and emits that DBWritable to the OutputFormat for writeback to the database.
*/
public class TextExportMapper
extends AutoProgressMapper {
diff --git a/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java b/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java
index b51d8f4e..46512d75 100644
--- a/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java
+++ b/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java
@@ -500,6 +500,10 @@ private void generateDbWrite(Map<String, Integer> columnTypes, String [] colName
StringBuilder sb) {
sb.append(" public void write(PreparedStatement __dbStmt) throws SQLException {\n");
+ sb.append(" write(__dbStmt, 0);\n");
+ sb.append(" }\n\n");
+
+ sb.append(" public int write(PreparedStatement __dbStmt, int __off) throws SQLException {\n");
int fieldNum = 0;
@@ -520,9 +524,10 @@ private void generateDbWrite(Map<String, Integer> columnTypes, String [] colName
}
sb.append(" JdbcWritableBridge." + setterMethod + "(" + col + ", "
- + fieldNum + ", " + sqlType + ", __dbStmt);\n");
+ + fieldNum + " + __off, " + sqlType + ", __dbStmt);\n");
}
+ sb.append(" return " + fieldNum + ";\n");
sb.append(" }\n");
}
@@ -559,7 +564,48 @@ private void generateHadoopRead(Map<String, Integer> columnTypes, String [] colN
}
/**
- * Generate the toString() method
+ * Generate the clone() method.
+ * @param columnTypes - mapping from column names to sql types
+ * @param colNames - ordered list of column names for table.
+ * @param sb - StringBuilder to append code to
+ */
+ private void generateCloneMethod(Map<String, Integer> columnTypes,
+ String [] colNames, StringBuilder sb) {
+
+ TableClassName tableNameInfo = new TableClassName(options);
+ String className = tableNameInfo.getShortClassForTable(tableName);
+
+ sb.append(" public Object clone() throws CloneNotSupportedException {\n");
+ sb.append(" " + className + " o = (" + className + ") super.clone();\n");
+
+ // For each field that is mutable, we need to perform the deep copy.
+ for (String colName : colNames) {
+ int sqlType = columnTypes.get(colName);
+ String javaType = connManager.toJavaType(sqlType);
+ if (null == javaType) {
+ continue;
+ } else if (javaType.equals("java.sql.Date")
+ || javaType.equals("java.sql.Time")
+ || javaType.equals("java.sql.Timestamp")
+ || javaType.equals(ClobRef.class.getName())
+ || javaType.equals(BlobRef.class.getName())) {
+ sb.append(" o." + colName + " = (" + javaType
+ + ") o." + colName + ".clone();\n");
+ } else if (javaType.equals(BytesWritable.class.getName())) {
+ sb.append(" o." + colName + " = new BytesWritable("
+ + "Arrays.copyOf(" + colName + ".getBytes(), "
+ + colName + ".getLength()));\n");
+ }
+ }
+
+ sb.append(" return o;\n");
+ sb.append(" }\n");
+
+
+ }
+
+ /**
+ * Generate the toString() method.
* @param columnTypes - mapping from column names to sql types
* @param colNames - ordered list of column names for table.
* @param sb - StringBuilder to append code to
@@ -904,6 +950,7 @@ public StringBuilder generateClassForColumns(Map<String, Integer> columnTypes,
sb.append("import java.sql.Date;\n");
sb.append("import java.sql.Time;\n");
sb.append("import java.sql.Timestamp;\n");
+ sb.append("import java.util.Arrays;\n");
sb.append("import java.util.Iterator;\n");
sb.append("import java.util.List;\n");
sb.append("\n");
@@ -921,6 +968,8 @@ public StringBuilder generateClassForColumns(Map<String, Integer> columnTypes,
generateHadoopWrite(columnTypes, colNames, sb);
generateToString(columnTypes, colNames, sb);
generateParser(columnTypes, colNames, sb);
+ generateCloneMethod(columnTypes, colNames, sb);
+
// TODO(aaron): Generate hashCode(), compareTo(), equals() so it can be a WritableComparable
sb.append("}\n");
diff --git a/src/perftest/ExportStressTest.java b/src/perftest/ExportStressTest.java
new file mode 100644
index 00000000..d95365f7
--- /dev/null
+++ b/src/perftest/ExportStressTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.*;
+import java.sql.*;
+import java.util.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.*;
+
+import org.apache.hadoop.sqoop.Sqoop;
+import org.apache.hadoop.sqoop.SqoopOptions;
+import org.apache.hadoop.sqoop.tool.ExportTool;
+import org.apache.hadoop.sqoop.tool.SqoopTool;
+
+/**
+ * Stress test export procedure by running a large-scale export to MySQL.
+ * This requires MySQL be configured with a database that can be accessed
+ * by the specified username without a password. The user must be able to
+ * create and drop tables in the database.
+ *
+ * Run with: src/scripts/run-perftest.sh ExportStressTest (connect-str) (username)
+ */
+public class ExportStressTest extends Configured implements Tool {
+
+ // Export 10 GB of data. Each record is ~100 bytes.
+ public final static int NUM_FILES = 10;
+ public final static int RECORDS_PER_FILE = 10 * 1024 * 1024;
+
+ public final static String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+ public ExportStressTest() {
+ }
+
+ public void createFile(int fileId) throws IOException {
+ Configuration conf = getConf();
+ FileSystem fs = FileSystem.get(conf);
+ Path dirPath = new Path("ExportStressTest");
+ fs.mkdirs(dirPath);
+ Path filePath = new Path(dirPath, "input-" + fileId);
+
+ OutputStream os = fs.create(filePath);
+ Writer w = new BufferedWriter(new OutputStreamWriter(os));
+ for (int i = 0; i < RECORDS_PER_FILE; i++) {
+ long v = (long) i + ((long) RECORDS_PER_FILE * (long) fileId);
+ w.write("" + v + "," + ALPHABET + ALPHABET + ALPHABET + ALPHABET + "\n");
+
+ }
+ w.close();
+ os.close();
+ }
+
+ /** Create a set of data files to export. */
+ public void createData() throws IOException {
+ Configuration conf = getConf();
+ FileSystem fs = FileSystem.get(conf);
+ Path dirPath = new Path("ExportStressTest");
+ if (fs.exists(dirPath)) {
+ System.out.println(
+ "Export directory appears to already exist. Skipping data-gen.");
+ return;
+ }
+
+ for (int i = 0; i < NUM_FILES; i++) {
+ createFile(i);
+ }
+ }
+
+ /** Create a table to hold our results. Drop any existing definition. */
+ public void createTable(String connectStr, String username) throws Exception {
+ Class.forName("com.mysql.jdbc.Driver"); // Load mysql driver.
+
+ Connection conn = DriverManager.getConnection(connectStr, username, null);
+ conn.setAutoCommit(false);
+ PreparedStatement stmt = conn.prepareStatement(
+ "DROP TABLE IF EXISTS ExportStressTestTable", ResultSet.TYPE_FORWARD_ONLY,
+ ResultSet.CONCUR_READ_ONLY);
+ stmt.executeUpdate();
+ stmt.close();
+
+ stmt = conn.prepareStatement(
+ "CREATE TABLE ExportStressTestTable(id INT NOT NULL PRIMARY KEY, "
+ + "msg VARCHAR(110)) Engine=InnoDB", ResultSet.TYPE_FORWARD_ONLY,
+ ResultSet.CONCUR_READ_ONLY);
+ stmt.executeUpdate();
+ stmt.close();
+ conn.commit();
+ conn.close();
+ }
+
+ /** Actually run the export of the generated data to the user-created table. */
+ public void runExport(String connectStr, String username) throws Exception {
+ SqoopOptions options = new SqoopOptions(getConf());
+ options.setConnectString(connectStr);
+ options.setTableName("ExportStressTestTable");
+ options.setUsername(username);
+ options.setExportDir("ExportStressTest");
+ options.setNumMappers(4);
+ options.setLinesTerminatedBy('\n');
+ options.setFieldsTerminatedBy(',');
+ options.setExplicitDelims(true);
+
+ SqoopTool exportTool = new ExportTool();
+ Sqoop sqoop = new Sqoop(exportTool, getConf(), options);
+ int ret = Sqoop.runSqoop(sqoop, new String[0]);
+ if (0 != ret) {
+ throw new Exception("Error doing export; ret=" + ret);
+ }
+ }
+
+ @Override
+ public int run(String [] args) {
+ String connectStr = args[0];
+ String username = args[1];
+
+ try {
+ createData();
+ createTable(connectStr, username);
+ runExport(connectStr, username);
+ } catch (Exception e) {
+ System.err.println("Error: " + StringUtils.stringifyException(e));
+ return 1;
+ }
+
+ return 0;
+ }
+
+ public static void main(String [] args) throws Exception {
+ ExportStressTest test = new ExportStressTest();
+ int ret = ToolRunner.run(test, args);
+ System.exit(ret);
+ }
+}
diff --git a/src/shims/common/org/apache/hadoop/sqoop/mapreduce/ExportOutputFormat.java b/src/shims/common/org/apache/hadoop/sqoop/mapreduce/ExportOutputFormat.java
new file mode 100644
index 00000000..b3831ea7
--- /dev/null
+++ b/src/shims/common/org/apache/hadoop/sqoop/mapreduce/ExportOutputFormat.java
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.SynchronousQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.hadoop.sqoop.lib.SqoopRecord;
+
+/**
+ * Insert the emitted keys as records into a database table.
+ * This supports a configurable "spill threshold" at which
+ * point intermediate transactions are committed.
+ *
+ * Record objects are buffered before actually performing the INSERT
+ * statements; this requires that the key implement the
+ * SqoopRecord interface.
+ *
+ * Uses DBOutputFormat/DBConfiguration for configuring the output.
+ */
+public class ExportOutputFormat<K extends SqoopRecord, V>
+ extends OutputFormat<K, V> {
+
+ /** conf key: number of rows to export per INSERT statement. */
+ public static final String RECORDS_PER_STATEMENT_KEY =
+ "sqoop.export.records.per.statement";
+
+ /** conf key: number of INSERT statements to bundle per tx.
+ * If this is set to -1, then a single transaction will be used
+ * per task. Note that each statement may encompass multiple
+ * rows, depending on the value of sqoop.export.records.per.statement.
+ */
+ public static final String STATEMENTS_PER_TRANSACTION_KEY =
+ "sqoop.export.statements.per.transaction";
+
+ private static final int DEFAULT_RECORDS_PER_STATEMENT = 100;
+ private static final int DEFAULT_STATEMENTS_PER_TRANSACTION = 100;
+ private static final int UNLIMITED_STATEMENTS_PER_TRANSACTION = -1;
+
+ private static final Log LOG = LogFactory.getLog(ExportOutputFormat.class);
+
+ /** {@inheritDoc} */
+ public void checkOutputSpecs(JobContext context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ DBConfiguration dbConf = new DBConfiguration(conf);
+
+ // Sanity check all the configuration values we need.
+ if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
+ throw new IOException("Database connection URL is not set.");
+ } else if (null == dbConf.getOutputTableName()) {
+ throw new IOException("Table name is not set for export");
+ } else if (null == dbConf.getOutputFieldNames()
+ && 0 == dbConf.getOutputFieldCount()) {
+ throw new IOException(
+ "Output field names are null and zero output field count set.");
+ }
+ }
+
+ /** {@inheritDoc} */
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new OutputCommitter() {
+ public void abortTask(TaskAttemptContext taskContext) { }
+ public void cleanupJob(JobContext jobContext) { }
+ public void commitTask(TaskAttemptContext taskContext) { }
+ public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+ return false;
+ }
+ public void setupJob(JobContext jobContext) { }
+ public void setupTask(TaskAttemptContext taskContext) { }
+ };
+ }
+
+ /** {@inheritDoc} */
+ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+ throws IOException {
+ try {
+ return new ExportRecordWriter(context);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Represents a database update operation that should be performed
+ * by an asynchronous background thread.
+ * AsyncDBOperation objects are immutable.
+ * They MAY contain a statement which should be executed. The
+ * statement may also be null.
+ *
+ * They may also set 'forceCommit' to true. If true, then the
+ * executor of this operation should commit the current
+ * transaction, even if stmt is null.
+ */
+ private static class AsyncDBOperation {
+ private final PreparedStatement stmt;
+ private final boolean forceCommit;
+ private final boolean close;
+
+ /**
+ * Create an asynchronous database operation.
+ * @param s the statement, if any, to execute.
+ * @param forceCommit if true, the current transaction should be committed.
+ * @param close if true, the executor thread should stop after processing
+ * this operation.
+ */
+ public AsyncDBOperation(PreparedStatement s, boolean forceCommit,
+ boolean close) {
+ this.stmt = s;
+ this.forceCommit = forceCommit;
+ this.close = close;
+ }
+
+ /**
+ * @return a statement to run as an update.
+ */
+ public PreparedStatement getStatement() {
+ return stmt;
+ }
+
+ /**
+ * @return true if the executor should commit the current transaction.
+ * If getStatement() is non-null, the statement is run first.
+ */
+ public boolean requiresCommit() {
+ return forceCommit;
+ }
+
+ /**
+ * @return true if the executor should stop after this command.
+ */
+ public boolean stop() {
+ return this.close;
+ }
+ }
+
+ /**
+ * A thread that runs the database interactions asynchronously
+ * from the OutputCollector.
+ */
+ private static class ExportUpdateThread extends Thread {
+
+ private final Connection conn; // The connection to the database.
+ private SQLException err; // Error from a previously-run statement.
+
+ // How we receive database operations from the RecordWriter.
+ private SynchronousQueue<AsyncDBOperation> opsQueue;
+
+ protected int curNumStatements; // statements executed thus far in the tx.
+ protected final int stmtsPerTx; // statements per transaction.
+
+ /**
+ * Create a new update thread that interacts with the database.
+ * @param conn the connection to use. This must only be used by this
+ * thread.
+ * @param stmtsPerTx the number of statements to execute before committing
+ * the current transaction.
+ */
+ public ExportUpdateThread(Connection conn, int stmtsPerTx) {
+ this.conn = conn;
+ this.err = null;
+ this.opsQueue = new SynchronousQueue<AsyncDBOperation>();
+ this.stmtsPerTx = stmtsPerTx;
+ }
+
+ public void run() {
+ while (true) {
+ AsyncDBOperation op = null;
+ try {
+ op = opsQueue.take();
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted retrieving from operation queue: "
+ + StringUtils.stringifyException(ie));
+ continue;
+ }
+
+ if (null == op) {
+ // This shouldn't be allowed to happen.
+ LOG.warn("Null operation in queue; illegal state.");
+ continue;
+ }
+
+ PreparedStatement stmt = op.getStatement();
+ // Synchronize on the connection to ensure it does not conflict
+ // with the prepareStatement() call in the main thread.
+ synchronized (conn) {
+ try {
+ if (null != stmt) {
+ stmt.executeUpdate();
+ stmt.close();
+ stmt = null;
+ this.curNumStatements++;
+ }
+
+ if (op.requiresCommit() || (curNumStatements >= stmtsPerTx
+ && stmtsPerTx != UNLIMITED_STATEMENTS_PER_TRANSACTION)) {
+ LOG.debug("Committing transaction of " + curNumStatements
+ + " statements");
+ this.conn.commit();
+ this.curNumStatements = 0;
+ }
+ } catch (SQLException sqlE) {
+ setLastError(sqlE);
+ } finally {
+ // Close the statement on our way out if that didn't happen
+ // via the normal execution path.
+ if (null != stmt) {
+ try {
+ stmt.close();
+ } catch (SQLException sqlE) {
+ setLastError(sqlE);
+ }
+ }
+
+ // Always check whether we should end the loop, regardless
+ // of the presence of an exception.
+ if (op.stop()) {
+ // Don't continue processing after this operation.
+ try {
+ conn.close();
+ } catch (SQLException sqlE) {
+ setLastError(sqlE);
+ }
+ return;
+ }
+ } // try .. catch .. finally.
+ } // synchronized (conn)
+ }
+ }
+
+ /**
+ * Allows a user to enqueue the next database operation to run.
+ * Since the connection can only execute a single operation at a time,
+ * the put() method may block if another operation is already underway.
+ * @param op the database operation to perform.
+ */
+ public void put(AsyncDBOperation op) throws InterruptedException {
+ opsQueue.put(op);
+ }
+
+ /**
+ * If a previously-executed statement resulted in an error, post it here.
+ * If the error slot was already filled, then subsequent errors are
+ * squashed until the user calls this method (which clears the error
+ * slot).
+ * @return any SQLException that occurred due to a previously-run
+ * statement.
+ */
+ public synchronized SQLException getLastError() {
+ SQLException e = this.err;
+ this.err = null;
+ return e;
+ }
+
+ private synchronized void setLastError(SQLException e) {
+ if (this.err == null) {
+ // Just set it.
+ LOG.error("Got exception in update thread: "
+ + StringUtils.stringifyException(e));
+ this.err = e;
+ } else {
+ // Slot is full. Log it and discard.
+ LOG.error("SQLException in update thread but error slot full: "
+ + StringUtils.stringifyException(e));
+ }
+ }
+ }
+
+ /**
+ * RecordWriter to write the output to a row in a database table.
+ * The actual database updates are executed in a second thread.
+ */
+ public class ExportRecordWriter extends RecordWriter<K, V> {
+
+ protected Connection connection;
+
+ protected Configuration conf;
+
+ protected int rowsPerStmt; // rows to insert per statement.
+
+ // Buffer for records to be put in an INSERT statement.
+ protected List<SqoopRecord> records;
+
+ protected String tableName;
+ protected String [] columnNames; // The columns to insert into.
+ protected int columnCount; // If columnNames is null, tells the number of cols.
+
+ // Background thread to actually perform the updates.
+ private ExportUpdateThread updateThread;
+ private boolean startedUpdateThread;
+
+ public ExportRecordWriter(TaskAttemptContext context)
+ throws ClassNotFoundException, SQLException {
+ this.conf = context.getConfiguration();
+
+ this.rowsPerStmt = conf.getInt(RECORDS_PER_STATEMENT_KEY,
+ DEFAULT_RECORDS_PER_STATEMENT);
+ int stmtsPerTx = conf.getInt(STATEMENTS_PER_TRANSACTION_KEY,
+ DEFAULT_STATEMENTS_PER_TRANSACTION);
+
+ DBConfiguration dbConf = new DBConfiguration(conf);
+ this.connection = dbConf.getConnection();
+ this.tableName = dbConf.getOutputTableName();
+ this.columnNames = dbConf.getOutputFieldNames();
+ this.columnCount = dbConf.getOutputFieldCount();
+
+ this.connection.setAutoCommit(false);
+
+ this.records = new ArrayList<SqoopRecord>(this.rowsPerStmt);
+
+ this.updateThread = new ExportUpdateThread(connection, stmtsPerTx);
+ this.updateThread.setDaemon(true);
+ this.startedUpdateThread = false;
+ }
+
+ /**
+ * @return an INSERT statement suitable for inserting 'numRows' rows.
+ */
+ protected String getInsertStatement(int numRows) {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("INSERT INTO " + tableName + " ");
+
+ int numSlots;
+ if (this.columnNames != null) {
+ numSlots = this.columnNames.length;
+
+ sb.append("(");
+ boolean first = true;
+ for (String col : columnNames) {
+ if (!first) {
+ sb.append(", ");
+ }
+
+ sb.append(col);
+ first = false;
+ }
+
+ sb.append(") ");
+ } else {
+ numSlots = this.columnCount; // set if columnNames is null.
+ }
+
+ sb.append("VALUES ");
+
+ // generates the (?, ?, ?...) used for each row.
+ StringBuilder sbRow = new StringBuilder();
+ sbRow.append("(");
+ for (int i = 0; i < numSlots; i++) {
+ if (i != 0) {
+ sbRow.append(", ");
+ }
+
+ sbRow.append("?");
+ }
+ sbRow.append(")");
+
+ // Now append that numRows times.
+ for (int i = 0; i < numRows; i++) {
+ if (i != 0) {
+ sb.append(", ");
+ }
+
+ sb.append(sbRow);
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * Takes the current contents of 'records' and formats and executes the
+ * INSERT statement.
+ * @param closeConn if true, commits the transaction and closes the
+ * connection.
+ */
+ private void insertRows(boolean closeConn)
+ throws InterruptedException, SQLException {
+
+ if (!startedUpdateThread) {
+ this.updateThread.start();
+ this.startedUpdateThread = true;
+ }
+
+ PreparedStatement stmt = null;
+ boolean successfulPut = false;
+ try {
+ if (records.size() > 0) {
+ // Synchronize on connection to ensure this does not conflict
+ // with the operations in the update thread.
+ synchronized (connection) {
+ stmt = connection.prepareStatement(
+ getInsertStatement(records.size()));
+ }
+
+ // Inject the record parameters into the VALUES clauses.
+ int position = 0;
+ for (SqoopRecord record : records) {
+ position += record.write(stmt, position);
+ }
+
+ this.records.clear();
+ }
+
+ // Pass this operation off to the update thread. This will block if
+ // the update thread is already performing an update.
+ AsyncDBOperation op = new AsyncDBOperation(stmt, closeConn, closeConn);
+ updateThread.put(op);
+ successfulPut = true; // op has been posted to the other thread.
+ } finally {
+ if (!successfulPut && null != stmt) {
+ // We created a statement but failed to enqueue it. Close it.
+ stmt.close();
+ }
+ }
+
+ // Check for any previous SQLException. If one happened, rethrow it here.
+ SQLException lastException = updateThread.getLastError();
+ if (null != lastException) {
+ throw lastException;
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void close(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ try {
+ insertRows(true);
+ } catch (SQLException sqle) {
+ throw new IOException(sqle);
+ } finally {
+ updateThread.join();
+ }
+
+ // If we're not leaving on an error return path already,
+ // now that updateThread is definitely stopped, check that the
+ // error slot remains empty.
+ SQLException lastErr = updateThread.getLastError();
+ if (null != lastErr) {
+ throw new IOException(lastErr);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void write(K key, V value)
+ throws InterruptedException, IOException {
+ try {
+ records.add((SqoopRecord) key.clone());
+ if (records.size() >= this.rowsPerStmt) {
+ insertRows(false);
+ }
+ } catch (CloneNotSupportedException cnse) {
+ throw new IOException("Could not buffer record", cnse);
+ } catch (SQLException sqlException) {
+ throw new IOException(sqlException);
+ }
+ }
+ }
+}
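
As a usage note, the two keys defined above are plain Hadoop configuration properties. A sketch of setting them from a driver, assuming the hypothetical class name shown; the command-line equivalent is "-D sqoop.export.records.per.statement=N -D sqoop.export.statements.per.transaction=M", which is how the test harness later in this patch passes them:

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.sqoop.mapreduce.ExportOutputFormat;

/** Hypothetical fragment showing how the export batching knobs are tuned. */
public class ExportTuningSketch {
  public static Configuration tunedConf() {
    Configuration conf = new Configuration();
    // Rows folded into each INSERT statement (default 100).
    conf.setInt(ExportOutputFormat.RECORDS_PER_STATEMENT_KEY, 100);
    // INSERT statements per commit (default 100); -1 means a single
    // transaction per task.
    conf.setInt(ExportOutputFormat.STATEMENTS_PER_TRANSACTION_KEY, 100);
    return conf;
  }
}

With records-per-statement set to 2 on a three-column table, getInsertStatement(2) builds a statement of the form INSERT INTO t (a, b, c) VALUES (?, ?, ?), (?, ?, ?).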
diff --git a/src/shims/common/org/apache/hadoop/sqoop/mapreduce/OracleExportOutputFormat.java b/src/shims/common/org/apache/hadoop/sqoop/mapreduce/OracleExportOutputFormat.java
new file mode 100644
index 00000000..dd2d0830
--- /dev/null
+++ b/src/shims/common/org/apache/hadoop/sqoop/mapreduce/OracleExportOutputFormat.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import org.apache.hadoop.sqoop.lib.SqoopRecord;
+
+/**
+ * Oracle-specific SQL formatting overrides default ExportOutputFormat's.
+ */
+public class OracleExportOutputFormat<K extends SqoopRecord, V>
+ extends ExportOutputFormat<K, V> {
+
+ /** {@inheritDoc} */
+ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+ throws IOException {
+ try {
+ return new OracleExportRecordWriter(context);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * RecordWriter to write the output to a row in a database table.
+ * The actual database updates are executed in a second thread.
+ */
+ public class OracleExportRecordWriter extends ExportRecordWriter {
+
+ public OracleExportRecordWriter(TaskAttemptContext context)
+ throws ClassNotFoundException, SQLException {
+ super(context);
+ }
+
+ @Override
+ /**
+ * @return an INSERT statement suitable for inserting 'numRows' rows.
+ */
+ protected String getInsertStatement(int numRows) {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("INSERT INTO " + tableName + " ");
+
+ int numSlots;
+ if (this.columnNames != null) {
+ numSlots = this.columnNames.length;
+
+ sb.append("(");
+ boolean first = true;
+ for (String col : columnNames) {
+ if (!first) {
+ sb.append(", ");
+ }
+
+ sb.append(col);
+ first = false;
+ }
+
+ sb.append(") ");
+ } else {
+ numSlots = this.columnCount; // set if columnNames is null.
+ }
+
+ // generates the (?, ?, ?...) used for each row.
+ StringBuilder sbRow = new StringBuilder();
+ sbRow.append("SELECT ");
+ for (int i = 0; i < numSlots; i++) {
+ if (i != 0) {
+ sbRow.append(", ");
+ }
+
+ sbRow.append("?");
+ }
+ sbRow.append(" FROM DUAL ");
+
+ // Now append that numRows times.
+ for (int i = 0; i < numRows; i++) {
+ if (i != 0) {
+ sb.append("UNION ALL ");
+ }
+
+ sb.append(sbRow);
+ }
+
+ return sb.toString();
+ }
+ }
+}
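
For comparison with the base class, the override above replaces the VALUES clause with single-row SELECTs against DUAL joined by UNION ALL, which is how Oracle accepts a multi-row parameterized INSERT. A sketch of the string getInsertStatement(2) would build for a hypothetical two-column table T(A, B), with whitespace condensed:

// Expected shape of the Oracle multi-row INSERT for two buffered records:
String oracleInsert =
    "INSERT INTO T (A, B) "
    + "SELECT ?, ? FROM DUAL UNION ALL SELECT ?, ? FROM DUAL";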
diff --git a/src/test/org/apache/hadoop/sqoop/TestExport.java b/src/test/org/apache/hadoop/sqoop/TestExport.java
index 76e0edf7..ec0e4e70 100644
--- a/src/test/org/apache/hadoop/sqoop/TestExport.java
+++ b/src/test/org/apache/hadoop/sqoop/TestExport.java
@@ -119,7 +119,7 @@ private String getRecordLine(int recordNum, ColumnGenerator... extraCols) {
as what the string representation of the column as returned by
the database should look like.
*/
- interface ColumnGenerator {
+ public interface ColumnGenerator {
/** for a row with id rowNum, what should we write into that
line of the text file to export?
*/
@@ -400,13 +400,20 @@ protected void multiFileTest(int numFiles, int recordsPerMap, int numMaps,
}
createTable();
- runExport(getArgv(true, newStrArray(argv, "-m", "" + numMaps)));
+ runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + numMaps)));
verifyExport(TOTAL_RECORDS);
} finally {
LOG.info("multi-file test complete");
}
}
+ /**
+ * Run an "export" on an empty file.
+ */
+ public void testEmptyExport() throws IOException, SQLException {
+ multiFileTest(1, 0, 1);
+ }
+
/** Export 10 rows, make sure they load in correctly */
public void testTextExport() throws IOException, SQLException {
multiFileTest(1, 10, 1);
@@ -435,11 +442,45 @@ public void testGzipExport() throws IOException, SQLException {
createTextFile(0, TOTAL_RECORDS, true);
createTable();
- runExport(getArgv(true));
+ runExport(getArgv(true, 10, 10));
verifyExport(TOTAL_RECORDS);
LOG.info("Complete gzip export test");
}
+ /**
+ * Ensure that we use multiple statements in a transaction.
+ */
+ public void testMultiStatement() throws IOException, SQLException {
+ final int TOTAL_RECORDS = 20;
+ createTextFile(0, TOTAL_RECORDS, true);
+ createTable();
+ runExport(getArgv(true, 10, 10));
+ verifyExport(TOTAL_RECORDS);
+ }
+
+ /**
+ * Ensure that we use multiple transactions in a single mapper.
+ */
+ public void testMultiTransaction() throws IOException, SQLException {
+ final int TOTAL_RECORDS = 20;
+ createTextFile(0, TOTAL_RECORDS, true);
+ createTable();
+ runExport(getArgv(true, 5, 2));
+ verifyExport(TOTAL_RECORDS);
+ }
+
+ /**
+ * Ensure that when we don't force a commit with a statement cap,
+ * it happens anyway.
+ */
+ public void testUnlimitedTransactionSize() throws IOException, SQLException {
+ final int TOTAL_RECORDS = 20;
+ createTextFile(0, TOTAL_RECORDS, true);
+ createTable();
+ runExport(getArgv(true, 5, -1));
+ verifyExport(TOTAL_RECORDS);
+ }
+
/** Run 2 mappers, make sure all records load in correctly */
public void testMultiMapTextExport() throws IOException, SQLException {
@@ -451,7 +492,7 @@ public void testMultiMapTextExport() throws IOException, SQLException {
}
createTable();
- runExport(getArgv(true));
+ runExport(getArgv(true, 10, 10));
verifyExport(RECORDS_PER_MAP * NUM_FILES);
}
@@ -508,7 +549,8 @@ public void testSequenceFileExport() throws Exception {
// Now run and verify the export.
LOG.info("Exporting SequenceFile-based data");
- runExport(getArgv(true, "--class-name", className, "--jar-file", jarFileName));
+ runExport(getArgv(true, 10, 10, "--class-name", className,
+ "--jar-file", jarFileName));
verifyExport(TOTAL_RECORDS);
} finally {
if (null != prevClassLoader) {
@@ -535,11 +577,16 @@ public String getType() {
createTextFile(0, TOTAL_RECORDS, false, gen);
createTable(gen);
- runExport(getArgv(true));
+ runExport(getArgv(true, 10, 10));
verifyExport(TOTAL_RECORDS);
assertColMinAndMax(forIdx(0), gen);
}
+ /** @return the column type for a large integer */
+ protected String getBigIntType() {
+ return "BIGINT";
+ }
+
public void testBigIntCol() throws IOException, SQLException {
final int TOTAL_RECORDS = 10;
@@ -554,18 +601,18 @@ public String getVerifyText(int rowNum) {
return "" + val;
}
public String getType() {
- return "BIGINT";
+ return getBigIntType();
}
};
createTextFile(0, TOTAL_RECORDS, false, gen);
createTable(gen);
- runExport(getArgv(true));
+ runExport(getArgv(true, 10, 10));
verifyExport(TOTAL_RECORDS);
assertColMinAndMax(forIdx(0), gen);
}
- private String pad(int n) {
+ protected String pad(int n) {
if (n <= 9) {
return "0" + n;
} else {
@@ -573,10 +620,11 @@ private String pad(int n) {
}
}
- public void testDatesAndTimes() throws IOException, SQLException {
- final int TOTAL_RECORDS = 10;
-
- ColumnGenerator genDate = new ColumnGenerator() {
+ /**
+ * Get a column generator for DATE columns
+ */
+ protected ColumnGenerator getDateColumnGenerator() {
+ return new ColumnGenerator() {
public String getExportText(int rowNum) {
int day = rowNum + 1;
return "2009-10-" + day;
@@ -589,8 +637,13 @@ public String getType() {
return "DATE";
}
};
+ }
- ColumnGenerator genTime = new ColumnGenerator() {
+ /**
+ * Get a column generator for TIME columns.
+ */
+ protected ColumnGenerator getTimeColumnGenerator() {
+ return new ColumnGenerator() {
public String getExportText(int rowNum) {
return "10:01:" + rowNum;
}
@@ -601,10 +654,17 @@ public String getType() {
return "TIME";
}
};
+ }
+
+ public void testDatesAndTimes() throws IOException, SQLException {
+ final int TOTAL_RECORDS = 10;
+
+ ColumnGenerator genDate = getDateColumnGenerator();
+ ColumnGenerator genTime = getTimeColumnGenerator();
createTextFile(0, TOTAL_RECORDS, false, genDate, genTime);
createTable(genDate, genTime);
- runExport(getArgv(true));
+ runExport(getArgv(true, 10, 10));
verifyExport(TOTAL_RECORDS);
assertColMinAndMax(forIdx(0), genDate);
assertColMinAndMax(forIdx(1), genTime);
@@ -647,7 +707,7 @@ public String getType() {
createTextFile(0, TOTAL_RECORDS, false, genFloat, genNumeric);
createTable(genFloat, genNumeric);
- runExport(getArgv(true));
+ runExport(getArgv(true, 10, 10));
verifyExport(TOTAL_RECORDS);
assertColMinAndMax(forIdx(0), genFloat);
assertColMinAndMax(forIdx(1), genNumeric);
diff --git a/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java b/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java
index 05cf3546..c51deed1 100644
--- a/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java
+++ b/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java
@@ -24,8 +24,10 @@
import org.apache.hadoop.sqoop.manager.DirectMySQLTest;
import org.apache.hadoop.sqoop.manager.DirectMySQLExportTest;
+import org.apache.hadoop.sqoop.manager.JdbcMySQLExportTest;
import org.apache.hadoop.sqoop.manager.MySQLAuthTest;
import org.apache.hadoop.sqoop.manager.MySQLCompatTest;
+import org.apache.hadoop.sqoop.manager.OracleExportTest;
import org.apache.hadoop.sqoop.manager.OracleManagerTest;
import org.apache.hadoop.sqoop.manager.OracleCompatTest;
import org.apache.hadoop.sqoop.manager.PostgresqlTest;
@@ -44,8 +46,10 @@ public static Test suite() {
+ "implementations in Sqoop");
suite.addTestSuite(DirectMySQLTest.class);
suite.addTestSuite(DirectMySQLExportTest.class);
+ suite.addTestSuite(JdbcMySQLExportTest.class);
suite.addTestSuite(MySQLAuthTest.class);
suite.addTestSuite(MySQLCompatTest.class);
+ suite.addTestSuite(OracleExportTest.class);
suite.addTestSuite(OracleManagerTest.class);
suite.addTestSuite(OracleCompatTest.class);
suite.addTestSuite(PostgresqlTest.class);
diff --git a/src/test/org/apache/hadoop/sqoop/manager/DirectMySQLExportTest.java b/src/test/org/apache/hadoop/sqoop/manager/DirectMySQLExportTest.java
index ad976c45..047769b6 100644
--- a/src/test/org/apache/hadoop/sqoop/manager/DirectMySQLExportTest.java
+++ b/src/test/org/apache/hadoop/sqoop/manager/DirectMySQLExportTest.java
@@ -81,34 +81,13 @@ public void setUp() {
options.setUsername(MySQLTestUtils.getCurrentUser());
this.manager = new DirectMySQLManager(options);
- Statement st = null;
-
try {
this.conn = manager.getConnection();
this.conn.setAutoCommit(false);
- st = this.conn.createStatement();
-
- // create the database table.
- st.executeUpdate("DROP TABLE IF EXISTS " + getTableName());
- st.executeUpdate("CREATE TABLE " + getTableName() + " ("
- + "id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, "
- + "name VARCHAR(24) NOT NULL, "
- + "start_date DATE, "
- + "salary FLOAT, "
- + "dept VARCHAR(32))");
- this.conn.commit();
} catch (SQLException sqlE) {
LOG.error("Encountered SQL Exception: " + sqlE);
sqlE.printStackTrace();
fail("SQLException when running test setUp(): " + sqlE);
- } finally {
- try {
- if (null != st) {
- st.close();
- }
- } catch (SQLException sqlE) {
- LOG.warn("Got SQLException when closing connection: " + sqlE);
- }
}
}
@@ -153,11 +132,12 @@ public void tearDown() {
@Override
protected String [] getArgv(boolean includeHadoopFlags,
- String... additionalArgv) {
+ int rowsPerStatement, int statementsPerTx, String... additionalArgv) {
String [] subArgv = newStrArray(additionalArgv, "--direct",
"--username", MySQLTestUtils.getCurrentUser());
- return super.getArgv(includeHadoopFlags, subArgv);
+ return super.getArgv(includeHadoopFlags, rowsPerStatement,
+ statementsPerTx, subArgv);
}
/**
diff --git a/src/test/org/apache/hadoop/sqoop/manager/JdbcMySQLExportTest.java b/src/test/org/apache/hadoop/sqoop/manager/JdbcMySQLExportTest.java
new file mode 100644
index 00000000..5783abb4
--- /dev/null
+++ b/src/test/org/apache/hadoop/sqoop/manager/JdbcMySQLExportTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.manager;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.sqoop.SqoopOptions;
+import org.apache.hadoop.sqoop.TestExport;
+import org.apache.hadoop.sqoop.mapreduce.MySQLExportMapper;
+
+/**
+ * Test the MySQLManager implementation's exportJob() functionality.
+ * This does a better test of ExportOutputFormat than TestExport does,
+ * because MySQL supports multi-row INSERT statements.
+ */
+public class JdbcMySQLExportTest extends TestExport {
+
+ public static final Log LOG = LogFactory.getLog(
+ JdbcMySQLExportTest.class.getName());
+
+ static final String TABLE_PREFIX = "EXPORT_MYSQL_J_";
+
+ // instance variables populated during setUp, used during tests.
+ private MySQLManager manager;
+ private Connection conn;
+
+ @Override
+ protected Connection getConnection() {
+ return conn;
+ }
+
+ // MySQL allows multi-row INSERT statements.
+ @Override
+ protected int getMaxRowsPerStatement() {
+ return 1000;
+ }
+
+ @Override
+ protected boolean useHsqldbTestServer() {
+ return false;
+ }
+
+ @Override
+ protected String getConnectString() {
+ return MySQLTestUtils.CONNECT_STRING;
+ }
+
+ @Override
+ protected String getTablePrefix() {
+ return TABLE_PREFIX;
+ }
+
+ @Override
+ protected String getDropTableStatement(String tableName) {
+ return "DROP TABLE IF EXISTS " + tableName;
+ }
+
+ @Before
+ public void setUp() {
+ super.setUp();
+
+ SqoopOptions options = new SqoopOptions(MySQLTestUtils.CONNECT_STRING,
+ getTableName());
+ options.setUsername(MySQLTestUtils.getCurrentUser());
+ this.manager = new MySQLManager(options);
+ try {
+ this.conn = manager.getConnection();
+ this.conn.setAutoCommit(false);
+ } catch (SQLException sqlE) {
+ LOG.error(StringUtils.stringifyException(sqlE));
+ fail("Failed with sql exception in setup: " + sqlE);
+ }
+ }
+
+ @After
+ public void tearDown() {
+ super.tearDown();
+
+ if (null != this.conn) {
+ try {
+ this.conn.close();
+ } catch (SQLException sqlE) {
+ LOG.error("Got SQLException closing conn: " + sqlE.toString());
+ }
+ }
+
+ if (null != manager) {
+ try {
+ manager.close();
+ manager = null;
+ } catch (SQLException sqlE) {
+ LOG.error("Got SQLException: " + sqlE.toString());
+ fail("Got SQLException: " + sqlE.toString());
+ }
+ }
+ }
+
+ @Override
+ protected String [] getCodeGenArgv(String... extraArgs) {
+
+ String [] moreArgs = new String[extraArgs.length + 2];
+ int i = 0;
+ for (i = 0; i < extraArgs.length; i++) {
+ moreArgs[i] = extraArgs[i];
+ }
+
+ // Add username argument for mysql.
+ moreArgs[i++] = "--username";
+ moreArgs[i++] = MySQLTestUtils.getCurrentUser();
+
+ return super.getCodeGenArgv(moreArgs);
+ }
+
+ @Override
+ protected String [] getArgv(boolean includeHadoopFlags,
+ int rowsPerStatement, int statementsPerTx, String... additionalArgv) {
+
+ String [] subArgv = newStrArray(additionalArgv,
+ "--username", MySQLTestUtils.getCurrentUser());
+ return super.getArgv(includeHadoopFlags, rowsPerStatement,
+ statementsPerTx, subArgv);
+ }
+}
diff --git a/src/test/org/apache/hadoop/sqoop/manager/OracleExportTest.java b/src/test/org/apache/hadoop/sqoop/manager/OracleExportTest.java
new file mode 100644
index 00000000..29220058
--- /dev/null
+++ b/src/test/org/apache/hadoop/sqoop/manager/OracleExportTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.manager;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.sqoop.SqoopOptions;
+import org.apache.hadoop.sqoop.TestExport;
+
+/**
+ * Test the OracleManager implementation's exportJob() functionality.
+ * This tests the OracleExportOutputFormat (which subclasses
+ * ExportOutputFormat with Oracle-specific SQL statements).
+ */
+public class OracleExportTest extends TestExport {
+
+ public static final Log LOG = LogFactory.getLog(
+ OracleExportTest.class.getName());
+
+ static final String TABLE_PREFIX = "EXPORT_ORACLE_";
+
+ // instance variables populated during setUp, used during tests.
+ private OracleManager manager;
+ private Connection conn;
+
+ @Override
+ protected Connection getConnection() {
+ return conn;
+ }
+
+ // Oracle allows multi-row inserts (with its own syntax).
+ @Override
+ protected int getMaxRowsPerStatement() {
+ return 1000;
+ }
+
+ @Override
+ protected boolean useHsqldbTestServer() {
+ return false;
+ }
+
+ @Override
+ protected String getConnectString() {
+ return OracleUtils.CONNECT_STRING;
+ }
+
+ @Override
+ protected String getTablePrefix() {
+ return TABLE_PREFIX;
+ }
+
+ @Override
+ protected String getDropTableStatement(String tableName) {
+ return OracleUtils.getDropTableStatement(tableName);
+ }
+
+ @Before
+ public void setUp() {
+ super.setUp();
+
+ SqoopOptions options = new SqoopOptions(OracleUtils.CONNECT_STRING,
+ getTableName());
+ OracleUtils.setOracleAuth(options);
+ this.manager = new OracleManager(options);
+ try {
+ this.conn = manager.getConnection();
+ this.conn.setAutoCommit(false);
+ } catch (SQLException sqlE) {
+ LOG.error(StringUtils.stringifyException(sqlE));
+ fail("Failed with sql exception in setup: " + sqlE);
+ }
+ }
+
+ @After
+ public void tearDown() {
+ super.tearDown();
+
+ if (null != this.conn) {
+ try {
+ this.conn.close();
+ } catch (SQLException sqlE) {
+ LOG.error("Got SQLException closing conn: " + sqlE.toString());
+ }
+ }
+
+ if (null != manager) {
+ try {
+ manager.close();
+ manager = null;
+ } catch (SQLException sqlE) {
+ LOG.error("Got SQLException: " + sqlE.toString());
+ fail("Got SQLException: " + sqlE.toString());
+ }
+ }
+ }
+
+ @Override
+ protected String [] getCodeGenArgv(String... extraArgs) {
+ String [] moreArgs = new String[extraArgs.length + 4];
+ int i = 0;
+ for (i = 0; i < extraArgs.length; i++) {
+ moreArgs[i] = extraArgs[i];
+ }
+
+ // Add username and password args.
+ moreArgs[i++] = "--username";
+ moreArgs[i++] = OracleUtils.ORACLE_USER_NAME;
+ moreArgs[i++] = "--password";
+ moreArgs[i++] = OracleUtils.ORACLE_USER_PASS;
+
+ return super.getCodeGenArgv(moreArgs);
+ }
+
+ @Override
+ protected String [] getArgv(boolean includeHadoopFlags,
+ int rowsPerStatement, int statementsPerTx, String... additionalArgv) {
+
+ String [] subArgv = newStrArray(additionalArgv,
+ "--username", OracleUtils.ORACLE_USER_NAME,
+ "--password", OracleUtils.ORACLE_USER_PASS);
+ return super.getArgv(includeHadoopFlags, rowsPerStatement,
+ statementsPerTx, subArgv);
+ }
+
+ @Override
+ protected ColumnGenerator getDateColumnGenerator() {
+ // Return a TIMESTAMP generator that has increasing date values.
+ // We currently do not support export of DATE columns since
+ // Oracle informs us that they are actually TIMESTAMP; this messes
+ // up Sqoop's parsing of values as we have no way of knowing they're
+ // really supposed to be dates based on the Oracle Jdbc Metadata.
+ return new ColumnGenerator() {
+ public String getExportText(int rowNum) {
+ int day = rowNum + 1;
+ return "2009-10-" + pad(day) + " 00:00:00.0";
+ }
+ public String getVerifyText(int rowNum) {
+ int day = rowNum + 1;
+ return "2009-10-" + day + " 0:0:0. 0";
+ }
+ public String getType() {
+ return "TIMESTAMP"; // TODO: Support DATE more intelligently.
+ }
+ };
+ }
+
+ @Override
+ protected ColumnGenerator getTimeColumnGenerator() {
+ // Return a TIMESTAMP generator that has increasing time values.
+ // We currently do not support the export of DATE columns with
+ // only a time component set (because Oracle reports these column
+ // types to Sqoop as TIMESTAMP, and we parse the user's text
+ // incorrectly based on this result).
+ return new ColumnGenerator() {
+ public String getExportText(int rowNum) {
+ return "1970-01-01 10:01:" + pad(rowNum) + ".0";
+ }
+ public String getVerifyText(int rowNum) {
+ return "1970-1-1 10:1:" + rowNum + ". 0";
+ }
+ public String getType() {
+ return "TIMESTAMP";
+ }
+ };
+ }
+
+ @Override
+ protected String getBigIntType() {
+ // Oracle stores everything in NUMERIC columns.
+ return "NUMERIC(12,0)";
+ }
+}
diff --git a/src/test/org/apache/hadoop/sqoop/manager/OracleUtils.java b/src/test/org/apache/hadoop/sqoop/manager/OracleUtils.java
index 3c91fbe4..9caf8eec 100644
--- a/src/test/org/apache/hadoop/sqoop/manager/OracleUtils.java
+++ b/src/test/org/apache/hadoop/sqoop/manager/OracleUtils.java
@@ -80,8 +80,7 @@ public static void dropTable(String tableName, ConnManager manager)
st = connection.createStatement();
// create the database table and populate it with data.
- st.executeUpdate("BEGIN EXECUTE IMMEDIATE 'DROP TABLE " + tableName + "'; "
- + "exception when others then null; end;");
+ st.executeUpdate(getDropTableStatement(tableName));
connection.commit();
} finally {
@@ -94,4 +93,9 @@ public static void dropTable(String tableName, ConnManager manager)
}
}
}
+
+ public static String getDropTableStatement(String tableName) {
+ return "BEGIN EXECUTE IMMEDIATE 'DROP TABLE " + tableName + "'; "
+ + "exception when others then null; end;";
+ }
}
diff --git a/src/test/org/apache/hadoop/sqoop/testutil/ExportJobTestCase.java b/src/test/org/apache/hadoop/sqoop/testutil/ExportJobTestCase.java
index 6599f1c9..8a1834b9 100644
--- a/src/test/org/apache/hadoop/sqoop/testutil/ExportJobTestCase.java
+++ b/src/test/org/apache/hadoop/sqoop/testutil/ExportJobTestCase.java
@@ -31,11 +31,12 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.sqoop.Sqoop;
+import org.apache.hadoop.sqoop.mapreduce.ExportOutputFormat;
import org.apache.hadoop.sqoop.tool.ExportTool;
/**
* Class that implements common methods required for tests which export data
- * from HDFS to databases, to verify correct export
+ * from HDFS to databases, to verify correct export.
*/
public class ExportJobTestCase extends BaseSqoopTestCase {
@@ -45,6 +46,15 @@ protected String getTablePrefix() {
return "EXPORT_TABLE_";
}
+ /**
+ * @return the maximum rows to fold into an INSERT statement.
+ * HSQLDB can only support the single-row INSERT syntax. Other databases
+ * can support greater numbers of rows per statement.
+ */
+ protected int getMaxRowsPerStatement() {
+ return 1;
+ }
+
/**
* @return a connection to the database under test.
*/
@@ -60,13 +70,27 @@ protected Connection getConnection() {
/**
* Create the argv to pass to Sqoop
* @param includeHadoopFlags if true, then include -D various.settings=values
+ * @param rowsPerStmt number of rows to export in a single INSERT statement.
+ * @param statementsPerTx number of statements to use in a transaction.
* @return the argv as an array of strings.
*/
- protected String [] getArgv(boolean includeHadoopFlags, String... additionalArgv) {
+ protected String [] getArgv(boolean includeHadoopFlags,
+ int rowsPerStmt, int statementsPerTx, String... additionalArgv) {
ArrayList<String> args = new ArrayList<String>();
if (includeHadoopFlags) {
CommonArgs.addHadoopFlags(args);
+ args.add("-D");
+ int realRowsPerStmt = Math.min(rowsPerStmt, getMaxRowsPerStatement());
+ if (realRowsPerStmt != rowsPerStmt) {
+ LOG.warn("Rows per statement set to " + realRowsPerStmt
+ + " by getMaxRowsPerStatement() limit.");
+ }
+ args.add(ExportOutputFormat.RECORDS_PER_STATEMENT_KEY + "="
+ + realRowsPerStmt);
+ args.add("-D");
+ args.add(ExportOutputFormat.STATEMENTS_PER_TRANSACTION_KEY + "="
+ + statementsPerTx);
}
// Any additional Hadoop flags (-D foo=bar) are prepended.
@@ -201,6 +225,10 @@ protected void verifyExport(int expectedNumRecords) throws IOException, SQLExcep
assertEquals("Got back unexpected row count", expectedNumRecords,
actualNumRecords);
+ if (expectedNumRecords == 0) {
+ return; // Nothing more to verify.
+ }
+
// Check that we start with row 0.
int minVal = getMinRowId();
assertEquals("Minimum row was not zero", 0, minVal);