diff --git a/src/java/org/apache/hadoop/sqoop/Sqoop.java b/src/java/org/apache/hadoop/sqoop/Sqoop.java index 074215f0..95ba8d14 100644 --- a/src/java/org/apache/hadoop/sqoop/Sqoop.java +++ b/src/java/org/apache/hadoop/sqoop/Sqoop.java @@ -19,6 +19,7 @@ package org.apache.hadoop.sqoop; import java.io.IOException; +import java.sql.SQLException; import java.util.ArrayList; import java.util.List; @@ -182,89 +183,97 @@ public int run(String [] args) { } } - if (options.doHiveImport()) { - hiveImport = new HiveImport(options, manager, getConf()); - } + try { + if (options.doHiveImport()) { + hiveImport = new HiveImport(options, manager, getConf()); + } - SqoopOptions.ControlAction action = options.getAction(); - if (action == SqoopOptions.ControlAction.ListTables) { - String [] tables = manager.listTables(); - if (null == tables) { - System.err.println("Could not retrieve tables list from server"); - LOG.error("manager.listTables() returned null"); - return 1; - } else { - for (String tbl : tables) { - System.out.println(tbl); - } - } - } else if (action == SqoopOptions.ControlAction.ListDatabases) { - String [] databases = manager.listDatabases(); - if (null == databases) { - System.err.println("Could not retrieve database list from server"); - LOG.error("manager.listDatabases() returned null"); - return 1; - } else { - for (String db : databases) { - System.out.println(db); - } - } - } else if (action == SqoopOptions.ControlAction.DebugExec) { - // just run a SQL statement for debugging purposes. - manager.execAndPrint(options.getDebugSqlCmd()); - return 0; - } else if (action == SqoopOptions.ControlAction.Export) { - // Export a table. - try { - exportTable(options.getTableName()); - } catch (IOException ioe) { - LOG.error("Encountered IOException running export job: " + ioe.toString()); - if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) { - throw new RuntimeException(ioe); - } else { + SqoopOptions.ControlAction action = options.getAction(); + if (action == SqoopOptions.ControlAction.ListTables) { + String [] tables = manager.listTables(); + if (null == tables) { + System.err.println("Could not retrieve tables list from server"); + LOG.error("manager.listTables() returned null"); return 1; - } - } catch (ExportException ee) { - LOG.error("Error during export: " + ee.toString()); - if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) { - throw new RuntimeException(ee); } else { - return 1; - } - } - } else { - // This is either FullImport or GenerateOnly. - - try { - if (options.isAllTables()) { - String [] tables = manager.listTables(); - if (null == tables) { - System.err.println("Could not retrieve tables list from server"); - LOG.error("manager.listTables() returned null"); - return 1; - } else { - for (String tableName : tables) { - importTable(tableName); - } + for (String tbl : tables) { + System.out.println(tbl); } - } else { - // just import a single table the user specified. - importTable(options.getTableName()); } - } catch (IOException ioe) { - LOG.error("Encountered IOException running import job: " + ioe.toString()); - if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) { - throw new RuntimeException(ioe); - } else { + } else if (action == SqoopOptions.ControlAction.ListDatabases) { + String [] databases = manager.listDatabases(); + if (null == databases) { + System.err.println("Could not retrieve database list from server"); + LOG.error("manager.listDatabases() returned null"); return 1; - } - } catch (ImportException ie) { - LOG.error("Error during import: " + ie.toString()); - if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) { - throw new RuntimeException(ie); } else { - return 1; + for (String db : databases) { + System.out.println(db); + } } + } else if (action == SqoopOptions.ControlAction.DebugExec) { + // just run a SQL statement for debugging purposes. + manager.execAndPrint(options.getDebugSqlCmd()); + return 0; + } else if (action == SqoopOptions.ControlAction.Export) { + // Export a table. + try { + exportTable(options.getTableName()); + } catch (IOException ioe) { + LOG.error("Encountered IOException running export job: " + ioe.toString()); + if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) { + throw new RuntimeException(ioe); + } else { + return 1; + } + } catch (ExportException ee) { + LOG.error("Error during export: " + ee.toString()); + if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) { + throw new RuntimeException(ee); + } else { + return 1; + } + } + } else { + // This is either FullImport or GenerateOnly. + + try { + if (options.isAllTables()) { + String [] tables = manager.listTables(); + if (null == tables) { + System.err.println("Could not retrieve tables list from server"); + LOG.error("manager.listTables() returned null"); + return 1; + } else { + for (String tableName : tables) { + importTable(tableName); + } + } + } else { + // just import a single table the user specified. + importTable(options.getTableName()); + } + } catch (IOException ioe) { + LOG.error("Encountered IOException running import job: " + ioe.toString()); + if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) { + throw new RuntimeException(ioe); + } else { + return 1; + } + } catch (ImportException ie) { + LOG.error("Error during import: " + ie.toString()); + if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) { + throw new RuntimeException(ie); + } else { + return 1; + } + } + } + } finally { + try { + manager.close(); + } catch (SQLException sqlE) { + LOG.warn("Error while closing connection: " + sqlE); } } diff --git a/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java b/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java index 252edf49..12860b74 100644 --- a/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java +++ b/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java @@ -25,6 +25,8 @@ import java.sql.SQLException; import java.sql.Types; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.TimeZone; import java.lang.reflect.Method; @@ -47,10 +49,142 @@ public class OracleManager extends GenericJdbcManager { // driver class to ensure is loaded when making db connection. private static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver"; + + // Oracle XE does a poor job of releasing server-side resources for + // closed connections. So we actually want to cache connections as + // much as possible. This is especially important for JUnit tests which + // may need to make 60 or more connections (serially), since each test + // uses a different OracleManager instance. + private static class ConnCache { + + public static final Log LOG = LogFactory.getLog(ConnCache.class.getName()); + + private static class CacheKey { + public final String connectString; + public final String username; + + public CacheKey(String connect, String user) { + this.connectString = connect; + this.username = user; // note: may be null. + } + + @Override + public boolean equals(Object o) { + if (o instanceof CacheKey) { + CacheKey k = (CacheKey) o; + if (null == username) { + return k.username == null && k.connectString.equals(connectString); + } else { + return k.username.equals(username) + && k.connectString.equals(connectString); + } + } else { + return false; + } + } + + @Override + public int hashCode() { + if (null == username) { + return connectString.hashCode(); + } else { + return username.hashCode() ^ connectString.hashCode(); + } + } + + @Override + public String toString() { + return connectString + "/" + username; + } + } + + private Map connectionMap; + + public ConnCache() { + LOG.debug("Instantiated new connection cache."); + connectionMap = new HashMap(); + } + + /** + * @return a Connection instance that can be used to connect to the + * given database, if a previously-opened connection is available in + * the cache. Returns null if none is available in the map. + */ + public synchronized Connection getConnection(String connectStr, + String username) throws SQLException { + CacheKey key = new CacheKey(connectStr, username); + Connection cached = connectionMap.get(key); + if (null != cached) { + connectionMap.remove(key); + if (cached.isReadOnly()) { + // Read-only mode? Don't want it. + cached.close(); + } + + if (cached.isClosed()) { + // This connection isn't usable. + return null; + } + + cached.rollback(); // Reset any transaction state. + cached.clearWarnings(); + + LOG.debug("Got cached connection for " + key); + } + + return cached; + } + + /** + * Returns a connection to the cache pool for future use. If a connection + * is already cached for the connectstring/username pair, then this + * connection is closed and discarded. + */ + public synchronized void recycle(String connectStr, String username, + Connection conn) throws SQLException { + + CacheKey key = new CacheKey(connectStr, username); + Connection existing = connectionMap.get(key); + if (null != existing) { + // Cache is already full for this entry. + LOG.debug("Discarding additional connection for " + key); + conn.close(); + return; + } + + // Put it in the map for later use. + LOG.debug("Caching released connection for " + key); + connectionMap.put(key, conn); + } + + @Override + protected synchronized void finalize() throws Throwable { + for (Connection c : connectionMap.values()) { + c.close(); + } + + super.finalize(); + } + } + + private static final ConnCache CACHE; + static { + CACHE = new ConnCache(); + } + public OracleManager(final SqoopOptions opts) { super(DRIVER_CLASS, opts); } + public void close() throws SQLException { + release(); // Release any open statements associated with the connection. + if (hasOpenConnection()) { + // Release our open connection back to the cache. + CACHE.recycle(options.getConnectString(), options.getUsername(), + getConnection()); + } + } + protected String getColNamesQuery(String tableName) { // SqlManager uses "tableName AS t" which doesn't work in Oracle. return "SELECT t.* FROM " + escapeTableName(tableName) + " t"; @@ -77,10 +211,19 @@ protected Connection makeConnection() throws SQLException { String username = options.getUsername(); String password = options.getPassword(); - if (null == username) { - connection = DriverManager.getConnection(options.getConnectString()); - } else { - connection = DriverManager.getConnection(options.getConnectString(), username, password); + String connectStr = options.getConnectString(); + + connection = CACHE.getConnection(connectStr, username); + if (null == connection) { + // Couldn't pull one from the cache. Get a new one. + LOG.debug("Creating a new connection for " + + connectStr + "/" + username); + if (null == username) { + connection = DriverManager.getConnection(connectStr); + } else { + connection = DriverManager.getConnection(connectStr, username, + password); + } } // We only use this for metadata queries. Loosest semantics are okay. @@ -273,5 +416,11 @@ private Class getTypeClass(String className) { } return typeClass; } + + @Override + protected void finalize() throws Throwable { + close(); + super.finalize(); + } } diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java index 83b5f5d9..4f0fc9e5 100644 --- a/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java +++ b/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java @@ -19,6 +19,7 @@ package org.apache.hadoop.sqoop.mapreduce; import java.io.IOException; +import java.sql.SQLException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -107,45 +108,53 @@ protected Class getOutputFormatClass() { protected void configureInputFormat(Job job, String tableName, String tableClassName, String splitByCol) throws IOException { ConnManager mgr = new ConnFactory(options.getConf()).getManager(options); - String username = options.getUsername(); - if (null == username || username.length() == 0) { - DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), - options.getConnectString()); - } else { - DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), - options.getConnectString(), username, options.getPassword()); - } + try { + String username = options.getUsername(); + if (null == username || username.length() == 0) { + DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), + options.getConnectString()); + } else { + DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), + options.getConnectString(), username, options.getPassword()); + } - String [] colNames = options.getColumns(); - if (null == colNames) { - colNames = mgr.getColumnNames(tableName); - } + String [] colNames = options.getColumns(); + if (null == colNames) { + colNames = mgr.getColumnNames(tableName); + } - String [] sqlColNames = null; - if (null != colNames) { - sqlColNames = new String[colNames.length]; - for (int i = 0; i < colNames.length; i++) { - sqlColNames[i] = mgr.escapeColName(colNames[i]); + String [] sqlColNames = null; + if (null != colNames) { + sqlColNames = new String[colNames.length]; + for (int i = 0; i < colNames.length; i++) { + sqlColNames[i] = mgr.escapeColName(colNames[i]); + } + } + + // It's ok if the where clause is null in DBInputFormat.setInput. + String whereClause = options.getWhereClause(); + + // We can't set the class properly in here, because we may not have the + // jar loaded in this JVM. So we start by calling setInput() with DBWritable + // and then overriding the string manually. + DataDrivenDBInputFormat.setInput(job, DBWritable.class, + mgr.escapeTableName(tableName), whereClause, + mgr.escapeColName(splitByCol), sqlColNames); + job.getConfiguration().set(DBConfiguration.INPUT_CLASS_PROPERTY, + tableClassName); + + job.getConfiguration().setLong(LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY, + options.getInlineLobLimit()); + + LOG.debug("Using InputFormat: " + inputFormatClass); + job.setInputFormatClass(inputFormatClass); + } finally { + try { + mgr.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing connection: " + sqlE); } } - - // It's ok if the where clause is null in DBInputFormat.setInput. - String whereClause = options.getWhereClause(); - - // We can't set the class properly in here, because we may not have the - // jar loaded in this JVM. So we start by calling setInput() with DBWritable - // and then overriding the string manually. - DataDrivenDBInputFormat.setInput(job, DBWritable.class, - mgr.escapeTableName(tableName), whereClause, - mgr.escapeColName(splitByCol), sqlColNames); - job.getConfiguration().set(DBConfiguration.INPUT_CLASS_PROPERTY, - tableClassName); - - job.getConfiguration().setLong(LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY, - options.getInlineLobLimit()); - - LOG.debug("Using InputFormat: " + inputFormatClass); - job.setInputFormatClass(inputFormatClass); } } diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJob.java b/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJob.java index e8cacf25..0edbc861 100644 --- a/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJob.java +++ b/src/java/org/apache/hadoop/sqoop/mapreduce/ExportJob.java @@ -20,6 +20,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.sql.SQLException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -75,6 +76,7 @@ public void runExport() throws ExportException, IOException { String tableName = context.getTableName(); String tableClassName = new TableClassName(options).getClassForTable(tableName); String ormJarFile = context.getJarFile(); + ConnManager mgr = null; LOG.info("Beginning export of " + tableName); @@ -114,7 +116,7 @@ public void runExport() throws ExportException, IOException { // Concurrent writes of the same records would be problematic. job.setMapSpeculativeExecution(false); - ConnManager mgr = new ConnFactory(conf).getManager(options); + mgr = new ConnFactory(conf).getManager(options); String username = options.getUsername(); if (null == username || username.length() == 0) { DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), @@ -150,6 +152,14 @@ public void runExport() throws ExportException, IOException { // unload the special classloader for this jar. ClassLoaderStack.setCurrentClassLoader(prevClassLoader); } + + if (null != mgr) { + try { + mgr.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing connection: " + sqlE); + } + } } } } diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/MySQLDumpImportJob.java b/src/java/org/apache/hadoop/sqoop/mapreduce/MySQLDumpImportJob.java index 5cd6d7ef..deaace13 100644 --- a/src/java/org/apache/hadoop/sqoop/mapreduce/MySQLDumpImportJob.java +++ b/src/java/org/apache/hadoop/sqoop/mapreduce/MySQLDumpImportJob.java @@ -19,6 +19,7 @@ package org.apache.hadoop.sqoop.mapreduce; import java.io.IOException; +import java.sql.SQLException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -71,58 +72,66 @@ protected void configureInputFormat(Job job, String tableName, ConnManager mgr = new ConnFactory(options.getConf()).getManager(options); - String username = options.getUsername(); - if (null == username || username.length() == 0) { - DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), - options.getConnectString()); - } else { - DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), - options.getConnectString(), username, options.getPassword()); - } + try { + String username = options.getUsername(); + if (null == username || username.length() == 0) { + DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), + options.getConnectString()); + } else { + DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), + options.getConnectString(), username, options.getPassword()); + } - String [] colNames = options.getColumns(); - if (null == colNames) { - colNames = mgr.getColumnNames(tableName); - } + String [] colNames = options.getColumns(); + if (null == colNames) { + colNames = mgr.getColumnNames(tableName); + } - String [] sqlColNames = null; - if (null != colNames) { - sqlColNames = new String[colNames.length]; - for (int i = 0; i < colNames.length; i++) { - sqlColNames[i] = mgr.escapeColName(colNames[i]); + String [] sqlColNames = null; + if (null != colNames) { + sqlColNames = new String[colNames.length]; + for (int i = 0; i < colNames.length; i++) { + sqlColNames[i] = mgr.escapeColName(colNames[i]); + } + } + + // It's ok if the where clause is null in DBInputFormat.setInput. + String whereClause = options.getWhereClause(); + + // We can't set the class properly in here, because we may not have the + // jar loaded in this JVM. So we start by calling setInput() with DBWritable + // and then overriding the string manually. + + // Note that mysqldump also does *not* want a quoted table name. + DataDrivenDBInputFormat.setInput(job, DBWritable.class, + tableName, whereClause, + mgr.escapeColName(splitByCol), sqlColNames); + + Configuration conf = job.getConfiguration(); + conf.setInt(MySQLDumpMapper.OUTPUT_FIELD_DELIM_KEY, + options.getOutputFieldDelim()); + conf.setInt(MySQLDumpMapper.OUTPUT_RECORD_DELIM_KEY, + options.getOutputRecordDelim()); + conf.setInt(MySQLDumpMapper.OUTPUT_ENCLOSED_BY_KEY, + options.getOutputEnclosedBy()); + conf.setInt(MySQLDumpMapper.OUTPUT_ESCAPED_BY_KEY, + options.getOutputEscapedBy()); + conf.setBoolean(MySQLDumpMapper.OUTPUT_ENCLOSE_REQUIRED_KEY, + options.isOutputEncloseRequired()); + String [] extraArgs = options.getExtraArgs(); + if (null != extraArgs) { + conf.setStrings(MySQLDumpMapper.EXTRA_ARGS_KEY, extraArgs); + } + + LOG.debug("Using InputFormat: " + inputFormatClass); + job.setInputFormatClass(getInputFormatClass()); + } finally { + try { + mgr.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing connection: " + sqlE); } } - - // It's ok if the where clause is null in DBInputFormat.setInput. - String whereClause = options.getWhereClause(); - - // We can't set the class properly in here, because we may not have the - // jar loaded in this JVM. So we start by calling setInput() with DBWritable - // and then overriding the string manually. - - // Note that mysqldump also does *not* want a quoted table name. - DataDrivenDBInputFormat.setInput(job, DBWritable.class, - tableName, whereClause, - mgr.escapeColName(splitByCol), sqlColNames); - - Configuration conf = job.getConfiguration(); - conf.setInt(MySQLDumpMapper.OUTPUT_FIELD_DELIM_KEY, - options.getOutputFieldDelim()); - conf.setInt(MySQLDumpMapper.OUTPUT_RECORD_DELIM_KEY, - options.getOutputRecordDelim()); - conf.setInt(MySQLDumpMapper.OUTPUT_ENCLOSED_BY_KEY, - options.getOutputEnclosedBy()); - conf.setInt(MySQLDumpMapper.OUTPUT_ESCAPED_BY_KEY, - options.getOutputEscapedBy()); - conf.setBoolean(MySQLDumpMapper.OUTPUT_ENCLOSE_REQUIRED_KEY, - options.isOutputEncloseRequired()); - String [] extraArgs = options.getExtraArgs(); - if (null != extraArgs) { - conf.setStrings(MySQLDumpMapper.EXTRA_ARGS_KEY, extraArgs); - } - - LOG.debug("Using InputFormat: " + inputFormatClass); - job.setInputFormatClass(getInputFormatClass()); } /** diff --git a/src/test/org/apache/hadoop/sqoop/manager/OracleCompatTest.java b/src/test/org/apache/hadoop/sqoop/manager/OracleCompatTest.java index 77a2f96f..1fc5616b 100644 --- a/src/test/org/apache/hadoop/sqoop/manager/OracleCompatTest.java +++ b/src/test/org/apache/hadoop/sqoop/manager/OracleCompatTest.java @@ -87,7 +87,7 @@ public void tearDown() { // See http://forums.oracle.com/forums/thread.jspa?messageID=1145120 LOG.info("Sleeping to wait for Oracle connection cache clear..."); try { - Thread.sleep(750); + Thread.sleep(250); } catch (InterruptedException ie) { } } diff --git a/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java b/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java index 15c80183..3ba137d8 100644 --- a/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java +++ b/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java @@ -68,6 +68,18 @@ * Create a database user named SQOOPTEST * Set the user's password to '12345' * Grant the user the CREATE TABLE privilege + * + * Oracle XE does a poor job of cleaning up connections in a timely fashion. + * Too many connections too quickly will be rejected, because XE will gc the + * closed connections in a lazy fashion. Oracle tests have a delay built in + * to work with this gc, but it is possible that you will see this error: + * + * ORA-12516, TNS:listener could not find available handler with matching + * protocol stack + * + * If so, log in to your database as SYSTEM and execute the following: + * ALTER SYSTEM SET processes=200 scope=spfile; + * ... then restart your database. */ public class OracleManagerTest extends ImportJobTestCase {