From d3758915bb8b265ec157cd4bc21a7e17d922458e Mon Sep 17 00:00:00 2001 From: Jarek Jarcec Cecho Date: Tue, 4 Feb 2014 13:01:19 -0800 Subject: [PATCH] SQOOP-1278: Allow use of uncommitted isolation for databases that support it as an import option (Venkat Ranganathan via Jarek Jarcec Cecho) --- src/docs/user/common-args.txt | 2 + src/docs/user/import.txt | 12 ++++++ src/java/org/apache/sqoop/SqoopOptions.java | 16 ++++++++ .../sqoop/mapreduce/DataDrivenImportJob.java | 8 +++- .../sqoop/mapreduce/db/DBConfiguration.java | 9 +++++ .../sqoop/mapreduce/db/DBInputFormat.java | 39 ++++++++++++++++--- .../org/apache/sqoop/tool/BaseSqoopTool.java | 9 +++++ .../com/cloudera/sqoop/TestSqoopOptions.java | 7 ++++ 8 files changed, 95 insertions(+), 7 deletions(-) diff --git a/src/docs/user/common-args.txt b/src/docs/user/common-args.txt index 8a017f42..98f19be0 100644 --- a/src/docs/user/common-args.txt +++ b/src/docs/user/common-args.txt @@ -37,4 +37,6 @@ Argument Description +\--verbose+ Print more information while working +\--connection-param-file + Optional properties file that\ provides connection parameters ++\--relaxed-isolation+ Set connection transaction isolation\ + to read uncommitted for the mappers. ------------------------------------------------------------------------------- diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt index 0db6d977..7a3fa435 100644 --- a/src/docs/user/import.txt +++ b/src/docs/user/import.txt @@ -285,6 +285,18 @@ are expected to be present in the shell path of the task process. For MySQL the utilities +mysqldump+ and +mysqlimport+ are required, whereas for PostgreSQL the utility +psql+ is required. +Controlling transaction isolation +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +By default, Sqoop uses the read committed transaction isolation in the mappers +to import data. This may not be ideal in all ETL workflows and it may be +desired to reduce the isolation guarantees. 
The +\--relaxed-isolation+ option +can be used to instruct Sqoop to use the read uncommitted isolation level. + +The +read-uncommitted+ isolation level is not supported on all databases +(for example, Oracle), so specifying the option +\--relaxed-isolation+ +may not be supported on all databases. + Controlling type mapping ^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index 46e158c7..f1b8b135 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -300,6 +300,9 @@ public String toString() { // Accumulo zookeeper @StoredAsProperty("accumulo.zookeepers") private String accumuloZookeepers; + // Relaxed Isolation + @StoredAsProperty("relaxed.isolation") private boolean relaxedIsolation; + // These next two fields are not serialized to the metastore. // If this SqoopOptions is created by reading a saved job, these will // be populated by the JobStorage to facilitate updating the same @@ -962,6 +965,10 @@ private void initDefaults(Configuration baseConfiguration) { this.validatorClass = RowCountValidator.class; this.validationThresholdClass = AbsoluteValidationThreshold.class; this.validationFailureHandlerClass = AbortOnFailureHandler.class; + + // Relaxed isolation will not be enabled by default which is the behavior + // of sqoop until now. 
+ this.relaxedIsolation = false; } /** @@ -2449,4 +2456,13 @@ public void setSkipDistCache(boolean skip) { public boolean isSkipDistCache() { return this.skipDistCache; } + + public void setRelaxedIsolation(boolean b) { + this.relaxedIsolation = b; + + } + + public boolean getRelaxedIsolation() { + return this.relaxedIsolation; + } } diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java index b21560e7..6dcfebb9 100644 --- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java @@ -247,7 +247,13 @@ protected void configureInputFormat(Job job, String tableName, new DBConfiguration(job.getConfiguration()).setInputOrderBy( splitByCol); } - + if (options.getRelaxedIsolation()) { + LOG + .info("Enabling relaxed (read uncommitted) transaction " + + "isolation for imports"); + job.getConfiguration() + .setBoolean(DBConfiguration.PROP_RELAXED_ISOLATION, true); + } LOG.debug("Using table class: " + tableClassName); job.getConfiguration().set(ConfigurationHelper.getDbInputClassProperty(), tableClassName); diff --git a/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java b/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java index be942cee..a9b7e426 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java +++ b/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java @@ -121,6 +121,13 @@ public class DBConfiguration { public static final String OUTPUT_FIELD_COUNT_PROPERTY = "mapreduce.jdbc.output.field.count"; + /** + * The name of the parameter to use for making Isolation level to be + * read uncommitted by default for connections. + */ + public static final String PROP_RELAXED_ISOLATION = + "org.apache.sqoop.db.relaxedisolation"; + /** * Sets the DB access related fields in the {@link Configuration}. 
* @param conf the configuration @@ -150,6 +157,7 @@ public static void configureDB(Configuration conf, String driverClass, conf.set(CONNECTION_PARAMS_PROPERTY, propertiesToString(connectionParams)); } + } // set the password in the secure credentials object @@ -295,6 +303,7 @@ public Connection getConnection() connectString, username, password); } } + return connection; } diff --git a/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java b/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java index 73ed94e4..3a8e5d08 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java +++ b/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java @@ -29,6 +29,8 @@ import java.util.ArrayList; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; @@ -58,7 +60,8 @@ public class DBInputFormat extends InputFormat implements Configurable { - + public static final Log LOG = LogFactory.getLog( + DBInputFormat.class.getName()); private String dbProductName = "DEFAULT"; /** @@ -160,9 +163,6 @@ public void setConf(Configuration conf) { try { getConnection(); - - DatabaseMetaData dbMeta = connection.getMetaData(); - this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase(); } catch (Exception ex) { throw new RuntimeException(ex); } @@ -172,6 +172,31 @@ public void setConf(Configuration conf) { conditions = dbConf.getInputConditions(); } + private void setTxIsolation(Connection conn) { + try { + + if (getConf() + .getBoolean(DBConfiguration.PROP_RELAXED_ISOLATION, false)) { + if (dbProductName.startsWith("ORACLE")) { + LOG.info("Using read committed transaction isolation for Oracle" + + " as read uncommitted is not supported"); + this.connection.setTransactionIsolation( + Connection.TRANSACTION_READ_COMMITTED); + } else { + LOG.info("Using read uncommited transaction 
isolation"); + this.connection.setTransactionIsolation( + Connection.TRANSACTION_READ_UNCOMMITTED); + } + } + else { + LOG.info("Using read commited transaction isolation"); + this.connection.setTransactionIsolation( + Connection.TRANSACTION_READ_COMMITTED); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } public Configuration getConf() { return dbConf.getConf(); } @@ -182,12 +207,14 @@ public DBConfiguration getDBConf() { public Connection getConnection() { try { + if (null == this.connection) { // The connection was closed; reinstantiate it. this.connection = dbConf.getConnection(); this.connection.setAutoCommit(false); - this.connection.setTransactionIsolation( - Connection.TRANSACTION_READ_COMMITTED); + DatabaseMetaData dbMeta = connection.getMetaData(); + this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase(); + setTxIsolation(connection); } } catch (Exception e) { throw new RuntimeException(e); diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java index 6d6f1ea5..ceda9f3d 100644 --- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java +++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java @@ -155,6 +155,7 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool { public static final String UPDATE_MODE_ARG = "update-mode"; public static final String CALL_ARG = "call"; public static final String SKIP_DISTCACHE_ARG = "skip-dist-cache"; + public static final String RELAXED_ISOLATION = "relaxed-isolation"; // Arguments for validation. 
public static final String VALIDATE_ARG = "validate"; @@ -444,6 +445,11 @@ protected RelatedOptions getCommonOptions() { .withDescription("Print usage instructions") .withLongOpt(HELP_ARG) .create()); + // relax isolation requirements + commonOpts.addOption(OptionBuilder + .withDescription("Use read-uncommitted isolation for imports") + .withLongOpt(RELAXED_ISOLATION) + .create()); return commonOpts; } @@ -969,6 +975,9 @@ protected void applyCommonOptions(CommandLine in, SqoopOptions out) } else if (in.hasOption(HADOOP_HOME_ARG)) { out.setHadoopMapRedHome(in.getOptionValue(HADOOP_HOME_ARG)); } + if (in.hasOption(RELAXED_ISOLATION)) { + out.setRelaxedIsolation(true); + } } private void applyCredentialsOptions(CommandLine in, SqoopOptions out) diff --git a/src/test/com/cloudera/sqoop/TestSqoopOptions.java b/src/test/com/cloudera/sqoop/TestSqoopOptions.java index 686d3988..60460d99 100644 --- a/src/test/com/cloudera/sqoop/TestSqoopOptions.java +++ b/src/test/com/cloudera/sqoop/TestSqoopOptions.java @@ -468,4 +468,11 @@ public void testHBaseBulkLoadMissingHbaseTable() throws Exception { private static String longArgument(String argument) { return String.format("--%s", argument); } + + public void testRelaxedIsolation() throws Exception { + String extraArgs[] = { + "--relaxed-isolation", + }; + validateImportOptions(extraArgs); + } }