diff --git a/common/src/main/java/org/apache/sqoop/submission/counter/Counters.java b/common/src/main/java/org/apache/sqoop/submission/counter/Counters.java index 12c94643..92984190 100644 --- a/common/src/main/java/org/apache/sqoop/submission/counter/Counters.java +++ b/common/src/main/java/org/apache/sqoop/submission/counter/Counters.java @@ -44,4 +44,8 @@ public CounterGroup getCounterGroup(String name) { public Iterator iterator() { return groups.values().iterator(); } + + public boolean isEmpty() { + return groups.isEmpty(); + } } diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java index 90aef4ba..b9ba7466 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java @@ -57,6 +57,9 @@ import org.apache.sqoop.repository.JdbcRepositoryHandler; import org.apache.sqoop.repository.JdbcRepositoryTransactionFactory; import org.apache.sqoop.submission.SubmissionStatus; +import org.apache.sqoop.submission.counter.Counter; +import org.apache.sqoop.submission.counter.CounterGroup; +import org.apache.sqoop.submission.counter.Counters; import org.apache.sqoop.utils.StringUtils; /** @@ -200,6 +203,9 @@ public void createSchema() { runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT); runQuery(QUERY_CREATE_TABLE_SQ_JOB_INPUT); runQuery(QUERY_CREATE_TABLE_SQ_SUBMISSION); + runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP); + runQuery(QUERY_CREATE_TABLE_SQ_COUNTER); + runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION); } /** @@ -796,7 +802,11 @@ public void createSubmission(MSubmission submission, Connection conn) { stmt.setLong(1, submission.getJobId()); stmt.setString(2, submission.getStatus().name()); stmt.setTimestamp(3, new Timestamp(submission.getCreationDate().getTime())); - stmt.setString(4, submission.getExternalId()); + stmt.setTimestamp(4, new Timestamp(submission.getLastUpdateDate().getTime())); + stmt.setString(5, submission.getExternalId()); + stmt.setString(6, submission.getExternalLink()); + stmt.setString(7, submission.getExceptionInfo()); + stmt.setString(8, submission.getExceptionStackTrace()); result = stmt.executeUpdate(); if (result != 1) { @@ -811,6 +821,12 @@ public void createSubmission(MSubmission submission, Connection conn) { } long submissionId = rsetSubmissionId.getLong(1); + + if(submission.getCounters() != null) { + createSubmissionCounters(submissionId, submission.getCounters(), conn); + } + + // Save created persistence id submission.setPersistenceId(submissionId); } catch (SQLException ex) { @@ -850,20 +866,32 @@ public boolean existsSubmission(long submissionId, Connection conn) { @Override public void updateSubmission(MSubmission submission, Connection conn) { PreparedStatement stmt = null; + PreparedStatement deleteStmt = null; try { + // Update properties in main table stmt = conn.prepareStatement(STMT_UPDATE_SUBMISSION); - stmt.setLong(1, submission.getJobId()); - stmt.setString(2, submission.getStatus().name()); - stmt.setTimestamp(3, new Timestamp(submission.getCreationDate().getTime())); - stmt.setString(4, submission.getExternalId()); + stmt.setString(1, submission.getStatus().name()); + stmt.setTimestamp(2, new Timestamp(submission.getLastUpdateDate().getTime())); + stmt.setString(3, submission.getExceptionInfo()); + stmt.setString(4, submission.getExceptionStackTrace()); stmt.setLong(5, submission.getPersistenceId()); stmt.executeUpdate(); + // Delete previous counters + deleteStmt = conn.prepareStatement(STMT_DELETE_COUNTER_SUBMISSION); + deleteStmt.setLong(1, submission.getPersistenceId()); + deleteStmt.executeUpdate(); + + // Reinsert new counters if needed + if(submission.getCounters() != null) { + createSubmissionCounters(submission.getPersistenceId(), submission.getCounters(), conn); + } + } catch (SQLException ex) { throw new SqoopException(DerbyRepoError.DERBYREPO_0035, ex); } finally { - closeStatements(stmt); + closeStatements(stmt, deleteStmt); } } @@ -901,7 +929,7 @@ public List findSubmissionsUnfinished(Connection conn) { rs = stmt.executeQuery(); while(rs.next()) { - submissions.add(loadSubmission(rs)); + submissions.add(loadSubmission(rs, conn)); } rs.close(); @@ -934,7 +962,7 @@ public MSubmission findSubmissionLastForJob(long jobId, Connection conn) { return null; } - return loadSubmission(rs); + return loadSubmission(rs, conn); } catch (SQLException ex) { throw new SqoopException(DerbyRepoError.DERBYREPO_0037, ex); } finally { @@ -944,20 +972,182 @@ public MSubmission findSubmissionLastForJob(long jobId, Connection conn) { } /** - * {@inheritDoc} + * Stores counters for given submission in repository. + * + * @param submissionId Submission id + * @param counters Counters that should be stored + * @param conn Connection to derby repository + * @throws SQLException */ - private MSubmission loadSubmission(ResultSet rs) throws SQLException { - MSubmission submission = new MSubmission( - rs.getLong(2), - rs.getTimestamp(3), - SubmissionStatus.valueOf(rs.getString(4)), - rs.getString(5) - ); + private void createSubmissionCounters(long submissionId, Counters counters, Connection conn) throws SQLException { + PreparedStatement stmt = null; + + try { + stmt = conn.prepareStatement(STMT_INSERT_COUNTER_SUBMISSION); + + for(CounterGroup group : counters) { + long groupId = getCounterGroupId(group, conn); + + for(Counter counter: group) { + long counterId = getCounterId(counter, conn); + + stmt.setLong(1, groupId); + stmt.setLong(2, counterId); + stmt.setLong(3, submissionId); + stmt.setLong(4, counter.getValue()); + + stmt.executeUpdate(); + } + } + } finally { + closeStatements(stmt); + } + } + + /** + * Resolves counter group database id. + * + * @param group Given group + * @param conn Connection to metastore + * @return Id + * @throws SQLException + */ + private long getCounterGroupId(CounterGroup group, Connection conn) throws SQLException { + PreparedStatement select = null; + PreparedStatement insert = null; + ResultSet rsSelect = null; + ResultSet rsInsert = null; + + try { + select = conn.prepareStatement(STMT_SELECT_COUNTER_GROUP); + select.setString(1, group.getName()); + + rsSelect = select.executeQuery(); + + if(rsSelect.next()) { + return rsSelect.getLong(1); + } + + insert = conn.prepareStatement(STMT_INSERT_COUNTER_GROUP, Statement.RETURN_GENERATED_KEYS); + insert.setString(1, group.getName()); + insert.executeUpdate(); + + rsInsert = insert.getGeneratedKeys(); + + if (!rsInsert.next()) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0013); + } + + return rsInsert.getLong(1); + } finally { + closeResultSets(rsSelect, rsInsert); + closeStatements(select, insert); + } + } + + /** + * Resolves counter id. + * + * @param counter Given counter + * @param conn connection to metastore + * @return Id + * @throws SQLException + */ + private long getCounterId(Counter counter, Connection conn) throws SQLException { + PreparedStatement select = null; + PreparedStatement insert = null; + ResultSet rsSelect = null; + ResultSet rsInsert = null; + + try { + select = conn.prepareStatement(STMT_SELECT_COUNTER); + select.setString(1, counter.getName()); + + rsSelect = select.executeQuery(); + + if(rsSelect.next()) { + return rsSelect.getLong(1); + } + + insert = conn.prepareStatement(STMT_INSERT_COUNTER, Statement.RETURN_GENERATED_KEYS); + insert.setString(1, counter.getName()); + insert.executeUpdate(); + + rsInsert = insert.getGeneratedKeys(); + + if (!rsInsert.next()) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0013); + } + + return rsInsert.getLong(1); + } finally { + closeResultSets(rsSelect, rsInsert); + closeStatements(select, insert); + } + } + + /** + * Create MSubmission structure from result set. + * + * @param rs Result set, only active row will be fetched + * @param conn Connection to metastore + * @return Created MSubmission structure + * @throws SQLException + */ + private MSubmission loadSubmission(ResultSet rs, Connection conn) throws SQLException { + MSubmission submission = new MSubmission(); + submission.setPersistenceId(rs.getLong(1)); + submission.setJobId(rs.getLong(2)); + submission.setStatus(SubmissionStatus.valueOf(rs.getString(3))); + submission.setCreationDate(rs.getTimestamp(4)); + submission.setLastUpdateDate(rs.getTimestamp(5)); + submission.setExternalId(rs.getString(6)); + submission.setExternalLink(rs.getString(7)); + submission.setExceptionInfo(rs.getString(8)); + submission.setExceptionStackTrace(rs.getString(9)); + + Counters counters = loadCountersSubmission(rs.getLong(1), conn); + submission.setCounters(counters); return submission; } + private Counters loadCountersSubmission(long submissionId, Connection conn) throws SQLException { + PreparedStatement stmt = null; + ResultSet rs = null; + try { + stmt = conn.prepareStatement(STMT_SELECT_COUNTER_SUBMISSION); + stmt.setLong(1, submissionId); + rs = stmt.executeQuery(); + + Counters counters = new Counters(); + + while(rs.next()) { + String groupName = rs.getString(1); + String counterName = rs.getString(2); + long value = rs.getLong(3); + + CounterGroup group = counters.getCounterGroup(groupName); + if(group == null) { + group = new CounterGroup(groupName); + counters.addCounterGroup(group); + } + + group.addCounter(new Counter(counterName, value)); + } + + if(counters.isEmpty()) { + return null; + } else { + return counters; + } + } finally { + closeStatements(stmt); + closeResultSets(rs); + } + } + private List loadConnections(PreparedStatement stmt, Connection conn) throws SQLException { diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java index 3664df6e..1d1fc09c 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java @@ -146,12 +146,60 @@ public final class DerbySchemaConstants { public static final String COLUMN_SQS_JOB = "SQS_JOB"; - public static final String COLUMN_SQS_DATE = "SQS_DATE"; - public static final String COLUMN_SQS_STATUS = "SQS_STATUS"; + public static final String COLUMN_SQS_CREATION_DATE = "SQS_CREATION_DATE"; + + public static final String COLUMN_SQS_UPDATE_DATE = "SQS_UPDATE_DATE"; + public static final String COLUMN_SQS_EXTERNAL_ID = "SQS_EXTERNAL_ID"; + public static final String COLUMN_SQS_EXTERNAL_LINK = "SQS_EXTERNAL_LINK"; + + public static final String COLUMN_SQS_EXCEPTION = "SQS_EXCEPTION"; + + public static final String COLUMN_SQS_EXCEPTION_TRACE = "SQS_EXCEPTION_TRACE"; + + // SQ_COUNTER_GROUP + + public static final String TABLE_SQ_COUNTER_GROUP_NAME = + "SQ_COUNTER_GROUP"; + + public static final String TABLE_SQ_COUNTER_GROUP = SCHEMA_PREFIX + + TABLE_SQ_COUNTER_GROUP_NAME; + + public static final String COLUMN_SQG_ID = "SQG_ID"; + + public static final String COLUMN_SQG_NAME = "SQG_NAME"; + + // SQ_COUNTER_GROUP + + public static final String TABLE_SQ_COUNTER_NAME = + "SQ_COUNTER"; + + public static final String TABLE_SQ_COUNTER = SCHEMA_PREFIX + + TABLE_SQ_COUNTER_NAME; + + public static final String COLUMN_SQR_ID = "SQR_ID"; + + public static final String COLUMN_SQR_NAME = "SQR_NAME"; + + // SQ_COUNTER_SUBMISSION + + public static final String TABLE_SQ_COUNTER_SUBMISSION_NAME = + "SQ_COUNTER_SUBMISSION"; + + public static final String TABLE_SQ_COUNTER_SUBMISSION = SCHEMA_PREFIX + + TABLE_SQ_COUNTER_SUBMISSION_NAME; + + public static final String COLUMN_SQRS_GROUP = "SQRS_GROUP"; + + public static final String COLUMN_SQRS_COUNTER = "SQRS_COUNTER"; + + public static final String COLUMN_SQRS_SUBMISSION = "SQRS_SUBMISSION"; + + public static final String COLUMN_SQRS_VALUE = "SQRS_VALUE"; + private DerbySchemaConstants() { // Disable explicit object creation } diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java index 496e3d9e..55d7128b 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java @@ -119,14 +119,53 @@ *

* SQ_SUBMISSION: List of submissions *

+ *    +-----------------------------------+
+ *    | SQ_JOB_SUBMISSION                 |
+ *    +-----------------------------------+
+ *    | SQS_ID: BIGINT PK                 |
+ *    | SQS_JOB: BIGINT                   | FK SQ_JOB(SQB_ID)
+ *    | SQS_STATUS: VARCHAR(20)           |
+ *    | SQS_CREATION_DATE: TIMESTAMP      |
+ *    | SQS_UPDATE_DATE: TIMESTAMP        |
+ *    | SQS_EXTERNAL_ID: VARCHAR(25)      |
+ *    | SQS_EXTERNAL_LINK: VARCHAR(75)    |
+ *    | SQS_EXCEPTION: VARCHAR(75)        |
+ *    | SQS_EXCEPTION_TRACE: VARCHAR(500) |
+ *    +-----------------------------------+
+ * 
+ *

+ *

+ * SQ_COUNTER_GROUP: List of counter groups + *

  *    +----------------------------+
- *    | SQ_JOB_SUBMISSION          |
+ *    | SQ_COUNTER_GROUP           |
  *    +----------------------------+
- *    | SQS_ID: BIGINT PK          |
- *    | SQS_JOB: BIGINT            | FK SQ_JOB(SQB_ID)
- *    | SQS_STATUS: VARCHAR(20)    |
- *    | SQS_DATE: TIMESTAMP        |
- *    | SQS_EXTERNAL_ID:VARCHAR(50)|
+ *    | SQG_ID: BIGINT PK          |
+ *    | SQG_NAME: VARCHAR(50)      |
+ *    +----------------------------+
+ * 
+ *

+ *

+ * SQ_COUNTER: List of counters + *

+ *    +----------------------------+
+ *    | SQ_COUNTER                 |
+ *    +----------------------------+
+ *    | SQR_ID: BIGINT PK          |
+ *    | SQR_NAME: VARCHAR(50)      |
+ *    +----------------------------+
+ * 
+ *

+ *

+ * SQ_COUNTER_SUBMISSION: N:M Relationship + *

+ *    +----------------------------+
+ *    | SQ_COUNTER_SUBMISSION      |
+ *    +----------------------------+
+ *    | SQRS_GROUP: BIGINT PK      | FK SQ_COUNTER_GROUP(SQR_ID)
+ *    | SQRS_COUNTER: BIGINT PK    | FK SQ_COUNTER(SQR_ID)
+ *    | SQRS_SUBMISSION: BIGINT PK | FK SQ_SUBMISSION(SQS_ID)
+ *    | SQRS_VALUE: BIGINT         |
  *    +----------------------------+
  * 
*

@@ -143,69 +182,77 @@ public final class DerbySchemaQuery { // DDL: Create table SQ_CONNECTOR public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR = - "CREATE TABLE " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID - + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) " - + "PRIMARY KEY, " + COLUMN_SQC_NAME + " VARCHAR(64), " + COLUMN_SQC_CLASS - + " VARCHAR(255))"; + "CREATE TABLE " + TABLE_SQ_CONNECTOR + " (" + + COLUMN_SQC_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, " + + COLUMN_SQC_NAME + " VARCHAR(64), " + + COLUMN_SQC_CLASS + " VARCHAR(255)" + + ")"; // DDL: Create table SQ_FORM public static final String QUERY_CREATE_TABLE_SQ_FORM = - "CREATE TABLE " + TABLE_SQ_FORM + " (" + COLUMN_SQF_ID - + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) " - + "PRIMARY KEY, " + COLUMN_SQF_CONNECTOR + " BIGINT, " + "CREATE TABLE " + TABLE_SQ_FORM + " (" + + COLUMN_SQF_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, " + + COLUMN_SQF_CONNECTOR + " BIGINT, " + COLUMN_SQF_OPERATION + " VARCHAR(32), " - + COLUMN_SQF_NAME + " VARCHAR(64), " + COLUMN_SQF_TYPE + " VARCHAR(32), " - + COLUMN_SQF_INDEX + " SMALLINT, " + " FOREIGN KEY (" - + COLUMN_SQF_CONNECTOR+ ") REFERENCES " + TABLE_SQ_CONNECTOR + " (" - + COLUMN_SQC_ID + "))"; + + COLUMN_SQF_NAME + " VARCHAR(64), " + + COLUMN_SQF_TYPE + " VARCHAR(32), " + + COLUMN_SQF_INDEX + " SMALLINT, " + + " FOREIGN KEY (" + COLUMN_SQF_CONNECTOR+ ") REFERENCES " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + ")" + + ")"; // DDL: Create table SQ_INPUT public static final String QUERY_CREATE_TABLE_SQ_INPUT = - "CREATE TABLE " + TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID - + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) " - + "PRIMARY KEY, " + COLUMN_SQI_NAME + " VARCHAR(64), " - + COLUMN_SQI_FORM + " BIGINT, " + COLUMN_SQI_INDEX + " SMALLINT, " - + COLUMN_SQI_TYPE + " VARCHAR(32), " + COLUMN_SQI_STRMASK + " BOOLEAN, " - + COLUMN_SQI_STRLENGTH + " SMALLINT, " + COLUMN_SQI_ENUMVALS - + " VARCHAR(100), FOREIGN KEY (" + COLUMN_SQI_FORM + ") REFERENCES " - + TABLE_SQ_FORM + " (" + COLUMN_SQF_ID + "))"; + "CREATE TABLE " + TABLE_SQ_INPUT + " (" + + COLUMN_SQI_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, " + + COLUMN_SQI_NAME + " VARCHAR(64), " + + COLUMN_SQI_FORM + " BIGINT, " + + COLUMN_SQI_INDEX + " SMALLINT, " + + COLUMN_SQI_TYPE + " VARCHAR(32), " + + COLUMN_SQI_STRMASK + " BOOLEAN, " + + COLUMN_SQI_STRLENGTH + " SMALLINT, " + + COLUMN_SQI_ENUMVALS + " VARCHAR(100)," + + " FOREIGN KEY (" + COLUMN_SQI_FORM + ") REFERENCES " + TABLE_SQ_FORM + " (" + COLUMN_SQF_ID + ")" + + ")"; // DDL: Create table SQ_CONNECTION public static final String QUERY_CREATE_TABLE_SQ_CONNECTION = - "CREATE TABLE " + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID - + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) " - + "PRIMARY KEY, " + COLUMN_SQN_CONNECTOR + " BIGINT, " + COLUMN_SQN_NAME - + " VARCHAR(32), FOREIGN KEY(" + COLUMN_SQN_CONNECTOR + ") REFERENCES " - + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + "))"; + "CREATE TABLE " + TABLE_SQ_CONNECTION + " (" + + COLUMN_SQN_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, " + + COLUMN_SQN_CONNECTOR + " BIGINT, " + + COLUMN_SQN_NAME + " VARCHAR(32)," + + " FOREIGN KEY(" + COLUMN_SQN_CONNECTOR + ") REFERENCES " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + ")" + + ")"; // DDL: Create table SQ_JOB public static final String QUERY_CREATE_TABLE_SQ_JOB = - "CREATE TABLE " + TABLE_SQ_JOB + " (" + COLUMN_SQB_ID - + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) " - + "PRIMARY KEY, " + COLUMN_SQB_CONNECTION + " BIGINT, " + COLUMN_SQB_NAME - + " VARCHAR(64), " + COLUMN_SQB_TYPE + " VARCHAR(64), FOREIGN KEY(" - + COLUMN_SQB_CONNECTION + ") REFERENCES " + TABLE_SQ_CONNECTION + " (" - + COLUMN_SQN_ID + "))"; + "CREATE TABLE " + TABLE_SQ_JOB + " (" + + COLUMN_SQB_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, " + + COLUMN_SQB_CONNECTION + " BIGINT, " + + COLUMN_SQB_NAME + " VARCHAR(64), " + + COLUMN_SQB_TYPE + " VARCHAR(64)," + + " FOREIGN KEY(" + COLUMN_SQB_CONNECTION + ") REFERENCES " + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + ")" + + ")"; // DDL: Create table SQ_CONNECTION_INPUT public static final String QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT = "CREATE TABLE " + TABLE_SQ_CONNECTION_INPUT + " (" - + COLUMN_SQNI_CONNECTION + " BIGINT, " + COLUMN_SQNI_INPUT + " BIGINT, " - + COLUMN_SQNI_VALUE + " LONG VARCHAR, PRIMARY KEY (" - + COLUMN_SQNI_CONNECTION + ", " + COLUMN_SQNI_INPUT + "), FOREIGN KEY (" - + COLUMN_SQNI_CONNECTION + ") REFERENCES " + TABLE_SQ_CONNECTION + " (" - + COLUMN_SQN_ID + "), FOREIGN KEY (" + COLUMN_SQNI_INPUT + ") REFERENCES " - + TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID + "))"; + + COLUMN_SQNI_CONNECTION + " BIGINT, " + + COLUMN_SQNI_INPUT + " BIGINT, " + + COLUMN_SQNI_VALUE + " LONG VARCHAR," + + " PRIMARY KEY (" + COLUMN_SQNI_CONNECTION + ", " + COLUMN_SQNI_INPUT + ")," + + " FOREIGN KEY (" + COLUMN_SQNI_CONNECTION + ") REFERENCES " + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + ")," + + " FOREIGN KEY (" + COLUMN_SQNI_INPUT + ") REFERENCES " + TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID + ")" + + ")"; // DDL: Create table SQ_JOB_INPUT public static final String QUERY_CREATE_TABLE_SQ_JOB_INPUT = "CREATE TABLE " + TABLE_SQ_JOB_INPUT + " (" - + COLUMN_SQBI_JOB + " BIGINT, " + COLUMN_SQBI_INPUT + " BIGINT, " - + COLUMN_SQBI_VALUE + " LONG VARCHAR, PRIMARY KEY (" - + COLUMN_SQBI_JOB + ", " + COLUMN_SQBI_INPUT + "), FOREIGN KEY (" - + COLUMN_SQBI_JOB + ") REFERENCES " + TABLE_SQ_JOB + " (" - + COLUMN_SQB_ID + "), FOREIGN KEY (" + COLUMN_SQBI_INPUT + ") REFERENCES " - + TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID + "))"; + + COLUMN_SQBI_JOB + " BIGINT, " + + COLUMN_SQBI_INPUT + " BIGINT, " + + COLUMN_SQBI_VALUE + " LONG VARCHAR," + + " PRIMARY KEY (" + COLUMN_SQBI_JOB + ", " + COLUMN_SQBI_INPUT + "), " + + " FOREIGN KEY (" + COLUMN_SQBI_JOB + ") REFERENCES " + TABLE_SQ_JOB + " (" + COLUMN_SQB_ID + "), " + + " FOREIGN KEY (" + COLUMN_SQBI_INPUT + ") REFERENCES " + TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID + "))"; // DDL: Create table SQ_SUBMISSION public static final String QUERY_CREATE_TABLE_SQ_SUBMISSION = @@ -213,11 +260,46 @@ public final class DerbySchemaQuery { + COLUMN_SQS_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), " + COLUMN_SQS_JOB + " BIGINT, " + COLUMN_SQS_STATUS + " VARCHAR(20), " - + COLUMN_SQS_DATE + " TIMESTAMP," - + COLUMN_SQS_EXTERNAL_ID + " VARCHAR(50), " + + COLUMN_SQS_CREATION_DATE + " TIMESTAMP," + + COLUMN_SQS_UPDATE_DATE + " TIMESTAMP," + + COLUMN_SQS_EXTERNAL_ID + " VARCHAR(25), " + + COLUMN_SQS_EXTERNAL_LINK + " VARCHAR(75), " + + COLUMN_SQS_EXCEPTION + " VARCHAR(75), " + + COLUMN_SQS_EXCEPTION_TRACE + " VARCHAR(500), " + "PRIMARY KEY (" + COLUMN_SQS_ID + "), " - + "FOREIGN KEY (" + COLUMN_SQS_JOB + ") REFERENCES " + TABLE_SQ_JOB + "(" - + COLUMN_SQB_ID + "))"; + + "FOREIGN KEY (" + COLUMN_SQS_JOB + ") REFERENCES " + TABLE_SQ_JOB + "(" + COLUMN_SQB_ID + ") ON DELETE CASCADE" + + ")"; + + // DDL: Create table SQ_COUNTER_GROUP + public static final String QUERY_CREATE_TABLE_SQ_COUNTER_GROUP = + "CREATE TABLE " + TABLE_SQ_COUNTER_GROUP + " (" + + COLUMN_SQG_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), " + + COLUMN_SQG_NAME + " VARCHAR(50), " + + "PRIMARY KEY (" + COLUMN_SQG_ID + ")," + + "UNIQUE ( " + COLUMN_SQG_NAME + ")" + + ")"; + + // DDL: Create table SQ_COUNTER + public static final String QUERY_CREATE_TABLE_SQ_COUNTER = + "CREATE TABLE " + TABLE_SQ_COUNTER + " (" + + COLUMN_SQR_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), " + + COLUMN_SQR_NAME + " VARCHAR(50), " + + "PRIMARY KEY (" + COLUMN_SQR_ID + "), " + + "UNIQUE ( " + COLUMN_SQR_NAME + ")" + + ")"; + + // DDL: Create table SQ_COUNTER_SUBMISSION + public static final String QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION = + "CREATE TABLE " + TABLE_SQ_COUNTER_SUBMISSION + " (" + + COLUMN_SQRS_GROUP + " BIGINT, " + + COLUMN_SQRS_COUNTER + " BIGINT, " + + COLUMN_SQRS_SUBMISSION + " BIGINT, " + + COLUMN_SQRS_VALUE + " BIGINT, " + + "PRIMARY KEY (" + COLUMN_SQRS_GROUP + ", " + COLUMN_SQRS_COUNTER + ", " + COLUMN_SQRS_SUBMISSION + "), " + + "FOREIGN KEY (" + COLUMN_SQRS_GROUP + ") REFERENCES " + TABLE_SQ_COUNTER_GROUP + "(" + COLUMN_SQG_ID + "), " + + "FOREIGN KEY (" + COLUMN_SQRS_COUNTER + ") REFERENCES " + TABLE_SQ_COUNTER + "(" + COLUMN_SQR_ID + "), " + + "FOREIGN KEY (" + COLUMN_SQRS_SUBMISSION + ") REFERENCES " + TABLE_SQ_SUBMISSION + "(" + COLUMN_SQS_ID + ") ON DELETE CASCADE " + + ")"; // DML: Fetch connector Given Name public static final String STMT_FETCH_BASE_CONNECTOR = @@ -384,18 +466,22 @@ public final class DerbySchemaQuery { "INSERT INTO " + TABLE_SQ_SUBMISSION + "(" + COLUMN_SQS_JOB + ", " + COLUMN_SQS_STATUS + ", " - + COLUMN_SQS_DATE + ", " - + COLUMN_SQS_EXTERNAL_ID + ") " - + " VALUES(?, ?, ?, ?)"; + + COLUMN_SQS_CREATION_DATE + ", " + + COLUMN_SQS_UPDATE_DATE + ", " + + COLUMN_SQS_EXTERNAL_ID + ", " + + COLUMN_SQS_EXTERNAL_LINK + ", " + + COLUMN_SQS_EXCEPTION + ", " + + COLUMN_SQS_EXCEPTION_TRACE + ") " + + " VALUES(?, ?, ?, ?, ?, ?, ?, ?)"; // DML: Update existing submission public static final String STMT_UPDATE_SUBMISSION = "UPDATE " + TABLE_SQ_SUBMISSION + " SET " - + COLUMN_SQS_JOB + " = ?, " + COLUMN_SQS_STATUS + " = ?, " - + COLUMN_SQS_DATE + " = ?, " - + COLUMN_SQS_EXTERNAL_ID + " = ? " - + "WHERE " + COLUMN_SQS_ID + " = ?"; + + COLUMN_SQS_UPDATE_DATE + " = ?, " + + COLUMN_SQS_EXCEPTION + " = ?, " + + COLUMN_SQS_EXCEPTION_TRACE + " = ?" + + " WHERE " + COLUMN_SQS_ID + " = ?"; // DML: Check if given submission exists public static final String STMT_SELECT_SUBMISSION_CHECK = @@ -404,20 +490,93 @@ public final class DerbySchemaQuery { // DML: Purge old entries public static final String STMT_PURGE_SUBMISSIONS = - "DELETE FROM " + TABLE_SQ_SUBMISSION + " WHERE " + COLUMN_SQS_DATE + " < ?"; + "DELETE FROM " + TABLE_SQ_SUBMISSION + + " WHERE " + COLUMN_SQS_UPDATE_DATE + " < ?"; // DML: Get unfinished public static final String STMT_SELECT_SUBMISSION_UNFINISHED = - "SELECT " + COLUMN_SQS_ID + ", " + COLUMN_SQS_JOB + ", " + COLUMN_SQS_DATE - + ", " + COLUMN_SQS_STATUS + ", " + COLUMN_SQS_EXTERNAL_ID + " FROM " - + TABLE_SQ_SUBMISSION + " WHERE " + COLUMN_SQS_STATUS + " = ?"; + "SELECT " + + COLUMN_SQS_ID + ", " + + COLUMN_SQS_JOB + ", " + + COLUMN_SQS_STATUS + ", " + + COLUMN_SQS_CREATION_DATE + ", " + + COLUMN_SQS_UPDATE_DATE + ", " + + COLUMN_SQS_EXTERNAL_ID + ", " + + COLUMN_SQS_EXTERNAL_LINK + ", " + + COLUMN_SQS_EXCEPTION + ", " + + COLUMN_SQS_EXCEPTION_TRACE + + " FROM " + TABLE_SQ_SUBMISSION + + " WHERE " + COLUMN_SQS_STATUS + " = ?"; // DML: Last submission for a job public static final String STMT_SELECT_SUBMISSION_LAST_FOR_JOB = - "SELECT " + COLUMN_SQS_ID + ", " + COLUMN_SQS_JOB + ", " + COLUMN_SQS_DATE - + ", " + COLUMN_SQS_STATUS + ", " + COLUMN_SQS_EXTERNAL_ID + " FROM " - + TABLE_SQ_SUBMISSION + " WHERE " + COLUMN_SQS_JOB + " = ? ORDER BY " - + COLUMN_SQS_DATE + " DESC"; + "SELECT " + + COLUMN_SQS_ID + ", " + + COLUMN_SQS_JOB + ", " + + COLUMN_SQS_STATUS + ", " + + COLUMN_SQS_CREATION_DATE + ", " + + COLUMN_SQS_UPDATE_DATE + ", " + + COLUMN_SQS_EXTERNAL_ID + ", " + + COLUMN_SQS_EXTERNAL_LINK + ", " + + COLUMN_SQS_EXCEPTION + ", " + + COLUMN_SQS_EXCEPTION_TRACE + + " FROM " + TABLE_SQ_SUBMISSION + + " WHERE " + COLUMN_SQS_JOB + " = ?" + + " ORDER BY " + COLUMN_SQS_UPDATE_DATE + " DESC"; + + // DML: Select counter group + public static final String STMT_SELECT_COUNTER_GROUP = + "SELECT " + + COLUMN_SQG_ID + ", " + + COLUMN_SQG_NAME + " " + + "FROM " + TABLE_SQ_COUNTER_GROUP + " " + + "WHERE " + COLUMN_SQG_NAME + " = ?"; + + // DML: Insert new counter group + public static final String STMT_INSERT_COUNTER_GROUP = + "INSERT INTO " + TABLE_SQ_COUNTER_GROUP + " (" + + COLUMN_SQG_NAME + ") " + + "VALUES (?)"; + + // DML: Select counter + public static final String STMT_SELECT_COUNTER = + "SELECT " + + COLUMN_SQR_ID + ", " + + COLUMN_SQR_NAME + " " + + "FROM " + TABLE_SQ_COUNTER + " " + + "WHERE " + COLUMN_SQR_NAME + " = ?"; + + // DML: Insert new counter + public static final String STMT_INSERT_COUNTER = + "INSERT INTO " + TABLE_SQ_COUNTER + " (" + + COLUMN_SQR_NAME + ") " + + "VALUES (?)"; + + // DML: Insert new counter submission + public static final String STMT_INSERT_COUNTER_SUBMISSION = + "INSERT INTO " + TABLE_SQ_COUNTER_SUBMISSION + " (" + + COLUMN_SQRS_GROUP + ", " + + COLUMN_SQRS_COUNTER + ", " + + COLUMN_SQRS_SUBMISSION + ", " + + COLUMN_SQRS_VALUE + ") " + + "VALUES (?, ?, ?, ?)"; + + // DML: Select counter submission + public static final String STMT_SELECT_COUNTER_SUBMISSION = + "SELECT " + + COLUMN_SQG_NAME + ", " + + COLUMN_SQR_NAME + ", " + + COLUMN_SQRS_VALUE + " " + + "FROM " + TABLE_SQ_COUNTER_SUBMISSION + " " + + "LEFT JOIN " + TABLE_SQ_COUNTER_GROUP + " ON " + COLUMN_SQRS_GROUP + " = " + COLUMN_SQG_ID + " " + + "LEFT JOIN " + TABLE_SQ_COUNTER + " ON " + COLUMN_SQRS_COUNTER + " = " + COLUMN_SQR_ID + " " + + "WHERE " + COLUMN_SQRS_SUBMISSION + " = ? "; + + // DML: Delete rows from counter submission table + public static final String STMT_DELETE_COUNTER_SUBMISSION = + "DELETE FROM " + TABLE_SQ_COUNTER_SUBMISSION + + " WHERE " + COLUMN_SQRS_SUBMISSION + " = ?"; + private DerbySchemaQuery() { // Disable explicit object creation diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java index 7aa362e4..0efa19dc 100644 --- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java +++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java @@ -95,6 +95,9 @@ protected void createSchema() throws Exception { runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT); runQuery(QUERY_CREATE_TABLE_SQ_JOB_INPUT); runQuery(QUERY_CREATE_TABLE_SQ_SUBMISSION); + runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP); + runQuery(QUERY_CREATE_TABLE_SQ_COUNTER); + runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION); } /** @@ -253,14 +256,41 @@ public void loadJobs() throws Exception { * @throws Exception */ public void loadSubmissions() throws Exception { - runQuery("INSERT INTO SQOOP.SQ_SUBMISSION" - + "(SQS_JOB, SQS_STATUS, SQS_DATE, SQS_EXTERNAL_ID) VALUES " - + "(1, 'RUNNING', '2012-01-01 01:01:01', 'job_1')," - + "(2, 'SUCCEEDED', '2012-01-02 01:01:01', 'job_2')," - + "(3, 'FAILED', '2012-01-03 01:01:01', 'job_3')," - + "(4, 'UNKNOWN', '2012-01-04 01:01:01', 'job_4')," - + "(1, 'RUNNING', '2012-01-05 01:01:01', 'job_5')" + runQuery("INSERT INTO SQOOP.SQ_COUNTER_GROUP " + + "(SQG_NAME) " + + "VALUES" + + "('gA'), ('gB')" ); + + runQuery("INSERT INTO SQOOP.SQ_COUNTER " + + "(SQR_NAME) " + + "VALUES" + + "('cA'), ('cB')" + ); + + runQuery("INSERT INTO SQOOP.SQ_SUBMISSION" + + "(SQS_JOB, SQS_STATUS, SQS_CREATION_DATE, SQS_UPDATE_DATE," + + " SQS_EXTERNAL_ID, SQS_EXTERNAL_LINK, SQS_EXCEPTION," + + " SQS_EXCEPTION_TRACE)" + + "VALUES " + + "(1, 'RUNNING', '2012-01-01 01:01:01', '2012-01-01 01:01:01', 'job_1'," + + "NULL, NULL, NULL)," + + "(2, 'SUCCEEDED', '2012-01-01 01:01:01', '2012-01-02 01:01:01', 'job_2'," + + " NULL, NULL, NULL)," + + "(3, 'FAILED', '2012-01-01 01:01:01', '2012-01-03 01:01:01', 'job_3'," + + " NULL, NULL, NULL)," + + "(4, 'UNKNOWN', '2012-01-01 01:01:01', '2012-01-04 01:01:01', 'job_4'," + + " NULL, NULL, NULL)," + + "(1, 'RUNNING', '2012-01-01 01:01:01', '2012-01-05 01:01:01', 'job_5'," + + " NULL, NULL, NULL)" + ); + + runQuery("INSERT INTO SQOOP.SQ_COUNTER_SUBMISSION " + + "(SQRS_GROUP, SQRS_COUNTER, SQRS_SUBMISSION, SQRS_VALUE) " + + "VALUES" + + "(1, 1, 4, 300)" + ); + } protected MConnector getConnector() { diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java index 3433b20f..8fce0dd9 100644 --- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java +++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java @@ -19,6 +19,9 @@ import org.apache.sqoop.model.MSubmission; import org.apache.sqoop.submission.SubmissionStatus; +import org.apache.sqoop.submission.counter.Counter; +import org.apache.sqoop.submission.counter.CounterGroup; +import org.apache.sqoop.submission.counter.Counters; import java.util.Calendar; import java.util.Date; @@ -27,7 +30,7 @@ /** * */ -public class TestSubmissionHandling extends DerbyTestCase { +public class TestSubmissionHandling extends DerbyTestCase { DerbyRepositoryHandler handler; @@ -84,8 +87,29 @@ public void testExistsSubmission() throws Exception { } public void testCreateSubmission() throws Exception { - MSubmission submission = - new MSubmission(1, new Date(), SubmissionStatus.RUNNING, "job-x"); + Date creationDate = new Date(); + Date updateDate = new Date(); + + CounterGroup firstGroup = new CounterGroup("ga"); + CounterGroup secondGroup = new CounterGroup("gb"); + firstGroup.addCounter(new Counter("ca", 100)); + firstGroup.addCounter(new Counter("cb", 200)); + secondGroup.addCounter(new Counter("ca", 300)); + secondGroup.addCounter(new Counter("cd", 400)); + Counters counters = new Counters(); + counters.addCounterGroup(firstGroup); + counters.addCounterGroup(secondGroup); + + MSubmission submission = new MSubmission(); + submission.setJobId(1); + submission.setStatus(SubmissionStatus.RUNNING); + submission.setCreationDate(creationDate); + submission.setLastUpdateDate(updateDate); + submission.setExternalId("job-x"); + submission.setExternalLink("http://somewhere"); + submission.setExceptionInfo("RuntimeException"); + submission.setExceptionStackTrace("Yeah it happens"); + submission.setCounters(counters); handler.createSubmission(submission, getDerbyConnection()); @@ -101,9 +125,41 @@ public void testCreateSubmission() throws Exception { assertEquals(1, submission.getJobId()); assertEquals(SubmissionStatus.RUNNING, submission.getStatus()); + assertEquals(creationDate, submission.getCreationDate()); + assertEquals(updateDate, submission.getLastUpdateDate()); assertEquals("job-x", submission.getExternalId()); + assertEquals("http://somewhere", submission.getExternalLink()); + assertEquals("RuntimeException", submission.getExceptionInfo()); + assertEquals("Yeah it happens", submission.getExceptionStackTrace()); - // Let's create second connection + CounterGroup group; + Counter counter; + Counters retrievedCounters = submission.getCounters(); + assertNotNull(retrievedCounters); + + group = counters.getCounterGroup("ga"); + assertNotNull(group); + + counter = group.getCounter("ca"); + assertNotNull(counter); + assertEquals(100, counter.getValue()); + + counter = group.getCounter("cb"); + assertNotNull(counter); + assertEquals(200, counter.getValue()); + + group = counters.getCounterGroup("gb"); + assertNotNull(group); + + counter = group.getCounter("ca"); + assertNotNull(counter); + assertEquals(300, counter.getValue()); + + counter = group.getCounter("cd"); + assertNotNull(counter); + assertEquals(400, counter.getValue()); + + // Let's create second (simpler) connection submission = new MSubmission(1, new Date(), SubmissionStatus.SUCCEEDED, "job-x"); handler.createSubmission(submission, getDerbyConnection()); @@ -163,4 +219,27 @@ public void testPurgeSubmissions() throws Exception { assertEquals(0, submissions.size()); assertCountForTable("SQOOP.SQ_SUBMISSION", 0); } + + /** + * Test that by directly removing jobs we will also remove associated + * submissions and counters. + * + * @throws Exception + */ + public void testDeleteJobs() throws Exception { + loadSubmissions(); + assertCountForTable("SQOOP.SQ_SUBMISSION", 5); + + handler.deleteJob(1, getDerbyConnection()); + assertCountForTable("SQOOP.SQ_SUBMISSION", 3); + + handler.deleteJob(2, getDerbyConnection()); + assertCountForTable("SQOOP.SQ_SUBMISSION", 2); + + handler.deleteJob(3, getDerbyConnection()); + assertCountForTable("SQOOP.SQ_SUBMISSION", 1); + + handler.deleteJob(4, getDerbyConnection()); + assertCountForTable("SQOOP.SQ_SUBMISSION", 0); + } }