mirror of
https://github.com/apache/sqoop.git
synced 2025-05-15 16:31:17 +08:00
SQOOP-1732: Sqoop2: Add version to connector upgrade API
(Abraham Fine via Jarek Jarcec Cecho)
This commit is contained in:
parent
72a9d43831
commit
1ea3db992b
@ -122,7 +122,7 @@ public To getTo() {
|
|||||||
* @return configurable upgrader object
|
* @return configurable upgrader object
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
|
public ConnectorConfigurableUpgrader getConfigurableUpgrader(String oldConnectorVersion) {
|
||||||
return new FtpConnectorUpgrader();
|
return new FtpConnectorUpgrader();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,7 +91,7 @@ public To getTo() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
|
public ConnectorConfigurableUpgrader getConfigurableUpgrader(String oldConnectorVersion) {
|
||||||
return new GenericJdbcConnectorUpgrader();
|
return new GenericJdbcConnectorUpgrader();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,7 +118,7 @@ public To getTo() {
|
|||||||
* @return configurable upgrader object
|
* @return configurable upgrader object
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
|
public ConnectorConfigurableUpgrader getConfigurableUpgrader(String oldConnectorVersion) {
|
||||||
return new HdfsConnectorUpgrader();
|
return new HdfsConnectorUpgrader();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -107,7 +107,7 @@ public To getTo() {
|
|||||||
* @return ConnectorConfigurableUpgrader object
|
* @return ConnectorConfigurableUpgrader object
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
|
public ConnectorConfigurableUpgrader getConfigurableUpgrader(String oldConnectorVersion) {
|
||||||
// Nothing to upgrade at this point
|
// Nothing to upgrade at this point
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -88,7 +88,7 @@ public To getTo() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
|
public ConnectorConfigurableUpgrader getConfigurableUpgrader(String oldConnectorVersion) {
|
||||||
return new KiteConnectorUpgrader();
|
return new KiteConnectorUpgrader();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,7 +87,7 @@ public List<Direction> getSupportedDirections() {
|
|||||||
* configs related to the link and job
|
* configs related to the link and job
|
||||||
* @return ConnectorConfigurableUpgrader object
|
* @return ConnectorConfigurableUpgrader object
|
||||||
*/
|
*/
|
||||||
public abstract ConnectorConfigurableUpgrader getConfigurableUpgrader();
|
public abstract ConnectorConfigurableUpgrader getConfigurableUpgrader(String oldConnectorVersion);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the {@linkplain IntermediateDataFormat} this connector
|
* Returns the {@linkplain IntermediateDataFormat} this connector
|
||||||
|
@ -122,7 +122,7 @@ public To getTo() {
|
|||||||
* @return configurable upgrader object
|
* @return configurable upgrader object
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
|
public ConnectorConfigurableUpgrader getConfigurableUpgrader(String oldConnectorVersion) {
|
||||||
return new SftpConnectorUpgrader();
|
return new SftpConnectorUpgrader();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -142,7 +142,7 @@ public synchronized void destroy() {
|
|||||||
LOG.trace("Begin Driver destroy");
|
LOG.trace("Begin Driver destroy");
|
||||||
}
|
}
|
||||||
|
|
||||||
public DriverUpgrader getConfigurableUpgrader() {
|
public DriverUpgrader getConfigurableUpgrader(String oldDriverVersion) {
|
||||||
return driverUpgrader;
|
return driverUpgrader;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -243,7 +243,7 @@ public Object doIt(Connection conn) {
|
|||||||
if(!mDriver.equals(existingDriver)) {
|
if(!mDriver.equals(existingDriver)) {
|
||||||
if (autoUpgrade) {
|
if (autoUpgrade) {
|
||||||
mDriver.setPersistenceId(existingDriver.getPersistenceId());
|
mDriver.setPersistenceId(existingDriver.getPersistenceId());
|
||||||
upgradeDriver(mDriver);
|
upgradeDriver(mDriver, existingDriver.getVersion());
|
||||||
return mDriver;
|
return mDriver;
|
||||||
} else {
|
} else {
|
||||||
throw new SqoopException(RepositoryError.JDBCREPO_0026,
|
throw new SqoopException(RepositoryError.JDBCREPO_0026,
|
||||||
|
@ -448,6 +448,7 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
|
|||||||
LOG.info("Upgrading connector: " + oldConnector.getUniqueName());
|
LOG.info("Upgrading connector: " + oldConnector.getUniqueName());
|
||||||
long connectorId = oldConnector.getPersistenceId();
|
long connectorId = oldConnector.getPersistenceId();
|
||||||
String connectorName = oldConnector.getUniqueName();
|
String connectorName = oldConnector.getUniqueName();
|
||||||
|
String oldVersion = oldConnector.getVersion();
|
||||||
newConnector.setPersistenceId(connectorId);
|
newConnector.setPersistenceId(connectorId);
|
||||||
|
|
||||||
RepositoryTransaction tx = null;
|
RepositoryTransaction tx = null;
|
||||||
@ -457,7 +458,7 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
|
|||||||
|
|
||||||
boolean upgradeSuccessful = true;
|
boolean upgradeSuccessful = true;
|
||||||
// 1. Get an upgrader for the connector
|
// 1. Get an upgrader for the connector
|
||||||
ConnectorConfigurableUpgrader upgrader = connector.getConfigurableUpgrader();
|
ConnectorConfigurableUpgrader upgrader = connector.getConfigurableUpgrader(oldVersion);
|
||||||
// 2. Get all links associated with the connector.
|
// 2. Get all links associated with the connector.
|
||||||
List<MLink> existingLinksByConnector = findLinksForConnectorUpgrade(connectorName);
|
List<MLink> existingLinksByConnector = findLinksForConnectorUpgrade(connectorName);
|
||||||
// 3. Get all jobs associated with the connector.
|
// 3. Get all jobs associated with the connector.
|
||||||
@ -606,12 +607,12 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public final void upgradeDriver(MDriver driver) {
|
public final void upgradeDriver(MDriver driver, String oldDriverVersion) {
|
||||||
LOG.info("Upgrading driver");
|
LOG.info("Upgrading driver");
|
||||||
RepositoryTransaction tx = null;
|
RepositoryTransaction tx = null;
|
||||||
try {
|
try {
|
||||||
//1. find upgrader
|
//1. find upgrader
|
||||||
DriverUpgrader upgrader = Driver.getInstance().getConfigurableUpgrader();
|
DriverUpgrader upgrader = Driver.getInstance().getConfigurableUpgrader(oldDriverVersion);
|
||||||
//2. find all jobs in the system
|
//2. find all jobs in the system
|
||||||
List<MJob> existingJobs = findJobs();
|
List<MJob> existingJobs = findJobs();
|
||||||
boolean upgradeSuccessful = true;
|
boolean upgradeSuccessful = true;
|
||||||
|
@ -214,7 +214,7 @@ public void testConnectorConfigUpgradeWithValidLinksAndJobs() {
|
|||||||
|
|
||||||
// prepare the sqoop connector
|
// prepare the sqoop connector
|
||||||
SqoopConnector sqconnector = mock(SqoopConnector.class);
|
SqoopConnector sqconnector = mock(SqoopConnector.class);
|
||||||
when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
|
when(sqconnector.getConfigurableUpgrader(oldConnector.getVersion())).thenReturn(connectorUpgraderMock);
|
||||||
when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyConfiguration.class);
|
when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyConfiguration.class);
|
||||||
when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(EmptyConfiguration.class);
|
when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(EmptyConfiguration.class);
|
||||||
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
||||||
@ -268,7 +268,7 @@ public void testConnectorConfigUpgradeWithValidLinksAndJobs() {
|
|||||||
public void testDriverConfigUpgradeWithValidJobs() {
|
public void testDriverConfigUpgradeWithValidJobs() {
|
||||||
MDriver newDriverConfig = driver();
|
MDriver newDriverConfig = driver();
|
||||||
|
|
||||||
when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock);
|
when(driverMock.getConfigurableUpgrader(DriverBean.CURRENT_DRIVER_VERSION)).thenReturn(driverUpgraderMock);
|
||||||
when(driverMock.getDriverJobConfigurationClass()).thenReturn(ValidConfiguration.class);
|
when(driverMock.getDriverJobConfigurationClass()).thenReturn(ValidConfiguration.class);
|
||||||
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
||||||
|
|
||||||
@ -277,7 +277,7 @@ public void testDriverConfigUpgradeWithValidJobs() {
|
|||||||
doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
|
doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
|
||||||
doNothing().when(repoSpy).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class));
|
doNothing().when(repoSpy).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class));
|
||||||
|
|
||||||
repoSpy.upgradeDriver(newDriverConfig);
|
repoSpy.upgradeDriver(newDriverConfig, DriverBean.CURRENT_DRIVER_VERSION);
|
||||||
|
|
||||||
InOrder repoOrder = inOrder(repoSpy);
|
InOrder repoOrder = inOrder(repoSpy);
|
||||||
InOrder txOrder = inOrder(repoTransactionMock);
|
InOrder txOrder = inOrder(repoTransactionMock);
|
||||||
@ -306,7 +306,7 @@ public void testDriverConfigUpgradeWithValidJobs() {
|
|||||||
public void testDriverConfigUpgradeWithInvalidJobs() {
|
public void testDriverConfigUpgradeWithInvalidJobs() {
|
||||||
MDriver newDriverConfig = driver();
|
MDriver newDriverConfig = driver();
|
||||||
|
|
||||||
when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock);
|
when(driverMock.getConfigurableUpgrader(DriverBean.CURRENT_DRIVER_VERSION)).thenReturn(driverUpgraderMock);
|
||||||
when(driverMock.getDriverJobConfigurationClass()).thenReturn(InvalidConfiguration.class);
|
when(driverMock.getDriverJobConfigurationClass()).thenReturn(InvalidConfiguration.class);
|
||||||
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
||||||
|
|
||||||
@ -315,7 +315,7 @@ public void testDriverConfigUpgradeWithInvalidJobs() {
|
|||||||
doNothing().when(repoSpy).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class));
|
doNothing().when(repoSpy).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
repoSpy.upgradeDriver(newDriverConfig);
|
repoSpy.upgradeDriver(newDriverConfig, DriverBean.CURRENT_DRIVER_VERSION);
|
||||||
} catch (SqoopException ex) {
|
} catch (SqoopException ex) {
|
||||||
assertEquals(ex.getErrorCode(), RepositoryError.JDBCREPO_0027);
|
assertEquals(ex.getErrorCode(), RepositoryError.JDBCREPO_0027);
|
||||||
|
|
||||||
@ -351,7 +351,7 @@ public void testConnectorConfigUpgradeHandlerWithFindLinksForConnectorError() {
|
|||||||
MConnector oldConnector = connector(1);
|
MConnector oldConnector = connector(1);
|
||||||
|
|
||||||
SqoopConnector sqconnector = mock(SqoopConnector.class);
|
SqoopConnector sqconnector = mock(SqoopConnector.class);
|
||||||
when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
|
when(sqconnector.getConfigurableUpgrader(oldConnector.getVersion())).thenReturn(connectorUpgraderMock);
|
||||||
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
||||||
|
|
||||||
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
|
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
|
||||||
@ -380,7 +380,7 @@ public void testConnectorConfigUpgradeHandlerWithFindJobsForConnectorError() {
|
|||||||
MConnector oldConnector = connector(1);
|
MConnector oldConnector = connector(1);
|
||||||
|
|
||||||
SqoopConnector sqconnector = mock(SqoopConnector.class);
|
SqoopConnector sqconnector = mock(SqoopConnector.class);
|
||||||
when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
|
when(sqconnector.getConfigurableUpgrader(oldConnector.getVersion())).thenReturn(connectorUpgraderMock);
|
||||||
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
||||||
|
|
||||||
List<MLink> linkList = links(link(1, "LA", 1), link(2, "LB", 1));
|
List<MLink> linkList = links(link(1, "LA", 1), link(2, "LB", 1));
|
||||||
@ -414,7 +414,7 @@ public void testConnectorConfigUpgradeHandlerWithDeleteJobInputsError() {
|
|||||||
MConnector oldConnector = connector(1);
|
MConnector oldConnector = connector(1);
|
||||||
|
|
||||||
SqoopConnector sqconnector = mock(SqoopConnector.class);
|
SqoopConnector sqconnector = mock(SqoopConnector.class);
|
||||||
when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
|
when(sqconnector.getConfigurableUpgrader(oldConnector.getVersion())).thenReturn(connectorUpgraderMock);
|
||||||
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
||||||
|
|
||||||
List<MLink> linkList = links(link(1, "LA", 1), link(2, "LB", 1));
|
List<MLink> linkList = links(link(1, "LA", 1), link(2, "LB", 1));
|
||||||
@ -452,7 +452,7 @@ public void testConnectorConfigUpgradeHandlerWithDeleteLinkInputsError() {
|
|||||||
MConnector oldConnector = connector(1);
|
MConnector oldConnector = connector(1);
|
||||||
|
|
||||||
SqoopConnector sqconnector = mock(SqoopConnector.class);
|
SqoopConnector sqconnector = mock(SqoopConnector.class);
|
||||||
when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
|
when(sqconnector.getConfigurableUpgrader(oldConnector.getVersion())).thenReturn(connectorUpgraderMock);
|
||||||
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
||||||
|
|
||||||
List<MLink> linkList = links(link(1, "LA", 1), link(2, "LB", 1));
|
List<MLink> linkList = links(link(1, "LA", 1), link(2, "LB", 1));
|
||||||
@ -490,7 +490,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateConnectorError() {
|
|||||||
MConnector oldConnector = connector(1);
|
MConnector oldConnector = connector(1);
|
||||||
|
|
||||||
SqoopConnector sqconnector = mock(SqoopConnector.class);
|
SqoopConnector sqconnector = mock(SqoopConnector.class);
|
||||||
when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
|
when(sqconnector.getConfigurableUpgrader(oldConnector.getVersion())).thenReturn(connectorUpgraderMock);
|
||||||
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
||||||
|
|
||||||
List<MLink> linkList = links(link(1, "LA", 1), link(2, "LB", 1));
|
List<MLink> linkList = links(link(1, "LA", 1), link(2, "LB", 1));
|
||||||
@ -530,7 +530,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateLinkError() {
|
|||||||
MConnector oldConnector = connector(1);
|
MConnector oldConnector = connector(1);
|
||||||
|
|
||||||
SqoopConnector sqconnector = mock(SqoopConnector.class);
|
SqoopConnector sqconnector = mock(SqoopConnector.class);
|
||||||
when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
|
when(sqconnector.getConfigurableUpgrader(oldConnector.getVersion())).thenReturn(connectorUpgraderMock);
|
||||||
when(sqconnector.getLinkConfigurationClass()).thenReturn(ValidConfiguration.class);
|
when(sqconnector.getLinkConfigurationClass()).thenReturn(ValidConfiguration.class);
|
||||||
when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(ValidConfiguration.class);
|
when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(ValidConfiguration.class);
|
||||||
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
||||||
@ -576,7 +576,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateJobError() {
|
|||||||
MConnector oldConnector = connector(1);
|
MConnector oldConnector = connector(1);
|
||||||
|
|
||||||
SqoopConnector sqconnector = mock(SqoopConnector.class);
|
SqoopConnector sqconnector = mock(SqoopConnector.class);
|
||||||
when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
|
when(sqconnector.getConfigurableUpgrader(oldConnector.getVersion())).thenReturn(connectorUpgraderMock);
|
||||||
when(sqconnector.getLinkConfigurationClass()).thenReturn(ValidConfiguration.class);
|
when(sqconnector.getLinkConfigurationClass()).thenReturn(ValidConfiguration.class);
|
||||||
when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(ValidConfiguration.class);
|
when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(ValidConfiguration.class);
|
||||||
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
|
||||||
@ -624,14 +624,14 @@ public void testConnectorConfigUpgradeHandlerWithUpdateJobError() {
|
|||||||
public void testDriverConfigUpgradeHandlerWithFindJobsError() {
|
public void testDriverConfigUpgradeHandlerWithFindJobsError() {
|
||||||
MDriver newDriverConfig = driver();
|
MDriver newDriverConfig = driver();
|
||||||
|
|
||||||
when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock);
|
when(driverMock.getConfigurableUpgrader(DriverBean.CURRENT_DRIVER_VERSION)).thenReturn(driverUpgraderMock);
|
||||||
|
|
||||||
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
|
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
|
||||||
"find jobs error.");
|
"find jobs error.");
|
||||||
doThrow(exception).when(repoHandlerMock).findJobs(any(Connection.class));
|
doThrow(exception).when(repoHandlerMock).findJobs(any(Connection.class));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
repoSpy.upgradeDriver(newDriverConfig);
|
repoSpy.upgradeDriver(newDriverConfig, "1.0");
|
||||||
} catch (SqoopException ex) {
|
} catch (SqoopException ex) {
|
||||||
assertEquals(ex.getMessage(), exception.getMessage());
|
assertEquals(ex.getMessage(), exception.getMessage());
|
||||||
verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
|
verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
|
||||||
@ -650,7 +650,7 @@ public void testDriverConfigUpgradeHandlerWithFindJobsError() {
|
|||||||
public void testDriverConfigUpgradeHandlerWithDeleteJobInputsError() {
|
public void testDriverConfigUpgradeHandlerWithDeleteJobInputsError() {
|
||||||
MDriver newDriverConfig = driver();
|
MDriver newDriverConfig = driver();
|
||||||
|
|
||||||
when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock);
|
when(driverMock.getConfigurableUpgrader(DriverBean.CURRENT_DRIVER_VERSION)).thenReturn(driverUpgraderMock);
|
||||||
|
|
||||||
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
||||||
doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
|
doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
|
||||||
@ -660,7 +660,7 @@ public void testDriverConfigUpgradeHandlerWithDeleteJobInputsError() {
|
|||||||
doThrow(exception).when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class));
|
doThrow(exception).when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
repoSpy.upgradeDriver(newDriverConfig);
|
repoSpy.upgradeDriver(newDriverConfig, "1.0");
|
||||||
} catch (SqoopException ex) {
|
} catch (SqoopException ex) {
|
||||||
assertEquals(ex.getMessage(), exception.getMessage());
|
assertEquals(ex.getMessage(), exception.getMessage());
|
||||||
verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
|
verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
|
||||||
@ -680,7 +680,7 @@ public void testDriverConfigUpgradeHandlerWithDeleteJobInputsError() {
|
|||||||
public void testDriverConfigUpgradeHandlerWithUpdateDriverConfigError() {
|
public void testDriverConfigUpgradeHandlerWithUpdateDriverConfigError() {
|
||||||
MDriver newDriverConfig = driver();
|
MDriver newDriverConfig = driver();
|
||||||
|
|
||||||
when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock);
|
when(driverMock.getConfigurableUpgrader(DriverBean.CURRENT_DRIVER_VERSION)).thenReturn(driverUpgraderMock);
|
||||||
|
|
||||||
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
||||||
doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
|
doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
|
||||||
@ -692,7 +692,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateDriverConfigError() {
|
|||||||
doThrow(exception).when(repoHandlerMock).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class));
|
doThrow(exception).when(repoHandlerMock).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
repoSpy.upgradeDriver(newDriverConfig);
|
repoSpy.upgradeDriver(newDriverConfig, "1.0");
|
||||||
} catch (SqoopException ex) {
|
} catch (SqoopException ex) {
|
||||||
assertEquals(ex.getMessage(), exception.getMessage());
|
assertEquals(ex.getMessage(), exception.getMessage());
|
||||||
verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
|
verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
|
||||||
@ -714,7 +714,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateDriverConfigError() {
|
|||||||
public void testDriverConfigUpgradeHandlerWithUpdateJobError() {
|
public void testDriverConfigUpgradeHandlerWithUpdateJobError() {
|
||||||
MDriver driverConfig = driver();
|
MDriver driverConfig = driver();
|
||||||
|
|
||||||
when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock);
|
when(driverMock.getConfigurableUpgrader(DriverBean.CURRENT_DRIVER_VERSION)).thenReturn(driverUpgraderMock);
|
||||||
when(driverMock.getDriverJobConfigurationClass()).thenReturn(ValidConfiguration.class);
|
when(driverMock.getDriverJobConfigurationClass()).thenReturn(ValidConfiguration.class);
|
||||||
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1));
|
||||||
doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
|
doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
|
||||||
@ -727,7 +727,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateJobError() {
|
|||||||
doThrow(exception).when(repoHandlerMock).updateJob(any(MJob.class), any(Connection.class));
|
doThrow(exception).when(repoHandlerMock).updateJob(any(MJob.class), any(Connection.class));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
repoSpy.upgradeDriver(driverConfig);
|
repoSpy.upgradeDriver(driverConfig, DriverBean.CURRENT_DRIVER_VERSION);
|
||||||
} catch (SqoopException ex) {
|
} catch (SqoopException ex) {
|
||||||
assertEquals(ex.getMessage(), exception.getMessage());
|
assertEquals(ex.getMessage(), exception.getMessage());
|
||||||
verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
|
verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
|
||||||
|
@ -54,7 +54,7 @@ The ``SqoopConnector`` class defines an API for the connectors that must be impl
|
|||||||
public abstract Class getJobConfigurationClass(Direction direction);
|
public abstract Class getJobConfigurationClass(Direction direction);
|
||||||
public abstract From getFrom();
|
public abstract From getFrom();
|
||||||
public abstract To getTo();
|
public abstract To getTo();
|
||||||
public abstract ConnectorConfigurableUpgrader getConfigurableUpgrader()
|
public abstract ConnectorConfigurableUpgrader getConfigurableUpgrader(String oldConnectorVersion)
|
||||||
|
|
||||||
Connectors can optionally override the following methods:
|
Connectors can optionally override the following methods:
|
||||||
::
|
::
|
||||||
|
@ -78,7 +78,7 @@ public To getTo() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
|
public ConnectorConfigurableUpgrader getConfigurableUpgrader(String oldConnectorVersion) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -263,6 +263,14 @@ private boolean validateMetadata(JSONObject metadata) {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We currently pass through null as the old connector version because we do
|
||||||
|
* not have a good way of determining what the old version of the connector is
|
||||||
|
* here.
|
||||||
|
*
|
||||||
|
* According to Jarcec, this chunk of code will receive some much needed
|
||||||
|
* attention in the near future and this will be fixed.
|
||||||
|
*/
|
||||||
private long loadLink(MLink link) {
|
private long loadLink(MLink link) {
|
||||||
|
|
||||||
// starting by pretending we have a brand new link
|
// starting by pretending we have a brand new link
|
||||||
@ -271,7 +279,7 @@ private long loadLink(MLink link) {
|
|||||||
Repository repository = RepositoryManager.getInstance().getRepository();
|
Repository repository = RepositoryManager.getInstance().getRepository();
|
||||||
|
|
||||||
MConnector mConnector = ConnectorManager.getInstance().getConnectorConfigurable(link.getConnectorId());
|
MConnector mConnector = ConnectorManager.getInstance().getConnectorConfigurable(link.getConnectorId());
|
||||||
ConnectorConfigurableUpgrader connectorConfigUpgrader = ConnectorManager.getInstance().getSqoopConnector(mConnector.getUniqueName()).getConfigurableUpgrader();
|
ConnectorConfigurableUpgrader connectorConfigUpgrader = ConnectorManager.getInstance().getSqoopConnector(mConnector.getUniqueName()).getConfigurableUpgrader(null);
|
||||||
|
|
||||||
List<MConfig> connectorConfigs = mConnector.getLinkConfig().clone(false).getConfigs();
|
List<MConfig> connectorConfigs = mConnector.getLinkConfig().clone(false).getConfigs();
|
||||||
List<MValidator> connectorValidators = mConnector.getLinkConfig().getCloneOfValidators();
|
List<MValidator> connectorValidators = mConnector.getLinkConfig().getCloneOfValidators();
|
||||||
@ -304,6 +312,15 @@ private long loadLink(MLink link) {
|
|||||||
return newLink.getPersistenceId();
|
return newLink.getPersistenceId();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We currently pass through null as the old connector version because we do
|
||||||
|
* not have a good way of determining what the old version of the connector is
|
||||||
|
* here.
|
||||||
|
*
|
||||||
|
* According to Jarcec, this chunk of code will receive some much needed
|
||||||
|
* attention in the near future and this will be fixed.
|
||||||
|
*/
|
||||||
private long loadJob(MJob job) {
|
private long loadJob(MJob job) {
|
||||||
// starting by pretending we have a brand new job
|
// starting by pretending we have a brand new job
|
||||||
resetPersistenceId(job);
|
resetPersistenceId(job);
|
||||||
@ -313,14 +330,14 @@ private long loadJob(MJob job) {
|
|||||||
MFromConfig fromConfig = job.getFromJobConfig();
|
MFromConfig fromConfig = job.getFromJobConfig();
|
||||||
MToConfig toConfig = job.getToJobConfig();
|
MToConfig toConfig = job.getToJobConfig();
|
||||||
|
|
||||||
ConnectorConfigurableUpgrader fromConnectorConfigUpgrader = ConnectorManager.getInstance().getSqoopConnector(mFromConnector.getUniqueName()).getConfigurableUpgrader();
|
ConnectorConfigurableUpgrader fromConnectorConfigUpgrader = ConnectorManager.getInstance().getSqoopConnector(mFromConnector.getUniqueName()).getConfigurableUpgrader(null);
|
||||||
ConnectorConfigurableUpgrader toConnectorConfigUpgrader = ConnectorManager.getInstance().getSqoopConnector(mToConnector.getUniqueName()).getConfigurableUpgrader();
|
ConnectorConfigurableUpgrader toConnectorConfigUpgrader = ConnectorManager.getInstance().getSqoopConnector(mToConnector.getUniqueName()).getConfigurableUpgrader(null);
|
||||||
|
|
||||||
fromConnectorConfigUpgrader.upgradeFromJobConfig(job.getFromJobConfig(), fromConfig);
|
fromConnectorConfigUpgrader.upgradeFromJobConfig(job.getFromJobConfig(), fromConfig);
|
||||||
|
|
||||||
toConnectorConfigUpgrader.upgradeToJobConfig(job.getToJobConfig(), toConfig);
|
toConnectorConfigUpgrader.upgradeToJobConfig(job.getToJobConfig(), toConfig);
|
||||||
|
|
||||||
DriverUpgrader driverConfigUpgrader = Driver.getInstance().getConfigurableUpgrader();
|
DriverUpgrader driverConfigUpgrader = Driver.getInstance().getConfigurableUpgrader(null);
|
||||||
MDriver driver = Driver.getInstance().getDriver();
|
MDriver driver = Driver.getInstance().getDriver();
|
||||||
MDriverConfig driverConfigs = driver.getDriverConfig();
|
MDriverConfig driverConfigs = driver.getDriverConfig();
|
||||||
driverConfigUpgrader.upgradeJobConfig( job.getDriverConfig(), driverConfigs);
|
driverConfigUpgrader.upgradeJobConfig( job.getDriverConfig(), driverConfigs);
|
||||||
|
Loading…
Reference in New Issue
Block a user