
Cache connections to Oracle across ConnManagers.

OracleManager now caches Connection instances so that subsequent OracleManager
instances can reuse them.
Refactored call sites of ConnManager to call close() before discarding an instance.
This lets the Oracle JUnit tests spend less time sleeping while waiting for Oracle
to reap closed server-side connection resources, improving Oracle test speed
by 50%.

Sleeping cannot be fully eliminated because MapReduce-side Connections are not
governed by this caching mechanism.

Also added some debugging advice on this topic to OracleManagerTest's class comment.
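For context, a minimal sketch of the connection lifecycle this change enables, using the
class names that appear in the diff below. The connect string, credentials, SqoopOptions
setter names, and manager package names are illustrative assumptions, not part of this commit.

import java.sql.SQLException;

import org.apache.hadoop.sqoop.SqoopOptions;
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.manager.OracleManager;

public class ConnCacheSketch {
  public static void main(String[] args) throws SQLException {
    SqoopOptions options = new SqoopOptions();
    options.setConnectString("jdbc:oracle:thin:@//localhost:1521/xe"); // assumed test instance
    options.setUsername("SQOOPTEST"); // assumed; matches the test setup described below
    options.setPassword("12345");

    // The first manager opens a physical connection through DriverManager.
    ConnManager first = new OracleManager(options);
    first.listTables();
    // close() no longer tears down the Oracle session; OracleManager.close()
    // returns the Connection to a static cache keyed by (connect string, username).
    first.close();

    // A second manager with the same connect string and username reuses the
    // cached Connection instead of opening a new server-side session.
    ConnManager second = new OracleManager(options);
    second.listTables();
    second.close();
  }
}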

From: Aaron Kimball <aaron@cloudera.com>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149872 13f79535-47bb-0310-9956-ffa450edef68
Committed by Andrew Bayer, 2011-07-22 20:03:39 +00:00
parent 43f9e2f2b0
commit 2240be8807
7 changed files with 362 additions and 164 deletions

View File

@@ -19,6 +19,7 @@
package org.apache.hadoop.sqoop;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
@@ -182,89 +183,97 @@ public int run(String [] args) {
}
}
if (options.doHiveImport()) {
hiveImport = new HiveImport(options, manager, getConf());
}
try {
if (options.doHiveImport()) {
hiveImport = new HiveImport(options, manager, getConf());
}
SqoopOptions.ControlAction action = options.getAction();
if (action == SqoopOptions.ControlAction.ListTables) {
String [] tables = manager.listTables();
if (null == tables) {
System.err.println("Could not retrieve tables list from server");
LOG.error("manager.listTables() returned null");
return 1;
} else {
for (String tbl : tables) {
System.out.println(tbl);
}
}
} else if (action == SqoopOptions.ControlAction.ListDatabases) {
String [] databases = manager.listDatabases();
if (null == databases) {
System.err.println("Could not retrieve database list from server");
LOG.error("manager.listDatabases() returned null");
return 1;
} else {
for (String db : databases) {
System.out.println(db);
}
}
} else if (action == SqoopOptions.ControlAction.DebugExec) {
// just run a SQL statement for debugging purposes.
manager.execAndPrint(options.getDebugSqlCmd());
return 0;
} else if (action == SqoopOptions.ControlAction.Export) {
// Export a table.
try {
exportTable(options.getTableName());
} catch (IOException ioe) {
LOG.error("Encountered IOException running export job: " + ioe.toString());
if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
throw new RuntimeException(ioe);
} else {
SqoopOptions.ControlAction action = options.getAction();
if (action == SqoopOptions.ControlAction.ListTables) {
String [] tables = manager.listTables();
if (null == tables) {
System.err.println("Could not retrieve tables list from server");
LOG.error("manager.listTables() returned null");
return 1;
}
} catch (ExportException ee) {
LOG.error("Error during export: " + ee.toString());
if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
throw new RuntimeException(ee);
} else {
return 1;
}
}
} else {
// This is either FullImport or GenerateOnly.
try {
if (options.isAllTables()) {
String [] tables = manager.listTables();
if (null == tables) {
System.err.println("Could not retrieve tables list from server");
LOG.error("manager.listTables() returned null");
return 1;
} else {
for (String tableName : tables) {
importTable(tableName);
}
for (String tbl : tables) {
System.out.println(tbl);
}
} else {
// just import a single table the user specified.
importTable(options.getTableName());
}
} catch (IOException ioe) {
LOG.error("Encountered IOException running import job: " + ioe.toString());
if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
throw new RuntimeException(ioe);
} else {
} else if (action == SqoopOptions.ControlAction.ListDatabases) {
String [] databases = manager.listDatabases();
if (null == databases) {
System.err.println("Could not retrieve database list from server");
LOG.error("manager.listDatabases() returned null");
return 1;
}
} catch (ImportException ie) {
LOG.error("Error during import: " + ie.toString());
if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
throw new RuntimeException(ie);
} else {
return 1;
for (String db : databases) {
System.out.println(db);
}
}
} else if (action == SqoopOptions.ControlAction.DebugExec) {
// just run a SQL statement for debugging purposes.
manager.execAndPrint(options.getDebugSqlCmd());
return 0;
} else if (action == SqoopOptions.ControlAction.Export) {
// Export a table.
try {
exportTable(options.getTableName());
} catch (IOException ioe) {
LOG.error("Encountered IOException running export job: " + ioe.toString());
if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
throw new RuntimeException(ioe);
} else {
return 1;
}
} catch (ExportException ee) {
LOG.error("Error during export: " + ee.toString());
if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
throw new RuntimeException(ee);
} else {
return 1;
}
}
} else {
// This is either FullImport or GenerateOnly.
try {
if (options.isAllTables()) {
String [] tables = manager.listTables();
if (null == tables) {
System.err.println("Could not retrieve tables list from server");
LOG.error("manager.listTables() returned null");
return 1;
} else {
for (String tableName : tables) {
importTable(tableName);
}
}
} else {
// just import a single table the user specified.
importTable(options.getTableName());
}
} catch (IOException ioe) {
LOG.error("Encountered IOException running import job: " + ioe.toString());
if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
throw new RuntimeException(ioe);
} else {
return 1;
}
} catch (ImportException ie) {
LOG.error("Error during import: " + ie.toString());
if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
throw new RuntimeException(ie);
} else {
return 1;
}
}
}
} finally {
try {
manager.close();
} catch (SQLException sqlE) {
LOG.warn("Error while closing connection: " + sqlE);
}
}

View File

@@ -25,6 +25,8 @@
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.lang.reflect.Method;
@@ -47,10 +49,142 @@ public class OracleManager extends GenericJdbcManager {
// driver class to ensure is loaded when making db connection.
private static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver";
// Oracle XE does a poor job of releasing server-side resources for
// closed connections. So we actually want to cache connections as
// much as possible. This is especially important for JUnit tests which
// may need to make 60 or more connections (serially), since each test
// uses a different OracleManager instance.
private static class ConnCache {
public static final Log LOG = LogFactory.getLog(ConnCache.class.getName());
private static class CacheKey {
public final String connectString;
public final String username;
public CacheKey(String connect, String user) {
this.connectString = connect;
this.username = user; // note: may be null.
}
@Override
public boolean equals(Object o) {
if (o instanceof CacheKey) {
CacheKey k = (CacheKey) o;
if (null == username) {
return k.username == null && k.connectString.equals(connectString);
} else {
return k.username.equals(username)
&& k.connectString.equals(connectString);
}
} else {
return false;
}
}
@Override
public int hashCode() {
if (null == username) {
return connectString.hashCode();
} else {
return username.hashCode() ^ connectString.hashCode();
}
}
@Override
public String toString() {
return connectString + "/" + username;
}
}
private Map<CacheKey, Connection> connectionMap;
public ConnCache() {
LOG.debug("Instantiated new connection cache.");
connectionMap = new HashMap<CacheKey, Connection>();
}
/**
* @return a Connection instance that can be used to connect to the
* given database, if a previously-opened connection is available in
* the cache. Returns null if none is available in the map.
*/
public synchronized Connection getConnection(String connectStr,
String username) throws SQLException {
CacheKey key = new CacheKey(connectStr, username);
Connection cached = connectionMap.get(key);
if (null != cached) {
connectionMap.remove(key);
if (cached.isReadOnly()) {
// Read-only mode? Don't want it.
cached.close();
}
if (cached.isClosed()) {
// This connection isn't usable.
return null;
}
cached.rollback(); // Reset any transaction state.
cached.clearWarnings();
LOG.debug("Got cached connection for " + key);
}
return cached;
}
/**
* Returns a connection to the cache pool for future use. If a connection
* is already cached for the connectstring/username pair, then this
* connection is closed and discarded.
*/
public synchronized void recycle(String connectStr, String username,
Connection conn) throws SQLException {
CacheKey key = new CacheKey(connectStr, username);
Connection existing = connectionMap.get(key);
if (null != existing) {
// Cache is already full for this entry.
LOG.debug("Discarding additional connection for " + key);
conn.close();
return;
}
// Put it in the map for later use.
LOG.debug("Caching released connection for " + key);
connectionMap.put(key, conn);
}
@Override
protected synchronized void finalize() throws Throwable {
for (Connection c : connectionMap.values()) {
c.close();
}
super.finalize();
}
}
private static final ConnCache CACHE;
static {
CACHE = new ConnCache();
}
public OracleManager(final SqoopOptions opts) {
super(DRIVER_CLASS, opts);
}
public void close() throws SQLException {
release(); // Release any open statements associated with the connection.
if (hasOpenConnection()) {
// Release our open connection back to the cache.
CACHE.recycle(options.getConnectString(), options.getUsername(),
getConnection());
}
}
protected String getColNamesQuery(String tableName) {
// SqlManager uses "tableName AS t" which doesn't work in Oracle.
return "SELECT t.* FROM " + escapeTableName(tableName) + " t";
@@ -77,10 +211,19 @@ protected Connection makeConnection() throws SQLException {
String username = options.getUsername();
String password = options.getPassword();
if (null == username) {
connection = DriverManager.getConnection(options.getConnectString());
} else {
connection = DriverManager.getConnection(options.getConnectString(), username, password);
String connectStr = options.getConnectString();
connection = CACHE.getConnection(connectStr, username);
if (null == connection) {
// Couldn't pull one from the cache. Get a new one.
LOG.debug("Creating a new connection for "
+ connectStr + "/" + username);
if (null == username) {
connection = DriverManager.getConnection(connectStr);
} else {
connection = DriverManager.getConnection(connectStr, username,
password);
}
}
// We only use this for metadata queries. Loosest semantics are okay.
@@ -273,5 +416,11 @@ private Class getTypeClass(String className) {
}
return typeClass;
}
@Override
protected void finalize() throws Throwable {
close();
super.finalize();
}
}

View File

@@ -19,6 +19,7 @@
package org.apache.hadoop.sqoop.mapreduce;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -107,45 +108,53 @@ protected Class<? extends OutputFormat> getOutputFormatClass() {
protected void configureInputFormat(Job job, String tableName,
String tableClassName, String splitByCol) throws IOException {
ConnManager mgr = new ConnFactory(options.getConf()).getManager(options);
String username = options.getUsername();
if (null == username || username.length() == 0) {
DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
options.getConnectString());
} else {
DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
options.getConnectString(), username, options.getPassword());
}
try {
String username = options.getUsername();
if (null == username || username.length() == 0) {
DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
options.getConnectString());
} else {
DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
options.getConnectString(), username, options.getPassword());
}
String [] colNames = options.getColumns();
if (null == colNames) {
colNames = mgr.getColumnNames(tableName);
}
String [] colNames = options.getColumns();
if (null == colNames) {
colNames = mgr.getColumnNames(tableName);
}
String [] sqlColNames = null;
if (null != colNames) {
sqlColNames = new String[colNames.length];
for (int i = 0; i < colNames.length; i++) {
sqlColNames[i] = mgr.escapeColName(colNames[i]);
String [] sqlColNames = null;
if (null != colNames) {
sqlColNames = new String[colNames.length];
for (int i = 0; i < colNames.length; i++) {
sqlColNames[i] = mgr.escapeColName(colNames[i]);
}
}
// It's ok if the where clause is null in DBInputFormat.setInput.
String whereClause = options.getWhereClause();
// We can't set the class properly in here, because we may not have the
// jar loaded in this JVM. So we start by calling setInput() with DBWritable
// and then overriding the string manually.
DataDrivenDBInputFormat.setInput(job, DBWritable.class,
mgr.escapeTableName(tableName), whereClause,
mgr.escapeColName(splitByCol), sqlColNames);
job.getConfiguration().set(DBConfiguration.INPUT_CLASS_PROPERTY,
tableClassName);
job.getConfiguration().setLong(LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY,
options.getInlineLobLimit());
LOG.debug("Using InputFormat: " + inputFormatClass);
job.setInputFormatClass(inputFormatClass);
} finally {
try {
mgr.close();
} catch (SQLException sqlE) {
LOG.warn("Error closing connection: " + sqlE);
}
}
// It's ok if the where clause is null in DBInputFormat.setInput.
String whereClause = options.getWhereClause();
// We can't set the class properly in here, because we may not have the
// jar loaded in this JVM. So we start by calling setInput() with DBWritable
// and then overriding the string manually.
DataDrivenDBInputFormat.setInput(job, DBWritable.class,
mgr.escapeTableName(tableName), whereClause,
mgr.escapeColName(splitByCol), sqlColNames);
job.getConfiguration().set(DBConfiguration.INPUT_CLASS_PROPERTY,
tableClassName);
job.getConfiguration().setLong(LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY,
options.getInlineLobLimit());
LOG.debug("Using InputFormat: " + inputFormatClass);
job.setInputFormatClass(inputFormatClass);
}
}

View File

@@ -20,6 +20,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -75,6 +76,7 @@ public void runExport() throws ExportException, IOException {
String tableName = context.getTableName();
String tableClassName = new TableClassName(options).getClassForTable(tableName);
String ormJarFile = context.getJarFile();
ConnManager mgr = null;
LOG.info("Beginning export of " + tableName);
@@ -114,7 +116,7 @@ public void runExport() throws ExportException, IOException {
// Concurrent writes of the same records would be problematic.
job.setMapSpeculativeExecution(false);
ConnManager mgr = new ConnFactory(conf).getManager(options);
mgr = new ConnFactory(conf).getManager(options);
String username = options.getUsername();
if (null == username || username.length() == 0) {
DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
@@ -150,6 +152,14 @@ public void runExport() throws ExportException, IOException {
// unload the special classloader for this jar.
ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
}
if (null != mgr) {
try {
mgr.close();
} catch (SQLException sqlE) {
LOG.warn("Error closing connection: " + sqlE);
}
}
}
}
}

View File

@@ -19,6 +19,7 @@
package org.apache.hadoop.sqoop.mapreduce;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -71,58 +72,66 @@ protected void configureInputFormat(Job job, String tableName,
ConnManager mgr = new ConnFactory(options.getConf()).getManager(options);
String username = options.getUsername();
if (null == username || username.length() == 0) {
DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
options.getConnectString());
} else {
DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
options.getConnectString(), username, options.getPassword());
}
try {
String username = options.getUsername();
if (null == username || username.length() == 0) {
DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
options.getConnectString());
} else {
DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
options.getConnectString(), username, options.getPassword());
}
String [] colNames = options.getColumns();
if (null == colNames) {
colNames = mgr.getColumnNames(tableName);
}
String [] colNames = options.getColumns();
if (null == colNames) {
colNames = mgr.getColumnNames(tableName);
}
String [] sqlColNames = null;
if (null != colNames) {
sqlColNames = new String[colNames.length];
for (int i = 0; i < colNames.length; i++) {
sqlColNames[i] = mgr.escapeColName(colNames[i]);
String [] sqlColNames = null;
if (null != colNames) {
sqlColNames = new String[colNames.length];
for (int i = 0; i < colNames.length; i++) {
sqlColNames[i] = mgr.escapeColName(colNames[i]);
}
}
// It's ok if the where clause is null in DBInputFormat.setInput.
String whereClause = options.getWhereClause();
// We can't set the class properly in here, because we may not have the
// jar loaded in this JVM. So we start by calling setInput() with DBWritable
// and then overriding the string manually.
// Note that mysqldump also does *not* want a quoted table name.
DataDrivenDBInputFormat.setInput(job, DBWritable.class,
tableName, whereClause,
mgr.escapeColName(splitByCol), sqlColNames);
Configuration conf = job.getConfiguration();
conf.setInt(MySQLDumpMapper.OUTPUT_FIELD_DELIM_KEY,
options.getOutputFieldDelim());
conf.setInt(MySQLDumpMapper.OUTPUT_RECORD_DELIM_KEY,
options.getOutputRecordDelim());
conf.setInt(MySQLDumpMapper.OUTPUT_ENCLOSED_BY_KEY,
options.getOutputEnclosedBy());
conf.setInt(MySQLDumpMapper.OUTPUT_ESCAPED_BY_KEY,
options.getOutputEscapedBy());
conf.setBoolean(MySQLDumpMapper.OUTPUT_ENCLOSE_REQUIRED_KEY,
options.isOutputEncloseRequired());
String [] extraArgs = options.getExtraArgs();
if (null != extraArgs) {
conf.setStrings(MySQLDumpMapper.EXTRA_ARGS_KEY, extraArgs);
}
LOG.debug("Using InputFormat: " + inputFormatClass);
job.setInputFormatClass(getInputFormatClass());
} finally {
try {
mgr.close();
} catch (SQLException sqlE) {
LOG.warn("Error closing connection: " + sqlE);
}
}
// It's ok if the where clause is null in DBInputFormat.setInput.
String whereClause = options.getWhereClause();
// We can't set the class properly in here, because we may not have the
// jar loaded in this JVM. So we start by calling setInput() with DBWritable
// and then overriding the string manually.
// Note that mysqldump also does *not* want a quoted table name.
DataDrivenDBInputFormat.setInput(job, DBWritable.class,
tableName, whereClause,
mgr.escapeColName(splitByCol), sqlColNames);
Configuration conf = job.getConfiguration();
conf.setInt(MySQLDumpMapper.OUTPUT_FIELD_DELIM_KEY,
options.getOutputFieldDelim());
conf.setInt(MySQLDumpMapper.OUTPUT_RECORD_DELIM_KEY,
options.getOutputRecordDelim());
conf.setInt(MySQLDumpMapper.OUTPUT_ENCLOSED_BY_KEY,
options.getOutputEnclosedBy());
conf.setInt(MySQLDumpMapper.OUTPUT_ESCAPED_BY_KEY,
options.getOutputEscapedBy());
conf.setBoolean(MySQLDumpMapper.OUTPUT_ENCLOSE_REQUIRED_KEY,
options.isOutputEncloseRequired());
String [] extraArgs = options.getExtraArgs();
if (null != extraArgs) {
conf.setStrings(MySQLDumpMapper.EXTRA_ARGS_KEY, extraArgs);
}
LOG.debug("Using InputFormat: " + inputFormatClass);
job.setInputFormatClass(getInputFormatClass());
}
/**

View File

@@ -87,7 +87,7 @@ public void tearDown() {
// See http://forums.oracle.com/forums/thread.jspa?messageID=1145120
LOG.info("Sleeping to wait for Oracle connection cache clear...");
try {
Thread.sleep(750);
Thread.sleep(250);
} catch (InterruptedException ie) {
}
}

View File

@@ -68,6 +68,18 @@
* Create a database user named SQOOPTEST
* Set the user's password to '12345'
* Grant the user the CREATE TABLE privilege
*
* Oracle XE does a poor job of cleaning up connections in a timely fashion.
* Too many connections too quickly will be rejected, because XE will gc the
* closed connections in a lazy fashion. Oracle tests have a delay built in
* to work with this gc, but it is possible that you will see this error:
*
* ORA-12516, TNS:listener could not find available handler with matching
* protocol stack
*
* If so, log in to your database as SYSTEM and execute the following:
* ALTER SYSTEM SET processes=200 scope=spfile;
* ... then restart your database.
*/
public class OracleManagerTest extends ImportJobTestCase {
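The remedy described in the class comment above can also be applied over JDBC before
running the tests; a rough sketch follows (the connect string and SYSTEM password are
placeholders, and the database still has to be restarted by hand afterwards).

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class RaiseOracleProcessLimit {
  public static void main(String[] args) throws SQLException {
    // Placeholders: adjust the host/service name and SYSTEM password for your XE install.
    String url = "jdbc:oracle:thin:@//localhost:1521/xe";
    Connection conn = DriverManager.getConnection(url, "SYSTEM", "your-system-password");
    try {
      Statement stmt = conn.createStatement();
      try {
        // Same statement the comment above recommends; it is written to the spfile
        // and only takes effect after the instance is restarted.
        stmt.execute("ALTER SYSTEM SET processes=200 SCOPE=SPFILE");
      } finally {
        stmt.close();
      }
    } finally {
      conn.close();
    }
  }
}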