diff --git a/common/src/main/java/org/apache/sqoop/model/MConnection.java b/common/src/main/java/org/apache/sqoop/model/MConnection.java index 36dca42a..c31eafd0 100644 --- a/common/src/main/java/org/apache/sqoop/model/MConnection.java +++ b/common/src/main/java/org/apache/sqoop/model/MConnection.java @@ -57,6 +57,18 @@ public long getConnectorId() { return connectorId; } + public void setConnectorPart(MConnectionForms connectorPart) { + this.connectorPart = connectorPart; + } + + public void setFrameworkPart(MConnectionForms frameworkPart) { + this.frameworkPart = frameworkPart; + } + + public void setConnectorId(long connectorId) { + this.connectorId = connectorId; + } + public MConnectionForms getConnectorPart() { return connectorPart; } diff --git a/common/src/main/java/org/apache/sqoop/model/MJob.java b/common/src/main/java/org/apache/sqoop/model/MJob.java index a53f04e3..5b50bfd5 100644 --- a/common/src/main/java/org/apache/sqoop/model/MJob.java +++ b/common/src/main/java/org/apache/sqoop/model/MJob.java @@ -98,10 +98,18 @@ public long getConnectionId() { return connectionId; } + public void setConnectionId(long connectionId) { + this.connectionId = connectionId; + } + public long getConnectorId() { return connectorId; } + public void setConnectorId(long connectorId) { + this.connectorId = connectorId; + } + public MJobForms getConnectorPart() { return connectorPart; } @@ -110,6 +118,15 @@ public MJobForms getFrameworkPart() { return frameworkPart; } + public void setConnectorPart(MJobForms connectorPart) { + this.connectorPart = connectorPart; + } + + public void setFrameworkPart(MJobForms frameworkPart) { + this.frameworkPart = frameworkPart; + } + + public Type getType() { return type; } diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java index c315e48d..11c10def 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java @@ -24,6 +24,7 @@ import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; +import org.apache.sqoop.connector.spi.MetadataUpgrader; import org.apache.sqoop.job.etl.Exporter; import org.apache.sqoop.job.etl.Importer; import org.apache.sqoop.connector.spi.SqoopConnector; @@ -94,4 +95,9 @@ public Validator getValidator() { return new GenericJdbcValidator(); } + @Override + public MetadataUpgrader getMetadataUpgrader() { + return new GenericJdbcConnectorMetadataUpgrader(); + } + } diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorMetadataUpgrader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorMetadataUpgrader.java new file mode 100644 index 00000000..cd461f43 --- /dev/null +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorMetadataUpgrader.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import org.apache.sqoop.connector.spi.MetadataUpgrader; +import org.apache.sqoop.model.MConnectionForms; +import org.apache.sqoop.model.MForm; +import org.apache.sqoop.model.MInput; +import org.apache.sqoop.model.MJobForms; +import org.apache.sqoop.validation.Status; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class GenericJdbcConnectorMetadataUpgrader extends MetadataUpgrader { + /* + * For now, there is no real upgrade. So copy all data over, + * set the validation messages and error messages to be the same as for the + * inputs in the original one. + */ + + @Override + public void upgrade(MConnectionForms original, + MConnectionForms upgradeTarget) { + doUpgrade(original.getForms(), upgradeTarget.getForms()); + } + + @Override + public void upgrade(MJobForms original, MJobForms upgradeTarget) { + doUpgrade(original.getForms(), upgradeTarget.getForms()); + + } + + @SuppressWarnings("unchecked") + private void doUpgrade(List original, List target) { + // Easier to find the form in the original forms list if we use a map. + // Since the constructor of MJobForms takes a list, + // index is not guaranteed to be the same, so we need to look for + // equivalence + Map formMap = new HashMap(); + for (MForm form : original) { + formMap.put(form.getName(), form); + } + for (MForm form : target) { + List> inputs = form.getInputs(); + MForm originalForm = formMap.get(form.getName()); + for (MInput input : inputs) { + MInput originalInput = originalForm.getInput(input.getName()); + input.setValue(originalInput.getValue()); + } + } + } +} diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java index 32df1e5d..b2259ced 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java @@ -29,7 +29,7 @@ import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MSubmission; -public class JdbcRepository implements Repository { +public class JdbcRepository extends Repository { private static final Logger LOG = Logger.getLogger(JdbcRepository.class); @@ -58,27 +58,42 @@ private interface DoWithConnection { Object doIt(Connection conn) throws Exception; } + private Object doWithConnection(DoWithConnection delegator) { + return doWithConnection(delegator, null); + } + /** * Handle transaction and connection functionality and delegate action to * given delegator. * * @param delegator Code for specific action + * @param tx The transaction to use for the operation. If a transaction is + * specified, this method will not commit, rollback or close it. + * If null, a new transaction will be created - which will be + * committed/closed/rolled back. * @return Arbitrary value */ - private Object doWithConnection(DoWithConnection delegator) { - JdbcRepositoryTransaction tx = null; + private Object doWithConnection(DoWithConnection delegator, + JdbcRepositoryTransaction tx) { + boolean shouldCloseTxn = false; try { // Get transaction and connection - tx = getTransaction(); - tx.begin(); - Connection conn = tx.getConnection(); + Connection conn; + if (tx == null) { + tx = getTransaction(); + shouldCloseTxn = true; + tx.begin(); + } + conn = tx.getConnection(); // Delegate the functionality to our delegator Object returnValue = delegator.doIt(conn); - // Commit transaction - tx.commit(); + if (shouldCloseTxn) { + // Commit transaction + tx.commit(); + } // Return value that the underlying code needs to return return returnValue; @@ -86,12 +101,12 @@ private Object doWithConnection(DoWithConnection delegator) { } catch (SqoopException ex) { throw ex; } catch (Exception ex) { - if (tx != null) { + if (tx != null && shouldCloseTxn) { tx.rollback(); } throw new SqoopException(RepositoryError.JDBCREPO_0012, ex); } finally { - if (tx != null) { + if (tx != null && shouldCloseTxn) { tx.close(); } } @@ -121,11 +136,20 @@ public Object doIt(Connection conn) throws Exception { handler.registerConnector(mConnector, conn); return mConnector; } else { + // Same connector, check if the version is the same. + // For now, use the "string" versions itself - later we should + // probably include a build number or something that is + // monotonically increasing. + if (result.getUniqueName().equals(mConnector.getUniqueName()) && + mConnector.getVersion().compareTo(result.getVersion()) > 0) { + upgradeConnector(result, mConnector); + return mConnector; + } if (!result.equals(mConnector)) { throw new SqoopException(RepositoryError.JDBCREPO_0013, "Connector: " + mConnector.getUniqueName() - + " given: " + mConnector - + " found: " + result); + + " given: " + mConnector + + " found: " + result); } return result; } @@ -133,6 +157,19 @@ public Object doIt(Connection conn) throws Exception { }); } + /** + * {@inheritDoc} + */ + @Override + public MConnector findConnector(final String shortName) { + return (MConnector) doWithConnection(new DoWithConnection() { + @Override + public Object doIt(Connection conn) throws Exception { + return handler.findConnector(shortName, conn); + } + }); + } + /** * {@inheritDoc} */ @@ -179,6 +216,15 @@ public Object doIt(Connection conn) { */ @Override public void updateConnection(final MConnection connection) { + updateConnection(connection, null); + } + + /** + * {@inheritDoc} + */ + @Override + public void updateConnection(final MConnection connection, + RepositoryTransaction tx) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { @@ -193,7 +239,7 @@ public Object doIt(Connection conn) { handler.updateConnection(connection, conn); return null; } - }); + }, (JdbcRepositoryTransaction) tx); } /** @@ -269,6 +315,14 @@ public Object doIt(Connection conn) { */ @Override public void updateJob(final MJob job) { + updateJob(job, null); + } + + /** + * {@inheritDoc} + */ + @Override + public void updateJob(final MJob job, RepositoryTransaction tx) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { @@ -283,7 +337,7 @@ public Object doIt(Connection conn) { handler.updateJob(job, conn); return null; } - }); + }, (JdbcRepositoryTransaction) tx); } /** @@ -420,4 +474,71 @@ public Object doIt(Connection conn) { } }); } + + /** + * {@inheritDoc} + */ + @Override + public List findConnectionsForConnector(final long + connectorID) { + return (List) doWithConnection(new DoWithConnection() { + @Override + public Object doIt(Connection conn) throws Exception { + return handler.findConnectionsForConnector(connectorID, conn); + } + }); + } + + /** + * {@inheritDoc} + */ + @Override + public List findJobsForConnector(final long connectorID) { + return (List) doWithConnection(new DoWithConnection() { + @Override + public Object doIt(Connection conn) throws Exception { + return handler.findJobsForConnector(connectorID, conn); + } + }); + } + + @Override + protected void deleteJobInputs(final long jobID, RepositoryTransaction tx) { + doWithConnection(new DoWithConnection() { + @Override + public Object doIt(Connection conn) throws Exception { + handler.deleteJobInputs(jobID, conn); + return null; + } + }, (JdbcRepositoryTransaction) tx); + + } + + @Override + protected void deleteConnectionInputs(final long connectionID, + RepositoryTransaction tx) { + doWithConnection(new DoWithConnection() { + @Override + public Object doIt(Connection conn) throws Exception { + handler.deleteConnectionInputs(connectionID, conn); + return null; + } + }, (JdbcRepositoryTransaction) tx); + + } + + /** + * {@inheritDoc} + */ + @Override + protected void updateConnector(final MConnector newConnector, + RepositoryTransaction tx) { + doWithConnection(new DoWithConnection() { + @Override + public Object doIt(Connection conn) throws Exception { + handler.updateConnector(newConnector, conn); + return null; + } + }, (JdbcRepositoryTransaction) tx); + } } diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java index ca51313c..1f88b6df 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java @@ -30,14 +30,14 @@ /** * Set of methods required from each JDBC based repository. */ -public interface JdbcRepositoryHandler { +public abstract class JdbcRepositoryHandler { /** * Initialize JDBC based repository. * * @param repoContext Context for this instance */ - void initialize(JdbcRepositoryContext repoContext); + public abstract void initialize(JdbcRepositoryContext repoContext); /** * Search for connector with given name in repository. @@ -49,7 +49,7 @@ public interface JdbcRepositoryHandler { * @return null if connector is not yet registered in repository or * loaded representation. */ - MConnector findConnector(String shortName, Connection conn); + public abstract MConnector findConnector(String shortName, Connection conn); /** * Register given connector in repository. @@ -60,7 +60,42 @@ public interface JdbcRepositoryHandler { * @param mc Connector that should be registered. * @param conn JDBC connection for querying repository. */ - void registerConnector(MConnector mc, Connection conn); + public abstract void registerConnector(MConnector mc, Connection conn); + + + /** + * Retrieve connections which use the given connector. + * @param connectorID Connector ID whose connections should be fetched + * @param conn JDBC connection for querying repository + * @return List of MConnections that use connectorID. + */ + public abstract List findConnectionsForConnector(long + connectorID, Connection conn); + + /** + * Retrieve jobs which use the given connection. + * + * @param connectorID Connector ID whose jobs should be fetched + * @param conn JDBC connection for querying repository + * @return List of MJobs that use connectionID. + */ + public abstract List findJobsForConnector(long connectorID, + Connection conn); + + /** + * Update the connector with the new data supplied in the newConnector. + * Also Update all forms associated with this connector in the repository + * with the forms specified in mConnector. mConnector must + * minimally have the connectorID and all required forms (including ones + * which may not have changed). After this operation the repository is + * guaranteed to only have the new forms specified in this object. + * + * @param mConnector The new data to be inserted into the repository for + * this connector. + * @param conn JDBC connection for querying repository + */ + + public abstract void updateConnector(MConnector mConnector, Connection conn); /** * Search for framework metadata in the repository. @@ -69,7 +104,7 @@ public interface JdbcRepositoryHandler { * @return null if framework metadata are not yet present in repository or * loaded representation. */ - MFramework findFramework(Connection conn); + public abstract MFramework findFramework(Connection conn); /** * Register framework metadata in repository. @@ -80,26 +115,26 @@ public interface JdbcRepositoryHandler { * @param mf Framework metadata that should be registered. * @param conn JDBC connection for querying repository. */ - void registerFramework(MFramework mf, Connection conn); + public abstract void registerFramework(MFramework mf, Connection conn); /** * Check if schema is already present in the repository. * * @return true if schema is already present or false if it's not */ - boolean schemaExists(); + public abstract boolean schemaExists(); /** * Create required schema in repository. */ - void createSchema(); + public abstract void createSchema(); /** * Termination callback for repository. * * Should clean up all resources and commit all uncommitted data. */ - void shutdown(); + public abstract void shutdown(); /** * Specify query that Sqoop framework can use to validate connection to @@ -108,7 +143,7 @@ public interface JdbcRepositoryHandler { * @return Query or NULL in case that this repository do not support or do not * want to validate live connections. */ - String validationQuery(); + public abstract String validationQuery(); /** * Save given connection to repository. This connection must not be already @@ -117,7 +152,8 @@ public interface JdbcRepositoryHandler { * @param connection Connection object to serialize into repository. * @param conn Connection to metadata repository */ - void createConnection(MConnection connection, Connection conn); + public abstract void createConnection(MConnection connection, + Connection conn); /** * Update given connection representation in repository. This connection @@ -127,7 +163,8 @@ public interface JdbcRepositoryHandler { * @param connection Connection object that should be updated in repository. * @param conn Connection to metadata repository */ - void updateConnection(MConnection connection, Connection conn); + public abstract void updateConnection(MConnection connection, + Connection conn); /** * Check if given connection exists in metastore. @@ -136,7 +173,7 @@ public interface JdbcRepositoryHandler { * @param conn Connection to metadata repository * @return True if the connection exists */ - boolean existsConnection(long connetionId, Connection conn); + public abstract boolean existsConnection(long connetionId, Connection conn); /** * Check if given Connection id is referenced somewhere and thus can't @@ -146,7 +183,7 @@ public interface JdbcRepositoryHandler { * @param conn Connection to metadata repository * @return */ - boolean inUseConnection(long connectionId, Connection conn); + public abstract boolean inUseConnection(long connectionId, Connection conn); /** * Delete connection with given id from metadata repository. @@ -154,8 +191,15 @@ public interface JdbcRepositoryHandler { * @param connectionId Connection object that should be removed from repository * @param conn Connection to metadata repository */ - void deleteConnection(long connectionId, Connection conn); + public abstract void deleteConnection(long connectionId, Connection conn); + /** + * Delete the input values for the connection with given id from the + * repository. + * @param id Connection object whose inputs should be removed from repository + * @param conn Connection to metadata repository + */ + public abstract void deleteConnectionInputs(long id, Connection conn); /** * Find connection with given id in repository. * @@ -163,7 +207,8 @@ public interface JdbcRepositoryHandler { * @param conn Connection to metadata repository * @return Deserialized form of the connection that is saved in repository */ - MConnection findConnection(long connectionId, Connection conn); + public abstract MConnection findConnection(long connectionId, + Connection conn); /** * Get all connection objects. @@ -171,7 +216,7 @@ public interface JdbcRepositoryHandler { * @param conn Connection to metadata repository * @return List will all saved connection objects */ - List findConnections(Connection conn); + public abstract List findConnections(Connection conn); /** @@ -181,7 +226,7 @@ public interface JdbcRepositoryHandler { * @param job Job object to serialize into repository. * @param conn Connection to metadata repository */ - void createJob(MJob job, Connection conn); + public abstract void createJob(MJob job, Connection conn); /** * Update given job representation in repository. This job object must @@ -191,7 +236,7 @@ public interface JdbcRepositoryHandler { * @param job Job object that should be updated in repository. * @param conn Connection to metadata repository */ - void updateJob(MJob job, Connection conn); + public abstract void updateJob(MJob job, Connection conn); /** * Check if given job exists in metastore. @@ -200,7 +245,7 @@ public interface JdbcRepositoryHandler { * @param conn Connection to metadata repository * @return True if the job exists */ - boolean existsJob(long jobId, Connection conn); + public abstract boolean existsJob(long jobId, Connection conn); /** * Check if given job id is referenced somewhere and thus can't @@ -210,15 +255,23 @@ public interface JdbcRepositoryHandler { * @param conn Connection to metadata repository * @return */ - boolean inUseJob(long jobId, Connection conn); + public abstract boolean inUseJob(long jobId, Connection conn); + /** - * Delete job with given id from metadata repository. + * Delete the input values for the job with given id from the repository. + * @param id Job object whose inputs should be removed from repository + * @param conn Connection to metadata repository + */ + public abstract void deleteJobInputs(long id, Connection conn); + /** + * Delete job with given id from metadata repository. This method will + * delete all inputs for this job also. * * @param jobId Job object that should be removed from repository * @param conn Connection to metadata repository */ - void deleteJob(long jobId, Connection conn); + public abstract void deleteJob(long jobId, Connection conn); /** * Find job with given id in repository. @@ -227,7 +280,7 @@ public interface JdbcRepositoryHandler { * @param conn Connection to metadata repository * @return Deserialized form of the job that is present in the repository */ - MJob findJob(long jobId, Connection conn); + public abstract MJob findJob(long jobId, Connection conn); /** * Get all job objects. @@ -235,7 +288,7 @@ public interface JdbcRepositoryHandler { * @param conn Connection to metadata repository * @return List will all saved job objects */ - List findJobs(Connection conn); + public abstract List findJobs(Connection conn); /** * Save given submission in repository. @@ -243,7 +296,8 @@ public interface JdbcRepositoryHandler { * @param submission Submission object * @param conn Connection to metadata repository */ - void createSubmission(MSubmission submission, Connection conn); + public abstract void createSubmission(MSubmission submission, + Connection conn); /** * Check if submission with given id already exists in repository. @@ -251,7 +305,7 @@ public interface JdbcRepositoryHandler { * @param submissionId Submission internal id * @param conn Connection to metadata repository */ - boolean existsSubmission(long submissionId, Connection conn); + public abstract boolean existsSubmission(long submissionId, Connection conn); /** * Update given submission in repository. @@ -259,7 +313,8 @@ public interface JdbcRepositoryHandler { * @param submission Submission object * @param conn Connection to metadata repository */ - void updateSubmission(MSubmission submission, Connection conn); + public abstract void updateSubmission(MSubmission submission, + Connection conn); /** * Remove submissions older then threshold from repository. @@ -267,7 +322,7 @@ public interface JdbcRepositoryHandler { * @param threshold Threshold date * @param conn Connection to metadata repository */ - void purgeSubmissions(Date threshold, Connection conn); + public abstract void purgeSubmissions(Date threshold, Connection conn); /** * Return list of unfinished submissions (as far as repository is concerned). @@ -275,7 +330,7 @@ public interface JdbcRepositoryHandler { * @param conn Connection to metadata repository * @return List of unfinished submissions. */ - List findSubmissionsUnfinished(Connection conn); + public abstract List findSubmissionsUnfinished(Connection conn); /** * Find last submission for given jobId. @@ -284,5 +339,6 @@ public interface JdbcRepositoryHandler { * @param conn Connection to metadata repository * @return Most recent submission */ - MSubmission findSubmissionLastForJob(long jobId, Connection conn); + public abstract MSubmission findSubmissionLastForJob(long jobId, + Connection conn); } diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java index d6ec3037..57c9be45 100644 --- a/core/src/main/java/org/apache/sqoop/repository/Repository.java +++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java @@ -17,12 +17,27 @@ */ package org.apache.sqoop.repository; +import org.apache.sqoop.common.ErrorCode; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.ConnectorManager; +import org.apache.sqoop.connector.spi.MetadataUpgrader; +import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.model.MConnection; +import org.apache.sqoop.model.MConnectionForms; import org.apache.sqoop.model.MConnector; +import org.apache.sqoop.model.MEnumInput; +import org.apache.sqoop.model.MForm; import org.apache.sqoop.model.MFramework; +import org.apache.sqoop.model.MInput; +import org.apache.sqoop.model.MIntegerInput; import org.apache.sqoop.model.MJob; +import org.apache.sqoop.model.MJobForms; +import org.apache.sqoop.model.MMapInput; +import org.apache.sqoop.model.MStringInput; import org.apache.sqoop.model.MSubmission; +import org.apache.sqoop.model.ModelError; +import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -32,9 +47,9 @@ * Sqoop to store metadata, statistics and other state relevant to Sqoop * Jobs in the system. */ -public interface Repository { +public abstract class Repository { - RepositoryTransaction getTransaction(); + public abstract RepositoryTransaction getTransaction(); /** * Registers given connector in the repository and return registered @@ -44,7 +59,18 @@ public interface Repository { * @param mConnector the connector metadata to be registered * @return Registered connector structure */ - MConnector registerConnector(MConnector mConnector); + public abstract MConnector registerConnector(MConnector mConnector); + + /** + * Search for connector with given name in repository. + * + * And return corresponding metadata structure. + * + * @param shortName Connector unique name + * @return null if connector is not yet registered in repository or + * loaded representation. + */ + public abstract MConnector findConnector(String shortName); /** @@ -55,7 +81,7 @@ public interface Repository { * @param mFramework framework metadata to be registered * @return Registered connector structure */ - MFramework registerFramework(MFramework mFramework); + public abstract MFramework registerFramework(MFramework mFramework); /** * Save given connection to repository. This connection must not be already @@ -63,7 +89,7 @@ public interface Repository { * * @param connection Connection object to serialize into repository. */ - void createConnection(MConnection connection); + public abstract void createConnection(MConnection connection); /** * Update given connection representation in repository. This connection @@ -72,14 +98,28 @@ public interface Repository { * * @param connection Connection object that should be updated in repository. */ - void updateConnection(MConnection connection); + public abstract void updateConnection(MConnection connection); + + /** + * Update given connection representation in repository. This connection + * object must already exists in the repository otherwise exception will be + * thrown. + * + * @param connection Connection object that should be updated in repository. + * @param tx The repository transaction to use to push the data to the + * repository. If this is null, a new transaction will be created. + * method will not call begin, commit, + * rollback or close on this transaction. + */ + public abstract void updateConnection(final MConnection connection, + RepositoryTransaction tx); /** * Delete connection with given id from metadata repository. * * @param id Connection object that should be removed from repository */ - void deleteConnection(long id); + public abstract void deleteConnection(long id); /** * Find connection with given id in repository. @@ -87,14 +127,14 @@ public interface Repository { * @param id Connection id * @return Deserialized form of the connection that is saved in repository */ - MConnection findConnection(long id); + public abstract MConnection findConnection(long id); /** * Get all connection objects. * * @return List will all saved connection objects */ - List findConnections(); + public abstract List findConnections(); /** * Save given job to repository. This job object must not be already present @@ -102,7 +142,7 @@ public interface Repository { * * @param job Job object that should be saved to repository */ - void createJob(MJob job); + public abstract void createJob(MJob job); /** * Update given job metadata in repository. This object must already be saved @@ -110,14 +150,26 @@ public interface Repository { * * @param job Job object that should be updated in the repository */ - void updateJob(MJob job); + public abstract void updateJob(MJob job); + + /** + * Update given job metadata in repository. This object must already be saved + * in repository otherwise exception will be thrown. + * + * @param job Job object that should be updated in the repository + * @param tx The repository transaction to use to push the data to the + * repository. If this is null, a new transaction will be created. + * method will not call begin, commit, + * rollback or close on this transaction. + */ + public abstract void updateJob(MJob job, RepositoryTransaction tx); /** * Delete job with given id from metadata repository. * * @param id Job id that should be removed */ - void deleteJob(long id); + public abstract void deleteJob(long id); /** * Find job object with given id. @@ -125,42 +177,42 @@ public interface Repository { * @param id Job id * @return Deserialized form of job loaded from repository */ - MJob findJob(long id); + public abstract MJob findJob(long id); /** * Get all job objects. * * @return List of all jobs in the repository */ - List findJobs(); + public abstract List findJobs(); /** * Create new submission record in repository. * * @param submission Submission object that should be serialized to repository */ - void createSubmission(MSubmission submission); + public abstract void createSubmission(MSubmission submission); /** * Update already existing submission record in repository. * * @param submission Submission object that should be updated */ - void updateSubmission(MSubmission submission); + public abstract void updateSubmission(MSubmission submission); /** * Remove submissions older then given date from repository. * * @param threshold Threshold date */ - void purgeSubmissions(Date threshold); + public abstract void purgeSubmissions(Date threshold); /** * Return all unfinished submissions as far as repository is concerned. * * @return List of unfinished submissions */ - List findSubmissionsUnfinished(); + public abstract List findSubmissionsUnfinished(); /** * Find last submission for given jobId. @@ -168,5 +220,181 @@ public interface Repository { * @param jobId Job id * @return Most recent submission */ - MSubmission findSubmissionLastForJob(long jobId); + public abstract MSubmission findSubmissionLastForJob(long jobId); + + /** + * Retrieve connections which use the given connector. + * @param connectorID Connector ID whose connections should be fetched + * @return List of MConnections that use connectorID. + */ + public abstract List findConnectionsForConnector(long + connectorID); + + /** + * Retrieve jobs which use the given connection. + * + * @param connectorID Connector ID whose jobs should be fetched + * @return List of MJobs that use connectionID. + */ + public abstract List findJobsForConnector(long + connectorID); + + /** + * Update the connector with the new data supplied in the + * newConnector. Also Update all forms associated with this + * connector in the repository with the forms specified in + * mConnector. mConnector must + * minimally have the connectorID and all required forms (including ones + * which may not have changed). After this operation the repository is + * guaranteed to only have the new forms specified in this object. + * + * @param newConnector The new data to be inserted into the repository for + * this connector. + * @param tx The repository transaction to use to push the data to the + * repository. If this is null, a new transaction will be created. + * method will not call begin, commit, + * rollback or close on this transaction. + */ + protected abstract void updateConnector(MConnector newConnector, + RepositoryTransaction tx); + + /** + * Delete all inputs for a job + * @param jobId The id of the job whose inputs are to be deleted. + * @param tx A transaction on the repository. This + * method will not call begin, commit, + * rollback or close on this transaction. + */ + protected abstract void deleteJobInputs(long jobId, RepositoryTransaction tx); + + /** + * Delete all inputs for a connection + * @param connectionID The id of the connection whose inputs are to be + * deleted. + * @param tx The repository transaction to use to push the data to the + * repository. If this is null, a new transaction will be created. + * method will not call begin, commit, + * rollback or close on this transaction. + */ + protected abstract void deleteConnectionInputs(long connectionID, + RepositoryTransaction tx); + + /** + * Upgrade the connector with the same {@linkplain MConnector#uniqueName} + * in the repository with values from newConnector. + *

+ * All connections and jobs associated with this connector will be upgraded + * automatically. + * + * @param oldConnector The old connector that should be upgraded. + * @param newConnector New properties for the Connector that should be + * upgraded. + */ + public final void upgradeConnector(MConnector oldConnector, + MConnector newConnector) { + long connectorID = oldConnector.getPersistenceId(); + newConnector.setPersistenceId(connectorID); + /* Algorithms: + * 1. Get an upgrader for the connector. + * 2. Get all connections associated with the connector. + * 3. Get all jobs associated with the connector. + * 4. Delete the inputs for all of the jobs and connections (in that order) + * 5. Remove all inputs and forms associated with the connector, and + * register the new forms and inputs. + * 6. Create new connections and jobs with connector part being the ones + * returned by the upgrader. + * 7. Insert the connection inputs followed by job inputs (using + * updateJob and updateConnection) + */ + RepositoryTransaction tx = null; + try { + SqoopConnector connector = ConnectorManager.getInstance().getConnector( + connectorID); + MetadataUpgrader upgrader = connector.getMetadataUpgrader(); + List connections = findConnectionsForConnector( + connectorID); + List jobs = findJobsForConnector(connectorID); + // -- BEGIN TXN -- + tx = getTransaction(); + tx.begin(); + for (MJob job : jobs) { + deleteJobInputs(job.getPersistenceId(), tx); + } + for (MConnection connection : connections) { + deleteConnectionInputs(connection.getPersistenceId(), tx); + } + updateConnector(newConnector, tx); + for (MConnection connection : connections) { + long connectionID = connection.getPersistenceId(); + // Make a new copy of the forms from the connector, + // else the values will get set in the forms in the connector for + // each connection. + List forms = cloneForms(newConnector.getConnectionForms() + .getForms()); + MConnectionForms newConnectionForms = new MConnectionForms(forms); + upgrader.upgrade(connection.getConnectorPart(), newConnectionForms); + MConnection newConnection = new MConnection(connectorID, + newConnectionForms, connection.getFrameworkPart()); + newConnection.setPersistenceId(connectionID); + updateConnection(newConnection, tx); + } + for (MJob job : jobs) { + // Make a new copy of the forms from the connector, + // else the values will get set in the forms in the connector for + // each connection. + List forms = cloneForms(newConnector.getJobForms(job.getType()) + .getForms()); + MJobForms newJobForms = new MJobForms(job.getType(), forms); + upgrader.upgrade(job.getConnectorPart(), newJobForms); + MJob newJob = new MJob(connectorID, job.getConnectionId(), + job.getType(), newJobForms, job.getFrameworkPart()); + updateJob(newJob, tx); + } + tx.commit(); + } catch (Exception ex) { + if(tx != null) { + tx.rollback(); + } + throw new SqoopException(RepositoryError.JDBCREPO_0000, ex); + } finally { + if(tx != null) { + tx.close(); + } + } + } + + /** + * Clones the forms, but does not set the actual data, + * validation message etc in the inputs, but only the persistence id of the + * inputs. + * @param mForms MForms which must be cloned + * @return Cloned MForms + * @throws Exception + */ + private List cloneForms(List mForms) throws Exception { + List forms = new ArrayList(); + for(MForm mForm : mForms) { + List> inputs = new ArrayList>(); + for (MInput input : mForm.getInputs()) { + MInput newInput; + if(input instanceof MEnumInput) { + newInput = new MEnumInput(input.getName(), input.isSensitive(), + ((MEnumInput) input).getValues()); + } else if (input instanceof MMapInput) { + newInput = new MMapInput(input.getName(), input.isSensitive()); + } else if(input instanceof MStringInput) { + newInput = new MStringInput(input.getName(), input.isSensitive(), + ((MStringInput) input).getMaxLength()); + } else if (input instanceof MIntegerInput) { + newInput = new MIntegerInput(input.getName(), input.isSensitive()); + } else { + throw new SqoopException(ModelError.MODEL_003); + } + newInput.setPersistenceId(input.getPersistenceId()); + inputs.add(newInput); + } + forms.add(new MForm(mForm.getName(), inputs)); + } + return forms; + } } diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java index 95f6570c..327896cb 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java @@ -161,6 +161,8 @@ public enum DerbyRepoError implements ErrorCode { /** Can't retrieve unfinished submissions **/ DERBYREPO_0037("Can't retrieve unfinished submissions"), + DERBYREPO_0038("Update of connector failed"), + ; private final String message; diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java index 32cef8a1..556241e8 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java @@ -67,7 +67,7 @@ * * Repository implementation for Derby database. */ -public class DerbyRepositoryHandler implements JdbcRepositoryHandler { +public class DerbyRepositoryHandler extends JdbcRepositoryHandler { private static final Logger LOG = Logger.getLogger(DerbyRepositoryHandler.class); @@ -86,15 +86,54 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler { public void registerConnector(MConnector mc, Connection conn) { if (mc.hasPersistenceId()) { throw new SqoopException(DerbyRepoError.DERBYREPO_0011, - mc.getUniqueName()); + mc.getUniqueName()); } + mc.setPersistenceId(getConnectorId(mc, conn)); + insertFormsForConnector(mc, conn); + } - PreparedStatement baseConnectorStmt = null; + /** + * Helper method to insert the forms from the MConnector into the + * repository. The job and connector forms within mc will get + * updated with the id of the forms when this function returns. + * @param mc The connector to use for updating forms + * @param conn JDBC connection to use for updating the forms + */ + private void insertFormsForConnector (MConnector mc, Connection conn) { + long connectorId = mc.getPersistenceId(); PreparedStatement baseFormStmt = null; PreparedStatement baseInputStmt = null; + try{ + baseFormStmt = conn.prepareStatement(STMT_INSERT_FORM_BASE, + Statement.RETURN_GENERATED_KEYS); + + baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE, + Statement.RETURN_GENERATED_KEYS); + + // Register connector forms + registerForms(connectorId, null, mc.getConnectionForms().getForms(), + MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt); + + // Register all jobs + for (MJobForms jobForms : mc.getAllJobsForms().values()) { + registerForms(connectorId, jobForms.getType(), jobForms.getForms(), + MFormType.JOB.name(), baseFormStmt, baseInputStmt); + } + + } catch (SQLException ex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0014, + mc.toString(), ex); + } finally { + closeStatements(baseFormStmt, baseInputStmt); + } + + } + + private long getConnectorId(MConnector mc, Connection conn) { + PreparedStatement baseConnectorStmt = null; try { baseConnectorStmt = conn.prepareStatement(STMT_INSERT_CONNECTOR_BASE, - Statement.RETURN_GENERATED_KEYS); + Statement.RETURN_GENERATED_KEYS); baseConnectorStmt.setString(1, mc.getUniqueName()); baseConnectorStmt.setString(2, mc.getClassName()); baseConnectorStmt.setString(3, mc.getVersion()); @@ -110,31 +149,12 @@ public void registerConnector(MConnector mc, Connection conn) { if (!rsetConnectorId.next()) { throw new SqoopException(DerbyRepoError.DERBYREPO_0013); } - - long connectorId = rsetConnectorId.getLong(1); - mc.setPersistenceId(connectorId); - - baseFormStmt = conn.prepareStatement(STMT_INSERT_FORM_BASE, - Statement.RETURN_GENERATED_KEYS); - - baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE, - Statement.RETURN_GENERATED_KEYS); - - // Register connector forms - registerForms(connectorId, null, mc.getConnectionForms().getForms(), - MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt); - - // Register all jobs - for (MJobForms jobForms : mc.getAllJobsForms().values()) { - registerForms(connectorId, jobForms.getType(), jobForms.getForms(), - MFormType.JOB.name(), baseFormStmt, baseInputStmt); - } - + return rsetConnectorId.getLong(1); } catch (SQLException ex) { throw new SqoopException(DerbyRepoError.DERBYREPO_0014, - mc.toString(), ex); + mc.toString(), ex); } finally { - closeStatements(baseConnectorStmt, baseFormStmt, baseInputStmt); + closeStatements(baseConnectorStmt); } } @@ -227,7 +247,6 @@ public boolean schemaExists() { } String sqoopSchemaId = rset.getString(1); LOG.debug("SQOOP schema ID: " + sqoopSchemaId); - connection.commit(); } catch (SQLException ex) { if (connection != null) { @@ -550,21 +569,33 @@ public boolean inUseConnection(long connectionId, Connection conn) { @Override public void deleteConnection(long id, Connection conn) { PreparedStatement dltConn = null; - PreparedStatement dltConnInput = null; + try { - dltConnInput = conn.prepareStatement(STMT_DELETE_CONNECTION_INPUT); + deleteConnectionInputs(id, conn); dltConn = conn.prepareStatement(STMT_DELETE_CONNECTION); - - dltConnInput.setLong(1, id); dltConn.setLong(1, id); - - dltConnInput.executeUpdate(); dltConn.executeUpdate(); - } catch (SQLException ex) { throw new SqoopException(DerbyRepoError.DERBYREPO_0022, ex); } finally { - closeStatements(dltConn, dltConnInput); + closeStatements(dltConn); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void deleteConnectionInputs(long id, Connection conn) { + PreparedStatement dltConnInput = null; + try { + dltConnInput = conn.prepareStatement(STMT_DELETE_CONNECTION_INPUT); + dltConnInput.setLong(1, id); + dltConnInput.executeUpdate(); + } catch (SQLException ex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0022, ex); + } finally { + closeStatements(dltConnInput); } } @@ -613,6 +644,63 @@ public List findConnections(Connection conn) { } } + + /** + * + * {@inheritDoc} + * + */ + @Override + public List findConnectionsForConnector(long + connectorID, Connection conn) { + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(STMT_SELECT_CONNECTION_FOR_CONNECTOR); + stmt.setLong(1, connectorID); + + return loadConnections(stmt, conn); + + } catch (SQLException ex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0023, ex); + } finally { + closeStatements(stmt); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void updateConnector(MConnector mConnector, Connection conn) { + PreparedStatement updateConnectorStatement = null; + PreparedStatement deleteForm = null; + PreparedStatement deleteInput = null; + try { + updateConnectorStatement = conn.prepareStatement(STMT_UPDATE_CONNECTOR); + deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONNECTOR); + deleteForm = conn.prepareStatement(STMT_DELETE_FORMS_FOR_CONNECTOR); + updateConnectorStatement.setString(1, mConnector.getUniqueName()); + updateConnectorStatement.setString(2, mConnector.getClassName()); + updateConnectorStatement.setString(3, mConnector.getVersion()); + updateConnectorStatement.setLong(4, mConnector.getPersistenceId()); + + if (updateConnectorStatement.executeUpdate() != 1) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0038); + } + deleteInput.setLong(1, mConnector.getPersistenceId()); + deleteForm.setLong(1, mConnector.getPersistenceId()); + deleteInput.executeUpdate(); + deleteForm.executeUpdate(); + + } catch (SQLException e) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0038, e); + } finally { + closeStatements(updateConnectorStatement, deleteForm, deleteInput); + } + insertFormsForConnector(mConnector, conn); + + } + /** * {@inheritDoc} */ @@ -746,21 +834,32 @@ public boolean inUseJob(long jobId, Connection conn) { @Override public void deleteJob(long id, Connection conn) { PreparedStatement dlt = null; - PreparedStatement dltInput = null; try { - dltInput = conn.prepareStatement(STMT_DELETE_JOB_INPUT); + deleteJobInputs(id, conn); dlt = conn.prepareStatement(STMT_DELETE_JOB); - - dltInput.setLong(1, id); dlt.setLong(1, id); - - dltInput.executeUpdate(); dlt.executeUpdate(); - } catch (SQLException ex) { throw new SqoopException(DerbyRepoError.DERBYREPO_0028, ex); } finally { - closeStatements(dlt, dltInput); + closeStatements(dlt); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void deleteJobInputs(long id, Connection conn) { + PreparedStatement dltInput = null; + try { + dltInput = conn.prepareStatement(STMT_DELETE_JOB_INPUT); + dltInput.setLong(1, id); + dltInput.executeUpdate(); + } catch (SQLException ex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0028, ex); + } finally { + closeStatements(dltInput); } } @@ -809,6 +908,24 @@ public List findJobs(Connection conn) { } } + /** + * {@inheritDoc} + */ + @Override + public List findJobsForConnector(long connectorId, Connection conn) { + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(STMT_SELECT_ALL_JOBS_FOR_CONNECTOR); + stmt.setLong(1, connectorId); + return loadJobs(stmt, conn); + + } catch (SQLException ex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0031, ex); + } finally { + closeStatements(stmt); + } + } + /** * {@inheritDoc} */ @@ -1297,7 +1414,8 @@ private List loadJobs(PreparedStatement stmt, } /** - * Register forms in derby database. + * Register forms in derby database. This method will insert the ids + * generated by the repository into the forms passed in itself. * * Use given prepared statements to create entire form structure in database. * diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java index ea458ac2..4968c0d8 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java @@ -447,6 +447,30 @@ public final class DerbySchemaQuery { + COLUMN_SQI_ENUMVALS + ") VALUES (?, ?, ?, ?, ?, ?, ?)"; + // Delete all forms for a given connector + public static final String STMT_DELETE_FORMS_FOR_CONNECTOR = + "DELETE FROM " + TABLE_SQ_FORM + + " WHERE " + COLUMN_SQN_CONNECTOR + " = ?"; + + // Delete all inputs for a given connector + public static final String STMT_DELETE_INPUTS_FOR_CONNECTOR = + "DELETE FROM " + TABLE_SQ_INPUT + + " WHERE " + + COLUMN_SQI_FORM + + " IN (SELECT " + + COLUMN_SQF_ID + + " FROM " + TABLE_SQ_FORM + + " WHERE " + + COLUMN_SQF_CONNECTOR + " = ?)"; + + // Update the connector + public static final String STMT_UPDATE_CONNECTOR = + "UPDATE " + TABLE_SQ_CONNECTOR + + " SET " + COLUMN_SQC_NAME + " = ?, " + + COLUMN_SQC_CLASS + " = ?, " + + COLUMN_SQC_VERSION + " = ? " + + " WHERE " + COLUMN_SQC_ID + " = ?"; + // DML: Insert new connection public static final String STMT_INSERT_CONNECTION = "INSERT INTO " + TABLE_SQ_CONNECTION + " (" @@ -502,6 +526,17 @@ public final class DerbySchemaQuery { + COLUMN_SQN_UPDATE_DATE + " FROM " + TABLE_SQ_CONNECTION; + // DML: Select all connections for a specific connector. + public static final String STMT_SELECT_CONNECTION_FOR_CONNECTOR = + "SELECT " + + COLUMN_SQN_ID + ", " + + COLUMN_SQN_NAME + ", " + + COLUMN_SQN_CONNECTOR + ", " + + COLUMN_SQN_CREATION_DATE + ", " + + COLUMN_SQN_UPDATE_DATE + + " FROM " + TABLE_SQ_CONNECTION + + " WHERE " + COLUMN_SQN_CONNECTOR + " = ?"; + // DML: Check if given connection exists public static final String STMT_SELECT_CONNECTION_CHECK = "SELECT count(*) FROM " + TABLE_SQ_CONNECTION @@ -567,7 +602,7 @@ public final class DerbySchemaQuery { + COLUMN_SQB_UPDATE_DATE + " FROM " + TABLE_SQ_JOB + " LEFT JOIN " + TABLE_SQ_CONNECTION - + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID + + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID + " WHERE " + COLUMN_SQB_ID + " = ?"; // DML: Select all jobs @@ -584,6 +619,21 @@ public final class DerbySchemaQuery { + " LEFT JOIN " + TABLE_SQ_CONNECTION + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID; + // DML: Select all jobs for a Connector + public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR = + "SELECT " + + COLUMN_SQN_CONNECTOR + ", " + + COLUMN_SQB_ID + ", " + + COLUMN_SQB_NAME + ", " + + COLUMN_SQB_CONNECTION + ", " + + COLUMN_SQB_TYPE + ", " + + COLUMN_SQB_CREATION_DATE + ", " + + COLUMN_SQB_UPDATE_DATE + + " FROM " + TABLE_SQ_JOB + + " LEFT JOIN " + TABLE_SQ_CONNECTION + + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID + + " AND " + COLUMN_SQN_CONNECTOR + " = ? "; + // DML: Insert new submission public static final String STMT_INSERT_SUBMISSION = "INSERT INTO " + TABLE_SQ_SUBMISSION + "(" diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/MetadataUpgrader.java b/spi/src/main/java/org/apache/sqoop/connector/spi/MetadataUpgrader.java new file mode 100644 index 00000000..d840a782 --- /dev/null +++ b/spi/src/main/java/org/apache/sqoop/connector/spi/MetadataUpgrader.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.connector.spi; + +import org.apache.sqoop.model.MConnection; +import org.apache.sqoop.model.MConnectionForms; +import org.apache.sqoop.model.MJob; +import org.apache.sqoop.model.MJobForms; + +public abstract class MetadataUpgrader { + + /** + * Upgrade the original connection and fill into the upgradeTarget. Note + * that any metadata already in {@code upgradeTarget} maybe overwritten. + * @param original - original connection metadata as in the repository + * @param upgradeTarget - the instance that will be filled in with the + * upgraded metadata. + */ + public abstract void upgrade(MConnectionForms original, + MConnectionForms upgradeTarget); + /** + * Upgrade the original job and fill into the upgradeTarget. Note + * that any metadata already in {@code upgradeTarget} maybe overwritten. + * This method must be called only after the connection metadata has + * already been upgraded. + * @param original - original connection metadata as in the repository + * @param upgradeTarget - the instance that will be filled in with the + * upgraded metadata. + */ + public abstract void upgrade(MJobForms original, MJobForms upgradeTarget); +} diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java index 540303af..2becc563 100644 --- a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java +++ b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java @@ -72,4 +72,11 @@ public abstract class SqoopConnector { */ public abstract Validator getValidator(); + /** + * Returns an {@linkplain MetadataUpgrader} object that can upgrade the + * connection and job metadata. + * @return MetadataUpgrader object + */ + public abstract MetadataUpgrader getMetadataUpgrader(); + }