mirror of https://github.com/apache/sqoop.git

SQOOP-1056: Implement connection resiliency in Sqoop using pluggable failure handlers

SQOOP-1057: Introduce fault injection framework to test connection resiliency

(Shuaishuai Nie via Venkat Ranganathan)
Venkat Ranganathan 2014-01-31 19:48:52 -08:00
parent ad12695b59
commit 03fa9c5302
24 changed files with 2421 additions and 20 deletions


@ -193,6 +193,7 @@
<!-- root directory for output/intermediate data for testing -->
<property name="build.test" location="${build.dir}/test"/>
<property name="test.log.dir" location="${build.dir}/test/logs"/>
<property name="test.build.extraconf" value="${build.test}/extraconf" />
<!-- compiled test classes -->
<property name="build.test.classes" location="${build.test}/classes" />
@ -255,6 +256,8 @@
<property name="cobertura.format" value="html" /> <!-- may be 'xml' -->
<property name="cobertura.class.dir" value="${cobertura.dir}/classes" />
<!-- aspectJ fault injection -->
<import file="${test.dir}/aop/build/aop.xml"/>
<!-- Checking code style -->
<property name="checkstyle.xml" value="${test.dir}/checkstyle.xml" />
<property name="checkstyle.format.xsl"
@ -414,6 +417,7 @@
<!-- Classpath for unit tests (superset of compile.classpath) -->
<path id="test.classpath">
<pathelement location="${build.test.classes}" />
<pathelement location="${test.build.extraconf}"/>
<path refid="${name}.hadooptest.classpath" />
<path refid="compile.classpath" />
</path>
@ -480,6 +484,7 @@
depends="compile, ivy-retrieve-hadoop-test"
description="Compile test classes">
<mkdir dir="${build.test.classes}" />
<mkdir dir="${test.build.extraconf}"/>
<javac
encoding="${build.encoding}"
srcdir="${test.dir}"
@ -805,6 +810,8 @@
<delete dir="${build.test}/data"/>
<mkdir dir="${build.test}/data/sqoop" />
<mkdir dir="${cobertura.class.dir}" />
<copy file="${test.dir}/fi-site.xml"
todir="${test.build.extraconf}" />
<junit
printsummary="yes" showoutput="${test.output}"
haltonfailure="no" fork="yes" maxmemory="512m"
@ -1404,4 +1411,46 @@
</classpath>
</eclipse>
</target>
<!-- Fault injection customization section -->
<target name="justjar" depends="ivy-resolve-test">
<echo message="Project: ${ant.project.name}" />
<jar jarfile="${build.dir}/${test.jar}" basedir="${build.test.classes}"/>
</target>
<target name="jar-fault-inject" depends="init, injectfaults"
description="Make sqoop-fi.jar">
<macro-jar-fault-inject
target.name="justjar"
build.dir="${build-fi.dir}"
jar.final.name="final.name"
jar.final.value="${final.name}-fi" />
<copy todir="${lib.dir}">
<fileset dir="${build-fi.dir}">
<include name="*.jar"/>
</fileset>
</copy>
</target>
<target name="jar-test-fault-inject" depends="init, injectfaults"
description="Make sqoop-test-fi.jar">
<macro-jar-test-fault-inject
target.name="test-jar"
jar.final.name="test.final.name"
jar.final.value="${test.final.name}-fi" />
</target>
<target name="run-fault-inject-with-testcaseonly" depends="init, injectfaults">
<fail unless="testcase">Can't run this target without -Dtestcase setting!
</fail>
<macro-run-tests-fault-inject target.name="test"
testcasesonly="true"/>
</target>
<target name="run-test-core-fault-inject" depends="init, injectfaults"
description="Run full set of the unit tests with fault injection">
<macro-run-tests-fault-inject target.name="test"
testcasesonly="false"/>
</target>
</project>

ivy.xml

@ -97,6 +97,10 @@ under the License.
rev="${hadoop.version}" conf="hadoop210->default"/>
<dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-core"
rev="${hadoop.version}" conf="hadoop210->default"/>
<dependency org="org.aspectj" name="aspectjtools" rev="${aspectj.version}"
conf="hadoop210->default"/>
<dependency org="org.aspectj" name="aspectjrt" rev="${aspectj.version}"
conf="hadoop210->default"/>
<!-- Dependencies for Hadoop 2.0.0 -->
<dependency org="org.apache.hadoop" name="hadoop-common"
@ -113,6 +117,10 @@ under the License.
rev="${hadoop.version}" conf="hadoop200->default"/>
<dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-core"
rev="${hadoop.version}" conf="hadoop200->default"/>
<dependency org="org.aspectj" name="aspectjtools" rev="${aspectj.version}"
conf="hadoop200->default"/>
<dependency org="org.aspectj" name="aspectjrt" rev="${aspectj.version}"
conf="hadoop200->default"/>
<!-- Dependencies for Hadoop 0.23 -->
<dependency org="org.apache.hadoop" name="hadoop-common"
@ -129,18 +137,30 @@ under the License.
rev="${hadoop.version}" conf="hadoop23->default"/>
<dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-core"
rev="${hadoop.version}" conf="hadoop23->default"/>
<dependency org="org.aspectj" name="aspectjtools" rev="${aspectj.version}"
conf="hadoop23->default"/>
<dependency org="org.aspectj" name="aspectjrt" rev="${aspectj.version}"
conf="hadoop23->default"/>
<!-- Dependencies for Hadoop 1.0.0 -->
<dependency org="org.apache.hadoop" name="hadoop-core"
rev="${hadoop.version}" conf="hadoop100->default"/>
<dependency org="org.apache.hadoop" name="hadoop-test"
rev="${hadoop.version}" conf="hadoop100test->default"/>
<dependency org="org.aspectj" name="aspectjtools" rev="${aspectj.version}"
conf="hadoop100->default"/>
<dependency org="org.aspectj" name="aspectjrt" rev="${aspectj.version}"
conf="hadoop100->default"/>
<!-- Dependencies for Hadoop 0.20 -->
<dependency org="org.apache.hadoop" name="hadoop-core"
rev="${hadoop.version}" conf="hadoop20->default"/>
<dependency org="org.apache.hadoop" name="hadoop-test"
rev="${hadoop.version}" conf="hadoop20test->default"/>
<dependency org="org.aspectj" name="aspectjtools" rev="${aspectj.version}"
conf="hadoop20->default"/>
<dependency org="org.aspectj" name="aspectjrt" rev="${aspectj.version}"
conf="hadoop20->default"/>
<!-- Common dependencies for Sqoop -->
<dependency org="commons-cli" name="commons-cli"


@ -42,4 +42,6 @@ mvn.version=2.0.10
rats-lib.version=0.5.1
aspectj.version=1.6.11
postgresql.version=9.2-1003-jdbc4


@ -27,12 +27,17 @@
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.mapreduce.SQLServerResilientExportOutputFormat;
import org.apache.sqoop.mapreduce.SQLServerResilientUpdateOutputFormat;
import org.apache.sqoop.mapreduce.db.SQLServerDBInputFormat;
import org.apache.sqoop.mapreduce.db.SQLServerConnectionFailureHandler;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.mapreduce.JdbcExportJob;
import com.cloudera.sqoop.mapreduce.JdbcUpdateExportJob;
import com.cloudera.sqoop.util.ExportException;
import com.cloudera.sqoop.util.ImportException;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.cli.RelatedOptions;
import org.apache.sqoop.mapreduce.sqlserver.SqlServerExportBatchOutputFormat;
import org.apache.sqoop.mapreduce.sqlserver.SqlServerInputFormat;
@ -52,6 +57,9 @@ public class SQLServerManager
public static final Log LOG = LogFactory.getLog(
SQLServerManager.class.getName());
// Option set in extra-arguments to disable resiliency and use default mode
public static final String NON_RESILIENT_OPTION = "non-resilient";
// driver class to ensure is loaded when making db connection.
private static final String DRIVER_CLASS =
"com.microsoft.sqlserver.jdbc.SQLServerDriver";
@ -91,9 +99,12 @@ public String toJavaType(int sqlType) {
String javaType;
if (sqlType == DATETIMEOFFSET) {
// We cannot use the TimeStamp class to represent MS SQL Server datetimeoffset
// data type since it does not preserve time zone offset values, so use String
// instead which would work for import/export
/*
* We cannot use the TimeStamp class to represent MS SQL Server
* datetimeoffset data type since it does not preserve time zone
* offset values, so use String instead which would work for
* import/export.
*/
javaType = "String";
}else {
//If none of the above data types match, it returns parent method's
@ -118,9 +129,20 @@ public void importTable(
if (tableHints != null) {
configuration.set(TABLE_HINTS_PROP, tableHints);
}
// Set our own input format
context.setInputFormat(SqlServerInputFormat.class);
if (!isNonResilientOperation()) {
// Enable connection recovery only if split column is provided
SqoopOptions opts = context.getOptions();
String splitCol = getSplitColumn(opts, context.getTableName());
if (splitCol != null) {
// Configure SQLServer table import jobs for connection recovery
configureConnectionRecoveryForImport(context);
} else {
// Set our own input format
context.setInputFormat(SqlServerInputFormat.class);
}
} else {
context.setInputFormat(SqlServerInputFormat.class);
}
super.importTable(context);
}
@ -137,12 +159,36 @@ public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context)
if (tableHints != null) {
configuration.set(TABLE_HINTS_PROP, tableHints);
}
JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
JdbcExportJob exportJob;
if (isNonResilientOperation()) {
exportJob = new JdbcExportJob(context, null, null,
SqlServerExportBatchOutputFormat.class);
} else {
exportJob = new JdbcExportJob(context, null, null,
SQLServerResilientExportOutputFormat.class);
configureConnectionRecoveryForExport(context);
}
exportJob.runExport();
}
@Override
/**
* {@inheritDoc}
*/
public void updateTable(
com.cloudera.sqoop.manager.ExportJobContext context)
throws IOException, ExportException {
if (isNonResilientOperation()) {
super.updateTable(context);
} else {
context.setConnManager(this);
JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context, null,
null, SQLServerResilientUpdateOutputFormat.class);
configureConnectionRecoveryForUpdate(context);
exportJob.runExport();
}
}
/**
* SQLServer does not support the CURRENT_TIMESTAMP() function. Instead
* it has the notion of keyword CURRENT_TIMESTAMP that resolves to the
@ -171,9 +217,9 @@ protected String getSchemaQuery() {
protected String getListColumnsQuery(String tableName) {
return
super.getListColumnsQuery(tableName)
+ " ORDER BY ORDINAL_POSITION";
+ " ORDER BY ORDINAL_POSITION";
}
@Override
public String escapeColName(String colName) {
return escapeObjectName(colName);
@ -263,5 +309,107 @@ private RelatedOptions getExtraOptions() {
return extraOptions;
}
/**
* Launch a MapReduce job via DataDrivenImportJob to read the table with
* SQLServerDBInputFormat which handles connection failures while
* using free-form query importer.
*/
public void importQuery(com.cloudera.sqoop.manager.ImportJobContext context)
throws IOException, ImportException {
if (!isNonResilientOperation()) {
// Enable connection recovery only if split column is provided
SqoopOptions opts = context.getOptions();
String splitCol = getSplitColumn(opts, context.getTableName());
if (splitCol != null) {
// Configure SQLServer query import jobs for connection recovery
configureConnectionRecoveryForImport(context);
}
}
super.importQuery(context);
}
/**
* Configure SQLServer Sqoop Jobs to recover failed connections by using
* SQLServerConnectionFailureHandler by default.
*/
protected void configureConnectionRecoveryForImport(
com.cloudera.sqoop.manager.ImportJobContext context) {
Configuration conf = context.getOptions().getConf();
// Configure input format class
context.setInputFormat(SQLServerDBInputFormat.class);
// Set connection failure handler and recovery settings
// Default settings can be overridden if provided as Configuration
// properties by the user
if (conf.get(SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS)
== null) {
conf.set(SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS,
SQLServerConnectionFailureHandler.class.getName());
}
}
/**
* Configure SQLServer Sqoop export Jobs to recover failed connections by
* using SQLServerConnectionFailureHandler by default.
*/
protected void configureConnectionRecoveryForExport(
com.cloudera.sqoop.manager.ExportJobContext context) {
Configuration conf = context.getOptions().getConf();
// Set connection failure handler and recovery settings
// Default settings can be overridden if provided as Configuration
// properties by the user
String clsFailureHandler = conf.get(
SQLServerResilientExportOutputFormat.EXPORT_FAILURE_HANDLER_CLASS);
if (clsFailureHandler == null) {
conf.set(
SQLServerResilientExportOutputFormat.EXPORT_FAILURE_HANDLER_CLASS,
SQLServerConnectionFailureHandler.class.getName());
}
}
/**
* Configure SQLServer Sqoop Update Jobs to recover connection failures by
* using SQLServerConnectionFailureHandler by default.
*/
protected void configureConnectionRecoveryForUpdate(
com.cloudera.sqoop.manager.ExportJobContext context) {
Configuration conf = context.getOptions().getConf();
// Set connection failure handler and recovery settings
// Default settings can be overridden if provided as Configuration
// properties by the user
String clsFailureHandler = conf.get(
SQLServerResilientExportOutputFormat.EXPORT_FAILURE_HANDLER_CLASS);
if (clsFailureHandler == null) {
conf.set(
SQLServerResilientExportOutputFormat.EXPORT_FAILURE_HANDLER_CLASS,
SQLServerConnectionFailureHandler.class.getName());
}
}
/**
* Check if the user has requested the operation to be non-resilient.
*/
protected boolean isNonResilientOperation() {
String [] extraArgs = options.getExtraArgs();
if (extraArgs != null) {
// Traverse the extra options
for (int iArg = 0; iArg < extraArgs.length; ++iArg) {
String currentArg = extraArgs[iArg];
if (currentArg.startsWith("--")
&& currentArg.substring(2).equalsIgnoreCase(NON_RESILIENT_OPTION)) {
// User has explicitly requested the operation to be non-resilient
return true;
}
}
}
return false;
}
}
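To make the control flow above concrete: resiliency is the default, and it is switched off only when the extra arguments carry the non-resilient flag (in a Sqoop command line, extra arguments follow the trailing "--"). The following standalone sketch mirrors the scan that isNonResilientOperation() performs; it is illustrative code, not part of the patch, and the sample argument values are made up.

// Standalone sketch mirroring isNonResilientOperation(): resiliency is
// disabled only when an extra argument "--non-resilient" is present.
public class NonResilientFlagCheck {
  static final String NON_RESILIENT_OPTION = "non-resilient";

  static boolean isNonResilient(String[] extraArgs) {
    if (extraArgs == null) {
      return false;
    }
    for (String arg : extraArgs) {
      if (arg.startsWith("--")
          && arg.substring(2).equalsIgnoreCase(NON_RESILIENT_OPTION)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // Hypothetical extra arguments as Sqoop would pass them along.
    System.out.println(isNonResilient(new String[] {"--table-hints", "NOLOCK"})); // false
    System.out.println(isNonResilient(new String[] {"--non-resilient"}));         // true
  }
}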


@ -43,7 +43,6 @@
import org.apache.sqoop.accumulo.AccumuloUtil;
import org.apache.sqoop.mapreduce.AccumuloImportJob;
import org.apache.sqoop.mapreduce.JdbcCallExportJob;
import org.apache.sqoop.tool.BaseSqoopTool;
import org.apache.sqoop.util.LoggingUtils;
import org.apache.sqoop.mapreduce.HBaseBulkImportJob;
import org.apache.sqoop.util.SqlTypeMap;
@ -591,7 +590,7 @@ public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
throw new ImportException("HBase jars are not present in "
+ "classpath, cannot import to HBase!");
}
if(!opts.isBulkLoadEnabled()){
if (!opts.isBulkLoadEnabled()){
importer = new HBaseImportJob(opts, context);
} else {
importer = new HBaseBulkImportJob(opts, context);
@ -634,7 +633,7 @@ public void importQuery(com.cloudera.sqoop.manager.ImportJobContext context)
throw new ImportException("HBase jars are not present in classpath,"
+ " cannot import to HBase!");
}
if(!opts.isBulkLoadEnabled()){
if (!opts.isBulkLoadEnabled()){
importer = new HBaseImportJob(opts, context);
} else {
importer = new HBaseBulkImportJob(opts, context);


@ -0,0 +1,355 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.mapreduce.db.SQLFailureHandler;
/**
* Represents a database update thread that runs asynchronously to perform
* database operations on the given records
*
* The asynchronous thread receives a batch of records that it writes to
* the database. It uses the configured connection handler to recover from
* connection failures (if possible) until the records are inserted/updated
* in the database
*/
public abstract class SQLServerAsyncDBExecThread extends Thread {
private static final Log LOG = LogFactory.getLog(
SQLServerAsyncDBExecThread.class);
// Retry failed write operations up to RETRY_MAX times
protected static final int RETRY_MAX = 3;
protected static final int RETRY_INTERVAL = 5 * 1000;
private Connection conn; // The connection to the database.
private DBConfiguration dbConf = null;
private SynchronousQueue<List<SqoopRecord>> recordListQueue;
private boolean stop = false;
private Exception err;
// The SQL handler to be used for recovering failed write operations
private SQLFailureHandler failureHandler = null;
protected Configuration conf = null;
protected String tableName;
protected String [] columnNames; // The columns to insert into.
protected int columnCount; // If columnNames is null, tells the number of cols.
/**
* Create a new thread that interacts with the database.
*/
public SQLServerAsyncDBExecThread() {
recordListQueue = new SynchronousQueue<List<SqoopRecord>>();
}
/**
* Initialize the writer thread with Job Configuration.
*/
public void initialize(Configuration c) throws IOException {
// Create a DBConf from the given Configuration
this.conf = c;
this.dbConf = new DBConfiguration(conf);
tableName = dbConf.getOutputTableName();
columnNames = dbConf.getOutputFieldNames();
columnCount = dbConf.getOutputFieldCount();
// Get the SQL Failure handler to be used for recovering failed write
// operations
failureHandler = getSQLFailureHandler();
failureHandler.initialize(conf);
}
/**
* Get the SQL Failure handler to be used for recovering failed write
* operations.
*/
protected SQLFailureHandler getSQLFailureHandler() throws IOException {
if (failureHandler == null) {
Class<? extends SQLFailureHandler> connHandlerClass;
try {
String className = conf.get(
SQLServerResilientExportOutputFormat.EXPORT_FAILURE_HANDLER_CLASS);
// Get the class-name set in configuration
connHandlerClass =
(Class<? extends SQLFailureHandler>) conf.getClassByName(className);
} catch (ClassNotFoundException ex) {
LOG.error("Failed to find class: "
+ SQLServerResilientExportOutputFormat.EXPORT_FAILURE_HANDLER_CLASS);
throw new IOException(ex);
}
// Verify handler class is a subclass of SQLFailureHandler
if (!SQLFailureHandler.class.isAssignableFrom(connHandlerClass)) {
String error = "A subclass of " + SQLFailureHandler.class.getName()
+ " is expected. Actual class set is: "
+ connHandlerClass.getName();
LOG.error(error);
throw new IOException(error);
}
LOG.trace("Using connection handler class: " + connHandlerClass);
// Load the configured connection failure handler
failureHandler = ReflectionUtils.newInstance(connHandlerClass, conf);
}
return failureHandler;
}
protected DBConfiguration getDBConfiguration() {
return dbConf;
}
/**
* Create the connection to use for exporting records. If the connection is
* already created, then return it
*/
protected Connection getConnection() throws SQLException {
if (conn == null || conn.isClosed()) {
try {
conn = dbConf.getConnection();
configureConnection();
} catch (ClassNotFoundException cnfEx) {
LOG.error("Cannot create connection. Driver class not found: "
+ cnfEx);
}
}
return conn;
}
protected Configuration getConf() {
return this.conf;
}
/**
* Configure the connection object used for writing records to the database.
* Subclasses should override this method to change connection
* configuration.
*/
protected void configureConnection() throws SQLException {
conn.setAutoCommit(false);
}
/**
* Enqueue the next list of records to be processed. If the previous
* batch is still being processed, this call blocks until it completes.
*/
public void put(List<SqoopRecord> recordList)
throws InterruptedException, IOException {
// Check for any exception raised when writing to the database
Exception lastException = getLastError();
if (lastException != null) {
LOG.error("Asynchronous writer thread encountered the following "
+ "exception: " + lastException.toString());
throw new IOException(lastException);
}
recordListQueue.put((List<SqoopRecord>) recordList);
}
/**
* Get the next list of records to be processed, or wait until one becomes
* available.
* @throws InterruptedException
*/
protected List<SqoopRecord> take() throws InterruptedException {
return recordListQueue.take();
}
/** {@inheritDoc} */
@Override
public void start() {
stop = false;
super.start();
}
/**
* Stop the current thread skipping any subsequent database operations on
* records that have not yet been processed.
*/
public void close() {
stop = true;
// In case the thread is blocked inside the take() method, offer
// an empty list which is simply ignored
recordListQueue.offer(new ArrayList<SqoopRecord>());
}
/**
* Indicate whether the current thread is running and accepting records to
* send to the database.
*/
public boolean isRunning() {
return !stop;
}
/**
* Consume records from the list to be written to the database.
* Block until we have records available in the list.
*/
@Override
public void run() {
while (!stop) {
List<SqoopRecord> recordList = null;
try {
// Block until we get a list of records to process
recordList = take();
} catch (InterruptedException ie) {
LOG.warn("Interrupted while waiting for more records");
continue;
}
// Ensure we do not have a null or empty list
if (recordList == null || recordList.size() == 0) {
LOG.warn("Got a Null or empty list. skipping");
continue;
}
// Write the current list of records to the database
try {
write(recordList);
} catch (Exception ex) {
LOG.error("Failed to write records.", ex);
setLastError(ex);
// Stop processing incoming batches and remove any queued ones
close();
recordListQueue.poll();
}
}
}
/**
* Write the records to the database. If a failure occurs, it tries to
* use the configured handler to recover from the failure, otherwise
* an SQLException is thrown.
*/
protected void write(List<SqoopRecord> records)
throws SQLException, IOException {
PreparedStatement stmt = null;
int retryCount = RETRY_MAX;
boolean doRetry = true;
do {
try {
// Establish the connection to be used if not yet created
getConnection();
// Get the prepared statement to use for writing the records
stmt = getPreparedStatement(records);
// Execute the prepared statement
executeStatement(stmt, records);
// Statement executed successfully, no need to retry
doRetry = false;
} catch (SQLException sqlEx) {
LOG.warn("Trying to recover from DB write failure: ", sqlEx);
// Use configured connection handler to recover from the connection
// failure and use the recovered connection.
// If the failure cannot be recovered, an exception is thrown
if (failureHandler.canHandleFailure(sqlEx)) {
// Recover from connection failure
this.conn = failureHandler.recover();
// Configure the new connection before using it
configureConnection();
--retryCount;
doRetry = (retryCount >= 0);
} else {
// Cannot recover using configured handler, re-throw
throw new IOException("Registered handler cannot recover error "
+ "with SQL State: " + sqlEx.getSQLState() + ", error code: "
+ sqlEx.getErrorCode(), sqlEx);
}
}
} while (doRetry);
// Throw an exception if all retry attempts are consumed
if (retryCount < 0) {
throw new IOException("Failed to write to database after "
+ RETRY_MAX + " retries.");
}
}
/**
* Generate the PreparedStatement object that will be used to write records
* to the database. All parameterized fields of the PreparedStatement must
* be set in this method as well; this is usually based on the records
* collected from the user in the records list
*
* This method must be overridden by sub-classes to define the database
* operation to be executed for user records
*/
protected abstract PreparedStatement getPreparedStatement(
List<SqoopRecord> records) throws SQLException;
/**
* Execute the provided PreparedStatement, by default this assume batch
* execute, but this can be overridden by subclasses for a different mode
* of execution which should match getPreparedStatement implementation.
*/
protected abstract void executeStatement(PreparedStatement stmt,
List<SqoopRecord> records) throws SQLException;
/**
* Report any SQL Exception that could not be automatically handled or
* recovered.
*
* If the error slot was already filled, then subsequent errors are
* squashed until the user calls this method (which clears the error
* slot).
* @return any unrecovered SQLException that occurred due to a
* previously-run database operation.
*/
public synchronized Exception getLastError() {
Exception e = this.err;
this.err = null;
return e;
}
private synchronized void setLastError(Exception e) {
if (this.err == null) {
// Just set it.
LOG.error("Got exception in update thread: "
+ StringUtils.stringifyException(e));
this.err = e;
} else {
// Slot is full. Log it and discard.
LOG.error("Exception in update thread but error slot full: "
+ StringUtils.stringifyException(e));
}
}
}
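The put()/take() pair above forms a blocking handoff: the record writer can hand over at most one batch at a time and waits while the previous batch is still being written, and close() unblocks a waiting take() by offering an empty list. A minimal standalone sketch of that pattern (illustrative, not part of the patch):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

// Minimal handoff sketch: put() blocks until the writer thread take()s the
// batch, so at most one batch is ever in flight; an empty batch is the
// shutdown signal, mirroring close() above.
public class HandoffSketch {
  public static void main(String[] args) throws InterruptedException {
    final SynchronousQueue<List<String>> queue = new SynchronousQueue<List<String>>();

    Thread writer = new Thread(new Runnable() {
      public void run() {
        try {
          while (true) {
            List<String> batch = queue.take();   // wait for the next batch
            if (batch.isEmpty()) {
              return;                            // shutdown requested
            }
            System.out.println("writing " + batch.size() + " records");
          }
        } catch (InterruptedException ie) {
          // exit on interruption
        }
      }
    });
    writer.start();

    queue.put(Arrays.asList("r1", "r2"));        // blocks until accepted
    queue.put(Arrays.asList("r3"));
    queue.put(Collections.<String>emptyList());  // ask the writer to stop
    writer.join();
  }
}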


@ -0,0 +1,175 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.manager.SQLServerManager;
/**
* Represents a database update thread that runs asynchronously to perform
* database insert operations on the given records
*
* The asynchronous thread receives a batch of records that it writes to
* the database. It uses the configured connection handler to recover from
* connection failures (if possible) until the records are inserted/updated
* in the database
*/
public class SQLServerExportDBExecThread extends
SQLServerAsyncDBExecThread {
private static final Log LOG = LogFactory.getLog(
SQLServerExportDBExecThread.class);
protected static final String SQLSTATE_CODE_CONSTRAINT_VIOLATION = "23000";
private boolean failedCommit = false;
/**
* Generate the PreparedStatement object that will be used to insert records
* to the database. All parameterized fields of the PreparedStatement must
* be set in this method as well; this is usually based on the records
* collected from the user in the records list
*
* This method must be overridden by sub-classes to define the database
* operation to be executed for user records
*/
@Override
protected PreparedStatement getPreparedStatement(
List<SqoopRecord> records) throws SQLException {
PreparedStatement stmt = null;
Connection conn = getConnection();
// Create a PreparedStatement object to insert all records
stmt = conn.prepareStatement(getInsertStatement(records.size()));
// Inject the record parameters into the VALUES clauses.
for (SqoopRecord record : records) {
record.write(stmt, 0);
stmt.addBatch();
}
return stmt;
}
/**
* Execute the provided PreparedStatement, by default this assume batch
* execute, but this can be overridden by subclasses for a different mode
* of execution which should match getPreparedStatement implementation
*/
@Override
protected void executeStatement(PreparedStatement stmt,
List<SqoopRecord> records) throws SQLException {
// On failures in commit step, we cannot guarantee that transactions have
// been successfully committed to the database.
// This can result in access violation issues for columns with unique
// constraints.
// One way to handle this is check whether records are committed in the
// database before propagating the failure for retry. However in case we
* have connection loss, we won't be able to access the database.
// An alternative option is to ignore violation issues in the next retry
// in case the records have been already committed
Connection conn = getConnection();
try {
stmt.executeBatch();
} catch (SQLException execSqlEx) {
LOG.warn("Error executing statement: " + execSqlEx);
if (failedCommit &&
canIgnoreForFailedCommit(execSqlEx.getSQLState())){
LOG.info("Ignoring error after failed commit");
} else {
throw execSqlEx;
}
}
// If the batch of records is executed successfully, then commit before
// processing the next batch of records
try {
conn.commit();
failedCommit = false;
} catch (SQLException commitSqlEx) {
LOG.warn("Error while committing transactions: " + commitSqlEx);
failedCommit = true;
throw commitSqlEx;
}
}
/**
* Create an INSERT statement for the given records
*/
protected String getInsertStatement(int numRows) {
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO " + tableName + " ");
String tableHints = getConf().get(SQLServerManager.TABLE_HINTS_PROP);
if (tableHints != null) {
LOG.info("Using table hints: " + tableHints);
sb.append(" WITH (").append(tableHints).append(") ");
}
int numSlots;
if (this.columnNames != null) {
numSlots = this.columnNames.length;
sb.append("(");
boolean first = true;
for (String col : columnNames) {
if (!first) {
sb.append(", ");
}
sb.append(col);
first = false;
}
sb.append(") ");
} else {
numSlots = this.columnCount; // set if columnNames is null.
}
sb.append("VALUES ");
// generates the (?, ?, ?...).
sb.append("(");
for (int i = 0; i < numSlots; i++) {
if (i != 0) {
sb.append(", ");
}
sb.append("?");
}
sb.append(")");
return sb.toString();
}
/**
* Specify whether the given SQL State error is expected after failed
* commits. For example, constraint violation
*/
protected boolean canIgnoreForFailedCommit(String sqlState) {
return SQLSTATE_CODE_CONSTRAINT_VIOLATION.equals(sqlState);
}
}
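As a concrete illustration of the statement getInsertStatement() builds, assuming a hypothetical table and column names:

// Hypothetical table CITIES with columns ID, NAME, COUNTRY and no hints:
// every SqoopRecord in the batch binds the same three placeholders and is
// added via addBatch(), so one executeBatch() inserts the whole list.
public class InsertShapeExample {
  public static void main(String[] args) {
    String expected = "INSERT INTO CITIES (ID, NAME, COUNTRY) VALUES (?, ?, ?)";
    System.out.println(expected);
  }
}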


@ -0,0 +1,196 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.lib.SqoopRecord;
/**
* Insert the emitted keys as records into a database table.
* Insert failures are handled by the registered Failure Handler class which
* allows for recovering from certain failures like intermittent connection
* or database throttling, etc.
*
* The number of records per transaction is governed by the
* sqoop.export.records.per.statement configuration value or else default
* value is used
*
* Record objects are buffered before actually performing the INSERT
* statements; this requires that the key implement the
* SqoopRecord interface.
*/
public class SQLServerResilientExportOutputFormat<K extends SqoopRecord, V>
extends OutputFormat<K, V> {
private static final Log LOG = LogFactory.getLog(
SQLServerResilientExportOutputFormat.class);
public static final String EXPORT_FAILURE_HANDLER_CLASS =
"sqoop.export.failure.handler.class";
public static final int DEFAULT_RECORDS_PER_STATEMENT = 1000;
private int curListIdx = 0;
@Override
/** {@inheritDoc} */
public void checkOutputSpecs(JobContext context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
DBConfiguration dbConf = new DBConfiguration(conf);
// Sanity check all the configuration values we need.
if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
throw new IOException("Database connection URL is not set.");
} else if (null == dbConf.getOutputTableName()) {
throw new IOException("Table name is not set for export");
} else if (null == dbConf.getOutputFieldNames()
&& 0 == dbConf.getOutputFieldCount()) {
throw new IOException(
"Output field names are null and zero output field count set.");
}
}
@Override
/** {@inheritDoc} */
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
throws IOException {
try {
return new SQLServerExportRecordWriter<K, V>(context);
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
/** {@inheritDoc} */
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException, InterruptedException {
return new NullOutputCommitter();
}
/**
* RecordWriter to write the output to a row in a database table.
* The actual database updates are executed in a parallel thread in a
* resilient fashion which attempts to recover failed operations
*/
public class SQLServerExportRecordWriter<K extends SqoopRecord, V>
extends RecordWriter<K, V> {
private final Log LOG = LogFactory.getLog(
SQLServerExportRecordWriter.class);
private final int LIST_COUNT = 2;
protected Configuration conf;
protected SQLServerAsyncDBExecThread execThread;
// Number of records to buffer before sending as a batch
protected int recordsPerStmt;
// We alternate between 2 lists of records as we go, as one is sent to the
// target database the other gets asynchronously filled
protected List<List<SqoopRecord>> recordsLists = new ArrayList<List<SqoopRecord>>();
protected List<SqoopRecord> currentList;
public SQLServerExportRecordWriter(TaskAttemptContext context)
throws IOException {
conf = context.getConfiguration();
recordsPerStmt = conf.getInt(
AsyncSqlOutputFormat.RECORDS_PER_STATEMENT_KEY,
DEFAULT_RECORDS_PER_STATEMENT);
// Create the lists to host incoming records
List<SqoopRecord> newList;
for (int i = 0; i < LIST_COUNT; ++i) {
newList = new ArrayList<SqoopRecord>(recordsPerStmt);
recordsLists.add(newList);
}
currentList = recordsLists.get(0);
// Initialize the DB exec Thread
initializeExecThread();
// Start the DB exec thread
execThread.start();
}
/**
* Initialize the thread used to perform the asynchronous DB operation
*/
protected void initializeExecThread() throws IOException {
execThread = new SQLServerExportDBExecThread();
execThread.initialize(conf);
}
@Override
/** {@inheritDoc} */
public void write(K key, V value)
throws InterruptedException, IOException {
try {
currentList.add((SqoopRecord) key.clone());
if (currentList.size() >= this.recordsPerStmt) {
// Schedule the current list for asynchronous transfer
// This will block if the previous operation is still in progress
execThread.put(currentList);
// Switch to the other list for receiving incoming records
curListIdx = (curListIdx + 1) % recordsLists.size();
// Clear the list to be used in case it has previous records
currentList = recordsLists.get(curListIdx);
currentList.clear();
}
} catch (CloneNotSupportedException cnse) {
throw new IOException("Could not buffer record", cnse);
}
}
@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
try {
// Ensure we flush the list of records to the database
if (currentList.size() > 0) {
execThread.put(currentList);
}
}
finally {
execThread.close();
execThread.join();
}
// Final check for any exceptions raised when writing to the database
Exception lastException = execThread.getLastError();
if (lastException != null) {
LOG.error("Asynchronous writer thread encountered the following " +
"exception: " + lastException.toString());
throw new IOException(lastException);
}
}
}
}
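A rough sketch of how a driver could tune this output format, assuming Hadoop 2 style job setup; in a normal Sqoop run JdbcExportJob wires this up, so the snippet only shows which configuration keys the format reads (batch size and failure handler):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.sqoop.mapreduce.SQLServerResilientExportOutputFormat;

// Assumed driver snippet (not part of the patch): shows the keys read by
// SQLServerResilientExportOutputFormat and its asynchronous writer thread.
public class ResilientExportSetup {
  public static Job configure() throws Exception {
    Configuration conf = new Configuration();
    // Records buffered per asynchronous batch (defaults to 1000).
    conf.setInt("sqoop.export.records.per.statement", 500);
    // Failure handler consulted when a batch fails (this is the default).
    conf.set("sqoop.export.failure.handler.class",
        "org.apache.sqoop.mapreduce.db.SQLServerConnectionFailureHandler");
    Job job = Job.getInstance(conf, "resilient-export");
    job.setOutputFormatClass(SQLServerResilientExportOutputFormat.class);
    return job;
  }
}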


@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
/**
* Update an existing table of data with new value data.
* This requires a designated 'key column' for the WHERE clause
* of an UPDATE statement.
*
* The number of records per transaction is governed by the
* sqoop.export.records.per.statement configuration value or else default
* value is used
*
* Record objects are buffered before actually performing the UPDATE
* statements; this requires that the key implement the
* SqoopRecord interface.
*/
public class SQLServerResilientUpdateOutputFormat<K extends SqoopRecord, V>
extends SQLServerResilientExportOutputFormat<K, V> {
private static final Log LOG = LogFactory.getLog(
SQLServerResilientUpdateOutputFormat.class);
@Override
/** {@inheritDoc} */
public void checkOutputSpecs(JobContext context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
DBConfiguration dbConf = new DBConfiguration(conf);
// Sanity check all the configuration values we need.
if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
throw new IOException("Database connection URL is not set.");
} else if (null == dbConf.getOutputTableName()) {
throw new IOException("Table name is not set for export.");
} else if (null == dbConf.getOutputFieldNames()) {
throw new IOException(
"Output field names are null.");
} else if (null == conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY)) {
throw new IOException("Update key column is not set for export.");
}
}
@Override
/** {@inheritDoc} */
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
throws IOException {
try {
return new SQLServerUpdateRecordWriter(context);
} catch (Exception e) {
throw new IOException(e);
}
}
/**
* RecordWriter to write the output to UPDATE statements modifying rows
* in the database.
* The actual database updates are executed in a parallel thread in a
* resilient fashion which attempts to recover failed operations
*/
public class SQLServerUpdateRecordWriter<K extends SqoopRecord, V>
extends SQLServerExportRecordWriter<K, V> {
private final Log LOG = LogFactory.getLog(
SQLServerUpdateRecordWriter.class);
public SQLServerUpdateRecordWriter(TaskAttemptContext context)
throws IOException {
super(context);
}
/**
* Initialize the thread used to perform the asynchronous DB operation
*/
protected void initializeExecThread() throws IOException {
execThread = new SQLServerUpdateDBExecThread();
execThread.initialize(conf);
}
}
}


@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.lib.SqoopRecord;
/**
* Represents a database update thread that runs asynchronously to perform
* database Update operations on the given records
*
* The asynchronous thread receives a batch of records that it writes to
* the database. It uses the configured connection handler to recover from
* connection failures (if possible) until the records are inserted/updated
* in the database
*/
public class SQLServerUpdateDBExecThread extends
SQLServerExportDBExecThread {
private static final Log LOG = LogFactory.getLog(
SQLServerUpdateDBExecThread.class);
protected String [] updateCols; // The columns containing the fixed key.
/**
* Initialize the writer thread with Job Configuration
*/
@Override
public void initialize(Configuration conf) throws IOException {
super.initialize(conf);
// Get the update columns
String updateKeyColumns =
conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY);
Set<String> updateKeys = new LinkedHashSet<String>();
StringTokenizer stok = new StringTokenizer(updateKeyColumns, ",");
while (stok.hasMoreTokens()) {
String nextUpdateKey = stok.nextToken().trim();
if (nextUpdateKey.length() > 0) {
updateKeys.add(nextUpdateKey);
} else {
throw new RuntimeException("Invalid update key column value specified"
+ ": '" + updateKeyColumns + "'");
}
}
updateCols = updateKeys.toArray(new String[updateKeys.size()]);
}
/**
* Generate the PreparedStatement object that will be used to update records
* in the database. All parameterized fields of the PreparedStatement must
* be set in this method as well; this is usually based on the records
* collected from the user in the records list
*
* This method must be overridden by sub-classes to define the database
* operation to be executed for user records
*/
@Override
protected PreparedStatement getPreparedStatement(
List<SqoopRecord> records) throws SQLException {
PreparedStatement stmt = null;
Connection conn = getConnection();
// Create a PreparedStatement object to Update all records
stmt = conn.prepareStatement(getUpdateStatement());
// Inject the record parameters into the UPDATE and WHERE clauses. This
// assumes that the update key column is the last column serialized in
// by the underlying record. Our code auto-gen process for exports was
// responsible for taking care of this constraint.
for (SqoopRecord record : records) {
record.write(stmt, 0);
stmt.addBatch();
}
return stmt;
}
/**
* @return an UPDATE statement that modifies rows based on a single key
* column (with the intent of modifying a single row).
*/
protected String getUpdateStatement() {
StringBuilder sb = new StringBuilder();
sb.append("UPDATE " + this.tableName + " SET ");
boolean first = true;
for (String col : this.columnNames) {
if (!first) {
sb.append(", ");
}
sb.append(col);
sb.append("=?");
first = false;
}
sb.append(" WHERE ");
first = true;
for (int i = 0; i < updateCols.length; i++) {
if (!first) {
sb.append(" AND ");
}
sb.append(updateCols[i]).append("=?");
first = false;
}
return sb.toString();
}
}
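For comparison with the insert path, the UPDATE built here lists every output column in SET and the update key columns in WHERE; a hypothetical example, with the key column serialized last as the comment above assumes:

// Hypothetical table CITIES with output columns NAME, COUNTRY, ID and
// --update-key ID: all columns appear in SET, the key columns in WHERE.
public class UpdateShapeExample {
  public static void main(String[] args) {
    String expected = "UPDATE CITIES SET NAME=?, COUNTRY=?, ID=? WHERE ID=?";
    System.out.println(expected);
  }
}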


@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.db;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
/**
* A failure handler which uses basic retry mechanism for handling
* SQL failures. Retry settings are embedded in job configuration
*/
public class BasicRetrySQLFailureHandler
extends SQLFailureHandler {
private static final Log LOG =
LogFactory.getLog(BasicRetrySQLFailureHandler.class);
// Configuration name for retry attempts
public static final String CONNECTION_RETRY_WAIT_MAX =
"connection.recover.wait.max";
// Configuration name for retry interval
public static final String CONNECTION_RETRY_WAIT_INTERVAL =
"connection.recover.wait.interval";
// Default values for retry settings
public static final int DEFAULT_RETRY_WAIT_MAX = 2 * 60 * 1000;
public static final int DEFAULT_RETRY_WAIT_INTERVAL = 500;
protected int retryWaitMax = 0;
protected int retryWaitInterval = 0;
public BasicRetrySQLFailureHandler() {
}
/**
* Initialize the handler with job configuration.
*/
public void initialize(Configuration conf) throws IOException {
super.initialize(conf);
// Retrieve retry settings from job-configuration
retryWaitMax = conf.getInt(CONNECTION_RETRY_WAIT_MAX,
DEFAULT_RETRY_WAIT_MAX);
retryWaitInterval = conf.getInt(CONNECTION_RETRY_WAIT_INTERVAL,
DEFAULT_RETRY_WAIT_INTERVAL);
if (retryWaitMax <= retryWaitInterval || retryWaitInterval <= 0) {
LOG.error("Failed to initialize handler");
throw new IOException("Invalid retry paramers. Wait Max: "
+ retryWaitMax + ". wait interval: " + retryWaitInterval);
}
LOG.trace("Retry Handler initialized successfully");
}
/**
* Check whether the given failure is supported by this failure handler
*
* This is a generic handler for all SQLException failures. Subclasses
* should override this method for specific error handling
*/
public boolean canHandleFailure(Throwable failureCause) {
return failureCause != null
&& SQLException.class.isAssignableFrom(failureCause.getClass());
}
/**
* Provide specific handling for the failure and return a new valid
* connection.
*/
public Connection recover() throws IOException {
long nextRetryWait = 0;
int retryAttempts = 0;
boolean doRetry = true;
boolean validConnection = false;
Connection conn = null;
do {
validConnection = false;
// Use increasing wait interval
nextRetryWait = (long) Math.pow(retryAttempts, 2) * retryWaitInterval;
// Increase the number of retry attempts
++retryAttempts;
// If we exceeded max retry attempts, try one last time with max value
if (nextRetryWait > retryWaitMax) {
nextRetryWait = retryWaitMax;
doRetry = false;
}
try {
// Wait before trying to recover the connection
Thread.sleep(nextRetryWait);
// Discard the connection
discardConnection(conn);
// Try to get a new connection
conn = super.getConnection();
if (!validateConnection(conn)) {
// Log failure and continue
LOG.warn("Connection not valid");
} else {
LOG.info("A new connection has been established");
// Connection has been recovered so stop recovery retries
doRetry = false;
validConnection = true;
}
} catch (SQLException sqlEx) {
LOG.warn("Connection recovery attempt [" + retryAttempts + "] failed."
+ "Exception details: " + sqlEx.toString());
} catch (Exception ex) {
// Handle unexpected exceptions
LOG.error("Failed while recovering the connection. Exception details:"
+ ex.toString());
throw new IOException(ex);
}
} while (doRetry);
if (!validConnection) {
throw new IOException("Failed to recover connection after " +
retryAttempts + " retries. Giving up");
}
return conn;
}
/**
* Verify the provided connection is valid.
*/
protected boolean validateConnection(Connection connection)
throws SQLException {
return connection != null && !connection.isClosed()
&& connection.isValid(DEFAULT_RETRY_WAIT_INTERVAL);
}
/**
* Close the given connection.
*/
protected void discardConnection(Connection connection) throws IOException {
try {
if (connection != null) {
connection.close();
}
} catch(SQLException sqlEx) {
LOG.warn("Could not close connection. Exception details: " + sqlEx);
}
}
}
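recover() backs off quadratically: the wait before attempt n is n^2 times connection.recover.wait.interval, capped by connection.recover.wait.max, with one final attempt at the cap. A small self-contained sketch of the schedule using the default values (0 ms, 500 ms, 2000 ms, 4500 ms, ...):

// Self-contained sketch of the back-off schedule followed by recover()
// with the default settings (interval 500 ms, cap 2 minutes).
public class BackoffSchedule {
  public static void main(String[] args) {
    long interval = 500;        // connection.recover.wait.interval default
    long max = 2 * 60 * 1000;   // connection.recover.wait.max default
    int attempt = 0;
    boolean retry = true;
    while (retry) {
      long wait = (long) Math.pow(attempt, 2) * interval;
      if (wait > max) {
        wait = max;             // one last attempt at the maximum wait
        retry = false;
      }
      System.out.println("attempt " + attempt + ": wait " + wait + " ms");
      attempt++;
    }
  }
}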


@ -160,10 +160,10 @@ public void close() throws IOException {
if (null != results) {
results.close();
}
if (null != statement) {
if (null != statement && !statement.isClosed()) {
statement.close();
}
if (null != connection) {
if (null != connection && !connection.isClosed()) {
connection.commit();
connection.close();
}
@ -253,6 +253,8 @@ public boolean nextKeyValue() throws IOException {
statement.close();
} catch (SQLException ex) {
LoggingUtils.logAll(LOG, "Failed to close statement", ex);
} finally {
this.statement = null;
}
}
if (this.connection != null) {
@ -260,6 +262,17 @@ public boolean nextKeyValue() throws IOException {
connection.close();
} catch (SQLException ex) {
LoggingUtils.logAll(LOG, "Failed to close connection", ex);
} finally {
this.connection = null;
}
}
if (this.results != null) {
try {
results.close();
} catch (SQLException ex) {
LoggingUtils.logAll(LOG, "Failed to close ResultsSet", ex);
} finally {
this.results = null;
}
}
@ -303,6 +316,10 @@ protected Connection getConnection() {
return connection;
}
protected void setConnection(Connection conn) {
connection = conn;
}
protected PreparedStatement getStatement() {
return statement;
}


@ -70,9 +70,18 @@ public float getProgress() throws IOException {
/** Returns the query for selecting the records,
* subclasses can override this for custom behaviour.*/
protected String getSelectQuery() {
StringBuilder query = new StringBuilder();
DataDrivenDBInputFormat.DataDrivenDBInputSplit dataSplit =
(DataDrivenDBInputFormat.DataDrivenDBInputSplit) getSplit();
return getSelectQuery(dataSplit.getLowerClause(),
dataSplit.getUpperClause());
}
/** Returns the query for selecting the records, with lower and upper
* clause conditions provided as parameters.
* This is needed for recovering from connection failures after some data
* in the split has already been processed. */
protected String getSelectQuery(String lowerClause, String upperClause) {
StringBuilder query = new StringBuilder();
DBConfiguration dbConf = getDBConf();
String [] fieldNames = getFieldNames();
String tableName = getTableName();
@ -81,8 +90,8 @@ protected String getSelectQuery() {
// Build the WHERE clauses associated with the data split first.
// We need them in both branches of this function.
StringBuilder conditionClauses = new StringBuilder();
conditionClauses.append("( ").append(dataSplit.getLowerClause());
conditionClauses.append(" ) AND ( ").append(dataSplit.getUpperClause());
conditionClauses.append("( ").append(lowerClause);
conditionClauses.append(" ) AND ( ").append(upperClause);
conditionClauses.append(" )");
if (dbConf.getInputQuery() == null) {


@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.db;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
/**
* Base class for handling SQL failures while executing operations on the
* target database.
* Subclasses can provide different handling for connection errors like
* recovering from connection resets, or handling server throttling
*/
public abstract class SQLFailureHandler {
// Job configuration for the currently running job
protected Configuration conf;
/**
* Initialize the handler with job configuration.
*/
public void initialize(Configuration c) throws IOException {
this.conf = c;
}
/**
* Check whether the given failure is supported by the connection failure
* handler.
*/
public abstract boolean canHandleFailure(Throwable failureCause);
/**
* Provide specific handling for the connection failure and return a new
* valid connection.
*/
public abstract Connection recover() throws IOException;
/**
* Establish a connection to the target database.
*/
protected Connection getConnection()
throws ClassNotFoundException, SQLException {
// Get a DBConfiguration object from the current job configuration
DBConfiguration dbConf = new DBConfiguration(conf);
Connection conn = null;
conn = dbConf.getConnection();
return conn;
}
}


@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.db;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Connection failure handler for SQL Server which extends basic retry
* mechanism for handling connection failures. Retry settings are embedded
* in job configuration
*/
public class SQLServerConnectionFailureHandler
extends BasicRetrySQLFailureHandler {
private static final Log LOG =
LogFactory.getLog(SQLServerConnectionFailureHandler.class);
protected static final String CONNECTION_RESET_ERR_REGEX =
"(^Connection reset)(.*?)";
protected static final String SQLSTATE_CODE_CONNECTION_RESET = "08S01";
protected static final String VALIDATION_QUERY =
"SELECT CONVERT(NVARCHAR, CONTEXT_INFO()) AS contextInfo";
public SQLServerConnectionFailureHandler() {
}
/**
* Handle only connection reset or TCP exceptions.
*/
public boolean canHandleFailure(Throwable failureCause) {
if (!super.canHandleFailure(failureCause)) {
return false;
}
SQLException sqlEx = (SQLException) failureCause;
String errStateCode = sqlEx.getSQLState();
boolean canHandle = false;
// By default check SQLState code if available
if (errStateCode != null) {
canHandle = SQLSTATE_CODE_CONNECTION_RESET.equals(errStateCode);
} else {
errStateCode = "NULL";
// In case SQLState code is not available, check the exception message
String errMsg = sqlEx.getMessage();
canHandle = errMsg.matches(CONNECTION_RESET_ERR_REGEX);
}
if (!canHandle) {
LOG.warn("Cannot handle error with SQL State: " + errStateCode);
}
return canHandle;
}
/**
* Verify the provided connection is valid. For SQL Server we test the
* connection by querying the session context.
*/
protected boolean validateConnection(Connection connection)
throws SQLException {
boolean isValid = false;
String contextInfo = null;
if (super.validateConnection(connection)) {
// Execute the validation query
PreparedStatement stmt = connection.prepareStatement(VALIDATION_QUERY);
ResultSet results = stmt.executeQuery();
// Read the context from the result set
if (results.next()) {
contextInfo = results.getString("contextInfo");
LOG.info("Session context is: "
+ ((contextInfo == null) ? "NULL" : contextInfo));
isValid = true;
}
}
return isValid;
}
}
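
For illustration only (assuming the base class merely verifies that the failure is a SQLException), the decision logic above behaves as follows; the exception messages are made up:

SQLServerConnectionFailureHandler handler =
    new SQLServerConnectionFailureHandler();
// SQLState 08S01 maps to a connection reset, so this failure is handled
boolean handled = handler.canHandleFailure(
    new SQLException("Connection reset by peer", "08S01"));      // true
// An unrelated SQLState is rejected and only logged as a warning
boolean rejected = handler.canHandleFailure(
    new SQLException("Incorrect syntax near 'FORM'", "42000"));  // false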


@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.db;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.sqoop.mapreduce.DBWritable;
import org.apache.sqoop.lib.SqoopRecord;
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
/**
* An InputFormat that reads input data from a SQL table.
* Operates like DataDrivenDBInputFormat, but attempts to recover from
* connection failures based on the recovery options set in the job
* configuration.
*/
public class SQLServerDBInputFormat<T extends SqoopRecord>
extends DataDrivenDBInputFormat<T> implements Configurable {
private static final Log LOG =
LogFactory.getLog(SQLServerDBInputFormat.class);
public static final String IMPORT_FAILURE_HANDLER_CLASS =
"sqoop.import.failure.handler.class";
@Override
/** {@inheritDoc} */
protected RecordReader<LongWritable, T> createDBRecordReader(
DBInputSplit split, Configuration conf) throws IOException {
DBConfiguration dbConf = getDBConf();
Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
String dbProductName = getDBProductName();
LOG.debug("Creating db record reader for db product: " + dbProductName);
try {
return new SQLServerDBRecordReader<T>(split, inputClass,
conf, getConnection(), dbConf, dbConf.getInputConditions(),
dbConf.getInputFieldNames(), dbConf.getInputTableName(),
dbProductName);
} catch (SQLException ex) {
throw new IOException(ex);
}
}
/** Set Input for table. */
public static void setInput(Job job,
Class<? extends DBWritable> inputClass,
String tableName, String conditions,
String splitBy, String... fieldNames) {
DataDrivenDBInputFormat.setInput(job, inputClass, tableName, conditions,
splitBy, fieldNames);
job.setInputFormatClass(SQLServerDBInputFormat.class);
}
/** Set Input for query. */
public static void setInput(Job job,
Class<? extends DBWritable> inputClass,
String inputQuery, String inputBoundingQuery) {
DataDrivenDBInputFormat.setInput(job, inputClass, inputQuery,
inputBoundingQuery);
job.setInputFormatClass(SQLServerDBInputFormat.class);
}
}
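
A hedged driver-side sketch (not part of this change; EmployeeRecord, the table name and the column names are hypothetical) of wiring this input format together with the failure handler above, assuming it runs inside a method that may throw IOException:

Configuration conf = new Configuration();
Job job = new Job(conf);  // Job.getInstance(conf) on newer Hadoop versions
// Import the "employees" table, splitting and ordering on the "id" column
SQLServerDBInputFormat.setInput(job, EmployeeRecord.class,
    "employees", null /* conditions */, "id" /* splitBy */, "id", "name");
// Recover dropped connections with the SQL Server connection failure handler
job.getConfiguration().set(SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS,
    SQLServerConnectionFailureHandler.class.getName());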


@ -0,0 +1,224 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.db;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.sqoop.mapreduce.sqlserver.SqlServerRecordReader;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
import com.cloudera.sqoop.mapreduce.db.DBInputFormat;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
import org.apache.sqoop.lib.SqoopRecord;
/**
* A RecordReader that reads records from a SQL table.
* This record reader handles connection failures using the configured
* connection failure handler
*/
public class SQLServerDBRecordReader<T extends SqoopRecord> extends
SqlServerRecordReader<T> {
private static final Log LOG =
LogFactory.getLog(SQLServerDBRecordReader.class);
// The SQL handler to be used for recovering failed read operations
protected SQLFailureHandler failureHandler = null;
// Recover failed reads for RETRY_MAX
protected static final int RETRY_MAX = 3;
// Name of the split column used to re-generate selectQueries after
// connection failures
private String splitColumn;
private String lastRecordKey;
public SQLServerDBRecordReader(DBInputFormat.DBInputSplit split,
Class<T> inputClass, Configuration conf, Connection conn,
DBConfiguration dbConfig, String cond, String [] fields, String table,
String dbProduct) throws SQLException {
super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
}
@Override
/** {@inheritDoc} */
public T getCurrentValue() {
T val = super.getCurrentValue();
// Look up the key of the last read record to use for recovery.
// As documented, the field map is not expected to be null, though it may be
// empty.
Object lastRecordSplitCol = val.getFieldMap().get(splitColumn);
lastRecordKey = (lastRecordSplitCol == null) ? null
: lastRecordSplitCol.toString();
return val;
}
/**
* Load the SQLFailureHandler configured for use by the record reader.
*/
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
// Load the configured connection failure handler
Configuration conf = getConf();
if (conf == null) {
throw new IOException("Configuration cannot be NULL");
}
Class connHandlerClass;
try {
String className = conf.get(
SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS);
// Get the class-name set in configuration
connHandlerClass = conf.getClassByName(className);
} catch (ClassNotFoundException ex) {
LOG.error("Failed to find class: "
+ SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS);
throw new IOException(ex);
}
// Verify handler class is a subclass of SQLFailureHandler
if (!SQLFailureHandler.class.isAssignableFrom(connHandlerClass)) {
String error = "A subclass of " + SQLFailureHandler.class.getName()
+ " is expected. Actual class set is: " + connHandlerClass.getName();
LOG.error(error);
throw new IOException(error);
}
LOG.trace("Using connection handler class: " + connHandlerClass);
// Load the configured connection failure handler
failureHandler = ReflectionUtils.newInstance(
(Class<? extends SQLFailureHandler>)connHandlerClass, conf);
// Initialize the connection handler using the job configuration
failureHandler.initialize(conf);
// Get the split-by column
splitColumn = getDBConf().getInputOrderBy();
if (splitColumn == null || splitColumn.length() == 0) {
throw new IOException("Split column must be set");
}
// Ensure the split-column is not escaped so that we can use it to search
// in the record map
int splitColLen = splitColumn.length();
if (splitColLen > 2 && splitColumn.charAt(0) == '['
&& splitColumn.charAt(splitColLen-1) == ']') {
splitColumn = splitColumn.substring(1, splitColLen - 1);
}
}
@Override
/**
* Read the next key, value pair.
* Try to recover failed connections using the configured connection failure
* handler before retrying the failed operation
*/
public boolean nextKeyValue() throws IOException {
boolean valueReceived = false;
int retryCount = RETRY_MAX;
boolean doRetry = true;
do {
try {
// Try to get the next key/value pairs
valueReceived = super.nextKeyValue();
doRetry = false;
} catch (IOException ioEx) {
LOG.warn("Trying to recover from DB read failure: ", ioEx);
Throwable cause = ioEx.getCause();
// Use configured connection handler to recover from the connection
// failure and use the newly constructed connection.
// If the failure cannot be recovered, an exception is thrown
if (failureHandler.canHandleFailure(cause)) {
// Recover from connection failure
Connection conn = failureHandler.recover();
// Configure the new connection before using it
configureConnection(conn);
setConnection(conn);
--retryCount;
doRetry = (retryCount >= 0);
} else {
// Cannot recover using the configured handler, re-throw
throw new IOException("Connection handler cannot recover failure: ",
ioEx);
}
}
} while (doRetry);
// Fail if all retry attempts have been consumed
if (retryCount < 0) {
throw new IOException("Failed to read from database after "
+ RETRY_MAX + " retries.");
}
return valueReceived;
}
/**
* Configure the provided Connection for record reads.
*/
protected void configureConnection(Connection conn) throws IOException {
try {
conn.setAutoCommit(false);
conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
} catch (SQLException sqlEx) {
LOG.error("Failed to configure SQL Connection");
throw new IOException(sqlEx);
}
}
/** Returns the query for selecting the records.
* For connection recovery we always want to resume from the last record that
* was successfully read: e.g. if the split column is "id" and the last key
* read was 42, the regenerated lower bound clause becomes "id > 42".
*/
protected String getSelectQuery() {
// Last seen record key is only expected to be unavailable if no reads
// ever happened
String selectQuery;
if (lastRecordKey == null) {
selectQuery = super.getSelectQuery();
} else {
// If the last record key is available, construct the select query to
// resume from it
DataDrivenDBInputFormat.DataDrivenDBInputSplit dataSplit =
(DataDrivenDBInputFormat.DataDrivenDBInputSplit) getSplit();
StringBuilder lowerClause = new StringBuilder();
lowerClause.append(getDBConf().getInputOrderBy());
lowerClause.append(" > ");
lowerClause.append(lastRecordKey);
// Get the select query with the lowerClause, and split upper clause
selectQuery = getSelectQuery(lowerClause.toString(),
dataSplit.getUpperClause());
}
return selectQuery;
}
}


@ -34,7 +34,7 @@
* Microsoft SQL Server specific Record Reader.
*/
public class SqlServerRecordReader<T extends DBWritable>
extends DataDrivenDBRecordReader {
extends DataDrivenDBRecordReader<T> {
private static final Log LOG =
LogFactory.getLog(SqlServerRecordReader.class);

src/test/aop/build/aop.xml

@ -0,0 +1,154 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project name="aspects">
<!-- Properties common for all fault injections -->
<property name="build-fi.dir" location="${basedir}/build-fi"/>
<property name="sqoop-fi.jar" location="${build.dir}/${final.name}-fi.jar"/>
<property name="compile-inject.output"
value="${build-fi.dir}/compile-fi.log"/>
<property name="aspectversion" value="1.6.11"/>
<property name="javac.version" value="1.6"/>
<property file="${basedir}/build.properties"/>
<!--All Fault Injection (FI) related targets are located in this section -->
<target name="clean-fi">
<delete dir="${build-fi.dir}"/>
</target>
<!-- Weaving aspects in place.
Later on one can run 'ant jar-fault-inject' to create a
Sqoop jar file with the instrumented classes -->
<target name="compile-fault-inject" depends="compile, compile-test">
<!-- AspectJ task definition -->
<taskdef
resource="org/aspectj/tools/ant/taskdefs/aspectjTaskdefs.properties">
<classpath>
<pathelement
location="${build.ivy.lib.dir}/sqoop/hadoop${hadoopversion}test/aspectjtools-${aspectversion}.jar"/>
</classpath>
</taskdef>
<echo message="Start weaving aspects in place"/>
<iajc
encoding="${build.encoding}"
srcdir="${src.dir.path}"
includes="**/org/apache/sqoop/**/*.java, **/org/apache/sqoop/**/*.aj"
destDir="${build-fi.dir}/classes"
debug="${javac.debug}"
target="${javac.version}"
source="${javac.version}"
deprecation="${javac.deprecation}"
fork="true"
maxmem="1024m"
>
<classpath>
<path refid="test.classpath"/>
<fileset dir="${build-fi.dir}/test">
<include name="**/*.jar" />
<exclude name="**/excluded/" />
</fileset>
</classpath>
</iajc>
<loadfile property="injection.failure" srcfile="${compile-inject.output}">
<filterchain>
<linecontainsregexp>
<regexp pattern='iajc.*warning'/>
</linecontainsregexp>
</filterchain>
</loadfile>
<fail if="injection.failure">
Broken binding of advices: ${line.separator}${injection.failure}
</fail>
<echo message="Weaving of aspects is finished"/>
</target>
<target name="fi-init">
<mkdir dir="${build-fi.dir}"/>
</target>
<target name="injectfaults"
description="Instrument classes with faults and other AOP advices">
<mkdir dir="${build-fi.dir}"/>
<delete file="${compile-inject.output}"/>
<echo message="In injectfaults ${src.dir}"/>
<weave-injectfault-aspects dest.dir="${build-fi.dir}/classes"
src.dir="${base.src.dir}">
</weave-injectfault-aspects>
</target>
<macrodef name="weave-injectfault-aspects">
<attribute name="dest.dir" />
<attribute name="src.dir" />
<sequential>
<subant buildpath="build.xml" target="compile-fault-inject"
output="${compile-inject.output}">
<property name="build.dir" value="${build-fi.dir}" />
<property name="src.dir.path" value="@{src.dir}" />
<property name="dest.dir" value="@{dest.dir}" />
</subant>
</sequential>
</macrodef>
<macrodef name="macro-run-tests-fault-inject">
<attribute name="target.name" />
<attribute name="testcasesonly" />
<sequential>
<subant buildpath="build.xml" target="@{target.name}">
<property name="build.dir" value="${build-fi.dir}"/>
<property name="test.fault.inject" value="yes"/>
<property name="test.include" value="TestFi*"/>
<property name="test.timeout" value="3000000"/>
<!-- This one is needed for the special "regression" target only -->
<property name="special.fi.testcasesonly" value="@{testcasesonly}"/>
</subant>
</sequential>
</macrodef>
<!-- ================================================================== -->
<!-- Make sqoop-fi.jar including all Fault injected artifacts -->
<!-- ================================================================== -->
<macrodef name="macro-jar-fault-inject">
<attribute name="target.name" />
<attribute name="build.dir" />
<attribute name="jar.final.name" />
<attribute name="jar.final.value" />
<sequential>
<subant buildpath="build.xml" target="@{target.name}">
<property name="build.dir" value="@{build.dir}"/>
<property name="@{jar.final.name}" value="@{jar.final.value}"/>
<property name="jar.extra.properties.list"
value="${test.dir}/fi-site.xml" />
</subant>
</sequential>
</macrodef>
<!-- ================================================================== -->
<!-- Make test jar files including all Fault Injected artifacts -->
<!-- ================================================================== -->
<macrodef name="macro-jar-test-fault-inject">
<attribute name="target.name" />
<attribute name="jar.final.name" />
<attribute name="jar.final.value" />
<sequential>
<echo message="@{jar.final.value}"/>
<subant buildpath="build.xml" target="@{target.name}">
<property name="build.dir" value="${build-fi.dir}"/>
<property name="@{jar.final.name}"
value="@{jar.final.value}"/>
</subant>
</sequential>
</macrodef>
<!--End of Fault Injection (FI) related section-->
</project>


@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.fi;
import org.apache.hadoop.conf.Configuration;
/**
* This class wraps the logic around the fault injection configuration file.
* The default file is expected to be found in src/test/fi-site.xml.
* This default file should be copied by the JUnit Ant tasks to the
* build/test/extraconf folder before the tests are run.
* An alternative location can be set through
* -Dfi.config=<file_name>
*/
public class FiConfig {
private static final String CONFIG_PARAMETER = ProbabilityModel.FPROB_NAME
+ "config";
private static final String DEFAULT_CONFIG = "fi-site.xml";
private static Configuration conf;
static {
if (conf == null) {
conf = new Configuration(false);
String configName = System.getProperty(CONFIG_PARAMETER, DEFAULT_CONFIG);
conf.addResource(configName);
}
}
/**
* Method provides access to local Configuration
*
* @return Configuration initialized with fault injection's parameters
*/
public static Configuration getConfig() {
return conf;
}
}
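
A small usage sketch (the file name my-fi-site.xml is hypothetical): the override must be set before FiConfig is first loaded, because the configuration file is read in the static initializer above.

// Point the framework at an alternative fault-injection config
System.setProperty("fi.config", "my-fi-site.xml");
Configuration fiConf = FiConfig.getConfig();
// Faults are then injected with the fi.* probability unless a more
// specific fi.<fault point> level is configured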


@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.fi;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
/**
* This class is responsible for the decision of when a fault
* has to be triggered within a Sqoop class.
*
* The default probability of injection is set to 0%. To change it
* one can set the system property -Dfi.*=<new probability level>
* Another way to do so is to set this level through the FI config file,
* located under src/test/fi-site.xml
*
* To change the level of a single fault location one has to specify the
* system property -Dfi.<name of fault location>=<probability level> at
* runtime. The probability level is specified as a float between 0.0 and 1.0
*
* <name of fault location> might be represented by a short classname
* or otherwise. This decision is left up to the discretion of the aspect
* developer, but has to be consistent throughout the code
*/
public class ProbabilityModel {
private static Random generator = new Random();
private static final Log LOG = LogFactory.getLog(ProbabilityModel.class);
static final String FPROB_NAME = "fi.";
private static final String ALL_PROBABILITIES = FPROB_NAME + "*";
private static final float DEFAULT_PROB = 0.00f; //Default probability is 0%
private static final float MAX_PROB = 1.00f; // Max probability is 100%
private static Configuration conf = FiConfig.getConfig();
static {
// Set a new default probability if specified through a system property.
// If neither is specified, set the default probability to DEFAULT_PROB
conf.set(ALL_PROBABILITIES,
System.getProperty(ALL_PROBABILITIES,
conf.get(ALL_PROBABILITIES, Float.toString(DEFAULT_PROB))));
LOG.info(ALL_PROBABILITIES + "=" + conf.get(ALL_PROBABILITIES));
}
/**
* Simplistic method to check if we have reached the point of injection.
* @param klassName is the name of the probability level to check.
* If a configuration has been set for "fi.myClass" then you can check if the
* inject criteria has been reached by calling this method with "myClass"
* string as its parameter
* @return true if the probability threshold has been reached; false otherwise
*/
public static boolean injectCriteria(String klassName) {
boolean trigger = false;
if (generator.nextFloat() < getProbability(klassName)) {
trigger = true;
}
return trigger;
}
/**
* This primitive checks for an arbitrarily set desired probability. If the
* level hasn't been set, the method will return the default setting.
* The probability is expected to be set as a float between 0.0 and 1.0
* @param klass is the name of the resource
* @return float representation of the configured probability level of
* the requested resource, or the default value if it hasn't been set
*/
protected static float getProbability(final String klass) {
String newProbName = FPROB_NAME + klass;
String newValue = System.getProperty(newProbName,
conf.get(ALL_PROBABILITIES));
if (newValue != null && !newValue.equals(conf.get(newProbName)))
conf.set(newProbName, newValue);
float ret = conf.getFloat(newProbName,
conf.getFloat(ALL_PROBABILITIES, DEFAULT_PROB));
LOG.debug("Request for " + newProbName + " returns=" + ret);
// Make sure that probability level is valid.
if (ret < DEFAULT_PROB || ret > MAX_PROB) {
LOG.info("Probability level is incorrect. Default value is set");
ret = conf.getFloat(ALL_PROBABILITIES, DEFAULT_PROB);
}
return ret;
}
}
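
A minimal sketch of how a fault site is guarded with this model; the fault-point name "MyFaultPoint" is hypothetical, while the aspects later in this change use names like "DBRecordReader":

if (ProbabilityModel.injectCriteria("MyFaultPoint")) {
  // The configured level for fi.MyFaultPoint (or the fi.* default) was hit:
  // simulate a dropped connection at this point
  throw new SQLException("Connection reset", "08S01");
}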


@ -0,0 +1,100 @@
package org.apache.sqoop.mapreduce;
import java.sql.SQLException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;
import org.apache.sqoop.fi.ProbabilityModel;
import org.apache.sqoop.lib.SqoopRecord;
import java.util.Random;
/**
* This aspect injects faults into the SQLServerExportDBExecThread
* which handles executing the write batch against the SQL Server database.
*/
public privileged aspect SqlServerExportAspect {
// Before adding to these, note that every SQLState must have a
// corresponding exceptionMsg entry and vice versa.
private static String[] exceptionMsg = { "Connection reset",
"deadlocked on thread",
"Connection initialization error",
"Connection link failure"
};
private static String[] sqlStates = { "08S01", // covers SQL error states
// 40143 40197, 40501, 40613,
// 10054, 10053,64
"40001", // SQL Error 1205,
// deadlock victim
"01000", // SQL Error 233,
// connection init failure
"08001", // 10060 connection link
// init failure
};
private static boolean allFaults = false;
// export pointcut and advice
pointcut ExportExecuteStatementPointcut(SQLServerExportDBExecThread thread,
PreparedStatement stmt,
List<SqoopRecord> records):
execution (protected void SQLServerAsyncDBExecThread.executeStatement(
PreparedStatement, List<SqoopRecord>))
&& target(thread) && args(stmt, records);
void around(SQLServerExportDBExecThread thread, PreparedStatement stmt,
List<SqoopRecord> records) throws SQLException:
ExportExecuteStatementPointcut(thread, stmt, records) {
Random random = new Random();
int exceptionToThrow = 0;
if (allFaults)
{
exceptionToThrow = random.nextInt(sqlStates.length);
}
thread.LOG.info("exception to be thrown is " + exceptionToThrow);
// start the method like normal, execute the batch
Connection conn = thread.getConnection();
try {
// throw a SQL exception before/during the execute
if (ProbabilityModel.injectCriteria("SQLServerExportDBExecThread")) {
thread.LOG.info("throwing " + exceptionMsg[exceptionToThrow]
+ "exception after execute and before commit");
conn.close();
throw new SQLException(exceptionMsg[exceptionToThrow],
sqlStates[exceptionToThrow]);
}
stmt.executeBatch();
} catch (SQLException execSqlEx) {
thread.LOG.warn("Error executing statement: " + execSqlEx);
//conn.rollback();
if (thread.failedCommit &&
thread.canIgnoreForFailedCommit(execSqlEx.getSQLState())){
thread.LOG.info("Ignoring error after failed commit");
} else {
throw execSqlEx;
}
}
// If the batch of records is executed successfully, then commit before
// processing the next batch of records
try {
if (ProbabilityModel.injectCriteria("SQLServerExportDBExecThread")) {
thread.LOG.info("throwing " + exceptionMsg[exceptionToThrow]
+ "exception during commit");
conn.close();
throw new SQLException(exceptionMsg[exceptionToThrow],
sqlStates[exceptionToThrow]);
}
conn.commit();
thread.failedCommit = false;
} catch (SQLException commitSqlEx) {
thread.LOG.warn("Error while committing transactions: " + commitSqlEx);
thread.failedCommit = true;
throw commitSqlEx;
}
}
}


@ -0,0 +1,97 @@
package org.apache.sqoop.mapreduce.db;
import java.sql.SQLException;
import org.apache.sqoop.fi.ProbabilityModel;
import java.sql.ResultSet;
import java.io.IOException;
import java.sql.Connection;
import java.lang.Math;
import java.util.Random;
/**
* This aspect forces SQL connection exceptions and long backoff times in the
* DBRecordReader and connection failure handler classes
*/
public privileged aspect SqlServerImportAspect {
// Before adding to these, note that every SQLState must have a
// corresponding exceptionMsg entry and vice versa.
private static String[] exceptionMsg = { "Connection reset",
"deadlocked on thread",
"Connection initialization error",
"Connection link failure"
};
private static String[] sqlStates = { "08S01", // covers SQL error states
// 40143 40197, 40501, 40613,
// 10054, 10053,64
"40001", // SQL Error 1205, deadlock victim
"01000", //SQL Error 233, connection
// init failure
"08001", //10060 connection link/
// init failure
};
private static final boolean allFaults = false;
// import pointcut, throw a SQL Exception as if the connection was reset
// during a database read
pointcut ImportQueryPointcut():
execution (protected ResultSet DBRecordReader.executeQuery(String))
&& target(DBRecordReader);
after() returning throws SQLException : ImportQueryPointcut() {
Random random = new Random();
int exceptionToThrow = 0;
if (allFaults)
{
exceptionToThrow = random.nextInt(sqlStates.length);
}
DBRecordReader.LOG.info("exception to be thrown is " + exceptionToThrow);
DBRecordReader.LOG.info("Hitting import execute query pointcut,"
+ " return a SQL Exception after reading rows");
if (ProbabilityModel.injectCriteria("DBRecordReader")) {
DBRecordReader.LOG.info("throwing " + exceptionMsg[exceptionToThrow]
+ "exception after reading");
throw new SQLException(exceptionMsg[exceptionToThrow],
sqlStates[exceptionToThrow]);
}
}
// connection reset pointcut. Make the backoff wait time the maximum time
pointcut ConnectionResetPointcut(SQLServerConnectionFailureHandler handler):
execution (public Connection BasicRetrySQLFailureHandler.recover())
&& target(handler);
before (SQLServerConnectionFailureHandler handler)
throws IOException : ConnectionResetPointcut(handler) {
handler.LOG.info("Hitting connection reset pointcut. "
+ "waiting max time of " + handler.DEFAULT_RETRY_WAIT_MAX
+ " and interval default is " + handler.DEFAULT_RETRY_WAIT_INTERVAL);
// Calculate the maximum number of retries by solving for retryNum in the
// retry logic, where timeToWait = retryNum^2 * DEFAULT_RETRY_WAIT_INTERVAL.
// Since we want the number of retries it takes to reach
// DEFAULT_RETRY_WAIT_MAX, we solve for retryNum.
long maxNumRetries = (long)Math.ceil(Math.sqrt
((double)handler.DEFAULT_RETRY_WAIT_MAX
/handler.DEFAULT_RETRY_WAIT_INTERVAL));
long maxTimeWait = 0;
for (double i = 0; i <= maxNumRetries; i++)
{
maxTimeWait += (long)(Math.pow(i, 2)
* (double)handler.DEFAULT_RETRY_WAIT_INTERVAL);
}
handler.LOG.info("Maximum retries possible is " + maxNumRetries
+ " and maximum time to wait is " + maxTimeWait);
if (ProbabilityModel.injectCriteria("SQLServerConnectionFailureHandler")) {
try {
handler.LOG.info("sleeping waiting for a connection for max time");
Thread.sleep(maxTimeWait);
} catch (InterruptedException ex) {
throw new IOException(ex);
}
}
}
}

src/test/fi-site.xml

@ -0,0 +1,31 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put fault injection specific property overrides in this file. -->
<configuration>
<property>
<name>fi.*</name>
<value>0.10</value>
<description>
Default probability level for all injected faults, specified
as a floating point number between 0.0 and 1.0
</description>
</property>
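<!-- Illustrative, commented-out override (not part of the original file):
     a per-fault-point level is configured as "fi." plus the name passed to
     ProbabilityModel.injectCriteria(), e.g. the DBRecordReader read fault.
<property>
  <name>fi.DBRecordReader</name>
  <value>0.50</value>
</property>
-->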
</configuration>