5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-05 04:20:08 +08:00

SQOOP-1557: Sqoop2: SQ_CONFIGURABLE ( for entities who own configs)

(Veena Basavaraj via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2014-10-21 20:47:46 -07:00
parent 39a2200007
commit 151a0a12a9
17 changed files with 818 additions and 661 deletions

View File

@ -181,6 +181,10 @@ public String getVersion() {
return version;
}
public MConfigurableType getType() {
return MConfigurableType.CONNECTOR;
}
public SupportedDirections getSupportedDirections() {
return new SupportedDirections(this.getConfig(Direction.FROM) != null,
this.getConfig(Direction.TO) != null);

View File

@ -17,15 +17,17 @@
*/
package org.apache.sqoop.model;
import java.sql.Driver;
/**
* Describes the configs associated with the {@link Driver} for executing sqoop jobs.
*/
public final class MDriver extends Configurable {
public static final String DRIVER_NAME = "SqoopDriver";
private final MDriverConfig driverConfig;
private final String version;
private String version;
// Since there is only one Driver in the system, the name is not user specified
private static final String uniqueName = DRIVER_NAME;
public MDriver(MDriverConfig driverConfig, String version) {
this.driverConfig = driverConfig;
@ -68,6 +70,14 @@ public MDriverConfig getDriverConfig() {
return driverConfig;
}
public MConfigurableType getType() {
return MConfigurableType.DRIVER;
}
public String getUniqueName() {
return uniqueName;
}
@Override
public MDriver clone(boolean cloneWithValue) {
cloneWithValue = false;
@ -79,4 +89,8 @@ public MDriver clone(boolean cloneWithValue) {
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
}

View File

@ -158,6 +158,10 @@ public MDriver getDriver() {
return mDriver;
}
public static String getClassName() {
return Driver.getInstance().getClass().getSimpleName();
}
public ResourceBundle getBundle(Locale locale) {
return ResourceBundle.getBundle(DriverConstants.DRIVER_CONFIG_BUNDLE, locale);
}

View File

@ -23,10 +23,11 @@
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.driver.Driver;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MDriver;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MSubmission;
public class JdbcRepository extends Repository {
@ -220,7 +221,7 @@ public MDriver registerDriver(final MDriver mDriver, final boolean autoUpgrade)
return (MDriver) doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) {
MDriver existingDriverConfig = handler.findDriver(conn);
MDriver existingDriverConfig = handler.findDriver(mDriver.getUniqueName(), conn);
if (existingDriverConfig == null) {
handler.registerDriver(mDriver, conn);
return mDriver;
@ -233,7 +234,7 @@ public Object doIt(Connection conn) {
return mDriver;
} else {
throw new SqoopException(RepositoryError.JDBCREPO_0026,
"DriverConfig: " + mDriver.getPersistenceId());
"Driver: " + mDriver.getPersistenceId());
}
}
return existingDriverConfig;
@ -242,6 +243,19 @@ public Object doIt(Connection conn) {
});
}
/**
* {@inheritDoc}
*/
@Override
public MDriver findDriver(final String shortName) {
return (MDriver) doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) throws Exception {
return handler.findDriver(shortName, conn);
}
});
}
/**
* {@inheritDoc}
*/
@ -648,23 +662,23 @@ public Object doIt(Connection conn) throws Exception {
* {@inheritDoc}
*/
@Override
protected void upgradeConnectorConfigs(final MConnector newConnector,
protected void upgradeConnectorAndConfigs(final MConnector newConnector,
RepositoryTransaction tx) {
doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) throws Exception {
handler.upgradeConnectorConfigs(newConnector, conn);
handler.upgradeConnectorAndConfigs(newConnector, conn);
return null;
}
}, (JdbcRepositoryTransaction) tx);
}
protected void upgradeDriverConfigs(final MDriver mDriver, RepositoryTransaction tx) {
protected void upgradeDriverAndConfigs(final MDriver mDriver, RepositoryTransaction tx) {
doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) throws Exception {
handler.upgradeDriverConfigs(mDriver, conn);
handler.upgradeDriverAndConfigs(mDriver, conn);
return null;
}
}, (JdbcRepositoryTransaction) tx);

View File

@ -21,10 +21,10 @@
import java.util.Date;
import java.util.List;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MDriver;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MSubmission;
/**
@ -41,7 +41,7 @@ public abstract class JdbcRepositoryHandler {
/**
* Search for connector with given name in repository.
* And return corresponding connector structure.
* And return corresponding connector entity.
*
* @param shortName Connector unique name
* @param conn JDBC link for querying repository.
@ -101,8 +101,7 @@ public abstract List<MJob> findJobsForConnector(long connectorID,
* @param conn JDBC link for querying repository
*/
public abstract void upgradeConnectorConfigs(MConnector mConnector, Connection conn);
public abstract void upgradeConnectorAndConfigs(MConnector mConnector, Connection conn);
/**
* Upgrade the driver with the new data supplied in the
@ -117,17 +116,16 @@ public abstract List<MJob> findJobsForConnector(long connectorID,
* the driverConfig.
* @param conn JDBC link for querying repository
*/
public abstract void upgradeDriverConfigs(MDriver mDriver, Connection conn);
public abstract void upgradeDriverAndConfigs(MDriver mDriver, Connection conn);
/**
* Search for driverConfigin the repository.
*
* Search for driver in the repository.
* @params shortName the name for the driver
* @param conn JDBC link for querying repository.
* @return null if driverConfig are not yet present in repository or
* @return null if driver are not yet present in repository or
* loaded representation.
*/
public abstract MDriver findDriver(Connection conn);
public abstract MDriver findDriver(String shortName, Connection conn);
/**
* Register driver config in repository.

View File

@ -118,6 +118,14 @@ public abstract class Repository {
*/
public abstract List<MConnector> findConnectors();
/**
* Search for driver in the repository.
* @param shortName Driver unique name
* @return null if driver are not yet present in repository or
* loaded representation.
*/
public abstract MDriver findDriver(String shortName);
/**
* Save given link to repository. This link must not be already
* present in the repository otherwise exception will be thrown.
@ -317,7 +325,7 @@ public abstract class Repository {
* method will not call begin, commit,
* rollback or close on this transaction.
*/
protected abstract void upgradeConnectorConfigs(MConnector newConnector, RepositoryTransaction tx);
protected abstract void upgradeConnectorAndConfigs(MConnector newConnector, RepositoryTransaction tx);
/**
* Upgrade the driver with the new data supplied in the
@ -335,7 +343,7 @@ public abstract class Repository {
* method will not call begin, commit,
* rollback or close on this transaction.
*/
protected abstract void upgradeDriverConfigs(MDriver newDriver, RepositoryTransaction tx);
protected abstract void upgradeDriverAndConfigs(MDriver newDriver, RepositoryTransaction tx);
/**
* Delete all inputs for a job
@ -410,7 +418,7 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
deletelinksAndJobs(existingLinksByConnector, existingJobsByConnector, tx);
// 5. Delete all inputs and configs associated with the connector, and
// insert the new configs and inputs for this connector
upgradeConnectorConfigs(newConnector, tx);
upgradeConnectorAndConfigs(newConnector, tx);
// 6. Run upgrade logic for the configs related to the link objects
// dont always rely on the repository implementation to return empty list for links
if (existingLinksByConnector != null) {
@ -514,7 +522,7 @@ public final void upgradeDriver(MDriver driver) {
deleteJobs(existingJobs, tx);
// 4. Delete all inputs and configs associated with the driver, and
// insert the new configs and inputs for this driver
upgradeDriverConfigs(driver, tx);
upgradeDriverAndConfigs(driver, tx);
for (MJob job : existingJobs) {
// Make a new copy of the configs

View File

@ -167,10 +167,10 @@ public void testConnectorDisableAutoUpgrade() {
*/
@Test
public void testDriverConfigEnableAutoUpgrade() {
MDriver newDriverConfig = driver();
MDriver oldDriverConfig = anotherDriver();
MDriver newDriver = driver();
MDriver oldDriver = anotherDriver();
when(repoHandlerMock.findDriver(any(Connection.class))).thenReturn(oldDriverConfig);
when(repoHandlerMock.findDriver(anyString(), any(Connection.class))).thenReturn(oldDriver);
// make the upgradeDriverConfig to throw an exception to prove that it has been called
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
@ -178,10 +178,10 @@ public void testDriverConfigEnableAutoUpgrade() {
doThrow(exception).when(repoHandlerMock).findJobs(any(Connection.class));
try {
repoSpy.registerDriver(newDriverConfig, true);
repoSpy.registerDriver(newDriver, true);
} catch (SqoopException ex) {
assertEquals(ex.getMessage(), exception.getMessage());
verify(repoHandlerMock, times(1)).findDriver(any(Connection.class));
verify(repoHandlerMock, times(1)).findDriver(anyString(), any(Connection.class));
verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
verifyNoMoreInteractions(repoHandlerMock);
return ;
@ -195,16 +195,16 @@ public void testDriverConfigEnableAutoUpgrade() {
*/
@Test
public void testDriverConfigDisableAutoUpgrade() {
MDriver newDriverConfig = driver();
MDriver oldDriverConfig = anotherDriver();
MDriver newDriver = driver();
MDriver oldDriver = anotherDriver();
when(repoHandlerMock.findDriver(any(Connection.class))).thenReturn(oldDriverConfig);
when(repoHandlerMock.findDriver(anyString(), any(Connection.class))).thenReturn(oldDriver);
try {
repoSpy.registerDriver(newDriverConfig, false);
repoSpy.registerDriver(newDriver, false);
} catch (SqoopException ex) {
assertEquals(ex.getErrorCode(), RepositoryError.JDBCREPO_0026);
verify(repoHandlerMock, times(1)).findDriver(any(Connection.class));
verify(repoHandlerMock, times(1)).findDriver(anyString(),any(Connection.class));
verifyNoMoreInteractions(repoHandlerMock);
return ;
}
@ -242,7 +242,7 @@ public void testConnectorConfigUpgradeWithValidLinksAndJobs() {
doReturn(jobList).when(repoSpy).findJobsForConnector(anyLong());
doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class));
doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
doNothing().when(repoSpy).upgradeConnectorConfigs(any(MConnector.class), any(RepositoryTransaction.class));
doNothing().when(repoSpy).upgradeConnectorAndConfigs(any(MConnector.class), any(RepositoryTransaction.class));
repoSpy.upgradeConnector(oldConnector, newConnector);
@ -258,7 +258,7 @@ public void testConnectorConfigUpgradeWithValidLinksAndJobs() {
repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(1, repoTransactionMock);
repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(2, repoTransactionMock);
repoOrder.verify(repoSpy, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(RepositoryTransaction.class));
repoOrder.verify(repoSpy, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(RepositoryTransaction.class));
repoOrder.verify(repoSpy, times(2)).updateLink(any(MLink.class), any(RepositoryTransaction.class));
repoOrder.verify(repoSpy, times(4)).updateJob(any(MJob.class), any(RepositoryTransaction.class));
repoOrder.verifyNoMoreInteractions();
@ -296,7 +296,7 @@ public void testDriverConfigUpgradeWithValidJobs() {
doReturn(jobList).when(repoSpy).findJobs();
doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class));
doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
doNothing().when(repoSpy).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
doNothing().when(repoSpy).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class));
repoSpy.upgradeDriver(newDriverConfig);
@ -309,7 +309,7 @@ public void testDriverConfigUpgradeWithValidJobs() {
repoOrder.verify(repoSpy, times(1)).getTransaction();
repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock);
repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
repoOrder.verify(repoSpy, times(1)).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
repoOrder.verify(repoSpy, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class));
repoOrder.verify(repoSpy, times(2)).updateJob(any(MJob.class), any(RepositoryTransaction.class));
repoOrder.verifyNoMoreInteractions();
txOrder.verify(repoTransactionMock, times(1)).begin();
@ -339,7 +339,7 @@ public void testDriverConfigUpgradeWithInvalidJobs() {
doReturn(jobList).when(repoSpy).findJobs();
doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
doNothing().when(repoSpy).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
doNothing().when(repoSpy).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class));
try {
repoSpy.upgradeDriver(newDriverConfig);
@ -355,7 +355,7 @@ public void testDriverConfigUpgradeWithInvalidJobs() {
repoOrder.verify(repoSpy, times(1)).getTransaction();
repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock);
repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
repoOrder.verify(repoSpy, times(1)).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
repoOrder.verify(repoSpy, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class));
repoOrder.verifyNoMoreInteractions();
txOrder.verify(repoTransactionMock, times(1)).begin();
txOrder.verify(repoTransactionMock, times(1)).rollback();
@ -535,7 +535,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateConnectorError() {
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
"update connector error.");
doThrow(exception).when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
doThrow(exception).when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
try {
repoSpy.upgradeConnector(oldConnector, newConnector);
@ -545,7 +545,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateConnectorError() {
verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
verifyNoMoreInteractions(repoHandlerMock);
return ;
}
@ -577,7 +577,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateLinkError() {
doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class));
doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class));
doNothing().when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
doNothing().when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class));
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
@ -592,7 +592,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateLinkError() {
verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
verify(repoHandlerMock, times(1)).existsLink(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(1)).updateLink(any(MLink.class), any(Connection.class));
verifyNoMoreInteractions(repoHandlerMock);
@ -626,7 +626,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateJobError() {
doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class));
doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class));
doNothing().when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
doNothing().when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
doNothing().when(repoHandlerMock).updateLink(any(MLink.class), any(Connection.class));
doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class));
doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class));
@ -643,7 +643,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateJobError() {
verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
verify(repoHandlerMock, times(2)).existsLink(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).updateLink(any(MLink.class), any(Connection.class));
verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class));
@ -731,7 +731,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateDriverConfigError() {
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
"update driverConfig entity error.");
doThrow(exception).when(repoHandlerMock).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
doThrow(exception).when(repoHandlerMock).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class));
try {
repoSpy.upgradeDriver(newDriverConfig);
@ -739,7 +739,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateDriverConfigError() {
assertEquals(ex.getMessage(), exception.getMessage());
verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(1)).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
verify(repoHandlerMock, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class));
verifyNoMoreInteractions(repoHandlerMock);
return ;
}
@ -764,7 +764,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateJobError() {
List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
doNothing().when(repoHandlerMock).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
doNothing().when(repoHandlerMock).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class));
doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class));
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
@ -777,7 +777,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateJobError() {
assertEquals(ex.getMessage(), exception.getMessage());
verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(1)).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
verify(repoHandlerMock, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class));
verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(1)).updateJob(any(MJob.class), any(Connection.class));
verifyNoMoreInteractions(repoHandlerMock);

View File

@ -22,13 +22,9 @@ public final class DerbyRepoConstants {
public static final String CONF_PREFIX_DERBY = "derby.";
@Deprecated
// use only for the upgrade code should be removed soon
// use only for the upgrade code
public static final String SYSKEY_VERSION = "version";
public static final String SYSKEY_DERBY_REPOSITORY_VERSION = "version";
// TOOD(VB): SQOOP-1557 move the driver config version to the SQ_CONFIGURABLE, IT SHOULD NOT BE HERE, nor stored in SYSTEM table
public static final String SYSKEY_DRIVER_CONFIG_VERSION = "driver.config.version";
public static final String SYSKEY_DERBY_REPOSITORY_VERSION = "repository.version";
/**
* Expected version of the repository structures.

View File

@ -188,7 +188,11 @@ public enum DerbyRepoError implements ErrorCode {
DERBYREPO_0048("Could not register config direction"),
DERBYREPO_0049("Could not set connector direction")
DERBYREPO_0049("Could not set connector direction"),
/** The system was unable to register driver due to a server error **/
DERBYREPO_0050("Registration of driver failed"),
;
private final String message;

View File

@ -45,9 +45,11 @@
import org.apache.sqoop.common.SupportedDirections;
import org.apache.sqoop.connector.ConnectorHandler;
import org.apache.sqoop.connector.ConnectorManagerUtils;
import org.apache.sqoop.driver.Driver;
import org.apache.sqoop.model.MBooleanInput;
import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MConfigType;
import org.apache.sqoop.model.MConfigurableType;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MDriver;
import org.apache.sqoop.model.MDriverConfig;
@ -102,7 +104,7 @@ public void registerConnector(MConnector mc, Connection conn) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0011,
mc.getUniqueName());
}
mc.setPersistenceId(getConnectorId(mc, conn));
mc.setPersistenceId(insertAndGetConnectorId(mc, conn));
insertConfigsForConnector(mc, conn);
}
@ -116,10 +118,10 @@ private void insertConfigsForDriver(MDriver mDriver, Connection conn) {
PreparedStatement baseConfigStmt = null;
PreparedStatement baseInputStmt = null;
try{
baseConfigStmt = conn.prepareStatement(STMT_INSERT_CONFIG_BASE,
baseConfigStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIG,
Statement.RETURN_GENERATED_KEYS);
baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT,
Statement.RETURN_GENERATED_KEYS);
// Register the job config type, since driver config is per job
@ -145,15 +147,14 @@ private void insertConfigsForConnector (MConnector mc, Connection conn) {
PreparedStatement baseConfigStmt = null;
PreparedStatement baseInputStmt = null;
try{
baseConfigStmt = conn.prepareStatement(STMT_INSERT_CONFIG_BASE,
baseConfigStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIG,
Statement.RETURN_GENERATED_KEYS);
baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT,
Statement.RETURN_GENERATED_KEYS);
// Register link type config for connector
// NOTE: The direction is null for LINK type
registerConfigs(connectorId, null, mc.getLinkConfig().getConfigs(),
// Register link type config
registerConfigs(connectorId, null /* No direction for LINK type config*/, mc.getLinkConfig().getConfigs(),
MConfigType.LINK.name(), baseConfigStmt, baseInputStmt, conn);
// Register both from/to job type config for connector
@ -202,14 +203,15 @@ private void insertConnectorDirections(Long connectorId, SupportedDirections dir
}
}
private long getConnectorId(MConnector mc, Connection conn) {
private long insertAndGetConnectorId(MConnector mc, Connection conn) {
PreparedStatement baseConnectorStmt = null;
try {
baseConnectorStmt = conn.prepareStatement(STMT_INSERT_CONNECTOR_BASE,
baseConnectorStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIGURABLE,
Statement.RETURN_GENERATED_KEYS);
baseConnectorStmt.setString(1, mc.getUniqueName());
baseConnectorStmt.setString(2, mc.getClassName());
baseConnectorStmt.setString(3, mc.getVersion());
baseConnectorStmt.setString(4, mc.getType().name());
int baseConnectorCount = baseConnectorStmt.executeUpdate();
if (baseConnectorCount != 1) {
@ -222,19 +224,44 @@ private long getConnectorId(MConnector mc, Connection conn) {
if (!rsetConnectorId.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
}
insertConnectorDirections(rsetConnectorId.getLong(1),
mc.getSupportedDirections(), conn);
// connector configurable also have directions
insertConnectorDirections(rsetConnectorId.getLong(1), mc.getSupportedDirections(), conn);
return rsetConnectorId.getLong(1);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0014,
mc.toString(), ex);
throw new SqoopException(DerbyRepoError.DERBYREPO_0014, mc.toString(), ex);
} finally {
closeStatements(baseConnectorStmt);
}
}
private long insertAndGetDriverId(MDriver mDriver, Connection conn) {
PreparedStatement baseDriverStmt = null;
try {
baseDriverStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIGURABLE,
Statement.RETURN_GENERATED_KEYS);
baseDriverStmt.setString(1, mDriver.getUniqueName());
baseDriverStmt.setString(2, Driver.getClassName());
baseDriverStmt.setString(3, mDriver.getVersion());
baseDriverStmt.setString(4, mDriver.getType().name());
int baseDriverCount = baseDriverStmt.executeUpdate();
if (baseDriverCount != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0012, Integer.toString(baseDriverCount));
}
ResultSet rsetDriverId = baseDriverStmt.getGeneratedKeys();
if (!rsetDriverId.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
}
return rsetDriverId.getLong(1);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0050, mDriver.toString(), ex);
} finally {
closeStatements(baseDriverStmt);
}
}
/**
* {@inheritDoc}
*/
@ -350,59 +377,6 @@ public int detectRepositoryVersion(Connection conn) {
}
}
/**
* Detect version of the driver
*
* @param conn Connection to the repository
* @return Version of the Driver
*/
private String detectDriverVersion (Connection conn) {
ResultSet rs = null;
PreparedStatement stmt = null;
try {
stmt = conn.prepareStatement(DerbySchemaQuery.STMT_SELECT_SYSTEM);
stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION);
rs = stmt.executeQuery();
if(!rs.next()) {
return null;
}
return rs.getString(1);
} catch (SQLException e) {
LOG.info("Can't fetch driver version.", e);
return null;
} finally {
closeResultSets(rs);
closeStatements(stmt);
}
}
/**
* Create or update driver version
* @param conn Connection to the the repository
* @param mDriver
*/
private void createOrUpdateDriverSystemVersion(Connection conn, String version) {
ResultSet rs = null;
PreparedStatement stmt = null;
try {
stmt = conn.prepareStatement(STMT_DELETE_SYSTEM);
stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION);
stmt.executeUpdate();
closeStatements(stmt);
stmt = conn.prepareStatement(STMT_INSERT_SYSTEM);
stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION);
stmt.setString(2, version);
stmt.executeUpdate();
} catch (SQLException e) {
logException(e);
throw new SqoopException(DerbyRepoError.DERBYREPO_0044, e);
} finally {
closeResultSets(rs);
closeStatements(stmt);
}
}
/**
* {@inheritDoc}
*/
@ -460,9 +434,11 @@ public void createOrUpgradeRepository(Connection conn) {
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE, conn);
// SQOOP-1498 rename entities
renameEntitiesForUpgrade(conn);
renameEntitiesForConnectionAndForm(conn);
// Change direction from VARCHAR to BIGINT + foreign key.
updateDirections(conn, insertDirections(conn));
renameConnectorToConfigurable(conn);
}
// Add unique constraints on job and links for version 4 onwards
if (repositoryVersion > 3) {
@ -474,7 +450,7 @@ public void createOrUpgradeRepository(Connection conn) {
}
// SQOOP-1498 refactoring related upgrades for table and column names
void renameEntitiesForUpgrade(Connection conn) {
void renameEntitiesForConnectionAndForm(Connection conn) {
// LINK
// drop the constraint before rename
runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_1, conn);
@ -491,6 +467,10 @@ void renameEntitiesForUpgrade(Connection conn) {
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_7, conn);
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_8, conn);
// rename constraints
runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONNECTOR_CONSTRAINT, conn);
runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONNECTOR_CONSTRAINT, conn);
LOG.info("LINK TABLE altered");
// LINK_INPUT
@ -511,6 +491,8 @@ void renameEntitiesForUpgrade(Connection conn) {
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_4, conn);
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_5, conn);
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_6, conn);
runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_FORM_CONNECTOR_CONSTRAINT, conn);
runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT, conn);
LOG.info("CONFIG TABLE altered");
@ -528,7 +510,24 @@ void renameEntitiesForUpgrade(Connection conn) {
runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_TO, conn);
LOG.info("JOB TABLE altered and constraints added");
}
private void renameConnectorToConfigurable(Connection conn) {
// SQ_CONNECTOR to SQ_CONFIGURABLE upgrade
runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT, conn);
runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_LINK_CONSTRAINT, conn);
runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT, conn);
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTOR_TO_SQ_CONFIGURABLE, conn);
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONFIG_COLUMN_1, conn);
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_LINK_COLUMN_1, conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_COLUMN_SQC_TYPE, conn);
runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONFIGURABLE_CONSTRAINT, conn);
runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONFIGURABLE_CONSTRAINT, conn);
runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT, conn);
LOG.info("CONNECTOR TABLE altered and constraints added for CONFIGURABLE");
}
private void upgradeRepositoryVersion(Connection conn) {
@ -538,7 +537,6 @@ private void upgradeRepositoryVersion(Connection conn) {
runQuery(STMT_INSERT_SYSTEM, conn, DerbyRepoConstants.SYSKEY_DERBY_REPOSITORY_VERSION, ""
+ DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION);
}
/**
* Insert directions: FROM and TO.
* @param conn
@ -643,6 +641,7 @@ protected void updateDirections(Connection conn, Map<Direction, Long> directionM
}
}
/**
* Upgrade job data from IMPORT/EXPORT to FROM/TO.
* Since the framework is no longer responsible for HDFS,
@ -712,13 +711,13 @@ private void updateJobRepositorySchemaAndData(Connection conn, long connectorId)
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DRIVER_INDEX, conn,
new Long(0), "throttling");
Long linkId = createHdfsConnection(conn, connectorId);
Long connectionId = createHdfsConnection(conn, connectorId);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION_COPY_SQB_FROM_CONNECTION, conn,
"EXPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_CONNECTION, conn,
new Long(linkId), "EXPORT");
new Long(connectionId), "EXPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION, conn,
new Long(linkId), "IMPORT");
new Long(connectionId), "IMPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME, conn,
"fromJobConfig", "table", Direction.FROM.toString());
@ -738,6 +737,7 @@ private void updateJobRepositorySchemaAndData(Connection conn, long connectorId)
* Pre-register HDFS Connector so that config upgrade will work.
* NOTE: This should be used only in the upgrade path
*/
@Deprecated
protected long registerHdfsConnector(Connection conn) {
if (LOG.isTraceEnabled()) {
LOG.trace("Begin HDFS Connector pre-loading.");
@ -760,7 +760,7 @@ protected long registerHdfsConnector(Connection conn) {
if (handler.getUniqueName().equals(CONNECTOR_HDFS)) {
try {
PreparedStatement baseConnectorStmt = conn.prepareStatement(
STMT_INSERT_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS,
STMT_INSERT_INTO_CONFIGURABLE_WITHOUT_SUPPORTED_DIRECTIONS,
Statement.RETURN_GENERATED_KEYS);
baseConnectorStmt.setString(1, handler.getConnectorConfigurable().getUniqueName());
baseConnectorStmt.setString(2, handler.getConnectorConfigurable().getClassName());
@ -854,21 +854,20 @@ public MConnector findConnector(String shortName, Connection conn) {
LOG.debug("Looking up connector: " + shortName);
}
MConnector mc = null;
PreparedStatement baseConnectorFetchStmt = null;
PreparedStatement connectorFetchStmt = null;
try {
baseConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_BASE_CONNECTOR);
baseConnectorFetchStmt.setString(1, shortName);
connectorFetchStmt = conn.prepareStatement(STMT_SELECT_FROM_CONFIGURABLE);
connectorFetchStmt.setString(1, shortName);
List<MConnector> connectors = loadConnectors(baseConnectorFetchStmt, conn);
List<MConnector> connectors = loadConnectors(connectorFetchStmt, conn);
if (connectors.size()==0) {
if (connectors.size() == 0) {
LOG.debug("No connector found by name: " + shortName);
return null;
} else if (connectors.size()==1) {
} else if (connectors.size() == 1) {
LOG.debug("Looking up connector: " + shortName + ", found: " + mc);
return connectors.get(0);
}
else {
} else {
throw new SqoopException(DerbyRepoError.DERBYREPO_0005, shortName);
}
@ -876,7 +875,7 @@ public MConnector findConnector(String shortName, Connection conn) {
logException(ex, shortName);
throw new SqoopException(DerbyRepoError.DERBYREPO_0004, shortName, ex);
} finally {
closeStatements(baseConnectorFetchStmt);
closeStatements(connectorFetchStmt);
}
}
@ -887,7 +886,9 @@ public MConnector findConnector(String shortName, Connection conn) {
public List<MConnector> findConnectors(Connection conn) {
PreparedStatement stmt = null;
try {
stmt = conn.prepareStatement(STMT_SELECT_CONNECTOR_ALL);
stmt = conn.prepareStatement(STMT_SELECT_CONFIGURABLE_ALL_FOR_TYPE);
stmt.setString(1, MConfigurableType.CONNECTOR.name());
return loadConnectors(stmt,conn);
} catch (SQLException ex) {
logException(ex);
@ -897,84 +898,101 @@ public List<MConnector> findConnectors(Connection conn) {
}
}
/**
* {@inheritDoc}
*/
@Override
public void registerDriver(MDriver mDriver, Connection conn) {
if (mDriver.hasPersistenceId()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0011,
"Driver");
throw new SqoopException(DerbyRepoError.DERBYREPO_0011, mDriver.getUniqueName());
}
mDriver.setPersistenceId(insertAndGetDriverId(mDriver, conn));
insertConfigsforDriver(mDriver, conn);
}
private void insertConfigsforDriver(MDriver mDriver, Connection conn) {
PreparedStatement baseConfigStmt = null;
PreparedStatement baseInputStmt = null;
try {
baseConfigStmt = conn.prepareStatement(STMT_INSERT_CONFIG_BASE,
baseConfigStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIG,
Statement.RETURN_GENERATED_KEYS);
baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT,
Statement.RETURN_GENERATED_KEYS);
// Register a driver config as a job type with no owner/connector and direction
registerConfigs(null/* owner*/, null /*direction*/, mDriver.getDriverConfig().getConfigs(),
registerConfigs(mDriver.getPersistenceId(), null /* no direction*/, mDriver.getDriverConfig().getConfigs(),
MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
// We're using hardcoded value for driver config as they are
// represented as NULL in the database.
mDriver.setPersistenceId(1);
} catch (SQLException ex) {
logException(ex, mDriver);
throw new SqoopException(DerbyRepoError.DERBYREPO_0014, ex);
} finally {
closeStatements(baseConfigStmt, baseInputStmt);
}
createOrUpdateDriverSystemVersion(conn, mDriver.getVersion());
}
/**
* {@inheritDoc}
*/
@Override
public MDriver findDriver(Connection conn) {
LOG.debug("Looking up Driver config to create a driver ");
MDriver mDriver = null;
public MDriver findDriver(String shortName, Connection conn) {
LOG.debug("Looking up Driver and config ");
PreparedStatement driverFetchStmt = null;
PreparedStatement driverConfigFetchStmt = null;
PreparedStatement driverConfigInputFetchStmt = null;
MDriver mDriver;
try {
driverConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_DRIVER);
driverConfigInputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT);
driverFetchStmt = conn.prepareStatement(STMT_SELECT_FROM_CONFIGURABLE);
driverFetchStmt.setString(1, shortName);
ResultSet rsDriverSet = driverFetchStmt.executeQuery();
if (!rsDriverSet.next()) {
return null;
}
Long driverId = rsDriverSet.getLong(1);
String driverVersion = rsDriverSet.getString(4);
driverConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
driverConfigFetchStmt.setLong(1, driverId);
driverConfigInputFetchStmt = conn.prepareStatement(STMT_SELECT_INPUT);
List<MConfig> driverConfigs = new ArrayList<MConfig>();
loadDriverConfigs(driverConfigs, driverConfigFetchStmt, driverConfigInputFetchStmt, 1);
if(driverConfigs.isEmpty()) {
if (driverConfigs.isEmpty()) {
return null;
}
mDriver = new MDriver(new MDriverConfig(driverConfigs), detectDriverVersion(conn));
mDriver.setPersistenceId(1);
mDriver = new MDriver(new MDriverConfig(driverConfigs), driverVersion);
mDriver.setPersistenceId(driverId);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0004,
"Driver config", ex);
throw new SqoopException(DerbyRepoError.DERBYREPO_0004, "Driver", ex);
} finally {
if (driverConfigFetchStmt != null) {
try {
driverConfigFetchStmt.close();
} catch (SQLException ex) {
LOG.error("Unable to close config fetch statement", ex);
LOG.error("Unable to close driver config fetch statement", ex);
}
}
if (driverConfigInputFetchStmt != null) {
try {
driverConfigInputFetchStmt.close();
} catch (SQLException ex) {
LOG.error("Unable to close input fetch statement", ex);
LOG.error("Unable to close driver input fetch statement", ex);
}
}
if (driverFetchStmt != null) {
try {
driverFetchStmt.close();
} catch (SQLException ex) {
LOG.error("Unable to close driver fetch statement", ex);
}
}
}
LOG.debug("Looking up Driver config and created driver:" + mDriver);
LOG.debug("Looked up Driver and config");
return mDriver;
}
@ -1228,7 +1246,7 @@ public List<MLink> findLinks(Connection conn) {
public List<MLink> findLinksForConnector(long connectorID, Connection conn) {
PreparedStatement stmt = null;
try {
stmt = conn.prepareStatement(STMT_SELECT_LINK_FOR_CONNECTOR);
stmt = conn.prepareStatement(STMT_SELECT_LINK_FOR_CONNECTOR_CONFIGURABLE);
stmt.setLong(1, connectorID);
return loadLinks(stmt, conn);
} catch (SQLException ex) {
@ -1243,7 +1261,7 @@ public List<MLink> findLinksForConnector(long connectorID, Connection conn) {
* {@inheritDoc}
*/
@Override
public void upgradeConnectorConfigs(MConnector mConnector, Connection conn) {
public void upgradeConnectorAndConfigs(MConnector mConnector, Connection conn) {
updateConnectorAndDeleteConfigs(mConnector, conn);
insertConfigsForConnector(mConnector, conn);
}
@ -1253,13 +1271,14 @@ private void updateConnectorAndDeleteConfigs(MConnector mConnector, Connection c
PreparedStatement deleteConfig = null;
PreparedStatement deleteInput = null;
try {
updateConnectorStatement = conn.prepareStatement(STMT_UPDATE_CONNECTOR);
deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONNECTOR);
deleteConfig = conn.prepareStatement(STMT_DELETE_CONFIGS_FOR_CONNECTOR);
updateConnectorStatement = conn.prepareStatement(STMT_UPDATE_CONFIGURABLE);
deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONFIGURABLE);
deleteConfig = conn.prepareStatement(STMT_DELETE_CONFIGS_FOR_CONFIGURABLE);
updateConnectorStatement.setString(1, mConnector.getUniqueName());
updateConnectorStatement.setString(2, mConnector.getClassName());
updateConnectorStatement.setString(3, mConnector.getVersion());
updateConnectorStatement.setLong(4, mConnector.getPersistenceId());
updateConnectorStatement.setString(4, mConnector.getType().name());
updateConnectorStatement.setLong(5, mConnector.getPersistenceId());
if (updateConnectorStatement.executeUpdate() != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0038);
@ -1281,19 +1300,30 @@ private void updateConnectorAndDeleteConfigs(MConnector mConnector, Connection c
* {@inheritDoc}
*/
@Override
public void upgradeDriverConfigs(MDriver mDriver, Connection conn) {
public void upgradeDriverAndConfigs(MDriver mDriver, Connection conn) {
updateDriverAndDeleteConfigs(mDriver, conn);
createOrUpdateDriverSystemVersion(conn, mDriver.getVersion());
insertConfigsForDriver(mDriver, conn);
}
private void updateDriverAndDeleteConfigs(MDriver mDriver, Connection conn) {
PreparedStatement updateDriverStatement = null;
PreparedStatement deleteConfig = null;
PreparedStatement deleteInput = null;
try {
deleteInput = conn.prepareStatement(STMT_DELETE_DRIVER_INPUTS);
deleteConfig = conn.prepareStatement(STMT_DELETE_DRIVER_CONFIGS);
updateDriverStatement = conn.prepareStatement(STMT_UPDATE_CONFIGURABLE);
deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONFIGURABLE);
deleteConfig = conn.prepareStatement(STMT_DELETE_CONFIGS_FOR_CONFIGURABLE);
updateDriverStatement.setString(1, mDriver.getUniqueName());
updateDriverStatement.setString(2, Driver.getClassName());
updateDriverStatement.setString(3, mDriver.getVersion());
updateDriverStatement.setString(4, mDriver.getType().name());
updateDriverStatement.setLong(5, mDriver.getPersistenceId());
if (updateDriverStatement.executeUpdate() != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0038);
}
deleteInput.setLong(1, mDriver.getPersistenceId());
deleteConfig.setLong(1, mDriver.getPersistenceId());
deleteInput.executeUpdate();
deleteConfig.executeUpdate();
@ -1301,7 +1331,7 @@ private void updateDriverAndDeleteConfigs(MDriver mDriver, Connection conn) {
logException(e, mDriver);
throw new SqoopException(DerbyRepoError.DERBYREPO_0044, e);
} finally {
closeStatements(deleteConfig, deleteInput);
closeStatements(updateDriverStatement, deleteConfig, deleteInput);
}
}
@ -1557,7 +1587,7 @@ public List<MJob> findJobs(Connection conn) {
public List<MJob> findJobsForConnector(long connectorId, Connection conn) {
PreparedStatement stmt = null;
try {
stmt = conn.prepareStatement(STMT_SELECT_ALL_JOBS_FOR_CONNECTOR);
stmt = conn.prepareStatement(STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE);
stmt.setLong(1, connectorId);
stmt.setLong(2, connectorId);
return loadJobs(stmt, conn);
@ -2080,8 +2110,8 @@ private List<MConnector> loadConnectors(PreparedStatement stmt, Connection conn)
try {
rsConnectors = stmt.executeQuery();
connectorConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR);
connectorConfigInputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT);
connectorConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
connectorConfigInputFetchStmt = conn.prepareStatement(STMT_SELECT_INPUT);
while(rsConnectors.next()) {
long connectorId = rsConnectors.getLong(1);
@ -2116,9 +2146,8 @@ private List<MConnector> loadConnectors(PreparedStatement stmt, Connection conn)
}
} finally {
closeResultSets(rsConnectors);
closeStatements(connectorConfigFetchStmt,connectorConfigInputFetchStmt);
closeStatements(connectorConfigFetchStmt, connectorConfigInputFetchStmt);
}
return connectors;
}
@ -2134,7 +2163,7 @@ private List<MLink> loadLinks(PreparedStatement stmt,
rsConnection = stmt.executeQuery();
//
connectorConfigFetchStatement = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR);
connectorConfigFetchStatement = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
connectorConfigInputStatement = conn.prepareStatement(STMT_FETCH_LINK_INPUT);
while(rsConnection.next()) {
@ -2189,14 +2218,15 @@ private List<MJob> loadJobs(PreparedStatement stmt,
try {
rsJob = stmt.executeQuery();
fromConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR);
toConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR);
driverConfigfetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_DRIVER);
// Note: Job does not hold a explicit reference to the driver since every
// job has the same driver
long driverId = this.findDriver(MDriver.DRIVER_NAME, conn).getPersistenceId();
fromConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
toConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
driverConfigfetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
jobInputFetchStmt = conn.prepareStatement(STMT_FETCH_JOB_INPUT);
while(rsJob.next()) {
// why use connector? why cant it be link id?
long fromConnectorId = rsJob.getLong(1);
long toConnectorId = rsJob.getLong(2);
long id = rsJob.getLong(3);
@ -2211,9 +2241,9 @@ private List<MJob> loadJobs(PreparedStatement stmt,
fromConfigFetchStmt.setLong(1, fromConnectorId);
toConfigFetchStmt.setLong(1,toConnectorId);
driverConfigfetchStmt.setLong(1, driverId);
jobInputFetchStmt.setLong(1, id);
//inputFetchStmt.setLong(1, XXX); // Will be filled by loadFrameworkConfigs
jobInputFetchStmt.setLong(3, id);
// FROM entity configs
@ -2283,7 +2313,7 @@ private void registerConfigDirection(Long configId, Direction direction, Connect
*
* Use given prepared statements to create entire config structure in database.
*
* @param connectorId
* @param configurableId
* @param configs
* @param type
* @param baseConfigStmt
@ -2292,17 +2322,17 @@ private void registerConfigDirection(Long configId, Direction direction, Connect
* @return short number of configs registered.
* @throws SQLException
*/
private short registerConfigs(Long connectorId, Direction direction,
private short registerConfigs(Long configurableId, Direction direction,
List<MConfig> configs, String type, PreparedStatement baseConfigStmt,
PreparedStatement baseInputStmt, Connection conn)
throws SQLException {
short configIndex = 0;
for (MConfig config : configs) {
if(connectorId == null) {
if (configurableId == null) {
baseConfigStmt.setNull(1, Types.BIGINT);
} else {
baseConfigStmt.setLong(1, connectorId);
baseConfigStmt.setLong(1, configurableId);
}
baseConfigStmt.setString(2, config.getName());

View File

@ -49,9 +49,14 @@ public final class DerbySchemaConstants {
public static final String COLUMN_SQD_NAME = "SQD_NAME";
// SQ_CONNECTOR
@Deprecated // used only for upgrade
public static final String TABLE_SQ_CONNECTOR_NAME = "SQ_CONNECTOR";
// SQ_CONFIGURABLE
public static final String TABLE_SQ_CONFIGURABLE_NAME = "SQ_CONFIGURABLE";
@Deprecated // used only for upgrade
public static final String TABLE_SQ_CONNECTOR = SCHEMA_PREFIX + TABLE_SQ_CONNECTOR_NAME;
public static final String TABLE_SQ_CONFIGURABLE = SCHEMA_PREFIX + TABLE_SQ_CONFIGURABLE_NAME;
public static final String COLUMN_SQC_ID = "SQC_ID";
@ -61,6 +66,8 @@ public final class DerbySchemaConstants {
public static final String COLUMN_SQC_VERSION = "SQC_VERSION";
public static final String COLUMN_SQC_TYPE = "SQC_TYPE";
// SQ_CONNECTOR_DIRECTIONS
public static final String TABLE_SQ_CONNECTOR_DIRECTIONS_NAME = "SQ_CONNECTOR_DIRECTIONS";
@ -75,12 +82,10 @@ public final class DerbySchemaConstants {
public static final String COLUMN_SQCD_DIRECTION = "SQCD_DIRECTION";
public static final String CONSTRAINT_SQCD_SQC_NAME = CONSTRAINT_PREFIX + "SQCD_SQC";
// FK to the SQ_CONNECTOR table
public static final String CONSTRAINT_SQCD_SQC = SCHEMA_PREFIX + CONSTRAINT_SQCD_SQC_NAME;
public static final String CONSTRAINT_SQCD_SQD_NAME = CONSTRAINT_PREFIX + "SQCD_SQD";
// FK to the SQ_DIRECTION able
public static final String CONSTRAINT_SQCD_SQD = SCHEMA_PREFIX + CONSTRAINT_SQCD_SQD_NAME;
@ -99,7 +104,10 @@ public final class DerbySchemaConstants {
@Deprecated // used only for upgrade
public static final String COLUMN_SQF_CONNECTOR = "SQF_CONNECTOR";
@Deprecated // used only for upgrade path
public static final String COLUMN_SQ_CFG_CONNECTOR = "SQ_CFG_CONNECTOR";
// note this column was renamed again
public static final String COLUMN_SQ_CFG_CONFIGURABLE = "SQ_CFG_CONFIGURABLE";
@Deprecated // used only for upgrade
public static final String COLUMN_SQF_OPERATION = "SQF_OPERATION";
@ -125,8 +133,8 @@ public final class DerbySchemaConstants {
@Deprecated // used only for upgrade
public static final String CONSTRAINT_SQF_SQC = SCHEMA_PREFIX + CONSTRAINT_SQF_SQC_NAME;
// FK constraint on configurable
public static final String CONSTRAINT_SQ_CFG_SQC_NAME = CONSTRAINT_PREFIX + "SQ_CFG_SQC";
public static final String CONSTRAINT_SQ_CFG_SQC = SCHEMA_PREFIX + CONSTRAINT_SQ_CFG_SQC_NAME;
// SQ_CONFIG_DIRECTIONS
@ -202,7 +210,11 @@ public final class DerbySchemaConstants {
public static final String COLUMN_SQ_LNK_NAME = "SQ_LNK_NAME";
@Deprecated // used only for upgrade
public static final String COLUMN_SQN_CONNECTOR = "SQN_CONNECTOR";
@Deprecated // used only for upgrade
public static final String COLUMN_SQ_LNK_CONNECTOR = "SQ_LNK_CONNECTOR";
// Note this column has been renamed twice
public static final String COLUMN_SQ_LNK_CONFIGURABLE = "SQ_LNK_CONFIGURABLE";
@Deprecated // used only for upgrade
public static final String COLUMN_SQN_CREATION_USER = "SQN_CREATION_USER";
public static final String COLUMN_SQ_LNK_CREATION_USER = "SQ_LNK_CREATION_USER";
@ -225,10 +237,10 @@ public final class DerbySchemaConstants {
@Deprecated
public static final String CONSTRAINT_SQN_SQC = SCHEMA_PREFIX + CONSTRAINT_SQN_SQC_NAME;
// FK constraint on the connector configurable
public static final String CONSTRAINT_SQ_LNK_SQC = SCHEMA_PREFIX + CONSTRAINT_SQ_LNK_SQC_NAME;
public static final String CONSTRAINT_SQ_LNK_NAME_UNIQUE_NAME = CONSTRAINT_PREFIX + "SQ_LNK_NAME_UNIQUE";
public static final String CONSTRAINT_SQ_LNK_NAME_UNIQUE = SCHEMA_PREFIX + CONSTRAINT_SQ_LNK_NAME_UNIQUE_NAME;
// SQ_JOB
@ -437,12 +449,12 @@ public final class DerbySchemaConstants {
static {
tablesV1 = new HashSet<String>();
tablesV1.add(TABLE_SQ_CONNECTOR_NAME);
tablesV1.add(TABLE_SQ_LINK_NAME);
tablesV1.add(TABLE_SQ_LINK_INPUT_NAME);
tablesV1.add(TABLE_SQ_CONNECTION_NAME);
tablesV1.add(TABLE_SQ_CONNECTION_INPUT_NAME);
tablesV1.add(TABLE_SQ_COUNTER_NAME);
tablesV1.add(TABLE_SQ_COUNTER_GROUP_NAME);
tablesV1.add(TABLE_SQ_COUNTER_SUBMISSION_NAME);
tablesV1.add(TABLE_SQ_CONFIG_NAME);
tablesV1.add(TABLE_SQ_FORM_NAME);
tablesV1.add(TABLE_SQ_INPUT_NAME);
tablesV1.add(TABLE_SQ_JOB_NAME);
tablesV1.add(TABLE_SQ_JOB_INPUT_NAME);

View File

@ -48,15 +48,16 @@
* </pre>
* </p>
* <p>
* <strong>SQ_CONNECTOR</strong>: Connector registration.
* <strong>SQ_CONFIGURABLE</strong>: Configurable registration.
*
* <pre>
* +-----------------------------+
* | SQ_CONNECTOR |
* | SQ_CONFIGURABLE |
* +-----------------------------+
* | SQC_ID: BIGINT PK AUTO-GEN |
* | SQC_NAME: VARCHAR(64) |
* | SQC_CLASS: VARCHAR(255) |
* | SQC_TYPE: VARCHAR(32) |"CONNECTOR"|"DRIVER"
* | SQC_VERSION: VARCHAR(64) |
* +-----------------------------+
* </pre>
@ -252,7 +253,6 @@
*
* </p>
*/
// NOTE: If you have signed yourself to modify the schema for the repository
// such as a rename, change in table relationships or constraints, embrace yourself!
// The following code is supposed to be a chronological order of how the
@ -561,17 +561,28 @@ public final class DerbySchemaQuery {
"SELECT " + COLUMN_SQD_NAME + " FROM " + TABLE_SQ_DIRECTION
+ " WHERE " + COLUMN_SQD_ID + "=?";
// DML: Fetch connector Given Name
public static final String STMT_FETCH_BASE_CONNECTOR =
//DML: Get configurable by given name
public static final String STMT_SELECT_FROM_CONFIGURABLE =
"SELECT "
+ COLUMN_SQC_ID + ", "
+ COLUMN_SQC_NAME + ", "
+ COLUMN_SQC_CLASS + ", "
+ COLUMN_SQC_VERSION
+ " FROM " + TABLE_SQ_CONNECTOR
+ " FROM " + TABLE_SQ_CONFIGURABLE
+ " WHERE " + COLUMN_SQC_NAME + " = ?";
//DML: Get all configurables for a given type
public static final String STMT_SELECT_CONFIGURABLE_ALL_FOR_TYPE =
"SELECT "
+ COLUMN_SQC_ID + ", "
+ COLUMN_SQC_NAME + ", "
+ COLUMN_SQC_CLASS + ", "
+ COLUMN_SQC_VERSION
+ " FROM " + TABLE_SQ_CONFIGURABLE
+ " WHERE " + COLUMN_SQC_TYPE + " = ?";
// DML: Select all connectors
@Deprecated // used only for upgrade logic
public static final String STMT_SELECT_CONNECTOR_ALL =
"SELECT "
+ COLUMN_SQC_ID + ", "
@ -580,32 +591,20 @@ public final class DerbySchemaQuery {
+ COLUMN_SQC_VERSION
+ " FROM " + TABLE_SQ_CONNECTOR;
// DML: Fetch all configs for a given connector
public static final String STMT_FETCH_CONFIG_CONNECTOR =
//DML: Get all configs for a given configurable
public static final String STMT_SELECT_CONFIG_FOR_CONFIGURABLE =
"SELECT "
+ COLUMN_SQ_CFG_ID + ", "
+ COLUMN_SQ_CFG_CONNECTOR + ", "
+ COLUMN_SQ_CFG_CONFIGURABLE + ", "
+ COLUMN_SQ_CFG_NAME + ", "
+ COLUMN_SQ_CFG_TYPE + ", "
+ COLUMN_SQ_CFG_INDEX
+ " FROM " + TABLE_SQ_CONFIG
+ " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " = ? "
+ " WHERE " + COLUMN_SQ_CFG_CONFIGURABLE + " = ? "
+ " ORDER BY " + COLUMN_SQ_CFG_INDEX;
// DML: Fetch all driver configs
public static final String STMT_FETCH_CONFIG_DRIVER =
"SELECT "
+ COLUMN_SQ_CFG_ID + ", "
+ COLUMN_SQ_CFG_CONNECTOR + ", "
+ COLUMN_SQ_CFG_NAME + ", "
+ COLUMN_SQ_CFG_TYPE + ", "
+ COLUMN_SQ_CFG_INDEX
+ " FROM " + TABLE_SQ_CONFIG
+ " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " IS NULL "
+ " ORDER BY " + COLUMN_SQ_CFG_TYPE + ", " + COLUMN_SQ_CFG_INDEX;
// DML: Fetch inputs for a given config
public static final String STMT_FETCH_INPUT =
// DML: Get inputs for a given config
public static final String STMT_SELECT_INPUT =
"SELECT "
+ COLUMN_SQI_ID + ", "
+ COLUMN_SQI_NAME + ", "
@ -620,7 +619,7 @@ public final class DerbySchemaQuery {
+ " WHERE " + COLUMN_SQI_CONFIG + " = ?"
+ " ORDER BY " + COLUMN_SQI_INDEX;
// DML: Fetch inputs and values for a given link
//DML: Get inputs and values for a given link
public static final String STMT_FETCH_LINK_INPUT =
"SELECT "
+ COLUMN_SQI_ID + ", "
@ -640,7 +639,7 @@ public final class DerbySchemaQuery {
+ " AND (" + COLUMN_SQ_LNKI_LINK + " = ?" + " OR " + COLUMN_SQ_LNKI_LINK + " IS NULL)"
+ " ORDER BY " + COLUMN_SQI_INDEX;
// DML: Fetch inputs and values for a given job
//DML: Fetch inputs and values for a given job
public static final String STMT_FETCH_JOB_INPUT =
"SELECT "
+ COLUMN_SQI_ID + ", "
@ -660,32 +659,34 @@ public final class DerbySchemaQuery {
+ " AND (" + COLUMN_SQBI_JOB + " = ? OR " + COLUMN_SQBI_JOB + " IS NULL)"
+ " ORDER BY " + COLUMN_SQI_INDEX;
// DML: Insert connector base
public static final String STMT_INSERT_CONNECTOR_BASE =
"INSERT INTO " + TABLE_SQ_CONNECTOR + " ("
//DML: Insert into configurable
public static final String STMT_INSERT_INTO_CONFIGURABLE =
"INSERT INTO " + TABLE_SQ_CONFIGURABLE + " ("
+ COLUMN_SQC_NAME + ", "
+ COLUMN_SQC_CLASS + ", "
+ COLUMN_SQC_VERSION + ", "
+ COLUMN_SQC_TYPE
+ ") VALUES (?, ?, ?, ?)";
@Deprecated // used only in the upgrade path
public static final String STMT_INSERT_INTO_CONFIGURABLE_WITHOUT_SUPPORTED_DIRECTIONS =
"INSERT INTO " + TABLE_SQ_CONNECTOR+ " ("
+ COLUMN_SQC_NAME + ", "
+ COLUMN_SQC_CLASS + ", "
+ COLUMN_SQC_VERSION
+ ") VALUES (?, ?, ?)";
public static final String STMT_INSERT_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS =
"INSERT INTO " + TABLE_SQ_CONNECTOR + " ("
+ COLUMN_SQC_NAME + ", "
+ COLUMN_SQC_CLASS + ", "
+ COLUMN_SQC_VERSION
+ ") VALUES (?, ?, ?)";
// DML: Insert config base
public static final String STMT_INSERT_CONFIG_BASE =
//DML: Insert into config
public static final String STMT_INSERT_INTO_CONFIG =
"INSERT INTO " + TABLE_SQ_CONFIG + " ("
+ COLUMN_SQ_CFG_CONNECTOR + ", "
+ COLUMN_SQ_CFG_CONFIGURABLE + ", "
+ COLUMN_SQ_CFG_NAME + ", "
+ COLUMN_SQ_CFG_TYPE + ", "
+ COLUMN_SQ_CFG_INDEX
+ ") VALUES ( ?, ?, ?, ?)";
// DML: Insert config input
public static final String STMT_INSERT_INPUT_BASE =
//DML: Insert into config input
public static final String STMT_INSERT_INTO_INPUT =
"INSERT INTO " + TABLE_SQ_INPUT + " ("
+ COLUMN_SQI_NAME + ", "
+ COLUMN_SQI_CONFIG + ", "
@ -696,13 +697,13 @@ public final class DerbySchemaQuery {
+ COLUMN_SQI_ENUMVALS
+ ") VALUES (?, ?, ?, ?, ?, ?, ?)";
// Delete all configs for a given connector
public static final String STMT_DELETE_CONFIGS_FOR_CONNECTOR =
//Delete all configs for a given configurable
public static final String STMT_DELETE_CONFIGS_FOR_CONFIGURABLE =
"DELETE FROM " + TABLE_SQ_CONFIG
+ " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " = ?";
+ " WHERE " + COLUMN_SQ_CFG_CONFIGURABLE + " = ?";
// Delete all inputs for a given connector
public static final String STMT_DELETE_INPUTS_FOR_CONNECTOR =
//Delete all inputs for a given configurable
public static final String STMT_DELETE_INPUTS_FOR_CONFIGURABLE =
"DELETE FROM " + TABLE_SQ_INPUT
+ " WHERE "
+ COLUMN_SQI_CONFIG
@ -710,34 +711,18 @@ public final class DerbySchemaQuery {
+ COLUMN_SQ_CFG_ID
+ " FROM " + TABLE_SQ_CONFIG
+ " WHERE "
+ COLUMN_SQ_CFG_CONNECTOR + " = ?)";
+ COLUMN_SQ_CFG_CONFIGURABLE + " = ?)";
// Delete all driver inputs
public static final String STMT_DELETE_DRIVER_INPUTS =
"DELETE FROM " + TABLE_SQ_INPUT
+ " WHERE "
+ COLUMN_SQI_CONFIG
+ " IN (SELECT "
+ COLUMN_SQ_CFG_ID
+ " FROM " + TABLE_SQ_CONFIG
+ " WHERE "
+ COLUMN_SQ_CFG_CONNECTOR + " IS NULL)";
// Delete all driver configs
public static final String STMT_DELETE_DRIVER_CONFIGS =
"DELETE FROM " + TABLE_SQ_CONFIG
+ " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " IS NULL";
// Update the connector
public static final String STMT_UPDATE_CONNECTOR =
"UPDATE " + TABLE_SQ_CONNECTOR
//Update the configurable
public static final String STMT_UPDATE_CONFIGURABLE =
"UPDATE " + TABLE_SQ_CONFIGURABLE
+ " SET " + COLUMN_SQC_NAME + " = ?, "
+ COLUMN_SQC_CLASS + " = ?, "
+ COLUMN_SQC_VERSION + " = ? "
+ COLUMN_SQC_VERSION + " = ?, "
+ COLUMN_SQC_TYPE + " = ? "
+ " WHERE " + COLUMN_SQC_ID + " = ?";
// DML: Insert new connection
//DML: Insert new connection
@Deprecated // used only in upgrade path
public static final String STMT_INSERT_CONNECTION =
"INSERT INTO " + TABLE_SQ_CONNECTION + " ("
@ -753,7 +738,7 @@ public final class DerbySchemaQuery {
public static final String STMT_INSERT_LINK =
"INSERT INTO " + TABLE_SQ_LINK + " ("
+ COLUMN_SQ_LNK_NAME + ", "
+ COLUMN_SQ_LNK_CONNECTOR + ", "
+ COLUMN_SQ_LNK_CONFIGURABLE + ", "
+ COLUMN_SQ_LNK_ENABLED + ", "
+ COLUMN_SQ_LNK_CREATION_USER + ", "
+ COLUMN_SQ_LNK_CREATION_DATE + ", "
@ -798,7 +783,7 @@ public final class DerbySchemaQuery {
"SELECT "
+ COLUMN_SQ_LNK_ID + ", "
+ COLUMN_SQ_LNK_NAME + ", "
+ COLUMN_SQ_LNK_CONNECTOR + ", "
+ COLUMN_SQ_LNK_CONFIGURABLE + ", "
+ COLUMN_SQ_LNK_ENABLED + ", "
+ COLUMN_SQ_LNK_CREATION_USER + ", "
+ COLUMN_SQ_LNK_CREATION_DATE + ", "
@ -807,12 +792,12 @@ public final class DerbySchemaQuery {
+ " FROM " + TABLE_SQ_LINK
+ " WHERE " + COLUMN_SQ_LNK_ID + " = ?";
// DML: Select all connections
// DML: Select all links
public static final String STMT_SELECT_LINK_ALL =
"SELECT "
+ COLUMN_SQ_LNK_ID + ", "
+ COLUMN_SQ_LNK_NAME + ", "
+ COLUMN_SQ_LNK_CONNECTOR + ", "
+ COLUMN_SQ_LNK_CONFIGURABLE + ", "
+ COLUMN_SQ_LNK_ENABLED + ", "
+ COLUMN_SQ_LNK_CREATION_USER + ", "
+ COLUMN_SQ_LNK_CREATION_DATE + ", "
@ -820,19 +805,19 @@ public final class DerbySchemaQuery {
+ COLUMN_SQ_LNK_UPDATE_DATE
+ " FROM " + TABLE_SQ_LINK;
// DML: Select all connections for a specific connector.
public static final String STMT_SELECT_LINK_FOR_CONNECTOR =
// DML: Select all links for a specific connector.
public static final String STMT_SELECT_LINK_FOR_CONNECTOR_CONFIGURABLE =
"SELECT "
+ COLUMN_SQ_LNK_ID + ", "
+ COLUMN_SQ_LNK_NAME + ", "
+ COLUMN_SQ_LNK_CONNECTOR + ", "
+ COLUMN_SQ_LNK_CONFIGURABLE + ", "
+ COLUMN_SQ_LNK_ENABLED + ", "
+ COLUMN_SQ_LNK_CREATION_USER + ", "
+ COLUMN_SQ_LNK_CREATION_DATE + ", "
+ COLUMN_SQ_LNK_UPDATE_USER + ", "
+ COLUMN_SQ_LNK_UPDATE_DATE
+ " FROM " + TABLE_SQ_LINK
+ " WHERE " + COLUMN_SQ_LNK_CONNECTOR + " = ?";
+ " WHERE " + COLUMN_SQ_LNK_CONFIGURABLE + " = ?";
// DML: Check if given link exists
public static final String STMT_SELECT_LINK_CHECK_BY_ID =
@ -897,11 +882,11 @@ public final class DerbySchemaQuery {
+ " ON " + COLUMN_SQB_FROM_LINK + " = " + COLUMN_SQ_LNK_ID
+ " WHERE " + COLUMN_SQ_LNK_ID + " = ? ";
// DML: Select all jobs
//DML: Select all jobs
public static final String STMT_SELECT_JOB =
"SELECT "
+ "FROM_LINK." + COLUMN_SQ_LNK_CONNECTOR + ", "
+ "TO_LINK." + COLUMN_SQ_LNK_CONNECTOR + ", "
+ "FROM_CONNECTOR." + COLUMN_SQ_LNK_CONFIGURABLE + ", "
+ "TO_CONNECTOR." + COLUMN_SQ_LNK_CONFIGURABLE + ", "
+ "JOB." + COLUMN_SQB_ID + ", "
+ "JOB." + COLUMN_SQB_NAME + ", "
+ "JOB." + COLUMN_SQB_FROM_LINK + ", "
@ -912,19 +897,20 @@ public final class DerbySchemaQuery {
+ "JOB." + COLUMN_SQB_UPDATE_USER + ", "
+ "JOB." + COLUMN_SQB_UPDATE_DATE
+ " FROM " + TABLE_SQ_JOB + " JOB"
+ " LEFT JOIN " + TABLE_SQ_LINK + " FROM_LINK"
+ " ON " + COLUMN_SQB_FROM_LINK + " = FROM_LINK." + COLUMN_SQ_LNK_ID
+ " LEFT JOIN " + TABLE_SQ_LINK + " TO_LINK"
+ " ON " + COLUMN_SQB_TO_LINK + " = TO_LINK." + COLUMN_SQ_LNK_ID;
+ " LEFT JOIN " + TABLE_SQ_LINK + " FROM_CONNECTOR"
+ " ON " + COLUMN_SQB_FROM_LINK + " = FROM_CONNECTOR." + COLUMN_SQ_LNK_ID
+ " LEFT JOIN " + TABLE_SQ_LINK + " TO_CONNECTOR"
+ " ON " + COLUMN_SQB_TO_LINK + " = TO_CONNECTOR." + COLUMN_SQ_LNK_ID;
// DML: Select one specific job
public static final String STMT_SELECT_JOB_SINGLE_BY_ID =
STMT_SELECT_JOB + " WHERE " + COLUMN_SQB_ID + " = ?";
// DML: Select all jobs for a Connector
public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR = STMT_SELECT_JOB
+ " WHERE FROM_LINK." + COLUMN_SQ_LNK_CONNECTOR + " = ? OR TO_LINK."
+ COLUMN_SQ_LNK_CONNECTOR + " = ?";
public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE =
STMT_SELECT_JOB
+ " WHERE FROM_LINK." + COLUMN_SQ_LNK_CONFIGURABLE + " = ? OR TO_LINK."
+ COLUMN_SQ_LNK_CONFIGURABLE + " = ?";
// DML: Insert new submission
public static final String STMT_INSERT_SUBMISSION =
@ -1225,6 +1211,15 @@ public final class DerbySchemaQuery {
public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_8 = "RENAME COLUMN "
+ TABLE_SQ_LINK + "." + COLUMN_SQN_ENABLED + " TO " + COLUMN_SQ_LNK_ENABLED;
// rename the constraint CONSTRAINT_SQF_SQC to CONSTRAINT_SQ_CFG_SQC
public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONNECTOR_CONSTRAINT = "ALTER TABLE "
+ TABLE_SQ_LINK + " DROP CONSTRAINT " + CONSTRAINT_SQN_SQC;
public static final String QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONNECTOR_CONSTRAINT = "ALTER TABLE "
+ TABLE_SQ_LINK + " ADD CONSTRAINT " + CONSTRAINT_SQ_LNK_SQC + " " + "FOREIGN KEY ("
+ COLUMN_SQ_LNK_CONNECTOR + ") " + "REFERENCES " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID
+ ")";
// table rename for CONNECTION_INPUT -> LINK_INPUT
public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_TO_SQ_LINK_INPUT = "RENAME TABLE "
+ TABLE_SQ_CONNECTION_INPUT + " TO SQ_LINK_INPUT";
@ -1262,6 +1257,23 @@ public final class DerbySchemaQuery {
public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_6 = "RENAME COLUMN "
+ TABLE_SQ_CONFIG + "." + COLUMN_SQF_INDEX + " TO " + COLUMN_SQ_CFG_INDEX;
// rename the constraint CONSTRAINT_SQF_SQC to CONSTRAINT_SQ_CFG_SQC
public static final String QUERY_UPGRADE_DROP_TABLE_SQ_FORM_CONNECTOR_CONSTRAINT = "ALTER TABLE "
+ TABLE_SQ_CONFIG + " DROP CONSTRAINT " + CONSTRAINT_SQF_SQC;
public static final String QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT = "ALTER TABLE "
+ TABLE_SQ_CONFIG
+ " ADD CONSTRAINT "
+ CONSTRAINT_SQ_CFG_SQC
+ " "
+ "FOREIGN KEY ("
+ COLUMN_SQ_CFG_CONNECTOR
+ ") "
+ "REFERENCES "
+ TABLE_SQ_CONNECTOR
+ " ("
+ COLUMN_SQC_ID
+ ")";
// column rename and constraint add for SQ_INPUT
public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_INPUT_FORM_COLUMN = "RENAME COLUMN "
@ -1285,18 +1297,96 @@ public final class DerbySchemaQuery {
+ TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_SQ_LNK_TO + " FOREIGN KEY ("
+ COLUMN_SQB_TO_LINK + ") REFERENCES " + TABLE_SQ_LINK + " (" + COLUMN_SQ_LNK_ID + ")";
public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME =
"ALTER TABLE " + TABLE_SQ_JOB + " ADD CONSTRAINT "
+ CONSTRAINT_SQB_NAME_UNIQUE + " UNIQUE (" + COLUMN_SQB_NAME + ")";
public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME = "ALTER TABLE "
+ TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_NAME_UNIQUE + " UNIQUE ("
+ COLUMN_SQB_NAME + ")";
public static final String QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME =
"ALTER TABLE " + TABLE_SQ_LINK + " ADD CONSTRAINT "
+ CONSTRAINT_SQ_LNK_NAME_UNIQUE + " UNIQUE (" + COLUMN_SQ_LNK_NAME + ")";
public static final String QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME = "ALTER TABLE "
+ TABLE_SQ_LINK
+ " ADD CONSTRAINT "
+ CONSTRAINT_SQ_LNK_NAME_UNIQUE
+ " UNIQUE ("
+ COLUMN_SQ_LNK_NAME + ")";
// SQOOP-1557 upgrade queries for table rename for CONNECTOR-> CONFIGURABLE
// drop the SQ_CONFIG FK for connector table
public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT = "ALTER TABLE "
+ TABLE_SQ_CONFIG + " DROP CONSTRAINT " + CONSTRAINT_SQ_CFG_SQC;
// drop the SQ_LINK FK for connector table
public static final String QUERY_UPGRADE_DROP_TABLE_SQ_LINK_CONSTRAINT = "ALTER TABLE "
+ TABLE_SQ_LINK + " DROP CONSTRAINT " + CONSTRAINT_SQ_LNK_SQC;
// drop the SQ_CONNECTOR_DIRECTION FK for connector table
public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT = "ALTER TABLE "
+ TABLE_SQ_CONNECTOR_DIRECTIONS + " DROP CONSTRAINT " + CONSTRAINT_SQCD_SQC;
// rename
public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTOR_TO_SQ_CONFIGURABLE = "RENAME TABLE "
+ TABLE_SQ_CONNECTOR + " TO SQ_CONFIGURABLE";
public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONFIG_COLUMN_1 = "RENAME COLUMN "
+ TABLE_SQ_CONFIG + "." + COLUMN_SQ_CFG_CONNECTOR + " TO " + COLUMN_SQ_CFG_CONFIGURABLE;
public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_LINK_COLUMN_1 = "RENAME COLUMN "
+ TABLE_SQ_LINK + "." + COLUMN_SQ_LNK_CONNECTOR + " TO " + COLUMN_SQ_LNK_CONFIGURABLE;
// add a type column to the configurable
public static final String QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_COLUMN_SQC_TYPE = "ALTER TABLE "
+ TABLE_SQ_CONFIGURABLE + " ADD COLUMN " + COLUMN_SQC_TYPE + " VARCHAR(32)";
// add the constraints back for SQ_CONFIG
public static final String QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONFIGURABLE_CONSTRAINT = "ALTER TABLE "
+ TABLE_SQ_CONFIG
+ " ADD CONSTRAINT "
+ CONSTRAINT_SQ_CFG_SQC
+ " "
+ "FOREIGN KEY ("
+ COLUMN_SQ_CFG_CONFIGURABLE
+ ") "
+ "REFERENCES "
+ TABLE_SQ_CONFIGURABLE
+ " ("
+ COLUMN_SQC_ID + ")";
// add the constraints back for SQ_LINK
public static final String QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONFIGURABLE_CONSTRAINT = "ALTER TABLE "
+ TABLE_SQ_LINK
+ " ADD CONSTRAINT "
+ CONSTRAINT_SQ_LNK_SQC
+ " "
+ "FOREIGN KEY ("
+ COLUMN_SQ_LNK_CONFIGURABLE
+ ") "
+ "REFERENCES "
+ TABLE_SQ_CONFIGURABLE
+ " ("
+ COLUMN_SQC_ID + ")";
// add the constraints back for SQ_CONNECTOR_DIRECTION
public static final String QUERY_UPGRADE_ADD_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT = "ALTER TABLE "
+ TABLE_SQ_CONNECTOR_DIRECTIONS
+ " ADD CONSTRAINT "
+ CONSTRAINT_SQCD_SQC
+ " "
+ "FOREIGN KEY ("
+ COLUMN_SQCD_CONNECTOR
+ ") "
+ "REFERENCES "
+ TABLE_SQ_CONFIGURABLE
+ " (" + COLUMN_SQC_ID + ")";
// add the constraints back for SQ_CONNECTOR_DIRECTION
public static final String QUERY_UPGRADE_ADD_TABLE_SQ_CONNECTOR_DIRECTION_CONFIGURABLE_CONSTRAINT = "ALTER TABLE "
+ TABLE_SQ_LINK + " ADD CONSTRAINT " + CONSTRAINT_SQCD_SQC + " "
+ "FOREIGN KEY (" + COLUMN_SQCD_CONNECTOR + ") "
+ "REFERENCES " + TABLE_SQ_CONFIGURABLE + " (" + COLUMN_SQC_ID + ")";
// Config and Connector directions
public static final String STMT_INSERT_DIRECTION = "INSERT INTO " + TABLE_SQ_DIRECTION + " "
+ "(" + COLUMN_SQD_NAME + ") VALUES (?)";
// DML: Fetch all configs
public static final String STMT_FETCH_CONFIG_DIRECTIONS =
"SELECT "
+ COLUMN_SQ_CFG_ID + ", "
@ -1306,6 +1396,7 @@ public final class DerbySchemaQuery {
public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR =
"ALTER TABLE " + TABLE_SQ_CONFIG + " DROP COLUMN " + COLUMN_SQ_CFG_DIRECTION;
public static final String STMT_INSERT_SQ_CONNECTOR_DIRECTIONS =
"INSERT INTO " + TABLE_SQ_CONNECTOR_DIRECTIONS + " "
+ "(" + COLUMN_SQCD_CONNECTOR + ", " + COLUMN_SQCD_DIRECTION + ")"

View File

@ -108,7 +108,7 @@ private Map<String, List<Long>> getNameToIdListMap(PreparedStatement ps) throws
return nameToIdListMap;
}
void renameEntities() throws Exception {
void renameEntitiesForConnectionAndForm() throws Exception {
// SQ_LINK schema upgrades
// drop the constraint before rename and add it back later
runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_1);
@ -124,6 +124,8 @@ void renameEntities() throws Exception {
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_6);
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_7);
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_8);
runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONNECTOR_CONSTRAINT);
runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONNECTOR_CONSTRAINT);
// SQ_LINK_INPUT schema upgrades
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_TO_SQ_LINK_INPUT);
@ -141,6 +143,8 @@ void renameEntities() throws Exception {
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_4);
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_5);
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_6);
runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_FORM_CONNECTOR_CONSTRAINT);
runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT);
// SQ_INPUT schema upgrades
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_INPUT_FORM_COLUMN);
@ -151,12 +155,30 @@ void renameEntities() throws Exception {
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_JOB_COLUMN_2);
runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_FROM);
runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_TO);
}
void renameConnectorToConfigurable() throws Exception {
// SQ_CONNECTOR to SQ_CONFIGURABLE upgrade
runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT);
runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_LINK_CONSTRAINT);
runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT);
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTOR_TO_SQ_CONFIGURABLE);
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONFIG_COLUMN_1);
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_LINK_COLUMN_1);
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_COLUMN_SQC_TYPE);
runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONFIGURABLE_CONSTRAINT);
runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONFIGURABLE_CONSTRAINT);
runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT);
}
/**
* Create derby schema.
* FIX(SQOOP-1583): This code needs heavy refactoring. Details are in the ticket.
* Create derby schema. FIX(SQOOP-1583): This code needs heavy refactoring.
* Details are in the ticket.
*
* @throws Exception
*/
protected void createOrUpgradeSchema(int version) throws Exception {
@ -196,11 +218,11 @@ protected void createOrUpgradeSchema(int version) throws Exception {
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_FROM);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_TO);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE);
// todo:rename entities code
renameEntities();
renameEntitiesForConnectionAndForm();
// add the name constraints
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME);
runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME);
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR);
runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS);
runQuery(QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS);
@ -208,12 +230,13 @@ protected void createOrUpgradeSchema(int version) throws Exception {
for (Direction direction : Direction.values()) {
runQuery(STMT_INSERT_DIRECTION, direction.toString());
}
renameConnectorToConfigurable();
}
// deprecated repository version
runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) VALUES('version', '" + version + "')");
// why the heck do we insert driver version here?
runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) " +
"VALUES('" + DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION + "', '1')");
// new repository version
runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) VALUES('repository.version', '" + version + "')");
}
@ -382,19 +405,17 @@ protected void loadConnectorAndDriverConfigVersion2() throws Exception {
protected void loadConnectorAndDriverConfigVersion4() throws Exception {
Long configId;
runQuery("INSERT INTO SQOOP.SQ_CONFIGURABLE(SQC_NAME, SQC_CLASS, SQC_VERSION, SQC_TYPE)"
+ "VALUES('A', 'org.apache.sqoop.test.A', '1.0-test', 'CONNECTOR')");
// Connector entry
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
+ "VALUES('A', 'org.apache.sqoop.test.A', '1.0-test')");
for (String connector : new String[]{"1"}) {
for (String connector : new String[] { "1" }) {
// Directions
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)"
+ "VALUES(" + connector + ", 1)");
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)"
+ "VALUES(" + connector + ", 2)");
// connector configs
// connector configs with connectorId as 1
for (String direction : new String[]{null, "1", "2"}) {
String type;
@ -405,7 +426,7 @@ protected void loadConnectorAndDriverConfigVersion4() throws Exception {
}
configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(" + connector + ", 'C1', '" + type + "', 0)");
if (direction != null) {
@ -415,7 +436,7 @@ protected void loadConnectorAndDriverConfigVersion4() throws Exception {
}
configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(" + connector + ", 'C2', '" + type + "', 1)");
if (direction != null) {
@ -426,14 +447,18 @@ protected void loadConnectorAndDriverConfigVersion4() throws Exception {
}
}
// driver config
// insert a driver
runQuery("INSERT INTO SQOOP.SQ_CONFIGURABLE(SQC_NAME, SQC_CLASS, SQC_VERSION, SQC_TYPE)"
+ "VALUES('SqoopDriver', 'org.apache.sqoop.driver.Driver', '1.0-test', 'DRIVER')");
// driver config with driverId as 2
for (String type : new String[]{"JOB"}) {
runQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(NULL" + ", 'C1', '" + type + "', 0)");
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(2" + ", 'C1', '" + type + "', 0)");
runQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(NULL" + ", 'C2', '" + type + "', 1)");
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(2" + ", 'C2', '" + type + "', 1)");
}
// Input entries
@ -460,6 +485,8 @@ protected void loadConnectorAndDriverConfigVersion4() throws Exception {
}
}
/**
* Load testing connector and driver config into repository.
*
@ -511,9 +538,9 @@ public void loadConnectionsOrLinks(int version) throws Exception {
case 4:
// Insert two links - CA and CB
runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONNECTOR) "
runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONFIGURABLE) "
+ "VALUES('CA', 1)");
runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONNECTOR) "
runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONFIGURABLE) "
+ "VALUES('CB', 1)");
for (String ci : new String[]{"1", "2"}) {
@ -644,10 +671,10 @@ protected void removeDuplicateJobNames(int version) throws Exception {
/**
* Add a second connector for testing with multiple connectors
*/
public void addConnector() throws Exception {
public void addConnectorB() throws Exception {
// Connector entry
Long connectorId = runInsertQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
+ "VALUES('B', 'org.apache.sqoop.test.B', '1.0-test')");
Long connectorId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIGURABLE(SQC_NAME, SQC_CLASS, SQC_VERSION, SQC_TYPE)"
+ "VALUES('B', 'org.apache.sqoop.test.B', '1.0-test', 'CONNECTOR')");
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS (SQCD_CONNECTOR, SQCD_DIRECTION) VALUES (" + connectorId + ", 1)");
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS (SQCD_CONNECTOR, SQCD_DIRECTION) VALUES (" + connectorId + ", 2)");
}
@ -756,7 +783,7 @@ protected List<MConfig> getConfigs() {
List<MConfig> jobConfigs = new LinkedList<MConfig>();
List<MInput<?>> inputs = new LinkedList<MInput<?>>();
MInput input = new MStringInput("I1", false, (short)30);
MInput input = new MStringInput("I1", false, (short) 30);
inputs.add(input);
input = new MMapInput("I2", false);
inputs.add(input);

View File

@ -62,17 +62,17 @@ public void testFindConnector() throws Exception {
@Test
public void testFindAllConnectors() throws Exception {
// No connectors in an empty repository, we expect an empty list
assertEquals(handler.findConnectors(getDerbyDatabaseConnection()).size(),0);
assertEquals(handler.findConnectors(getDerbyDatabaseConnection()).size(), 0);
// add connector A
loadConnectorAndDriverConfig();
addConnector();
// adding connector B
addConnectorB();
// Retrieve connectors
List<MConnector> connectors = handler.findConnectors(getDerbyDatabaseConnection());
assertNotNull(connectors);
assertEquals(connectors.size(),2);
assertEquals(connectors.get(0).getUniqueName(),"A");
assertEquals(connectors.get(1).getUniqueName(),"B");
assertEquals(connectors.size(), 2);
assertEquals(connectors.get(0).getUniqueName(), "A");
assertEquals(connectors.get(1).getUniqueName(), "B");
}
@Test
@ -83,7 +83,7 @@ public void testRegisterConnector() throws Exception {
assertEquals(1, connector.getPersistenceId());
// Now check content in corresponding tables
assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
assertCountForTable("SQOOP.SQ_CONFIGURABLE", 1);
assertCountForTable("SQOOP.SQ_CONFIG", 6);
assertCountForTable("SQOOP.SQ_INPUT", 12);
@ -92,6 +92,7 @@ public void testRegisterConnector() throws Exception {
assertNotNull(retrieved);
assertEquals(connector, retrieved);
}
@Test
public void testFromDirection() throws Exception {
MConnector connector = getConnector(true, false);
@ -102,7 +103,7 @@ public void testFromDirection() throws Exception {
assertEquals(1, connector.getPersistenceId());
// Now check content in corresponding tables
assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
assertCountForTable("SQOOP.SQ_CONFIGURABLE", 1);
assertCountForTable("SQOOP.SQ_CONFIG", 4);
assertCountForTable("SQOOP.SQ_INPUT", 8);
@ -122,7 +123,7 @@ public void testToDirection() throws Exception {
assertEquals(1, connector.getPersistenceId());
// Now check content in corresponding tables
assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
assertCountForTable("SQOOP.SQ_CONFIGURABLE", 1);
assertCountForTable("SQOOP.SQ_CONFIG", 4);
assertCountForTable("SQOOP.SQ_INPUT", 8);
@ -142,7 +143,7 @@ public void testNeitherDirection() throws Exception {
assertEquals(1, connector.getPersistenceId());
// Now check content in corresponding tables
assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
assertCountForTable("SQOOP.SQ_CONFIGURABLE", 1);
assertCountForTable("SQOOP.SQ_CONFIG", 2);
assertCountForTable("SQOOP.SQ_INPUT", 4);

View File

@ -21,11 +21,6 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.sqoop.json.DriverBean;
import org.apache.sqoop.model.MDriver;
import org.apache.sqoop.model.MDriverConfig;
import org.junit.Before;
@ -48,15 +43,19 @@ public void setUp() throws Exception {
}
@Test
public void testFindDriverConfig() throws Exception {
public void testFindDriver() throws Exception {
// On empty repository, no driverConfig should be there
assertNull(handler.findDriver(getDerbyDatabaseConnection()));
assertNull(handler.findDriver(MDriver.DRIVER_NAME, getDerbyDatabaseConnection()));
// Load Connector and DriverConfig into repository
// TODO(SQOOP-1582):FIX why load connector config for driver testing?
// add a connector A and driver SqoopDriver
loadConnectorAndDriverConfig();
// Retrieve it
MDriver driver = handler.findDriver(getDerbyDatabaseConnection());
MDriver driver = handler.findDriver(MDriver.DRIVER_NAME, getDerbyDatabaseConnection());
assertNotNull(driver);
assertNotNull(driver.getDriverConfig());
assertEquals("1.0-test", driver.getVersion());
assertEquals("1.0-test", driver.getVersion());
// Get original structure
MDriverConfig originalDriverConfig = getDriverConfig();
@ -64,7 +63,7 @@ public void testFindDriverConfig() throws Exception {
assertEquals(originalDriverConfig, driver.getDriverConfig());
}
public void testRegisterDriverAndConnectorConfig() throws Exception {
public void testRegisterDriver() throws Exception {
MDriver driver = getDriver();
handler.registerDriver(driver, getDerbyDatabaseConnection());
@ -76,59 +75,22 @@ public void testRegisterDriverAndConnectorConfig() throws Exception {
assertCountForTable("SQOOP.SQ_CONFIG", 2);
assertCountForTable("SQOOP.SQ_INPUT", 4);
// Registered driver config should be easily recovered back
MDriver retrieved = handler.findDriver(getDerbyDatabaseConnection());
// Registered driver and config should be easily recovered back
MDriver retrieved = handler.findDriver(MDriver.DRIVER_NAME, getDerbyDatabaseConnection());
assertNotNull(retrieved);
assertEquals(driver, retrieved);
assertEquals(driver.getVersion(), retrieved.getVersion());
}
private String getDriverVersion() throws Exception {
final String frameworkVersionQuery =
"SELECT SQM_VALUE FROM SQOOP.SQ_SYSTEM WHERE SQM_KEY=?";
String retVal = null;
PreparedStatement preparedStmt = null;
ResultSet resultSet = null;
try {
preparedStmt =
getDerbyDatabaseConnection().prepareStatement(frameworkVersionQuery);
preparedStmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION);
resultSet = preparedStmt.executeQuery();
if(resultSet.next())
retVal = resultSet.getString(1);
return retVal;
} finally {
if(preparedStmt !=null) {
try {
preparedStmt.close();
} catch(SQLException e) {
}
}
if(resultSet != null) {
try {
resultSet.close();
} catch(SQLException e) {
}
}
}
}
@Test
public void testDriverVersion() throws Exception {
public void testDriverVersionUpgrade() throws Exception {
MDriver driver = getDriver();
handler.registerDriver(driver, getDerbyDatabaseConnection());
final String lowerVersion = Integer.toString(Integer
.parseInt(DriverBean.CURRENT_DRIVER_VERSION) - 1);
assertEquals(CURRENT_DRIVER_VERSION, getDriverVersion());
runQuery("UPDATE SQOOP.SQ_SYSTEM SET SQM_VALUE='" + lowerVersion + "' WHERE SQM_KEY = '"
+ DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION + "'");
assertEquals(lowerVersion, getDriverVersion());
handler.upgradeDriverConfigs(driver, getDerbyDatabaseConnection());
assertEquals(CURRENT_DRIVER_VERSION, driver.getVersion());
assertEquals(CURRENT_DRIVER_VERSION, getDriverVersion());
String registeredDriverVersion = handler.findDriver(MDriver.DRIVER_NAME, getDerbyDatabaseConnection()).getVersion();
assertEquals(CURRENT_DRIVER_VERSION, registeredDriverVersion);
driver.setVersion("2");
handler.upgradeDriverAndConfigs(driver, getDerbyDatabaseConnection());
assertEquals("2", driver.getVersion());
}
}

View File

@ -32,6 +32,7 @@
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MDriver;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MMapInput;
import org.apache.sqoop.model.MStringInput;
@ -298,7 +299,7 @@ public MJob getJob() {
return new MJob(1, 1, 1, 1,
handler.findConnector("A", derbyConnection).getFromConfig(),
handler.findConnector("A", derbyConnection).getToConfig(),
handler.findDriver(derbyConnection).getDriverConfig()
handler.findDriver(MDriver.DRIVER_NAME, derbyConnection).getDriverConfig()
);
}
}

View File

@ -69,25 +69,20 @@
import org.json.simple.JSONValue;
/**
* Load user-created content of Sqoop repository from a JSON formatted file
* The loaded connector IDs will be modified to match existing connectors
* Load user-created content of Sqoop repository from a JSON formatted file The
* loaded connector IDs will be modified to match existing connectors
*/
public class RepositoryLoadTool extends ConfiguredTool {
public static final Logger LOG = Logger.getLogger(RepositoryLoadTool.class);
@SuppressWarnings("static-access")
@Override
public boolean runToolWithConfiguration(String[] arguments) {
Options options = new Options();
options.addOption(OptionBuilder.isRequired()
.hasArg()
.withArgName("filename")
.withLongOpt("input")
.create('i'));
options.addOption(OptionBuilder.isRequired().hasArg().withArgName("filename")
.withLongOpt("input").create('i'));
CommandLineParser parser = new GnuParser();
@ -98,8 +93,7 @@ public boolean runToolWithConfiguration(String[] arguments) {
LOG.info("Reading JSON from file" + inputFileName);
InputStream input = new FileInputStream(inputFileName);
String jsonTxt = IOUtils.toString(input, Charsets.UTF_8);
JSONObject json =
(JSONObject) JSONValue.parse(jsonTxt);
JSONObject json = (JSONObject) JSONValue.parse(jsonTxt);
boolean res = load(json);
input.close();
return res;
@ -114,23 +108,23 @@ public boolean runToolWithConfiguration(String[] arguments) {
return false;
} catch (ParseException e) {
LOG.error("Error parsing command line arguments:", e);
System.out.println("Error parsing command line arguments. Please check Server logs for details.");
System.out
.println("Error parsing command line arguments. Please check Server logs for details.");
return false;
}
}
private boolean load(JSONObject repo) {
// Validate that loading JSON into repository is supported
JSONObject metadata = (JSONObject) repo.get(JSONConstants.METADATA);
if (metadata == null) {
LOG.error("Malformed JSON. Key "+ JSONConstants.METADATA + " not found.");
LOG.error("Malformed JSON. Key " + JSONConstants.METADATA + " not found.");
return false;
}
if (!validateMetadata(metadata)){
if (!validateMetadata(metadata)) {
LOG.error("Metadata of repository dump file failed validation (see error above for cause). Aborting repository load.");
return false;
}
@ -140,29 +134,29 @@ private boolean load(JSONObject repo) {
Repository repository = RepositoryManager.getInstance().getRepository();
ConnectorManager.getInstance().initialize();
LOG.info("Loading Connections");
JSONObject jsonConns = (JSONObject) repo.get(JSONConstants.LINKS);
if (jsonConns == null) {
LOG.error("Malformed JSON file. Key "+ JSONConstants.LINKS + " not found.");
LOG.error("Malformed JSON file. Key " + JSONConstants.LINKS + " not found.");
return false;
}
LinkBean linkBean = new LinkBean();
linkBean.restore(updateConnectorIDUsingName(jsonConns));
HashMap<Long,Long> connectionIds = new HashMap<Long, Long>();
HashMap<Long, Long> connectionIds = new HashMap<Long, Long>();
for (MLink link : linkBean.getLinks()) {
long oldId = link.getPersistenceId();
long newId = loadLink(link);
if (newId == link.PERSISTANCE_ID_DEFAULT) {
LOG.error("loading connection " + link.getName() + " with previous ID " + oldId + " failed. Aborting repository load. Check log for details.");
LOG.error("loading connection " + link.getName() + " with previous ID " + oldId
+ " failed. Aborting repository load. Check log for details.");
return false;
}
connectionIds.put(oldId,newId);
connectionIds.put(oldId, newId);
}
LOG.info("Loaded " + connectionIds.size() + " connections");
@ -170,23 +164,25 @@ private boolean load(JSONObject repo) {
JSONObject jsonJobs = (JSONObject) repo.get(JSONConstants.JOBS);
if (jsonJobs == null) {
LOG.error("Malformed JSON file. Key "+ JSONConstants.JOBS + " not found.");
LOG.error("Malformed JSON file. Key " + JSONConstants.JOBS + " not found.");
return false;
}
JobBean jobBean = new JobBean();
jobBean.restore(updateIdUsingMap(updateConnectorIDUsingName(jsonJobs), connectionIds,JSONConstants.LINK_ID));
jobBean.restore(updateIdUsingMap(updateConnectorIDUsingName(jsonJobs), connectionIds,
JSONConstants.LINK_ID));
HashMap<Long,Long> jobIds = new HashMap<Long, Long>();
for (MJob job: jobBean.getJobs()) {
HashMap<Long, Long> jobIds = new HashMap<Long, Long>();
for (MJob job : jobBean.getJobs()) {
long oldId = job.getPersistenceId();
long newId = loadJob(job);
if (newId == job.PERSISTANCE_ID_DEFAULT) {
LOG.error("loading job " + job.getName() + " failed. Aborting repository load. Check log for details.");
LOG.error("loading job " + job.getName()
+ " failed. Aborting repository load. Check log for details.");
return false;
}
jobIds.put(oldId,newId);
jobIds.put(oldId, newId);
}
LOG.info("Loaded " + jobIds.size() + " jobs");
@ -195,14 +191,14 @@ private boolean load(JSONObject repo) {
JSONObject jsonSubmissions = (JSONObject) repo.get(JSONConstants.SUBMISSIONS);
if (jsonSubmissions == null) {
LOG.error("Malformed JSON file. Key "+ JSONConstants.SUBMISSIONS + " not found.");
LOG.error("Malformed JSON file. Key " + JSONConstants.SUBMISSIONS + " not found.");
return false;
}
SubmissionBean submissionBean = new SubmissionBean();
submissionBean.restore(updateIdUsingMap(jsonSubmissions,jobIds,JSONConstants.JOB_ID));
submissionBean.restore(updateIdUsingMap(jsonSubmissions, jobIds, JSONConstants.JOB_ID));
int submissionCount = 0;
for (MSubmission submission: submissionBean.getSubmissions()) {
for (MSubmission submission : submissionBean.getSubmissions()) {
resetPersistenceId(submission);
repository.createSubmission(submission);
submissionCount++;
@ -216,12 +212,10 @@ private void resetPersistenceId(MPersistableEntity ent) {
ent.setPersistenceId(ent.PERSISTANCE_ID_DEFAULT);
}
/**
* Even though the metadata contains version, revision, compile-date and compile-user
* We are only validating that version match for now.
* More interesting logic can be added later
* Even though the metadata contains version, revision, compile-date and
* compile-user We are only validating that version match for now. More
* interesting logic can be added later
*/
private boolean validateMetadata(JSONObject metadata) {
String jsonVersion = (String) metadata.get(JSONConstants.VERSION);
@ -229,13 +223,14 @@ private boolean validateMetadata(JSONObject metadata) {
String repoVersion = VersionInfo.getVersion();
if (!jsonVersion.equals(repoVersion)) {
LOG.error("Repository version in file (" + jsonVersion + ") does not match this version of Sqoop (" + repoVersion + ")");
LOG.error("Repository version in file (" + jsonVersion
+ ") does not match this version of Sqoop (" + repoVersion + ")");
return false;
}
if (!includeSensitive) {
LOG.warn("Loading repository which was dumped without --include-sensitive=true. " +
"This means some sensitive information such as passwords is not included in the dump file and will need to be manually added later.");
LOG.warn("Loading repository which was dumped without --include-sensitive=true. "
+ "This means some sensitive information such as passwords is not included in the dump file and will need to be manually added later.");
}
return true;
@ -243,7 +238,7 @@ private boolean validateMetadata(JSONObject metadata) {
private long loadLink(MLink link) {
//starting by pretending we have a brand new connection
// starting by pretending we have a brand new connection
resetPersistenceId(link);
Repository repository = RepositoryManager.getInstance().getRepository();
@ -262,11 +257,9 @@ private long loadLink(MLink link) {
SqoopConnector connector = ConnectorManager.getInstance().getSqoopConnector(
link.getConnectorId());
Object connectorConfig = ClassUtils.instantiate(
connector.getLinkConfigurationClass());
Object connectorConfig = ClassUtils.instantiate(connector.getLinkConfigurationClass());
ConfigUtils.fromConfigs(
link.getConnectorLinkConfig().getConfigs(), connectorConfig);
ConfigUtils.fromConfigs(link.getConnectorLinkConfig().getConfigs(), connectorConfig);
ConfigValidationRunner validationRunner = new ConfigValidationRunner();
ConfigValidationResult result = validationRunner.validate(connectorConfig);
@ -284,7 +277,7 @@ private long loadLink(MLink link) {
}
private long loadJob(MJob job) {
//starting by pretending we have a brand new job
// starting by pretending we have a brand new job
resetPersistenceId(job);
MConnector mFromConnector = ConnectorManager.getInstance().getConnectorConfigurable(job.getFromConnectorId());
MConnector mToConnector = ConnectorManager.getInstance().getConnectorConfigurable(job.getToConnectorId());
@ -329,7 +322,8 @@ private long loadJob(MJob job) {
job.getDriverConfig().getConfigs(), driverConfig);
ConfigValidationRunner validationRunner = new ConfigValidationRunner();
ConfigValidationResult fromConnectorConfigResult = validationRunner.validate(fromConnectorConfig);
ConfigValidationResult fromConnectorConfigResult = validationRunner
.validate(fromConnectorConfig);
ConfigValidationResult toConnectorConfigResult = validationRunner.validate(toConnectorConfig);
ConfigValidationResult driverConfigResult = validationRunner.validate(driverConfig);
@ -341,17 +335,17 @@ private long loadJob(MJob job) {
} else {
LOG.error("Failed to load job:" + job.getName());
LOG.error("Status of from connector configs:" + fromConnectorConfigResult.getStatus().toString());
LOG.error("Status of from connector configs:"
+ fromConnectorConfigResult.getStatus().toString());
LOG.error("Status of to connector configs:" + toConnectorConfigResult.getStatus().toString());
LOG.error("Status of driver configs:" + driverConfigResult.getStatus().toString());
}
return newJob.getPersistenceId();
}
private JSONObject updateConnectorIDUsingName( JSONObject json) {
private JSONObject updateConnectorIDUsingName(JSONObject json) {
JSONArray array = (JSONArray) json.get(JSONConstants.ALL);
Repository repository = RepositoryManager.getInstance().getRepository();
@ -370,11 +364,10 @@ private JSONObject updateConnectorIDUsingName( JSONObject json) {
long currentConnectorId = connectorMap.get(connectorName);
String connectionName = (String) object.get(JSONConstants.NAME);
// If a given connector now has a different ID, we need to update the ID
if (connectorId != currentConnectorId) {
LOG.warn("Connection " + connectionName + " uses connector " + connectorName + ". " +
"Replacing previous ID " + connectorId + " with new ID " + currentConnectorId);
LOG.warn("Connection " + connectionName + " uses connector " + connectorName + ". "
+ "Replacing previous ID " + connectorId + " with new ID " + currentConnectorId);
object.put(JSONConstants.CONNECTOR_ID, currentConnectorId);
}
@ -382,7 +375,7 @@ private JSONObject updateConnectorIDUsingName( JSONObject json) {
return json;
}
private JSONObject updateIdUsingMap(JSONObject json, HashMap<Long,Long> idMap, String fieldName) {
private JSONObject updateIdUsingMap(JSONObject json, HashMap<Long, Long> idMap, String fieldName) {
JSONArray array = (JSONArray) json.get(JSONConstants.ALL);
for (Object obj : array) {
@ -394,6 +387,4 @@ private JSONObject updateIdUsingMap(JSONObject json, HashMap<Long,Long> idMap, S
return json;
}
}