diff --git a/common/src/main/java/org/apache/sqoop/model/MSubmission.java b/common/src/main/java/org/apache/sqoop/model/MSubmission.java index 58d92c78..eb90f9a8 100644 --- a/common/src/main/java/org/apache/sqoop/model/MSubmission.java +++ b/common/src/main/java/org/apache/sqoop/model/MSubmission.java @@ -19,6 +19,7 @@ import org.apache.sqoop.classification.InterfaceAudience; import org.apache.sqoop.classification.InterfaceStability; +import org.apache.sqoop.common.ImmutableContext; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.submission.SubmissionStatus; import org.apache.sqoop.submission.counter.Counters; @@ -106,6 +107,16 @@ public class MSubmission extends MAccountableEntity { */ Schema toSchema; + /** + * Context objects that are associated with the job. + * + * Please note that we are not sending those to client as they can potentially contain + * sensitive information. + */ + ImmutableContext fromConnectorContext; + ImmutableContext toConnectorContext; + ImmutableContext driverContext; + public MSubmission() { status = SubmissionStatus.UNKNOWN; progress = -1; @@ -187,6 +198,30 @@ public void setError(SubmissionError error) { this.error = error; } + public ImmutableContext getFromConnectorContext() { + return fromConnectorContext; + } + + public void setFromConnectorContext(ImmutableContext fromConnectorContext) { + this.fromConnectorContext = fromConnectorContext; + } + + public ImmutableContext getToConnectorContext() { + return toConnectorContext; + } + + public void setToConnectorContext(ImmutableContext toConnectorContext) { + this.toConnectorContext = toConnectorContext; + } + + public ImmutableContext getDriverContext() { + return driverContext; + } + + public void setDriverContext(ImmutableContext driverContext) { + this.driverContext = driverContext; + } + public SubmissionError getError() { if(this.error == null) { this.error = new SubmissionError(); diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java index 4a66def7..40445108 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java @@ -340,6 +340,11 @@ private JobRequest createJobRequest(long jobId, MSubmission submission) { jobRequest.setConnector(Direction.FROM, fromConnector); jobRequest.setConnector(Direction.TO, toConnector); + // We also have to store the JobRequest's context pointers to the associated Submission + submission.setFromConnectorContext(jobRequest.getConnectorContext(Direction.FROM)); + submission.setToConnectorContext(jobRequest.getConnectorContext(Direction.TO)); + submission.setDriverContext(jobRequest.getDriverContext()); + jobRequest.setConnectorLinkConfig(Direction.FROM, fromLinkConfig); jobRequest.setConnectorLinkConfig(Direction.TO, toLinkConfig); diff --git a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java index b648d4de..96fba2c2 100644 --- a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java +++ b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java @@ -21,6 +21,9 @@ import org.apache.log4j.Logger; import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.DirectionError; +import org.apache.sqoop.common.ImmutableContext; +import org.apache.sqoop.common.MutableContext; +import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SupportedDirections; import org.apache.sqoop.driver.Driver; @@ -970,6 +973,10 @@ public void createSubmission(MSubmission submission, Connection conn) { createSubmissionCounters(submissionId, submission.getCounters(), conn); } + createSubmissionContext(submissionId, submission.getFromConnectorContext(), ContextType.FROM, conn); + createSubmissionContext(submissionId, submission.getToConnectorContext(), ContextType.TO, conn); + createSubmissionContext(submissionId, submission.getDriverContext(), ContextType.DRIVER, conn); + // Save created persistence id submission.setPersistenceId(submissionId); @@ -1035,6 +1042,8 @@ public void updateSubmission(MSubmission submission, Connection conn) { createSubmissionCounters(submission.getPersistenceId(), submission.getCounters(), conn); } + // We are not updating contexts as they are immutable once the submission is created + } catch (SQLException ex) { logException(ex, submission); throw new SqoopException(CommonRepositoryError.COMMON_0032, ex); @@ -1307,6 +1316,98 @@ private long insertAndGetDriverId(MDriver mDriver, Connection conn) { } } + private void createSubmissionContext(long submissionId, ImmutableContext context, ContextType contextType, Connection conn) throws SQLException { + PreparedStatement stmt = null; + + if(context == null) { + return; + } + + try { + stmt = conn.prepareStatement(crudQueries.getStmtInsertContext()); + long contextTypeId = getContextType(contextType, conn); + + for(Map.Entry entry: context) { + long propertyId = getContextProperty(entry.getKey(), conn); + + stmt.setLong(1, submissionId); + stmt.setLong(2, contextTypeId); + stmt.setLong(3, propertyId); + stmt.setString(4, entry.getValue()); + + stmt.executeUpdate(); + } + } finally { + closeStatements(stmt); + } + } + + private long getContextType(ContextType type, Connection conn) throws SQLException { + PreparedStatement select = null; + PreparedStatement insert = null; + ResultSet rsSelect = null; + ResultSet rsInsert = null; + + try { + select = conn.prepareStatement(crudQueries.getStmtSelectContextType()); + select.setString(1, type.toString()); + + rsSelect = select.executeQuery(); + + if(rsSelect.next()) { + return rsSelect.getLong(1); + } + + insert = conn.prepareStatement(crudQueries.getStmtInsertContextType(), Statement.RETURN_GENERATED_KEYS); + insert.setString(1, type.toString()); + insert.executeUpdate(); + + rsInsert = insert.getGeneratedKeys(); + + if (!rsInsert.next()) { + throw new SqoopException(CommonRepositoryError.COMMON_0010); + } + + return rsInsert.getLong(1); + } finally { + closeResultSets(rsSelect, rsInsert); + closeStatements(select, insert); + } + } + + private long getContextProperty(String property, Connection conn) throws SQLException { + PreparedStatement select = null; + PreparedStatement insert = null; + ResultSet rsSelect = null; + ResultSet rsInsert = null; + + try { + select = conn.prepareStatement(crudQueries.getStmtSelectContextProperty()); + select.setString(1, property); + + rsSelect = select.executeQuery(); + + if(rsSelect.next()) { + return rsSelect.getLong(1); + } + + insert = conn.prepareStatement(crudQueries.getStmtInsertContextProperty(), Statement.RETURN_GENERATED_KEYS); + insert.setString(1, property); + insert.executeUpdate(); + + rsInsert = insert.getGeneratedKeys(); + + if (!rsInsert.next()) { + throw new SqoopException(CommonRepositoryError.COMMON_0010); + } + + return rsInsert.getLong(1); + } finally { + closeResultSets(rsSelect, rsInsert); + closeStatements(select, insert); + } + } + /** * Stores counters for given submission in repository. * @@ -1449,9 +1550,38 @@ private MSubmission loadSubmission(ResultSet rs, Connection conn) throws SQLExce Counters counters = loadCountersSubmission(rs.getLong(1), conn); submission.setCounters(counters); + submission.setFromConnectorContext(loadContextSubmission(rs.getLong(1), ContextType.FROM, conn)); + submission.setToConnectorContext(loadContextSubmission(rs.getLong(1), ContextType.TO, conn)); + submission.setDriverContext(loadContextSubmission(rs.getLong(1), ContextType.DRIVER, conn)); + return submission; } + private MutableContext loadContextSubmission(long submissionId, ContextType type, Connection conn) throws SQLException { + PreparedStatement stmt = null; + ResultSet rs = null; + try { + stmt = conn.prepareStatement(crudQueries.getStmtSelectContext()); + stmt.setLong(1, submissionId); + stmt.setLong(2, getContextType(type, conn)); + rs = stmt.executeQuery(); + + MutableContext context = new MutableMapContext(); + + while (rs.next()) { + String key = rs.getString(1); + String value = rs.getString(2); + + context.setString(key, value); + } + + return context; + } finally { + closeStatements(stmt); + closeResultSets(rs); + } + } + private Counters loadCountersSubmission(long submissionId, Connection conn) throws SQLException { PreparedStatement stmt = null; ResultSet rs = null; @@ -2486,6 +2616,7 @@ protected void runQuery(String query, Connection conn, Object... args) { LOG.info("QUERY(" + query + ") Update count: " + updateCount); } } catch (SQLException ex) { + LOG.error("Can't execute query: " + query, ex); throw new SqoopException(CommonRepositoryError.COMMON_0000, query, ex); } finally { closeStatements(stmt); diff --git a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryInsertUpdateDeleteSelectQuery.java b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryInsertUpdateDeleteSelectQuery.java index 43fabb44..932557fd 100644 --- a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryInsertUpdateDeleteSelectQuery.java +++ b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryInsertUpdateDeleteSelectQuery.java @@ -549,6 +549,54 @@ public class CommonRepositoryInsertUpdateDeleteSelectQuery { + " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQS_JOB) + " = ?" + " ORDER BY " + CommonRepoUtils.escapeColumnName(COLUMN_SQS_UPDATE_DATE) + " DESC"; + // DML: Select context type + public static final String STMT_SELECT_CONTEXT_TYPE = + "SELECT " + + CommonRepoUtils.escapeColumnName(COLUMN_SQCT_ID) + ", " + + CommonRepoUtils.escapeColumnName(COLUMN_SQCT_NAME) + " " + + "FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT_TYPE) + " " + + "WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQCT_NAME) + " = substr(?, 1, 25)"; + + // DML: Insert new context type + public static final String STMT_INSERT_CONTEXT_TYPE = + "INSERT INTO " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT_TYPE) + " (" + + CommonRepoUtils.escapeColumnName(COLUMN_SQCT_NAME) + ") " + + "VALUES (substr(?, 1, 25))"; + + // DML: Select context property + public static final String STMT_SELECT_CONTEXT_PROPERTY = + "SELECT " + + CommonRepoUtils.escapeColumnName(COLUMN_SQCP_ID) + ", " + + CommonRepoUtils.escapeColumnName(COLUMN_SQCP_NAME) + " " + + "FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT_PROPERTY) + " " + + "WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQCP_NAME) + " = substr(?, 1, 500)"; + + // DML: Insert new context property + public static final String STMT_INSERT_CONTEXT_PROPERTY = + "INSERT INTO " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT_PROPERTY) + " (" + + CommonRepoUtils.escapeColumnName(COLUMN_SQCP_NAME) + ") " + + "VALUES (substr(?, 1, 500))"; + + // DML: Insert new context + public static final String STMT_INSERT_CONTEXT = + "INSERT INTO " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT) + " (" + + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_SUBMISSION) + ", " + + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_TYPE) + ", " + + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_PROPERTY) + ", " + + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_VALUE) + ") " + + "VALUES (?, ?, ?, substr(?, 1, 500))"; + + // DML: Select context + public static final String STMT_SELECT_CONTEXT = + "SELECT " + + CommonRepoUtils.escapeColumnName(COLUMN_SQCP_NAME) + ", " + + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_VALUE) + " " + + "FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT) + " " + + "LEFT JOIN " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT_PROPERTY) + + " ON " + CommonRepoUtils.escapeColumnName(COLUMN_SQCP_ID) + " = " + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_PROPERTY) + " " + + "WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_SUBMISSION) + " = ? " + + " AND " + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_TYPE) + " = ? "; + // DML: Select counter group public static final String STMT_SELECT_COUNTER_GROUP = "SELECT " @@ -820,6 +868,30 @@ public String getStmtSelectSubmissionsForJob() { return STMT_SELECT_SUBMISSIONS_FOR_JOB; } + public String getStmtSelectContextType() { + return STMT_SELECT_CONTEXT_TYPE; + } + + public String getStmtInsertContextType() { + return STMT_INSERT_CONTEXT_TYPE; + } + + public String getStmtSelectContextProperty() { + return STMT_SELECT_CONTEXT_PROPERTY; + } + + public String getStmtInsertContextProperty() { + return STMT_INSERT_CONTEXT_PROPERTY; + } + + public String getStmtInsertContext() { + return STMT_INSERT_CONTEXT; + } + + public String getStmtSelectContext() { + return STMT_SELECT_CONTEXT; + } + public String getStmtSelectCounterGroup() { return STMT_SELECT_COUNTER_GROUP; } diff --git a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositorySchemaConstants.java b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositorySchemaConstants.java index 545b9908..d1940e82 100644 --- a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositorySchemaConstants.java +++ b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositorySchemaConstants.java @@ -243,7 +243,35 @@ public final class CommonRepositorySchemaConstants { public static final String COLUMN_SQRS_VALUE = "SQRS_VALUE"; - // Constraints + // SQ_CONTEXT_TYPE + + public static final String TABLE_SQ_CONTEXT_TYPE = "SQ_CONTEXT_TYPE"; + + public static final String COLUMN_SQCT_ID = "SQCT_ID"; + + public static final String COLUMN_SQCT_NAME = "SQCT_NAME"; + + // SQ_CONTEXT_PROPERTY + + public static final String TABLE_SQ_CONTEXT_PROPERTY = "SQ_CONTEXT_PROPERTY"; + + public static final String COLUMN_SQCP_ID = "SQCP_ID"; + + public static final String COLUMN_SQCP_NAME = "SQCP_NAME"; + + // SQ_CONTEXT + + public static final String TABLE_SQ_CONTEXT = "SQ_CONTEXT"; + + public static final String COLUMN_SQCO_ID = "SQCO_ID"; + + public static final String COLUMN_SQCO_SUBMISSION = "SQCO_SUBMISSION"; + + public static final String COLUMN_SQCO_TYPE = "SQCO_TYPE"; + + public static final String COLUMN_SQCO_PROPERTY = "SQCO_PROPERTY"; + + public static final String COLUMN_SQCO_VALUE = "SQCO_VALUE"; // Constraints @@ -319,6 +347,15 @@ public final class CommonRepositorySchemaConstants { public static final String CONSTRAINT_SQRS_SQS_NAME = CONSTRAINT_PREFIX + "SQRS_SQS"; + public static final String CONSTRAINT_SQCO_SQS_ID + = CONSTRAINT_PREFIX + "SQCO_SQS_ID"; + + public static final String CONSTRAINT_SQCO_SQCT_ID + = CONSTRAINT_PREFIX + "SQCO_SQCT_ID"; + + public static final String CONSTRAINT_SQCO_SQCP_ID + = CONSTRAINT_PREFIX + "SQCO_SQCP_ID"; + private CommonRepositorySchemaConstants() { // Disable explicit object creation } diff --git a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/ContextType.java b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/ContextType.java new file mode 100644 index 00000000..3533fce7 --- /dev/null +++ b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/ContextType.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.repository.common; + +/** + * Context types that can be stored in the repository + */ +public enum ContextType { + FROM, + TO, + DRIVER, +} diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java index 41414345..0f093f1e 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java @@ -44,8 +44,10 @@ public final class DerbyRepoConstants { * Changed to FROM/TO design. * 5 - Version 1.99.5 * 6 - Version 1.99.6 + * 7 - Version 1.99.7 + * Stored context classes in SQ_CONTEXT tables */ - public static final int LATEST_DERBY_REPOSITORY_VERSION = 6; + public static final int LATEST_DERBY_REPOSITORY_VERSION = 7; private DerbyRepoConstants() { // Disable explicit object creation diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java index fc1a4b59..5c247e1f 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java @@ -315,6 +315,12 @@ public void createOrUpgradeRepository(Connection conn) { if (repositoryVersion < 6) { runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_INPUT_CONSTRAINT_2, conn); } + // 1.99.7 release changes + if (repositoryVersion < 7) { + runQuery(QUERY_CREATE_TABLE_SQ_CONTEXT_TYPE, conn); + runQuery(QUERY_CREATE_TABLE_SQ_CONTEXT_PROPERTY, conn); + runQuery(QUERY_CREATE_TABLE_SQ_CONTEXT, conn); + } // last step upgrade the repository version to the latest value in the code upgradeRepositoryVersion(conn); diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaCreateQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaCreateQuery.java index 27bcde7a..17700367 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaCreateQuery.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaCreateQuery.java @@ -269,6 +269,47 @@ * * *

+ *

+ * SQ_CONTEXT_TYPE: Type of stored context + * + *

+ *    +----------------------------+
+ *    | SQ_CONTEXT_TYPE            |
+ *    +----------------------------+
+ *    | SQCT_ID: BIGINT PK         |
+ *    | SQCT_NAME: VARCHAR(25)     |
+ *    +----------------------------+
+ * 
+ * + *

+ *

+ * SQ_CONTEXT_PROPERTY: Names (keys) of stored properties + * (We're assuming that they will repeat a lot in various submissions) + * + *

+ *    +----------------------------+
+ *    | SQ_CONTEXT_PROPERTY        |
+ *    +----------------------------+
+ *    | SQCP_ID: BIGINT PK         |
+ *    | SQCP_NAME: VARCHAR(500)    |
+ *    +----------------------------+
+ * 
+ *

+ *

+ * SQ_CONTEXT: Context instances for each submission + * + *

+ *    +----------------------------+
+ *    | SQ_CONTEXT                 |
+ *    +----------------------------+
+ *    | SQCO_ID: BIGINT PK         |
+ *    | SQCO_SUBMISSION: BIGINT    | FK SQ_JOB_SUBMISSION(SQS_ID)
+ *    | SQCO_TYPE: BIGINT          | FK SQ_CONTEXT_TYPE(SQCT_ID)
+ *    | SQCO_PROPERTY: BIGINT      | FK SQ_CONTEXT_PROPERTY(SQCP_ID)
+ *    | SQCO_VALUE: VARCHAR(500)   |
+ *    +----------------------------+
+ * 
+ *

*/ // NOTE: If you have signed yourself to modify the schema for the repository @@ -508,6 +549,44 @@ public final class DerbySchemaCreateQuery { + "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_SUBMISSION_NAME) + "(" + CommonRepoUtils.escapeColumnName(COLUMN_SQS_ID) + ") ON DELETE CASCADE " + ")"; + // DDL: Create table SQ_CONTEXT_TYPE + public static final String QUERY_CREATE_TABLE_SQ_CONTEXT_TYPE = + "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT_TYPE) + " (" + + CommonRepoUtils.escapeColumnName(COLUMN_SQCT_ID) + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), " + + CommonRepoUtils.escapeColumnName(COLUMN_SQCT_NAME) + " VARCHAR(25), " + + "PRIMARY KEY (" + CommonRepoUtils.escapeColumnName(COLUMN_SQCT_ID) + "), " + + "UNIQUE ( " + CommonRepoUtils.escapeColumnName(COLUMN_SQCT_NAME) + ")" + + ")"; + + // DDL: Create table SQ_CONTEXT_PROPERTY + public static final String QUERY_CREATE_TABLE_SQ_CONTEXT_PROPERTY = + "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT_PROPERTY) + " (" + + CommonRepoUtils.escapeColumnName(COLUMN_SQCP_ID) + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), " + + CommonRepoUtils.escapeColumnName(COLUMN_SQCP_NAME) + " VARCHAR(500), " + + "PRIMARY KEY (" + CommonRepoUtils.escapeColumnName(COLUMN_SQCP_ID) + "), " + + "UNIQUE ( " + CommonRepoUtils.escapeColumnName(COLUMN_SQCP_NAME) + ")" + + ")"; + + // DDL: Create table SQ_CONTEXT + public static final String QUERY_CREATE_TABLE_SQ_CONTEXT = + "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT) + " (" + + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_ID) + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), " + + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_SUBMISSION) + " BIGINT, " + + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_TYPE) + " BIGINT, " + + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_PROPERTY) + " BIGINT, " + + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_VALUE) + " VARCHAR(500), " + + "PRIMARY KEY (" + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_ID) + "), " + + "CONSTRAINT " + CommonRepoUtils.getConstraintName(SCHEMA_SQOOP, CONSTRAINT_SQCO_SQS_ID) + " " + + "FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_SUBMISSION) + ") " + + "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_SUBMISSION_NAME) + "(" + CommonRepoUtils.escapeColumnName(COLUMN_SQS_ID) + ") ON DELETE CASCADE, " + + "CONSTRAINT " + CommonRepoUtils.getConstraintName(SCHEMA_SQOOP, CONSTRAINT_SQCO_SQCT_ID) + " " + + "FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_TYPE) + ") " + + "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT_TYPE) + "(" + CommonRepoUtils.escapeColumnName(COLUMN_SQCT_ID) + "), " + + "CONSTRAINT " + CommonRepoUtils.getConstraintName(SCHEMA_SQOOP, CONSTRAINT_SQCO_SQCP_ID) + " " + + "FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_PROPERTY) + ") " + + "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT_PROPERTY) + "(" + CommonRepoUtils.escapeColumnName(COLUMN_SQCP_ID) + ") " + + ")"; + private DerbySchemaCreateQuery() { } diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java index fea77bad..d2e793c8 100644 --- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java +++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java @@ -221,7 +221,12 @@ protected void createOrUpgradeSchema(int version) throws Exception { runQuery(QUERY_UPGRADE_TABLE_SQ_INPUT_ADD_COLUMN_SQI_EDITABLE); runQuery(QUERY_CREATE_TABLE_SQ_INPUT_RELATION); runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_INPUT_CONSTRAINT_2); + } + if (version >= 7) { + runQuery(QUERY_CREATE_TABLE_SQ_CONTEXT_TYPE); + runQuery(QUERY_CREATE_TABLE_SQ_CONTEXT_PROPERTY); + runQuery(QUERY_CREATE_TABLE_SQ_CONTEXT); } // deprecated repository version @@ -559,6 +564,7 @@ protected void loadConnectorAndDriverConfig(int version) throws Exception { case 4: case 5: case 6: + case 7: loadConnectorAndDriverConfigVersion4(); break; @@ -597,6 +603,7 @@ public void loadConnectionsOrLinks(int version) throws Exception { case 4: case 5: case 6: + case 7: // Insert two links - CA and CB // Connector 1 has one link config runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONFIGURABLE) " + "VALUES('CA', 1)"); @@ -658,6 +665,7 @@ public void loadJobs(int version) throws Exception { case 4: case 5: case 6: + case 7: for (String name : new String[] { "JA", "JB", "JC", "JD" }) { runQuery("INSERT INTO SQOOP.SQ_JOB(SQB_NAME, SQB_FROM_LINK, SQB_TO_LINK)" + " VALUES('" + name + index + "', 1, 1)"); diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java index bb96e3c9..405c0b8f 100644 --- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java +++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java @@ -17,6 +17,8 @@ */ package org.apache.sqoop.repository.derby; +import org.apache.sqoop.common.MutableContext; +import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.model.MSubmission; import org.apache.sqoop.submission.SubmissionStatus; import org.apache.sqoop.submission.counter.Counter; @@ -106,6 +108,16 @@ public void testCreateSubmission() throws Exception { counters.addCounterGroup(firstGroup); counters.addCounterGroup(secondGroup); + MutableContext fromContext = new MutableMapContext(); + MutableContext toContext = new MutableMapContext(); + MutableContext driverContext = new MutableMapContext(); + fromContext.setString("from1", "value1"); + fromContext.setString("from2", "value2"); + toContext.setString("to1", "value1"); + toContext.setString("to2", "value2"); + driverContext.setString("driver1", "value1"); + driverContext.setString("driver2", "value2"); + MSubmission submission = new MSubmission(); submission.setJobId(1); submission.setStatus(SubmissionStatus.RUNNING); @@ -116,6 +128,9 @@ public void testCreateSubmission() throws Exception { submission.getError().setErrorSummary("RuntimeException"); submission.getError().setErrorDetails("Yeah it happens"); submission.setCounters(counters); + submission.setFromConnectorContext(fromContext); + submission.setToConnectorContext(toContext); + submission.setDriverContext(driverContext); handler.createSubmission(submission, getDerbyDatabaseConnection()); @@ -164,6 +179,16 @@ public void testCreateSubmission() throws Exception { assertNotNull(counter); assertEquals(400, counter.getValue()); + assertNotNull(submission.getFromConnectorContext()); + assertNotNull(submission.getToConnectorContext()); + assertNotNull(submission.getDriverContext()); + assertEquals(submission.getFromConnectorContext().getString("from1"), "value1"); + assertEquals(submission.getFromConnectorContext().getString("from2"), "value2"); + assertEquals(submission.getToConnectorContext().getString("to1"), "value1"); + assertEquals(submission.getToConnectorContext().getString("to2"), "value2"); + assertEquals(submission.getDriverContext().getString("driver1"), "value1"); + assertEquals(submission.getDriverContext().getString("driver2"), "value2"); + // Let's create second (simpler) connection submission = new MSubmission(1, new Date(), SubmissionStatus.SUCCEEDED, "job-x"); handler.createSubmission(submission, getDerbyDatabaseConnection()); diff --git a/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlRepoConstants.java b/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlRepoConstants.java index 5951a4ca..b600ee44 100644 --- a/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlRepoConstants.java +++ b/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlRepoConstants.java @@ -24,8 +24,10 @@ public class PostgresqlRepoConstants { * * History: * 1 - Version 1.99.5 + * 2 - Version 1.99.7 + * Stored context classes in SQ_CONTEXT tables */ - public static final int LATEST_POSTGRESQL_REPOSITORY_VERSION = 1; + public static final int LATEST_POSTGRESQL_REPOSITORY_VERSION = 2; private PostgresqlRepoConstants() { // Disable explicit object creation diff --git a/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlRepositoryHandler.java b/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlRepositoryHandler.java index 5ecc53aa..81bc0000 100644 --- a/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlRepositoryHandler.java +++ b/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlRepositoryHandler.java @@ -110,12 +110,13 @@ public int detectRepositoryVersion(Connection conn) { @Override public void createOrUpgradeRepository(Connection conn) { int version = detectRepositoryVersion(conn); + LOG.info("Detected repository version: " + version); if (version == PostgresqlRepoConstants.LATEST_POSTGRESQL_REPOSITORY_VERSION) { return; } - if (version == 0) { + if (version < 1) { runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_SCHEMA_SQOOP, conn); runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONFIGURABLE, conn); runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONFIG, conn); @@ -136,9 +137,11 @@ public void createOrUpgradeRepository(Connection conn) { // Insert FROM and TO directions. insertDirections(conn); - } else if (version < 4) { - LOG.error("Found unknown version for PostgreSQL repository: " + version); - throw new SqoopException(PostgresqlRepoError.POSTGRESQLREPO_0005, "Found version: " + version); + } + if (version < 2) { + runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONTEXT_TYPE, conn); + runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONTEXT_PROPERTY, conn); + runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONTEXT, conn); } ResultSet rs = null; diff --git a/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlSchemaCreateQuery.java b/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlSchemaCreateQuery.java index 7e95be12..8358df0d 100644 --- a/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlSchemaCreateQuery.java +++ b/repository/repository-postgresql/src/main/java/org/apache/sqoop/repository/postgresql/PostgresqlSchemaCreateQuery.java @@ -246,6 +246,47 @@ * +----------------------------+ * *

+ *

+ * SQ_CONTEXT_TYPE: Type of stored context + * + *

+ *    +----------------------------+
+ *    | SQ_CONTEXT_TYPE            |
+ *    +----------------------------+
+ *    | SQCT_ID: BIGINT PK         |
+ *    | SQCT_NAME: VARCHAR(25)     |
+ *    +----------------------------+
+ * 
+ * + *

+ *

+ * SQ_CONTEXT_PROPERTY: Names (keys) of stored properties + * (We're assuming that they will repeat a lot in various submissions) + * + *

+ *    +----------------------------+
+ *    | SQ_CONTEXT_PROPERTY        |
+ *    +----------------------------+
+ *    | SQCP_ID: BIGINT PK         |
+ *    | SQCP_NAME: VARCHAR(500)    |
+ *    +----------------------------+
+ * 
+ *

+ *

+ * SQ_CONTEXT: Context instances for each submission + * + *

+ *    +----------------------------+
+ *    | SQ_CONTEXT                 |
+ *    +----------------------------+
+ *    | SQCO_ID: BIGINT PK         |
+ *    | SQCO_SUBMISSION: BIGINT    | FK SQ_JOB_SUBMISSION(SQS_ID)
+ *    | SQCO_TYPE: BIGINT          | FK SQ_CONTEXT_TYPE(SQCT_ID)
+ *    | SQCO_PROPERTY: BIGINT      | FK SQ_CONTEXT_PROPERTY(SQCP_ID)
+ *    | SQCO_VALUE: VARCHAR(500)   |
+ *    +----------------------------+
+ * 
+ *

*/ public class PostgresqlSchemaCreateQuery { @@ -460,6 +501,39 @@ public class PostgresqlSchemaCreateQuery { + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_SUBMISSION_NAME) + "(" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_ID) + ") ON DELETE CASCADE" + ")"; + // DDL: Create table SQ_CONTEXT_TYPE + public static final String QUERY_CREATE_TABLE_SQ_CONTEXT_TYPE = + "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT_TYPE) + " (" + + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCT_ID) + " BIGSERIAL PRIMARY KEY NOT NULL, " + + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCT_NAME) + " VARCHAR(25) UNIQUE" + + ")"; + + // DDL: Create table SQ_CONTEXT_PROPERTY + public static final String QUERY_CREATE_TABLE_SQ_CONTEXT_PROPERTY = + "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT_PROPERTY) + " (" + + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCP_ID) + " BIGSERIAL PRIMARY KEY NOT NULL, " + + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCP_NAME) + " VARCHAR(500) UNIQUE" + + ")"; + + // DDL: Create table SQ_CONTEXT + public static final String QUERY_CREATE_TABLE_SQ_CONTEXT = + "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT) + " (" + + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_ID) + " BIGSERIAL PRIMARY KEY NOT NULL, " + + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_SUBMISSION) + " BIGINT, " + + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_TYPE) + " BIGINT, " + + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_PROPERTY) + " BIGINT, " + + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_VALUE) + " VARCHAR(500), " + + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQCO_SQS_ID) + " " + + "FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_SUBMISSION) + ") " + + "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_SUBMISSION_NAME) + "(" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_ID) + ") ON DELETE CASCADE, " + + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQCO_SQCT_ID) + " " + + "FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_TYPE) + ") " + + "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT_TYPE) + "(" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCT_ID) + "), " + + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQCO_SQCP_ID) + " " + + "FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_PROPERTY) + ") " + + "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT_PROPERTY) + "(" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCP_ID) + ") " + + ")"; + private PostgresqlSchemaCreateQuery() { // Disable explicit object creation } diff --git a/repository/repository-postgresql/src/test/java/org/apache/sqoop/integration/repository/postgresql/TestSubmissionHandling.java b/repository/repository-postgresql/src/test/java/org/apache/sqoop/integration/repository/postgresql/TestSubmissionHandling.java index 3bce8068..ffdf8e12 100644 --- a/repository/repository-postgresql/src/test/java/org/apache/sqoop/integration/repository/postgresql/TestSubmissionHandling.java +++ b/repository/repository-postgresql/src/test/java/org/apache/sqoop/integration/repository/postgresql/TestSubmissionHandling.java @@ -17,6 +17,8 @@ */ package org.apache.sqoop.integration.repository.postgresql; +import org.apache.sqoop.common.MutableContext; +import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.common.test.db.TableName; import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MJob; @@ -139,6 +141,16 @@ public void testCreateSubmission() throws Exception { counters.addCounterGroup(firstGroup); counters.addCounterGroup(secondGroup); + MutableContext fromContext = new MutableMapContext(); + MutableContext toContext = new MutableMapContext(); + MutableContext driverContext = new MutableMapContext(); + fromContext.setString("from1", "value1"); + fromContext.setString("from2", "value2"); + toContext.setString("to1", "value1"); + toContext.setString("to2", "value2"); + driverContext.setString("driver1", "value1"); + driverContext.setString("driver2", "value2"); + MSubmission submission = new MSubmission(); submission.setJobId(1); submission.setStatus(SubmissionStatus.RUNNING); @@ -149,6 +161,9 @@ public void testCreateSubmission() throws Exception { submission.getError().setErrorSummary("RuntimeException"); submission.getError().setErrorDetails("Yeah it happens"); submission.setCounters(counters); + submission.setFromConnectorContext(fromContext); + submission.setToConnectorContext(toContext); + submission.setDriverContext(driverContext); handler.createSubmission(submission, provider.getConnection()); @@ -198,6 +213,16 @@ public void testCreateSubmission() throws Exception { assertNotNull(counter); assertEquals(400, counter.getValue()); + assertNotNull(submission.getFromConnectorContext()); + assertNotNull(submission.getToConnectorContext()); + assertNotNull(submission.getDriverContext()); + assertEquals(submission.getFromConnectorContext().getString("from1"), "value1"); + assertEquals(submission.getFromConnectorContext().getString("from2"), "value2"); + assertEquals(submission.getToConnectorContext().getString("to1"), "value1"); + assertEquals(submission.getToConnectorContext().getString("to2"), "value2"); + assertEquals(submission.getDriverContext().getString("driver1"), "value1"); + assertEquals(submission.getDriverContext().getString("driver2"), "value2"); + // Let's create second (simpler) connection submission = new MSubmission(1, new Date(), SubmissionStatus.SUCCEEDED, "job-x"); handler.createSubmission(submission, provider.getConnection());