diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java index aa91661f..8cbff994 100644 --- a/core/src/main/java/org/apache/sqoop/repository/Repository.java +++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java @@ -25,6 +25,7 @@ import org.apache.log4j.Logger; import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.common.SupportedDirections; import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; import org.apache.sqoop.connector.spi.SqoopConnector; @@ -547,55 +548,77 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec // corresponding connectors. LOG.info(" Job upgrade for job:" + job.getName()+ " for connector:" + connectorName); - if (newConnector.getSupportedDirections().isDirectionSupported(Direction.FROM)) { - List fromConfig = newConnector.getFromConfig().clone(false).getConfigs(); - if (job.getFromConnectorId() == newConnector.getPersistenceId()) { - MFromConfig newFromConfig = new MFromConfig(fromConfig); - MFromConfig oldFromConfig = job.getFromJobConfig(); - upgrader.upgradeFromJobConfig(oldFromConfig, newFromConfig); - MToConfig oldToConfig = job.getToJobConfig(); - // create a job with new FROM direction configs but old TO direction - // configs - MJob newJob = new MJob(job, newFromConfig, oldToConfig, job.getDriverConfig()); + SupportedDirections supportedDirections = newConnector.getSupportedDirections(); - ConfigValidationResult validationResult = ConfigUtils.validateConfigs( - newJob.getFromJobConfig().getConfigs(), - connector.getJobConfigurationClass(Direction.FROM) - ); + if (supportedDirections.isDirectionSupported(Direction.FROM) + && job.getFromConnectorId() == newConnector.getPersistenceId() + && supportedDirections.isDirectionSupported(Direction.TO) + && job.getToConnectorId() == newConnector.getPersistenceId()) { + // Upgrade both configs + MFromConfig newFromConfig = new MFromConfig(newConnector.getFromConfig().clone(false).getConfigs()); + MFromConfig oldFromConfig = job.getFromJobConfig(); + MToConfig newToConfig = new MToConfig(newConnector.getToConfig().clone(false).getConfigs()); + MToConfig oldToConfig = job.getToJobConfig(); + upgrader.upgradeFromJobConfig(oldFromConfig, newFromConfig); + upgrader.upgradeToJobConfig(oldToConfig, newToConfig); - if (validationResult.getStatus().canProceed()) { - updateJob(newJob, tx); - } else { - logInvalidModelObject("fromJob", newJob, validationResult); - upgradeSuccessful = false; - LOG.error(" From JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName); - } + MJob newJob = new MJob(job, newFromConfig, newToConfig, job.getDriverConfig()); + + ConfigValidationResult validationResult = ConfigUtils.validateConfigs( + newJob.getFromJobConfig().getConfigs(), + connector.getJobConfigurationClass(Direction.FROM) + ); + + if (validationResult.getStatus().canProceed()) { + updateJob(newJob, tx); + } else { + logInvalidModelObject("job", newJob, validationResult); + upgradeSuccessful = false; + LOG.error(" JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName); } - } + } else if (supportedDirections.isDirectionSupported(Direction.FROM) + && job.getFromConnectorId() == newConnector.getPersistenceId()) { + MFromConfig newFromConfig = new MFromConfig(newConnector.getFromConfig().clone(false).getConfigs()); + MFromConfig oldFromConfig = job.getFromJobConfig(); + upgrader.upgradeFromJobConfig(oldFromConfig, newFromConfig); + MToConfig oldToConfig = job.getToJobConfig(); + // create a job with new FROM direction configs but old TO direction + // configs + MJob newJob = new MJob(job, newFromConfig, oldToConfig, job.getDriverConfig()); - if (newConnector.getSupportedDirections().isDirectionSupported(Direction.TO)) { - List toConfig = newConnector.getToConfig().clone(false).getConfigs(); - if (job.getToConnectorId() == newConnector.getPersistenceId()) { - MToConfig oldToConfig = job.getToJobConfig(); - MToConfig newToConfig = new MToConfig(toConfig); - upgrader.upgradeToJobConfig(oldToConfig, newToConfig); - MFromConfig oldFromConfig = job.getFromJobConfig(); - // create a job with old FROM direction configs but new TO direction - // configs - MJob newJob = new MJob(job, oldFromConfig, newToConfig, job.getDriverConfig()); + ConfigValidationResult validationResult = ConfigUtils.validateConfigs( + newJob.getFromJobConfig().getConfigs(), + connector.getJobConfigurationClass(Direction.FROM) + ); - ConfigValidationResult validationResult = ConfigUtils.validateConfigs( - newJob.getToJobConfig().getConfigs(), - connector.getJobConfigurationClass(Direction.TO) - ); + if (validationResult.getStatus().canProceed()) { + updateJob(newJob, tx); + } else { + logInvalidModelObject("job", newJob, validationResult); + upgradeSuccessful = false; + LOG.error(" FROM JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName); + } + } else if (supportedDirections.isDirectionSupported(Direction.TO) + && job.getToConnectorId() == newConnector.getPersistenceId()) { + MToConfig oldToConfig = job.getToJobConfig(); + MToConfig newToConfig = new MToConfig(newConnector.getToConfig().clone(false).getConfigs()); + upgrader.upgradeToJobConfig(oldToConfig, newToConfig); + MFromConfig oldFromConfig = job.getFromJobConfig(); + // create a job with old FROM direction configs but new TO direction + // configs + MJob newJob = new MJob(job, oldFromConfig, newToConfig, job.getDriverConfig()); - if (validationResult.getStatus().canProceed()) { - updateJob(newJob, tx); - } else { - logInvalidModelObject("toJob", newJob, validationResult); - upgradeSuccessful = false; - LOG.error(" TO JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName); - } + ConfigValidationResult validationResult = ConfigUtils.validateConfigs( + newJob.getToJobConfig().getConfigs(), + connector.getJobConfigurationClass(Direction.TO) + ); + + if (validationResult.getStatus().canProceed()) { + updateJob(newJob, tx); + } else { + logInvalidModelObject("job", newJob, validationResult); + upgradeSuccessful = false; + LOG.error(" TO JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName); } } } diff --git a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java index 9b153fc5..f571c801 100644 --- a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java +++ b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java @@ -245,7 +245,7 @@ public void testConnectorConfigUpgradeWithValidLinksAndJobs() { repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(2, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(RepositoryTransaction.class)); repoOrder.verify(repoSpy, times(2)).updateLink(any(MLink.class), any(RepositoryTransaction.class)); - repoOrder.verify(repoSpy, times(4)).updateJob(any(MJob.class), any(RepositoryTransaction.class)); + repoOrder.verify(repoSpy, times(2)).updateJob(any(MJob.class), any(RepositoryTransaction.class)); repoOrder.verifyNoMoreInteractions(); txOrder.verify(repoTransactionMock, times(1)).begin(); txOrder.verify(repoTransactionMock, times(1)).commit(); diff --git a/test/src/test/java/org/apache/sqoop/integration/repository/derby/upgrade/Derby1_99_4UpgradeTest.java b/test/src/test/java/org/apache/sqoop/integration/repository/derby/upgrade/Derby1_99_4UpgradeTest.java index 85ce8ff5..9ee0379b 100644 --- a/test/src/test/java/org/apache/sqoop/integration/repository/derby/upgrade/Derby1_99_4UpgradeTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/repository/derby/upgrade/Derby1_99_4UpgradeTest.java @@ -33,6 +33,7 @@ * Job (-f 3 -t 1) with name "Export" and id 3 * Job (-f 3 -t 1) with blank name and id 4 * Job (-f 3 -t 1) with blank name and id 5 + * Job (-f 1 -t 1) with name "SameConnector" and id 6 * Job with id 1 has been executed 3 times * Job with id 2 has been executed 3 times * Job with id 3 has been executed 1 times @@ -55,7 +56,7 @@ public int getNumberOfLinks() { @Override public int getNumberOfJobs() { - return 5; + return 6; } @Override @@ -86,6 +87,6 @@ public Integer[] getDeleteLinkIds() { @Override public Integer[] getDeleteJobIds() { - return new Integer[] {1, 2, 3, 4, 5}; + return new Integer[] {1, 2, 3, 4, 5, 6}; } } diff --git a/test/src/test/resources/repository/derby/derby-repository-1.99.4.tar.gz b/test/src/test/resources/repository/derby/derby-repository-1.99.4.tar.gz index 7a6ceed1..5912e7c2 100644 Binary files a/test/src/test/resources/repository/derby/derby-repository-1.99.4.tar.gz and b/test/src/test/resources/repository/derby/derby-repository-1.99.4.tar.gz differ