5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-06 11:09:26 +08:00

SQOOP-630: Check if in-use connection/job before deletion

(Jarek Jarcec Cecho)
This commit is contained in:
Bilung Lee 2012-10-17 09:42:06 -07:00
parent 5807db8c3e
commit 6cd9e96888
6 changed files with 85 additions and 12 deletions

View File

@ -206,6 +206,10 @@ public Object doIt(Connection conn) {
throw new SqoopException(RepositoryError.JDBCREPO_0017,
"Invalid id: " + connectionId);
}
if(handler.inUseConnection(connectionId, conn)) {
throw new SqoopException(RepositoryError.JDBCREPO_0021,
"Id in use: " + connectionId);
}
handler.deleteConnection(connectionId, conn);
return null;
@ -292,6 +296,10 @@ public Object doIt(Connection conn) {
throw new SqoopException(RepositoryError.JDBCREPO_0020,
"Invalid id: " + id);
}
if(handler.inUseJob(id, conn)) {
throw new SqoopException(RepositoryError.JDBCREPO_0022,
"Id in use: " + id);
}
handler.deleteJob(id, conn);
return null;

View File

@ -130,28 +130,38 @@ public interface JdbcRepositoryHandler {
/**
* Check if given connection exists in metastore.
*
* @param id Connection id
* @param connetionId Connection id
* @param conn Connection to metadata repository
* @return True if the connection exists
*/
boolean existsConnection(long id, Connection conn);
boolean existsConnection(long connetionId, Connection conn);
/**
* Check if given Connection id is referenced somewhere and thus can't
* be removed.
*
* @param connectionId Connection id
* @param conn Connection to metadata repository
* @return
*/
boolean inUseConnection(long connectionId, Connection conn);
/**
* Delete connection with given id from metadata repository.
*
* @param id Connection object that should be removed from repository
* @param connectionId Connection object that should be removed from repository
* @param conn Connection to metadata repository
*/
void deleteConnection(long id, Connection conn);
void deleteConnection(long connectionId, Connection conn);
/**
* Find connection with given id in repository.
*
* @param id Connection id
* @param connectionId Connection id
* @param conn Connection to metadata repository
* @return Deserialized form of the connection that is saved in repository
*/
MConnection findConnection(long id, Connection conn);
MConnection findConnection(long connectionId, Connection conn);
/**
* Get all connection objects.
@ -184,28 +194,38 @@ public interface JdbcRepositoryHandler {
/**
* Check if given job exists in metastore.
*
* @param id Job id
* @param jobId Job id
* @param conn Connection to metadata repository
* @return True if the job exists
*/
boolean existsJob(long id, Connection conn);
boolean existsJob(long jobId, Connection conn);
/**
* Check if given job id is referenced somewhere and thus can't
* be removed.
*
* @param jobId Job id
* @param conn Connection to metadata repository
* @return
*/
boolean inUseJob(long jobId, Connection conn);
/**
* Delete job with given id from metadata repository.
*
* @param id Job object that should be removed from repository
* @param jobId Job object that should be removed from repository
* @param conn Connection to metadata repository
*/
void deleteJob(long id, Connection conn);
void deleteJob(long jobId, Connection conn);
/**
* Find job with given id in repository.
*
* @param id Job id
* @param jobId Job id
* @param conn Connection to metadata repository
* @return Deserialized form of the job that is present in the repository
*/
MJob findJob(long id, Connection conn);
MJob findJob(long jobId, Connection conn);
/**
* Get all job objects.

View File

@ -100,6 +100,12 @@ public enum RepositoryError implements ErrorCode {
/** Invalid job id **/
JDBCREPO_0020("Given job id is invalid"),
/** Connection ID is in use **/
JDBCREPO_0021("Given connection id is in use"),
/** Job ID is in use **/
JDBCREPO_0022("Given job id is in use"),
;
private final String message;

View File

@ -143,6 +143,9 @@ public enum DerbyRepoError implements ErrorCode {
/** We can't restore job metadata from metastore **/
DERBYREPO_0031("Unable to load job metadata from repository"),
/** Can't verify if connection is referenced from somewhere **/
DERBYREPO_0032("Unable to check if connection is in use"),
;
private final String message;

View File

@ -501,6 +501,29 @@ public boolean existsConnection(long id, Connection conn) {
}
}
@Override
public boolean inUseConnection(long connectionId, Connection conn) {
PreparedStatement stmt = null;
ResultSet rs = null;
try {
stmt = conn.prepareStatement(STMT_SELECT_JOBS_FOR_CONNECTION_CHECK);
stmt.setLong(1, connectionId);
rs = stmt.executeQuery();
// Should be always valid in case of count(*) query
rs.next();
return rs.getLong(1) != 0;
} catch (SQLException e) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0032, e);
} finally {
closeResultSets(rs);
closeStatements(stmt);
}
}
/**
* {@inheritDoc}
*/
@ -676,6 +699,13 @@ public boolean existsJob(long id, Connection conn) {
}
}
@Override
public boolean inUseJob(long jobId, Connection conn) {
// TODO(jarcec): This method will need to be upgraded once submission
// engine will be in place as we can't remove "running" job.
return false;
}
/**
* {@inheritDoc}
*/

View File

@ -329,6 +329,12 @@ public final class DerbySchemaQuery {
public static final String STMT_SELECT_JOB_CHECK =
"SELECT count(*) FROM " + TABLE_SQ_JOB + " WHERE " + COLUMN_SQB_ID + " = ?";
// DML: Check if there are jobs for given connection
public static final String STMT_SELECT_JOBS_FOR_CONNECTION_CHECK =
"SELECT count(*) FROM " + TABLE_SQ_JOB + " JOIN " + TABLE_SQ_CONNECTION
+ " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID + " WHERE "
+ COLUMN_SQN_ID + " = ? ";
// DML: Select one specific job
public static final String STMT_SELECT_JOB_SINGLE =
"SELECT " + COLUMN_SQN_CONNECTOR + ", " + COLUMN_SQB_ID + ", "