5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-05 06:12:25 +08:00

SQOOP-2509: Sqoop2: Findbugs: Fix resource leak problem in GenericJdbcExecutor and related classes

(Colin Ma via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2015-08-19 16:41:10 -07:00
parent 93d9a77143
commit 1c24ecbde7
6 changed files with 47 additions and 81 deletions

View File

@ -124,16 +124,8 @@ public GenericJdbcExecutor(LinkConfiguration linkConfig) {
} }
} }
public ResultSet executeQuery(String sql) { public Connection getConnection() {
try { return connection;
Statement statement = connection.createStatement(
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
return statement.executeQuery(sql);
} catch (SQLException e) {
logSQLException(e);
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
}
} }
public PreparedStatement createStatement(String sql) { public PreparedStatement createStatement(String sql) {
@ -261,29 +253,21 @@ public void migrateData(String fromTable, String toTable) {
} }
public long getTableRowCount(String tableName) { public long getTableRowCount(String tableName) {
ResultSet resultSet = executeQuery("SELECT COUNT(1) FROM " + encloseIdentifier(tableName)); try (Statement statement = connection.createStatement(
try { ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ResultSet resultSet = statement.executeQuery("SELECT COUNT(1) FROM " + encloseIdentifier(tableName));) {
resultSet.next(); resultSet.next();
return resultSet.getLong(1); return resultSet.getLong(1);
} catch(SQLException e) { } catch(SQLException e) {
throw new SqoopException( throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0004, e); GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0004, e);
} finally {
try {
if(resultSet != null)
resultSet.close();
} catch(SQLException e) {
logSQLException(e, "Got SQLException while closing resultset.");
}
} }
} }
public void executeUpdate(String sql) { public void executeUpdate(String sql) {
try { try (Statement statement = connection.createStatement(
Statement statement = connection.createStatement( ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
statement.executeUpdate(sql); statement.executeUpdate(sql);
} catch (SQLException e) { } catch (SQLException e) {
logSQLException(e); logSQLException(e);
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e); throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
@ -415,7 +399,7 @@ public String[] getPrimaryKey(String ...identifiers) {
// Few shortcuts so that we don't have run full loop // Few shortcuts so that we don't have run full loop
if(primaryKeyColumns.isEmpty()) { if(primaryKeyColumns.isEmpty()) {
return null; return new String[] {};
} else if(primaryKeyColumns.size() == 1){ } else if(primaryKeyColumns.size() == 1){
return new String[] {primaryKeyColumns.get(0).getKey()}; return new String[] {primaryKeyColumns.get(0).getKey()};
} }
@ -434,11 +418,9 @@ public String[] getPrimaryKey(String ...identifiers) {
} }
public String[] getQueryColumns(String query) { public String[] getQueryColumns(String query) {
try { try (Statement statement = connection.createStatement(
Statement statement = connection.createStatement( ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); ResultSet rs = statement.executeQuery(query);) {
ResultSet rs = statement.executeQuery(query);
ResultSetMetaData rsmd = rs.getMetaData(); ResultSetMetaData rsmd = rs.getMetaData();
int count = rsmd.getColumnCount(); int count = rsmd.getColumnCount();
String[] columns = new String[count]; String[] columns = new String[count];

View File

@ -20,6 +20,7 @@
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.ResultSetMetaData; import java.sql.ResultSetMetaData;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
@ -50,11 +51,11 @@ public void extract(ExtractorContext context, LinkConfiguration linkConfig, From
LOG.info("Using query: " + query); LOG.info("Using query: " + query);
rowsRead = 0; rowsRead = 0;
ResultSet resultSet = executor.executeQuery(query);
Schema schema = context.getSchema(); Schema schema = context.getSchema();
Column[] schemaColumns = schema.getColumnsArray(); Column[] schemaColumns = schema.getColumnsArray();
try { try (Statement statement = executor.getConnection().createStatement(
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ResultSet resultSet = statement.executeQuery(query);) {
ResultSetMetaData metaData = resultSet.getMetaData(); ResultSetMetaData metaData = resultSet.getMetaData();
int columnCount = metaData.getColumnCount(); int columnCount = metaData.getColumnCount();
if (schemaColumns.length != columnCount) { if (schemaColumns.length != columnCount) {

View File

@ -21,6 +21,7 @@
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.ResultSetMetaData; import java.sql.ResultSetMetaData;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
@ -78,13 +79,11 @@ public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig
} }
Schema schema = new Schema(schemaName); Schema schema = new Schema(schemaName);
ResultSet rs = null;
ResultSetMetaData rsmt = null; ResultSetMetaData rsmt = null;
try { try (Statement statement = executor.getConnection().createStatement(
rs = executor.executeQuery( ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL) ResultSet rs = statement.executeQuery(context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL)
.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0") .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0"));) {
);
rsmt = rs.getMetaData(); rsmt = rs.getMetaData();
for (int i = 1 ; i <= rsmt.getColumnCount(); i++) { for (int i = 1 ; i <= rsmt.getColumnCount(); i++) {
@ -103,13 +102,6 @@ public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig
} catch (SQLException e) { } catch (SQLException e) {
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e); throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e);
} finally { } finally {
if(rs != null) {
try {
rs.close();
} catch (SQLException e) {
LOG.info("Ignoring exception while closing ResultSet", e);
}
}
if (executor != null) { if (executor != null) {
executor.close(); executor.close();
} }
@ -137,7 +129,7 @@ private void configurePartitionProperties(MutableContext context, LinkConfigurat
if (StringUtils.isBlank(partitionColumnName) && tableImport) { if (StringUtils.isBlank(partitionColumnName) && tableImport) {
String [] primaryKeyColumns = executor.getPrimaryKey(jobConf.fromJobConfig.schemaName, jobConf.fromJobConfig.tableName); String [] primaryKeyColumns = executor.getPrimaryKey(jobConf.fromJobConfig.schemaName, jobConf.fromJobConfig.tableName);
LOG.info("Found primary key columns [" + StringUtils.join(primaryKeyColumns, ", ") + "]"); LOG.info("Found primary key columns [" + StringUtils.join(primaryKeyColumns, ", ") + "]");
if(primaryKeyColumns == null) { if(primaryKeyColumns.length == 0) {
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0025, "Please specify partition column."); throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0025, "Please specify partition column.");
} else if (primaryKeyColumns.length > 1) { } else if (primaryKeyColumns.length > 1) {
LOG.warn("Table have compound primary key, for partitioner we're using only first column of the key: " + primaryKeyColumns[0]); LOG.warn("Table have compound primary key, for partitioner we're using only first column of the key: " + primaryKeyColumns[0]);
@ -178,10 +170,9 @@ private void configurePartitionProperties(MutableContext context, LinkConfigurat
String incrementalNewMaxValueQuery = sb.toString(); String incrementalNewMaxValueQuery = sb.toString();
LOG.info("Incremental new max value query: " + incrementalNewMaxValueQuery); LOG.info("Incremental new max value query: " + incrementalNewMaxValueQuery);
ResultSet rs = null; try (Statement statement = executor.getConnection().createStatement(
try { ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
rs = executor.executeQuery(incrementalNewMaxValueQuery); ResultSet rs = statement.executeQuery(incrementalNewMaxValueQuery);) {
if (!rs.next()) { if (!rs.next()) {
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0022); throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0022);
} }
@ -189,10 +180,6 @@ private void configurePartitionProperties(MutableContext context, LinkConfigurat
incrementalMaxValue = rs.getString(1); incrementalMaxValue = rs.getString(1);
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE, incrementalMaxValue); context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE, incrementalMaxValue);
LOG.info("New maximal value for incremental import is " + incrementalMaxValue); LOG.info("New maximal value for incremental import is " + incrementalMaxValue);
} finally {
if(rs != null) {
rs.close();
}
} }
} }

View File

@ -20,6 +20,7 @@
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.ResultSetMetaData; import java.sql.ResultSetMetaData;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
@ -70,12 +71,11 @@ public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig
} }
Schema schema = new Schema(schemaName); Schema schema = new Schema(schemaName);
ResultSet rs = null; try (Statement statement = executor.getConnection().createStatement(
ResultSetMetaData rsmt = null; ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
try { ResultSet rs = statement.executeQuery("SELECT * FROM " + schemaName + " WHERE 1 = 0");) {
rs = executor.executeQuery("SELECT * FROM " + schemaName + " WHERE 1 = 0");
rsmt = rs.getMetaData(); ResultSetMetaData rsmt = rs.getMetaData();
for (int i = 1 ; i <= rsmt.getColumnCount(); i++) { for (int i = 1 ; i <= rsmt.getColumnCount(); i++) {
String columnName = rsmt.getColumnName(i); String columnName = rsmt.getColumnName(i);
@ -93,14 +93,6 @@ public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig
return schema; return schema;
} catch (SQLException e) { } catch (SQLException e) {
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e); throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e);
} finally {
if(rs != null) {
try {
rs.close();
} catch (SQLException e) {
LOG.info("Ignoring exception while closing ResultSet", e);
}
}
} }
} }

View File

@ -89,9 +89,9 @@ public void testUnknownDriver() {
@Test @Test
public void testGetPrimaryKey() { public void testGetPrimaryKey() {
assertNull(executor.getPrimaryKey("non-existing-table")); assertEquals(executor.getPrimaryKey("non-existing-table"), new String[] {});
assertNull(executor.getPrimaryKey("non-existing-schema", "non-existing-table")); assertEquals(executor.getPrimaryKey("non-existing-schema", "non-existing-table"), new String[] {});
assertNull(executor.getPrimaryKey("non-existing-catalog", "non-existing-schema", "non-existing-table")); assertEquals(executor.getPrimaryKey("non-existing-catalog", "non-existing-schema", "non-existing-table"), new String[] {});
assertEquals(executor.getPrimaryKey(schema, table), new String[] {"ICOL"}); assertEquals(executor.getPrimaryKey(schema, table), new String[] {"ICOL"});
assertEquals(executor.getPrimaryKey(compoundPrimaryKeyTable), new String[] {"VCOL", "ICOL"}); assertEquals(executor.getPrimaryKey(compoundPrimaryKeyTable), new String[] {"VCOL", "ICOL"});

View File

@ -18,6 +18,7 @@
package org.apache.sqoop.connector.jdbc; package org.apache.sqoop.connector.jdbc;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.Statement;
import org.apache.sqoop.common.MutableContext; import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.common.MutableMapContext;
@ -110,19 +111,22 @@ public void testInsert() throws Exception {
loader.load(loaderContext, linkConfig, jobConfig); loader.load(loaderContext, linkConfig, jobConfig);
int index = START; int index = START;
ResultSet rs = executor.executeQuery("SELECT * FROM " try (Statement statement = executor.getConnection().createStatement(
+ executor.encloseIdentifier(tableName) + " ORDER BY ICOL"); ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
while (rs.next()) { ResultSet rs = statement.executeQuery("SELECT * FROM "
assertEquals(index, rs.getObject(1)); + executor.encloseIdentifier(tableName) + " ORDER BY ICOL");) {
assertEquals((double) index, rs.getObject(2)); while (rs.next()) {
assertEquals(String.valueOf(index), rs.getObject(3)); assertEquals(index, rs.getObject(1));
assertEquals("2004-10-19", rs.getObject(4).toString()); assertEquals((double) index, rs.getObject(2));
assertEquals("2004-10-19 10:23:34.0", rs.getObject(5).toString()); assertEquals(String.valueOf(index), rs.getObject(3));
assertEquals("11:33:59", rs.getObject(6).toString()); assertEquals("2004-10-19", rs.getObject(4).toString());
assertEquals("2004-10-19 10:23:34.0", rs.getObject(5).toString());
assertEquals("11:33:59", rs.getObject(6).toString());
index++; index++;
}
assertEquals(numberOfRows, index - START);
} }
assertEquals(numberOfRows, index-START);
} }
public class DummyReader extends DataReader { public class DummyReader extends DataReader {