5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-09 21:51:01 +08:00

SQOOP-2518: Sqoop2: Findbugs: Fix warning in repository-derby module

(Colin Ma via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2015-08-24 09:38:03 -07:00
parent d26c5031bd
commit c4b2ac61f2
4 changed files with 96 additions and 154 deletions

View File

@ -93,7 +93,6 @@ public String name() {
@Override
public synchronized void initialize(JdbcRepositoryContext ctx) {
repoContext = ctx;
repoContext.getDataSource();
LOG.info("DerbyRepositoryHandler initialized.");
}
@ -119,8 +118,7 @@ public synchronized void shutdown() {
LOG.debug("Attempting to shutdown embedded Derby using URL: "
+ shutDownUrl);
try {
DriverManager.getConnection(shutDownUrl);
try (Connection tempConnection = DriverManager.getConnection(shutDownUrl)) {
} catch (SQLException ex) {
// Shutdown for one db instance is expected to raise SQL STATE 45000
if (ex.getErrorCode() != 45000) {
@ -145,19 +143,14 @@ public synchronized void shutdown() {
* @return
*/
public int detectRepositoryVersion(Connection conn) {
ResultSet rs = null;
PreparedStatement stmt = null;
// First release went out without system table, so we have to detect
// this version differently.
try {
rs = conn.getMetaData().getTables(null, null, null, null);
try (ResultSet rs = conn.getMetaData().getTables(null, null, null, null)) {
Set<String> tableNames = new HashSet<String>();
while(rs.next()) {
tableNames.add(rs.getString("TABLE_NAME"));
}
closeResultSets(rs);
LOG.debug("Detecting existing version of repository");
boolean foundAll = true;
@ -175,30 +168,25 @@ public int detectRepositoryVersion(Connection conn) {
} catch (SQLException e) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0006, e);
} finally {
closeResultSets(rs);
}
// Normal version detection, select and return the version
try {
try (PreparedStatement stmt = conn.prepareStatement(STMT_SELECT_DEPRECATED_OR_NEW_SYSTEM_VERSION)) {
// NOTE: Since we can different types of version stored in system table, we renamed the
// key name for the repository version from "version" to "repository.version" for clarity
stmt = conn.prepareStatement(STMT_SELECT_DEPRECATED_OR_NEW_SYSTEM_VERSION);
stmt.setString(1, CommonRepoConstants.SYSKEY_VERSION);
stmt.setString(2, DerbyRepoConstants.SYSKEY_VERSION);
rs = stmt.executeQuery();
try (ResultSet rs = stmt.executeQuery()){
if (!rs.next()) {
return 0;
}
return rs.getInt(1);
}
} catch (SQLException e) {
LOG.info("Can't fetch repository structure version.", e);
return 0;
} finally {
closeResultSets(rs);
closeStatements(stmt);
}
}
@ -337,12 +325,10 @@ public void createOrUpgradeRepository(Connection conn) {
*/
private void migrateFromUnnamedConstraintsToNamedConstraints(Connection conn) {
// Get unnamed constraints
PreparedStatement fetchUnnamedConstraintsStmt = null;
Statement dropUnnamedConstraintsStmt = null;
Map<String, List<String>> autoConstraintNameMap = new TreeMap<String, List<String>>();
try {
fetchUnnamedConstraintsStmt = conn.prepareStatement(STMT_FETCH_TABLE_FOREIGN_KEYS);
try (PreparedStatement fetchUnnamedConstraintsStmt = conn.prepareStatement(STMT_FETCH_TABLE_FOREIGN_KEYS)) {
for (String tableName : new String[] {
DerbySchemaConstants.TABLE_SQ_FORM_NAME,
CommonRepositorySchemaConstants.TABLE_SQ_INPUT_NAME,
@ -363,34 +349,30 @@ private void migrateFromUnnamedConstraintsToNamedConstraints(Connection conn) {
if (fetchUnnamedConstraintsStmt.execute()) {
LOG.info("QUERY(" + STMT_FETCH_TABLE_FOREIGN_KEYS + ") with args: [" + tableName + "]");
ResultSet rs = fetchUnnamedConstraintsStmt.getResultSet();
try (ResultSet rs = fetchUnnamedConstraintsStmt.getResultSet()) {
while (rs.next()) {
autoConstraintNames.add(rs.getString(1));
}
rs.close();
}
}
}
} catch (SQLException e) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e);
} finally {
closeStatements(fetchUnnamedConstraintsStmt);
}
// Drop constraints
for (String tableName : autoConstraintNameMap.keySet()) {
for (String constraintName : autoConstraintNameMap.get(tableName)) {
for (Map.Entry<String, List<String>> entry : autoConstraintNameMap.entrySet()) {
for (String constraintName : entry.getValue()) {
String query = DerbySchemaUpgradeQuery.getDropConstraintQuery(
SCHEMA_SQOOP, tableName, constraintName);
try {
dropUnnamedConstraintsStmt = conn.createStatement();
SCHEMA_SQOOP, entry.getKey(), constraintName);
try (Statement dropUnnamedConstraintsStmt = conn.createStatement()) {
dropUnnamedConstraintsStmt.execute(query);
} catch (SQLException e) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e);
} finally {
LOG.info("QUERY(" + query + ")");
closeStatements(dropUnnamedConstraintsStmt);
}
}
}
@ -547,14 +529,12 @@ protected Map<Direction, Long> insertDirections(Connection conn) {
*/
protected void updateDirections(Connection conn, Map<Direction, Long> directionMap) {
// Remember directions
Statement fetchFormsStmt = null,
fetchConnectorsStmt = null;
List<Long> connectorIds = new LinkedList<Long>();
List<Long> configIds = new LinkedList<Long>();
List<String> directions = new LinkedList<String>();
try {
fetchFormsStmt = conn.createStatement();
ResultSet rs = fetchFormsStmt.executeQuery(STMT_FETCH_CONFIG_DIRECTIONS);
try (Statement fetchFormsStmt = conn.createStatement();
ResultSet rs = fetchFormsStmt.executeQuery(STMT_FETCH_CONFIG_DIRECTIONS);) {
while (rs.next()) {
configIds.add(rs.getLong(1));
directions.add(rs.getString(2));
@ -562,8 +542,6 @@ protected void updateDirections(Connection conn, Map<Direction, Long> directionM
rs.close();
} catch (SQLException e) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e);
} finally {
closeStatements(fetchFormsStmt);
}
// Change Schema
@ -582,17 +560,14 @@ protected void updateDirections(Connection conn, Map<Direction, Long> directionM
}
// Add connector directions
try {
fetchConnectorsStmt = conn.createStatement();
ResultSet rs = fetchConnectorsStmt.executeQuery(STMT_SELECT_CONNECTOR_ALL);
try (Statement fetchConnectorsStmt = conn.createStatement();
ResultSet rs = fetchConnectorsStmt.executeQuery(STMT_SELECT_CONNECTOR_ALL);) {
while (rs.next()) {
connectorIds.add(rs.getLong(1));
}
rs.close();
} catch (SQLException e) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e);
} finally {
closeStatements(fetchConnectorsStmt);
}
for (Long connectorId : connectorIds) {
@ -652,7 +627,7 @@ private void updateJobConfigInputForHdfsConnector(Connection conn, long hdfsConn
"output");
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR, conn,
new Long(hdfsConnectorId), "input", "output");
Long.valueOf(hdfsConnectorId), "input", "output");
//update the names of the configs
// 1. input ==> fromJobConfig
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_NAME, conn,
@ -677,15 +652,15 @@ private void updateJobConfigInputForHdfsConnector(Connection conn, long hdfsConn
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DIRECTION_TO_NULL, conn,
"throttling");
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DRIVER_INDEX, conn,
new Long(0), "throttling");
Long.valueOf(0), "throttling");
Long connectionId = createHdfsConnection(conn, hdfsConnectorId);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION_COPY_SQB_FROM_CONNECTION, conn,
"EXPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_CONNECTION, conn,
new Long(connectionId), "EXPORT");
connectionId, "EXPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION, conn,
new Long(connectionId), "IMPORT");
connectionId, "IMPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_FROM_JOB_INPUT_NAMES, conn,
"fromJobConfig", "fromJobConfig", Direction.FROM.toString());
@ -735,10 +710,8 @@ protected long registerDriver(Connection conn) {
LOG.trace("Begin Driver loading.");
}
PreparedStatement baseDriverStmt = null;
try {
baseDriverStmt = conn.prepareStatement(crudQueries.getStmtInsertIntoConfigurable(),
Statement.RETURN_GENERATED_KEYS);
try (PreparedStatement baseDriverStmt = conn.prepareStatement(crudQueries.getStmtInsertIntoConfigurable(),
Statement.RETURN_GENERATED_KEYS);) {
baseDriverStmt.setString(1, MDriver.DRIVER_NAME);
baseDriverStmt.setString(2, Driver.getClassName());
baseDriverStmt.setString(3, "1");
@ -749,16 +722,15 @@ protected long registerDriver(Connection conn) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0003, Integer.toString(baseDriverCount));
}
ResultSet rsetDriverId = baseDriverStmt.getGeneratedKeys();
try (ResultSet rsetDriverId = baseDriverStmt.getGeneratedKeys()) {
if (!rsetDriverId.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0004);
}
return rsetDriverId.getLong(1);
}
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0009, ex);
} finally {
closeStatements(baseDriverStmt);
}
}
@ -785,12 +757,11 @@ protected long registerHdfsConnector(Connection conn) {
if (handler.getConnectorConfigurable().getPersistenceId() != -1) {
return handler.getConnectorConfigurable().getPersistenceId();
}
PreparedStatement baseConnectorStmt = null;
if (handler.getUniqueName().equals(CONNECTOR_HDFS)) {
try {
baseConnectorStmt = conn.prepareStatement(
try (PreparedStatement baseConnectorStmt = conn.prepareStatement(
STMT_INSERT_INTO_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS,
Statement.RETURN_GENERATED_KEYS);
Statement.RETURN_GENERATED_KEYS)) {
baseConnectorStmt.setString(1, handler.getConnectorConfigurable().getUniqueName());
baseConnectorStmt.setString(2, handler.getConnectorConfigurable().getClassName());
baseConnectorStmt.setString(3, "0");
@ -805,8 +776,6 @@ protected long registerHdfsConnector(Connection conn) {
}
} catch (SQLException e) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0004);
} finally {
closeStatements(baseConnectorStmt);
}
break;
@ -829,11 +798,9 @@ private Long createHdfsConnection(Connection conn, Long connectorId) {
LOG.trace("Creating HDFS link.");
}
PreparedStatement stmt = null;
int result;
try {
stmt = conn.prepareStatement(STMT_INSERT_CONNECTION,
Statement.RETURN_GENERATED_KEYS);
try (PreparedStatement stmt = conn.prepareStatement(STMT_INSERT_CONNECTION,
Statement.RETURN_GENERATED_KEYS)) {
stmt.setString(1, CONNECTOR_HDFS);
stmt.setLong(2, connectorId);
stmt.setBoolean(3, true);
@ -847,7 +814,7 @@ private Long createHdfsConnection(Connection conn, Long connectorId) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0003,
Integer.toString(result));
}
ResultSet rsetConnectionId = stmt.getGeneratedKeys();
try (ResultSet rsetConnectionId = stmt.getGeneratedKeys()) {
if (!rsetConnectionId.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0004);
@ -858,10 +825,9 @@ private Long createHdfsConnection(Connection conn, Long connectorId) {
}
return rsetConnectionId.getLong(1);
}
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0005, ex);
} finally {
closeStatements(stmt);
}
}
@ -876,11 +842,10 @@ private Long createHdfsLinkForm(Connection conn, Long connectorId) {
LOG.trace("Creating HDFS link.");
}
PreparedStatement stmt = null;
int result;
try {
try (PreparedStatement stmt = conn.prepareStatement(STMT_INSERT_INTO_FORM, Statement.RETURN_GENERATED_KEYS)) {
short index = 0;
stmt = conn.prepareStatement(STMT_INSERT_INTO_FORM, Statement.RETURN_GENERATED_KEYS);
stmt.setLong(1, connectorId);
stmt.setString(2, "linkConfig");
// it could also be set to the deprecated "CONNECTION"
@ -890,7 +855,7 @@ private Long createHdfsLinkForm(Connection conn, Long connectorId) {
if (result != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0003, Integer.toString(result));
}
ResultSet rsetFormId = stmt.getGeneratedKeys();
try (ResultSet rsetFormId = stmt.getGeneratedKeys()) {
if (!rsetFormId.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0004);
@ -900,10 +865,9 @@ private Long createHdfsLinkForm(Connection conn, Long connectorId) {
LOG.trace("Created HDFS connector link FORM.");
}
return rsetFormId.getLong(1);
}
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0005, ex);
} finally {
closeStatements(stmt);
}
}

View File

@ -19,6 +19,7 @@
import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.*;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@ -182,18 +183,19 @@ public final class DerbySchemaConstants {
*/
public static final Set<String> tablesV1;
static {
tablesV1 = new HashSet<String>();
tablesV1.add(TABLE_SQ_CONNECTOR_NAME);
tablesV1.add(TABLE_SQ_CONNECTION_NAME);
tablesV1.add(TABLE_SQ_CONNECTION_INPUT_NAME);
tablesV1.add(TABLE_SQ_COUNTER_NAME);
tablesV1.add(TABLE_SQ_COUNTER_GROUP_NAME);
tablesV1.add(TABLE_SQ_COUNTER_SUBMISSION_NAME);
tablesV1.add(TABLE_SQ_FORM_NAME);
tablesV1.add(TABLE_SQ_INPUT_NAME);
tablesV1.add(TABLE_SQ_JOB_NAME);
tablesV1.add(TABLE_SQ_JOB_INPUT_NAME);
tablesV1.add(TABLE_SQ_SUBMISSION_NAME);
Set<String> tempTablesV1 = new HashSet<String>();
tempTablesV1.add(TABLE_SQ_CONNECTOR_NAME);
tempTablesV1.add(TABLE_SQ_CONNECTION_NAME);
tempTablesV1.add(TABLE_SQ_CONNECTION_INPUT_NAME);
tempTablesV1.add(TABLE_SQ_COUNTER_NAME);
tempTablesV1.add(TABLE_SQ_COUNTER_GROUP_NAME);
tempTablesV1.add(TABLE_SQ_COUNTER_SUBMISSION_NAME);
tempTablesV1.add(TABLE_SQ_FORM_NAME);
tempTablesV1.add(TABLE_SQ_INPUT_NAME);
tempTablesV1.add(TABLE_SQ_JOB_NAME);
tempTablesV1.add(TABLE_SQ_JOB_INPUT_NAME);
tempTablesV1.add(TABLE_SQ_SUBMISSION_NAME);
tablesV1 = Collections.unmodifiableSet(tempTablesV1);
}
private DerbySchemaConstants() {

View File

@ -123,19 +123,15 @@ public void execute() {
}
private Long getConfigId(boolean direction, String ... args) {
PreparedStatement statement = null;
String configIdQuery = (direction) ?
DerbySchemaUpgradeQuery.QUERY_SELECT_CONFIG_ID_BY_NAME_AND_DIRECTION : DerbySchemaUpgradeQuery.QUERY_SELECT_CONFIG_ID_BY_NAME;
try {
statement = connection.prepareStatement(configIdQuery);
try (PreparedStatement statement = connection.prepareStatement(configIdQuery);) {
for (int i = 0; i < args.length; ++i) {
statement.setString(i + 1, args[i]);
}
ResultSet configIdResultSet = statement.executeQuery();
try (ResultSet configIdResultSet = statement.executeQuery()) {
LOG.debug("QUERY(" + configIdQuery + ") with args [" + StringUtils.join(args, ",") + "] fetch size: " + configIdResultSet.getFetchSize());
if (!configIdResultSet.next() || configIdResultSet.getFetchSize() != 1) {
@ -143,19 +139,16 @@ private Long getConfigId(boolean direction, String ... args) {
}
return configIdResultSet.getLong(1);
}
} catch (SQLException e) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0002, e);
} finally {
handler.closeStatements(statement);
}
}
private void renameConfig(long configId, String configName) {
PreparedStatement statement = null;
String query = DerbySchemaUpgradeQuery.QUERY_UPDATE_TABLE_SQ_CONFIG_NAME;
try {
statement = connection.prepareStatement(query);
try (PreparedStatement statement = connection.prepareStatement(query)) {
statement.setString(1, configName);
statement.setLong(2, configId);
@ -163,19 +156,15 @@ private void renameConfig(long configId, String configName) {
LOG.debug("QUERY(" + query + ") with args [" + configName + ", " + configId + "] update count: " + updateCount);
} catch (SQLException e) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0002, e);
} finally {
handler.closeStatements(statement);
}
}
private void renameConfigInputs(long configId, Map<String, String> inputNameMap) {
PreparedStatement statement = null;
try {
statement = connection.prepareStatement(DerbySchemaUpgradeQuery.QUERY_UPDATE_TABLE_SQ_INPUT_SQI_NAME);
for (String inputName : inputNameMap.keySet()) {
statement.setString(1, inputNameMap.get(inputName));
try (PreparedStatement statement = connection.prepareStatement(DerbySchemaUpgradeQuery.QUERY_UPDATE_TABLE_SQ_INPUT_SQI_NAME)) {
for (Map.Entry<String, String> entry : inputNameMap.entrySet()) {
String inputName = entry.getKey();
statement.setString(1, entry.getValue());
statement.setString(2, inputName);
statement.setLong(3, configId);
statement.addBatch();
@ -189,8 +178,6 @@ private void renameConfigInputs(long configId, Map<String, String> inputNameMap)
+ StringUtils.join(ArrayUtils.toObject(updateCounts), ","));
} catch (SQLException e) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0002, e);
} finally {
handler.closeStatements(statement);
}
}
}

View File

@ -66,31 +66,23 @@ public UniqueJobRename(Connection conn) {
public void execute() {
Map<Long, String> idToNewNameMap = new TreeMap<Long, String>();
Statement fetchJobStmt = null;
PreparedStatement updateJobStmt = null;
ResultSet fetchJobResultSet = null;
// Fetch all non-unique job IDs and Names.
// Transform names.
try {
fetchJobStmt = conn.createStatement();
fetchJobResultSet = fetchJobStmt.executeQuery(QUERY_SELECT_JOBS_WITH_NON_UNIQUE_NAMES);
try (Statement fetchJobStmt = conn.createStatement();
ResultSet fetchJobResultSet = fetchJobStmt.executeQuery(QUERY_SELECT_JOBS_WITH_NON_UNIQUE_NAMES)) {
while (fetchJobResultSet.next()) {
idToNewNameMap.put(fetchJobResultSet.getLong(1), getNewName(fetchJobResultSet.getString(2)));
}
} catch (SQLException e) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e);
} finally {
CommonRepoUtils.closeResultSets(fetchJobResultSet);
CommonRepoUtils.closeStatements(fetchJobStmt);
}
try (PreparedStatement updateJobStmt = conn.prepareStatement(QUERY_UPDATE_JOB_NAME_BY_ID)) {
try {
updateJobStmt = conn.prepareStatement(QUERY_UPDATE_JOB_NAME_BY_ID);
for (Long jobId : idToNewNameMap.keySet()) {
updateJobStmt.setString(1, idToNewNameMap.get(jobId));
updateJobStmt.setLong(2, jobId);
for (Map.Entry<Long, String> entry : idToNewNameMap.entrySet()) {
updateJobStmt.setString(1, entry.getValue());
updateJobStmt.setLong(2, entry.getKey());
updateJobStmt.addBatch();
}
@ -99,14 +91,11 @@ public void execute() {
if (count != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0000,
"Update count wrong when changing names for non-unique jobs. Update coutns are: "
+ StringUtils.join(Arrays.asList(counts), ","));
+ Arrays.toString(counts));
}
}
} catch (SQLException e) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e);
} finally {
CommonRepoUtils.closeResultSets(fetchJobResultSet);
CommonRepoUtils.closeStatements(fetchJobStmt);
}
}