5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-04 05:51:17 +08:00

SQOOP-659: Design metadata upgrade procedure

(Hari Shreedharan via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2013-04-18 21:15:28 -07:00
parent a69b1cc663
commit c4467c6770
12 changed files with 843 additions and 109 deletions

View File

@ -57,6 +57,18 @@ public long getConnectorId() {
return connectorId;
}
public void setConnectorPart(MConnectionForms connectorPart) {
this.connectorPart = connectorPart;
}
public void setFrameworkPart(MConnectionForms frameworkPart) {
this.frameworkPart = frameworkPart;
}
public void setConnectorId(long connectorId) {
this.connectorId = connectorId;
}
public MConnectionForms getConnectorPart() {
return connectorPart;
}

View File

@ -98,10 +98,18 @@ public long getConnectionId() {
return connectionId;
}
public void setConnectionId(long connectionId) {
this.connectionId = connectionId;
}
public long getConnectorId() {
return connectorId;
}
public void setConnectorId(long connectorId) {
this.connectorId = connectorId;
}
public MJobForms getConnectorPart() {
return connectorPart;
}
@ -110,6 +118,15 @@ public MJobForms getFrameworkPart() {
return frameworkPart;
}
public void setConnectorPart(MJobForms connectorPart) {
this.connectorPart = connectorPart;
}
public void setFrameworkPart(MJobForms frameworkPart) {
this.frameworkPart = frameworkPart;
}
public Type getType() {
return type;
}

View File

@ -24,6 +24,7 @@
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
import org.apache.sqoop.connector.spi.MetadataUpgrader;
import org.apache.sqoop.job.etl.Exporter;
import org.apache.sqoop.job.etl.Importer;
import org.apache.sqoop.connector.spi.SqoopConnector;
@ -94,4 +95,9 @@ public Validator getValidator() {
return new GenericJdbcValidator();
}
@Override
public MetadataUpgrader getMetadataUpgrader() {
return new GenericJdbcConnectorMetadataUpgrader();
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.connector.jdbc;
import org.apache.sqoop.connector.spi.MetadataUpgrader;
import org.apache.sqoop.model.MConnectionForms;
import org.apache.sqoop.model.MForm;
import org.apache.sqoop.model.MInput;
import org.apache.sqoop.model.MJobForms;
import org.apache.sqoop.validation.Status;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class GenericJdbcConnectorMetadataUpgrader extends MetadataUpgrader {
/*
* For now, there is no real upgrade. So copy all data over,
* set the validation messages and error messages to be the same as for the
* inputs in the original one.
*/
@Override
public void upgrade(MConnectionForms original,
MConnectionForms upgradeTarget) {
doUpgrade(original.getForms(), upgradeTarget.getForms());
}
@Override
public void upgrade(MJobForms original, MJobForms upgradeTarget) {
doUpgrade(original.getForms(), upgradeTarget.getForms());
}
@SuppressWarnings("unchecked")
private void doUpgrade(List<MForm> original, List<MForm> target) {
// Easier to find the form in the original forms list if we use a map.
// Since the constructor of MJobForms takes a list,
// index is not guaranteed to be the same, so we need to look for
// equivalence
Map<String, MForm> formMap = new HashMap<String, MForm>();
for (MForm form : original) {
formMap.put(form.getName(), form);
}
for (MForm form : target) {
List<MInput<?>> inputs = form.getInputs();
MForm originalForm = formMap.get(form.getName());
for (MInput input : inputs) {
MInput originalInput = originalForm.getInput(input.getName());
input.setValue(originalInput.getValue());
}
}
}
}

View File

@ -29,7 +29,7 @@
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MSubmission;
public class JdbcRepository implements Repository {
public class JdbcRepository extends Repository {
private static final Logger LOG =
Logger.getLogger(JdbcRepository.class);
@ -58,27 +58,42 @@ private interface DoWithConnection {
Object doIt(Connection conn) throws Exception;
}
private Object doWithConnection(DoWithConnection delegator) {
return doWithConnection(delegator, null);
}
/**
* Handle transaction and connection functionality and delegate action to
* given delegator.
*
* @param delegator Code for specific action
* @param tx The transaction to use for the operation. If a transaction is
* specified, this method will not commit, rollback or close it.
* If null, a new transaction will be created - which will be
* committed/closed/rolled back.
* @return Arbitrary value
*/
private Object doWithConnection(DoWithConnection delegator) {
JdbcRepositoryTransaction tx = null;
private Object doWithConnection(DoWithConnection delegator,
JdbcRepositoryTransaction tx) {
boolean shouldCloseTxn = false;
try {
// Get transaction and connection
tx = getTransaction();
tx.begin();
Connection conn = tx.getConnection();
Connection conn;
if (tx == null) {
tx = getTransaction();
shouldCloseTxn = true;
tx.begin();
}
conn = tx.getConnection();
// Delegate the functionality to our delegator
Object returnValue = delegator.doIt(conn);
// Commit transaction
tx.commit();
if (shouldCloseTxn) {
// Commit transaction
tx.commit();
}
// Return value that the underlying code needs to return
return returnValue;
@ -86,12 +101,12 @@ private Object doWithConnection(DoWithConnection delegator) {
} catch (SqoopException ex) {
throw ex;
} catch (Exception ex) {
if (tx != null) {
if (tx != null && shouldCloseTxn) {
tx.rollback();
}
throw new SqoopException(RepositoryError.JDBCREPO_0012, ex);
} finally {
if (tx != null) {
if (tx != null && shouldCloseTxn) {
tx.close();
}
}
@ -121,11 +136,20 @@ public Object doIt(Connection conn) throws Exception {
handler.registerConnector(mConnector, conn);
return mConnector;
} else {
// Same connector, check if the version is the same.
// For now, use the "string" versions itself - later we should
// probably include a build number or something that is
// monotonically increasing.
if (result.getUniqueName().equals(mConnector.getUniqueName()) &&
mConnector.getVersion().compareTo(result.getVersion()) > 0) {
upgradeConnector(result, mConnector);
return mConnector;
}
if (!result.equals(mConnector)) {
throw new SqoopException(RepositoryError.JDBCREPO_0013,
"Connector: " + mConnector.getUniqueName()
+ " given: " + mConnector
+ " found: " + result);
+ " given: " + mConnector
+ " found: " + result);
}
return result;
}
@ -133,6 +157,19 @@ public Object doIt(Connection conn) throws Exception {
});
}
/**
* {@inheritDoc}
*/
@Override
public MConnector findConnector(final String shortName) {
return (MConnector) doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) throws Exception {
return handler.findConnector(shortName, conn);
}
});
}
/**
* {@inheritDoc}
*/
@ -179,6 +216,15 @@ public Object doIt(Connection conn) {
*/
@Override
public void updateConnection(final MConnection connection) {
updateConnection(connection, null);
}
/**
* {@inheritDoc}
*/
@Override
public void updateConnection(final MConnection connection,
RepositoryTransaction tx) {
doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) {
@ -193,7 +239,7 @@ public Object doIt(Connection conn) {
handler.updateConnection(connection, conn);
return null;
}
});
}, (JdbcRepositoryTransaction) tx);
}
/**
@ -269,6 +315,14 @@ public Object doIt(Connection conn) {
*/
@Override
public void updateJob(final MJob job) {
updateJob(job, null);
}
/**
* {@inheritDoc}
*/
@Override
public void updateJob(final MJob job, RepositoryTransaction tx) {
doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) {
@ -283,7 +337,7 @@ public Object doIt(Connection conn) {
handler.updateJob(job, conn);
return null;
}
});
}, (JdbcRepositoryTransaction) tx);
}
/**
@ -420,4 +474,71 @@ public Object doIt(Connection conn) {
}
});
}
/**
* {@inheritDoc}
*/
@Override
public List<MConnection> findConnectionsForConnector(final long
connectorID) {
return (List<MConnection>) doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) throws Exception {
return handler.findConnectionsForConnector(connectorID, conn);
}
});
}
/**
* {@inheritDoc}
*/
@Override
public List<MJob> findJobsForConnector(final long connectorID) {
return (List<MJob>) doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) throws Exception {
return handler.findJobsForConnector(connectorID, conn);
}
});
}
@Override
protected void deleteJobInputs(final long jobID, RepositoryTransaction tx) {
doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) throws Exception {
handler.deleteJobInputs(jobID, conn);
return null;
}
}, (JdbcRepositoryTransaction) tx);
}
@Override
protected void deleteConnectionInputs(final long connectionID,
RepositoryTransaction tx) {
doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) throws Exception {
handler.deleteConnectionInputs(connectionID, conn);
return null;
}
}, (JdbcRepositoryTransaction) tx);
}
/**
* {@inheritDoc}
*/
@Override
protected void updateConnector(final MConnector newConnector,
RepositoryTransaction tx) {
doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) throws Exception {
handler.updateConnector(newConnector, conn);
return null;
}
}, (JdbcRepositoryTransaction) tx);
}
}

View File

@ -30,14 +30,14 @@
/**
* Set of methods required from each JDBC based repository.
*/
public interface JdbcRepositoryHandler {
public abstract class JdbcRepositoryHandler {
/**
* Initialize JDBC based repository.
*
* @param repoContext Context for this instance
*/
void initialize(JdbcRepositoryContext repoContext);
public abstract void initialize(JdbcRepositoryContext repoContext);
/**
* Search for connector with given name in repository.
@ -49,7 +49,7 @@ public interface JdbcRepositoryHandler {
* @return null if connector is not yet registered in repository or
* loaded representation.
*/
MConnector findConnector(String shortName, Connection conn);
public abstract MConnector findConnector(String shortName, Connection conn);
/**
* Register given connector in repository.
@ -60,7 +60,42 @@ public interface JdbcRepositoryHandler {
* @param mc Connector that should be registered.
* @param conn JDBC connection for querying repository.
*/
void registerConnector(MConnector mc, Connection conn);
public abstract void registerConnector(MConnector mc, Connection conn);
/**
* Retrieve connections which use the given connector.
* @param connectorID Connector ID whose connections should be fetched
* @param conn JDBC connection for querying repository
* @return List of MConnections that use <code>connectorID</code>.
*/
public abstract List<MConnection> findConnectionsForConnector(long
connectorID, Connection conn);
/**
* Retrieve jobs which use the given connection.
*
* @param connectorID Connector ID whose jobs should be fetched
* @param conn JDBC connection for querying repository
* @return List of MJobs that use <code>connectionID</code>.
*/
public abstract List<MJob> findJobsForConnector(long connectorID,
Connection conn);
/**
* Update the connector with the new data supplied in the <tt>newConnector</tt>.
* Also Update all forms associated with this connector in the repository
* with the forms specified in <tt>mConnector</tt>. <tt>mConnector </tt> must
* minimally have the connectorID and all required forms (including ones
* which may not have changed). After this operation the repository is
* guaranteed to only have the new forms specified in this object.
*
* @param mConnector The new data to be inserted into the repository for
* this connector.
* @param conn JDBC connection for querying repository
*/
public abstract void updateConnector(MConnector mConnector, Connection conn);
/**
* Search for framework metadata in the repository.
@ -69,7 +104,7 @@ public interface JdbcRepositoryHandler {
* @return null if framework metadata are not yet present in repository or
* loaded representation.
*/
MFramework findFramework(Connection conn);
public abstract MFramework findFramework(Connection conn);
/**
* Register framework metadata in repository.
@ -80,26 +115,26 @@ public interface JdbcRepositoryHandler {
* @param mf Framework metadata that should be registered.
* @param conn JDBC connection for querying repository.
*/
void registerFramework(MFramework mf, Connection conn);
public abstract void registerFramework(MFramework mf, Connection conn);
/**
* Check if schema is already present in the repository.
*
* @return true if schema is already present or false if it's not
*/
boolean schemaExists();
public abstract boolean schemaExists();
/**
* Create required schema in repository.
*/
void createSchema();
public abstract void createSchema();
/**
* Termination callback for repository.
*
* Should clean up all resources and commit all uncommitted data.
*/
void shutdown();
public abstract void shutdown();
/**
* Specify query that Sqoop framework can use to validate connection to
@ -108,7 +143,7 @@ public interface JdbcRepositoryHandler {
* @return Query or NULL in case that this repository do not support or do not
* want to validate live connections.
*/
String validationQuery();
public abstract String validationQuery();
/**
* Save given connection to repository. This connection must not be already
@ -117,7 +152,8 @@ public interface JdbcRepositoryHandler {
* @param connection Connection object to serialize into repository.
* @param conn Connection to metadata repository
*/
void createConnection(MConnection connection, Connection conn);
public abstract void createConnection(MConnection connection,
Connection conn);
/**
* Update given connection representation in repository. This connection
@ -127,7 +163,8 @@ public interface JdbcRepositoryHandler {
* @param connection Connection object that should be updated in repository.
* @param conn Connection to metadata repository
*/
void updateConnection(MConnection connection, Connection conn);
public abstract void updateConnection(MConnection connection,
Connection conn);
/**
* Check if given connection exists in metastore.
@ -136,7 +173,7 @@ public interface JdbcRepositoryHandler {
* @param conn Connection to metadata repository
* @return True if the connection exists
*/
boolean existsConnection(long connetionId, Connection conn);
public abstract boolean existsConnection(long connetionId, Connection conn);
/**
* Check if given Connection id is referenced somewhere and thus can't
@ -146,7 +183,7 @@ public interface JdbcRepositoryHandler {
* @param conn Connection to metadata repository
* @return
*/
boolean inUseConnection(long connectionId, Connection conn);
public abstract boolean inUseConnection(long connectionId, Connection conn);
/**
* Delete connection with given id from metadata repository.
@ -154,8 +191,15 @@ public interface JdbcRepositoryHandler {
* @param connectionId Connection object that should be removed from repository
* @param conn Connection to metadata repository
*/
void deleteConnection(long connectionId, Connection conn);
public abstract void deleteConnection(long connectionId, Connection conn);
/**
* Delete the input values for the connection with given id from the
* repository.
* @param id Connection object whose inputs should be removed from repository
* @param conn Connection to metadata repository
*/
public abstract void deleteConnectionInputs(long id, Connection conn);
/**
* Find connection with given id in repository.
*
@ -163,7 +207,8 @@ public interface JdbcRepositoryHandler {
* @param conn Connection to metadata repository
* @return Deserialized form of the connection that is saved in repository
*/
MConnection findConnection(long connectionId, Connection conn);
public abstract MConnection findConnection(long connectionId,
Connection conn);
/**
* Get all connection objects.
@ -171,7 +216,7 @@ public interface JdbcRepositoryHandler {
* @param conn Connection to metadata repository
* @return List will all saved connection objects
*/
List<MConnection> findConnections(Connection conn);
public abstract List<MConnection> findConnections(Connection conn);
/**
@ -181,7 +226,7 @@ public interface JdbcRepositoryHandler {
* @param job Job object to serialize into repository.
* @param conn Connection to metadata repository
*/
void createJob(MJob job, Connection conn);
public abstract void createJob(MJob job, Connection conn);
/**
* Update given job representation in repository. This job object must
@ -191,7 +236,7 @@ public interface JdbcRepositoryHandler {
* @param job Job object that should be updated in repository.
* @param conn Connection to metadata repository
*/
void updateJob(MJob job, Connection conn);
public abstract void updateJob(MJob job, Connection conn);
/**
* Check if given job exists in metastore.
@ -200,7 +245,7 @@ public interface JdbcRepositoryHandler {
* @param conn Connection to metadata repository
* @return True if the job exists
*/
boolean existsJob(long jobId, Connection conn);
public abstract boolean existsJob(long jobId, Connection conn);
/**
* Check if given job id is referenced somewhere and thus can't
@ -210,15 +255,23 @@ public interface JdbcRepositoryHandler {
* @param conn Connection to metadata repository
* @return
*/
boolean inUseJob(long jobId, Connection conn);
public abstract boolean inUseJob(long jobId, Connection conn);
/**
* Delete job with given id from metadata repository.
* Delete the input values for the job with given id from the repository.
* @param id Job object whose inputs should be removed from repository
* @param conn Connection to metadata repository
*/
public abstract void deleteJobInputs(long id, Connection conn);
/**
* Delete job with given id from metadata repository. This method will
* delete all inputs for this job also.
*
* @param jobId Job object that should be removed from repository
* @param conn Connection to metadata repository
*/
void deleteJob(long jobId, Connection conn);
public abstract void deleteJob(long jobId, Connection conn);
/**
* Find job with given id in repository.
@ -227,7 +280,7 @@ public interface JdbcRepositoryHandler {
* @param conn Connection to metadata repository
* @return Deserialized form of the job that is present in the repository
*/
MJob findJob(long jobId, Connection conn);
public abstract MJob findJob(long jobId, Connection conn);
/**
* Get all job objects.
@ -235,7 +288,7 @@ public interface JdbcRepositoryHandler {
* @param conn Connection to metadata repository
* @return List will all saved job objects
*/
List<MJob> findJobs(Connection conn);
public abstract List<MJob> findJobs(Connection conn);
/**
* Save given submission in repository.
@ -243,7 +296,8 @@ public interface JdbcRepositoryHandler {
* @param submission Submission object
* @param conn Connection to metadata repository
*/
void createSubmission(MSubmission submission, Connection conn);
public abstract void createSubmission(MSubmission submission,
Connection conn);
/**
* Check if submission with given id already exists in repository.
@ -251,7 +305,7 @@ public interface JdbcRepositoryHandler {
* @param submissionId Submission internal id
* @param conn Connection to metadata repository
*/
boolean existsSubmission(long submissionId, Connection conn);
public abstract boolean existsSubmission(long submissionId, Connection conn);
/**
* Update given submission in repository.
@ -259,7 +313,8 @@ public interface JdbcRepositoryHandler {
* @param submission Submission object
* @param conn Connection to metadata repository
*/
void updateSubmission(MSubmission submission, Connection conn);
public abstract void updateSubmission(MSubmission submission,
Connection conn);
/**
* Remove submissions older then threshold from repository.
@ -267,7 +322,7 @@ public interface JdbcRepositoryHandler {
* @param threshold Threshold date
* @param conn Connection to metadata repository
*/
void purgeSubmissions(Date threshold, Connection conn);
public abstract void purgeSubmissions(Date threshold, Connection conn);
/**
* Return list of unfinished submissions (as far as repository is concerned).
@ -275,7 +330,7 @@ public interface JdbcRepositoryHandler {
* @param conn Connection to metadata repository
* @return List of unfinished submissions.
*/
List<MSubmission> findSubmissionsUnfinished(Connection conn);
public abstract List<MSubmission> findSubmissionsUnfinished(Connection conn);
/**
* Find last submission for given jobId.
@ -284,5 +339,6 @@ public interface JdbcRepositoryHandler {
* @param conn Connection to metadata repository
* @return Most recent submission
*/
MSubmission findSubmissionLastForJob(long jobId, Connection conn);
public abstract MSubmission findSubmissionLastForJob(long jobId,
Connection conn);
}

View File

@ -17,12 +17,27 @@
*/
package org.apache.sqoop.repository;
import org.apache.sqoop.common.ErrorCode;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.MetadataUpgrader;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MConnectionForms;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MEnumInput;
import org.apache.sqoop.model.MForm;
import org.apache.sqoop.model.MFramework;
import org.apache.sqoop.model.MInput;
import org.apache.sqoop.model.MIntegerInput;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MJobForms;
import org.apache.sqoop.model.MMapInput;
import org.apache.sqoop.model.MStringInput;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.ModelError;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@ -32,9 +47,9 @@
* Sqoop to store metadata, statistics and other state relevant to Sqoop
* Jobs in the system.
*/
public interface Repository {
public abstract class Repository {
RepositoryTransaction getTransaction();
public abstract RepositoryTransaction getTransaction();
/**
* Registers given connector in the repository and return registered
@ -44,7 +59,18 @@ public interface Repository {
* @param mConnector the connector metadata to be registered
* @return Registered connector structure
*/
MConnector registerConnector(MConnector mConnector);
public abstract MConnector registerConnector(MConnector mConnector);
/**
* Search for connector with given name in repository.
*
* And return corresponding metadata structure.
*
* @param shortName Connector unique name
* @return null if connector is not yet registered in repository or
* loaded representation.
*/
public abstract MConnector findConnector(String shortName);
/**
@ -55,7 +81,7 @@ public interface Repository {
* @param mFramework framework metadata to be registered
* @return Registered connector structure
*/
MFramework registerFramework(MFramework mFramework);
public abstract MFramework registerFramework(MFramework mFramework);
/**
* Save given connection to repository. This connection must not be already
@ -63,7 +89,7 @@ public interface Repository {
*
* @param connection Connection object to serialize into repository.
*/
void createConnection(MConnection connection);
public abstract void createConnection(MConnection connection);
/**
* Update given connection representation in repository. This connection
@ -72,14 +98,28 @@ public interface Repository {
*
* @param connection Connection object that should be updated in repository.
*/
void updateConnection(MConnection connection);
public abstract void updateConnection(MConnection connection);
/**
* Update given connection representation in repository. This connection
* object must already exists in the repository otherwise exception will be
* thrown.
*
* @param connection Connection object that should be updated in repository.
* @param tx The repository transaction to use to push the data to the
* repository. If this is null, a new transaction will be created.
* method will not call begin, commit,
* rollback or close on this transaction.
*/
public abstract void updateConnection(final MConnection connection,
RepositoryTransaction tx);
/**
* Delete connection with given id from metadata repository.
*
* @param id Connection object that should be removed from repository
*/
void deleteConnection(long id);
public abstract void deleteConnection(long id);
/**
* Find connection with given id in repository.
@ -87,14 +127,14 @@ public interface Repository {
* @param id Connection id
* @return Deserialized form of the connection that is saved in repository
*/
MConnection findConnection(long id);
public abstract MConnection findConnection(long id);
/**
* Get all connection objects.
*
* @return List will all saved connection objects
*/
List<MConnection> findConnections();
public abstract List<MConnection> findConnections();
/**
* Save given job to repository. This job object must not be already present
@ -102,7 +142,7 @@ public interface Repository {
*
* @param job Job object that should be saved to repository
*/
void createJob(MJob job);
public abstract void createJob(MJob job);
/**
* Update given job metadata in repository. This object must already be saved
@ -110,14 +150,26 @@ public interface Repository {
*
* @param job Job object that should be updated in the repository
*/
void updateJob(MJob job);
public abstract void updateJob(MJob job);
/**
* Update given job metadata in repository. This object must already be saved
* in repository otherwise exception will be thrown.
*
* @param job Job object that should be updated in the repository
* @param tx The repository transaction to use to push the data to the
* repository. If this is null, a new transaction will be created.
* method will not call begin, commit,
* rollback or close on this transaction.
*/
public abstract void updateJob(MJob job, RepositoryTransaction tx);
/**
* Delete job with given id from metadata repository.
*
* @param id Job id that should be removed
*/
void deleteJob(long id);
public abstract void deleteJob(long id);
/**
* Find job object with given id.
@ -125,42 +177,42 @@ public interface Repository {
* @param id Job id
* @return Deserialized form of job loaded from repository
*/
MJob findJob(long id);
public abstract MJob findJob(long id);
/**
* Get all job objects.
*
* @return List of all jobs in the repository
*/
List<MJob> findJobs();
public abstract List<MJob> findJobs();
/**
* Create new submission record in repository.
*
* @param submission Submission object that should be serialized to repository
*/
void createSubmission(MSubmission submission);
public abstract void createSubmission(MSubmission submission);
/**
* Update already existing submission record in repository.
*
* @param submission Submission object that should be updated
*/
void updateSubmission(MSubmission submission);
public abstract void updateSubmission(MSubmission submission);
/**
* Remove submissions older then given date from repository.
*
* @param threshold Threshold date
*/
void purgeSubmissions(Date threshold);
public abstract void purgeSubmissions(Date threshold);
/**
* Return all unfinished submissions as far as repository is concerned.
*
* @return List of unfinished submissions
*/
List<MSubmission> findSubmissionsUnfinished();
public abstract List<MSubmission> findSubmissionsUnfinished();
/**
* Find last submission for given jobId.
@ -168,5 +220,181 @@ public interface Repository {
* @param jobId Job id
* @return Most recent submission
*/
MSubmission findSubmissionLastForJob(long jobId);
public abstract MSubmission findSubmissionLastForJob(long jobId);
/**
* Retrieve connections which use the given connector.
* @param connectorID Connector ID whose connections should be fetched
* @return List of MConnections that use <code>connectorID</code>.
*/
public abstract List<MConnection> findConnectionsForConnector(long
connectorID);
/**
* Retrieve jobs which use the given connection.
*
* @param connectorID Connector ID whose jobs should be fetched
* @return List of MJobs that use <code>connectionID</code>.
*/
public abstract List<MJob> findJobsForConnector(long
connectorID);
/**
* Update the connector with the new data supplied in the
* <tt>newConnector</tt>. Also Update all forms associated with this
* connector in the repository with the forms specified in
* <tt>mConnector</tt>. <tt>mConnector </tt> must
* minimally have the connectorID and all required forms (including ones
* which may not have changed). After this operation the repository is
* guaranteed to only have the new forms specified in this object.
*
* @param newConnector The new data to be inserted into the repository for
* this connector.
* @param tx The repository transaction to use to push the data to the
* repository. If this is null, a new transaction will be created.
* method will not call begin, commit,
* rollback or close on this transaction.
*/
protected abstract void updateConnector(MConnector newConnector,
RepositoryTransaction tx);
/**
* Delete all inputs for a job
* @param jobId The id of the job whose inputs are to be deleted.
* @param tx A transaction on the repository. This
* method will not call <code>begin, commit,
* rollback or close on this transaction.</code>
*/
protected abstract void deleteJobInputs(long jobId, RepositoryTransaction tx);
/**
* Delete all inputs for a connection
* @param connectionID The id of the connection whose inputs are to be
* deleted.
* @param tx The repository transaction to use to push the data to the
* repository. If this is null, a new transaction will be created.
* method will not call begin, commit,
* rollback or close on this transaction.
*/
protected abstract void deleteConnectionInputs(long connectionID,
RepositoryTransaction tx);
/**
* Upgrade the connector with the same {@linkplain MConnector#uniqueName}
* in the repository with values from <code>newConnector</code>.
* <p/>
* All connections and jobs associated with this connector will be upgraded
* automatically.
*
* @param oldConnector The old connector that should be upgraded.
* @param newConnector New properties for the Connector that should be
* upgraded.
*/
public final void upgradeConnector(MConnector oldConnector,
MConnector newConnector) {
long connectorID = oldConnector.getPersistenceId();
newConnector.setPersistenceId(connectorID);
/* Algorithms:
* 1. Get an upgrader for the connector.
* 2. Get all connections associated with the connector.
* 3. Get all jobs associated with the connector.
* 4. Delete the inputs for all of the jobs and connections (in that order)
* 5. Remove all inputs and forms associated with the connector, and
* register the new forms and inputs.
* 6. Create new connections and jobs with connector part being the ones
* returned by the upgrader.
* 7. Insert the connection inputs followed by job inputs (using
* updateJob and updateConnection)
*/
RepositoryTransaction tx = null;
try {
SqoopConnector connector = ConnectorManager.getInstance().getConnector(
connectorID);
MetadataUpgrader upgrader = connector.getMetadataUpgrader();
List<MConnection> connections = findConnectionsForConnector(
connectorID);
List<MJob> jobs = findJobsForConnector(connectorID);
// -- BEGIN TXN --
tx = getTransaction();
tx.begin();
for (MJob job : jobs) {
deleteJobInputs(job.getPersistenceId(), tx);
}
for (MConnection connection : connections) {
deleteConnectionInputs(connection.getPersistenceId(), tx);
}
updateConnector(newConnector, tx);
for (MConnection connection : connections) {
long connectionID = connection.getPersistenceId();
// Make a new copy of the forms from the connector,
// else the values will get set in the forms in the connector for
// each connection.
List<MForm> forms = cloneForms(newConnector.getConnectionForms()
.getForms());
MConnectionForms newConnectionForms = new MConnectionForms(forms);
upgrader.upgrade(connection.getConnectorPart(), newConnectionForms);
MConnection newConnection = new MConnection(connectorID,
newConnectionForms, connection.getFrameworkPart());
newConnection.setPersistenceId(connectionID);
updateConnection(newConnection, tx);
}
for (MJob job : jobs) {
// Make a new copy of the forms from the connector,
// else the values will get set in the forms in the connector for
// each connection.
List<MForm> forms = cloneForms(newConnector.getJobForms(job.getType())
.getForms());
MJobForms newJobForms = new MJobForms(job.getType(), forms);
upgrader.upgrade(job.getConnectorPart(), newJobForms);
MJob newJob = new MJob(connectorID, job.getConnectionId(),
job.getType(), newJobForms, job.getFrameworkPart());
updateJob(newJob, tx);
}
tx.commit();
} catch (Exception ex) {
if(tx != null) {
tx.rollback();
}
throw new SqoopException(RepositoryError.JDBCREPO_0000, ex);
} finally {
if(tx != null) {
tx.close();
}
}
}
/**
* Clones the forms, but does not set the actual data,
* validation message etc in the inputs, but only the persistence id of the
* inputs.
* @param mForms MForms which must be cloned
* @return Cloned MForms
* @throws Exception
*/
private List<MForm> cloneForms(List<MForm> mForms) throws Exception {
List<MForm> forms = new ArrayList<MForm>();
for(MForm mForm : mForms) {
List<MInput<?>> inputs = new ArrayList<MInput<?>>();
for (MInput<?> input : mForm.getInputs()) {
MInput newInput;
if(input instanceof MEnumInput) {
newInput = new MEnumInput(input.getName(), input.isSensitive(),
((MEnumInput) input).getValues());
} else if (input instanceof MMapInput) {
newInput = new MMapInput(input.getName(), input.isSensitive());
} else if(input instanceof MStringInput) {
newInput = new MStringInput(input.getName(), input.isSensitive(),
((MStringInput) input).getMaxLength());
} else if (input instanceof MIntegerInput) {
newInput = new MIntegerInput(input.getName(), input.isSensitive());
} else {
throw new SqoopException(ModelError.MODEL_003);
}
newInput.setPersistenceId(input.getPersistenceId());
inputs.add(newInput);
}
forms.add(new MForm(mForm.getName(), inputs));
}
return forms;
}
}

View File

@ -161,6 +161,8 @@ public enum DerbyRepoError implements ErrorCode {
/** Can't retrieve unfinished submissions **/
DERBYREPO_0037("Can't retrieve unfinished submissions"),
DERBYREPO_0038("Update of connector failed"),
;
private final String message;

View File

@ -67,7 +67,7 @@
*
* Repository implementation for Derby database.
*/
public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
private static final Logger LOG =
Logger.getLogger(DerbyRepositoryHandler.class);
@ -86,15 +86,54 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
public void registerConnector(MConnector mc, Connection conn) {
if (mc.hasPersistenceId()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0011,
mc.getUniqueName());
mc.getUniqueName());
}
mc.setPersistenceId(getConnectorId(mc, conn));
insertFormsForConnector(mc, conn);
}
PreparedStatement baseConnectorStmt = null;
/**
* Helper method to insert the forms from the MConnector into the
* repository. The job and connector forms within <code>mc</code> will get
* updated with the id of the forms when this function returns.
* @param mc The connector to use for updating forms
* @param conn JDBC connection to use for updating the forms
*/
private void insertFormsForConnector (MConnector mc, Connection conn) {
long connectorId = mc.getPersistenceId();
PreparedStatement baseFormStmt = null;
PreparedStatement baseInputStmt = null;
try{
baseFormStmt = conn.prepareStatement(STMT_INSERT_FORM_BASE,
Statement.RETURN_GENERATED_KEYS);
baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
Statement.RETURN_GENERATED_KEYS);
// Register connector forms
registerForms(connectorId, null, mc.getConnectionForms().getForms(),
MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt);
// Register all jobs
for (MJobForms jobForms : mc.getAllJobsForms().values()) {
registerForms(connectorId, jobForms.getType(), jobForms.getForms(),
MFormType.JOB.name(), baseFormStmt, baseInputStmt);
}
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0014,
mc.toString(), ex);
} finally {
closeStatements(baseFormStmt, baseInputStmt);
}
}
private long getConnectorId(MConnector mc, Connection conn) {
PreparedStatement baseConnectorStmt = null;
try {
baseConnectorStmt = conn.prepareStatement(STMT_INSERT_CONNECTOR_BASE,
Statement.RETURN_GENERATED_KEYS);
Statement.RETURN_GENERATED_KEYS);
baseConnectorStmt.setString(1, mc.getUniqueName());
baseConnectorStmt.setString(2, mc.getClassName());
baseConnectorStmt.setString(3, mc.getVersion());
@ -110,31 +149,12 @@ public void registerConnector(MConnector mc, Connection conn) {
if (!rsetConnectorId.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
}
long connectorId = rsetConnectorId.getLong(1);
mc.setPersistenceId(connectorId);
baseFormStmt = conn.prepareStatement(STMT_INSERT_FORM_BASE,
Statement.RETURN_GENERATED_KEYS);
baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
Statement.RETURN_GENERATED_KEYS);
// Register connector forms
registerForms(connectorId, null, mc.getConnectionForms().getForms(),
MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt);
// Register all jobs
for (MJobForms jobForms : mc.getAllJobsForms().values()) {
registerForms(connectorId, jobForms.getType(), jobForms.getForms(),
MFormType.JOB.name(), baseFormStmt, baseInputStmt);
}
return rsetConnectorId.getLong(1);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0014,
mc.toString(), ex);
mc.toString(), ex);
} finally {
closeStatements(baseConnectorStmt, baseFormStmt, baseInputStmt);
closeStatements(baseConnectorStmt);
}
}
@ -227,7 +247,6 @@ public boolean schemaExists() {
}
String sqoopSchemaId = rset.getString(1);
LOG.debug("SQOOP schema ID: " + sqoopSchemaId);
connection.commit();
} catch (SQLException ex) {
if (connection != null) {
@ -550,21 +569,33 @@ public boolean inUseConnection(long connectionId, Connection conn) {
@Override
public void deleteConnection(long id, Connection conn) {
PreparedStatement dltConn = null;
PreparedStatement dltConnInput = null;
try {
dltConnInput = conn.prepareStatement(STMT_DELETE_CONNECTION_INPUT);
deleteConnectionInputs(id, conn);
dltConn = conn.prepareStatement(STMT_DELETE_CONNECTION);
dltConnInput.setLong(1, id);
dltConn.setLong(1, id);
dltConnInput.executeUpdate();
dltConn.executeUpdate();
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0022, ex);
} finally {
closeStatements(dltConn, dltConnInput);
closeStatements(dltConn);
}
}
/**
* {@inheritDoc}
*/
@Override
public void deleteConnectionInputs(long id, Connection conn) {
PreparedStatement dltConnInput = null;
try {
dltConnInput = conn.prepareStatement(STMT_DELETE_CONNECTION_INPUT);
dltConnInput.setLong(1, id);
dltConnInput.executeUpdate();
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0022, ex);
} finally {
closeStatements(dltConnInput);
}
}
@ -613,6 +644,63 @@ public List<MConnection> findConnections(Connection conn) {
}
}
/**
*
* {@inheritDoc}
*
*/
@Override
public List<MConnection> findConnectionsForConnector(long
connectorID, Connection conn) {
PreparedStatement stmt = null;
try {
stmt = conn.prepareStatement(STMT_SELECT_CONNECTION_FOR_CONNECTOR);
stmt.setLong(1, connectorID);
return loadConnections(stmt, conn);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0023, ex);
} finally {
closeStatements(stmt);
}
}
/**
* {@inheritDoc}
*/
@Override
public void updateConnector(MConnector mConnector, Connection conn) {
PreparedStatement updateConnectorStatement = null;
PreparedStatement deleteForm = null;
PreparedStatement deleteInput = null;
try {
updateConnectorStatement = conn.prepareStatement(STMT_UPDATE_CONNECTOR);
deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONNECTOR);
deleteForm = conn.prepareStatement(STMT_DELETE_FORMS_FOR_CONNECTOR);
updateConnectorStatement.setString(1, mConnector.getUniqueName());
updateConnectorStatement.setString(2, mConnector.getClassName());
updateConnectorStatement.setString(3, mConnector.getVersion());
updateConnectorStatement.setLong(4, mConnector.getPersistenceId());
if (updateConnectorStatement.executeUpdate() != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0038);
}
deleteInput.setLong(1, mConnector.getPersistenceId());
deleteForm.setLong(1, mConnector.getPersistenceId());
deleteInput.executeUpdate();
deleteForm.executeUpdate();
} catch (SQLException e) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0038, e);
} finally {
closeStatements(updateConnectorStatement, deleteForm, deleteInput);
}
insertFormsForConnector(mConnector, conn);
}
/**
* {@inheritDoc}
*/
@ -746,21 +834,32 @@ public boolean inUseJob(long jobId, Connection conn) {
@Override
public void deleteJob(long id, Connection conn) {
PreparedStatement dlt = null;
PreparedStatement dltInput = null;
try {
dltInput = conn.prepareStatement(STMT_DELETE_JOB_INPUT);
deleteJobInputs(id, conn);
dlt = conn.prepareStatement(STMT_DELETE_JOB);
dltInput.setLong(1, id);
dlt.setLong(1, id);
dltInput.executeUpdate();
dlt.executeUpdate();
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0028, ex);
} finally {
closeStatements(dlt, dltInput);
closeStatements(dlt);
}
}
/**
* {@inheritDoc}
*/
@Override
public void deleteJobInputs(long id, Connection conn) {
PreparedStatement dltInput = null;
try {
dltInput = conn.prepareStatement(STMT_DELETE_JOB_INPUT);
dltInput.setLong(1, id);
dltInput.executeUpdate();
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0028, ex);
} finally {
closeStatements(dltInput);
}
}
@ -809,6 +908,24 @@ public List<MJob> findJobs(Connection conn) {
}
}
/**
* {@inheritDoc}
*/
@Override
public List<MJob> findJobsForConnector(long connectorId, Connection conn) {
PreparedStatement stmt = null;
try {
stmt = conn.prepareStatement(STMT_SELECT_ALL_JOBS_FOR_CONNECTOR);
stmt.setLong(1, connectorId);
return loadJobs(stmt, conn);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0031, ex);
} finally {
closeStatements(stmt);
}
}
/**
* {@inheritDoc}
*/
@ -1297,7 +1414,8 @@ private List<MJob> loadJobs(PreparedStatement stmt,
}
/**
* Register forms in derby database.
* Register forms in derby database. This method will insert the ids
* generated by the repository into the forms passed in itself.
*
* Use given prepared statements to create entire form structure in database.
*

View File

@ -447,6 +447,30 @@ public final class DerbySchemaQuery {
+ COLUMN_SQI_ENUMVALS
+ ") VALUES (?, ?, ?, ?, ?, ?, ?)";
// Delete all forms for a given connector
public static final String STMT_DELETE_FORMS_FOR_CONNECTOR =
"DELETE FROM " + TABLE_SQ_FORM
+ " WHERE " + COLUMN_SQN_CONNECTOR + " = ?";
// Delete all inputs for a given connector
public static final String STMT_DELETE_INPUTS_FOR_CONNECTOR =
"DELETE FROM " + TABLE_SQ_INPUT
+ " WHERE "
+ COLUMN_SQI_FORM
+ " IN (SELECT "
+ COLUMN_SQF_ID
+ " FROM " + TABLE_SQ_FORM
+ " WHERE "
+ COLUMN_SQF_CONNECTOR + " = ?)";
// Update the connector
public static final String STMT_UPDATE_CONNECTOR =
"UPDATE " + TABLE_SQ_CONNECTOR
+ " SET " + COLUMN_SQC_NAME + " = ?, "
+ COLUMN_SQC_CLASS + " = ?, "
+ COLUMN_SQC_VERSION + " = ? "
+ " WHERE " + COLUMN_SQC_ID + " = ?";
// DML: Insert new connection
public static final String STMT_INSERT_CONNECTION =
"INSERT INTO " + TABLE_SQ_CONNECTION + " ("
@ -502,6 +526,17 @@ public final class DerbySchemaQuery {
+ COLUMN_SQN_UPDATE_DATE
+ " FROM " + TABLE_SQ_CONNECTION;
// DML: Select all connections for a specific connector.
public static final String STMT_SELECT_CONNECTION_FOR_CONNECTOR =
"SELECT "
+ COLUMN_SQN_ID + ", "
+ COLUMN_SQN_NAME + ", "
+ COLUMN_SQN_CONNECTOR + ", "
+ COLUMN_SQN_CREATION_DATE + ", "
+ COLUMN_SQN_UPDATE_DATE
+ " FROM " + TABLE_SQ_CONNECTION
+ " WHERE " + COLUMN_SQN_CONNECTOR + " = ?";
// DML: Check if given connection exists
public static final String STMT_SELECT_CONNECTION_CHECK =
"SELECT count(*) FROM " + TABLE_SQ_CONNECTION
@ -567,7 +602,7 @@ public final class DerbySchemaQuery {
+ COLUMN_SQB_UPDATE_DATE
+ " FROM " + TABLE_SQ_JOB
+ " LEFT JOIN " + TABLE_SQ_CONNECTION
+ " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID
+ " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID
+ " WHERE " + COLUMN_SQB_ID + " = ?";
// DML: Select all jobs
@ -584,6 +619,21 @@ public final class DerbySchemaQuery {
+ " LEFT JOIN " + TABLE_SQ_CONNECTION
+ " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID;
// DML: Select all jobs for a Connector
public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR =
"SELECT "
+ COLUMN_SQN_CONNECTOR + ", "
+ COLUMN_SQB_ID + ", "
+ COLUMN_SQB_NAME + ", "
+ COLUMN_SQB_CONNECTION + ", "
+ COLUMN_SQB_TYPE + ", "
+ COLUMN_SQB_CREATION_DATE + ", "
+ COLUMN_SQB_UPDATE_DATE
+ " FROM " + TABLE_SQ_JOB
+ " LEFT JOIN " + TABLE_SQ_CONNECTION
+ " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID
+ " AND " + COLUMN_SQN_CONNECTOR + " = ? ";
// DML: Insert new submission
public static final String STMT_INSERT_SUBMISSION =
"INSERT INTO " + TABLE_SQ_SUBMISSION + "("

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.connector.spi;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MConnectionForms;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MJobForms;
public abstract class MetadataUpgrader {
/**
* Upgrade the original connection and fill into the upgradeTarget. Note
* that any metadata already in {@code upgradeTarget} maybe overwritten.
* @param original - original connection metadata as in the repository
* @param upgradeTarget - the instance that will be filled in with the
* upgraded metadata.
*/
public abstract void upgrade(MConnectionForms original,
MConnectionForms upgradeTarget);
/**
* Upgrade the original job and fill into the upgradeTarget. Note
* that any metadata already in {@code upgradeTarget} maybe overwritten.
* This method must be called only after the connection metadata has
* already been upgraded.
* @param original - original connection metadata as in the repository
* @param upgradeTarget - the instance that will be filled in with the
* upgraded metadata.
*/
public abstract void upgrade(MJobForms original, MJobForms upgradeTarget);
}

View File

@ -72,4 +72,11 @@ public abstract class SqoopConnector {
*/
public abstract Validator getValidator();
/**
* Returns an {@linkplain MetadataUpgrader} object that can upgrade the
* connection and job metadata.
* @return MetadataUpgrader object
*/
public abstract MetadataUpgrader getMetadataUpgrader();
}