diff --git a/common/src/main/java/org/apache/sqoop/model/MConnector.java b/common/src/main/java/org/apache/sqoop/model/MConnector.java index 174d0b9d..1b9462e2 100644 --- a/common/src/main/java/org/apache/sqoop/model/MConnector.java +++ b/common/src/main/java/org/apache/sqoop/model/MConnector.java @@ -181,6 +181,10 @@ public String getVersion() { return version; } + public MConfigurableType getType() { + return MConfigurableType.CONNECTOR; + } + public SupportedDirections getSupportedDirections() { return new SupportedDirections(this.getConfig(Direction.FROM) != null, this.getConfig(Direction.TO) != null); diff --git a/common/src/main/java/org/apache/sqoop/model/MDriver.java b/common/src/main/java/org/apache/sqoop/model/MDriver.java index 4241a313..cc475117 100644 --- a/common/src/main/java/org/apache/sqoop/model/MDriver.java +++ b/common/src/main/java/org/apache/sqoop/model/MDriver.java @@ -17,15 +17,17 @@ */ package org.apache.sqoop.model; -import java.sql.Driver; /** * Describes the configs associated with the {@link Driver} for executing sqoop jobs. */ public final class MDriver extends Configurable { + public static final String DRIVER_NAME = "SqoopDriver"; private final MDriverConfig driverConfig; - private final String version; + private String version; + // Since there is only one Driver in the system, the name is not user specified + private static final String uniqueName = DRIVER_NAME; public MDriver(MDriverConfig driverConfig, String version) { this.driverConfig = driverConfig; @@ -68,6 +70,14 @@ public MDriverConfig getDriverConfig() { return driverConfig; } + public MConfigurableType getType() { + return MConfigurableType.DRIVER; + } + + public String getUniqueName() { + return uniqueName; + } + @Override public MDriver clone(boolean cloneWithValue) { cloneWithValue = false; @@ -79,4 +89,8 @@ public MDriver clone(boolean cloneWithValue) { public String getVersion() { return version; } + + public void setVersion(String version) { + this.version = version; + } } \ No newline at end of file diff --git a/core/src/main/java/org/apache/sqoop/driver/Driver.java b/core/src/main/java/org/apache/sqoop/driver/Driver.java index 46a16ac9..6942891e 100644 --- a/core/src/main/java/org/apache/sqoop/driver/Driver.java +++ b/core/src/main/java/org/apache/sqoop/driver/Driver.java @@ -158,6 +158,10 @@ public MDriver getDriver() { return mDriver; } + public static String getClassName() { + return Driver.getInstance().getClass().getSimpleName(); + } + public ResourceBundle getBundle(Locale locale) { return ResourceBundle.getBundle(DriverConstants.DRIVER_CONFIG_BUNDLE, locale); } diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java index 476830d0..d7b526ab 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java @@ -23,10 +23,11 @@ import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.model.MLink; +import org.apache.sqoop.driver.Driver; import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MDriver; import org.apache.sqoop.model.MJob; +import org.apache.sqoop.model.MLink; import org.apache.sqoop.model.MSubmission; public class JdbcRepository extends Repository { @@ -220,7 +221,7 @@ public MDriver registerDriver(final MDriver mDriver, final boolean autoUpgrade) return (MDriver) doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { - MDriver existingDriverConfig = handler.findDriver(conn); + MDriver existingDriverConfig = handler.findDriver(mDriver.getUniqueName(), conn); if (existingDriverConfig == null) { handler.registerDriver(mDriver, conn); return mDriver; @@ -233,7 +234,7 @@ public Object doIt(Connection conn) { return mDriver; } else { throw new SqoopException(RepositoryError.JDBCREPO_0026, - "DriverConfig: " + mDriver.getPersistenceId()); + "Driver: " + mDriver.getPersistenceId()); } } return existingDriverConfig; @@ -242,6 +243,19 @@ public Object doIt(Connection conn) { }); } + /** + * {@inheritDoc} + */ + @Override + public MDriver findDriver(final String shortName) { + return (MDriver) doWithConnection(new DoWithConnection() { + @Override + public Object doIt(Connection conn) throws Exception { + return handler.findDriver(shortName, conn); + } + }); + } + /** * {@inheritDoc} */ @@ -648,23 +662,23 @@ public Object doIt(Connection conn) throws Exception { * {@inheritDoc} */ @Override - protected void upgradeConnectorConfigs(final MConnector newConnector, + protected void upgradeConnectorAndConfigs(final MConnector newConnector, RepositoryTransaction tx) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) throws Exception { - handler.upgradeConnectorConfigs(newConnector, conn); + handler.upgradeConnectorAndConfigs(newConnector, conn); return null; } }, (JdbcRepositoryTransaction) tx); } - protected void upgradeDriverConfigs(final MDriver mDriver, RepositoryTransaction tx) { + protected void upgradeDriverAndConfigs(final MDriver mDriver, RepositoryTransaction tx) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) throws Exception { - handler.upgradeDriverConfigs(mDriver, conn); + handler.upgradeDriverAndConfigs(mDriver, conn); return null; } }, (JdbcRepositoryTransaction) tx); diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java index 4c5229fd..7d788262 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java @@ -21,10 +21,10 @@ import java.util.Date; import java.util.List; -import org.apache.sqoop.model.MLink; import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MDriver; import org.apache.sqoop.model.MJob; +import org.apache.sqoop.model.MLink; import org.apache.sqoop.model.MSubmission; /** @@ -41,7 +41,7 @@ public abstract class JdbcRepositoryHandler { /** * Search for connector with given name in repository. - * And return corresponding connector structure. + * And return corresponding connector entity. * * @param shortName Connector unique name * @param conn JDBC link for querying repository. @@ -101,8 +101,7 @@ public abstract List findJobsForConnector(long connectorID, * @param conn JDBC link for querying repository */ - public abstract void upgradeConnectorConfigs(MConnector mConnector, Connection conn); - + public abstract void upgradeConnectorAndConfigs(MConnector mConnector, Connection conn); /** * Upgrade the driver with the new data supplied in the @@ -117,17 +116,16 @@ public abstract List findJobsForConnector(long connectorID, * the driverConfig. * @param conn JDBC link for querying repository */ - public abstract void upgradeDriverConfigs(MDriver mDriver, Connection conn); - + public abstract void upgradeDriverAndConfigs(MDriver mDriver, Connection conn); /** - * Search for driverConfigin the repository. - * + * Search for driver in the repository. + * @params shortName the name for the driver * @param conn JDBC link for querying repository. - * @return null if driverConfig are not yet present in repository or + * @return null if driver are not yet present in repository or * loaded representation. */ - public abstract MDriver findDriver(Connection conn); + public abstract MDriver findDriver(String shortName, Connection conn); /** * Register driver config in repository. diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java index 8f780526..bd2a3bea 100644 --- a/core/src/main/java/org/apache/sqoop/repository/Repository.java +++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java @@ -118,6 +118,14 @@ public abstract class Repository { */ public abstract List findConnectors(); + /** + * Search for driver in the repository. + * @param shortName Driver unique name + * @return null if driver are not yet present in repository or + * loaded representation. + */ + public abstract MDriver findDriver(String shortName); + /** * Save given link to repository. This link must not be already * present in the repository otherwise exception will be thrown. @@ -317,7 +325,7 @@ public abstract class Repository { * method will not call begin, commit, * rollback or close on this transaction. */ - protected abstract void upgradeConnectorConfigs(MConnector newConnector, RepositoryTransaction tx); + protected abstract void upgradeConnectorAndConfigs(MConnector newConnector, RepositoryTransaction tx); /** * Upgrade the driver with the new data supplied in the @@ -335,7 +343,7 @@ public abstract class Repository { * method will not call begin, commit, * rollback or close on this transaction. */ - protected abstract void upgradeDriverConfigs(MDriver newDriver, RepositoryTransaction tx); + protected abstract void upgradeDriverAndConfigs(MDriver newDriver, RepositoryTransaction tx); /** * Delete all inputs for a job @@ -410,7 +418,7 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec deletelinksAndJobs(existingLinksByConnector, existingJobsByConnector, tx); // 5. Delete all inputs and configs associated with the connector, and // insert the new configs and inputs for this connector - upgradeConnectorConfigs(newConnector, tx); + upgradeConnectorAndConfigs(newConnector, tx); // 6. Run upgrade logic for the configs related to the link objects // dont always rely on the repository implementation to return empty list for links if (existingLinksByConnector != null) { @@ -514,7 +522,7 @@ public final void upgradeDriver(MDriver driver) { deleteJobs(existingJobs, tx); // 4. Delete all inputs and configs associated with the driver, and // insert the new configs and inputs for this driver - upgradeDriverConfigs(driver, tx); + upgradeDriverAndConfigs(driver, tx); for (MJob job : existingJobs) { // Make a new copy of the configs diff --git a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java index ff9e0c33..ae0e9222 100644 --- a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java +++ b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java @@ -167,10 +167,10 @@ public void testConnectorDisableAutoUpgrade() { */ @Test public void testDriverConfigEnableAutoUpgrade() { - MDriver newDriverConfig = driver(); - MDriver oldDriverConfig = anotherDriver(); + MDriver newDriver = driver(); + MDriver oldDriver = anotherDriver(); - when(repoHandlerMock.findDriver(any(Connection.class))).thenReturn(oldDriverConfig); + when(repoHandlerMock.findDriver(anyString(), any(Connection.class))).thenReturn(oldDriver); // make the upgradeDriverConfig to throw an exception to prove that it has been called SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, @@ -178,10 +178,10 @@ public void testDriverConfigEnableAutoUpgrade() { doThrow(exception).when(repoHandlerMock).findJobs(any(Connection.class)); try { - repoSpy.registerDriver(newDriverConfig, true); + repoSpy.registerDriver(newDriver, true); } catch (SqoopException ex) { assertEquals(ex.getMessage(), exception.getMessage()); - verify(repoHandlerMock, times(1)).findDriver(any(Connection.class)); + verify(repoHandlerMock, times(1)).findDriver(anyString(), any(Connection.class)); verify(repoHandlerMock, times(1)).findJobs(any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; @@ -195,16 +195,16 @@ public void testDriverConfigEnableAutoUpgrade() { */ @Test public void testDriverConfigDisableAutoUpgrade() { - MDriver newDriverConfig = driver(); - MDriver oldDriverConfig = anotherDriver(); + MDriver newDriver = driver(); + MDriver oldDriver = anotherDriver(); - when(repoHandlerMock.findDriver(any(Connection.class))).thenReturn(oldDriverConfig); + when(repoHandlerMock.findDriver(anyString(), any(Connection.class))).thenReturn(oldDriver); try { - repoSpy.registerDriver(newDriverConfig, false); + repoSpy.registerDriver(newDriver, false); } catch (SqoopException ex) { assertEquals(ex.getErrorCode(), RepositoryError.JDBCREPO_0026); - verify(repoHandlerMock, times(1)).findDriver(any(Connection.class)); + verify(repoHandlerMock, times(1)).findDriver(anyString(),any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; } @@ -242,7 +242,7 @@ public void testConnectorConfigUpgradeWithValidLinksAndJobs() { doReturn(jobList).when(repoSpy).findJobsForConnector(anyLong()); doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class)); doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class)); - doNothing().when(repoSpy).upgradeConnectorConfigs(any(MConnector.class), any(RepositoryTransaction.class)); + doNothing().when(repoSpy).upgradeConnectorAndConfigs(any(MConnector.class), any(RepositoryTransaction.class)); repoSpy.upgradeConnector(oldConnector, newConnector); @@ -258,7 +258,7 @@ public void testConnectorConfigUpgradeWithValidLinksAndJobs() { repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(1, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(2, repoTransactionMock); - repoOrder.verify(repoSpy, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(RepositoryTransaction.class)); + repoOrder.verify(repoSpy, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(RepositoryTransaction.class)); repoOrder.verify(repoSpy, times(2)).updateLink(any(MLink.class), any(RepositoryTransaction.class)); repoOrder.verify(repoSpy, times(4)).updateJob(any(MJob.class), any(RepositoryTransaction.class)); repoOrder.verifyNoMoreInteractions(); @@ -296,7 +296,7 @@ public void testDriverConfigUpgradeWithValidJobs() { doReturn(jobList).when(repoSpy).findJobs(); doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class)); doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class)); - doNothing().when(repoSpy).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class)); + doNothing().when(repoSpy).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class)); repoSpy.upgradeDriver(newDriverConfig); @@ -309,7 +309,7 @@ public void testDriverConfigUpgradeWithValidJobs() { repoOrder.verify(repoSpy, times(1)).getTransaction(); repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock); - repoOrder.verify(repoSpy, times(1)).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class)); + repoOrder.verify(repoSpy, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class)); repoOrder.verify(repoSpy, times(2)).updateJob(any(MJob.class), any(RepositoryTransaction.class)); repoOrder.verifyNoMoreInteractions(); txOrder.verify(repoTransactionMock, times(1)).begin(); @@ -339,7 +339,7 @@ public void testDriverConfigUpgradeWithInvalidJobs() { doReturn(jobList).when(repoSpy).findJobs(); doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class)); - doNothing().when(repoSpy).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class)); + doNothing().when(repoSpy).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class)); try { repoSpy.upgradeDriver(newDriverConfig); @@ -355,7 +355,7 @@ public void testDriverConfigUpgradeWithInvalidJobs() { repoOrder.verify(repoSpy, times(1)).getTransaction(); repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock); - repoOrder.verify(repoSpy, times(1)).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class)); + repoOrder.verify(repoSpy, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class)); repoOrder.verifyNoMoreInteractions(); txOrder.verify(repoTransactionMock, times(1)).begin(); txOrder.verify(repoTransactionMock, times(1)).rollback(); @@ -535,7 +535,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateConnectorError() { SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, "update connector error."); - doThrow(exception).when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class)); + doThrow(exception).when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class)); try { repoSpy.upgradeConnector(oldConnector, newConnector); @@ -545,7 +545,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateConnectorError() { verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class)); + verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; } @@ -577,7 +577,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateLinkError() { doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class)); doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class)); - doNothing().when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class)); + doNothing().when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class)); doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class)); SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, @@ -592,7 +592,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateLinkError() { verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class)); + verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class)); verify(repoHandlerMock, times(1)).existsLink(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(1)).updateLink(any(MLink.class), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); @@ -626,7 +626,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateJobError() { doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class)); doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class)); - doNothing().when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class)); + doNothing().when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class)); doNothing().when(repoHandlerMock).updateLink(any(MLink.class), any(Connection.class)); doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class)); doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class)); @@ -643,7 +643,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateJobError() { verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class)); + verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class)); verify(repoHandlerMock, times(2)).existsLink(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).updateLink(any(MLink.class), any(Connection.class)); verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class)); @@ -731,7 +731,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateDriverConfigError() { SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, "update driverConfig entity error."); - doThrow(exception).when(repoHandlerMock).upgradeDriverConfigs(any(MDriver.class), any(Connection.class)); + doThrow(exception).when(repoHandlerMock).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class)); try { repoSpy.upgradeDriver(newDriverConfig); @@ -739,7 +739,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateDriverConfigError() { assertEquals(ex.getMessage(), exception.getMessage()); verify(repoHandlerMock, times(1)).findJobs(any(Connection.class)); verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(1)).upgradeDriverConfigs(any(MDriver.class), any(Connection.class)); + verify(repoHandlerMock, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; } @@ -764,7 +764,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateJobError() { List jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class)); doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); - doNothing().when(repoHandlerMock).upgradeDriverConfigs(any(MDriver.class), any(Connection.class)); + doNothing().when(repoHandlerMock).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class)); doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class)); SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, @@ -777,7 +777,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateJobError() { assertEquals(ex.getMessage(), exception.getMessage()); verify(repoHandlerMock, times(1)).findJobs(any(Connection.class)); verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(1)).upgradeDriverConfigs(any(MDriver.class), any(Connection.class)); + verify(repoHandlerMock, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class)); verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(1)).updateJob(any(MJob.class), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java index 40dcc498..8fbf47fc 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java @@ -22,13 +22,9 @@ public final class DerbyRepoConstants { public static final String CONF_PREFIX_DERBY = "derby."; @Deprecated - // use only for the upgrade code should be removed soon + // use only for the upgrade code public static final String SYSKEY_VERSION = "version"; - - public static final String SYSKEY_DERBY_REPOSITORY_VERSION = "version"; - - // TOOD(VB): SQOOP-1557 move the driver config version to the SQ_CONFIGURABLE, IT SHOULD NOT BE HERE, nor stored in SYSTEM table - public static final String SYSKEY_DRIVER_CONFIG_VERSION = "driver.config.version"; + public static final String SYSKEY_DERBY_REPOSITORY_VERSION = "repository.version"; /** * Expected version of the repository structures. diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java index 3e4a4a90..aad219eb 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java @@ -188,8 +188,12 @@ public enum DerbyRepoError implements ErrorCode { DERBYREPO_0048("Could not register config direction"), - DERBYREPO_0049("Could not set connector direction") - ; + DERBYREPO_0049("Could not set connector direction"), + + /** The system was unable to register driver due to a server error **/ + DERBYREPO_0050("Registration of driver failed"), + + ; private final String message; diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java index aa58850f..633e9dfa 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java @@ -45,9 +45,11 @@ import org.apache.sqoop.common.SupportedDirections; import org.apache.sqoop.connector.ConnectorHandler; import org.apache.sqoop.connector.ConnectorManagerUtils; +import org.apache.sqoop.driver.Driver; import org.apache.sqoop.model.MBooleanInput; import org.apache.sqoop.model.MConfig; import org.apache.sqoop.model.MConfigType; +import org.apache.sqoop.model.MConfigurableType; import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MDriver; import org.apache.sqoop.model.MDriverConfig; @@ -102,7 +104,7 @@ public void registerConnector(MConnector mc, Connection conn) { throw new SqoopException(DerbyRepoError.DERBYREPO_0011, mc.getUniqueName()); } - mc.setPersistenceId(getConnectorId(mc, conn)); + mc.setPersistenceId(insertAndGetConnectorId(mc, conn)); insertConfigsForConnector(mc, conn); } @@ -116,10 +118,10 @@ private void insertConfigsForDriver(MDriver mDriver, Connection conn) { PreparedStatement baseConfigStmt = null; PreparedStatement baseInputStmt = null; try{ - baseConfigStmt = conn.prepareStatement(STMT_INSERT_CONFIG_BASE, + baseConfigStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIG, Statement.RETURN_GENERATED_KEYS); - baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE, + baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT, Statement.RETURN_GENERATED_KEYS); // Register the job config type, since driver config is per job @@ -145,15 +147,14 @@ private void insertConfigsForConnector (MConnector mc, Connection conn) { PreparedStatement baseConfigStmt = null; PreparedStatement baseInputStmt = null; try{ - baseConfigStmt = conn.prepareStatement(STMT_INSERT_CONFIG_BASE, + baseConfigStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIG, Statement.RETURN_GENERATED_KEYS); - baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE, + baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT, Statement.RETURN_GENERATED_KEYS); - // Register link type config for connector - // NOTE: The direction is null for LINK type - registerConfigs(connectorId, null, mc.getLinkConfig().getConfigs(), + // Register link type config + registerConfigs(connectorId, null /* No direction for LINK type config*/, mc.getLinkConfig().getConfigs(), MConfigType.LINK.name(), baseConfigStmt, baseInputStmt, conn); // Register both from/to job type config for connector @@ -202,19 +203,20 @@ private void insertConnectorDirections(Long connectorId, SupportedDirections dir } } - private long getConnectorId(MConnector mc, Connection conn) { + private long insertAndGetConnectorId(MConnector mc, Connection conn) { PreparedStatement baseConnectorStmt = null; try { - baseConnectorStmt = conn.prepareStatement(STMT_INSERT_CONNECTOR_BASE, - Statement.RETURN_GENERATED_KEYS); + baseConnectorStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIGURABLE, + Statement.RETURN_GENERATED_KEYS); baseConnectorStmt.setString(1, mc.getUniqueName()); baseConnectorStmt.setString(2, mc.getClassName()); baseConnectorStmt.setString(3, mc.getVersion()); + baseConnectorStmt.setString(4, mc.getType().name()); int baseConnectorCount = baseConnectorStmt.executeUpdate(); if (baseConnectorCount != 1) { throw new SqoopException(DerbyRepoError.DERBYREPO_0012, - Integer.toString(baseConnectorCount)); + Integer.toString(baseConnectorCount)); } ResultSet rsetConnectorId = baseConnectorStmt.getGeneratedKeys(); @@ -222,19 +224,44 @@ private long getConnectorId(MConnector mc, Connection conn) { if (!rsetConnectorId.next()) { throw new SqoopException(DerbyRepoError.DERBYREPO_0013); } - - insertConnectorDirections(rsetConnectorId.getLong(1), - mc.getSupportedDirections(), conn); - + // connector configurable also have directions + insertConnectorDirections(rsetConnectorId.getLong(1), mc.getSupportedDirections(), conn); return rsetConnectorId.getLong(1); } catch (SQLException ex) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0014, - mc.toString(), ex); + throw new SqoopException(DerbyRepoError.DERBYREPO_0014, mc.toString(), ex); } finally { closeStatements(baseConnectorStmt); } } + private long insertAndGetDriverId(MDriver mDriver, Connection conn) { + PreparedStatement baseDriverStmt = null; + try { + baseDriverStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIGURABLE, + Statement.RETURN_GENERATED_KEYS); + baseDriverStmt.setString(1, mDriver.getUniqueName()); + baseDriverStmt.setString(2, Driver.getClassName()); + baseDriverStmt.setString(3, mDriver.getVersion()); + baseDriverStmt.setString(4, mDriver.getType().name()); + + int baseDriverCount = baseDriverStmt.executeUpdate(); + if (baseDriverCount != 1) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0012, Integer.toString(baseDriverCount)); + } + + ResultSet rsetDriverId = baseDriverStmt.getGeneratedKeys(); + + if (!rsetDriverId.next()) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0013); + } + return rsetDriverId.getLong(1); + } catch (SQLException ex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0050, mDriver.toString(), ex); + } finally { + closeStatements(baseDriverStmt); + } + } + /** * {@inheritDoc} */ @@ -350,59 +377,6 @@ public int detectRepositoryVersion(Connection conn) { } } - /** - * Detect version of the driver - * - * @param conn Connection to the repository - * @return Version of the Driver - */ - private String detectDriverVersion (Connection conn) { - ResultSet rs = null; - PreparedStatement stmt = null; - try { - stmt = conn.prepareStatement(DerbySchemaQuery.STMT_SELECT_SYSTEM); - stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION); - rs = stmt.executeQuery(); - if(!rs.next()) { - return null; - } - return rs.getString(1); - } catch (SQLException e) { - LOG.info("Can't fetch driver version.", e); - return null; - } finally { - closeResultSets(rs); - closeStatements(stmt); - } - } - - /** - * Create or update driver version - * @param conn Connection to the the repository - * @param mDriver - */ - private void createOrUpdateDriverSystemVersion(Connection conn, String version) { - ResultSet rs = null; - PreparedStatement stmt = null; - try { - stmt = conn.prepareStatement(STMT_DELETE_SYSTEM); - stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION); - stmt.executeUpdate(); - closeStatements(stmt); - - stmt = conn.prepareStatement(STMT_INSERT_SYSTEM); - stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION); - stmt.setString(2, version); - stmt.executeUpdate(); - } catch (SQLException e) { - logException(e); - throw new SqoopException(DerbyRepoError.DERBYREPO_0044, e); - } finally { - closeResultSets(rs); - closeStatements(stmt); - } - } - /** * {@inheritDoc} */ @@ -460,9 +434,11 @@ public void createOrUpgradeRepository(Connection conn) { runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE, conn); // SQOOP-1498 rename entities - renameEntitiesForUpgrade(conn); + renameEntitiesForConnectionAndForm(conn); // Change direction from VARCHAR to BIGINT + foreign key. updateDirections(conn, insertDirections(conn)); + + renameConnectorToConfigurable(conn); } // Add unique constraints on job and links for version 4 onwards if (repositoryVersion > 3) { @@ -474,7 +450,7 @@ public void createOrUpgradeRepository(Connection conn) { } // SQOOP-1498 refactoring related upgrades for table and column names - void renameEntitiesForUpgrade(Connection conn) { + void renameEntitiesForConnectionAndForm(Connection conn) { // LINK // drop the constraint before rename runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_1, conn); @@ -491,6 +467,10 @@ void renameEntitiesForUpgrade(Connection conn) { runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_7, conn); runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_8, conn); + // rename constraints + runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONNECTOR_CONSTRAINT, conn); + runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONNECTOR_CONSTRAINT, conn); + LOG.info("LINK TABLE altered"); // LINK_INPUT @@ -511,6 +491,8 @@ void renameEntitiesForUpgrade(Connection conn) { runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_4, conn); runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_5, conn); runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_6, conn); + runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_FORM_CONNECTOR_CONSTRAINT, conn); + runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT, conn); LOG.info("CONFIG TABLE altered"); @@ -528,7 +510,24 @@ void renameEntitiesForUpgrade(Connection conn) { runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_TO, conn); LOG.info("JOB TABLE altered and constraints added"); + } + private void renameConnectorToConfigurable(Connection conn) { + // SQ_CONNECTOR to SQ_CONFIGURABLE upgrade + runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT, conn); + runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_LINK_CONSTRAINT, conn); + runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT, conn); + + runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTOR_TO_SQ_CONFIGURABLE, conn); + runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONFIG_COLUMN_1, conn); + runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_LINK_COLUMN_1, conn); + runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_COLUMN_SQC_TYPE, conn); + + runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONFIGURABLE_CONSTRAINT, conn); + runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONFIGURABLE_CONSTRAINT, conn); + runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT, conn); + + LOG.info("CONNECTOR TABLE altered and constraints added for CONFIGURABLE"); } private void upgradeRepositoryVersion(Connection conn) { @@ -538,7 +537,6 @@ private void upgradeRepositoryVersion(Connection conn) { runQuery(STMT_INSERT_SYSTEM, conn, DerbyRepoConstants.SYSKEY_DERBY_REPOSITORY_VERSION, "" + DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION); } - /** * Insert directions: FROM and TO. * @param conn @@ -643,6 +641,7 @@ protected void updateDirections(Connection conn, Map directionM } } + /** * Upgrade job data from IMPORT/EXPORT to FROM/TO. * Since the framework is no longer responsible for HDFS, @@ -712,13 +711,13 @@ private void updateJobRepositorySchemaAndData(Connection conn, long connectorId) runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DRIVER_INDEX, conn, new Long(0), "throttling"); - Long linkId = createHdfsConnection(conn, connectorId); + Long connectionId = createHdfsConnection(conn, connectorId); runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION_COPY_SQB_FROM_CONNECTION, conn, "EXPORT"); runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_CONNECTION, conn, - new Long(linkId), "EXPORT"); + new Long(connectionId), "EXPORT"); runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION, conn, - new Long(linkId), "IMPORT"); + new Long(connectionId), "IMPORT"); runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME, conn, "fromJobConfig", "table", Direction.FROM.toString()); @@ -738,6 +737,7 @@ private void updateJobRepositorySchemaAndData(Connection conn, long connectorId) * Pre-register HDFS Connector so that config upgrade will work. * NOTE: This should be used only in the upgrade path */ + @Deprecated protected long registerHdfsConnector(Connection conn) { if (LOG.isTraceEnabled()) { LOG.trace("Begin HDFS Connector pre-loading."); @@ -760,7 +760,7 @@ protected long registerHdfsConnector(Connection conn) { if (handler.getUniqueName().equals(CONNECTOR_HDFS)) { try { PreparedStatement baseConnectorStmt = conn.prepareStatement( - STMT_INSERT_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS, + STMT_INSERT_INTO_CONFIGURABLE_WITHOUT_SUPPORTED_DIRECTIONS, Statement.RETURN_GENERATED_KEYS); baseConnectorStmt.setString(1, handler.getConnectorConfigurable().getUniqueName()); baseConnectorStmt.setString(2, handler.getConnectorConfigurable().getClassName()); @@ -854,21 +854,20 @@ public MConnector findConnector(String shortName, Connection conn) { LOG.debug("Looking up connector: " + shortName); } MConnector mc = null; - PreparedStatement baseConnectorFetchStmt = null; + PreparedStatement connectorFetchStmt = null; try { - baseConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_BASE_CONNECTOR); - baseConnectorFetchStmt.setString(1, shortName); + connectorFetchStmt = conn.prepareStatement(STMT_SELECT_FROM_CONFIGURABLE); + connectorFetchStmt.setString(1, shortName); - List connectors = loadConnectors(baseConnectorFetchStmt, conn); + List connectors = loadConnectors(connectorFetchStmt, conn); - if (connectors.size()==0) { + if (connectors.size() == 0) { LOG.debug("No connector found by name: " + shortName); return null; - } else if (connectors.size()==1) { + } else if (connectors.size() == 1) { LOG.debug("Looking up connector: " + shortName + ", found: " + mc); return connectors.get(0); - } - else { + } else { throw new SqoopException(DerbyRepoError.DERBYREPO_0005, shortName); } @@ -876,7 +875,7 @@ public MConnector findConnector(String shortName, Connection conn) { logException(ex, shortName); throw new SqoopException(DerbyRepoError.DERBYREPO_0004, shortName, ex); } finally { - closeStatements(baseConnectorFetchStmt); + closeStatements(connectorFetchStmt); } } @@ -887,7 +886,9 @@ public MConnector findConnector(String shortName, Connection conn) { public List findConnectors(Connection conn) { PreparedStatement stmt = null; try { - stmt = conn.prepareStatement(STMT_SELECT_CONNECTOR_ALL); + stmt = conn.prepareStatement(STMT_SELECT_CONFIGURABLE_ALL_FOR_TYPE); + stmt.setString(1, MConfigurableType.CONNECTOR.name()); + return loadConnectors(stmt,conn); } catch (SQLException ex) { logException(ex); @@ -897,84 +898,101 @@ public List findConnectors(Connection conn) { } } - /** * {@inheritDoc} */ @Override public void registerDriver(MDriver mDriver, Connection conn) { if (mDriver.hasPersistenceId()) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0011, - "Driver"); + throw new SqoopException(DerbyRepoError.DERBYREPO_0011, mDriver.getUniqueName()); } + mDriver.setPersistenceId(insertAndGetDriverId(mDriver, conn)); + insertConfigsforDriver(mDriver, conn); + } + private void insertConfigsforDriver(MDriver mDriver, Connection conn) { PreparedStatement baseConfigStmt = null; PreparedStatement baseInputStmt = null; try { - baseConfigStmt = conn.prepareStatement(STMT_INSERT_CONFIG_BASE, + baseConfigStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIG, Statement.RETURN_GENERATED_KEYS); - baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE, + baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT, Statement.RETURN_GENERATED_KEYS); // Register a driver config as a job type with no owner/connector and direction - registerConfigs(null/* owner*/, null /*direction*/, mDriver.getDriverConfig().getConfigs(), + registerConfigs(mDriver.getPersistenceId(), null /* no direction*/, mDriver.getDriverConfig().getConfigs(), MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn); - // We're using hardcoded value for driver config as they are - // represented as NULL in the database. - mDriver.setPersistenceId(1); } catch (SQLException ex) { logException(ex, mDriver); throw new SqoopException(DerbyRepoError.DERBYREPO_0014, ex); } finally { closeStatements(baseConfigStmt, baseInputStmt); } - createOrUpdateDriverSystemVersion(conn, mDriver.getVersion()); } /** * {@inheritDoc} */ @Override - public MDriver findDriver(Connection conn) { - LOG.debug("Looking up Driver config to create a driver "); - MDriver mDriver = null; + public MDriver findDriver(String shortName, Connection conn) { + LOG.debug("Looking up Driver and config "); + PreparedStatement driverFetchStmt = null; PreparedStatement driverConfigFetchStmt = null; PreparedStatement driverConfigInputFetchStmt = null; + + MDriver mDriver; try { - driverConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_DRIVER); - driverConfigInputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT); + driverFetchStmt = conn.prepareStatement(STMT_SELECT_FROM_CONFIGURABLE); + driverFetchStmt.setString(1, shortName); + + ResultSet rsDriverSet = driverFetchStmt.executeQuery(); + if (!rsDriverSet.next()) { + return null; + } + Long driverId = rsDriverSet.getLong(1); + String driverVersion = rsDriverSet.getString(4); + + driverConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE); + driverConfigFetchStmt.setLong(1, driverId); + + driverConfigInputFetchStmt = conn.prepareStatement(STMT_SELECT_INPUT); List driverConfigs = new ArrayList(); loadDriverConfigs(driverConfigs, driverConfigFetchStmt, driverConfigInputFetchStmt, 1); - if(driverConfigs.isEmpty()) { + if (driverConfigs.isEmpty()) { return null; } - - mDriver = new MDriver(new MDriverConfig(driverConfigs), detectDriverVersion(conn)); - mDriver.setPersistenceId(1); + mDriver = new MDriver(new MDriverConfig(driverConfigs), driverVersion); + mDriver.setPersistenceId(driverId); } catch (SQLException ex) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0004, - "Driver config", ex); + throw new SqoopException(DerbyRepoError.DERBYREPO_0004, "Driver", ex); } finally { if (driverConfigFetchStmt != null) { try { driverConfigFetchStmt.close(); } catch (SQLException ex) { - LOG.error("Unable to close config fetch statement", ex); + LOG.error("Unable to close driver config fetch statement", ex); } } if (driverConfigInputFetchStmt != null) { try { driverConfigInputFetchStmt.close(); } catch (SQLException ex) { - LOG.error("Unable to close input fetch statement", ex); + LOG.error("Unable to close driver input fetch statement", ex); + } + } + if (driverFetchStmt != null) { + try { + driverFetchStmt.close(); + } catch (SQLException ex) { + LOG.error("Unable to close driver fetch statement", ex); } } } - LOG.debug("Looking up Driver config and created driver:" + mDriver); + LOG.debug("Looked up Driver and config"); return mDriver; } @@ -1228,7 +1246,7 @@ public List findLinks(Connection conn) { public List findLinksForConnector(long connectorID, Connection conn) { PreparedStatement stmt = null; try { - stmt = conn.prepareStatement(STMT_SELECT_LINK_FOR_CONNECTOR); + stmt = conn.prepareStatement(STMT_SELECT_LINK_FOR_CONNECTOR_CONFIGURABLE); stmt.setLong(1, connectorID); return loadLinks(stmt, conn); } catch (SQLException ex) { @@ -1243,7 +1261,7 @@ public List findLinksForConnector(long connectorID, Connection conn) { * {@inheritDoc} */ @Override - public void upgradeConnectorConfigs(MConnector mConnector, Connection conn) { + public void upgradeConnectorAndConfigs(MConnector mConnector, Connection conn) { updateConnectorAndDeleteConfigs(mConnector, conn); insertConfigsForConnector(mConnector, conn); } @@ -1253,13 +1271,14 @@ private void updateConnectorAndDeleteConfigs(MConnector mConnector, Connection c PreparedStatement deleteConfig = null; PreparedStatement deleteInput = null; try { - updateConnectorStatement = conn.prepareStatement(STMT_UPDATE_CONNECTOR); - deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONNECTOR); - deleteConfig = conn.prepareStatement(STMT_DELETE_CONFIGS_FOR_CONNECTOR); + updateConnectorStatement = conn.prepareStatement(STMT_UPDATE_CONFIGURABLE); + deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONFIGURABLE); + deleteConfig = conn.prepareStatement(STMT_DELETE_CONFIGS_FOR_CONFIGURABLE); updateConnectorStatement.setString(1, mConnector.getUniqueName()); updateConnectorStatement.setString(2, mConnector.getClassName()); updateConnectorStatement.setString(3, mConnector.getVersion()); - updateConnectorStatement.setLong(4, mConnector.getPersistenceId()); + updateConnectorStatement.setString(4, mConnector.getType().name()); + updateConnectorStatement.setLong(5, mConnector.getPersistenceId()); if (updateConnectorStatement.executeUpdate() != 1) { throw new SqoopException(DerbyRepoError.DERBYREPO_0038); @@ -1281,19 +1300,30 @@ private void updateConnectorAndDeleteConfigs(MConnector mConnector, Connection c * {@inheritDoc} */ @Override - public void upgradeDriverConfigs(MDriver mDriver, Connection conn) { + public void upgradeDriverAndConfigs(MDriver mDriver, Connection conn) { updateDriverAndDeleteConfigs(mDriver, conn); - createOrUpdateDriverSystemVersion(conn, mDriver.getVersion()); insertConfigsForDriver(mDriver, conn); } private void updateDriverAndDeleteConfigs(MDriver mDriver, Connection conn) { + PreparedStatement updateDriverStatement = null; PreparedStatement deleteConfig = null; PreparedStatement deleteInput = null; try { - deleteInput = conn.prepareStatement(STMT_DELETE_DRIVER_INPUTS); - deleteConfig = conn.prepareStatement(STMT_DELETE_DRIVER_CONFIGS); + updateDriverStatement = conn.prepareStatement(STMT_UPDATE_CONFIGURABLE); + deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONFIGURABLE); + deleteConfig = conn.prepareStatement(STMT_DELETE_CONFIGS_FOR_CONFIGURABLE); + updateDriverStatement.setString(1, mDriver.getUniqueName()); + updateDriverStatement.setString(2, Driver.getClassName()); + updateDriverStatement.setString(3, mDriver.getVersion()); + updateDriverStatement.setString(4, mDriver.getType().name()); + updateDriverStatement.setLong(5, mDriver.getPersistenceId()); + if (updateDriverStatement.executeUpdate() != 1) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0038); + } + deleteInput.setLong(1, mDriver.getPersistenceId()); + deleteConfig.setLong(1, mDriver.getPersistenceId()); deleteInput.executeUpdate(); deleteConfig.executeUpdate(); @@ -1301,7 +1331,7 @@ private void updateDriverAndDeleteConfigs(MDriver mDriver, Connection conn) { logException(e, mDriver); throw new SqoopException(DerbyRepoError.DERBYREPO_0044, e); } finally { - closeStatements(deleteConfig, deleteInput); + closeStatements(updateDriverStatement, deleteConfig, deleteInput); } } @@ -1557,7 +1587,7 @@ public List findJobs(Connection conn) { public List findJobsForConnector(long connectorId, Connection conn) { PreparedStatement stmt = null; try { - stmt = conn.prepareStatement(STMT_SELECT_ALL_JOBS_FOR_CONNECTOR); + stmt = conn.prepareStatement(STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE); stmt.setLong(1, connectorId); stmt.setLong(2, connectorId); return loadJobs(stmt, conn); @@ -2080,8 +2110,8 @@ private List loadConnectors(PreparedStatement stmt, Connection conn) try { rsConnectors = stmt.executeQuery(); - connectorConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR); - connectorConfigInputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT); + connectorConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE); + connectorConfigInputFetchStmt = conn.prepareStatement(STMT_SELECT_INPUT); while(rsConnectors.next()) { long connectorId = rsConnectors.getLong(1); @@ -2116,9 +2146,8 @@ private List loadConnectors(PreparedStatement stmt, Connection conn) } } finally { closeResultSets(rsConnectors); - closeStatements(connectorConfigFetchStmt,connectorConfigInputFetchStmt); + closeStatements(connectorConfigFetchStmt, connectorConfigInputFetchStmt); } - return connectors; } @@ -2134,7 +2163,7 @@ private List loadLinks(PreparedStatement stmt, rsConnection = stmt.executeQuery(); // - connectorConfigFetchStatement = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR); + connectorConfigFetchStatement = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE); connectorConfigInputStatement = conn.prepareStatement(STMT_FETCH_LINK_INPUT); while(rsConnection.next()) { @@ -2189,14 +2218,15 @@ private List loadJobs(PreparedStatement stmt, try { rsJob = stmt.executeQuery(); - - fromConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR); - toConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR); - driverConfigfetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_DRIVER); + // Note: Job does not hold a explicit reference to the driver since every + // job has the same driver + long driverId = this.findDriver(MDriver.DRIVER_NAME, conn).getPersistenceId(); + fromConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE); + toConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE); + driverConfigfetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE); jobInputFetchStmt = conn.prepareStatement(STMT_FETCH_JOB_INPUT); while(rsJob.next()) { - // why use connector? why cant it be link id? long fromConnectorId = rsJob.getLong(1); long toConnectorId = rsJob.getLong(2); long id = rsJob.getLong(3); @@ -2211,9 +2241,9 @@ private List loadJobs(PreparedStatement stmt, fromConfigFetchStmt.setLong(1, fromConnectorId); toConfigFetchStmt.setLong(1,toConnectorId); + driverConfigfetchStmt.setLong(1, driverId); jobInputFetchStmt.setLong(1, id); - //inputFetchStmt.setLong(1, XXX); // Will be filled by loadFrameworkConfigs jobInputFetchStmt.setLong(3, id); // FROM entity configs @@ -2283,7 +2313,7 @@ private void registerConfigDirection(Long configId, Direction direction, Connect * * Use given prepared statements to create entire config structure in database. * - * @param connectorId + * @param configurableId * @param configs * @param type * @param baseConfigStmt @@ -2292,17 +2322,17 @@ private void registerConfigDirection(Long configId, Direction direction, Connect * @return short number of configs registered. * @throws SQLException */ - private short registerConfigs(Long connectorId, Direction direction, + private short registerConfigs(Long configurableId, Direction direction, List configs, String type, PreparedStatement baseConfigStmt, PreparedStatement baseInputStmt, Connection conn) throws SQLException { short configIndex = 0; for (MConfig config : configs) { - if(connectorId == null) { + if (configurableId == null) { baseConfigStmt.setNull(1, Types.BIGINT); } else { - baseConfigStmt.setLong(1, connectorId); + baseConfigStmt.setLong(1, configurableId); } baseConfigStmt.setString(2, config.getName()); diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java index 59773e16..de08261d 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java @@ -49,9 +49,14 @@ public final class DerbySchemaConstants { public static final String COLUMN_SQD_NAME = "SQD_NAME"; // SQ_CONNECTOR + @Deprecated // used only for upgrade public static final String TABLE_SQ_CONNECTOR_NAME = "SQ_CONNECTOR"; + // SQ_CONFIGURABLE + public static final String TABLE_SQ_CONFIGURABLE_NAME = "SQ_CONFIGURABLE"; + @Deprecated // used only for upgrade public static final String TABLE_SQ_CONNECTOR = SCHEMA_PREFIX + TABLE_SQ_CONNECTOR_NAME; + public static final String TABLE_SQ_CONFIGURABLE = SCHEMA_PREFIX + TABLE_SQ_CONFIGURABLE_NAME; public static final String COLUMN_SQC_ID = "SQC_ID"; @@ -61,6 +66,8 @@ public final class DerbySchemaConstants { public static final String COLUMN_SQC_VERSION = "SQC_VERSION"; + public static final String COLUMN_SQC_TYPE = "SQC_TYPE"; + // SQ_CONNECTOR_DIRECTIONS public static final String TABLE_SQ_CONNECTOR_DIRECTIONS_NAME = "SQ_CONNECTOR_DIRECTIONS"; @@ -75,12 +82,10 @@ public final class DerbySchemaConstants { public static final String COLUMN_SQCD_DIRECTION = "SQCD_DIRECTION"; public static final String CONSTRAINT_SQCD_SQC_NAME = CONSTRAINT_PREFIX + "SQCD_SQC"; - // FK to the SQ_CONNECTOR table public static final String CONSTRAINT_SQCD_SQC = SCHEMA_PREFIX + CONSTRAINT_SQCD_SQC_NAME; public static final String CONSTRAINT_SQCD_SQD_NAME = CONSTRAINT_PREFIX + "SQCD_SQD"; - // FK to the SQ_DIRECTION able public static final String CONSTRAINT_SQCD_SQD = SCHEMA_PREFIX + CONSTRAINT_SQCD_SQD_NAME; @@ -99,7 +104,10 @@ public final class DerbySchemaConstants { @Deprecated // used only for upgrade public static final String COLUMN_SQF_CONNECTOR = "SQF_CONNECTOR"; + @Deprecated // used only for upgrade path public static final String COLUMN_SQ_CFG_CONNECTOR = "SQ_CFG_CONNECTOR"; + // note this column was renamed again + public static final String COLUMN_SQ_CFG_CONFIGURABLE = "SQ_CFG_CONFIGURABLE"; @Deprecated // used only for upgrade public static final String COLUMN_SQF_OPERATION = "SQF_OPERATION"; @@ -125,8 +133,8 @@ public final class DerbySchemaConstants { @Deprecated // used only for upgrade public static final String CONSTRAINT_SQF_SQC = SCHEMA_PREFIX + CONSTRAINT_SQF_SQC_NAME; + // FK constraint on configurable public static final String CONSTRAINT_SQ_CFG_SQC_NAME = CONSTRAINT_PREFIX + "SQ_CFG_SQC"; - public static final String CONSTRAINT_SQ_CFG_SQC = SCHEMA_PREFIX + CONSTRAINT_SQ_CFG_SQC_NAME; // SQ_CONFIG_DIRECTIONS @@ -202,7 +210,11 @@ public final class DerbySchemaConstants { public static final String COLUMN_SQ_LNK_NAME = "SQ_LNK_NAME"; @Deprecated // used only for upgrade public static final String COLUMN_SQN_CONNECTOR = "SQN_CONNECTOR"; + @Deprecated // used only for upgrade public static final String COLUMN_SQ_LNK_CONNECTOR = "SQ_LNK_CONNECTOR"; + // Note this column has been renamed twice + public static final String COLUMN_SQ_LNK_CONFIGURABLE = "SQ_LNK_CONFIGURABLE"; + @Deprecated // used only for upgrade public static final String COLUMN_SQN_CREATION_USER = "SQN_CREATION_USER"; public static final String COLUMN_SQ_LNK_CREATION_USER = "SQ_LNK_CREATION_USER"; @@ -225,10 +237,10 @@ public final class DerbySchemaConstants { @Deprecated public static final String CONSTRAINT_SQN_SQC = SCHEMA_PREFIX + CONSTRAINT_SQN_SQC_NAME; + // FK constraint on the connector configurable public static final String CONSTRAINT_SQ_LNK_SQC = SCHEMA_PREFIX + CONSTRAINT_SQ_LNK_SQC_NAME; public static final String CONSTRAINT_SQ_LNK_NAME_UNIQUE_NAME = CONSTRAINT_PREFIX + "SQ_LNK_NAME_UNIQUE"; - public static final String CONSTRAINT_SQ_LNK_NAME_UNIQUE = SCHEMA_PREFIX + CONSTRAINT_SQ_LNK_NAME_UNIQUE_NAME; // SQ_JOB @@ -437,12 +449,12 @@ public final class DerbySchemaConstants { static { tablesV1 = new HashSet(); tablesV1.add(TABLE_SQ_CONNECTOR_NAME); - tablesV1.add(TABLE_SQ_LINK_NAME); - tablesV1.add(TABLE_SQ_LINK_INPUT_NAME); + tablesV1.add(TABLE_SQ_CONNECTION_NAME); + tablesV1.add(TABLE_SQ_CONNECTION_INPUT_NAME); tablesV1.add(TABLE_SQ_COUNTER_NAME); tablesV1.add(TABLE_SQ_COUNTER_GROUP_NAME); tablesV1.add(TABLE_SQ_COUNTER_SUBMISSION_NAME); - tablesV1.add(TABLE_SQ_CONFIG_NAME); + tablesV1.add(TABLE_SQ_FORM_NAME); tablesV1.add(TABLE_SQ_INPUT_NAME); tablesV1.add(TABLE_SQ_JOB_NAME); tablesV1.add(TABLE_SQ_JOB_INPUT_NAME); diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java index 44ec2e3f..ce1830ec 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java @@ -48,15 +48,16 @@ * *

*

- * SQ_CONNECTOR: Connector registration. + * SQ_CONFIGURABLE: Configurable registration. * *

  *    +-----------------------------+
- *    | SQ_CONNECTOR                |
+ *    | SQ_CONFIGURABLE             |
  *    +-----------------------------+
  *    | SQC_ID: BIGINT PK AUTO-GEN  |
  *    | SQC_NAME: VARCHAR(64)       |
  *    | SQC_CLASS: VARCHAR(255)     |
+ *    | SQC_TYPE: VARCHAR(32)       |"CONNECTOR"|"DRIVER"
  *    | SQC_VERSION: VARCHAR(64)    |
  *    +-----------------------------+
  * 
@@ -252,7 +253,6 @@ * *

*/ - // NOTE: If you have signed yourself to modify the schema for the repository // such as a rename, change in table relationships or constraints, embrace yourself! // The following code is supposed to be a chronological order of how the @@ -561,199 +561,184 @@ public final class DerbySchemaQuery { "SELECT " + COLUMN_SQD_NAME + " FROM " + TABLE_SQ_DIRECTION + " WHERE " + COLUMN_SQD_ID + "=?"; - // DML: Fetch connector Given Name - public static final String STMT_FETCH_BASE_CONNECTOR = + //DML: Get configurable by given name + public static final String STMT_SELECT_FROM_CONFIGURABLE = + "SELECT " + + COLUMN_SQC_ID + ", " + + COLUMN_SQC_NAME + ", " + + COLUMN_SQC_CLASS + ", " + + COLUMN_SQC_VERSION + + " FROM " + TABLE_SQ_CONFIGURABLE + + " WHERE " + COLUMN_SQC_NAME + " = ?"; + + //DML: Get all configurables for a given type + public static final String STMT_SELECT_CONFIGURABLE_ALL_FOR_TYPE = + "SELECT " + + COLUMN_SQC_ID + ", " + + COLUMN_SQC_NAME + ", " + + COLUMN_SQC_CLASS + ", " + + COLUMN_SQC_VERSION + + " FROM " + TABLE_SQ_CONFIGURABLE + + " WHERE " + COLUMN_SQC_TYPE + " = ?"; + + // DML: Select all connectors + @Deprecated // used only for upgrade logic + public static final String STMT_SELECT_CONNECTOR_ALL = + "SELECT " + + COLUMN_SQC_ID + ", " + + COLUMN_SQC_NAME + ", " + + COLUMN_SQC_CLASS + ", " + + COLUMN_SQC_VERSION + + " FROM " + TABLE_SQ_CONNECTOR; + + //DML: Get all configs for a given configurable + public static final String STMT_SELECT_CONFIG_FOR_CONFIGURABLE = "SELECT " - + COLUMN_SQC_ID + ", " + + COLUMN_SQ_CFG_ID + ", " + + COLUMN_SQ_CFG_CONFIGURABLE + ", " + + COLUMN_SQ_CFG_NAME + ", " + + COLUMN_SQ_CFG_TYPE + ", " + + COLUMN_SQ_CFG_INDEX + + " FROM " + TABLE_SQ_CONFIG + + " WHERE " + COLUMN_SQ_CFG_CONFIGURABLE + " = ? " + + " ORDER BY " + COLUMN_SQ_CFG_INDEX; + + // DML: Get inputs for a given config + public static final String STMT_SELECT_INPUT = + "SELECT " + + COLUMN_SQI_ID + ", " + + COLUMN_SQI_NAME + ", " + + COLUMN_SQI_CONFIG + ", " + + COLUMN_SQI_INDEX + ", " + + COLUMN_SQI_TYPE + ", " + + COLUMN_SQI_STRMASK + ", " + + COLUMN_SQI_STRLENGTH + ", " + + COLUMN_SQI_ENUMVALS + ", " + + "cast(null as varchar(100))" + + " FROM " + TABLE_SQ_INPUT + + " WHERE " + COLUMN_SQI_CONFIG + " = ?" + + " ORDER BY " + COLUMN_SQI_INDEX; + + //DML: Get inputs and values for a given link + public static final String STMT_FETCH_LINK_INPUT = + "SELECT " + + COLUMN_SQI_ID + ", " + + COLUMN_SQI_NAME + ", " + + COLUMN_SQI_CONFIG + ", " + + COLUMN_SQI_INDEX + ", " + + COLUMN_SQI_TYPE + ", " + + COLUMN_SQI_STRMASK + ", " + + COLUMN_SQI_STRLENGTH + "," + + COLUMN_SQI_ENUMVALS + ", " + + COLUMN_SQ_LNKI_VALUE + + " FROM " + TABLE_SQ_INPUT + + " LEFT OUTER JOIN " + TABLE_SQ_LINK_INPUT + + " ON " + COLUMN_SQ_LNKI_INPUT + " = " + COLUMN_SQI_ID + + " AND " + COLUMN_SQ_LNKI_LINK + " = ?" + + " WHERE " + COLUMN_SQI_CONFIG + " = ?" + + " AND (" + COLUMN_SQ_LNKI_LINK + " = ?" + " OR " + COLUMN_SQ_LNKI_LINK + " IS NULL)" + + " ORDER BY " + COLUMN_SQI_INDEX; + + //DML: Fetch inputs and values for a given job + public static final String STMT_FETCH_JOB_INPUT = + "SELECT " + + COLUMN_SQI_ID + ", " + + COLUMN_SQI_NAME + ", " + + COLUMN_SQI_CONFIG + ", " + + COLUMN_SQI_INDEX + ", " + + COLUMN_SQI_TYPE + ", " + + COLUMN_SQI_STRMASK + ", " + + COLUMN_SQI_STRLENGTH + ", " + + COLUMN_SQI_ENUMVALS + ", " + + COLUMN_SQBI_VALUE + + " FROM " + TABLE_SQ_INPUT + + " LEFT OUTER JOIN " + TABLE_SQ_JOB_INPUT + + " ON " + COLUMN_SQBI_INPUT + " = " + COLUMN_SQI_ID + + " AND " + COLUMN_SQBI_JOB + " = ?" + + " WHERE " + COLUMN_SQI_CONFIG + " = ?" + + " AND (" + COLUMN_SQBI_JOB + " = ? OR " + COLUMN_SQBI_JOB + " IS NULL)" + + " ORDER BY " + COLUMN_SQI_INDEX; + + //DML: Insert into configurable + public static final String STMT_INSERT_INTO_CONFIGURABLE = + "INSERT INTO " + TABLE_SQ_CONFIGURABLE + " (" + COLUMN_SQC_NAME + ", " + COLUMN_SQC_CLASS + ", " - + COLUMN_SQC_VERSION - + " FROM " + TABLE_SQ_CONNECTOR - + " WHERE " + COLUMN_SQC_NAME + " = ?"; + + COLUMN_SQC_VERSION + ", " + + COLUMN_SQC_TYPE + + ") VALUES (?, ?, ?, ?)"; - // DML: Select all connectors - public static final String STMT_SELECT_CONNECTOR_ALL = - "SELECT " - + COLUMN_SQC_ID + ", " - + COLUMN_SQC_NAME + ", " - + COLUMN_SQC_CLASS + ", " - + COLUMN_SQC_VERSION - + " FROM " + TABLE_SQ_CONNECTOR; + @Deprecated // used only in the upgrade path + public static final String STMT_INSERT_INTO_CONFIGURABLE_WITHOUT_SUPPORTED_DIRECTIONS = + "INSERT INTO " + TABLE_SQ_CONNECTOR+ " (" + + COLUMN_SQC_NAME + ", " + + COLUMN_SQC_CLASS + ", " + + COLUMN_SQC_VERSION + + ") VALUES (?, ?, ?)"; - // DML: Fetch all configs for a given connector - public static final String STMT_FETCH_CONFIG_CONNECTOR = - "SELECT " - + COLUMN_SQ_CFG_ID + ", " - + COLUMN_SQ_CFG_CONNECTOR + ", " - + COLUMN_SQ_CFG_NAME + ", " - + COLUMN_SQ_CFG_TYPE + ", " - + COLUMN_SQ_CFG_INDEX - + " FROM " + TABLE_SQ_CONFIG - + " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " = ? " - + " ORDER BY " + COLUMN_SQ_CFG_INDEX; + //DML: Insert into config + public static final String STMT_INSERT_INTO_CONFIG = + "INSERT INTO " + TABLE_SQ_CONFIG + " (" + + COLUMN_SQ_CFG_CONFIGURABLE + ", " + + COLUMN_SQ_CFG_NAME + ", " + + COLUMN_SQ_CFG_TYPE + ", " + + COLUMN_SQ_CFG_INDEX + + ") VALUES ( ?, ?, ?, ?)"; - // DML: Fetch all driver configs - public static final String STMT_FETCH_CONFIG_DRIVER = - "SELECT " - + COLUMN_SQ_CFG_ID + ", " - + COLUMN_SQ_CFG_CONNECTOR + ", " - + COLUMN_SQ_CFG_NAME + ", " - + COLUMN_SQ_CFG_TYPE + ", " - + COLUMN_SQ_CFG_INDEX - + " FROM " + TABLE_SQ_CONFIG - + " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " IS NULL " - + " ORDER BY " + COLUMN_SQ_CFG_TYPE + ", " + COLUMN_SQ_CFG_INDEX; - - // DML: Fetch inputs for a given config - public static final String STMT_FETCH_INPUT = - "SELECT " - + COLUMN_SQI_ID + ", " - + COLUMN_SQI_NAME + ", " - + COLUMN_SQI_CONFIG + ", " - + COLUMN_SQI_INDEX + ", " - + COLUMN_SQI_TYPE + ", " - + COLUMN_SQI_STRMASK + ", " - + COLUMN_SQI_STRLENGTH + ", " - + COLUMN_SQI_ENUMVALS + ", " - + "cast(null as varchar(100))" - + " FROM " + TABLE_SQ_INPUT - + " WHERE " + COLUMN_SQI_CONFIG + " = ?" - + " ORDER BY " + COLUMN_SQI_INDEX; - - // DML: Fetch inputs and values for a given link - public static final String STMT_FETCH_LINK_INPUT = - "SELECT " - + COLUMN_SQI_ID + ", " - + COLUMN_SQI_NAME + ", " - + COLUMN_SQI_CONFIG + ", " - + COLUMN_SQI_INDEX + ", " - + COLUMN_SQI_TYPE + ", " - + COLUMN_SQI_STRMASK + ", " - + COLUMN_SQI_STRLENGTH + "," - + COLUMN_SQI_ENUMVALS + ", " - + COLUMN_SQ_LNKI_VALUE - + " FROM " + TABLE_SQ_INPUT - + " LEFT OUTER JOIN " + TABLE_SQ_LINK_INPUT - + " ON " + COLUMN_SQ_LNKI_INPUT + " = " + COLUMN_SQI_ID - + " AND " + COLUMN_SQ_LNKI_LINK + " = ?" - + " WHERE " + COLUMN_SQI_CONFIG + " = ?" - + " AND (" + COLUMN_SQ_LNKI_LINK + " = ?" + " OR " + COLUMN_SQ_LNKI_LINK + " IS NULL)" - + " ORDER BY " + COLUMN_SQI_INDEX; - - // DML: Fetch inputs and values for a given job - public static final String STMT_FETCH_JOB_INPUT = - "SELECT " - + COLUMN_SQI_ID + ", " - + COLUMN_SQI_NAME + ", " - + COLUMN_SQI_CONFIG + ", " - + COLUMN_SQI_INDEX + ", " - + COLUMN_SQI_TYPE + ", " - + COLUMN_SQI_STRMASK + ", " - + COLUMN_SQI_STRLENGTH + ", " - + COLUMN_SQI_ENUMVALS + ", " - + COLUMN_SQBI_VALUE - + " FROM " + TABLE_SQ_INPUT - + " LEFT OUTER JOIN " + TABLE_SQ_JOB_INPUT - + " ON " + COLUMN_SQBI_INPUT + " = " + COLUMN_SQI_ID - + " AND " + COLUMN_SQBI_JOB + " = ?" - + " WHERE " + COLUMN_SQI_CONFIG + " = ?" - + " AND (" + COLUMN_SQBI_JOB + " = ? OR " + COLUMN_SQBI_JOB + " IS NULL)" - + " ORDER BY " + COLUMN_SQI_INDEX; - - // DML: Insert connector base - public static final String STMT_INSERT_CONNECTOR_BASE = - "INSERT INTO " + TABLE_SQ_CONNECTOR + " (" - + COLUMN_SQC_NAME + ", " - + COLUMN_SQC_CLASS + ", " - + COLUMN_SQC_VERSION - + ") VALUES (?, ?, ?)"; - - public static final String STMT_INSERT_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS = - "INSERT INTO " + TABLE_SQ_CONNECTOR + " (" - + COLUMN_SQC_NAME + ", " - + COLUMN_SQC_CLASS + ", " - + COLUMN_SQC_VERSION - + ") VALUES (?, ?, ?)"; - - // DML: Insert config base - public static final String STMT_INSERT_CONFIG_BASE = - "INSERT INTO " + TABLE_SQ_CONFIG + " (" - + COLUMN_SQ_CFG_CONNECTOR + ", " - + COLUMN_SQ_CFG_NAME + ", " - + COLUMN_SQ_CFG_TYPE + ", " - + COLUMN_SQ_CFG_INDEX - + ") VALUES ( ?, ?, ?, ?)"; - - // DML: Insert config input - public static final String STMT_INSERT_INPUT_BASE = - "INSERT INTO " + TABLE_SQ_INPUT + " (" - + COLUMN_SQI_NAME + ", " - + COLUMN_SQI_CONFIG + ", " - + COLUMN_SQI_INDEX + ", " - + COLUMN_SQI_TYPE + ", " - + COLUMN_SQI_STRMASK + ", " - + COLUMN_SQI_STRLENGTH + ", " - + COLUMN_SQI_ENUMVALS - + ") VALUES (?, ?, ?, ?, ?, ?, ?)"; - - // Delete all configs for a given connector - public static final String STMT_DELETE_CONFIGS_FOR_CONNECTOR = - "DELETE FROM " + TABLE_SQ_CONFIG - + " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " = ?"; - - // Delete all inputs for a given connector - public static final String STMT_DELETE_INPUTS_FOR_CONNECTOR = - "DELETE FROM " + TABLE_SQ_INPUT - + " WHERE " - + COLUMN_SQI_CONFIG - + " IN (SELECT " - + COLUMN_SQ_CFG_ID - + " FROM " + TABLE_SQ_CONFIG - + " WHERE " - + COLUMN_SQ_CFG_CONNECTOR + " = ?)"; - - // Delete all driver inputs - public static final String STMT_DELETE_DRIVER_INPUTS = - "DELETE FROM " + TABLE_SQ_INPUT - + " WHERE " - + COLUMN_SQI_CONFIG - + " IN (SELECT " - + COLUMN_SQ_CFG_ID - + " FROM " + TABLE_SQ_CONFIG - + " WHERE " - + COLUMN_SQ_CFG_CONNECTOR + " IS NULL)"; - - // Delete all driver configs - public static final String STMT_DELETE_DRIVER_CONFIGS = - "DELETE FROM " + TABLE_SQ_CONFIG - + " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " IS NULL"; - - - // Update the connector - public static final String STMT_UPDATE_CONNECTOR = - "UPDATE " + TABLE_SQ_CONNECTOR - + " SET " + COLUMN_SQC_NAME + " = ?, " - + COLUMN_SQC_CLASS + " = ?, " - + COLUMN_SQC_VERSION + " = ? " - + " WHERE " + COLUMN_SQC_ID + " = ?"; - - // DML: Insert new connection - @Deprecated // used only in upgrade path - public static final String STMT_INSERT_CONNECTION = - "INSERT INTO " + TABLE_SQ_CONNECTION + " (" - + COLUMN_SQN_NAME + ", " - + COLUMN_SQN_CONNECTOR + "," - + COLUMN_SQN_ENABLED + ", " - + COLUMN_SQN_CREATION_USER + ", " - + COLUMN_SQN_CREATION_DATE + ", " - + COLUMN_SQN_UPDATE_USER + ", " + COLUMN_SQN_UPDATE_DATE + //DML: Insert into config input + public static final String STMT_INSERT_INTO_INPUT = + "INSERT INTO " + TABLE_SQ_INPUT + " (" + + COLUMN_SQI_NAME + ", " + + COLUMN_SQI_CONFIG + ", " + + COLUMN_SQI_INDEX + ", " + + COLUMN_SQI_TYPE + ", " + + COLUMN_SQI_STRMASK + ", " + + COLUMN_SQI_STRLENGTH + ", " + + COLUMN_SQI_ENUMVALS + ") VALUES (?, ?, ?, ?, ?, ?, ?)"; + //Delete all configs for a given configurable + public static final String STMT_DELETE_CONFIGS_FOR_CONFIGURABLE = + "DELETE FROM " + TABLE_SQ_CONFIG + + " WHERE " + COLUMN_SQ_CFG_CONFIGURABLE + " = ?"; + + //Delete all inputs for a given configurable + public static final String STMT_DELETE_INPUTS_FOR_CONFIGURABLE = + "DELETE FROM " + TABLE_SQ_INPUT + + " WHERE " + + COLUMN_SQI_CONFIG + + " IN (SELECT " + + COLUMN_SQ_CFG_ID + + " FROM " + TABLE_SQ_CONFIG + + " WHERE " + + COLUMN_SQ_CFG_CONFIGURABLE + " = ?)"; + + //Update the configurable + public static final String STMT_UPDATE_CONFIGURABLE = + "UPDATE " + TABLE_SQ_CONFIGURABLE + + " SET " + COLUMN_SQC_NAME + " = ?, " + + COLUMN_SQC_CLASS + " = ?, " + + COLUMN_SQC_VERSION + " = ?, " + + COLUMN_SQC_TYPE + " = ? " + + " WHERE " + COLUMN_SQC_ID + " = ?"; + + //DML: Insert new connection + @Deprecated // used only in upgrade path + public static final String STMT_INSERT_CONNECTION = + "INSERT INTO " + TABLE_SQ_CONNECTION + " (" + + COLUMN_SQN_NAME + ", " + + COLUMN_SQN_CONNECTOR + "," + + COLUMN_SQN_ENABLED + ", " + + COLUMN_SQN_CREATION_USER + ", " + + COLUMN_SQN_CREATION_DATE + ", " + + COLUMN_SQN_UPDATE_USER + ", " + COLUMN_SQN_UPDATE_DATE + + ") VALUES (?, ?, ?, ?, ?, ?, ?)"; + // DML: Insert new link public static final String STMT_INSERT_LINK = "INSERT INTO " + TABLE_SQ_LINK + " (" + COLUMN_SQ_LNK_NAME + ", " - + COLUMN_SQ_LNK_CONNECTOR + ", " + + COLUMN_SQ_LNK_CONFIGURABLE + ", " + COLUMN_SQ_LNK_ENABLED + ", " + COLUMN_SQ_LNK_CREATION_USER + ", " + COLUMN_SQ_LNK_CREATION_DATE + ", " @@ -798,7 +783,7 @@ public final class DerbySchemaQuery { "SELECT " + COLUMN_SQ_LNK_ID + ", " + COLUMN_SQ_LNK_NAME + ", " - + COLUMN_SQ_LNK_CONNECTOR + ", " + + COLUMN_SQ_LNK_CONFIGURABLE + ", " + COLUMN_SQ_LNK_ENABLED + ", " + COLUMN_SQ_LNK_CREATION_USER + ", " + COLUMN_SQ_LNK_CREATION_DATE + ", " @@ -807,12 +792,12 @@ public final class DerbySchemaQuery { + " FROM " + TABLE_SQ_LINK + " WHERE " + COLUMN_SQ_LNK_ID + " = ?"; - // DML: Select all connections + // DML: Select all links public static final String STMT_SELECT_LINK_ALL = "SELECT " + COLUMN_SQ_LNK_ID + ", " + COLUMN_SQ_LNK_NAME + ", " - + COLUMN_SQ_LNK_CONNECTOR + ", " + + COLUMN_SQ_LNK_CONFIGURABLE + ", " + COLUMN_SQ_LNK_ENABLED + ", " + COLUMN_SQ_LNK_CREATION_USER + ", " + COLUMN_SQ_LNK_CREATION_DATE + ", " @@ -820,19 +805,19 @@ public final class DerbySchemaQuery { + COLUMN_SQ_LNK_UPDATE_DATE + " FROM " + TABLE_SQ_LINK; - // DML: Select all connections for a specific connector. - public static final String STMT_SELECT_LINK_FOR_CONNECTOR = + // DML: Select all links for a specific connector. + public static final String STMT_SELECT_LINK_FOR_CONNECTOR_CONFIGURABLE = "SELECT " + COLUMN_SQ_LNK_ID + ", " + COLUMN_SQ_LNK_NAME + ", " - + COLUMN_SQ_LNK_CONNECTOR + ", " + + COLUMN_SQ_LNK_CONFIGURABLE + ", " + COLUMN_SQ_LNK_ENABLED + ", " + COLUMN_SQ_LNK_CREATION_USER + ", " + COLUMN_SQ_LNK_CREATION_DATE + ", " + COLUMN_SQ_LNK_UPDATE_USER + ", " + COLUMN_SQ_LNK_UPDATE_DATE + " FROM " + TABLE_SQ_LINK - + " WHERE " + COLUMN_SQ_LNK_CONNECTOR + " = ?"; + + " WHERE " + COLUMN_SQ_LNK_CONFIGURABLE + " = ?"; // DML: Check if given link exists public static final String STMT_SELECT_LINK_CHECK_BY_ID = @@ -897,34 +882,35 @@ public final class DerbySchemaQuery { + " ON " + COLUMN_SQB_FROM_LINK + " = " + COLUMN_SQ_LNK_ID + " WHERE " + COLUMN_SQ_LNK_ID + " = ? "; - // DML: Select all jobs - public static final String STMT_SELECT_JOB = - "SELECT " - + "FROM_LINK." + COLUMN_SQ_LNK_CONNECTOR + ", " - + "TO_LINK." + COLUMN_SQ_LNK_CONNECTOR + ", " - + "JOB." + COLUMN_SQB_ID + ", " - + "JOB." + COLUMN_SQB_NAME + ", " - + "JOB." + COLUMN_SQB_FROM_LINK + ", " - + "JOB." + COLUMN_SQB_TO_LINK + ", " - + "JOB." + COLUMN_SQB_ENABLED + ", " - + "JOB." + COLUMN_SQB_CREATION_USER + ", " - + "JOB." + COLUMN_SQB_CREATION_DATE + ", " - + "JOB." + COLUMN_SQB_UPDATE_USER + ", " - + "JOB." + COLUMN_SQB_UPDATE_DATE - + " FROM " + TABLE_SQ_JOB + " JOB" - + " LEFT JOIN " + TABLE_SQ_LINK + " FROM_LINK" - + " ON " + COLUMN_SQB_FROM_LINK + " = FROM_LINK." + COLUMN_SQ_LNK_ID - + " LEFT JOIN " + TABLE_SQ_LINK + " TO_LINK" - + " ON " + COLUMN_SQB_TO_LINK + " = TO_LINK." + COLUMN_SQ_LNK_ID; + //DML: Select all jobs + public static final String STMT_SELECT_JOB = + "SELECT " + + "FROM_CONNECTOR." + COLUMN_SQ_LNK_CONFIGURABLE + ", " + + "TO_CONNECTOR." + COLUMN_SQ_LNK_CONFIGURABLE + ", " + + "JOB." + COLUMN_SQB_ID + ", " + + "JOB." + COLUMN_SQB_NAME + ", " + + "JOB." + COLUMN_SQB_FROM_LINK + ", " + + "JOB." + COLUMN_SQB_TO_LINK + ", " + + "JOB." + COLUMN_SQB_ENABLED + ", " + + "JOB." + COLUMN_SQB_CREATION_USER + ", " + + "JOB." + COLUMN_SQB_CREATION_DATE + ", " + + "JOB." + COLUMN_SQB_UPDATE_USER + ", " + + "JOB." + COLUMN_SQB_UPDATE_DATE + + " FROM " + TABLE_SQ_JOB + " JOB" + + " LEFT JOIN " + TABLE_SQ_LINK + " FROM_CONNECTOR" + + " ON " + COLUMN_SQB_FROM_LINK + " = FROM_CONNECTOR." + COLUMN_SQ_LNK_ID + + " LEFT JOIN " + TABLE_SQ_LINK + " TO_CONNECTOR" + + " ON " + COLUMN_SQB_TO_LINK + " = TO_CONNECTOR." + COLUMN_SQ_LNK_ID; // DML: Select one specific job public static final String STMT_SELECT_JOB_SINGLE_BY_ID = STMT_SELECT_JOB + " WHERE " + COLUMN_SQB_ID + " = ?"; // DML: Select all jobs for a Connector - public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR = STMT_SELECT_JOB - + " WHERE FROM_LINK." + COLUMN_SQ_LNK_CONNECTOR + " = ? OR TO_LINK." - + COLUMN_SQ_LNK_CONNECTOR + " = ?"; + public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE = + STMT_SELECT_JOB + + " WHERE FROM_LINK." + COLUMN_SQ_LNK_CONFIGURABLE + " = ? OR TO_LINK." + + COLUMN_SQ_LNK_CONFIGURABLE + " = ?"; // DML: Insert new submission public static final String STMT_INSERT_SUBMISSION = @@ -1196,7 +1182,7 @@ public final class DerbySchemaQuery { public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_1 = "ALTER TABLE " + TABLE_SQ_CONNECTION_INPUT + " DROP CONSTRAINT " + CONSTRAINT_SQNI_SQI; public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_2 = "ALTER TABLE " - + TABLE_SQ_CONNECTION_INPUT + " DROP CONSTRAINT " + CONSTRAINT_SQNI_SQN; + + TABLE_SQ_CONNECTION_INPUT + " DROP CONSTRAINT " + CONSTRAINT_SQNI_SQN; public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_3 = "ALTER TABLE " + TABLE_SQ_JOB + " DROP CONSTRAINT " + CONSTRAINT_SQB_SQN_FROM; @@ -1225,6 +1211,15 @@ public final class DerbySchemaQuery { public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_8 = "RENAME COLUMN " + TABLE_SQ_LINK + "." + COLUMN_SQN_ENABLED + " TO " + COLUMN_SQ_LNK_ENABLED; + // rename the constraint CONSTRAINT_SQF_SQC to CONSTRAINT_SQ_CFG_SQC + public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONNECTOR_CONSTRAINT = "ALTER TABLE " + + TABLE_SQ_LINK + " DROP CONSTRAINT " + CONSTRAINT_SQN_SQC; + + public static final String QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONNECTOR_CONSTRAINT = "ALTER TABLE " + + TABLE_SQ_LINK + " ADD CONSTRAINT " + CONSTRAINT_SQ_LNK_SQC + " " + "FOREIGN KEY (" + + COLUMN_SQ_LNK_CONNECTOR + ") " + "REFERENCES " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + + ")"; + // table rename for CONNECTION_INPUT -> LINK_INPUT public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_TO_SQ_LINK_INPUT = "RENAME TABLE " + TABLE_SQ_CONNECTION_INPUT + " TO SQ_LINK_INPUT"; @@ -1262,6 +1257,23 @@ public final class DerbySchemaQuery { public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_6 = "RENAME COLUMN " + TABLE_SQ_CONFIG + "." + COLUMN_SQF_INDEX + " TO " + COLUMN_SQ_CFG_INDEX; + // rename the constraint CONSTRAINT_SQF_SQC to CONSTRAINT_SQ_CFG_SQC + public static final String QUERY_UPGRADE_DROP_TABLE_SQ_FORM_CONNECTOR_CONSTRAINT = "ALTER TABLE " + + TABLE_SQ_CONFIG + " DROP CONSTRAINT " + CONSTRAINT_SQF_SQC; + + public static final String QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT = "ALTER TABLE " + + TABLE_SQ_CONFIG + + " ADD CONSTRAINT " + + CONSTRAINT_SQ_CFG_SQC + + " " + + "FOREIGN KEY (" + + COLUMN_SQ_CFG_CONNECTOR + + ") " + + "REFERENCES " + + TABLE_SQ_CONNECTOR + + " (" + + COLUMN_SQC_ID + + ")"; // column rename and constraint add for SQ_INPUT public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_INPUT_FORM_COLUMN = "RENAME COLUMN " @@ -1285,52 +1297,131 @@ public final class DerbySchemaQuery { + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_SQ_LNK_TO + " FOREIGN KEY (" + COLUMN_SQB_TO_LINK + ") REFERENCES " + TABLE_SQ_LINK + " (" + COLUMN_SQ_LNK_ID + ")"; - public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME = - "ALTER TABLE " + TABLE_SQ_JOB + " ADD CONSTRAINT " - + CONSTRAINT_SQB_NAME_UNIQUE + " UNIQUE (" + COLUMN_SQB_NAME + ")"; + public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME = "ALTER TABLE " + + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_NAME_UNIQUE + " UNIQUE (" + + COLUMN_SQB_NAME + ")"; - public static final String QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME = - "ALTER TABLE " + TABLE_SQ_LINK + " ADD CONSTRAINT " - + CONSTRAINT_SQ_LNK_NAME_UNIQUE + " UNIQUE (" + COLUMN_SQ_LNK_NAME + ")"; + public static final String QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME = "ALTER TABLE " + + TABLE_SQ_LINK + + " ADD CONSTRAINT " + + CONSTRAINT_SQ_LNK_NAME_UNIQUE + + " UNIQUE (" + + COLUMN_SQ_LNK_NAME + ")"; + // SQOOP-1557 upgrade queries for table rename for CONNECTOR-> CONFIGURABLE + + // drop the SQ_CONFIG FK for connector table + public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT = "ALTER TABLE " + + TABLE_SQ_CONFIG + " DROP CONSTRAINT " + CONSTRAINT_SQ_CFG_SQC; + + // drop the SQ_LINK FK for connector table + public static final String QUERY_UPGRADE_DROP_TABLE_SQ_LINK_CONSTRAINT = "ALTER TABLE " + + TABLE_SQ_LINK + " DROP CONSTRAINT " + CONSTRAINT_SQ_LNK_SQC; + + // drop the SQ_CONNECTOR_DIRECTION FK for connector table + public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT = "ALTER TABLE " + + TABLE_SQ_CONNECTOR_DIRECTIONS + " DROP CONSTRAINT " + CONSTRAINT_SQCD_SQC; + + // rename + public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTOR_TO_SQ_CONFIGURABLE = "RENAME TABLE " + + TABLE_SQ_CONNECTOR + " TO SQ_CONFIGURABLE"; + + public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONFIG_COLUMN_1 = "RENAME COLUMN " + + TABLE_SQ_CONFIG + "." + COLUMN_SQ_CFG_CONNECTOR + " TO " + COLUMN_SQ_CFG_CONFIGURABLE; + + public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_LINK_COLUMN_1 = "RENAME COLUMN " + + TABLE_SQ_LINK + "." + COLUMN_SQ_LNK_CONNECTOR + " TO " + COLUMN_SQ_LNK_CONFIGURABLE; + + // add a type column to the configurable + public static final String QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_COLUMN_SQC_TYPE = "ALTER TABLE " + + TABLE_SQ_CONFIGURABLE + " ADD COLUMN " + COLUMN_SQC_TYPE + " VARCHAR(32)"; + + // add the constraints back for SQ_CONFIG + public static final String QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONFIGURABLE_CONSTRAINT = "ALTER TABLE " + + TABLE_SQ_CONFIG + + " ADD CONSTRAINT " + + CONSTRAINT_SQ_CFG_SQC + + " " + + "FOREIGN KEY (" + + COLUMN_SQ_CFG_CONFIGURABLE + + ") " + + "REFERENCES " + + TABLE_SQ_CONFIGURABLE + + " (" + + COLUMN_SQC_ID + ")"; + + // add the constraints back for SQ_LINK + public static final String QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONFIGURABLE_CONSTRAINT = "ALTER TABLE " + + TABLE_SQ_LINK + + " ADD CONSTRAINT " + + CONSTRAINT_SQ_LNK_SQC + + " " + + "FOREIGN KEY (" + + COLUMN_SQ_LNK_CONFIGURABLE + + ") " + + "REFERENCES " + + TABLE_SQ_CONFIGURABLE + + " (" + + COLUMN_SQC_ID + ")"; + + // add the constraints back for SQ_CONNECTOR_DIRECTION + public static final String QUERY_UPGRADE_ADD_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT = "ALTER TABLE " + + TABLE_SQ_CONNECTOR_DIRECTIONS + + " ADD CONSTRAINT " + + CONSTRAINT_SQCD_SQC + + " " + + "FOREIGN KEY (" + + COLUMN_SQCD_CONNECTOR + + ") " + + "REFERENCES " + + TABLE_SQ_CONFIGURABLE + + " (" + COLUMN_SQC_ID + ")"; + + // add the constraints back for SQ_CONNECTOR_DIRECTION + public static final String QUERY_UPGRADE_ADD_TABLE_SQ_CONNECTOR_DIRECTION_CONFIGURABLE_CONSTRAINT = "ALTER TABLE " + + TABLE_SQ_LINK + " ADD CONSTRAINT " + CONSTRAINT_SQCD_SQC + " " + + "FOREIGN KEY (" + COLUMN_SQCD_CONNECTOR + ") " + + "REFERENCES " + TABLE_SQ_CONFIGURABLE + " (" + COLUMN_SQC_ID + ")"; + + // Config and Connector directions public static final String STMT_INSERT_DIRECTION = "INSERT INTO " + TABLE_SQ_DIRECTION + " " - + "(" + COLUMN_SQD_NAME + ") VALUES (?)"; + + "(" + COLUMN_SQD_NAME + ") VALUES (?)"; - // DML: Fetch all configs public static final String STMT_FETCH_CONFIG_DIRECTIONS = - "SELECT " - + COLUMN_SQ_CFG_ID + ", " - + COLUMN_SQ_CFG_DIRECTION - + " FROM " + TABLE_SQ_CONFIG; + "SELECT " + + COLUMN_SQ_CFG_ID + ", " + + COLUMN_SQ_CFG_DIRECTION + + " FROM " + TABLE_SQ_CONFIG; public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR = - "ALTER TABLE " + TABLE_SQ_CONFIG + " DROP COLUMN " + COLUMN_SQ_CFG_DIRECTION; + "ALTER TABLE " + TABLE_SQ_CONFIG + " DROP COLUMN " + COLUMN_SQ_CFG_DIRECTION; + public static final String STMT_INSERT_SQ_CONNECTOR_DIRECTIONS = - "INSERT INTO " + TABLE_SQ_CONNECTOR_DIRECTIONS + " " - + "(" + COLUMN_SQCD_CONNECTOR + ", " + COLUMN_SQCD_DIRECTION + ")" - + " VALUES (?, ?)"; + "INSERT INTO " + TABLE_SQ_CONNECTOR_DIRECTIONS + " " + + "(" + COLUMN_SQCD_CONNECTOR + ", " + COLUMN_SQCD_DIRECTION + ")" + + " VALUES (?, ?)"; public static final String STMT_INSERT_SQ_CONFIG_DIRECTIONS = - "INSERT INTO " + TABLE_SQ_CONFIG_DIRECTIONS + " " - + "(" + COLUMN_SQ_CFG_DIR_CONFIG + ", " + COLUMN_SQ_CFG_DIR_DIRECTION + ")" - + " VALUES (?, ?)"; + "INSERT INTO " + TABLE_SQ_CONFIG_DIRECTIONS + " " + + "(" + COLUMN_SQ_CFG_DIR_CONFIG + ", " + COLUMN_SQ_CFG_DIR_DIRECTION + ")" + + " VALUES (?, ?)"; public static final String STMT_SELECT_SQ_CONNECTOR_DIRECTIONS_ALL = - "SELECT " + COLUMN_SQCD_CONNECTOR + ", " + COLUMN_SQCD_DIRECTION - + " FROM " + TABLE_SQ_CONNECTOR_DIRECTIONS; + "SELECT " + COLUMN_SQCD_CONNECTOR + ", " + COLUMN_SQCD_DIRECTION + + " FROM " + TABLE_SQ_CONNECTOR_DIRECTIONS; public static final String STMT_SELECT_SQ_CONNECTOR_DIRECTIONS = - STMT_SELECT_SQ_CONNECTOR_DIRECTIONS_ALL + " WHERE " - + COLUMN_SQCD_CONNECTOR + " = ?"; + STMT_SELECT_SQ_CONNECTOR_DIRECTIONS_ALL + " WHERE " + + COLUMN_SQCD_CONNECTOR + " = ?"; public static final String STMT_SELECT_SQ_CONFIG_DIRECTIONS_ALL = - "SELECT " + COLUMN_SQ_CFG_DIR_CONFIG + ", " + COLUMN_SQ_CFG_DIR_DIRECTION - + " FROM " + TABLE_SQ_CONFIG_DIRECTIONS; + "SELECT " + COLUMN_SQ_CFG_DIR_CONFIG + ", " + COLUMN_SQ_CFG_DIR_DIRECTION + + " FROM " + TABLE_SQ_CONFIG_DIRECTIONS; public static final String STMT_SELECT_SQ_CONFIG_DIRECTIONS = - STMT_SELECT_SQ_CONFIG_DIRECTIONS_ALL + " WHERE " - + COLUMN_SQ_CFG_DIR_CONFIG + " = ?"; + STMT_SELECT_SQ_CONFIG_DIRECTIONS_ALL + " WHERE " + + COLUMN_SQ_CFG_DIR_CONFIG + " = ?"; private DerbySchemaQuery() { // Disable explicit object creation diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java index 366e4eed..b22dbd5c 100644 --- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java +++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java @@ -108,7 +108,7 @@ private Map> getNameToIdListMap(PreparedStatement ps) throws return nameToIdListMap; } - void renameEntities() throws Exception { + void renameEntitiesForConnectionAndForm() throws Exception { // SQ_LINK schema upgrades // drop the constraint before rename and add it back later runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_1); @@ -124,6 +124,8 @@ void renameEntities() throws Exception { runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_6); runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_7); runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_8); + runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONNECTOR_CONSTRAINT); + runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONNECTOR_CONSTRAINT); // SQ_LINK_INPUT schema upgrades runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_TO_SQ_LINK_INPUT); @@ -141,6 +143,8 @@ void renameEntities() throws Exception { runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_4); runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_5); runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_6); + runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_FORM_CONNECTOR_CONSTRAINT); + runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT); // SQ_INPUT schema upgrades runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_INPUT_FORM_COLUMN); @@ -151,12 +155,30 @@ void renameEntities() throws Exception { runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_JOB_COLUMN_2); runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_FROM); runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_TO); + } + + void renameConnectorToConfigurable() throws Exception { + + // SQ_CONNECTOR to SQ_CONFIGURABLE upgrade + runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT); + runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_LINK_CONSTRAINT); + runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT); + + runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTOR_TO_SQ_CONFIGURABLE); + runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONFIG_COLUMN_1); + runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_LINK_COLUMN_1); + runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_COLUMN_SQC_TYPE); + + runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONFIGURABLE_CONSTRAINT); + runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONFIGURABLE_CONSTRAINT); + runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT); } /** - * Create derby schema. - * FIX(SQOOP-1583): This code needs heavy refactoring. Details are in the ticket. + * Create derby schema. FIX(SQOOP-1583): This code needs heavy refactoring. + * Details are in the ticket. + * * @throws Exception */ protected void createOrUpgradeSchema(int version) throws Exception { @@ -196,11 +218,11 @@ protected void createOrUpgradeSchema(int version) throws Exception { runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_FROM); runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_TO); runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE); - // todo:rename entities code - renameEntities(); + renameEntitiesForConnectionAndForm(); // add the name constraints runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME); runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME); + runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR); runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS); runQuery(QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS); @@ -208,12 +230,13 @@ protected void createOrUpgradeSchema(int version) throws Exception { for (Direction direction : Direction.values()) { runQuery(STMT_INSERT_DIRECTION, direction.toString()); } + renameConnectorToConfigurable(); } + // deprecated repository version runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) VALUES('version', '" + version + "')"); - // why the heck do we insert driver version here? - runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) " + - "VALUES('" + DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION + "', '1')"); + // new repository version + runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) VALUES('repository.version', '" + version + "')"); } @@ -382,19 +405,17 @@ protected void loadConnectorAndDriverConfigVersion2() throws Exception { protected void loadConnectorAndDriverConfigVersion4() throws Exception { Long configId; + runQuery("INSERT INTO SQOOP.SQ_CONFIGURABLE(SQC_NAME, SQC_CLASS, SQC_VERSION, SQC_TYPE)" + + "VALUES('A', 'org.apache.sqoop.test.A', '1.0-test', 'CONNECTOR')"); - // Connector entry - runQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)" - + "VALUES('A', 'org.apache.sqoop.test.A', '1.0-test')"); - - for (String connector : new String[]{"1"}) { + for (String connector : new String[] { "1" }) { // Directions runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)" + "VALUES(" + connector + ", 1)"); runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)" + "VALUES(" + connector + ", 2)"); - // connector configs + // connector configs with connectorId as 1 for (String direction : new String[]{null, "1", "2"}) { String type; @@ -405,7 +426,7 @@ protected void loadConnectorAndDriverConfigVersion4() throws Exception { } configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG" - + "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) " + + "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) " + "VALUES(" + connector + ", 'C1', '" + type + "', 0)"); if (direction != null) { @@ -415,7 +436,7 @@ protected void loadConnectorAndDriverConfigVersion4() throws Exception { } configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG" - + "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) " + + "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) " + "VALUES(" + connector + ", 'C2', '" + type + "', 1)"); if (direction != null) { @@ -426,14 +447,18 @@ protected void loadConnectorAndDriverConfigVersion4() throws Exception { } } - // driver config + // insert a driver + runQuery("INSERT INTO SQOOP.SQ_CONFIGURABLE(SQC_NAME, SQC_CLASS, SQC_VERSION, SQC_TYPE)" + + "VALUES('SqoopDriver', 'org.apache.sqoop.driver.Driver', '1.0-test', 'DRIVER')"); + + // driver config with driverId as 2 for (String type : new String[]{"JOB"}) { runQuery("INSERT INTO SQOOP.SQ_CONFIG" - + "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) " - + "VALUES(NULL" + ", 'C1', '" + type + "', 0)"); + + "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) " + + "VALUES(2" + ", 'C1', '" + type + "', 0)"); runQuery("INSERT INTO SQOOP.SQ_CONFIG" - + "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) " - + "VALUES(NULL" + ", 'C2', '" + type + "', 1)"); + + "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) " + + "VALUES(2" + ", 'C2', '" + type + "', 1)"); } // Input entries @@ -442,7 +467,7 @@ protected void loadConnectorAndDriverConfigVersion4() throws Exception { // Connector job (TO) config: 8-11 // Driver JOB config: 12-15 for (int i = 0; i < 4; i++) { - // First config + // First config runQuery("INSERT INTO SQOOP.SQ_INPUT" + "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)" + " VALUES('I1', " + (i * 2 + 1) + ", 0, 'STRING', false, 30)"); @@ -460,6 +485,8 @@ protected void loadConnectorAndDriverConfigVersion4() throws Exception { } } + + /** * Load testing connector and driver config into repository. * @@ -511,9 +538,9 @@ public void loadConnectionsOrLinks(int version) throws Exception { case 4: // Insert two links - CA and CB - runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONNECTOR) " + runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONFIGURABLE) " + "VALUES('CA', 1)"); - runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONNECTOR) " + runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONFIGURABLE) " + "VALUES('CB', 1)"); for (String ci : new String[]{"1", "2"}) { @@ -644,10 +671,10 @@ protected void removeDuplicateJobNames(int version) throws Exception { /** * Add a second connector for testing with multiple connectors */ - public void addConnector() throws Exception { + public void addConnectorB() throws Exception { // Connector entry - Long connectorId = runInsertQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)" - + "VALUES('B', 'org.apache.sqoop.test.B', '1.0-test')"); + Long connectorId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIGURABLE(SQC_NAME, SQC_CLASS, SQC_VERSION, SQC_TYPE)" + + "VALUES('B', 'org.apache.sqoop.test.B', '1.0-test', 'CONNECTOR')"); runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS (SQCD_CONNECTOR, SQCD_DIRECTION) VALUES (" + connectorId + ", 1)"); runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS (SQCD_CONNECTOR, SQCD_DIRECTION) VALUES (" + connectorId + ", 2)"); } @@ -745,18 +772,18 @@ protected MFromConfig getFromConfig() { } protected MToConfig getToConfig() { - return new MToConfig(getConfigs()); + return new MToConfig(getConfigs()); } - + protected MDriverConfig getDriverConfig() { - return new MDriverConfig(getConfigs()); + return new MDriverConfig(getConfigs()); } protected List getConfigs() { List jobConfigs = new LinkedList(); List> inputs = new LinkedList>(); - MInput input = new MStringInput("I1", false, (short)30); + MInput input = new MStringInput("I1", false, (short) 30); inputs.add(input); input = new MMapInput("I2", false); inputs.add(input); diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java index 68a173b6..ca40545b 100644 --- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java +++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java @@ -62,17 +62,17 @@ public void testFindConnector() throws Exception { @Test public void testFindAllConnectors() throws Exception { // No connectors in an empty repository, we expect an empty list - assertEquals(handler.findConnectors(getDerbyDatabaseConnection()).size(),0); - + assertEquals(handler.findConnectors(getDerbyDatabaseConnection()).size(), 0); + // add connector A loadConnectorAndDriverConfig(); - addConnector(); - + // adding connector B + addConnectorB(); // Retrieve connectors List connectors = handler.findConnectors(getDerbyDatabaseConnection()); assertNotNull(connectors); - assertEquals(connectors.size(),2); - assertEquals(connectors.get(0).getUniqueName(),"A"); - assertEquals(connectors.get(1).getUniqueName(),"B"); + assertEquals(connectors.size(), 2); + assertEquals(connectors.get(0).getUniqueName(), "A"); + assertEquals(connectors.get(1).getUniqueName(), "B"); } @Test @@ -83,7 +83,7 @@ public void testRegisterConnector() throws Exception { assertEquals(1, connector.getPersistenceId()); // Now check content in corresponding tables - assertCountForTable("SQOOP.SQ_CONNECTOR", 1); + assertCountForTable("SQOOP.SQ_CONFIGURABLE", 1); assertCountForTable("SQOOP.SQ_CONFIG", 6); assertCountForTable("SQOOP.SQ_INPUT", 12); @@ -92,6 +92,7 @@ public void testRegisterConnector() throws Exception { assertNotNull(retrieved); assertEquals(connector, retrieved); } + @Test public void testFromDirection() throws Exception { MConnector connector = getConnector(true, false); @@ -102,7 +103,7 @@ public void testFromDirection() throws Exception { assertEquals(1, connector.getPersistenceId()); // Now check content in corresponding tables - assertCountForTable("SQOOP.SQ_CONNECTOR", 1); + assertCountForTable("SQOOP.SQ_CONFIGURABLE", 1); assertCountForTable("SQOOP.SQ_CONFIG", 4); assertCountForTable("SQOOP.SQ_INPUT", 8); @@ -122,7 +123,7 @@ public void testToDirection() throws Exception { assertEquals(1, connector.getPersistenceId()); // Now check content in corresponding tables - assertCountForTable("SQOOP.SQ_CONNECTOR", 1); + assertCountForTable("SQOOP.SQ_CONFIGURABLE", 1); assertCountForTable("SQOOP.SQ_CONFIG", 4); assertCountForTable("SQOOP.SQ_INPUT", 8); @@ -142,7 +143,7 @@ public void testNeitherDirection() throws Exception { assertEquals(1, connector.getPersistenceId()); // Now check content in corresponding tables - assertCountForTable("SQOOP.SQ_CONNECTOR", 1); + assertCountForTable("SQOOP.SQ_CONFIGURABLE", 1); assertCountForTable("SQOOP.SQ_CONFIG", 2); assertCountForTable("SQOOP.SQ_INPUT", 4); @@ -151,4 +152,4 @@ public void testNeitherDirection() throws Exception { assertNotNull(retrieved); assertEquals(connector, retrieved); } -} \ No newline at end of file +} diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java index bbf721f2..25a0093f 100644 --- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java +++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java @@ -21,11 +21,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; - -import org.apache.sqoop.json.DriverBean; import org.apache.sqoop.model.MDriver; import org.apache.sqoop.model.MDriverConfig; import org.junit.Before; @@ -48,15 +43,19 @@ public void setUp() throws Exception { } @Test - public void testFindDriverConfig() throws Exception { + public void testFindDriver() throws Exception { // On empty repository, no driverConfig should be there - assertNull(handler.findDriver(getDerbyDatabaseConnection())); + assertNull(handler.findDriver(MDriver.DRIVER_NAME, getDerbyDatabaseConnection())); // Load Connector and DriverConfig into repository // TODO(SQOOP-1582):FIX why load connector config for driver testing? + // add a connector A and driver SqoopDriver loadConnectorAndDriverConfig(); // Retrieve it - MDriver driver = handler.findDriver(getDerbyDatabaseConnection()); + MDriver driver = handler.findDriver(MDriver.DRIVER_NAME, getDerbyDatabaseConnection()); assertNotNull(driver); + assertNotNull(driver.getDriverConfig()); + assertEquals("1.0-test", driver.getVersion()); + assertEquals("1.0-test", driver.getVersion()); // Get original structure MDriverConfig originalDriverConfig = getDriverConfig(); @@ -64,7 +63,7 @@ public void testFindDriverConfig() throws Exception { assertEquals(originalDriverConfig, driver.getDriverConfig()); } - public void testRegisterDriverAndConnectorConfig() throws Exception { + public void testRegisterDriver() throws Exception { MDriver driver = getDriver(); handler.registerDriver(driver, getDerbyDatabaseConnection()); @@ -76,59 +75,22 @@ public void testRegisterDriverAndConnectorConfig() throws Exception { assertCountForTable("SQOOP.SQ_CONFIG", 2); assertCountForTable("SQOOP.SQ_INPUT", 4); - // Registered driver config should be easily recovered back - MDriver retrieved = handler.findDriver(getDerbyDatabaseConnection()); + // Registered driver and config should be easily recovered back + MDriver retrieved = handler.findDriver(MDriver.DRIVER_NAME, getDerbyDatabaseConnection()); assertNotNull(retrieved); assertEquals(driver, retrieved); assertEquals(driver.getVersion(), retrieved.getVersion()); } - private String getDriverVersion() throws Exception { - final String frameworkVersionQuery = - "SELECT SQM_VALUE FROM SQOOP.SQ_SYSTEM WHERE SQM_KEY=?"; - String retVal = null; - PreparedStatement preparedStmt = null; - ResultSet resultSet = null; - try { - preparedStmt = - getDerbyDatabaseConnection().prepareStatement(frameworkVersionQuery); - preparedStmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION); - resultSet = preparedStmt.executeQuery(); - if(resultSet.next()) - retVal = resultSet.getString(1); - return retVal; - } finally { - if(preparedStmt !=null) { - try { - preparedStmt.close(); - } catch(SQLException e) { - } - } - if(resultSet != null) { - try { - resultSet.close(); - } catch(SQLException e) { - } - } - } - } @Test - public void testDriverVersion() throws Exception { + public void testDriverVersionUpgrade() throws Exception { MDriver driver = getDriver(); handler.registerDriver(driver, getDerbyDatabaseConnection()); - - final String lowerVersion = Integer.toString(Integer - .parseInt(DriverBean.CURRENT_DRIVER_VERSION) - 1); - assertEquals(CURRENT_DRIVER_VERSION, getDriverVersion()); - runQuery("UPDATE SQOOP.SQ_SYSTEM SET SQM_VALUE='" + lowerVersion + "' WHERE SQM_KEY = '" - + DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION + "'"); - assertEquals(lowerVersion, getDriverVersion()); - - handler.upgradeDriverConfigs(driver, getDerbyDatabaseConnection()); - - assertEquals(CURRENT_DRIVER_VERSION, driver.getVersion()); - - assertEquals(CURRENT_DRIVER_VERSION, getDriverVersion()); + String registeredDriverVersion = handler.findDriver(MDriver.DRIVER_NAME, getDerbyDatabaseConnection()).getVersion(); + assertEquals(CURRENT_DRIVER_VERSION, registeredDriverVersion); + driver.setVersion("2"); + handler.upgradeDriverAndConfigs(driver, getDerbyDatabaseConnection()); + assertEquals("2", driver.getVersion()); } } diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java index a15bda99..85140d5c 100644 --- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java +++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java @@ -32,6 +32,7 @@ import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.model.MConfig; +import org.apache.sqoop.model.MDriver; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MMapInput; import org.apache.sqoop.model.MStringInput; @@ -298,7 +299,7 @@ public MJob getJob() { return new MJob(1, 1, 1, 1, handler.findConnector("A", derbyConnection).getFromConfig(), handler.findConnector("A", derbyConnection).getToConfig(), - handler.findDriver(derbyConnection).getDriverConfig() + handler.findDriver(MDriver.DRIVER_NAME, derbyConnection).getDriverConfig() ); } } \ No newline at end of file diff --git a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java index 8cf9cf15..45c21a1d 100644 --- a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java +++ b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java @@ -69,25 +69,20 @@ import org.json.simple.JSONValue; /** - * Load user-created content of Sqoop repository from a JSON formatted file - * The loaded connector IDs will be modified to match existing connectors + * Load user-created content of Sqoop repository from a JSON formatted file The + * loaded connector IDs will be modified to match existing connectors */ public class RepositoryLoadTool extends ConfiguredTool { public static final Logger LOG = Logger.getLogger(RepositoryLoadTool.class); - - + @SuppressWarnings("static-access") @Override public boolean runToolWithConfiguration(String[] arguments) { - Options options = new Options(); - options.addOption(OptionBuilder.isRequired() - .hasArg() - .withArgName("filename") - .withLongOpt("input") - .create('i')); + options.addOption(OptionBuilder.isRequired().hasArg().withArgName("filename") + .withLongOpt("input").create('i')); CommandLineParser parser = new GnuParser(); @@ -98,8 +93,7 @@ public boolean runToolWithConfiguration(String[] arguments) { LOG.info("Reading JSON from file" + inputFileName); InputStream input = new FileInputStream(inputFileName); String jsonTxt = IOUtils.toString(input, Charsets.UTF_8); - JSONObject json = - (JSONObject) JSONValue.parse(jsonTxt); + JSONObject json = (JSONObject) JSONValue.parse(jsonTxt); boolean res = load(json); input.close(); return res; @@ -114,99 +108,101 @@ public boolean runToolWithConfiguration(String[] arguments) { return false; } catch (ParseException e) { LOG.error("Error parsing command line arguments:", e); - System.out.println("Error parsing command line arguments. Please check Server logs for details."); + System.out + .println("Error parsing command line arguments. Please check Server logs for details."); return false; } } - private boolean load(JSONObject repo) { - // Validate that loading JSON into repository is supported - JSONObject metadata = (JSONObject) repo.get(JSONConstants.METADATA); + // Validate that loading JSON into repository is supported + JSONObject metadata = (JSONObject) repo.get(JSONConstants.METADATA); - if (metadata == null) { - LOG.error("Malformed JSON. Key "+ JSONConstants.METADATA + " not found."); - return false; - } - - if (!validateMetadata(metadata)){ - LOG.error("Metadata of repository dump file failed validation (see error above for cause). Aborting repository load."); - return false; - } - - // initialize repository as mutable - RepositoryManager.getInstance().initialize(false); - Repository repository = RepositoryManager.getInstance().getRepository(); - - ConnectorManager.getInstance().initialize(); - - LOG.info("Loading Connections"); - - JSONObject jsonConns = (JSONObject) repo.get(JSONConstants.LINKS); - - if (jsonConns == null) { - LOG.error("Malformed JSON file. Key "+ JSONConstants.LINKS + " not found."); - return false; - } - - LinkBean linkBean = new LinkBean(); - linkBean.restore(updateConnectorIDUsingName(jsonConns)); - - HashMap connectionIds = new HashMap(); - - for (MLink link : linkBean.getLinks()) { - long oldId = link.getPersistenceId(); - long newId = loadLink(link); - if (newId == link.PERSISTANCE_ID_DEFAULT) { - LOG.error("loading connection " + link.getName() + " with previous ID " + oldId + " failed. Aborting repository load. Check log for details."); - return false; - } - connectionIds.put(oldId,newId); - } - LOG.info("Loaded " + connectionIds.size() + " connections"); - - LOG.info("Loading Jobs"); - JSONObject jsonJobs = (JSONObject) repo.get(JSONConstants.JOBS); - - if (jsonJobs == null) { - LOG.error("Malformed JSON file. Key "+ JSONConstants.JOBS + " not found."); - return false; - } - - JobBean jobBean = new JobBean(); - jobBean.restore(updateIdUsingMap(updateConnectorIDUsingName(jsonJobs), connectionIds,JSONConstants.LINK_ID)); - - HashMap jobIds = new HashMap(); - for (MJob job: jobBean.getJobs()) { - long oldId = job.getPersistenceId(); - long newId = loadJob(job); - - if (newId == job.PERSISTANCE_ID_DEFAULT) { - LOG.error("loading job " + job.getName() + " failed. Aborting repository load. Check log for details."); - return false; - } - jobIds.put(oldId,newId); - - } - LOG.info("Loaded " + jobIds.size() + " jobs"); - - LOG.info("Loading Submissions"); - JSONObject jsonSubmissions = (JSONObject) repo.get(JSONConstants.SUBMISSIONS); - - if (jsonSubmissions == null) { - LOG.error("Malformed JSON file. Key "+ JSONConstants.SUBMISSIONS + " not found."); + if (metadata == null) { + LOG.error("Malformed JSON. Key " + JSONConstants.METADATA + " not found."); return false; } - SubmissionBean submissionBean = new SubmissionBean(); - submissionBean.restore(updateIdUsingMap(jsonSubmissions,jobIds,JSONConstants.JOB_ID)); - int submissionCount = 0; - for (MSubmission submission: submissionBean.getSubmissions()) { - resetPersistenceId(submission); - repository.createSubmission(submission); - submissionCount++; - } + if (!validateMetadata(metadata)) { + LOG.error("Metadata of repository dump file failed validation (see error above for cause). Aborting repository load."); + return false; + } + + // initialize repository as mutable + RepositoryManager.getInstance().initialize(false); + Repository repository = RepositoryManager.getInstance().getRepository(); + + ConnectorManager.getInstance().initialize(); + LOG.info("Loading Connections"); + + JSONObject jsonConns = (JSONObject) repo.get(JSONConstants.LINKS); + + if (jsonConns == null) { + LOG.error("Malformed JSON file. Key " + JSONConstants.LINKS + " not found."); + return false; + } + + LinkBean linkBean = new LinkBean(); + linkBean.restore(updateConnectorIDUsingName(jsonConns)); + + HashMap connectionIds = new HashMap(); + + for (MLink link : linkBean.getLinks()) { + long oldId = link.getPersistenceId(); + long newId = loadLink(link); + if (newId == link.PERSISTANCE_ID_DEFAULT) { + LOG.error("loading connection " + link.getName() + " with previous ID " + oldId + + " failed. Aborting repository load. Check log for details."); + return false; + } + connectionIds.put(oldId, newId); + } + LOG.info("Loaded " + connectionIds.size() + " connections"); + + LOG.info("Loading Jobs"); + JSONObject jsonJobs = (JSONObject) repo.get(JSONConstants.JOBS); + + if (jsonJobs == null) { + LOG.error("Malformed JSON file. Key " + JSONConstants.JOBS + " not found."); + return false; + } + + JobBean jobBean = new JobBean(); + jobBean.restore(updateIdUsingMap(updateConnectorIDUsingName(jsonJobs), connectionIds, + JSONConstants.LINK_ID)); + + HashMap jobIds = new HashMap(); + for (MJob job : jobBean.getJobs()) { + long oldId = job.getPersistenceId(); + long newId = loadJob(job); + + if (newId == job.PERSISTANCE_ID_DEFAULT) { + LOG.error("loading job " + job.getName() + + " failed. Aborting repository load. Check log for details."); + return false; + } + jobIds.put(oldId, newId); + + } + LOG.info("Loaded " + jobIds.size() + " jobs"); + + LOG.info("Loading Submissions"); + JSONObject jsonSubmissions = (JSONObject) repo.get(JSONConstants.SUBMISSIONS); + + if (jsonSubmissions == null) { + LOG.error("Malformed JSON file. Key " + JSONConstants.SUBMISSIONS + " not found."); + return false; + } + + SubmissionBean submissionBean = new SubmissionBean(); + submissionBean.restore(updateIdUsingMap(jsonSubmissions, jobIds, JSONConstants.JOB_ID)); + int submissionCount = 0; + for (MSubmission submission : submissionBean.getSubmissions()) { + resetPersistenceId(submission); + repository.createSubmission(submission); + submissionCount++; + } LOG.info("Loaded " + submissionCount + " submissions."); LOG.info("Repository load completed successfully."); return true; @@ -216,12 +212,10 @@ private void resetPersistenceId(MPersistableEntity ent) { ent.setPersistenceId(ent.PERSISTANCE_ID_DEFAULT); } - - /** - * Even though the metadata contains version, revision, compile-date and compile-user - * We are only validating that version match for now. - * More interesting logic can be added later + * Even though the metadata contains version, revision, compile-date and + * compile-user We are only validating that version match for now. More + * interesting logic can be added later */ private boolean validateMetadata(JSONObject metadata) { String jsonVersion = (String) metadata.get(JSONConstants.VERSION); @@ -229,13 +223,14 @@ private boolean validateMetadata(JSONObject metadata) { String repoVersion = VersionInfo.getVersion(); if (!jsonVersion.equals(repoVersion)) { - LOG.error("Repository version in file (" + jsonVersion + ") does not match this version of Sqoop (" + repoVersion + ")"); + LOG.error("Repository version in file (" + jsonVersion + + ") does not match this version of Sqoop (" + repoVersion + ")"); return false; } if (!includeSensitive) { - LOG.warn("Loading repository which was dumped without --include-sensitive=true. " + - "This means some sensitive information such as passwords is not included in the dump file and will need to be manually added later."); + LOG.warn("Loading repository which was dumped without --include-sensitive=true. " + + "This means some sensitive information such as passwords is not included in the dump file and will need to be manually added later."); } return true; @@ -243,7 +238,7 @@ private boolean validateMetadata(JSONObject metadata) { private long loadLink(MLink link) { - //starting by pretending we have a brand new connection + // starting by pretending we have a brand new connection resetPersistenceId(link); Repository repository = RepositoryManager.getInstance().getRepository(); @@ -262,11 +257,9 @@ private long loadLink(MLink link) { SqoopConnector connector = ConnectorManager.getInstance().getSqoopConnector( link.getConnectorId()); - Object connectorConfig = ClassUtils.instantiate( - connector.getLinkConfigurationClass()); + Object connectorConfig = ClassUtils.instantiate(connector.getLinkConfigurationClass()); - ConfigUtils.fromConfigs( - link.getConnectorLinkConfig().getConfigs(), connectorConfig); + ConfigUtils.fromConfigs(link.getConnectorLinkConfig().getConfigs(), connectorConfig); ConfigValidationRunner validationRunner = new ConfigValidationRunner(); ConfigValidationResult result = validationRunner.validate(connectorConfig); @@ -284,7 +277,7 @@ private long loadLink(MLink link) { } private long loadJob(MJob job) { - //starting by pretending we have a brand new job + // starting by pretending we have a brand new job resetPersistenceId(job); MConnector mFromConnector = ConnectorManager.getInstance().getConnectorConfigurable(job.getFromConnectorId()); MConnector mToConnector = ConnectorManager.getInstance().getConnectorConfigurable(job.getToConnectorId()); @@ -329,7 +322,8 @@ private long loadJob(MJob job) { job.getDriverConfig().getConfigs(), driverConfig); ConfigValidationRunner validationRunner = new ConfigValidationRunner(); - ConfigValidationResult fromConnectorConfigResult = validationRunner.validate(fromConnectorConfig); + ConfigValidationResult fromConnectorConfigResult = validationRunner + .validate(fromConnectorConfig); ConfigValidationResult toConnectorConfigResult = validationRunner.validate(toConnectorConfig); ConfigValidationResult driverConfigResult = validationRunner.validate(driverConfig); @@ -341,17 +335,17 @@ private long loadJob(MJob job) { } else { LOG.error("Failed to load job:" + job.getName()); - LOG.error("Status of from connector configs:" + fromConnectorConfigResult.getStatus().toString()); + LOG.error("Status of from connector configs:" + + fromConnectorConfigResult.getStatus().toString()); LOG.error("Status of to connector configs:" + toConnectorConfigResult.getStatus().toString()); LOG.error("Status of driver configs:" + driverConfigResult.getStatus().toString()); } return newJob.getPersistenceId(); - } - private JSONObject updateConnectorIDUsingName( JSONObject json) { + private JSONObject updateConnectorIDUsingName(JSONObject json) { JSONArray array = (JSONArray) json.get(JSONConstants.ALL); Repository repository = RepositoryManager.getInstance().getRepository(); @@ -370,11 +364,10 @@ private JSONObject updateConnectorIDUsingName( JSONObject json) { long currentConnectorId = connectorMap.get(connectorName); String connectionName = (String) object.get(JSONConstants.NAME); - // If a given connector now has a different ID, we need to update the ID if (connectorId != currentConnectorId) { - LOG.warn("Connection " + connectionName + " uses connector " + connectorName + ". " + - "Replacing previous ID " + connectorId + " with new ID " + currentConnectorId); + LOG.warn("Connection " + connectionName + " uses connector " + connectorName + ". " + + "Replacing previous ID " + connectorId + " with new ID " + currentConnectorId); object.put(JSONConstants.CONNECTOR_ID, currentConnectorId); } @@ -382,7 +375,7 @@ private JSONObject updateConnectorIDUsingName( JSONObject json) { return json; } - private JSONObject updateIdUsingMap(JSONObject json, HashMap idMap, String fieldName) { + private JSONObject updateIdUsingMap(JSONObject json, HashMap idMap, String fieldName) { JSONArray array = (JSONArray) json.get(JSONConstants.ALL); for (Object obj : array) { @@ -394,6 +387,4 @@ private JSONObject updateIdUsingMap(JSONObject json, HashMap idMap, S return json; } - - }