diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnector.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnector.java index c864882a..982d6dde 100644 --- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnector.java +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnector.java @@ -92,8 +92,7 @@ public To getTo() { @Override public ConnectorConfigurableUpgrader getConfigurableUpgrader() { - // TODO: SQOOP-1624 - return null; + return new KiteConnectorUpgrader(); } } \ No newline at end of file diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorUpgrader.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorUpgrader.java new file mode 100644 index 00000000..d3b9f950 --- /dev/null +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorUpgrader.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.connector.kite; + +import org.apache.log4j.Logger; +import org.apache.sqoop.configurable.ConfigurableUpgradeUtil; +import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; +import org.apache.sqoop.model.MConfig; +import org.apache.sqoop.model.MFromConfig; +import org.apache.sqoop.model.MLinkConfig; +import org.apache.sqoop.model.MToConfig; + +//NOTE: All config types have the similar upgrade path at this point +public class KiteConnectorUpgrader extends ConnectorConfigurableUpgrader { + + @Override + public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) { + ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); + } + + @Override + public void upgradeFromJobConfig(MFromConfig original, MFromConfig upgradeTarget) { + ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); + } + + @Override + public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) { + ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); + } + +} diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java index 9461afe2..1e8350cf 100644 --- a/core/src/main/java/org/apache/sqoop/repository/Repository.java +++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java @@ -471,52 +471,57 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec // every job has 2 parts, the FROM and the TO links and their // corresponding connectors. LOG.info(" Job upgrade for job:" + job.getName()+ " for connector:" + connectorName); - List fromConfig = newConnector.getFromConfig().clone(false).getConfigs(); - if (job.getFromConnectorId() == newConnector.getPersistenceId()) { - MFromConfig newFromConfig = new MFromConfig(fromConfig); - MFromConfig oldFromCOnfig = job.getFromJobConfig(); - upgrader.upgradeFromJobConfig(oldFromCOnfig, newFromConfig); - MToConfig oldToConfig = job.getToJobConfig(); - // create a job with new FROM direction configs but old TO direction - // configs - MJob newJob = new MJob(job, newFromConfig, oldToConfig, job.getDriverConfig()); - ConfigValidationResult validationResult = ConfigUtils.validateConfigs( - newJob.getFromJobConfig().getConfigs(), - connector.getJobConfigurationClass(Direction.FROM) - ); + if (newConnector.getSupportedDirections().isDirectionSupported(Direction.FROM)) { + List fromConfig = newConnector.getFromConfig().clone(false).getConfigs(); + if (job.getFromConnectorId() == newConnector.getPersistenceId()) { + MFromConfig newFromConfig = new MFromConfig(fromConfig); + MFromConfig oldFromConfig = job.getFromJobConfig(); + upgrader.upgradeFromJobConfig(oldFromConfig, newFromConfig); + MToConfig oldToConfig = job.getToJobConfig(); + // create a job with new FROM direction configs but old TO direction + // configs + MJob newJob = new MJob(job, newFromConfig, oldToConfig, job.getDriverConfig()); - if(validationResult.getStatus().canProceed()) { - updateJob(newJob, tx); - } else { - logInvalidModelObject("fromJob", newJob, validationResult); - upgradeSuccessful = false; - LOG.error(" From JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName); + ConfigValidationResult validationResult = ConfigUtils.validateConfigs( + newJob.getFromJobConfig().getConfigs(), + connector.getJobConfigurationClass(Direction.FROM) + ); + + if (validationResult.getStatus().canProceed()) { + updateJob(newJob, tx); + } else { + logInvalidModelObject("fromJob", newJob, validationResult); + upgradeSuccessful = false; + LOG.error(" From JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName); + } } } - List toConfig = newConnector.getToConfig().clone(false).getConfigs(); - if (job.getToConnectorId() == newConnector.getPersistenceId()) { - MToConfig oldToConfig = job.getToJobConfig(); - MToConfig newToConfig = new MToConfig(toConfig); - upgrader.upgradeToJobConfig(oldToConfig, newToConfig); - MFromConfig oldFromConfig = job.getFromJobConfig(); - // create a job with old FROM direction configs but new TO direction - // configs - MJob newJob = new MJob(job, oldFromConfig, newToConfig, job.getDriverConfig()); + if (newConnector.getSupportedDirections().isDirectionSupported(Direction.TO)) { + List toConfig = newConnector.getToConfig().clone(false).getConfigs(); + if (job.getToConnectorId() == newConnector.getPersistenceId()) { + MToConfig oldToConfig = job.getToJobConfig(); + MToConfig newToConfig = new MToConfig(toConfig); + upgrader.upgradeToJobConfig(oldToConfig, newToConfig); + MFromConfig oldFromConfig = job.getFromJobConfig(); + // create a job with old FROM direction configs but new TO direction + // configs + MJob newJob = new MJob(job, oldFromConfig, newToConfig, job.getDriverConfig()); - ConfigValidationResult validationResult = ConfigUtils.validateConfigs( - newJob.getToJobConfig().getConfigs(), - connector.getJobConfigurationClass(Direction.TO) - ); + ConfigValidationResult validationResult = ConfigUtils.validateConfigs( + newJob.getToJobConfig().getConfigs(), + connector.getJobConfigurationClass(Direction.TO) + ); - if(validationResult.getStatus().canProceed()) { - updateJob(newJob, tx); - } else { - logInvalidModelObject("toJob", newJob, validationResult); - upgradeSuccessful = false; - LOG.error(" TO JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName); - } + if (validationResult.getStatus().canProceed()) { + updateJob(newJob, tx); + } else { + logInvalidModelObject("toJob", newJob, validationResult); + upgradeSuccessful = false; + LOG.error(" TO JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName); + } + } } } }