
New ExportOutputFormat with improved JDBC performance.

Added ExportOutputFormat.
JdbcExportJob uses ExportOutputFormat instead of DBOutputFormat.
SqoopRecord now supports write() into multi-row INSERT statements via an offset-based write(PreparedStatement, int) method, plus clone(); a usage sketch follows the commit metadata below.
ClassWriter generates methods to fulfill the expanded SqoopRecord contract.
Added JDBC MySQL tests to exercise multi-row statements.
BlobRef and ClobRef now implement the Cloneable interface.
Added ExportStressTest.
Added Oracle-specific ExportOutputFormat subclass.

From: Aaron Kimball <aaron@cloudera.com>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149898 13f79535-47bb-0310-9956-ffa450edef68
Andrew Bayer 2011-07-22 20:03:46 +00:00
parent 1eb4226230
commit e926bf1fe0
23 changed files with 1365 additions and 66 deletions
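
The expanded SqoopRecord contract is what lets the new ExportOutputFormat batch records: the writer clones each incoming key into a buffer, builds one INSERT with a parameter group per buffered row, and then lets each record bind its own columns at a running offset. A minimal sketch of that flow, assuming the Sqoop lib classes are on the classpath (the flushBuffered helper and its enclosing class are illustrative only, not part of this commit):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

import org.apache.hadoop.sqoop.lib.SqoopRecord;

public class MultiRowInsertSketch {
  /**
   * Bind all buffered records into one multi-row INSERT and execute it.
   * 'sql' is expected to carry one parameter group per buffered record,
   * e.g. INSERT INTO t (id, msg) VALUES (?, ?), (?, ?), ...
   */
  static void flushBuffered(Connection conn, String sql,
      List<SqoopRecord> buffered) throws SQLException {
    PreparedStatement stmt = conn.prepareStatement(sql);
    int position = 0;
    for (SqoopRecord record : buffered) {
      // write(stmt, offset) fills this record's columns starting at
      // 'position' and returns how many parameters it consumed.
      position += record.write(stmt, position);
    }
    stmt.executeUpdate();
    stmt.close();
  }
}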

View File: build.xml

@ -132,8 +132,9 @@
<!-- Classpath for unit tests (superset of compile.classpath) -->
<path id="test.classpath">
<pathelement location="${build.test.classes}" />
<path refid="${name}.hadooptest.classpath"/>
<path refid="compile.classpath"/>
<path refid="${name}.hadooptest.classpath" />
<path refid="compile.classpath" />
<pathelement location="${build.shim.classes}/${hadoop.dist}" />
</path>
<path id="cobertura.classpath">
@ -197,7 +198,7 @@
</target>
<target name="compile-test"
depends="compile, ivy-retrieve-hadoop-test"
depends="compile, compile-one-shim, ivy-retrieve-hadoop-test"
description="Compile test classes">
<mkdir dir="${build.test.classes}" />
<javac

View File: BigDecimalSerializer.java

@ -40,9 +40,6 @@
* [int: scale][boolean: b == false][long: BigInt-part]
* [int: scale][boolean: b == true][string: BigInt-part.toString()]
*
*
*
*
* TODO(aaron): Get this to work with Hadoop's Serializations framework.
*/
public final class BigDecimalSerializer {

View File: BlobRef.java

@ -23,6 +23,7 @@
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -83,6 +84,11 @@ protected InputStream getInternalSource(BytesWritable data) {
return data.getBytes();
}
@Override
protected BytesWritable deepCopyData() {
return new BytesWritable(Arrays.copyOf(data.getBytes(), data.getLength()));
}
@Override
public void readFieldsInternal(DataInput in) throws IOException {
// For internally-stored BLOBs, the data is a BytesWritable

View File: ClobRef.java

@ -72,6 +72,11 @@ protected Reader getInternalSource(String data) {
return new StringReader(data);
}
@Override
protected String deepCopyData() {
return data;
}
@Override
protected String getInternalData(String data) {
return data;

View File: LobRef.java

@ -92,6 +92,24 @@ protected LobRef(String file, long offset, long length) {
// If we've opened a LobFile object, track our reference to it here.
protected LobFile.Reader reader;
@Override
@SuppressWarnings("unchecked")
/**
* Clone the current reference object. data is deep-copied; any open
* file handle remains with the original only.
*/
public Object clone() throws CloneNotSupportedException {
LobRef<DATATYPE, CONTAINERTYPE, ACCESSORTYPE> r =
(LobRef<DATATYPE, CONTAINERTYPE, ACCESSORTYPE>) super.clone();
r.reader = null; // Reference to opened reader is not duplicated.
if (null != data) {
r.data = deepCopyData();
}
return r;
}
@Override
protected synchronized void finalize() throws Throwable {
close();
@ -202,6 +220,11 @@ protected abstract ACCESSORTYPE getExternalSource(LobFile.Reader reader)
*/
protected abstract DATATYPE getInternalData(CONTAINERTYPE data);
/**
* Make a copy of the materialized data.
*/
protected abstract CONTAINERTYPE deepCopyData();
public DATATYPE getData() {
if (isExternal()) {
throw new RuntimeException(

View File: SqoopRecord.java

@ -21,16 +21,17 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
/**
* Interface implemented by the classes generated by sqoop's orm.ClassWriter.
*/
public interface SqoopRecord extends DBWritable, Writable {
public interface SqoopRecord extends Cloneable, DBWritable, Writable {
public void parse(CharSequence s) throws RecordParser.ParseError;
public void parse(Text s) throws RecordParser.ParseError;
public void parse(byte [] s) throws RecordParser.ParseError;
@ -39,5 +40,13 @@ public interface SqoopRecord extends DBWritable, Writable {
public void parse(CharBuffer s) throws RecordParser.ParseError;
public void loadLargeObjects(LargeObjectLoader objLoader)
throws SQLException, IOException, InterruptedException;
public Object clone() throws CloneNotSupportedException;
/**
* Inserts the data in this object into the PreparedStatement, starting
* at parameter 'offset'.
* @return the number of fields written to the statement.
*/
public int write(PreparedStatement stmt, int offset) throws SQLException;
}

View File: OracleManager.java

@ -33,8 +33,12 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.db.OracleDataDrivenDBInputFormat;
import org.apache.hadoop.sqoop.SqoopOptions;
import org.apache.hadoop.sqoop.mapreduce.JdbcExportJob;
import org.apache.hadoop.sqoop.shims.ShimLoader;
import org.apache.hadoop.sqoop.util.ExportException;
import org.apache.hadoop.sqoop.util.ImportException;
/**
@ -281,6 +285,7 @@ private void setSessionTimeZone(Connection conn) throws SQLException {
}
}
@Override
public void importTable(ImportJobContext context)
throws IOException, ImportException {
// Specify the Oracle-specific DBInputFormat for import.
@ -288,6 +293,22 @@ public void importTable(ImportJobContext context)
super.importTable(context);
}
/**
* Export data stored in HDFS into a table in a database
*/
public void exportTable(ExportJobContext context)
throws IOException, ExportException {
try {
JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
(Class<? extends OutputFormat>) ShimLoader.getShimClass(
"org.apache.hadoop.sqoop.mapreduce.OracleExportOutputFormat"));
exportJob.runExport();
} catch (ClassNotFoundException cnfe) {
throw new ExportException("Could not start export; could not find class",
cnfe);
}
}
@Override
public ResultSet readTable(String tableName, String[] columns) throws SQLException {
if (columns == null) {

View File: ExportJobBase.java

@ -37,7 +37,6 @@
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@ -188,6 +187,18 @@ protected Class<? extends InputFormat> getInputFormatClass()
}
}
@Override
protected Class<? extends OutputFormat> getOutputFormatClass()
throws ClassNotFoundException {
Class<? extends OutputFormat> configuredOF = super.getOutputFormatClass();
if (null == configuredOF) {
return (Class<? extends OutputFormat>) ShimLoader.getShimClass(
"org.apache.hadoop.sqoop.mapreduce.ExportOutputFormat");
} else {
return configuredOF;
}
}
@Override
protected void configureMapper(Job job, String tableName,
String tableClassName) throws ClassNotFoundException, IOException {

View File: JdbcExportJob.java

@ -25,18 +25,20 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.sqoop.ConnFactory;
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.manager.ExportJobContext;
import org.apache.hadoop.sqoop.shims.ShimLoader;
/**
* Run an export using JDBC (DBOutputFormat)
* Run an export using JDBC (JDBC-based ExportOutputFormat)
*/
public class JdbcExportJob extends ExportJobBase {
@ -47,6 +49,13 @@ public JdbcExportJob(final ExportJobContext context) {
super(context);
}
public JdbcExportJob(final ExportJobContext ctxt,
final Class<? extends Mapper> mapperClass,
final Class<? extends InputFormat> inputFormatClass,
final Class<? extends OutputFormat> outputFormatClass) {
super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
}
@Override
protected Class<? extends Mapper> getMapperClass() {
if (inputIsSequenceFiles()) {
@ -81,8 +90,10 @@ protected void configureOutputFormat(Job job, String tableName,
}
DBOutputFormat.setOutput(job, tableName, colNames);
job.setOutputFormatClass(DBOutputFormat.class);
job.setOutputFormatClass(getOutputFormatClass());
job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
} catch (ClassNotFoundException cnfe) {
throw new IOException("Could not load OutputFormat", cnfe);
} finally {
try {
mgr.close();

View File: MySQLExportJob.java

@ -47,13 +47,7 @@ public class MySQLExportJob extends ExportJobBase {
LogFactory.getLog(MySQLExportJob.class.getName());
public MySQLExportJob(final ExportJobContext context) {
super(context);
}
@Override
protected Class<? extends OutputFormat> getOutputFormatClass() {
// This job does not write to the OutputCollector. Disable it.
return NullOutputFormat.class;
super(context, null, null, NullOutputFormat.class);
}
@Override

View File: SequenceFileExportMapper.java

@ -28,7 +28,7 @@
/**
* Reads a SqoopRecord from the SequenceFile in which it's packed and emits
* that DBWritable to the DBOutputFormat for writeback to the database.
* that DBWritable to the OutputFormat for writeback to the database.
*/
public class SequenceFileExportMapper
extends AutoProgressMapper<LongWritable, SqoopRecord, SqoopRecord, NullWritable> {

View File: TextExportMapper.java

@ -33,7 +33,7 @@
/**
* Converts an input record from a string representation to a parsed Sqoop record
* and emits that DBWritable to the DBOutputFormat for writeback to the database.
* and emits that DBWritable to the OutputFormat for writeback to the database.
*/
public class TextExportMapper
extends AutoProgressMapper<LongWritable, Text, SqoopRecord, NullWritable> {

View File: ClassWriter.java

@ -500,6 +500,10 @@ private void generateDbWrite(Map<String, Integer> columnTypes, String [] colName
StringBuilder sb) {
sb.append(" public void write(PreparedStatement __dbStmt) throws SQLException {\n");
sb.append(" write(__dbStmt, 0);\n");
sb.append(" }\n\n");
sb.append(" public int write(PreparedStatement __dbStmt, int __off) throws SQLException {\n");
int fieldNum = 0;
@ -520,9 +524,10 @@ private void generateDbWrite(Map<String, Integer> columnTypes, String [] colName
}
sb.append(" JdbcWritableBridge." + setterMethod + "(" + col + ", "
+ fieldNum + ", " + sqlType + ", __dbStmt);\n");
+ fieldNum + " + __off, " + sqlType + ", __dbStmt);\n");
}
sb.append(" return " + fieldNum + ";\n");
sb.append(" }\n");
}
@ -559,7 +564,48 @@ private void generateHadoopRead(Map<String, Integer> columnTypes, String [] colN
}
/**
* Generate the toString() method
* Generate the clone() method.
* @param columnTypes - mapping from column names to sql types
* @param colNames - ordered list of column names for table.
* @param sb - StringBuilder to append code to
*/
private void generateCloneMethod(Map<String, Integer> columnTypes,
String [] colNames, StringBuilder sb) {
TableClassName tableNameInfo = new TableClassName(options);
String className = tableNameInfo.getShortClassForTable(tableName);
sb.append(" public Object clone() throws CloneNotSupportedException {\n");
sb.append(" " + className + " o = (" + className + ") super.clone();\n");
// For each field that is mutable, we need to perform the deep copy.
for (String colName : colNames) {
int sqlType = columnTypes.get(colName);
String javaType = connManager.toJavaType(sqlType);
if (null == javaType) {
continue;
} else if (javaType.equals("java.sql.Date")
|| javaType.equals("java.sql.Time")
|| javaType.equals("java.sql.Timestamp")
|| javaType.equals(ClobRef.class.getName())
|| javaType.equals(BlobRef.class.getName())) {
sb.append(" o." + colName + " = (" + javaType
+ ") o." + colName + ".clone();\n");
} else if (javaType.equals(BytesWritable.class.getName())) {
sb.append(" o." + colName + " = new BytesWritable("
+ "Arrays.copyOf(" + colName + ".getBytes(), "
+ colName + ".getLength()));\n");
}
}
sb.append(" return o;\n");
sb.append(" }\n");
}
/**
* Generate the toString() method.
* @param columnTypes - mapping from column names to sql types
* @param colNames - ordered list of column names for table.
* @param sb - StringBuilder to append code to
@ -904,6 +950,7 @@ public StringBuilder generateClassForColumns(Map<String, Integer> columnTypes,
sb.append("import java.sql.Date;\n");
sb.append("import java.sql.Time;\n");
sb.append("import java.sql.Timestamp;\n");
sb.append("import java.util.Arrays;\n");
sb.append("import java.util.Iterator;\n");
sb.append("import java.util.List;\n");
sb.append("\n");
@ -921,6 +968,8 @@ public StringBuilder generateClassForColumns(Map<String, Integer> columnTypes,
generateHadoopWrite(columnTypes, colNames, sb);
generateToString(columnTypes, colNames, sb);
generateParser(columnTypes, colNames, sb);
generateCloneMethod(columnTypes, colNames, sb);
// TODO(aaron): Generate hashCode(), compareTo(), equals() so it can be a WritableComparable
sb.append("}\n");
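
For a hypothetical generated record class Foo with a java.sql.Date column start_date and a BytesWritable column payload (the class and field names are illustrative, not taken from this commit), the method emitted by generateCloneMethod above comes out roughly as follows:

import java.sql.Date;
import java.util.Arrays;
import org.apache.hadoop.io.BytesWritable;

// Sketch of the clone() a generated class would contain after this change.
public class Foo implements Cloneable {
  private Date start_date;
  private BytesWritable payload;

  public Object clone() throws CloneNotSupportedException {
    Foo o = (Foo) super.clone();
    // Mutable column types get a deep copy; immutable fields keep the
    // shallow copies made by super.clone().
    o.start_date = (Date) o.start_date.clone();
    o.payload = new BytesWritable(
        Arrays.copyOf(payload.getBytes(), payload.getLength()));
    return o;
  }
}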

View File: ExportStressTest.java

@ -0,0 +1,148 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.*;
import java.sql.*;
import java.util.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.sqoop.Sqoop;
import org.apache.hadoop.sqoop.SqoopOptions;
import org.apache.hadoop.sqoop.tool.ExportTool;
import org.apache.hadoop.sqoop.tool.SqoopTool;
/**
* Stress test export procedure by running a large-scale export to MySQL.
* This requires MySQL be configured with a database that can be accessed
* by the specified username without a password. The user must be able to
* create and drop tables in the database.
*
* Run with: src/scripts/run-perftest.sh ExportStressTest (connect-str) (username)
*/
public class ExportStressTest extends Configured implements Tool {
// Export 10 GB of data. Each record is ~100 bytes.
public final static int NUM_FILES = 10;
public final static int RECORDS_PER_FILE = 10 * 1024 * 1024;
public final static String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
public ExportStressTest() {
}
public void createFile(int fileId) throws IOException {
Configuration conf = getConf();
FileSystem fs = FileSystem.get(conf);
Path dirPath = new Path("ExportStressTest");
fs.mkdirs(dirPath);
Path filePath = new Path(dirPath, "input-" + fileId);
OutputStream os = fs.create(filePath);
Writer w = new BufferedWriter(new OutputStreamWriter(os));
for (int i = 0; i < RECORDS_PER_FILE; i++) {
long v = (long) i + ((long) RECORDS_PER_FILE * (long) fileId);
w.write("" + v + "," + ALPHABET + ALPHABET + ALPHABET + ALPHABET + "\n");
}
w.close();
os.close();
}
/** Create a set of data files to export. */
public void createData() throws IOException {
Configuration conf = getConf();
FileSystem fs = FileSystem.get(conf);
Path dirPath = new Path("ExportStressTest");
if (fs.exists(dirPath)) {
System.out.println(
"Export directory appears to already exist. Skipping data-gen.");
return;
}
for (int i = 0; i < NUM_FILES; i++) {
createFile(i);
}
}
/** Create a table to hold our results. Drop any existing definition. */
public void createTable(String connectStr, String username) throws Exception {
Class.forName("com.mysql.jdbc.Driver"); // Load mysql driver.
Connection conn = DriverManager.getConnection(connectStr, username, null);
conn.setAutoCommit(false);
PreparedStatement stmt = conn.prepareStatement(
"DROP TABLE IF EXISTS ExportStressTestTable", ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
stmt.executeUpdate();
stmt.close();
stmt = conn.prepareStatement(
"CREATE TABLE ExportStressTestTable(id INT NOT NULL PRIMARY KEY, "
+ "msg VARCHAR(110)) Engine=InnoDB", ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
stmt.executeUpdate();
stmt.close();
conn.commit();
conn.close();
}
/** Actually run the export of the generated data to the user-created table. */
public void runExport(String connectStr, String username) throws Exception {
SqoopOptions options = new SqoopOptions(getConf());
options.setConnectString(connectStr);
options.setTableName("ExportStressTestTable");
options.setUsername(username);
options.setExportDir("ExportStressTest");
options.setNumMappers(4);
options.setLinesTerminatedBy('\n');
options.setFieldsTerminatedBy(',');
options.setExplicitDelims(true);
SqoopTool exportTool = new ExportTool();
Sqoop sqoop = new Sqoop(exportTool, getConf(), options);
int ret = Sqoop.runSqoop(sqoop, new String[0]);
if (0 != ret) {
throw new Exception("Error doing export; ret=" + ret);
}
}
@Override
public int run(String [] args) {
String connectStr = args[0];
String username = args[1];
try {
createData();
createTable(connectStr, username);
runExport(connectStr, username);
} catch (Exception e) {
System.err.println("Error: " + StringUtils.stringifyException(e));
return 1;
}
return 0;
}
public static void main(String [] args) throws Exception {
ExportStressTest test = new ExportStressTest();
int ret = ToolRunner.run(test, args);
System.exit(ret);
}
}

View File: ExportOutputFormat.java

@ -0,0 +1,492 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.mapreduce;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.sqoop.lib.SqoopRecord;
/**
* Insert the emitted keys as records into a database table.
* This supports a configurable "spill threshold" at which
* point intermediate transactions are committed.
*
* Record objects are buffered before actually performing the INSERT
* statements; this requires that the key implement the
* SqoopRecord interface.
*
* Uses DBOutputFormat/DBConfiguration for configuring the output.
*/
public class ExportOutputFormat<K extends SqoopRecord, V>
extends OutputFormat<K,V> {
/** conf key: number of rows to export per INSERT statement. */
public static final String RECORDS_PER_STATEMENT_KEY =
"sqoop.export.records.per.statement";
/** conf key: number of INSERT statements to bundle per tx.
* If this is set to -1, then a single transaction will be used
* per task. Note that each statement may encompass multiple
* rows, depending on the value of sqoop.export.records.per.statement.
*/
public static final String STATEMENTS_PER_TRANSACTION_KEY =
"sqoop.export.statements.per.transaction";
private static final int DEFAULT_RECORDS_PER_STATEMENT = 100;
private static final int DEFAULT_STATEMENTS_PER_TRANSACTION = 100;
private static final int UNLIMITED_STATEMENTS_PER_TRANSACTION = -1;
private static final Log LOG = LogFactory.getLog(ExportOutputFormat.class);
/** {@inheritDoc} */
public void checkOutputSpecs(JobContext context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
DBConfiguration dbConf = new DBConfiguration(conf);
// Sanity check all the configuration values we need.
if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
throw new IOException("Database connection URL is not set.");
} else if (null == dbConf.getOutputTableName()) {
throw new IOException("Table name is not set for export");
} else if (null == dbConf.getOutputFieldNames()
&& 0 == dbConf.getOutputFieldCount()) {
throw new IOException(
"Output field names are null and zero output field count set.");
}
}
/** {@inheritDoc} */
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException, InterruptedException {
return new OutputCommitter() {
public void abortTask(TaskAttemptContext taskContext) { }
public void cleanupJob(JobContext jobContext) { }
public void commitTask(TaskAttemptContext taskContext) { }
public boolean needsTaskCommit(TaskAttemptContext taskContext) {
return false;
}
public void setupJob(JobContext jobContext) { }
public void setupTask(TaskAttemptContext taskContext) { }
};
}
/** {@inheritDoc} */
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
throws IOException {
try {
return new ExportRecordWriter(context);
} catch (Exception e) {
throw new IOException(e);
}
}
/**
* Represents a database update operation that should be performed
* by an asynchronous background thread.
* AsyncDBOperation objects are immutable.
* They MAY contain a statement which should be executed. The
* statement may also be null.
*
* They may also set 'forceCommit' to true. If true, then the
* executor of this operation should commit the current
* transaction, even if stmt is null.
*/
private static class AsyncDBOperation {
private final PreparedStatement stmt;
private final boolean forceCommit;
private final boolean close;
/**
* Create an asynchronous database operation.
* @param s the statement, if any, to execute.
* @param forceCommit if true, the current transaction should be committed.
* @param close if true, the executor thread should stop after processing
* this operation.
*/
public AsyncDBOperation(PreparedStatement s, boolean forceCommit,
boolean close) {
this.stmt = s;
this.forceCommit = forceCommit;
this.close = close;
}
/**
* @return a statement to run as an update.
*/
public PreparedStatement getStatement() {
return stmt;
}
/**
* @return true if the executor should commit the current transaction.
* If getStatement() is non-null, the statement is run first.
*/
public boolean requiresCommit() {
return forceCommit;
}
/**
* @return true if the executor should stop after this command.
*/
public boolean stop() {
return this.close;
}
}
/**
* A thread that runs the database interactions asynchronously
* from the OutputCollector.
*/
private static class ExportUpdateThread extends Thread {
private final Connection conn; // The connection to the database.
private SQLException err; // Error from a previously-run statement.
// How we receive database operations from the RecordWriter.
private SynchronousQueue<AsyncDBOperation> opsQueue;
protected int curNumStatements; // statements executed thus far in the tx.
protected final int stmtsPerTx; // statements per transaction.
/**
* Create a new update thread that interacts with the database.
* @param conn the connection to use. This must only be used by this
* thread.
* @param stmtsPerTx the number of statements to execute before committing
* the current transaction.
*/
public ExportUpdateThread(Connection conn, int stmtsPerTx) {
this.conn = conn;
this.err = null;
this.opsQueue = new SynchronousQueue<AsyncDBOperation>();
this.stmtsPerTx = stmtsPerTx;
}
public void run() {
while (true) {
AsyncDBOperation op = null;
try {
op = opsQueue.take();
} catch (InterruptedException ie) {
LOG.warn("Interrupted retrieving from operation queue: "
+ StringUtils.stringifyException(ie));
continue;
}
if (null == op) {
// This shouldn't be allowed to happen.
LOG.warn("Null operation in queue; illegal state.");
continue;
}
PreparedStatement stmt = op.getStatement();
// Synchronize on the connection to ensure it does not conflict
// with the prepareStatement() call in the main thread.
synchronized (conn) {
try {
if (null != stmt) {
stmt.executeUpdate();
stmt.close();
stmt = null;
this.curNumStatements++;
}
if (op.requiresCommit() || (curNumStatements >= stmtsPerTx
&& stmtsPerTx != UNLIMITED_STATEMENTS_PER_TRANSACTION)) {
LOG.debug("Committing transaction of " + curNumStatements
+ " statements");
this.conn.commit();
this.curNumStatements = 0;
}
} catch (SQLException sqlE) {
setLastError(sqlE);
} finally {
// Close the statement on our way out if that didn't happen
// via the normal execution path.
if (null != stmt) {
try {
stmt.close();
} catch (SQLException sqlE) {
setLastError(sqlE);
}
}
// Always check whether we should end the loop, regardless
// of the presence of an exception.
if (op.stop()) {
// Don't continue processing after this operation.
try {
conn.close();
} catch (SQLException sqlE) {
setLastError(sqlE);
}
return;
}
} // try .. catch .. finally.
} // synchronized (conn)
}
}
/**
* Allows a user to enqueue the next database operation to run.
* Since the connection can only execute a single operation at a time,
* the put() method may block if another operation is already underway.
* @param op the database operation to perform.
*/
public void put(AsyncDBOperation op) throws InterruptedException {
opsQueue.put(op);
}
/**
* If a previously-executed statement resulted in an error, post it here.
* If the error slot was already filled, then subsequent errors are
* squashed until the user calls this method (which clears the error
* slot).
* @return any SQLException that occurred due to a previously-run
* statement.
*/
public synchronized SQLException getLastError() {
SQLException e = this.err;
this.err = null;
return e;
}
private synchronized void setLastError(SQLException e) {
if (this.err == null) {
// Just set it.
LOG.error("Got exception in update thread: "
+ StringUtils.stringifyException(e));
this.err = e;
} else {
// Slot is full. Log it and discard.
LOG.error("SQLException in update thread but error slot full: "
+ StringUtils.stringifyException(e));
}
}
}
/**
* RecordWriter to write the output to a row in a database table.
* The actual database updates are executed in a second thread.
*/
public class ExportRecordWriter extends RecordWriter<K, V> {
protected Connection connection;
protected Configuration conf;
protected int rowsPerStmt; // rows to insert per statement.
// Buffer for records to be put in an INSERT statement.
protected List<SqoopRecord> records;
protected String tableName;
protected String [] columnNames; // The columns to insert into.
protected int columnCount; // If columnNames is null, tells the number of cols.
// Background thread to actually perform the updates.
private ExportUpdateThread updateThread;
private boolean startedUpdateThread;
public ExportRecordWriter(TaskAttemptContext context)
throws ClassNotFoundException, SQLException {
this.conf = context.getConfiguration();
this.rowsPerStmt = conf.getInt(RECORDS_PER_STATEMENT_KEY,
DEFAULT_RECORDS_PER_STATEMENT);
int stmtsPerTx = conf.getInt(STATEMENTS_PER_TRANSACTION_KEY,
DEFAULT_STATEMENTS_PER_TRANSACTION);
DBConfiguration dbConf = new DBConfiguration(conf);
this.connection = dbConf.getConnection();
this.tableName = dbConf.getOutputTableName();
this.columnNames = dbConf.getOutputFieldNames();
this.columnCount = dbConf.getOutputFieldCount();
this.connection.setAutoCommit(false);
this.records = new ArrayList<SqoopRecord>(this.rowsPerStmt);
this.updateThread = new ExportUpdateThread(connection, stmtsPerTx);
this.updateThread.setDaemon(true);
this.startedUpdateThread = false;
}
/**
* @return an INSERT statement suitable for inserting 'numRows' rows.
*/
protected String getInsertStatement(int numRows) {
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO " + tableName + " ");
int numSlots;
if (this.columnNames != null) {
numSlots = this.columnNames.length;
sb.append("(");
boolean first = true;
for (String col : columnNames) {
if (!first) {
sb.append(", ");
}
sb.append(col);
first = false;
}
sb.append(") ");
} else {
numSlots = this.columnCount; // set if columnNames is null.
}
sb.append("VALUES ");
// generates the (?, ?, ?...) used for each row.
StringBuilder sbRow = new StringBuilder();
sbRow.append("(");
for (int i = 0; i < numSlots; i++) {
if (i != 0) {
sbRow.append(", ");
}
sbRow.append("?");
}
sbRow.append(")");
// Now append that numRows times.
for (int i = 0; i < numRows; i++) {
if (i != 0) {
sb.append(", ");
}
sb.append(sbRow);
}
return sb.toString();
}
/**
* Takes the current contents of 'records' and formats and executes the
* INSERT statement.
* @param closeConn if true, commits the transaction and closes the
* connection.
*/
private void insertRows(boolean closeConn)
throws InterruptedException, SQLException {
if (!startedUpdateThread) {
this.updateThread.start();
this.startedUpdateThread = true;
}
PreparedStatement stmt = null;
boolean successfulPut = false;
try {
if (records.size() > 0) {
// Synchronize on connection to ensure this does not conflict
// with the operations in the update thread.
synchronized (connection) {
stmt = connection.prepareStatement(
getInsertStatement(records.size()));
}
// Inject the record parameters into the VALUES clauses.
int position = 0;
for (SqoopRecord record : records) {
position += record.write(stmt, position);
}
this.records.clear();
}
// Pass this operation off to the update thread. This will block if
// the update thread is already performing an update.
AsyncDBOperation op = new AsyncDBOperation(stmt, closeConn, closeConn);
updateThread.put(op);
successfulPut = true; // op has been posted to the other thread.
} finally {
if (!successfulPut && null != stmt) {
// We created a statement but failed to enqueue it. Close it.
stmt.close();
}
}
// Check for any previous SQLException. If one happened, rethrow it here.
SQLException lastException = updateThread.getLastError();
if (null != lastException) {
throw lastException;
}
}
/** {@inheritDoc} */
public void close(TaskAttemptContext context)
throws IOException, InterruptedException {
try {
insertRows(true);
} catch (SQLException sqle) {
throw new IOException(sqle);
} finally {
updateThread.join();
}
// If we're not leaving on an error return path already,
// now that updateThread is definitely stopped, check that the
// error slot remains empty.
SQLException lastErr = updateThread.getLastError();
if (null != lastErr) {
throw new IOException(lastErr);
}
}
/** {@inheritDoc} */
public void write(K key, V value)
throws InterruptedException, IOException {
try {
records.add((SqoopRecord) key.clone());
if (records.size() >= this.rowsPerStmt) {
insertRows(false);
}
} catch (CloneNotSupportedException cnse) {
throw new IOException("Could not buffer record", cnse);
} catch (SQLException sqlException) {
throw new IOException(sqlException);
}
}
}
}
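
The two keys defined at the top of this class control export batching: sqoop.export.records.per.statement sets how many rows are folded into each INSERT, and sqoop.export.statements.per.transaction sets how many INSERTs run before a commit (-1 meaning one transaction per task). A minimal sketch of setting them on a job Configuration; the values and enclosing class are illustrative, and the same settings can also be passed as -D flags as the test changes further down do:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.sqoop.mapreduce.ExportOutputFormat;

public class ExportBatchingSketch {
  static void configureBatching(Configuration conf) {
    // 100 rows per INSERT statement (matches the default above).
    conf.setInt(ExportOutputFormat.RECORDS_PER_STATEMENT_KEY, 100);
    // Commit after every 100 statements; -1 would mean a single
    // transaction for the whole task.
    conf.setInt(ExportOutputFormat.STATEMENTS_PER_TRANSACTION_KEY, 100);
    // Equivalent command-line form:
    //   -D sqoop.export.records.per.statement=100
    //   -D sqoop.export.statements.per.transaction=100
  }
}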

View File: OracleExportOutputFormat.java

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.mapreduce;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.sqoop.lib.SqoopRecord;
/**
* Oracle-specific SQL formatting overrides default ExportOutputFormat's.
*/
public class OracleExportOutputFormat<K extends SqoopRecord, V>
extends ExportOutputFormat<K,V> {
/** {@inheritDoc} */
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
throws IOException {
try {
return new OracleExportRecordWriter(context);
} catch (Exception e) {
throw new IOException(e);
}
}
/**
* RecordWriter to write the output to a row in a database table.
* The actual database updates are executed in a second thread.
*/
public class OracleExportRecordWriter extends ExportRecordWriter {
public OracleExportRecordWriter(TaskAttemptContext context)
throws ClassNotFoundException, SQLException {
super(context);
}
@Override
/**
* @return an INSERT statement suitable for inserting 'numRows' rows.
*/
protected String getInsertStatement(int numRows) {
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO " + tableName + " ");
int numSlots;
if (this.columnNames != null) {
numSlots = this.columnNames.length;
sb.append("(");
boolean first = true;
for (String col : columnNames) {
if (!first) {
sb.append(", ");
}
sb.append(col);
first = false;
}
sb.append(") ");
} else {
numSlots = this.columnCount; // set if columnNames is null.
}
// generates the (?, ?, ?...) used for each row.
StringBuilder sbRow = new StringBuilder();
sbRow.append("SELECT ");
for (int i = 0; i < numSlots; i++) {
if (i != 0) {
sbRow.append(", ");
}
sbRow.append("?");
}
sbRow.append(" FROM DUAL ");
// Now append that numRows times.
for (int i = 0; i < numRows; i++) {
if (i != 0) {
sb.append("UNION ALL ");
}
sb.append(sbRow);
}
return sb.toString();
}
}
}
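
For comparison, with a two-column table like the ExportStressTestTable(id, msg) used by the stress test above and numRows = 2, the two getInsertStatement implementations build statements of roughly the following shape (reconstructed from the string-building code, not captured output):

// Sketch only: expected shapes of the generated SQL.
public class GeneratedInsertShapes {
  // Generic ExportOutputFormat: one (?, ?) group per row in a VALUES list.
  static final String GENERIC =
      "INSERT INTO ExportStressTestTable (id, msg) VALUES (?, ?), (?, ?)";

  // OracleExportOutputFormat: Oracle does not accept a multi-row VALUES
  // list, so each row becomes a SELECT ... FROM DUAL joined with UNION ALL.
  static final String ORACLE =
      "INSERT INTO ExportStressTestTable (id, msg) "
      + "SELECT ?, ? FROM DUAL UNION ALL SELECT ?, ? FROM DUAL";
}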

View File: TestExport.java

@ -119,7 +119,7 @@ private String getRecordLine(int recordNum, ColumnGenerator... extraCols) {
as what the string representation of the column as returned by
the database should look like.
*/
interface ColumnGenerator {
public interface ColumnGenerator {
/** for a row with id rowNum, what should we write into that
line of the text file to export?
*/
@ -400,13 +400,20 @@ protected void multiFileTest(int numFiles, int recordsPerMap, int numMaps,
}
createTable();
runExport(getArgv(true, newStrArray(argv, "-m", "" + numMaps)));
runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + numMaps)));
verifyExport(TOTAL_RECORDS);
} finally {
LOG.info("multi-file test complete");
}
}
/**
* Run an "export" on an empty file.
*/
public void testEmptyExport() throws IOException, SQLException {
multiFileTest(1, 0, 1);
}
/** Export 10 rows, make sure they load in correctly */
public void testTextExport() throws IOException, SQLException {
multiFileTest(1, 10, 1);
@ -435,11 +442,45 @@ public void testGzipExport() throws IOException, SQLException {
createTextFile(0, TOTAL_RECORDS, true);
createTable();
runExport(getArgv(true));
runExport(getArgv(true, 10, 10));
verifyExport(TOTAL_RECORDS);
LOG.info("Complete gzip export test");
}
/**
* Ensure that we use multiple statements in a transaction.
*/
public void testMultiStatement() throws IOException, SQLException {
final int TOTAL_RECORDS = 20;
createTextFile(0, TOTAL_RECORDS, true);
createTable();
runExport(getArgv(true, 10, 10));
verifyExport(TOTAL_RECORDS);
}
/**
* Ensure that we use multiple transactions in a single mapper.
*/
public void testMultiTransaction() throws IOException, SQLException {
final int TOTAL_RECORDS = 20;
createTextFile(0, TOTAL_RECORDS, true);
createTable();
runExport(getArgv(true, 5, 2));
verifyExport(TOTAL_RECORDS);
}
/**
* Ensure that when we don't force a commit with a statement cap,
* it happens anyway.
*/
public void testUnlimitedTransactionSize() throws IOException, SQLException {
final int TOTAL_RECORDS = 20;
createTextFile(0, TOTAL_RECORDS, true);
createTable();
runExport(getArgv(true, 5, -1));
verifyExport(TOTAL_RECORDS);
}
/** Run 2 mappers, make sure all records load in correctly */
public void testMultiMapTextExport() throws IOException, SQLException {
@ -451,7 +492,7 @@ public void testMultiMapTextExport() throws IOException, SQLException {
}
createTable();
runExport(getArgv(true));
runExport(getArgv(true, 10, 10));
verifyExport(RECORDS_PER_MAP * NUM_FILES);
}
@ -508,7 +549,8 @@ public void testSequenceFileExport() throws Exception {
// Now run and verify the export.
LOG.info("Exporting SequenceFile-based data");
runExport(getArgv(true, "--class-name", className, "--jar-file", jarFileName));
runExport(getArgv(true, 10, 10, "--class-name", className,
"--jar-file", jarFileName));
verifyExport(TOTAL_RECORDS);
} finally {
if (null != prevClassLoader) {
@ -535,11 +577,16 @@ public String getType() {
createTextFile(0, TOTAL_RECORDS, false, gen);
createTable(gen);
runExport(getArgv(true));
runExport(getArgv(true, 10, 10));
verifyExport(TOTAL_RECORDS);
assertColMinAndMax(forIdx(0), gen);
}
/** @return the column type for a large integer */
protected String getBigIntType() {
return "BIGINT";
}
public void testBigIntCol() throws IOException, SQLException {
final int TOTAL_RECORDS = 10;
@ -554,18 +601,18 @@ public String getVerifyText(int rowNum) {
return "" + val;
}
public String getType() {
return "BIGINT";
return getBigIntType();
}
};
createTextFile(0, TOTAL_RECORDS, false, gen);
createTable(gen);
runExport(getArgv(true));
runExport(getArgv(true, 10, 10));
verifyExport(TOTAL_RECORDS);
assertColMinAndMax(forIdx(0), gen);
}
private String pad(int n) {
protected String pad(int n) {
if (n <= 9) {
return "0" + n;
} else {
@ -573,10 +620,11 @@ private String pad(int n) {
}
}
public void testDatesAndTimes() throws IOException, SQLException {
final int TOTAL_RECORDS = 10;
ColumnGenerator genDate = new ColumnGenerator() {
/**
* Get a column generator for DATE columns
*/
protected ColumnGenerator getDateColumnGenerator() {
return new ColumnGenerator() {
public String getExportText(int rowNum) {
int day = rowNum + 1;
return "2009-10-" + day;
@ -589,8 +637,13 @@ public String getType() {
return "DATE";
}
};
}
ColumnGenerator genTime = new ColumnGenerator() {
/**
* Get a column generator for TIME columns.
*/
protected ColumnGenerator getTimeColumnGenerator() {
return new ColumnGenerator() {
public String getExportText(int rowNum) {
return "10:01:" + rowNum;
}
@ -601,10 +654,17 @@ public String getType() {
return "TIME";
}
};
}
public void testDatesAndTimes() throws IOException, SQLException {
final int TOTAL_RECORDS = 10;
ColumnGenerator genDate = getDateColumnGenerator();
ColumnGenerator genTime = getTimeColumnGenerator();
createTextFile(0, TOTAL_RECORDS, false, genDate, genTime);
createTable(genDate, genTime);
runExport(getArgv(true));
runExport(getArgv(true, 10, 10));
verifyExport(TOTAL_RECORDS);
assertColMinAndMax(forIdx(0), genDate);
assertColMinAndMax(forIdx(1), genTime);
@ -647,7 +707,7 @@ public String getType() {
createTextFile(0, TOTAL_RECORDS, false, genFloat, genNumeric);
createTable(genFloat, genNumeric);
runExport(getArgv(true));
runExport(getArgv(true, 10, 10));
verifyExport(TOTAL_RECORDS);
assertColMinAndMax(forIdx(0), genFloat);
assertColMinAndMax(forIdx(1), genNumeric);

View File: ThirdPartyTests.java

@ -24,8 +24,10 @@
import org.apache.hadoop.sqoop.manager.DirectMySQLTest;
import org.apache.hadoop.sqoop.manager.DirectMySQLExportTest;
import org.apache.hadoop.sqoop.manager.JdbcMySQLExportTest;
import org.apache.hadoop.sqoop.manager.MySQLAuthTest;
import org.apache.hadoop.sqoop.manager.MySQLCompatTest;
import org.apache.hadoop.sqoop.manager.OracleExportTest;
import org.apache.hadoop.sqoop.manager.OracleManagerTest;
import org.apache.hadoop.sqoop.manager.OracleCompatTest;
import org.apache.hadoop.sqoop.manager.PostgresqlTest;
@ -44,8 +46,10 @@ public static Test suite() {
+ "implementations in Sqoop");
suite.addTestSuite(DirectMySQLTest.class);
suite.addTestSuite(DirectMySQLExportTest.class);
suite.addTestSuite(JdbcMySQLExportTest.class);
suite.addTestSuite(MySQLAuthTest.class);
suite.addTestSuite(MySQLCompatTest.class);
suite.addTestSuite(OracleExportTest.class);
suite.addTestSuite(OracleManagerTest.class);
suite.addTestSuite(OracleCompatTest.class);
suite.addTestSuite(PostgresqlTest.class);

View File: DirectMySQLExportTest.java

@ -81,34 +81,13 @@ public void setUp() {
options.setUsername(MySQLTestUtils.getCurrentUser());
this.manager = new DirectMySQLManager(options);
Statement st = null;
try {
this.conn = manager.getConnection();
this.conn.setAutoCommit(false);
st = this.conn.createStatement();
// create the database table.
st.executeUpdate("DROP TABLE IF EXISTS " + getTableName());
st.executeUpdate("CREATE TABLE " + getTableName() + " ("
+ "id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, "
+ "name VARCHAR(24) NOT NULL, "
+ "start_date DATE, "
+ "salary FLOAT, "
+ "dept VARCHAR(32))");
this.conn.commit();
} catch (SQLException sqlE) {
LOG.error("Encountered SQL Exception: " + sqlE);
sqlE.printStackTrace();
fail("SQLException when running test setUp(): " + sqlE);
} finally {
try {
if (null != st) {
st.close();
}
} catch (SQLException sqlE) {
LOG.warn("Got SQLException when closing connection: " + sqlE);
}
}
}
@ -153,11 +132,12 @@ public void tearDown() {
@Override
protected String [] getArgv(boolean includeHadoopFlags,
String... additionalArgv) {
int rowsPerStatement, int statementsPerTx, String... additionalArgv) {
String [] subArgv = newStrArray(additionalArgv, "--direct",
"--username", MySQLTestUtils.getCurrentUser());
return super.getArgv(includeHadoopFlags, subArgv);
return super.getArgv(includeHadoopFlags, rowsPerStatement,
statementsPerTx, subArgv);
}
/**

View File: JdbcMySQLExportTest.java

@ -0,0 +1,149 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.manager;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.apache.hadoop.sqoop.SqoopOptions;
import org.apache.hadoop.sqoop.TestExport;
import org.apache.hadoop.sqoop.mapreduce.MySQLExportMapper;
/**
* Test the MySQLManager implementation's exportJob() functionality.
* This does a better test of ExportOutputFormat than TestExport does,
* because it supports multi-row INSERT statements.
*/
public class JdbcMySQLExportTest extends TestExport {
public static final Log LOG = LogFactory.getLog(
JdbcMySQLExportTest.class.getName());
static final String TABLE_PREFIX = "EXPORT_MYSQL_J_";
// instance variables populated during setUp, used during tests.
private MySQLManager manager;
private Connection conn;
@Override
protected Connection getConnection() {
return conn;
}
// MySQL allows multi-row INSERT statements.
@Override
protected int getMaxRowsPerStatement() {
return 1000;
}
@Override
protected boolean useHsqldbTestServer() {
return false;
}
@Override
protected String getConnectString() {
return MySQLTestUtils.CONNECT_STRING;
}
@Override
protected String getTablePrefix() {
return TABLE_PREFIX;
}
@Override
protected String getDropTableStatement(String tableName) {
return "DROP TABLE IF EXISTS " + tableName;
}
@Before
public void setUp() {
super.setUp();
SqoopOptions options = new SqoopOptions(MySQLTestUtils.CONNECT_STRING,
getTableName());
options.setUsername(MySQLTestUtils.getCurrentUser());
this.manager = new MySQLManager(options);
try {
this.conn = manager.getConnection();
this.conn.setAutoCommit(false);
} catch (SQLException sqlE) {
LOG.error(StringUtils.stringifyException(sqlE));
fail("Failed with sql exception in setup: " + sqlE);
}
}
@After
public void tearDown() {
super.tearDown();
if (null != this.conn) {
try {
this.conn.close();
} catch (SQLException sqlE) {
LOG.error("Got SQLException closing conn: " + sqlE.toString());
}
}
if (null != manager) {
try {
manager.close();
manager = null;
} catch (SQLException sqlE) {
LOG.error("Got SQLException: " + sqlE.toString());
fail("Got SQLException: " + sqlE.toString());
}
}
}
@Override
protected String [] getCodeGenArgv(String... extraArgs) {
String [] moreArgs = new String[extraArgs.length + 2];
int i = 0;
for (i = 0; i < extraArgs.length; i++) {
moreArgs[i] = extraArgs[i];
}
// Add username argument for mysql.
moreArgs[i++] = "--username";
moreArgs[i++] = MySQLTestUtils.getCurrentUser();
return super.getCodeGenArgv(moreArgs);
}
@Override
protected String [] getArgv(boolean includeHadoopFlags,
int rowsPerStatement, int statementsPerTx, String... additionalArgv) {
String [] subArgv = newStrArray(additionalArgv,
"--username", MySQLTestUtils.getCurrentUser());
return super.getArgv(includeHadoopFlags, rowsPerStatement,
statementsPerTx, subArgv);
}
}

View File: OracleExportTest.java

@ -0,0 +1,198 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.manager;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.apache.hadoop.sqoop.SqoopOptions;
import org.apache.hadoop.sqoop.TestExport;
/**
* Test the OracleManager implementation's exportJob() functionality.
* This tests the OracleExportOutputFormat (which subclasses
* ExportOutputFormat with Oracle-specific SQL statements).
*/
public class OracleExportTest extends TestExport {
public static final Log LOG = LogFactory.getLog(
OracleExportTest.class.getName());
static final String TABLE_PREFIX = "EXPORT_ORACLE_";
// instance variables populated during setUp, used during tests.
private OracleManager manager;
private Connection conn;
@Override
protected Connection getConnection() {
return conn;
}
// Oracle allows multi-row inserts (with its own syntax).
@Override
protected int getMaxRowsPerStatement() {
return 1000;
}
@Override
protected boolean useHsqldbTestServer() {
return false;
}
@Override
protected String getConnectString() {
return OracleUtils.CONNECT_STRING;
}
@Override
protected String getTablePrefix() {
return TABLE_PREFIX;
}
@Override
protected String getDropTableStatement(String tableName) {
return OracleUtils.getDropTableStatement(tableName);
}
@Before
public void setUp() {
super.setUp();
SqoopOptions options = new SqoopOptions(OracleUtils.CONNECT_STRING,
getTableName());
OracleUtils.setOracleAuth(options);
this.manager = new OracleManager(options);
try {
this.conn = manager.getConnection();
this.conn.setAutoCommit(false);
} catch (SQLException sqlE) {
LOG.error(StringUtils.stringifyException(sqlE));
fail("Failed with sql exception in setup: " + sqlE);
}
}
@After
public void tearDown() {
super.tearDown();
if (null != this.conn) {
try {
this.conn.close();
} catch (SQLException sqlE) {
LOG.error("Got SQLException closing conn: " + sqlE.toString());
}
}
if (null != manager) {
try {
manager.close();
manager = null;
} catch (SQLException sqlE) {
LOG.error("Got SQLException: " + sqlE.toString());
fail("Got SQLException: " + sqlE.toString());
}
}
}
@Override
protected String [] getCodeGenArgv(String... extraArgs) {
String [] moreArgs = new String[extraArgs.length + 4];
int i = 0;
for (i = 0; i < extraArgs.length; i++) {
moreArgs[i] = extraArgs[i];
}
// Add username and password args.
moreArgs[i++] = "--username";
moreArgs[i++] = OracleUtils.ORACLE_USER_NAME;
moreArgs[i++] = "--password";
moreArgs[i++] = OracleUtils.ORACLE_USER_PASS;
return super.getCodeGenArgv(moreArgs);
}
@Override
protected String [] getArgv(boolean includeHadoopFlags,
int rowsPerStatement, int statementsPerTx, String... additionalArgv) {
String [] subArgv = newStrArray(additionalArgv,
"--username", OracleUtils.ORACLE_USER_NAME,
"--password", OracleUtils.ORACLE_USER_PASS);
return super.getArgv(includeHadoopFlags, rowsPerStatement,
statementsPerTx, subArgv);
}
@Override
protected ColumnGenerator getDateColumnGenerator() {
// Return a TIMESTAMP generator that has increasing date values.
// We currently do not support export of DATE columns since
// Oracle informs us that they are actually TIMESTAMP; this messes
// up Sqoop's parsing of values as we have no way of knowing they're
// really supposed to be dates based on the Oracle Jdbc Metadata.
return new ColumnGenerator() {
public String getExportText(int rowNum) {
int day = rowNum + 1;
return "2009-10-" + pad(day) + " 00:00:00.0";
}
public String getVerifyText(int rowNum) {
int day = rowNum + 1;
return "2009-10-" + day + " 0:0:0. 0";
}
public String getType() {
return "TIMESTAMP"; // TODO: Support DATE more intelligently.
}
};
}
@Override
protected ColumnGenerator getTimeColumnGenerator() {
// Return a TIMESTAMP generator that has increasing time values.
// We currently do not support the export of DATE columns with
// only a time component set (because Oracle reports these column
// types to Sqoop as TIMESTAMP, and we parse the user's text
// incorrectly based on this result).
return new ColumnGenerator() {
public String getExportText(int rowNum) {
return "1970-01-01 10:01:" + pad(rowNum) + ".0";
}
public String getVerifyText(int rowNum) {
return "1970-1-1 10:1:" + rowNum + ". 0";
}
public String getType() {
return "TIMESTAMP";
}
};
}
@Override
protected String getBigIntType() {
// Oracle stores everything in NUMERIC columns.
return "NUMERIC(12,0)";
}
}

View File: OracleUtils.java

@ -80,8 +80,7 @@ public static void dropTable(String tableName, ConnManager manager)
st = connection.createStatement();
// create the database table and populate it with data.
st.executeUpdate("BEGIN EXECUTE IMMEDIATE 'DROP TABLE " + tableName + "'; "
+ "exception when others then null; end;");
st.executeUpdate(getDropTableStatement(tableName));
connection.commit();
} finally {
@ -94,4 +93,9 @@ public static void dropTable(String tableName, ConnManager manager)
}
}
}
public static String getDropTableStatement(String tableName) {
return "BEGIN EXECUTE IMMEDIATE 'DROP TABLE " + tableName + "'; "
+ "exception when others then null; end;";
}
}

View File: ExportJobTestCase.java

@ -31,11 +31,12 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.sqoop.Sqoop;
import org.apache.hadoop.sqoop.mapreduce.ExportOutputFormat;
import org.apache.hadoop.sqoop.tool.ExportTool;
/**
* Class that implements common methods required for tests which export data
* from HDFS to databases, to verify correct export
* from HDFS to databases, to verify correct export.
*/
public class ExportJobTestCase extends BaseSqoopTestCase {
@ -45,6 +46,15 @@ protected String getTablePrefix() {
return "EXPORT_TABLE_";
}
/**
* @return the maximum rows to fold into an INSERT statement.
* HSQLDB can only support the single-row INSERT syntax. Other databases
* can support greater numbers of rows per statement.
*/
protected int getMaxRowsPerStatement() {
return 1;
}
/**
* @return a connection to the database under test.
*/
@ -60,13 +70,27 @@ protected Connection getConnection() {
/**
* Create the argv to pass to Sqoop
* @param includeHadoopFlags if true, then include -D various.settings=values
* @param rowsPerStmt number of rows to export in a single INSERT statement.
* @param statementsPerTx number of statements to use in a transaction.
* @return the argv as an array of strings.
*/
protected String [] getArgv(boolean includeHadoopFlags, String... additionalArgv) {
protected String [] getArgv(boolean includeHadoopFlags,
int rowsPerStmt, int statementsPerTx, String... additionalArgv) {
ArrayList<String> args = new ArrayList<String>();
if (includeHadoopFlags) {
CommonArgs.addHadoopFlags(args);
args.add("-D");
int realRowsPerStmt = Math.min(rowsPerStmt, getMaxRowsPerStatement());
if (realRowsPerStmt != rowsPerStmt) {
LOG.warn("Rows per statement set to " + realRowsPerStmt
+ " by getMaxRowsPerStatement() limit.");
}
args.add(ExportOutputFormat.RECORDS_PER_STATEMENT_KEY + "="
+ realRowsPerStmt);
args.add("-D");
args.add(ExportOutputFormat.STATEMENTS_PER_TRANSACTION_KEY + "="
+ statementsPerTx);
}
// Any additional Hadoop flags (-D foo=bar) are prepended.
@ -201,6 +225,10 @@ protected void verifyExport(int expectedNumRecords) throws IOException, SQLExcep
assertEquals("Got back unexpected row count", expectedNumRecords,
actualNumRecords);
if (expectedNumRecords == 0) {
return; // Nothing more to verify.
}
// Check that we start with row 0.
int minVal = getMinRowId();
assertEquals("Minimum row was not zero", 0, minVal);