5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-06 01:59:09 +08:00

SQOOP-1624: Sqoop2: Apply repository upgrader api

Add supported direction check when upgrading connector as well.

(Qian Xu via Abraham Elmahrek)
This commit is contained in:
Abraham Elmahrek 2014-12-25 21:52:55 -08:00
parent 2f0e73067f
commit 02786f0e20
3 changed files with 92 additions and 41 deletions

View File

@ -92,8 +92,7 @@ public To getTo() {
@Override
public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
// TODO: SQOOP-1624
return null;
return new KiteConnectorUpgrader();
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.connector.kite;
import org.apache.log4j.Logger;
import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MToConfig;
//NOTE: All config types have the similar upgrade path at this point
public class KiteConnectorUpgrader extends ConnectorConfigurableUpgrader {
@Override
public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) {
ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
}
@Override
public void upgradeFromJobConfig(MFromConfig original, MFromConfig upgradeTarget) {
ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
}
@Override
public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) {
ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
}
}

View File

@ -471,52 +471,57 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
// every job has 2 parts, the FROM and the TO links and their
// corresponding connectors.
LOG.info(" Job upgrade for job:" + job.getName()+ " for connector:" + connectorName);
List<MConfig> fromConfig = newConnector.getFromConfig().clone(false).getConfigs();
if (job.getFromConnectorId() == newConnector.getPersistenceId()) {
MFromConfig newFromConfig = new MFromConfig(fromConfig);
MFromConfig oldFromCOnfig = job.getFromJobConfig();
upgrader.upgradeFromJobConfig(oldFromCOnfig, newFromConfig);
MToConfig oldToConfig = job.getToJobConfig();
// create a job with new FROM direction configs but old TO direction
// configs
MJob newJob = new MJob(job, newFromConfig, oldToConfig, job.getDriverConfig());
ConfigValidationResult validationResult = ConfigUtils.validateConfigs(
newJob.getFromJobConfig().getConfigs(),
connector.getJobConfigurationClass(Direction.FROM)
);
if (newConnector.getSupportedDirections().isDirectionSupported(Direction.FROM)) {
List<MConfig> fromConfig = newConnector.getFromConfig().clone(false).getConfigs();
if (job.getFromConnectorId() == newConnector.getPersistenceId()) {
MFromConfig newFromConfig = new MFromConfig(fromConfig);
MFromConfig oldFromConfig = job.getFromJobConfig();
upgrader.upgradeFromJobConfig(oldFromConfig, newFromConfig);
MToConfig oldToConfig = job.getToJobConfig();
// create a job with new FROM direction configs but old TO direction
// configs
MJob newJob = new MJob(job, newFromConfig, oldToConfig, job.getDriverConfig());
if(validationResult.getStatus().canProceed()) {
updateJob(newJob, tx);
} else {
logInvalidModelObject("fromJob", newJob, validationResult);
upgradeSuccessful = false;
LOG.error(" From JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName);
ConfigValidationResult validationResult = ConfigUtils.validateConfigs(
newJob.getFromJobConfig().getConfigs(),
connector.getJobConfigurationClass(Direction.FROM)
);
if (validationResult.getStatus().canProceed()) {
updateJob(newJob, tx);
} else {
logInvalidModelObject("fromJob", newJob, validationResult);
upgradeSuccessful = false;
LOG.error(" From JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName);
}
}
}
List<MConfig> toConfig = newConnector.getToConfig().clone(false).getConfigs();
if (job.getToConnectorId() == newConnector.getPersistenceId()) {
MToConfig oldToConfig = job.getToJobConfig();
MToConfig newToConfig = new MToConfig(toConfig);
upgrader.upgradeToJobConfig(oldToConfig, newToConfig);
MFromConfig oldFromConfig = job.getFromJobConfig();
// create a job with old FROM direction configs but new TO direction
// configs
MJob newJob = new MJob(job, oldFromConfig, newToConfig, job.getDriverConfig());
if (newConnector.getSupportedDirections().isDirectionSupported(Direction.TO)) {
List<MConfig> toConfig = newConnector.getToConfig().clone(false).getConfigs();
if (job.getToConnectorId() == newConnector.getPersistenceId()) {
MToConfig oldToConfig = job.getToJobConfig();
MToConfig newToConfig = new MToConfig(toConfig);
upgrader.upgradeToJobConfig(oldToConfig, newToConfig);
MFromConfig oldFromConfig = job.getFromJobConfig();
// create a job with old FROM direction configs but new TO direction
// configs
MJob newJob = new MJob(job, oldFromConfig, newToConfig, job.getDriverConfig());
ConfigValidationResult validationResult = ConfigUtils.validateConfigs(
newJob.getToJobConfig().getConfigs(),
connector.getJobConfigurationClass(Direction.TO)
);
ConfigValidationResult validationResult = ConfigUtils.validateConfigs(
newJob.getToJobConfig().getConfigs(),
connector.getJobConfigurationClass(Direction.TO)
);
if(validationResult.getStatus().canProceed()) {
updateJob(newJob, tx);
} else {
logInvalidModelObject("toJob", newJob, validationResult);
upgradeSuccessful = false;
LOG.error(" TO JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName);
}
if (validationResult.getStatus().canProceed()) {
updateJob(newJob, tx);
} else {
logInvalidModelObject("toJob", newJob, validationResult);
upgradeSuccessful = false;
LOG.error(" TO JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName);
}
}
}
}
}