5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-06 21:23:03 +08:00

SQOOP-667 Persist all properties of MSubmission metadata

(Jarek Jarcec Cecho)
This commit is contained in:
Bilung Lee 2012-11-26 08:34:00 -08:00
parent a348d5bb66
commit 80a11f972c
6 changed files with 606 additions and 96 deletions

View File

@ -44,4 +44,8 @@ public CounterGroup getCounterGroup(String name) {
public Iterator<CounterGroup> iterator() {
return groups.values().iterator();
}
public boolean isEmpty() {
return groups.isEmpty();
}
}

View File

@ -57,6 +57,9 @@
import org.apache.sqoop.repository.JdbcRepositoryHandler;
import org.apache.sqoop.repository.JdbcRepositoryTransactionFactory;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.utils.StringUtils;
/**
@ -200,6 +203,9 @@ public void createSchema() {
runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT);
runQuery(QUERY_CREATE_TABLE_SQ_JOB_INPUT);
runQuery(QUERY_CREATE_TABLE_SQ_SUBMISSION);
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP);
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER);
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION);
}
/**
@ -796,7 +802,11 @@ public void createSubmission(MSubmission submission, Connection conn) {
stmt.setLong(1, submission.getJobId());
stmt.setString(2, submission.getStatus().name());
stmt.setTimestamp(3, new Timestamp(submission.getCreationDate().getTime()));
stmt.setString(4, submission.getExternalId());
stmt.setTimestamp(4, new Timestamp(submission.getLastUpdateDate().getTime()));
stmt.setString(5, submission.getExternalId());
stmt.setString(6, submission.getExternalLink());
stmt.setString(7, submission.getExceptionInfo());
stmt.setString(8, submission.getExceptionStackTrace());
result = stmt.executeUpdate();
if (result != 1) {
@ -811,6 +821,12 @@ public void createSubmission(MSubmission submission, Connection conn) {
}
long submissionId = rsetSubmissionId.getLong(1);
if(submission.getCounters() != null) {
createSubmissionCounters(submissionId, submission.getCounters(), conn);
}
// Save created persistence id
submission.setPersistenceId(submissionId);
} catch (SQLException ex) {
@ -850,20 +866,32 @@ public boolean existsSubmission(long submissionId, Connection conn) {
@Override
public void updateSubmission(MSubmission submission, Connection conn) {
PreparedStatement stmt = null;
PreparedStatement deleteStmt = null;
try {
// Update properties in main table
stmt = conn.prepareStatement(STMT_UPDATE_SUBMISSION);
stmt.setLong(1, submission.getJobId());
stmt.setString(2, submission.getStatus().name());
stmt.setTimestamp(3, new Timestamp(submission.getCreationDate().getTime()));
stmt.setString(4, submission.getExternalId());
stmt.setString(1, submission.getStatus().name());
stmt.setTimestamp(2, new Timestamp(submission.getLastUpdateDate().getTime()));
stmt.setString(3, submission.getExceptionInfo());
stmt.setString(4, submission.getExceptionStackTrace());
stmt.setLong(5, submission.getPersistenceId());
stmt.executeUpdate();
// Delete previous counters
deleteStmt = conn.prepareStatement(STMT_DELETE_COUNTER_SUBMISSION);
deleteStmt.setLong(1, submission.getPersistenceId());
deleteStmt.executeUpdate();
// Reinsert new counters if needed
if(submission.getCounters() != null) {
createSubmissionCounters(submission.getPersistenceId(), submission.getCounters(), conn);
}
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0035, ex);
} finally {
closeStatements(stmt);
closeStatements(stmt, deleteStmt);
}
}
@ -901,7 +929,7 @@ public List<MSubmission> findSubmissionsUnfinished(Connection conn) {
rs = stmt.executeQuery();
while(rs.next()) {
submissions.add(loadSubmission(rs));
submissions.add(loadSubmission(rs, conn));
}
rs.close();
@ -934,7 +962,7 @@ public MSubmission findSubmissionLastForJob(long jobId, Connection conn) {
return null;
}
return loadSubmission(rs);
return loadSubmission(rs, conn);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0037, ex);
} finally {
@ -944,20 +972,182 @@ public MSubmission findSubmissionLastForJob(long jobId, Connection conn) {
}
/**
* {@inheritDoc}
* Stores counters for given submission in repository.
*
* @param submissionId Submission id
* @param counters Counters that should be stored
* @param conn Connection to derby repository
* @throws SQLException
*/
private MSubmission loadSubmission(ResultSet rs) throws SQLException {
MSubmission submission = new MSubmission(
rs.getLong(2),
rs.getTimestamp(3),
SubmissionStatus.valueOf(rs.getString(4)),
rs.getString(5)
);
private void createSubmissionCounters(long submissionId, Counters counters, Connection conn) throws SQLException {
PreparedStatement stmt = null;
try {
stmt = conn.prepareStatement(STMT_INSERT_COUNTER_SUBMISSION);
for(CounterGroup group : counters) {
long groupId = getCounterGroupId(group, conn);
for(Counter counter: group) {
long counterId = getCounterId(counter, conn);
stmt.setLong(1, groupId);
stmt.setLong(2, counterId);
stmt.setLong(3, submissionId);
stmt.setLong(4, counter.getValue());
stmt.executeUpdate();
}
}
} finally {
closeStatements(stmt);
}
}
/**
* Resolves counter group database id.
*
* @param group Given group
* @param conn Connection to metastore
* @return Id
* @throws SQLException
*/
private long getCounterGroupId(CounterGroup group, Connection conn) throws SQLException {
PreparedStatement select = null;
PreparedStatement insert = null;
ResultSet rsSelect = null;
ResultSet rsInsert = null;
try {
select = conn.prepareStatement(STMT_SELECT_COUNTER_GROUP);
select.setString(1, group.getName());
rsSelect = select.executeQuery();
if(rsSelect.next()) {
return rsSelect.getLong(1);
}
insert = conn.prepareStatement(STMT_INSERT_COUNTER_GROUP, Statement.RETURN_GENERATED_KEYS);
insert.setString(1, group.getName());
insert.executeUpdate();
rsInsert = insert.getGeneratedKeys();
if (!rsInsert.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
}
return rsInsert.getLong(1);
} finally {
closeResultSets(rsSelect, rsInsert);
closeStatements(select, insert);
}
}
/**
* Resolves counter id.
*
* @param counter Given counter
* @param conn connection to metastore
* @return Id
* @throws SQLException
*/
private long getCounterId(Counter counter, Connection conn) throws SQLException {
PreparedStatement select = null;
PreparedStatement insert = null;
ResultSet rsSelect = null;
ResultSet rsInsert = null;
try {
select = conn.prepareStatement(STMT_SELECT_COUNTER);
select.setString(1, counter.getName());
rsSelect = select.executeQuery();
if(rsSelect.next()) {
return rsSelect.getLong(1);
}
insert = conn.prepareStatement(STMT_INSERT_COUNTER, Statement.RETURN_GENERATED_KEYS);
insert.setString(1, counter.getName());
insert.executeUpdate();
rsInsert = insert.getGeneratedKeys();
if (!rsInsert.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
}
return rsInsert.getLong(1);
} finally {
closeResultSets(rsSelect, rsInsert);
closeStatements(select, insert);
}
}
/**
* Create MSubmission structure from result set.
*
* @param rs Result set, only active row will be fetched
* @param conn Connection to metastore
* @return Created MSubmission structure
* @throws SQLException
*/
private MSubmission loadSubmission(ResultSet rs, Connection conn) throws SQLException {
MSubmission submission = new MSubmission();
submission.setPersistenceId(rs.getLong(1));
submission.setJobId(rs.getLong(2));
submission.setStatus(SubmissionStatus.valueOf(rs.getString(3)));
submission.setCreationDate(rs.getTimestamp(4));
submission.setLastUpdateDate(rs.getTimestamp(5));
submission.setExternalId(rs.getString(6));
submission.setExternalLink(rs.getString(7));
submission.setExceptionInfo(rs.getString(8));
submission.setExceptionStackTrace(rs.getString(9));
Counters counters = loadCountersSubmission(rs.getLong(1), conn);
submission.setCounters(counters);
return submission;
}
private Counters loadCountersSubmission(long submissionId, Connection conn) throws SQLException {
PreparedStatement stmt = null;
ResultSet rs = null;
try {
stmt = conn.prepareStatement(STMT_SELECT_COUNTER_SUBMISSION);
stmt.setLong(1, submissionId);
rs = stmt.executeQuery();
Counters counters = new Counters();
while(rs.next()) {
String groupName = rs.getString(1);
String counterName = rs.getString(2);
long value = rs.getLong(3);
CounterGroup group = counters.getCounterGroup(groupName);
if(group == null) {
group = new CounterGroup(groupName);
counters.addCounterGroup(group);
}
group.addCounter(new Counter(counterName, value));
}
if(counters.isEmpty()) {
return null;
} else {
return counters;
}
} finally {
closeStatements(stmt);
closeResultSets(rs);
}
}
private List<MConnection> loadConnections(PreparedStatement stmt,
Connection conn)
throws SQLException {

View File

@ -146,12 +146,60 @@ public final class DerbySchemaConstants {
public static final String COLUMN_SQS_JOB = "SQS_JOB";
public static final String COLUMN_SQS_DATE = "SQS_DATE";
public static final String COLUMN_SQS_STATUS = "SQS_STATUS";
public static final String COLUMN_SQS_CREATION_DATE = "SQS_CREATION_DATE";
public static final String COLUMN_SQS_UPDATE_DATE = "SQS_UPDATE_DATE";
public static final String COLUMN_SQS_EXTERNAL_ID = "SQS_EXTERNAL_ID";
public static final String COLUMN_SQS_EXTERNAL_LINK = "SQS_EXTERNAL_LINK";
public static final String COLUMN_SQS_EXCEPTION = "SQS_EXCEPTION";
public static final String COLUMN_SQS_EXCEPTION_TRACE = "SQS_EXCEPTION_TRACE";
// SQ_COUNTER_GROUP
public static final String TABLE_SQ_COUNTER_GROUP_NAME =
"SQ_COUNTER_GROUP";
public static final String TABLE_SQ_COUNTER_GROUP = SCHEMA_PREFIX
+ TABLE_SQ_COUNTER_GROUP_NAME;
public static final String COLUMN_SQG_ID = "SQG_ID";
public static final String COLUMN_SQG_NAME = "SQG_NAME";
// SQ_COUNTER_GROUP
public static final String TABLE_SQ_COUNTER_NAME =
"SQ_COUNTER";
public static final String TABLE_SQ_COUNTER = SCHEMA_PREFIX
+ TABLE_SQ_COUNTER_NAME;
public static final String COLUMN_SQR_ID = "SQR_ID";
public static final String COLUMN_SQR_NAME = "SQR_NAME";
// SQ_COUNTER_SUBMISSION
public static final String TABLE_SQ_COUNTER_SUBMISSION_NAME =
"SQ_COUNTER_SUBMISSION";
public static final String TABLE_SQ_COUNTER_SUBMISSION = SCHEMA_PREFIX
+ TABLE_SQ_COUNTER_SUBMISSION_NAME;
public static final String COLUMN_SQRS_GROUP = "SQRS_GROUP";
public static final String COLUMN_SQRS_COUNTER = "SQRS_COUNTER";
public static final String COLUMN_SQRS_SUBMISSION = "SQRS_SUBMISSION";
public static final String COLUMN_SQRS_VALUE = "SQRS_VALUE";
private DerbySchemaConstants() {
// Disable explicit object creation
}

View File

@ -119,14 +119,53 @@
* <p>
* <strong>SQ_SUBMISSION</strong>: List of submissions
* <pre>
* +----------------------------+
* +-----------------------------------+
* | SQ_JOB_SUBMISSION |
* +----------------------------+
* +-----------------------------------+
* | SQS_ID: BIGINT PK |
* | SQS_JOB: BIGINT | FK SQ_JOB(SQB_ID)
* | SQS_STATUS: VARCHAR(20) |
* | SQS_DATE: TIMESTAMP |
* | SQS_EXTERNAL_ID:VARCHAR(50)|
* | SQS_CREATION_DATE: TIMESTAMP |
* | SQS_UPDATE_DATE: TIMESTAMP |
* | SQS_EXTERNAL_ID: VARCHAR(25) |
* | SQS_EXTERNAL_LINK: VARCHAR(75) |
* | SQS_EXCEPTION: VARCHAR(75) |
* | SQS_EXCEPTION_TRACE: VARCHAR(500) |
* +-----------------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_COUNTER_GROUP</strong>: List of counter groups
* <pre>
* +----------------------------+
* | SQ_COUNTER_GROUP |
* +----------------------------+
* | SQG_ID: BIGINT PK |
* | SQG_NAME: VARCHAR(50) |
* +----------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_COUNTER</strong>: List of counters
* <pre>
* +----------------------------+
* | SQ_COUNTER |
* +----------------------------+
* | SQR_ID: BIGINT PK |
* | SQR_NAME: VARCHAR(50) |
* +----------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_COUNTER_SUBMISSION</strong>: N:M Relationship
* <pre>
* +----------------------------+
* | SQ_COUNTER_SUBMISSION |
* +----------------------------+
* | SQRS_GROUP: BIGINT PK | FK SQ_COUNTER_GROUP(SQR_ID)
* | SQRS_COUNTER: BIGINT PK | FK SQ_COUNTER(SQR_ID)
* | SQRS_SUBMISSION: BIGINT PK | FK SQ_SUBMISSION(SQS_ID)
* | SQRS_VALUE: BIGINT |
* +----------------------------+
* </pre>
* </p>
@ -143,69 +182,77 @@ public final class DerbySchemaQuery {
// DDL: Create table SQ_CONNECTOR
public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR =
"CREATE TABLE " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID
+ " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) "
+ "PRIMARY KEY, " + COLUMN_SQC_NAME + " VARCHAR(64), " + COLUMN_SQC_CLASS
+ " VARCHAR(255))";
"CREATE TABLE " + TABLE_SQ_CONNECTOR + " ("
+ COLUMN_SQC_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+ COLUMN_SQC_NAME + " VARCHAR(64), "
+ COLUMN_SQC_CLASS + " VARCHAR(255)"
+ ")";
// DDL: Create table SQ_FORM
public static final String QUERY_CREATE_TABLE_SQ_FORM =
"CREATE TABLE " + TABLE_SQ_FORM + " (" + COLUMN_SQF_ID
+ " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) "
+ "PRIMARY KEY, " + COLUMN_SQF_CONNECTOR + " BIGINT, "
"CREATE TABLE " + TABLE_SQ_FORM + " ("
+ COLUMN_SQF_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+ COLUMN_SQF_CONNECTOR + " BIGINT, "
+ COLUMN_SQF_OPERATION + " VARCHAR(32), "
+ COLUMN_SQF_NAME + " VARCHAR(64), " + COLUMN_SQF_TYPE + " VARCHAR(32), "
+ COLUMN_SQF_INDEX + " SMALLINT, " + " FOREIGN KEY ("
+ COLUMN_SQF_CONNECTOR+ ") REFERENCES " + TABLE_SQ_CONNECTOR + " ("
+ COLUMN_SQC_ID + "))";
+ COLUMN_SQF_NAME + " VARCHAR(64), "
+ COLUMN_SQF_TYPE + " VARCHAR(32), "
+ COLUMN_SQF_INDEX + " SMALLINT, "
+ " FOREIGN KEY (" + COLUMN_SQF_CONNECTOR+ ") REFERENCES " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + ")"
+ ")";
// DDL: Create table SQ_INPUT
public static final String QUERY_CREATE_TABLE_SQ_INPUT =
"CREATE TABLE " + TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID
+ " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) "
+ "PRIMARY KEY, " + COLUMN_SQI_NAME + " VARCHAR(64), "
+ COLUMN_SQI_FORM + " BIGINT, " + COLUMN_SQI_INDEX + " SMALLINT, "
+ COLUMN_SQI_TYPE + " VARCHAR(32), " + COLUMN_SQI_STRMASK + " BOOLEAN, "
+ COLUMN_SQI_STRLENGTH + " SMALLINT, " + COLUMN_SQI_ENUMVALS
+ " VARCHAR(100), FOREIGN KEY (" + COLUMN_SQI_FORM + ") REFERENCES "
+ TABLE_SQ_FORM + " (" + COLUMN_SQF_ID + "))";
"CREATE TABLE " + TABLE_SQ_INPUT + " ("
+ COLUMN_SQI_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+ COLUMN_SQI_NAME + " VARCHAR(64), "
+ COLUMN_SQI_FORM + " BIGINT, "
+ COLUMN_SQI_INDEX + " SMALLINT, "
+ COLUMN_SQI_TYPE + " VARCHAR(32), "
+ COLUMN_SQI_STRMASK + " BOOLEAN, "
+ COLUMN_SQI_STRLENGTH + " SMALLINT, "
+ COLUMN_SQI_ENUMVALS + " VARCHAR(100),"
+ " FOREIGN KEY (" + COLUMN_SQI_FORM + ") REFERENCES " + TABLE_SQ_FORM + " (" + COLUMN_SQF_ID + ")"
+ ")";
// DDL: Create table SQ_CONNECTION
public static final String QUERY_CREATE_TABLE_SQ_CONNECTION =
"CREATE TABLE " + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID
+ " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) "
+ "PRIMARY KEY, " + COLUMN_SQN_CONNECTOR + " BIGINT, " + COLUMN_SQN_NAME
+ " VARCHAR(32), FOREIGN KEY(" + COLUMN_SQN_CONNECTOR + ") REFERENCES "
+ TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + "))";
"CREATE TABLE " + TABLE_SQ_CONNECTION + " ("
+ COLUMN_SQN_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+ COLUMN_SQN_CONNECTOR + " BIGINT, "
+ COLUMN_SQN_NAME + " VARCHAR(32),"
+ " FOREIGN KEY(" + COLUMN_SQN_CONNECTOR + ") REFERENCES " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + ")"
+ ")";
// DDL: Create table SQ_JOB
public static final String QUERY_CREATE_TABLE_SQ_JOB =
"CREATE TABLE " + TABLE_SQ_JOB + " (" + COLUMN_SQB_ID
+ " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) "
+ "PRIMARY KEY, " + COLUMN_SQB_CONNECTION + " BIGINT, " + COLUMN_SQB_NAME
+ " VARCHAR(64), " + COLUMN_SQB_TYPE + " VARCHAR(64), FOREIGN KEY("
+ COLUMN_SQB_CONNECTION + ") REFERENCES " + TABLE_SQ_CONNECTION + " ("
+ COLUMN_SQN_ID + "))";
"CREATE TABLE " + TABLE_SQ_JOB + " ("
+ COLUMN_SQB_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+ COLUMN_SQB_CONNECTION + " BIGINT, "
+ COLUMN_SQB_NAME + " VARCHAR(64), "
+ COLUMN_SQB_TYPE + " VARCHAR(64),"
+ " FOREIGN KEY(" + COLUMN_SQB_CONNECTION + ") REFERENCES " + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + ")"
+ ")";
// DDL: Create table SQ_CONNECTION_INPUT
public static final String QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT =
"CREATE TABLE " + TABLE_SQ_CONNECTION_INPUT + " ("
+ COLUMN_SQNI_CONNECTION + " BIGINT, " + COLUMN_SQNI_INPUT + " BIGINT, "
+ COLUMN_SQNI_VALUE + " LONG VARCHAR, PRIMARY KEY ("
+ COLUMN_SQNI_CONNECTION + ", " + COLUMN_SQNI_INPUT + "), FOREIGN KEY ("
+ COLUMN_SQNI_CONNECTION + ") REFERENCES " + TABLE_SQ_CONNECTION + " ("
+ COLUMN_SQN_ID + "), FOREIGN KEY (" + COLUMN_SQNI_INPUT + ") REFERENCES "
+ TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID + "))";
+ COLUMN_SQNI_CONNECTION + " BIGINT, "
+ COLUMN_SQNI_INPUT + " BIGINT, "
+ COLUMN_SQNI_VALUE + " LONG VARCHAR,"
+ " PRIMARY KEY (" + COLUMN_SQNI_CONNECTION + ", " + COLUMN_SQNI_INPUT + "),"
+ " FOREIGN KEY (" + COLUMN_SQNI_CONNECTION + ") REFERENCES " + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + "),"
+ " FOREIGN KEY (" + COLUMN_SQNI_INPUT + ") REFERENCES " + TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID + ")"
+ ")";
// DDL: Create table SQ_JOB_INPUT
public static final String QUERY_CREATE_TABLE_SQ_JOB_INPUT =
"CREATE TABLE " + TABLE_SQ_JOB_INPUT + " ("
+ COLUMN_SQBI_JOB + " BIGINT, " + COLUMN_SQBI_INPUT + " BIGINT, "
+ COLUMN_SQBI_VALUE + " LONG VARCHAR, PRIMARY KEY ("
+ COLUMN_SQBI_JOB + ", " + COLUMN_SQBI_INPUT + "), FOREIGN KEY ("
+ COLUMN_SQBI_JOB + ") REFERENCES " + TABLE_SQ_JOB + " ("
+ COLUMN_SQB_ID + "), FOREIGN KEY (" + COLUMN_SQBI_INPUT + ") REFERENCES "
+ TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID + "))";
+ COLUMN_SQBI_JOB + " BIGINT, "
+ COLUMN_SQBI_INPUT + " BIGINT, "
+ COLUMN_SQBI_VALUE + " LONG VARCHAR,"
+ " PRIMARY KEY (" + COLUMN_SQBI_JOB + ", " + COLUMN_SQBI_INPUT + "), "
+ " FOREIGN KEY (" + COLUMN_SQBI_JOB + ") REFERENCES " + TABLE_SQ_JOB + " (" + COLUMN_SQB_ID + "), "
+ " FOREIGN KEY (" + COLUMN_SQBI_INPUT + ") REFERENCES " + TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID + "))";
// DDL: Create table SQ_SUBMISSION
public static final String QUERY_CREATE_TABLE_SQ_SUBMISSION =
@ -213,11 +260,46 @@ public final class DerbySchemaQuery {
+ COLUMN_SQS_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+ COLUMN_SQS_JOB + " BIGINT, "
+ COLUMN_SQS_STATUS + " VARCHAR(20), "
+ COLUMN_SQS_DATE + " TIMESTAMP,"
+ COLUMN_SQS_EXTERNAL_ID + " VARCHAR(50), "
+ COLUMN_SQS_CREATION_DATE + " TIMESTAMP,"
+ COLUMN_SQS_UPDATE_DATE + " TIMESTAMP,"
+ COLUMN_SQS_EXTERNAL_ID + " VARCHAR(25), "
+ COLUMN_SQS_EXTERNAL_LINK + " VARCHAR(75), "
+ COLUMN_SQS_EXCEPTION + " VARCHAR(75), "
+ COLUMN_SQS_EXCEPTION_TRACE + " VARCHAR(500), "
+ "PRIMARY KEY (" + COLUMN_SQS_ID + "), "
+ "FOREIGN KEY (" + COLUMN_SQS_JOB + ") REFERENCES " + TABLE_SQ_JOB + "("
+ COLUMN_SQB_ID + "))";
+ "FOREIGN KEY (" + COLUMN_SQS_JOB + ") REFERENCES " + TABLE_SQ_JOB + "(" + COLUMN_SQB_ID + ") ON DELETE CASCADE"
+ ")";
// DDL: Create table SQ_COUNTER_GROUP
public static final String QUERY_CREATE_TABLE_SQ_COUNTER_GROUP =
"CREATE TABLE " + TABLE_SQ_COUNTER_GROUP + " ("
+ COLUMN_SQG_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+ COLUMN_SQG_NAME + " VARCHAR(50), "
+ "PRIMARY KEY (" + COLUMN_SQG_ID + "),"
+ "UNIQUE ( " + COLUMN_SQG_NAME + ")"
+ ")";
// DDL: Create table SQ_COUNTER
public static final String QUERY_CREATE_TABLE_SQ_COUNTER =
"CREATE TABLE " + TABLE_SQ_COUNTER + " ("
+ COLUMN_SQR_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+ COLUMN_SQR_NAME + " VARCHAR(50), "
+ "PRIMARY KEY (" + COLUMN_SQR_ID + "), "
+ "UNIQUE ( " + COLUMN_SQR_NAME + ")"
+ ")";
// DDL: Create table SQ_COUNTER_SUBMISSION
public static final String QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION =
"CREATE TABLE " + TABLE_SQ_COUNTER_SUBMISSION + " ("
+ COLUMN_SQRS_GROUP + " BIGINT, "
+ COLUMN_SQRS_COUNTER + " BIGINT, "
+ COLUMN_SQRS_SUBMISSION + " BIGINT, "
+ COLUMN_SQRS_VALUE + " BIGINT, "
+ "PRIMARY KEY (" + COLUMN_SQRS_GROUP + ", " + COLUMN_SQRS_COUNTER + ", " + COLUMN_SQRS_SUBMISSION + "), "
+ "FOREIGN KEY (" + COLUMN_SQRS_GROUP + ") REFERENCES " + TABLE_SQ_COUNTER_GROUP + "(" + COLUMN_SQG_ID + "), "
+ "FOREIGN KEY (" + COLUMN_SQRS_COUNTER + ") REFERENCES " + TABLE_SQ_COUNTER + "(" + COLUMN_SQR_ID + "), "
+ "FOREIGN KEY (" + COLUMN_SQRS_SUBMISSION + ") REFERENCES " + TABLE_SQ_SUBMISSION + "(" + COLUMN_SQS_ID + ") ON DELETE CASCADE "
+ ")";
// DML: Fetch connector Given Name
public static final String STMT_FETCH_BASE_CONNECTOR =
@ -384,17 +466,21 @@ public final class DerbySchemaQuery {
"INSERT INTO " + TABLE_SQ_SUBMISSION + "("
+ COLUMN_SQS_JOB + ", "
+ COLUMN_SQS_STATUS + ", "
+ COLUMN_SQS_DATE + ", "
+ COLUMN_SQS_EXTERNAL_ID + ") "
+ " VALUES(?, ?, ?, ?)";
+ COLUMN_SQS_CREATION_DATE + ", "
+ COLUMN_SQS_UPDATE_DATE + ", "
+ COLUMN_SQS_EXTERNAL_ID + ", "
+ COLUMN_SQS_EXTERNAL_LINK + ", "
+ COLUMN_SQS_EXCEPTION + ", "
+ COLUMN_SQS_EXCEPTION_TRACE + ") "
+ " VALUES(?, ?, ?, ?, ?, ?, ?, ?)";
// DML: Update existing submission
public static final String STMT_UPDATE_SUBMISSION =
"UPDATE " + TABLE_SQ_SUBMISSION + " SET "
+ COLUMN_SQS_JOB + " = ?, "
+ COLUMN_SQS_STATUS + " = ?, "
+ COLUMN_SQS_DATE + " = ?, "
+ COLUMN_SQS_EXTERNAL_ID + " = ? "
+ COLUMN_SQS_UPDATE_DATE + " = ?, "
+ COLUMN_SQS_EXCEPTION + " = ?, "
+ COLUMN_SQS_EXCEPTION_TRACE + " = ?"
+ " WHERE " + COLUMN_SQS_ID + " = ?";
// DML: Check if given submission exists
@ -404,20 +490,93 @@ public final class DerbySchemaQuery {
// DML: Purge old entries
public static final String STMT_PURGE_SUBMISSIONS =
"DELETE FROM " + TABLE_SQ_SUBMISSION + " WHERE " + COLUMN_SQS_DATE + " < ?";
"DELETE FROM " + TABLE_SQ_SUBMISSION
+ " WHERE " + COLUMN_SQS_UPDATE_DATE + " < ?";
// DML: Get unfinished
public static final String STMT_SELECT_SUBMISSION_UNFINISHED =
"SELECT " + COLUMN_SQS_ID + ", " + COLUMN_SQS_JOB + ", " + COLUMN_SQS_DATE
+ ", " + COLUMN_SQS_STATUS + ", " + COLUMN_SQS_EXTERNAL_ID + " FROM "
+ TABLE_SQ_SUBMISSION + " WHERE " + COLUMN_SQS_STATUS + " = ?";
"SELECT "
+ COLUMN_SQS_ID + ", "
+ COLUMN_SQS_JOB + ", "
+ COLUMN_SQS_STATUS + ", "
+ COLUMN_SQS_CREATION_DATE + ", "
+ COLUMN_SQS_UPDATE_DATE + ", "
+ COLUMN_SQS_EXTERNAL_ID + ", "
+ COLUMN_SQS_EXTERNAL_LINK + ", "
+ COLUMN_SQS_EXCEPTION + ", "
+ COLUMN_SQS_EXCEPTION_TRACE
+ " FROM " + TABLE_SQ_SUBMISSION
+ " WHERE " + COLUMN_SQS_STATUS + " = ?";
// DML: Last submission for a job
public static final String STMT_SELECT_SUBMISSION_LAST_FOR_JOB =
"SELECT " + COLUMN_SQS_ID + ", " + COLUMN_SQS_JOB + ", " + COLUMN_SQS_DATE
+ ", " + COLUMN_SQS_STATUS + ", " + COLUMN_SQS_EXTERNAL_ID + " FROM "
+ TABLE_SQ_SUBMISSION + " WHERE " + COLUMN_SQS_JOB + " = ? ORDER BY "
+ COLUMN_SQS_DATE + " DESC";
"SELECT "
+ COLUMN_SQS_ID + ", "
+ COLUMN_SQS_JOB + ", "
+ COLUMN_SQS_STATUS + ", "
+ COLUMN_SQS_CREATION_DATE + ", "
+ COLUMN_SQS_UPDATE_DATE + ", "
+ COLUMN_SQS_EXTERNAL_ID + ", "
+ COLUMN_SQS_EXTERNAL_LINK + ", "
+ COLUMN_SQS_EXCEPTION + ", "
+ COLUMN_SQS_EXCEPTION_TRACE
+ " FROM " + TABLE_SQ_SUBMISSION
+ " WHERE " + COLUMN_SQS_JOB + " = ?"
+ " ORDER BY " + COLUMN_SQS_UPDATE_DATE + " DESC";
// DML: Select counter group
public static final String STMT_SELECT_COUNTER_GROUP =
"SELECT "
+ COLUMN_SQG_ID + ", "
+ COLUMN_SQG_NAME + " "
+ "FROM " + TABLE_SQ_COUNTER_GROUP + " "
+ "WHERE " + COLUMN_SQG_NAME + " = ?";
// DML: Insert new counter group
public static final String STMT_INSERT_COUNTER_GROUP =
"INSERT INTO " + TABLE_SQ_COUNTER_GROUP + " ("
+ COLUMN_SQG_NAME + ") "
+ "VALUES (?)";
// DML: Select counter
public static final String STMT_SELECT_COUNTER =
"SELECT "
+ COLUMN_SQR_ID + ", "
+ COLUMN_SQR_NAME + " "
+ "FROM " + TABLE_SQ_COUNTER + " "
+ "WHERE " + COLUMN_SQR_NAME + " = ?";
// DML: Insert new counter
public static final String STMT_INSERT_COUNTER =
"INSERT INTO " + TABLE_SQ_COUNTER + " ("
+ COLUMN_SQR_NAME + ") "
+ "VALUES (?)";
// DML: Insert new counter submission
public static final String STMT_INSERT_COUNTER_SUBMISSION =
"INSERT INTO " + TABLE_SQ_COUNTER_SUBMISSION + " ("
+ COLUMN_SQRS_GROUP + ", "
+ COLUMN_SQRS_COUNTER + ", "
+ COLUMN_SQRS_SUBMISSION + ", "
+ COLUMN_SQRS_VALUE + ") "
+ "VALUES (?, ?, ?, ?)";
// DML: Select counter submission
public static final String STMT_SELECT_COUNTER_SUBMISSION =
"SELECT "
+ COLUMN_SQG_NAME + ", "
+ COLUMN_SQR_NAME + ", "
+ COLUMN_SQRS_VALUE + " "
+ "FROM " + TABLE_SQ_COUNTER_SUBMISSION + " "
+ "LEFT JOIN " + TABLE_SQ_COUNTER_GROUP + " ON " + COLUMN_SQRS_GROUP + " = " + COLUMN_SQG_ID + " "
+ "LEFT JOIN " + TABLE_SQ_COUNTER + " ON " + COLUMN_SQRS_COUNTER + " = " + COLUMN_SQR_ID + " "
+ "WHERE " + COLUMN_SQRS_SUBMISSION + " = ? ";
// DML: Delete rows from counter submission table
public static final String STMT_DELETE_COUNTER_SUBMISSION =
"DELETE FROM " + TABLE_SQ_COUNTER_SUBMISSION
+ " WHERE " + COLUMN_SQRS_SUBMISSION + " = ?";
private DerbySchemaQuery() {
// Disable explicit object creation

View File

@ -95,6 +95,9 @@ protected void createSchema() throws Exception {
runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT);
runQuery(QUERY_CREATE_TABLE_SQ_JOB_INPUT);
runQuery(QUERY_CREATE_TABLE_SQ_SUBMISSION);
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP);
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER);
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION);
}
/**
@ -253,14 +256,41 @@ public void loadJobs() throws Exception {
* @throws Exception
*/
public void loadSubmissions() throws Exception {
runQuery("INSERT INTO SQOOP.SQ_SUBMISSION"
+ "(SQS_JOB, SQS_STATUS, SQS_DATE, SQS_EXTERNAL_ID) VALUES "
+ "(1, 'RUNNING', '2012-01-01 01:01:01', 'job_1'),"
+ "(2, 'SUCCEEDED', '2012-01-02 01:01:01', 'job_2'),"
+ "(3, 'FAILED', '2012-01-03 01:01:01', 'job_3'),"
+ "(4, 'UNKNOWN', '2012-01-04 01:01:01', 'job_4'),"
+ "(1, 'RUNNING', '2012-01-05 01:01:01', 'job_5')"
runQuery("INSERT INTO SQOOP.SQ_COUNTER_GROUP "
+ "(SQG_NAME) "
+ "VALUES"
+ "('gA'), ('gB')"
);
runQuery("INSERT INTO SQOOP.SQ_COUNTER "
+ "(SQR_NAME) "
+ "VALUES"
+ "('cA'), ('cB')"
);
runQuery("INSERT INTO SQOOP.SQ_SUBMISSION"
+ "(SQS_JOB, SQS_STATUS, SQS_CREATION_DATE, SQS_UPDATE_DATE,"
+ " SQS_EXTERNAL_ID, SQS_EXTERNAL_LINK, SQS_EXCEPTION,"
+ " SQS_EXCEPTION_TRACE)"
+ "VALUES "
+ "(1, 'RUNNING', '2012-01-01 01:01:01', '2012-01-01 01:01:01', 'job_1',"
+ "NULL, NULL, NULL),"
+ "(2, 'SUCCEEDED', '2012-01-01 01:01:01', '2012-01-02 01:01:01', 'job_2',"
+ " NULL, NULL, NULL),"
+ "(3, 'FAILED', '2012-01-01 01:01:01', '2012-01-03 01:01:01', 'job_3',"
+ " NULL, NULL, NULL),"
+ "(4, 'UNKNOWN', '2012-01-01 01:01:01', '2012-01-04 01:01:01', 'job_4',"
+ " NULL, NULL, NULL),"
+ "(1, 'RUNNING', '2012-01-01 01:01:01', '2012-01-05 01:01:01', 'job_5',"
+ " NULL, NULL, NULL)"
);
runQuery("INSERT INTO SQOOP.SQ_COUNTER_SUBMISSION "
+ "(SQRS_GROUP, SQRS_COUNTER, SQRS_SUBMISSION, SQRS_VALUE) "
+ "VALUES"
+ "(1, 1, 4, 300)"
);
}
protected MConnector getConnector() {

View File

@ -19,6 +19,9 @@
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import java.util.Calendar;
import java.util.Date;
@ -84,8 +87,29 @@ public void testExistsSubmission() throws Exception {
}
public void testCreateSubmission() throws Exception {
MSubmission submission =
new MSubmission(1, new Date(), SubmissionStatus.RUNNING, "job-x");
Date creationDate = new Date();
Date updateDate = new Date();
CounterGroup firstGroup = new CounterGroup("ga");
CounterGroup secondGroup = new CounterGroup("gb");
firstGroup.addCounter(new Counter("ca", 100));
firstGroup.addCounter(new Counter("cb", 200));
secondGroup.addCounter(new Counter("ca", 300));
secondGroup.addCounter(new Counter("cd", 400));
Counters counters = new Counters();
counters.addCounterGroup(firstGroup);
counters.addCounterGroup(secondGroup);
MSubmission submission = new MSubmission();
submission.setJobId(1);
submission.setStatus(SubmissionStatus.RUNNING);
submission.setCreationDate(creationDate);
submission.setLastUpdateDate(updateDate);
submission.setExternalId("job-x");
submission.setExternalLink("http://somewhere");
submission.setExceptionInfo("RuntimeException");
submission.setExceptionStackTrace("Yeah it happens");
submission.setCounters(counters);
handler.createSubmission(submission, getDerbyConnection());
@ -101,9 +125,41 @@ public void testCreateSubmission() throws Exception {
assertEquals(1, submission.getJobId());
assertEquals(SubmissionStatus.RUNNING, submission.getStatus());
assertEquals(creationDate, submission.getCreationDate());
assertEquals(updateDate, submission.getLastUpdateDate());
assertEquals("job-x", submission.getExternalId());
assertEquals("http://somewhere", submission.getExternalLink());
assertEquals("RuntimeException", submission.getExceptionInfo());
assertEquals("Yeah it happens", submission.getExceptionStackTrace());
// Let's create second connection
CounterGroup group;
Counter counter;
Counters retrievedCounters = submission.getCounters();
assertNotNull(retrievedCounters);
group = counters.getCounterGroup("ga");
assertNotNull(group);
counter = group.getCounter("ca");
assertNotNull(counter);
assertEquals(100, counter.getValue());
counter = group.getCounter("cb");
assertNotNull(counter);
assertEquals(200, counter.getValue());
group = counters.getCounterGroup("gb");
assertNotNull(group);
counter = group.getCounter("ca");
assertNotNull(counter);
assertEquals(300, counter.getValue());
counter = group.getCounter("cd");
assertNotNull(counter);
assertEquals(400, counter.getValue());
// Let's create second (simpler) connection
submission =
new MSubmission(1, new Date(), SubmissionStatus.SUCCEEDED, "job-x");
handler.createSubmission(submission, getDerbyConnection());
@ -163,4 +219,27 @@ public void testPurgeSubmissions() throws Exception {
assertEquals(0, submissions.size());
assertCountForTable("SQOOP.SQ_SUBMISSION", 0);
}
/**
* Test that by directly removing jobs we will also remove associated
* submissions and counters.
*
* @throws Exception
*/
public void testDeleteJobs() throws Exception {
loadSubmissions();
assertCountForTable("SQOOP.SQ_SUBMISSION", 5);
handler.deleteJob(1, getDerbyConnection());
assertCountForTable("SQOOP.SQ_SUBMISSION", 3);
handler.deleteJob(2, getDerbyConnection());
assertCountForTable("SQOOP.SQ_SUBMISSION", 2);
handler.deleteJob(3, getDerbyConnection());
assertCountForTable("SQOOP.SQ_SUBMISSION", 1);
handler.deleteJob(4, getDerbyConnection());
assertCountForTable("SQOOP.SQ_SUBMISSION", 0);
}
}