5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-06 21:53:44 +08:00

SQOOP-2360: Sqoop2: Upgrade should consider both directions for a single connector

(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2015-05-26 12:29:20 -07:00
parent 788fd54a3d
commit d013d94dbb
4 changed files with 70 additions and 46 deletions

View File

@ -25,6 +25,7 @@
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.common.SupportedDirections;
import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
import org.apache.sqoop.connector.spi.SqoopConnector;
@ -547,55 +548,77 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
// corresponding connectors.
LOG.info(" Job upgrade for job:" + job.getName()+ " for connector:" + connectorName);
if (newConnector.getSupportedDirections().isDirectionSupported(Direction.FROM)) {
List<MConfig> fromConfig = newConnector.getFromConfig().clone(false).getConfigs();
if (job.getFromConnectorId() == newConnector.getPersistenceId()) {
MFromConfig newFromConfig = new MFromConfig(fromConfig);
MFromConfig oldFromConfig = job.getFromJobConfig();
upgrader.upgradeFromJobConfig(oldFromConfig, newFromConfig);
MToConfig oldToConfig = job.getToJobConfig();
// create a job with new FROM direction configs but old TO direction
// configs
MJob newJob = new MJob(job, newFromConfig, oldToConfig, job.getDriverConfig());
SupportedDirections supportedDirections = newConnector.getSupportedDirections();
ConfigValidationResult validationResult = ConfigUtils.validateConfigs(
newJob.getFromJobConfig().getConfigs(),
connector.getJobConfigurationClass(Direction.FROM)
);
if (supportedDirections.isDirectionSupported(Direction.FROM)
&& job.getFromConnectorId() == newConnector.getPersistenceId()
&& supportedDirections.isDirectionSupported(Direction.TO)
&& job.getToConnectorId() == newConnector.getPersistenceId()) {
// Upgrade both configs
MFromConfig newFromConfig = new MFromConfig(newConnector.getFromConfig().clone(false).getConfigs());
MFromConfig oldFromConfig = job.getFromJobConfig();
MToConfig newToConfig = new MToConfig(newConnector.getToConfig().clone(false).getConfigs());
MToConfig oldToConfig = job.getToJobConfig();
upgrader.upgradeFromJobConfig(oldFromConfig, newFromConfig);
upgrader.upgradeToJobConfig(oldToConfig, newToConfig);
if (validationResult.getStatus().canProceed()) {
updateJob(newJob, tx);
} else {
logInvalidModelObject("fromJob", newJob, validationResult);
upgradeSuccessful = false;
LOG.error(" From JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName);
}
MJob newJob = new MJob(job, newFromConfig, newToConfig, job.getDriverConfig());
ConfigValidationResult validationResult = ConfigUtils.validateConfigs(
newJob.getFromJobConfig().getConfigs(),
connector.getJobConfigurationClass(Direction.FROM)
);
if (validationResult.getStatus().canProceed()) {
updateJob(newJob, tx);
} else {
logInvalidModelObject("job", newJob, validationResult);
upgradeSuccessful = false;
LOG.error(" JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName);
}
}
} else if (supportedDirections.isDirectionSupported(Direction.FROM)
&& job.getFromConnectorId() == newConnector.getPersistenceId()) {
MFromConfig newFromConfig = new MFromConfig(newConnector.getFromConfig().clone(false).getConfigs());
MFromConfig oldFromConfig = job.getFromJobConfig();
upgrader.upgradeFromJobConfig(oldFromConfig, newFromConfig);
MToConfig oldToConfig = job.getToJobConfig();
// create a job with new FROM direction configs but old TO direction
// configs
MJob newJob = new MJob(job, newFromConfig, oldToConfig, job.getDriverConfig());
if (newConnector.getSupportedDirections().isDirectionSupported(Direction.TO)) {
List<MConfig> toConfig = newConnector.getToConfig().clone(false).getConfigs();
if (job.getToConnectorId() == newConnector.getPersistenceId()) {
MToConfig oldToConfig = job.getToJobConfig();
MToConfig newToConfig = new MToConfig(toConfig);
upgrader.upgradeToJobConfig(oldToConfig, newToConfig);
MFromConfig oldFromConfig = job.getFromJobConfig();
// create a job with old FROM direction configs but new TO direction
// configs
MJob newJob = new MJob(job, oldFromConfig, newToConfig, job.getDriverConfig());
ConfigValidationResult validationResult = ConfigUtils.validateConfigs(
newJob.getFromJobConfig().getConfigs(),
connector.getJobConfigurationClass(Direction.FROM)
);
ConfigValidationResult validationResult = ConfigUtils.validateConfigs(
newJob.getToJobConfig().getConfigs(),
connector.getJobConfigurationClass(Direction.TO)
);
if (validationResult.getStatus().canProceed()) {
updateJob(newJob, tx);
} else {
logInvalidModelObject("job", newJob, validationResult);
upgradeSuccessful = false;
LOG.error(" FROM JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName);
}
} else if (supportedDirections.isDirectionSupported(Direction.TO)
&& job.getToConnectorId() == newConnector.getPersistenceId()) {
MToConfig oldToConfig = job.getToJobConfig();
MToConfig newToConfig = new MToConfig(newConnector.getToConfig().clone(false).getConfigs());
upgrader.upgradeToJobConfig(oldToConfig, newToConfig);
MFromConfig oldFromConfig = job.getFromJobConfig();
// create a job with old FROM direction configs but new TO direction
// configs
MJob newJob = new MJob(job, oldFromConfig, newToConfig, job.getDriverConfig());
if (validationResult.getStatus().canProceed()) {
updateJob(newJob, tx);
} else {
logInvalidModelObject("toJob", newJob, validationResult);
upgradeSuccessful = false;
LOG.error(" TO JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName);
}
ConfigValidationResult validationResult = ConfigUtils.validateConfigs(
newJob.getToJobConfig().getConfigs(),
connector.getJobConfigurationClass(Direction.TO)
);
if (validationResult.getStatus().canProceed()) {
updateJob(newJob, tx);
} else {
logInvalidModelObject("job", newJob, validationResult);
upgradeSuccessful = false;
LOG.error(" TO JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName);
}
}
}

View File

@ -245,7 +245,7 @@ public void testConnectorConfigUpgradeWithValidLinksAndJobs() {
repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(2, repoTransactionMock);
repoOrder.verify(repoSpy, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(RepositoryTransaction.class));
repoOrder.verify(repoSpy, times(2)).updateLink(any(MLink.class), any(RepositoryTransaction.class));
repoOrder.verify(repoSpy, times(4)).updateJob(any(MJob.class), any(RepositoryTransaction.class));
repoOrder.verify(repoSpy, times(2)).updateJob(any(MJob.class), any(RepositoryTransaction.class));
repoOrder.verifyNoMoreInteractions();
txOrder.verify(repoTransactionMock, times(1)).begin();
txOrder.verify(repoTransactionMock, times(1)).commit();

View File

@ -33,6 +33,7 @@
* Job (-f 3 -t 1) with name "Export" and id 3
* Job (-f 3 -t 1) with blank name and id 4
* Job (-f 3 -t 1) with blank name and id 5
* Job (-f 1 -t 1) with name "SameConnector" and id 6
* Job with id 1 has been executed 3 times
* Job with id 2 has been executed 3 times
* Job with id 3 has been executed 1 times
@ -55,7 +56,7 @@ public int getNumberOfLinks() {
@Override
public int getNumberOfJobs() {
return 5;
return 6;
}
@Override
@ -86,6 +87,6 @@ public Integer[] getDeleteLinkIds() {
@Override
public Integer[] getDeleteJobIds() {
return new Integer[] {1, 2, 3, 4, 5};
return new Integer[] {1, 2, 3, 4, 5, 6};
}
}