From 6cd9e968883a70f83714e766be55a25fe23e32c0 Mon Sep 17 00:00:00 2001 From: Bilung Lee Date: Wed, 17 Oct 2012 09:42:06 -0700 Subject: [PATCH] SQOOP-630: Check if in-use connection/job before deletion (Jarek Jarcec Cecho) --- .../sqoop/repository/JdbcRepository.java | 8 ++++ .../repository/JdbcRepositoryHandler.java | 44 ++++++++++++++----- .../sqoop/repository/RepositoryError.java | 6 +++ .../repository/derby/DerbyRepoError.java | 3 ++ .../derby/DerbyRepositoryHandler.java | 30 +++++++++++++ .../repository/derby/DerbySchemaQuery.java | 6 +++ 6 files changed, 85 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java index e76a0ccf..8d7b95cd 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java @@ -206,6 +206,10 @@ public Object doIt(Connection conn) { throw new SqoopException(RepositoryError.JDBCREPO_0017, "Invalid id: " + connectionId); } + if(handler.inUseConnection(connectionId, conn)) { + throw new SqoopException(RepositoryError.JDBCREPO_0021, + "Id in use: " + connectionId); + } handler.deleteConnection(connectionId, conn); return null; @@ -292,6 +296,10 @@ public Object doIt(Connection conn) { throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + id); } + if(handler.inUseJob(id, conn)) { + throw new SqoopException(RepositoryError.JDBCREPO_0022, + "Id in use: " + id); + } handler.deleteJob(id, conn); return null; diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java index ff10beef..b0c9780b 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java @@ -130,28 +130,38 @@ public interface JdbcRepositoryHandler { /** * Check if given connection exists in metastore. * - * @param id Connection id + * @param connetionId Connection id * @param conn Connection to metadata repository * @return True if the connection exists */ - boolean existsConnection(long id, Connection conn); + boolean existsConnection(long connetionId, Connection conn); + + /** + * Check if given Connection id is referenced somewhere and thus can't + * be removed. + * + * @param connectionId Connection id + * @param conn Connection to metadata repository + * @return + */ + boolean inUseConnection(long connectionId, Connection conn); /** * Delete connection with given id from metadata repository. * - * @param id Connection object that should be removed from repository + * @param connectionId Connection object that should be removed from repository * @param conn Connection to metadata repository */ - void deleteConnection(long id, Connection conn); + void deleteConnection(long connectionId, Connection conn); /** * Find connection with given id in repository. * - * @param id Connection id + * @param connectionId Connection id * @param conn Connection to metadata repository * @return Deserialized form of the connection that is saved in repository */ - MConnection findConnection(long id, Connection conn); + MConnection findConnection(long connectionId, Connection conn); /** * Get all connection objects. @@ -184,28 +194,38 @@ public interface JdbcRepositoryHandler { /** * Check if given job exists in metastore. * - * @param id Job id + * @param jobId Job id * @param conn Connection to metadata repository * @return True if the job exists */ - boolean existsJob(long id, Connection conn); + boolean existsJob(long jobId, Connection conn); + + /** + * Check if given job id is referenced somewhere and thus can't + * be removed. + * + * @param jobId Job id + * @param conn Connection to metadata repository + * @return + */ + boolean inUseJob(long jobId, Connection conn); /** * Delete job with given id from metadata repository. * - * @param id Job object that should be removed from repository + * @param jobId Job object that should be removed from repository * @param conn Connection to metadata repository */ - void deleteJob(long id, Connection conn); + void deleteJob(long jobId, Connection conn); /** * Find job with given id in repository. * - * @param id Job id + * @param jobId Job id * @param conn Connection to metadata repository * @return Deserialized form of the job that is present in the repository */ - MJob findJob(long id, Connection conn); + MJob findJob(long jobId, Connection conn); /** * Get all job objects. diff --git a/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java b/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java index ca0c5a15..ff53b130 100644 --- a/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java +++ b/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java @@ -100,6 +100,12 @@ public enum RepositoryError implements ErrorCode { /** Invalid job id **/ JDBCREPO_0020("Given job id is invalid"), + /** Connection ID is in use **/ + JDBCREPO_0021("Given connection id is in use"), + + /** Job ID is in use **/ + JDBCREPO_0022("Given job id is in use"), + ; private final String message; diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java index 00bc926c..94119b1f 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java @@ -143,6 +143,9 @@ public enum DerbyRepoError implements ErrorCode { /** We can't restore job metadata from metastore **/ DERBYREPO_0031("Unable to load job metadata from repository"), + /** Can't verify if connection is referenced from somewhere **/ + DERBYREPO_0032("Unable to check if connection is in use"), + ; private final String message; diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java index c72a571e..0ce8832f 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java @@ -501,6 +501,29 @@ public boolean existsConnection(long id, Connection conn) { } } + @Override + public boolean inUseConnection(long connectionId, Connection conn) { + PreparedStatement stmt = null; + ResultSet rs = null; + + try { + stmt = conn.prepareStatement(STMT_SELECT_JOBS_FOR_CONNECTION_CHECK); + stmt.setLong(1, connectionId); + rs = stmt.executeQuery(); + + // Should be always valid in case of count(*) query + rs.next(); + + return rs.getLong(1) != 0; + + } catch (SQLException e) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0032, e); + } finally { + closeResultSets(rs); + closeStatements(stmt); + } + } + /** * {@inheritDoc} */ @@ -676,6 +699,13 @@ public boolean existsJob(long id, Connection conn) { } } + @Override + public boolean inUseJob(long jobId, Connection conn) { + // TODO(jarcec): This method will need to be upgraded once submission + // engine will be in place as we can't remove "running" job. + return false; + } + /** * {@inheritDoc} */ diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java index a895cfe3..cddace7f 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java @@ -329,6 +329,12 @@ public final class DerbySchemaQuery { public static final String STMT_SELECT_JOB_CHECK = "SELECT count(*) FROM " + TABLE_SQ_JOB + " WHERE " + COLUMN_SQB_ID + " = ?"; + // DML: Check if there are jobs for given connection + public static final String STMT_SELECT_JOBS_FOR_CONNECTION_CHECK = + "SELECT count(*) FROM " + TABLE_SQ_JOB + " JOIN " + TABLE_SQ_CONNECTION + + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID + " WHERE " + + COLUMN_SQN_ID + " = ? "; + // DML: Select one specific job public static final String STMT_SELECT_JOB_SINGLE = "SELECT " + COLUMN_SQN_CONNECTOR + ", " + COLUMN_SQB_ID + ", "