5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-04 01:32:20 +08:00

SQOOP-1278: Allow use of uncommitted isolation for databases that support it as an import option

(Venkat Ranganathan via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2014-02-04 13:01:19 -08:00
parent 03fa9c5302
commit d3758915bb
8 changed files with 95 additions and 7 deletions

View File

@ -37,4 +37,6 @@ Argument Description
+\--verbose+ Print more information while working +\--verbose+ Print more information while working
+\--connection-param-file <filename>+ Optional properties file that\ +\--connection-param-file <filename>+ Optional properties file that\
provides connection parameters provides connection parameters
+\--relaxed-isolation+ Set connection transaction isolation\
to read uncommitted for the mappers.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------

View File

@ -285,6 +285,18 @@ are expected to be present in the shell path of the task process. For MySQL
the utilities +mysqldump+ and +mysqlimport+ are required, whereas for the utilities +mysqldump+ and +mysqlimport+ are required, whereas for
PostgreSQL the utility +psql+ is required. PostgreSQL the utility +psql+ is required.
Controlling transaction isolation
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
By default, Sqoop uses the read committed transaction isolation in the mappers
to import data. This may not be the ideal in all ETL workflows and it may
desired to reduce the isolation guarantees. The +\--relaxed-isolation+ option
can be used to instruct Sqoop to use read uncommitted isolation level.
The +read-uncommitted+ isolation level is not supported on all databases
(for example, Oracle), so specifying the option +\--relaxed-isolation+
may not be supported on all databases.
Controlling type mapping Controlling type mapping
^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^

View File

@ -300,6 +300,9 @@ public String toString() {
// Accumulo zookeeper // Accumulo zookeeper
@StoredAsProperty("accumulo.zookeepers") private String accumuloZookeepers; @StoredAsProperty("accumulo.zookeepers") private String accumuloZookeepers;
// Relaxed Isolation
@StoredAsProperty("relaxed.isolation") private boolean relaxedIsolation;
// These next two fields are not serialized to the metastore. // These next two fields are not serialized to the metastore.
// If this SqoopOptions is created by reading a saved job, these will // If this SqoopOptions is created by reading a saved job, these will
// be populated by the JobStorage to facilitate updating the same // be populated by the JobStorage to facilitate updating the same
@ -962,6 +965,10 @@ private void initDefaults(Configuration baseConfiguration) {
this.validatorClass = RowCountValidator.class; this.validatorClass = RowCountValidator.class;
this.validationThresholdClass = AbsoluteValidationThreshold.class; this.validationThresholdClass = AbsoluteValidationThreshold.class;
this.validationFailureHandlerClass = AbortOnFailureHandler.class; this.validationFailureHandlerClass = AbortOnFailureHandler.class;
// Relaxed isolation will not enabled by default which is the behavior
// of sqoop until now.
this.relaxedIsolation = false;
} }
/** /**
@ -2449,4 +2456,13 @@ public void setSkipDistCache(boolean skip) {
public boolean isSkipDistCache() { public boolean isSkipDistCache() {
return this.skipDistCache; return this.skipDistCache;
} }
public void setRelaxedIsolation(boolean b) {
this.relaxedIsolation = true;
}
public boolean getRelaxedIsolation() {
return this.relaxedIsolation;
}
} }

View File

@ -247,7 +247,13 @@ protected void configureInputFormat(Job job, String tableName,
new DBConfiguration(job.getConfiguration()).setInputOrderBy( new DBConfiguration(job.getConfiguration()).setInputOrderBy(
splitByCol); splitByCol);
} }
if (options.getRelaxedIsolation()) {
LOG
.info("Enabling relaxed (read uncommitted) transaction "
+ "isolation for imports");
job.getConfiguration()
.setBoolean(DBConfiguration.PROP_RELAXED_ISOLATION, true);
}
LOG.debug("Using table class: " + tableClassName); LOG.debug("Using table class: " + tableClassName);
job.getConfiguration().set(ConfigurationHelper.getDbInputClassProperty(), job.getConfiguration().set(ConfigurationHelper.getDbInputClassProperty(),
tableClassName); tableClassName);

View File

@ -121,6 +121,13 @@ public class DBConfiguration {
public static final String OUTPUT_FIELD_COUNT_PROPERTY = public static final String OUTPUT_FIELD_COUNT_PROPERTY =
"mapreduce.jdbc.output.field.count"; "mapreduce.jdbc.output.field.count";
/**
* The name of the parameter to use for making Isolation level to be
* read uncommitted by default for connections.
*/
public static final String PROP_RELAXED_ISOLATION =
"org.apache.sqoop.db.relaxedisolation";
/** /**
* Sets the DB access related fields in the {@link Configuration}. * Sets the DB access related fields in the {@link Configuration}.
* @param conf the configuration * @param conf the configuration
@ -150,6 +157,7 @@ public static void configureDB(Configuration conf, String driverClass,
conf.set(CONNECTION_PARAMS_PROPERTY, conf.set(CONNECTION_PARAMS_PROPERTY,
propertiesToString(connectionParams)); propertiesToString(connectionParams));
} }
} }
// set the password in the secure credentials object // set the password in the secure credentials object
@ -295,6 +303,7 @@ public Connection getConnection()
connectString, username, password); connectString, username, password);
} }
} }
return connection; return connection;
} }

View File

@ -29,6 +29,8 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
@ -58,7 +60,8 @@
public class DBInputFormat<T extends DBWritable> public class DBInputFormat<T extends DBWritable>
extends InputFormat<LongWritable, T> implements Configurable { extends InputFormat<LongWritable, T> implements Configurable {
public static final Log LOG = LogFactory.getLog(
DBInputFormat.class.getName());
private String dbProductName = "DEFAULT"; private String dbProductName = "DEFAULT";
/** /**
@ -160,9 +163,6 @@ public void setConf(Configuration conf) {
try { try {
getConnection(); getConnection();
DatabaseMetaData dbMeta = connection.getMetaData();
this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase();
} catch (Exception ex) { } catch (Exception ex) {
throw new RuntimeException(ex); throw new RuntimeException(ex);
} }
@ -172,6 +172,31 @@ public void setConf(Configuration conf) {
conditions = dbConf.getInputConditions(); conditions = dbConf.getInputConditions();
} }
private void setTxIsolation(Connection conn) {
try {
if (getConf()
.getBoolean(DBConfiguration.PROP_RELAXED_ISOLATION, false)) {
if (dbProductName.startsWith("ORACLE")) {
LOG.info("Using read committed transaction isolation for Oracle"
+ " as read uncommitted is not supported");
this.connection.setTransactionIsolation(
Connection.TRANSACTION_READ_COMMITTED);
} else {
LOG.info("Using read uncommited transaction isolation");
this.connection.setTransactionIsolation(
Connection.TRANSACTION_READ_UNCOMMITTED);
}
}
else {
LOG.info("Using read commited transaction isolation");
this.connection.setTransactionIsolation(
Connection.TRANSACTION_READ_COMMITTED);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public Configuration getConf() { public Configuration getConf() {
return dbConf.getConf(); return dbConf.getConf();
} }
@ -182,12 +207,14 @@ public DBConfiguration getDBConf() {
public Connection getConnection() { public Connection getConnection() {
try { try {
if (null == this.connection) { if (null == this.connection) {
// The connection was closed; reinstantiate it. // The connection was closed; reinstantiate it.
this.connection = dbConf.getConnection(); this.connection = dbConf.getConnection();
this.connection.setAutoCommit(false); this.connection.setAutoCommit(false);
this.connection.setTransactionIsolation( DatabaseMetaData dbMeta = connection.getMetaData();
Connection.TRANSACTION_READ_COMMITTED); this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase();
setTxIsolation(connection);
} }
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@ -155,6 +155,7 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
public static final String UPDATE_MODE_ARG = "update-mode"; public static final String UPDATE_MODE_ARG = "update-mode";
public static final String CALL_ARG = "call"; public static final String CALL_ARG = "call";
public static final String SKIP_DISTCACHE_ARG = "skip-dist-cache"; public static final String SKIP_DISTCACHE_ARG = "skip-dist-cache";
public static final String RELAXED_ISOLATION = "relaxed-isolation";
// Arguments for validation. // Arguments for validation.
public static final String VALIDATE_ARG = "validate"; public static final String VALIDATE_ARG = "validate";
@ -444,6 +445,11 @@ protected RelatedOptions getCommonOptions() {
.withDescription("Print usage instructions") .withDescription("Print usage instructions")
.withLongOpt(HELP_ARG) .withLongOpt(HELP_ARG)
.create()); .create());
// relax isolation requirements
commonOpts.addOption(OptionBuilder
.withDescription("Use read-uncommitted isolation for imports")
.withLongOpt(RELAXED_ISOLATION)
.create());
return commonOpts; return commonOpts;
} }
@ -969,6 +975,9 @@ protected void applyCommonOptions(CommandLine in, SqoopOptions out)
} else if (in.hasOption(HADOOP_HOME_ARG)) { } else if (in.hasOption(HADOOP_HOME_ARG)) {
out.setHadoopMapRedHome(in.getOptionValue(HADOOP_HOME_ARG)); out.setHadoopMapRedHome(in.getOptionValue(HADOOP_HOME_ARG));
} }
if (in.hasOption(RELAXED_ISOLATION)) {
out.setRelaxedIsolation(true);
}
} }
private void applyCredentialsOptions(CommandLine in, SqoopOptions out) private void applyCredentialsOptions(CommandLine in, SqoopOptions out)

View File

@ -468,4 +468,11 @@ public void testHBaseBulkLoadMissingHbaseTable() throws Exception {
private static String longArgument(String argument) { private static String longArgument(String argument) {
return String.format("--%s", argument); return String.format("--%s", argument);
} }
public void testRelaxedIsolation() throws Exception {
String extraArgs[] = {
"--relaxed-isolation",
};
validateImportOptions(extraArgs);
}
} }