5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-13 07:21:08 +08:00

SQOOP-2299: Sqoop2: Store context classes in repository

(Jarek Jarcec Cecho via Abraham Elmahrek)
This commit is contained in:
Abraham Elmahrek 2015-04-14 23:07:42 -07:00
parent bfc10914c1
commit d5584f27aa
15 changed files with 538 additions and 7 deletions

View File

@ -19,6 +19,7 @@
import org.apache.sqoop.classification.InterfaceAudience;
import org.apache.sqoop.classification.InterfaceStability;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.submission.counter.Counters;
@ -106,6 +107,16 @@ public class MSubmission extends MAccountableEntity {
*/
Schema toSchema;
/**
* Context objects that are associated with the job.
*
* Please note that we are not sending those to client as they can potentially contain
* sensitive information.
*/
ImmutableContext fromConnectorContext;
ImmutableContext toConnectorContext;
ImmutableContext driverContext;
public MSubmission() {
status = SubmissionStatus.UNKNOWN;
progress = -1;
@ -187,6 +198,30 @@ public void setError(SubmissionError error) {
this.error = error;
}
public ImmutableContext getFromConnectorContext() {
return fromConnectorContext;
}
public void setFromConnectorContext(ImmutableContext fromConnectorContext) {
this.fromConnectorContext = fromConnectorContext;
}
public ImmutableContext getToConnectorContext() {
return toConnectorContext;
}
public void setToConnectorContext(ImmutableContext toConnectorContext) {
this.toConnectorContext = toConnectorContext;
}
public ImmutableContext getDriverContext() {
return driverContext;
}
public void setDriverContext(ImmutableContext driverContext) {
this.driverContext = driverContext;
}
public SubmissionError getError() {
if(this.error == null) {
this.error = new SubmissionError();

View File

@ -340,6 +340,11 @@ private JobRequest createJobRequest(long jobId, MSubmission submission) {
jobRequest.setConnector(Direction.FROM, fromConnector);
jobRequest.setConnector(Direction.TO, toConnector);
// We also have to store the JobRequest's context pointers to the associated Submission
submission.setFromConnectorContext(jobRequest.getConnectorContext(Direction.FROM));
submission.setToConnectorContext(jobRequest.getConnectorContext(Direction.TO));
submission.setDriverContext(jobRequest.getDriverContext());
jobRequest.setConnectorLinkConfig(Direction.FROM, fromLinkConfig);
jobRequest.setConnectorLinkConfig(Direction.TO, toLinkConfig);

View File

@ -21,6 +21,9 @@
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.DirectionError;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.common.SupportedDirections;
import org.apache.sqoop.driver.Driver;
@ -970,6 +973,10 @@ public void createSubmission(MSubmission submission, Connection conn) {
createSubmissionCounters(submissionId, submission.getCounters(), conn);
}
createSubmissionContext(submissionId, submission.getFromConnectorContext(), ContextType.FROM, conn);
createSubmissionContext(submissionId, submission.getToConnectorContext(), ContextType.TO, conn);
createSubmissionContext(submissionId, submission.getDriverContext(), ContextType.DRIVER, conn);
// Save created persistence id
submission.setPersistenceId(submissionId);
@ -1035,6 +1042,8 @@ public void updateSubmission(MSubmission submission, Connection conn) {
createSubmissionCounters(submission.getPersistenceId(), submission.getCounters(), conn);
}
// We are not updating contexts as they are immutable once the submission is created
} catch (SQLException ex) {
logException(ex, submission);
throw new SqoopException(CommonRepositoryError.COMMON_0032, ex);
@ -1307,6 +1316,98 @@ private long insertAndGetDriverId(MDriver mDriver, Connection conn) {
}
}
private void createSubmissionContext(long submissionId, ImmutableContext context, ContextType contextType, Connection conn) throws SQLException {
PreparedStatement stmt = null;
if(context == null) {
return;
}
try {
stmt = conn.prepareStatement(crudQueries.getStmtInsertContext());
long contextTypeId = getContextType(contextType, conn);
for(Map.Entry<String, String> entry: context) {
long propertyId = getContextProperty(entry.getKey(), conn);
stmt.setLong(1, submissionId);
stmt.setLong(2, contextTypeId);
stmt.setLong(3, propertyId);
stmt.setString(4, entry.getValue());
stmt.executeUpdate();
}
} finally {
closeStatements(stmt);
}
}
private long getContextType(ContextType type, Connection conn) throws SQLException {
PreparedStatement select = null;
PreparedStatement insert = null;
ResultSet rsSelect = null;
ResultSet rsInsert = null;
try {
select = conn.prepareStatement(crudQueries.getStmtSelectContextType());
select.setString(1, type.toString());
rsSelect = select.executeQuery();
if(rsSelect.next()) {
return rsSelect.getLong(1);
}
insert = conn.prepareStatement(crudQueries.getStmtInsertContextType(), Statement.RETURN_GENERATED_KEYS);
insert.setString(1, type.toString());
insert.executeUpdate();
rsInsert = insert.getGeneratedKeys();
if (!rsInsert.next()) {
throw new SqoopException(CommonRepositoryError.COMMON_0010);
}
return rsInsert.getLong(1);
} finally {
closeResultSets(rsSelect, rsInsert);
closeStatements(select, insert);
}
}
private long getContextProperty(String property, Connection conn) throws SQLException {
PreparedStatement select = null;
PreparedStatement insert = null;
ResultSet rsSelect = null;
ResultSet rsInsert = null;
try {
select = conn.prepareStatement(crudQueries.getStmtSelectContextProperty());
select.setString(1, property);
rsSelect = select.executeQuery();
if(rsSelect.next()) {
return rsSelect.getLong(1);
}
insert = conn.prepareStatement(crudQueries.getStmtInsertContextProperty(), Statement.RETURN_GENERATED_KEYS);
insert.setString(1, property);
insert.executeUpdate();
rsInsert = insert.getGeneratedKeys();
if (!rsInsert.next()) {
throw new SqoopException(CommonRepositoryError.COMMON_0010);
}
return rsInsert.getLong(1);
} finally {
closeResultSets(rsSelect, rsInsert);
closeStatements(select, insert);
}
}
/**
* Stores counters for given submission in repository.
*
@ -1449,9 +1550,38 @@ private MSubmission loadSubmission(ResultSet rs, Connection conn) throws SQLExce
Counters counters = loadCountersSubmission(rs.getLong(1), conn);
submission.setCounters(counters);
submission.setFromConnectorContext(loadContextSubmission(rs.getLong(1), ContextType.FROM, conn));
submission.setToConnectorContext(loadContextSubmission(rs.getLong(1), ContextType.TO, conn));
submission.setDriverContext(loadContextSubmission(rs.getLong(1), ContextType.DRIVER, conn));
return submission;
}
private MutableContext loadContextSubmission(long submissionId, ContextType type, Connection conn) throws SQLException {
PreparedStatement stmt = null;
ResultSet rs = null;
try {
stmt = conn.prepareStatement(crudQueries.getStmtSelectContext());
stmt.setLong(1, submissionId);
stmt.setLong(2, getContextType(type, conn));
rs = stmt.executeQuery();
MutableContext context = new MutableMapContext();
while (rs.next()) {
String key = rs.getString(1);
String value = rs.getString(2);
context.setString(key, value);
}
return context;
} finally {
closeStatements(stmt);
closeResultSets(rs);
}
}
private Counters loadCountersSubmission(long submissionId, Connection conn) throws SQLException {
PreparedStatement stmt = null;
ResultSet rs = null;
@ -2486,6 +2616,7 @@ protected void runQuery(String query, Connection conn, Object... args) {
LOG.info("QUERY(" + query + ") Update count: " + updateCount);
}
} catch (SQLException ex) {
LOG.error("Can't execute query: " + query, ex);
throw new SqoopException(CommonRepositoryError.COMMON_0000, query, ex);
} finally {
closeStatements(stmt);

View File

@ -549,6 +549,54 @@ public class CommonRepositoryInsertUpdateDeleteSelectQuery {
+ " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQS_JOB) + " = ?"
+ " ORDER BY " + CommonRepoUtils.escapeColumnName(COLUMN_SQS_UPDATE_DATE) + " DESC";
// DML: Select context type
public static final String STMT_SELECT_CONTEXT_TYPE =
"SELECT "
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCT_ID) + ", "
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCT_NAME) + " "
+ "FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT_TYPE) + " "
+ "WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQCT_NAME) + " = substr(?, 1, 25)";
// DML: Insert new context type
public static final String STMT_INSERT_CONTEXT_TYPE =
"INSERT INTO " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT_TYPE) + " ("
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCT_NAME) + ") "
+ "VALUES (substr(?, 1, 25))";
// DML: Select context property
public static final String STMT_SELECT_CONTEXT_PROPERTY =
"SELECT "
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCP_ID) + ", "
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCP_NAME) + " "
+ "FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT_PROPERTY) + " "
+ "WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQCP_NAME) + " = substr(?, 1, 500)";
// DML: Insert new context property
public static final String STMT_INSERT_CONTEXT_PROPERTY =
"INSERT INTO " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT_PROPERTY) + " ("
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCP_NAME) + ") "
+ "VALUES (substr(?, 1, 500))";
// DML: Insert new context
public static final String STMT_INSERT_CONTEXT =
"INSERT INTO " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT) + " ("
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCO_SUBMISSION) + ", "
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCO_TYPE) + ", "
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCO_PROPERTY) + ", "
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCO_VALUE) + ") "
+ "VALUES (?, ?, ?, substr(?, 1, 500))";
// DML: Select context
public static final String STMT_SELECT_CONTEXT =
"SELECT "
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCP_NAME) + ", "
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCO_VALUE) + " "
+ "FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT) + " "
+ "LEFT JOIN " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT_PROPERTY)
+ " ON " + CommonRepoUtils.escapeColumnName(COLUMN_SQCP_ID) + " = " + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_PROPERTY) + " "
+ "WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_SUBMISSION) + " = ? "
+ " AND " + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_TYPE) + " = ? ";
// DML: Select counter group
public static final String STMT_SELECT_COUNTER_GROUP =
"SELECT "
@ -820,6 +868,30 @@ public String getStmtSelectSubmissionsForJob() {
return STMT_SELECT_SUBMISSIONS_FOR_JOB;
}
public String getStmtSelectContextType() {
return STMT_SELECT_CONTEXT_TYPE;
}
public String getStmtInsertContextType() {
return STMT_INSERT_CONTEXT_TYPE;
}
public String getStmtSelectContextProperty() {
return STMT_SELECT_CONTEXT_PROPERTY;
}
public String getStmtInsertContextProperty() {
return STMT_INSERT_CONTEXT_PROPERTY;
}
public String getStmtInsertContext() {
return STMT_INSERT_CONTEXT;
}
public String getStmtSelectContext() {
return STMT_SELECT_CONTEXT;
}
public String getStmtSelectCounterGroup() {
return STMT_SELECT_COUNTER_GROUP;
}

View File

@ -243,7 +243,35 @@ public final class CommonRepositorySchemaConstants {
public static final String COLUMN_SQRS_VALUE = "SQRS_VALUE";
// Constraints
// SQ_CONTEXT_TYPE
public static final String TABLE_SQ_CONTEXT_TYPE = "SQ_CONTEXT_TYPE";
public static final String COLUMN_SQCT_ID = "SQCT_ID";
public static final String COLUMN_SQCT_NAME = "SQCT_NAME";
// SQ_CONTEXT_PROPERTY
public static final String TABLE_SQ_CONTEXT_PROPERTY = "SQ_CONTEXT_PROPERTY";
public static final String COLUMN_SQCP_ID = "SQCP_ID";
public static final String COLUMN_SQCP_NAME = "SQCP_NAME";
// SQ_CONTEXT
public static final String TABLE_SQ_CONTEXT = "SQ_CONTEXT";
public static final String COLUMN_SQCO_ID = "SQCO_ID";
public static final String COLUMN_SQCO_SUBMISSION = "SQCO_SUBMISSION";
public static final String COLUMN_SQCO_TYPE = "SQCO_TYPE";
public static final String COLUMN_SQCO_PROPERTY = "SQCO_PROPERTY";
public static final String COLUMN_SQCO_VALUE = "SQCO_VALUE";
// Constraints
@ -319,6 +347,15 @@ public final class CommonRepositorySchemaConstants {
public static final String CONSTRAINT_SQRS_SQS_NAME
= CONSTRAINT_PREFIX + "SQRS_SQS";
public static final String CONSTRAINT_SQCO_SQS_ID
= CONSTRAINT_PREFIX + "SQCO_SQS_ID";
public static final String CONSTRAINT_SQCO_SQCT_ID
= CONSTRAINT_PREFIX + "SQCO_SQCT_ID";
public static final String CONSTRAINT_SQCO_SQCP_ID
= CONSTRAINT_PREFIX + "SQCO_SQCP_ID";
private CommonRepositorySchemaConstants() {
// Disable explicit object creation
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.repository.common;
/**
* Context types that can be stored in the repository
*/
public enum ContextType {
FROM,
TO,
DRIVER,
}

View File

@ -44,8 +44,10 @@ public final class DerbyRepoConstants {
* Changed to FROM/TO design.
* 5 - Version 1.99.5
* 6 - Version 1.99.6
* 7 - Version 1.99.7
* Stored context classes in SQ_CONTEXT tables
*/
public static final int LATEST_DERBY_REPOSITORY_VERSION = 6;
public static final int LATEST_DERBY_REPOSITORY_VERSION = 7;
private DerbyRepoConstants() {
// Disable explicit object creation

View File

@ -315,6 +315,12 @@ public void createOrUpgradeRepository(Connection conn) {
if (repositoryVersion < 6) {
runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_INPUT_CONSTRAINT_2, conn);
}
// 1.99.7 release changes
if (repositoryVersion < 7) {
runQuery(QUERY_CREATE_TABLE_SQ_CONTEXT_TYPE, conn);
runQuery(QUERY_CREATE_TABLE_SQ_CONTEXT_PROPERTY, conn);
runQuery(QUERY_CREATE_TABLE_SQ_CONTEXT, conn);
}
// last step upgrade the repository version to the latest value in the code
upgradeRepositoryVersion(conn);

View File

@ -269,6 +269,47 @@
* </pre>
*
* </p>
* <p>
* <strong>SQ_CONTEXT_TYPE</strong>: Type of stored context
*
* <pre>
* +----------------------------+
* | SQ_CONTEXT_TYPE |
* +----------------------------+
* | SQCT_ID: BIGINT PK |
* | SQCT_NAME: VARCHAR(25) |
* +----------------------------+
* </pre>
*
* </p>
* <p>
* <strong>SQ_CONTEXT_PROPERTY</strong>: Names (keys) of stored properties
* (We're assuming that they will repeat a lot in various submissions)
*
* <pre>
* +----------------------------+
* | SQ_CONTEXT_PROPERTY |
* +----------------------------+
* | SQCP_ID: BIGINT PK |
* | SQCP_NAME: VARCHAR(500) |
* +----------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_CONTEXT</strong>: Context instances for each submission
*
* <pre>
* +----------------------------+
* | SQ_CONTEXT |
* +----------------------------+
* | SQCO_ID: BIGINT PK |
* | SQCO_SUBMISSION: BIGINT | FK SQ_JOB_SUBMISSION(SQS_ID)
* | SQCO_TYPE: BIGINT | FK SQ_CONTEXT_TYPE(SQCT_ID)
* | SQCO_PROPERTY: BIGINT | FK SQ_CONTEXT_PROPERTY(SQCP_ID)
* | SQCO_VALUE: VARCHAR(500) |
* +----------------------------+
* </pre>
* </p>
*/
// NOTE: If you have signed yourself to modify the schema for the repository
@ -508,6 +549,44 @@ public final class DerbySchemaCreateQuery {
+ "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_SUBMISSION_NAME) + "(" + CommonRepoUtils.escapeColumnName(COLUMN_SQS_ID) + ") ON DELETE CASCADE "
+ ")";
// DDL: Create table SQ_CONTEXT_TYPE
public static final String QUERY_CREATE_TABLE_SQ_CONTEXT_TYPE =
"CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT_TYPE) + " ("
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCT_ID) + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCT_NAME) + " VARCHAR(25), "
+ "PRIMARY KEY (" + CommonRepoUtils.escapeColumnName(COLUMN_SQCT_ID) + "), "
+ "UNIQUE ( " + CommonRepoUtils.escapeColumnName(COLUMN_SQCT_NAME) + ")"
+ ")";
// DDL: Create table SQ_CONTEXT_PROPERTY
public static final String QUERY_CREATE_TABLE_SQ_CONTEXT_PROPERTY =
"CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT_PROPERTY) + " ("
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCP_ID) + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCP_NAME) + " VARCHAR(500), "
+ "PRIMARY KEY (" + CommonRepoUtils.escapeColumnName(COLUMN_SQCP_ID) + "), "
+ "UNIQUE ( " + CommonRepoUtils.escapeColumnName(COLUMN_SQCP_NAME) + ")"
+ ")";
// DDL: Create table SQ_CONTEXT
public static final String QUERY_CREATE_TABLE_SQ_CONTEXT =
"CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT) + " ("
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCO_ID) + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCO_SUBMISSION) + " BIGINT, "
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCO_TYPE) + " BIGINT, "
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCO_PROPERTY) + " BIGINT, "
+ CommonRepoUtils.escapeColumnName(COLUMN_SQCO_VALUE) + " VARCHAR(500), "
+ "PRIMARY KEY (" + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_ID) + "), "
+ "CONSTRAINT " + CommonRepoUtils.getConstraintName(SCHEMA_SQOOP, CONSTRAINT_SQCO_SQS_ID) + " "
+ "FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_SUBMISSION) + ") "
+ "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_SUBMISSION_NAME) + "(" + CommonRepoUtils.escapeColumnName(COLUMN_SQS_ID) + ") ON DELETE CASCADE, "
+ "CONSTRAINT " + CommonRepoUtils.getConstraintName(SCHEMA_SQOOP, CONSTRAINT_SQCO_SQCT_ID) + " "
+ "FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_TYPE) + ") "
+ "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT_TYPE) + "(" + CommonRepoUtils.escapeColumnName(COLUMN_SQCT_ID) + "), "
+ "CONSTRAINT " + CommonRepoUtils.getConstraintName(SCHEMA_SQOOP, CONSTRAINT_SQCO_SQCP_ID) + " "
+ "FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(COLUMN_SQCO_PROPERTY) + ") "
+ "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONTEXT_PROPERTY) + "(" + CommonRepoUtils.escapeColumnName(COLUMN_SQCP_ID) + ") "
+ ")";
private DerbySchemaCreateQuery() {
}

View File

@ -221,7 +221,12 @@ protected void createOrUpgradeSchema(int version) throws Exception {
runQuery(QUERY_UPGRADE_TABLE_SQ_INPUT_ADD_COLUMN_SQI_EDITABLE);
runQuery(QUERY_CREATE_TABLE_SQ_INPUT_RELATION);
runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_INPUT_CONSTRAINT_2);
}
if (version >= 7) {
runQuery(QUERY_CREATE_TABLE_SQ_CONTEXT_TYPE);
runQuery(QUERY_CREATE_TABLE_SQ_CONTEXT_PROPERTY);
runQuery(QUERY_CREATE_TABLE_SQ_CONTEXT);
}
// deprecated repository version
@ -559,6 +564,7 @@ protected void loadConnectorAndDriverConfig(int version) throws Exception {
case 4:
case 5:
case 6:
case 7:
loadConnectorAndDriverConfigVersion4();
break;
@ -597,6 +603,7 @@ public void loadConnectionsOrLinks(int version) throws Exception {
case 4:
case 5:
case 6:
case 7:
// Insert two links - CA and CB
// Connector 1 has one link config
runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONFIGURABLE) " + "VALUES('CA', 1)");
@ -658,6 +665,7 @@ public void loadJobs(int version) throws Exception {
case 4:
case 5:
case 6:
case 7:
for (String name : new String[] { "JA", "JB", "JC", "JD" }) {
runQuery("INSERT INTO SQOOP.SQ_JOB(SQB_NAME, SQB_FROM_LINK, SQB_TO_LINK)" + " VALUES('"
+ name + index + "', 1, 1)");

View File

@ -17,6 +17,8 @@
*/
package org.apache.sqoop.repository.derby;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.submission.counter.Counter;
@ -106,6 +108,16 @@ public void testCreateSubmission() throws Exception {
counters.addCounterGroup(firstGroup);
counters.addCounterGroup(secondGroup);
MutableContext fromContext = new MutableMapContext();
MutableContext toContext = new MutableMapContext();
MutableContext driverContext = new MutableMapContext();
fromContext.setString("from1", "value1");
fromContext.setString("from2", "value2");
toContext.setString("to1", "value1");
toContext.setString("to2", "value2");
driverContext.setString("driver1", "value1");
driverContext.setString("driver2", "value2");
MSubmission submission = new MSubmission();
submission.setJobId(1);
submission.setStatus(SubmissionStatus.RUNNING);
@ -116,6 +128,9 @@ public void testCreateSubmission() throws Exception {
submission.getError().setErrorSummary("RuntimeException");
submission.getError().setErrorDetails("Yeah it happens");
submission.setCounters(counters);
submission.setFromConnectorContext(fromContext);
submission.setToConnectorContext(toContext);
submission.setDriverContext(driverContext);
handler.createSubmission(submission, getDerbyDatabaseConnection());
@ -164,6 +179,16 @@ public void testCreateSubmission() throws Exception {
assertNotNull(counter);
assertEquals(400, counter.getValue());
assertNotNull(submission.getFromConnectorContext());
assertNotNull(submission.getToConnectorContext());
assertNotNull(submission.getDriverContext());
assertEquals(submission.getFromConnectorContext().getString("from1"), "value1");
assertEquals(submission.getFromConnectorContext().getString("from2"), "value2");
assertEquals(submission.getToConnectorContext().getString("to1"), "value1");
assertEquals(submission.getToConnectorContext().getString("to2"), "value2");
assertEquals(submission.getDriverContext().getString("driver1"), "value1");
assertEquals(submission.getDriverContext().getString("driver2"), "value2");
// Let's create second (simpler) connection
submission = new MSubmission(1, new Date(), SubmissionStatus.SUCCEEDED, "job-x");
handler.createSubmission(submission, getDerbyDatabaseConnection());

View File

@ -24,8 +24,10 @@ public class PostgresqlRepoConstants {
*
* History:
* 1 - Version 1.99.5
* 2 - Version 1.99.7
* Stored context classes in SQ_CONTEXT tables
*/
public static final int LATEST_POSTGRESQL_REPOSITORY_VERSION = 1;
public static final int LATEST_POSTGRESQL_REPOSITORY_VERSION = 2;
private PostgresqlRepoConstants() {
// Disable explicit object creation

View File

@ -110,12 +110,13 @@ public int detectRepositoryVersion(Connection conn) {
@Override
public void createOrUpgradeRepository(Connection conn) {
int version = detectRepositoryVersion(conn);
LOG.info("Detected repository version: " + version);
if (version == PostgresqlRepoConstants.LATEST_POSTGRESQL_REPOSITORY_VERSION) {
return;
}
if (version == 0) {
if (version < 1) {
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_SCHEMA_SQOOP, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONFIGURABLE, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONFIG, conn);
@ -136,9 +137,11 @@ public void createOrUpgradeRepository(Connection conn) {
// Insert FROM and TO directions.
insertDirections(conn);
} else if (version < 4) {
LOG.error("Found unknown version for PostgreSQL repository: " + version);
throw new SqoopException(PostgresqlRepoError.POSTGRESQLREPO_0005, "Found version: " + version);
}
if (version < 2) {
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONTEXT_TYPE, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONTEXT_PROPERTY, conn);
runQuery(PostgresqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONTEXT, conn);
}
ResultSet rs = null;

View File

@ -246,6 +246,47 @@
* +----------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_CONTEXT_TYPE</strong>: Type of stored context
*
* <pre>
* +----------------------------+
* | SQ_CONTEXT_TYPE |
* +----------------------------+
* | SQCT_ID: BIGINT PK |
* | SQCT_NAME: VARCHAR(25) |
* +----------------------------+
* </pre>
*
* </p>
* <p>
* <strong>SQ_CONTEXT_PROPERTY</strong>: Names (keys) of stored properties
* (We're assuming that they will repeat a lot in various submissions)
*
* <pre>
* +----------------------------+
* | SQ_CONTEXT_PROPERTY |
* +----------------------------+
* | SQCP_ID: BIGINT PK |
* | SQCP_NAME: VARCHAR(500) |
* +----------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_CONTEXT</strong>: Context instances for each submission
*
* <pre>
* +----------------------------+
* | SQ_CONTEXT |
* +----------------------------+
* | SQCO_ID: BIGINT PK |
* | SQCO_SUBMISSION: BIGINT | FK SQ_JOB_SUBMISSION(SQS_ID)
* | SQCO_TYPE: BIGINT | FK SQ_CONTEXT_TYPE(SQCT_ID)
* | SQCO_PROPERTY: BIGINT | FK SQ_CONTEXT_PROPERTY(SQCP_ID)
* | SQCO_VALUE: VARCHAR(500) |
* +----------------------------+
* </pre>
* </p>
*/
public class PostgresqlSchemaCreateQuery {
@ -460,6 +501,39 @@ public class PostgresqlSchemaCreateQuery {
+ CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_SUBMISSION_NAME) + "(" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_ID) + ") ON DELETE CASCADE"
+ ")";
// DDL: Create table SQ_CONTEXT_TYPE
public static final String QUERY_CREATE_TABLE_SQ_CONTEXT_TYPE =
"CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT_TYPE) + " ("
+ CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCT_ID) + " BIGSERIAL PRIMARY KEY NOT NULL, "
+ CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCT_NAME) + " VARCHAR(25) UNIQUE"
+ ")";
// DDL: Create table SQ_CONTEXT_PROPERTY
public static final String QUERY_CREATE_TABLE_SQ_CONTEXT_PROPERTY =
"CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT_PROPERTY) + " ("
+ CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCP_ID) + " BIGSERIAL PRIMARY KEY NOT NULL, "
+ CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCP_NAME) + " VARCHAR(500) UNIQUE"
+ ")";
// DDL: Create table SQ_CONTEXT
public static final String QUERY_CREATE_TABLE_SQ_CONTEXT =
"CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT) + " ("
+ CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_ID) + " BIGSERIAL PRIMARY KEY NOT NULL, "
+ CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_SUBMISSION) + " BIGINT, "
+ CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_TYPE) + " BIGINT, "
+ CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_PROPERTY) + " BIGINT, "
+ CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_VALUE) + " VARCHAR(500), "
+ "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQCO_SQS_ID) + " "
+ "FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_SUBMISSION) + ") "
+ "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_SUBMISSION_NAME) + "(" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_ID) + ") ON DELETE CASCADE, "
+ "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQCO_SQCT_ID) + " "
+ "FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_TYPE) + ") "
+ "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT_TYPE) + "(" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCT_ID) + "), "
+ "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQCO_SQCP_ID) + " "
+ "FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_PROPERTY) + ") "
+ "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT_PROPERTY) + "(" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCP_ID) + ") "
+ ")";
private PostgresqlSchemaCreateQuery() {
// Disable explicit object creation
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.sqoop.integration.repository.postgresql;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.common.test.db.TableName;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MJob;
@ -139,6 +141,16 @@ public void testCreateSubmission() throws Exception {
counters.addCounterGroup(firstGroup);
counters.addCounterGroup(secondGroup);
MutableContext fromContext = new MutableMapContext();
MutableContext toContext = new MutableMapContext();
MutableContext driverContext = new MutableMapContext();
fromContext.setString("from1", "value1");
fromContext.setString("from2", "value2");
toContext.setString("to1", "value1");
toContext.setString("to2", "value2");
driverContext.setString("driver1", "value1");
driverContext.setString("driver2", "value2");
MSubmission submission = new MSubmission();
submission.setJobId(1);
submission.setStatus(SubmissionStatus.RUNNING);
@ -149,6 +161,9 @@ public void testCreateSubmission() throws Exception {
submission.getError().setErrorSummary("RuntimeException");
submission.getError().setErrorDetails("Yeah it happens");
submission.setCounters(counters);
submission.setFromConnectorContext(fromContext);
submission.setToConnectorContext(toContext);
submission.setDriverContext(driverContext);
handler.createSubmission(submission, provider.getConnection());
@ -198,6 +213,16 @@ public void testCreateSubmission() throws Exception {
assertNotNull(counter);
assertEquals(400, counter.getValue());
assertNotNull(submission.getFromConnectorContext());
assertNotNull(submission.getToConnectorContext());
assertNotNull(submission.getDriverContext());
assertEquals(submission.getFromConnectorContext().getString("from1"), "value1");
assertEquals(submission.getFromConnectorContext().getString("from2"), "value2");
assertEquals(submission.getToConnectorContext().getString("to1"), "value1");
assertEquals(submission.getToConnectorContext().getString("to2"), "value2");
assertEquals(submission.getDriverContext().getString("driver1"), "value1");
assertEquals(submission.getDriverContext().getString("driver2"), "value2");
// Let's create second (simpler) connection
submission = new MSubmission(1, new Date(), SubmissionStatus.SUCCEEDED, "job-x");
handler.createSubmission(submission, provider.getConnection());