diff --git a/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java b/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java
index 93936c74..c57e778c 100644
--- a/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java
@@ -19,12 +19,10 @@
 package com.cloudera.sqoop.mapreduce;
 
 import java.io.IOException;
-import java.sql.SQLException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -32,7 +30,6 @@
 import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
 
-import com.cloudera.sqoop.ConnFactory;
 import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ExportJobContext;
 
@@ -68,8 +65,7 @@ protected Class<? extends Mapper> getMapperClass() {
   protected void configureOutputFormat(Job job, String tableName,
       String tableClassName) throws IOException {
 
-    Configuration conf = options.getConf();
-    ConnManager mgr = new ConnFactory(conf).getManager(options);
+    ConnManager mgr = context.getConnManager();
     try {
       String username = options.getUsername();
       if (null == username || username.length() == 0) {
@@ -93,12 +89,6 @@ protected void configureOutputFormat(Job job, String tableName,
       job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
     } catch (ClassNotFoundException cnfe) {
       throw new IOException("Could not load OutputFormat", cnfe);
-    } finally {
-      try {
-        mgr.close();
-      } catch (SQLException sqlE) {
-        LOG.warn("Error closing connection: " + sqlE);
-      }
     }
   }
 
diff --git a/src/java/com/cloudera/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/com/cloudera/sqoop/mapreduce/JdbcUpdateExportJob.java
index 152fd772..0cd9c1fd 100644
--- a/src/java/com/cloudera/sqoop/mapreduce/JdbcUpdateExportJob.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/JdbcUpdateExportJob.java
@@ -19,12 +19,10 @@
 package com.cloudera.sqoop.mapreduce;
 
 import java.io.IOException;
-import java.sql.SQLException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -32,7 +30,6 @@
 import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
 
-import com.cloudera.sqoop.ConnFactory;
 import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ExportJobContext;
 import com.cloudera.sqoop.shims.ShimLoader;
@@ -85,8 +82,7 @@ protected Class<? extends Mapper> getMapperClass() {
   protected void configureOutputFormat(Job job, String tableName,
       String tableClassName) throws IOException {
 
-    Configuration conf = options.getConf();
-    ConnManager mgr = new ConnFactory(conf).getManager(options);
+    ConnManager mgr = context.getConnManager();
     try {
       String username = options.getUsername();
       if (null == username || username.length() == 0) {
@@ -131,12 +127,6 @@ protected void configureOutputFormat(Job job, String tableName,
       job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyCol);
     } catch (ClassNotFoundException cnfe) {
       throw new IOException("Could not load OutputFormat", cnfe);
-    } finally {
-      try {
-        mgr.close();
-      } catch (SQLException sqlE) {
-        LOG.warn("Error closing connection: " + sqlE);
-      }
     }
   }
 }
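Note: the same edit repeats in both export jobs. configureOutputFormat() now borrows the ConnManager already held by the ExportJobContext instead of building a fresh one through ConnFactory, and the finally/close block disappears because the borrowed manager is owned, and eventually closed, by whoever created the context. The sketch below is a minimal, self-contained illustration of that borrow-don't-close contract; the Manager, Context, and SharedManagerSketch names and the main() driver are invented for illustration and are not Sqoop APIs.

// Hypothetical stand-ins, not Sqoop source: they only illustrate the
// ownership pattern the patch above relies on.
import java.io.Closeable;

interface Manager extends Closeable {
  String driverClass();
  void close();                    // redeclares Closeable.close() without the checked exception
}

final class Context {
  private final Manager manager;   // created once by whoever builds the context
  Context(Manager manager) { this.manager = manager; }
  Manager getConnManager() { return manager; }   // analogue of ExportJobContext.getConnManager()
}

public final class SharedManagerSketch {
  // Analogue of configureOutputFormat() after the patch: borrow the shared
  // manager, use it, and never close it here.
  static void configureJob(Context context) {
    Manager mgr = context.getConnManager();
    System.out.println("configuring job with driver " + mgr.driverClass());
  }

  public static void main(String[] args) {
    Manager mgr = new Manager() {
      public String driverClass() { return "com.mysql.jdbc.Driver"; }
      public void close() { System.out.println("closed once, by the owner"); }
    };
    Context context = new Context(mgr);
    configureJob(context);   // first borrower
    configureJob(context);   // second borrower; still the same connection
    mgr.close();             // the owner closes exactly once, after all jobs are configured
  }
}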
diff --git a/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpImportJob.java b/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpImportJob.java
index a5bcba14..66aba86a 100644
--- a/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpImportJob.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpImportJob.java
@@ -19,7 +19,6 @@
 package com.cloudera.sqoop.mapreduce;
 
 import java.io.IOException;
-import java.sql.SQLException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,7 +32,6 @@
 import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 
-import com.cloudera.sqoop.ConnFactory;
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.MySQLUtils;
@@ -71,69 +69,60 @@ protected void configureInputFormat(Job job, String tableName,
       throw new IOException("null tableName for MySQLDumpImportJob.");
     }
 
-    ConnManager mgr = new ConnFactory(options.getConf()).getManager(options);
+    ConnManager mgr = getContext().getConnManager();
+    String username = options.getUsername();
+    if (null == username || username.length() == 0) {
+      DBConfiguration.configureDB(job.getConfiguration(),
+          mgr.getDriverClass(), options.getConnectString());
+    } else {
+      DBConfiguration.configureDB(job.getConfiguration(),
+          mgr.getDriverClass(), options.getConnectString(), username,
+          options.getPassword());
+    }
-    try {
-      String username = options.getUsername();
-      if (null == username || username.length() == 0) {
-        DBConfiguration.configureDB(job.getConfiguration(),
-            mgr.getDriverClass(), options.getConnectString());
-      } else {
-        DBConfiguration.configureDB(job.getConfiguration(),
-            mgr.getDriverClass(), options.getConnectString(), username,
-            options.getPassword());
-      }
+    String [] colNames = options.getColumns();
+    if (null == colNames) {
+      colNames = mgr.getColumnNames(tableName);
+    }
-      String [] colNames = options.getColumns();
-      if (null == colNames) {
-        colNames = mgr.getColumnNames(tableName);
-      }
-
-      String [] sqlColNames = null;
-      if (null != colNames) {
-        sqlColNames = new String[colNames.length];
-        for (int i = 0; i < colNames.length; i++) {
-          sqlColNames[i] = mgr.escapeColName(colNames[i]);
-        }
-      }
-
-      // It's ok if the where clause is null in DBInputFormat.setInput.
-      String whereClause = options.getWhereClause();
-
-      // We can't set the class properly in here, because we may not have the
-      // jar loaded in this JVM. So we start by calling setInput() with
-      // DBWritable and then overriding the string manually.
-
-      // Note that mysqldump also does *not* want a quoted table name.
-      DataDrivenDBInputFormat.setInput(job, DBWritable.class,
-          tableName, whereClause,
-          mgr.escapeColName(splitByCol), sqlColNames);
-
-      Configuration conf = job.getConfiguration();
-      conf.setInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
-          options.getOutputFieldDelim());
-      conf.setInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
-          options.getOutputRecordDelim());
-      conf.setInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY,
-          options.getOutputEnclosedBy());
-      conf.setInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY,
-          options.getOutputEscapedBy());
-      conf.setBoolean(MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY,
-          options.isOutputEncloseRequired());
-      String [] extraArgs = options.getExtraArgs();
-      if (null != extraArgs) {
-        conf.setStrings(MySQLUtils.EXTRA_ARGS_KEY, extraArgs);
-      }
-
-      LOG.debug("Using InputFormat: " + inputFormatClass);
-      job.setInputFormatClass(getInputFormatClass());
-    } finally {
-      try {
-        mgr.close();
-      } catch (SQLException sqlE) {
-        LOG.warn("Error closing connection: " + sqlE);
+    String [] sqlColNames = null;
+    if (null != colNames) {
+      sqlColNames = new String[colNames.length];
+      for (int i = 0; i < colNames.length; i++) {
+        sqlColNames[i] = mgr.escapeColName(colNames[i]);
       }
     }
+
+    // It's ok if the where clause is null in DBInputFormat.setInput.
+    String whereClause = options.getWhereClause();
+
+    // We can't set the class properly in here, because we may not have the
+    // jar loaded in this JVM. So we start by calling setInput() with
+    // DBWritable and then overriding the string manually.
+
+    // Note that mysqldump also does *not* want a quoted table name.
+    DataDrivenDBInputFormat.setInput(job, DBWritable.class,
+        tableName, whereClause,
+        mgr.escapeColName(splitByCol), sqlColNames);
+
+    Configuration conf = job.getConfiguration();
+    conf.setInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
+        options.getOutputFieldDelim());
+    conf.setInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
+        options.getOutputRecordDelim());
+    conf.setInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY,
+        options.getOutputEnclosedBy());
+    conf.setInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY,
+        options.getOutputEscapedBy());
+    conf.setBoolean(MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY,
+        options.isOutputEncloseRequired());
+    String [] extraArgs = options.getExtraArgs();
+    if (null != extraArgs) {
+      conf.setStrings(MySQLUtils.EXTRA_ARGS_KEY, extraArgs);
+    }
+
+    LOG.debug("Using InputFormat: " + inputFormatClass);
+    job.setInputFormatClass(getInputFormatClass());
   }
 
   /**
diff --git a/src/java/com/cloudera/sqoop/mapreduce/MySQLExportJob.java b/src/java/com/cloudera/sqoop/mapreduce/MySQLExportJob.java
index 17807e05..2780a648 100644
--- a/src/java/com/cloudera/sqoop/mapreduce/MySQLExportJob.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/MySQLExportJob.java
@@ -19,7 +19,6 @@
 package com.cloudera.sqoop.mapreduce;
 
 import java.io.IOException;
-import java.sql.SQLException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,7 +31,6 @@
 import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 
-import com.cloudera.sqoop.ConnFactory;
 import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ExportJobContext;
 import com.cloudera.sqoop.manager.MySQLUtils;
@@ -74,43 +72,34 @@ protected void configureInputFormat(Job job, String tableName,
       conf.setStrings(MySQLUtils.EXTRA_ARGS_KEY, extraArgs);
     }
 
-    ConnManager mgr = null;
-    try {
-      mgr = new ConnFactory(conf).getManager(options);
-      String username = options.getUsername();
-      if (null == username || username.length() == 0) {
-        DBConfiguration.configureDB(job.getConfiguration(),
-            mgr.getDriverClass(), options.getConnectString());
-      } else {
-        DBConfiguration.configureDB(job.getConfiguration(),
-            mgr.getDriverClass(), options.getConnectString(), username,
-            options.getPassword());
-      }
+    ConnManager mgr = context.getConnManager();
+    String username = options.getUsername();
+    if (null == username || username.length() == 0) {
+      DBConfiguration.configureDB(job.getConfiguration(),
+          mgr.getDriverClass(), options.getConnectString());
+    } else {
+      DBConfiguration.configureDB(job.getConfiguration(),
+          mgr.getDriverClass(), options.getConnectString(), username,
+          options.getPassword());
+    }
-      String [] colNames = options.getColumns();
-      if (null == colNames) {
-        colNames = mgr.getColumnNames(tableName);
-      }
+    String [] colNames = options.getColumns();
+    if (null == colNames) {
+      colNames = mgr.getColumnNames(tableName);
+    }
-      String [] sqlColNames = null;
-      if (null != colNames) {
-        sqlColNames = new String[colNames.length];
-        for (int i = 0; i < colNames.length; i++) {
-          sqlColNames[i] = mgr.escapeColName(colNames[i]);
-        }
-      }
-
-      // Note that mysqldump also does *not* want a quoted table name.
-      DataDrivenDBInputFormat.setInput(job, DBWritable.class,
-          tableName, null, null, sqlColNames);
-    } finally {
-      try {
-        mgr.close();
-      } catch (SQLException sqlE) {
-        LOG.warn("Error closing connection manager: " + sqlE);
+    String [] sqlColNames = null;
+    if (null != colNames) {
+      sqlColNames = new String[colNames.length];
+      for (int i = 0; i < colNames.length; i++) {
+        sqlColNames[i] = mgr.escapeColName(colNames[i]);
       }
     }
+    // Note that mysqldump also does *not* want a quoted table name.
+    DataDrivenDBInputFormat.setInput(job, DBWritable.class,
+        tableName, null, null, sqlColNames);
+
     // Configure the actual InputFormat to use.
     super.configureInputFormat(job, tableName, tableClassName, splitByCol);
   }
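Taken together, the four files now follow a single rule: the code that constructs the job context owns the connection manager's lifetime, and the per-method close() attempts (which in the old MySQLExportJob code could even be reached with mgr still null if ConnFactory threw) are gone. Below is a hedged caller-side sketch of that rule; OwnerLifecycleSketch, Manager, and Context are invented names standing in for the real Sqoop tool and manager classes, not the actual call sites.

// Caller-side sketch (hypothetical driver, not from this patch): the owner of
// the context opens the manager once, runs every configuration step against
// it, and releases it exactly once when the whole job setup is finished.
import java.io.Closeable;

public final class OwnerLifecycleSketch {

  interface Manager extends Closeable {
    void close();                              // unchecked close keeps the sketch short
  }

  static final class Context {
    private final Manager manager;
    Context(Manager manager) { this.manager = manager; }
    Manager getConnManager() { return manager; }
  }

  // Analogues of the configure methods after the patch: they borrow only.
  static void configureInputFormat(Context ctx)  { ctx.getConnManager(); }
  static void configureOutputFormat(Context ctx) { ctx.getConnManager(); }

  public static void main(String[] args) {
    // try-with-resources gives the single, guaranteed close() that the removed
    // per-method finally blocks each tried to perform on their own copies.
    try (Manager mgr = () -> System.out.println("connection released")) {
      Context ctx = new Context(mgr);
      configureInputFormat(ctx);
      configureOutputFormat(ctx);
      System.out.println("job configured against one shared connection");
    }
  }
}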