5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-20 19:00:48 +08:00

SQOOP-1551: Repository Upgrader api - Extensibility

(Veena Basavaraj via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2014-10-21 06:39:32 -07:00
parent 3257b38555
commit 39a2200007
35 changed files with 632 additions and 525 deletions

View File

@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.model;
/**
* Marker class that identifies the Configurables in the Sqoop system
*/
public abstract class Configurable extends MPersistableEntity implements MClonable {
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.model;
/**
* Represents the sqoop entities that can own configs
*/
public enum MConfigurableType {
/** Connector as a owner of config keys */
CONNECTOR,
/** Driver as a owner of config keys */
DRIVER;
}

View File

@ -23,11 +23,11 @@
import org.apache.sqoop.common.SupportedDirections; import org.apache.sqoop.common.SupportedDirections;
/** /**
* Connector entity supports the FROM/TO {@link Transferable} Includes unique id * Connector entity supports the FROM/TO {@link org.apache.sqoop.job.etl.Transfereable} Includes unique id
* that identifies connector in the repository, unique human readable name, * that identifies connector in the repository, unique human readable name,
* corresponding name and all configs to support the from and to data sources * corresponding name and all configs to support the from and to data sources
*/ */
public final class MConnector extends MPersistableEntity implements MClonable { public final class MConnector extends Configurable {
private final String uniqueName; private final String uniqueName;
private final String className; private final String className;

View File

@ -22,7 +22,7 @@
/** /**
* Describes the configs associated with the {@link Driver} for executing sqoop jobs. * Describes the configs associated with the {@link Driver} for executing sqoop jobs.
*/ */
public class MDriver extends MPersistableEntity implements MClonable { public final class MDriver extends Configurable {
private final MDriverConfig driverConfig; private final MDriverConfig driverConfig;
private final String version; private final String version;

View File

@ -139,6 +139,14 @@ public long getConnectorId(Direction type) {
} }
} }
public long getFromConnectorId() {
return fromConnectorId;
}
public long getToConnectorId() {
return toConnectorId;
}
public MConfigList getJobConfig(Direction type) { public MConfigList getJobConfig(Direction type) {
switch(type) { switch(type) {
case FROM: case FROM:

View File

@ -25,7 +25,7 @@
import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration; import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration; import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.spi.RepositoryUpgrader; import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
import org.apache.sqoop.job.etl.From; import org.apache.sqoop.job.etl.From;
import org.apache.sqoop.job.etl.To; import org.apache.sqoop.job.etl.To;
import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.connector.spi.SqoopConnector;
@ -97,7 +97,7 @@ public Validator getConfigValidator() {
} }
@Override @Override
public RepositoryUpgrader getRepositoryUpgrader() { public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
return new GenericJdbcConnectorUpgrader(); return new GenericJdbcConnectorUpgrader();
} }

View File

@ -18,64 +18,27 @@
*/ */
package org.apache.sqoop.connector.jdbc; package org.apache.sqoop.connector.jdbc;
import java.util.HashMap; import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
import java.util.List; import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
import java.util.Map; import org.apache.sqoop.model.MFromConfig;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.spi.RepositoryUpgrader;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MInput;
import org.apache.sqoop.model.MLinkConfig; import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MToConfig;
public class GenericJdbcConnectorUpgrader extends RepositoryUpgrader { // NOTE: All config types have the similar upgrade path at this point
private static final Logger LOG = Logger.getLogger(GenericJdbcConnectorUpgrader.class); public class GenericJdbcConnectorUpgrader extends ConnectorConfigurableUpgrader {
/*
* For now, there is no real upgrade. So copy all data over,
* set the validation messages and error messages to be the same as for the
* inputs in the original one.
*/
@Override @Override
public void upgrade(MLinkConfig original, MLinkConfig upgradeTarget) { public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) {
doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
} }
@Override @Override
public void upgrade(MConfigList original, MConfigList upgradeTarget) { public void upgradeFromJobConfig(MFromConfig original, MFromConfig upgradeTarget) {
doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
} }
@SuppressWarnings("unchecked") @Override
private void doUpgrade(List<MConfig> original, List<MConfig> target) { public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) {
// Easier to find the config in the original list if we use a map. ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
// Since the constructor takes a list,
// index is not guaranteed to be the same, so we need to look for
// equivalence
Map<String, MConfig> configMap = new HashMap<String, MConfig>();
for (MConfig config : original) {
configMap.put(config.getName(), config);
}
for (MConfig config : target) {
List<MInput<?>> inputs = config.getInputs();
MConfig orginalConfig = configMap.get(config.getName());
if (orginalConfig == null) {
LOG.warn("Config: '" + config.getName() + "' not present in old " +
"generic JDBC connector. So it and its inputs will not be transferred by the upgrader.");
continue;
}
for (MInput input : inputs) {
try {
MInput originalInput = orginalConfig.getInput(input.getName());
input.setValue(originalInput.getValue());
} catch (SqoopException ex) {
LOG.warn("Input: '" + input.getName() + "' not present in old " +
"generic JDBC connector. So it will not be transferred by the upgrader.");
}
}
}
} }
} }

View File

@ -24,7 +24,7 @@
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.spi.RepositoryUpgrader; import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.job.etl.From; import org.apache.sqoop.job.etl.From;
import org.apache.sqoop.job.etl.To; import org.apache.sqoop.job.etl.To;
@ -123,13 +123,13 @@ public Validator getConfigValidator() {
} }
/** /**
* Returns an {@linkplain org.apache.sqoop.connector.spi.RepositoryUpgrader} object that can upgrade the * Returns an {@linkplain org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader} object that can upgrade the
* connection and job metadata. * connection and job metadata.
* *
* @return MetadataUpgrader object * @return MetadataUpgrader object
*/ */
@Override @Override
public RepositoryUpgrader getRepositoryUpgrader() { public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
return new HdfsConfigUpgrader(); return new HdfsConnectorUpgrader();
} }
} }

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MToConfig;
//NOTE: All config types have the similar upgrade path at this point
public class HdfsConnectorUpgrader extends ConnectorConfigurableUpgrader {
@Override
public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) {
ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
}
@Override
public void upgradeFromJobConfig(MFromConfig original, MFromConfig upgradeTarget) {
ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
}
@Override
public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) {
ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
}
}

View File

@ -43,15 +43,14 @@ public final class ConnectorHandler {
private final String connectorUniqueName; private final String connectorUniqueName;
private final SqoopConnector connector; private final SqoopConnector connector;
private MConnector mConnector; private MConnector connectorConfigurable;
public ConnectorHandler(URL configFileUrl) { public ConnectorHandler(URL configFileUrl) {
connectorUrl = configFileUrl.toString(); connectorUrl = configFileUrl.toString();
try { try {
properties.load(configFileUrl.openStream()); properties.load(configFileUrl.openStream());
} catch (IOException ex) { } catch (IOException ex) {
throw new SqoopException(ConnectorError.CONN_0003, throw new SqoopException(ConnectorError.CONN_0003, configFileUrl.toString(), ex);
configFileUrl.toString(), ex);
} }
LOG.debug("Connector configuration: " + properties); LOG.debug("Connector configuration: " + properties);
@ -64,12 +63,9 @@ public ConnectorHandler(URL configFileUrl) {
ConfigurationConstants.CONPROP_PROVIDER_CLASS); ConfigurationConstants.CONPROP_PROVIDER_CLASS);
} }
connectorUniqueName = properties.getProperty(ConfigurationConstants.CONNPROP_CONNECTOR_NAME);
connectorUniqueName = properties.getProperty( if (connectorUniqueName == null || connectorUniqueName.trim().length() == 0) {
ConfigurationConstants.CONNPROP_CONNECTOR_NAME);
if (connectorUniqueName == null || connectorUniqueName.trim().length() == 0)
{
throw new SqoopException(ConnectorError.CONN_0008, connectorClassName); throw new SqoopException(ConnectorError.CONN_0008, connectorClassName);
} }
@ -103,13 +99,11 @@ public ConnectorHandler(URL configFileUrl) {
connector.getJobConfigurationClass(Direction.TO))); connector.getJobConfigurationClass(Direction.TO)));
} }
MLinkConfig connectionForms = new MLinkConfig( MLinkConfig linkConfig = new MLinkConfig(
ConfigUtils.toConfigs(connector.getLinkConfigurationClass())); ConfigUtils.toConfigs(connector.getLinkConfigurationClass()));
String connectorVersion = connector.getVersion(); connectorConfigurable = new MConnector(connectorUniqueName, connectorClassName, connector.getVersion(),
linkConfig, fromConfig, toConfig);
mConnector = new MConnector(connectorUniqueName, connectorClassName, connectorVersion,
connectionForms, fromConfig, toConfig);
if (LOG.isInfoEnabled()) { if (LOG.isInfoEnabled()) {
LOG.info("Connector [" + connectorClassName + "] initialized."); LOG.info("Connector [" + connectorClassName + "] initialized.");
@ -133,15 +127,15 @@ public String getConnectorUrl() {
return connectorUrl; return connectorUrl;
} }
public MConnector getMetadata() { public MConnector getConnectorConfigurable() {
return mConnector; return connectorConfigurable;
} }
public void setMetadata(MConnector connector) { public void setConnectorConfigurable(MConnector mConnector) {
this.mConnector = connector; this.connectorConfigurable = mConnector;
} }
public SqoopConnector getConnector() { public SqoopConnector getSqoopConnector() {
return connector; return connector;
} }
} }

View File

@ -92,10 +92,10 @@ public static void setInstance(ConnectorManager newInstance) {
private Map<String, ConnectorHandler> handlerMap = private Map<String, ConnectorHandler> handlerMap =
new HashMap<String, ConnectorHandler>(); new HashMap<String, ConnectorHandler>();
public List<MConnector> getConnectorsMetadata() { public List<MConnector> getConnectorConfigurables() {
List<MConnector> connectors = new LinkedList<MConnector>(); List<MConnector> connectors = new LinkedList<MConnector>();
for(ConnectorHandler handler : handlerMap.values()) { for(ConnectorHandler handler : handlerMap.values()) {
connectors.add(handler.getMetadata()); connectors.add(handler.getConnectorConfigurable());
} }
return connectors; return connectors;
} }
@ -107,8 +107,8 @@ public Set<Long> getConnectorIds() {
public Map<Long, ResourceBundle> getResourceBundles(Locale locale) { public Map<Long, ResourceBundle> getResourceBundles(Locale locale) {
Map<Long, ResourceBundle> bundles = new HashMap<Long, ResourceBundle>(); Map<Long, ResourceBundle> bundles = new HashMap<Long, ResourceBundle>();
for(ConnectorHandler handler : handlerMap.values()) { for(ConnectorHandler handler : handlerMap.values()) {
long id = handler.getMetadata().getPersistenceId(); long id = handler.getConnectorConfigurable().getPersistenceId();
ResourceBundle bundle = handler.getConnector().getBundle(locale); ResourceBundle bundle = handler.getSqoopConnector().getBundle(locale);
bundles.put(id, bundle); bundles.put(id, bundle);
} }
return bundles; return bundles;
@ -116,25 +116,24 @@ public Map<Long, ResourceBundle> getResourceBundles(Locale locale) {
public ResourceBundle getResourceBundle(long connectorId, Locale locale) { public ResourceBundle getResourceBundle(long connectorId, Locale locale) {
ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId)); ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId));
return handler.getConnector().getBundle(locale); return handler.getSqoopConnector().getBundle(locale);
} }
public MConnector getConnectorConfig(long connectorId) { public MConnector getConnectorConfigurable(long connectorId) {
ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId)); ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId));
if(handler == null) { if(handler == null) {
return null; return null;
} }
return handler.getConnectorConfigurable();
return handler.getMetadata();
} }
public SqoopConnector getConnector(long connectorId) { public SqoopConnector getSqoopConnector(long connectorId) {
ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId)); ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId));
return handler.getConnector(); return handler.getSqoopConnector();
} }
public SqoopConnector getConnector(String uniqueName) { public SqoopConnector getSqoopConnector(String uniqueName) {
return handlerMap.get(uniqueName).getConnector(); return handlerMap.get(uniqueName).getSqoopConnector();
} }
public synchronized void initialize() { public synchronized void initialize() {
@ -182,21 +181,21 @@ private synchronized void registerConnectors(boolean autoUpgrade) {
rtx.begin(); rtx.begin();
for (String name : handlerMap.keySet()) { for (String name : handlerMap.keySet()) {
ConnectorHandler handler = handlerMap.get(name); ConnectorHandler handler = handlerMap.get(name);
MConnector connectorMetadata = handler.getMetadata(); MConnector connectorMetadata = handler.getConnectorConfigurable();
MConnector registeredMetadata = MConnector registeredMetadata =
repository.registerConnector(connectorMetadata, autoUpgrade); repository.registerConnector(connectorMetadata, autoUpgrade);
// Set registered metadata instead of connector metadata as they will // Set registered metadata instead of connector metadata as they will
// have filled persistent ids. We should be confident at this point that // have filled persistent ids. We should be confident at this point that
// there are no differences between those two structures. // there are no differences between those two structures.
handler.setMetadata(registeredMetadata); handler.setConnectorConfigurable(registeredMetadata);
String connectorName = handler.getUniqueName(); String connectorName = handler.getUniqueName();
if (!handler.getMetadata().hasPersistenceId()) { if (!handler.getConnectorConfigurable().hasPersistenceId()) {
throw new SqoopException(ConnectorError.CONN_0010, connectorName); throw new SqoopException(ConnectorError.CONN_0010, connectorName);
} }
nameMap.put(handler.getMetadata().getPersistenceId(), connectorName); nameMap.put(handler.getConnectorConfigurable().getPersistenceId(), connectorName);
LOG.debug("Registered connector: " + handler.getMetadata()); LOG.debug("Registered connector: " + handler.getConnectorConfigurable());
} }
rtx.commit(); rtx.commit();
} catch (Exception ex) { } catch (Exception ex) {

View File

@ -22,12 +22,11 @@
import java.util.ResourceBundle; import java.util.ResourceBundle;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.sqoop.connector.spi.RepositoryUpgrader;
import org.apache.sqoop.core.ConfigurationConstants; import org.apache.sqoop.core.ConfigurationConstants;
import org.apache.sqoop.core.Reconfigurable; import org.apache.sqoop.core.Reconfigurable;
import org.apache.sqoop.core.SqoopConfiguration; import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener; import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
import org.apache.sqoop.driver.configuration.DriverConfiguration; import org.apache.sqoop.driver.configuration.JobConfiguration;
import org.apache.sqoop.json.DriverBean; import org.apache.sqoop.json.DriverBean;
import org.apache.sqoop.model.ConfigUtils; import org.apache.sqoop.model.ConfigUtils;
import org.apache.sqoop.model.MConfig; import org.apache.sqoop.model.MConfig;
@ -105,25 +104,26 @@ public static void setInstance(Driver newInstance) {
/** /**
* Driver config upgrader instance * Driver config upgrader instance
*/ */
private final RepositoryUpgrader driverConfigUpgrader; private final DriverUpgrader driverUpgrader;
/** /**
* Default driver config auto upgrade option value * Default driver config auto upgrade option value
*/ */
private static final boolean DEFAULT_AUTO_UPGRADE = false; private static final boolean DEFAULT_AUTO_UPGRADE = false;
public Class getDriverConfigurationGroupClass() { @SuppressWarnings("rawtypes")
return DriverConfiguration.class; public Class getDriverJobConfigurationClass() {
return JobConfiguration.class;
} }
public Driver() { public Driver() {
List<MConfig> driverConfig = ConfigUtils.toConfigs(getDriverConfigurationGroupClass()); List<MConfig> driverConfig = ConfigUtils.toConfigs(getDriverJobConfigurationClass());
mDriver = new MDriver(new MDriverConfig(driverConfig), DriverBean.CURRENT_DRIVER_VERSION); mDriver = new MDriver(new MDriverConfig(driverConfig), DriverBean.CURRENT_DRIVER_VERSION);
// Build validator // Build validator
driverValidator = new DriverConfigValidator(); driverValidator = new DriverConfigValidator();
// Build upgrader // Build upgrader
driverConfigUpgrader = new DriverConfigUpgrader(); driverUpgrader = new DriverUpgrader();
} }
public synchronized void initialize() { public synchronized void initialize() {
@ -150,8 +150,8 @@ public Validator getValidator() {
return driverValidator; return driverValidator;
} }
public RepositoryUpgrader getDriverConfigRepositoryUpgrader() { public DriverUpgrader getConfigurableUpgrader() {
return driverConfigUpgrader; return driverUpgrader;
} }
public MDriver getDriver() { public MDriver getDriver() {

View File

@ -1,77 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.driver;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.spi.RepositoryUpgrader;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MInput;
import org.apache.sqoop.model.MLinkConfig;
public class DriverConfigUpgrader extends RepositoryUpgrader{
private static final Logger LOG = Logger.getLogger(DriverConfigUpgrader.class);
@Override
public void upgrade(MLinkConfig original, MLinkConfig upgradeTarget) {
// NOTE(VB): There are no link configs anymore for driver, this code remains for previous versions
}
@Override
public void upgrade(MConfigList original, MConfigList upgradeTarget) {
doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
}
@SuppressWarnings("unchecked")
private void doUpgrade(List<MConfig> original, List<MConfig> target) {
// Easier to find the config in the original list if we use a map.
// Since the constructor takes a list,
// index is not guaranteed to be the same, so we need to look for
// equivalence
Map<String, MConfig> configMap = new HashMap<String, MConfig>();
for (MConfig config : original) {
configMap.put(config.getName(), config);
}
for (MConfig config : target) {
List<MInput<?>> inputs = config.getInputs();
MConfig originalConfig = configMap.get(config.getName());
if(originalConfig == null) {
LOG.warn("Config: " + config.getName() + " not present in old " +
"driver config. So it will not be transferred by the upgrader.");
continue;
}
for (MInput input : inputs) {
try {
MInput originalInput = originalConfig.getInput(input.getName());
input.setValue(originalInput.getValue());
} catch (SqoopException ex) {
LOG.warn("Input: " + input.getName() + " not present in old " +
"driver config. So it will not be transferred by the upgrader.");
}
}
}
}
}

View File

@ -17,7 +17,7 @@
*/ */
package org.apache.sqoop.driver; package org.apache.sqoop.driver;
import org.apache.sqoop.driver.configuration.DriverConfiguration; import org.apache.sqoop.driver.configuration.JobConfiguration;
import org.apache.sqoop.driver.configuration.ThrottlingConfig; import org.apache.sqoop.driver.configuration.ThrottlingConfig;
import org.apache.sqoop.validation.Status; import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.ConfigValidator; import org.apache.sqoop.validation.ConfigValidator;
@ -26,8 +26,8 @@
public class DriverConfigValidator extends Validator { public class DriverConfigValidator extends Validator {
@Override @Override
public ConfigValidator validateConfigForJob(Object jobConfiguration) { public ConfigValidator validateConfigForJob(Object jobConfiguration) {
ConfigValidator validation = new ConfigValidator(DriverConfiguration.class); ConfigValidator validation = new ConfigValidator(JobConfiguration.class);
DriverConfiguration conf = (DriverConfiguration)jobConfiguration; JobConfiguration conf = (JobConfiguration)jobConfiguration;
validateThrottlingConfig(validation,conf.throttlingConfig); validateThrottlingConfig(validation,conf.throttlingConfig);
return validation; return validation;

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.driver;
import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
import org.apache.sqoop.model.MDriverConfig;
public class DriverUpgrader {
public void upgradeJobConfig(MDriverConfig original, MDriverConfig upgradeTarget) {
ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
}
}

View File

@ -30,7 +30,7 @@
import org.apache.sqoop.core.Reconfigurable; import org.apache.sqoop.core.Reconfigurable;
import org.apache.sqoop.core.SqoopConfiguration; import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener; import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
import org.apache.sqoop.driver.configuration.DriverConfiguration; import org.apache.sqoop.driver.configuration.JobConfiguration;
import org.apache.sqoop.job.etl.Destroyer; import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext; import org.apache.sqoop.job.etl.DestroyerContext;
import org.apache.sqoop.job.etl.Initializer; import org.apache.sqoop.job.etl.Initializer;
@ -306,9 +306,9 @@ private JobRequest createJobRequest(long jobId, MSubmission submission) {
MLink toConnection = getLink(job.getLinkId(Direction.TO)); MLink toConnection = getLink(job.getLinkId(Direction.TO));
// get from/to connectors for the connection // get from/to connectors for the connection
SqoopConnector fromConnector = getConnector(fromConnection.getConnectorId()); SqoopConnector fromConnector = getSqoopConnector(fromConnection.getConnectorId());
validateSupportedDirection(fromConnector, Direction.FROM); validateSupportedDirection(fromConnector, Direction.FROM);
SqoopConnector toConnector = getConnector(toConnection.getConnectorId()); SqoopConnector toConnector = getSqoopConnector(toConnection.getConnectorId());
validateSupportedDirection(toConnector, Direction.TO); validateSupportedDirection(toConnector, Direction.TO);
// link config for the FROM part of the job // link config for the FROM part of the job
@ -329,7 +329,7 @@ private JobRequest createJobRequest(long jobId, MSubmission submission) {
// the only driver config for the job // the only driver config for the job
Object driverConfig = ClassUtils Object driverConfig = ClassUtils
.instantiate(Driver.getInstance().getDriverConfigurationGroupClass()); .instantiate(Driver.getInstance().getDriverJobConfigurationClass());
ConfigUtils.fromConfigs(job.getDriverConfig().getConfigs(), driverConfig); ConfigUtils.fromConfigs(job.getDriverConfig().getConfigs(), driverConfig);
@ -402,8 +402,8 @@ MSubmission createJobSubmission(HttpEventContext ctx, long jobId) {
return summary; return summary;
} }
SqoopConnector getConnector(long connnectorId) { SqoopConnector getSqoopConnector(long connnectorId) {
return ConnectorManager.getInstance().getConnector(connnectorId); return ConnectorManager.getInstance().getSqoopConnector(connnectorId);
} }
void validateSupportedDirection(SqoopConnector connector, Direction direction) { void validateSupportedDirection(SqoopConnector connector, Direction direction) {
@ -480,7 +480,7 @@ private InitializerContext getConnectorInitializerContext(JobRequest jobRequest,
} }
void prepareJob(JobRequest request) { void prepareJob(JobRequest request) {
DriverConfiguration jobConfiguration = (DriverConfiguration) request.getDriverConfig(); JobConfiguration jobConfiguration = (JobConfiguration) request.getDriverConfig();
// We're directly moving configured number of extractors and loaders to // We're directly moving configured number of extractors and loaders to
// underlying request object. In the future we might need to throttle this // underlying request object. In the future we might need to throttle this
// count based on other running jobs to meet our SLAs. // count based on other running jobs to meet our SLAs.

View File

@ -21,14 +21,14 @@
import org.apache.sqoop.model.Config; import org.apache.sqoop.model.Config;
/** /**
* Representing the core job configuration * Representing the driver job configuration
*/ */
@ConfigurationClass @ConfigurationClass
public class DriverConfiguration { public class JobConfiguration {
@Config @Config
public ThrottlingConfig throttlingConfig; public ThrottlingConfig throttlingConfig;
public DriverConfiguration() { public JobConfiguration() {
throttlingConfig = new ThrottlingConfig(); throttlingConfig = new ThrottlingConfig();
} }
} }

View File

@ -163,10 +163,6 @@ public Object doIt(Connection conn) throws Exception {
handler.registerConnector(mConnector, conn); handler.registerConnector(mConnector, conn);
return mConnector; return mConnector;
} else { } else {
// Same connector, check if the version is the same.
// For now, use the "string" versions itself - later we should
// probably include a build number or something that is
// monotonically increasing.
if (connectorResult.getUniqueName().equals(mConnector.getUniqueName()) && if (connectorResult.getUniqueName().equals(mConnector.getUniqueName()) &&
mConnector.getVersion().compareTo(connectorResult.getVersion()) > 0) { mConnector.getVersion().compareTo(connectorResult.getVersion()) > 0) {
if (autoUpgrade) { if (autoUpgrade) {
@ -652,23 +648,23 @@ public Object doIt(Connection conn) throws Exception {
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
protected void upgradeConnector(final MConnector newConnector, protected void upgradeConnectorConfigs(final MConnector newConnector,
RepositoryTransaction tx) { RepositoryTransaction tx) {
doWithConnection(new DoWithConnection() { doWithConnection(new DoWithConnection() {
@Override @Override
public Object doIt(Connection conn) throws Exception { public Object doIt(Connection conn) throws Exception {
handler.upgradeConnector(newConnector, conn); handler.upgradeConnectorConfigs(newConnector, conn);
return null; return null;
} }
}, (JdbcRepositoryTransaction) tx); }, (JdbcRepositoryTransaction) tx);
} }
protected void upgradeDriver(final MDriver mDriver, RepositoryTransaction tx) { protected void upgradeDriverConfigs(final MDriver mDriver, RepositoryTransaction tx) {
doWithConnection(new DoWithConnection() { doWithConnection(new DoWithConnection() {
@Override @Override
public Object doIt(Connection conn) throws Exception { public Object doIt(Connection conn) throws Exception {
handler.upgradeDriver(mDriver, conn); handler.upgradeDriverConfigs(mDriver, conn);
return null; return null;
} }
}, (JdbcRepositoryTransaction) tx); }, (JdbcRepositoryTransaction) tx);

View File

@ -101,7 +101,7 @@ public abstract List<MJob> findJobsForConnector(long connectorID,
* @param conn JDBC link for querying repository * @param conn JDBC link for querying repository
*/ */
public abstract void upgradeConnector(MConnector mConnector, Connection conn); public abstract void upgradeConnectorConfigs(MConnector mConnector, Connection conn);
/** /**
@ -117,7 +117,7 @@ public abstract List<MJob> findJobsForConnector(long connectorID,
* the driverConfig. * the driverConfig.
* @param conn JDBC link for querying repository * @param conn JDBC link for querying repository
*/ */
public abstract void upgradeDriver(MDriver mDriver, Connection conn); public abstract void upgradeDriverConfigs(MDriver mDriver, Connection conn);
/** /**

View File

@ -22,12 +22,12 @@
import java.util.Map; import java.util.Map;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.RepositoryUpgrader; import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.driver.Driver; import org.apache.sqoop.driver.Driver;
import org.apache.sqoop.driver.DriverUpgrader;
import org.apache.sqoop.json.DriverBean; import org.apache.sqoop.json.DriverBean;
import org.apache.sqoop.model.ConfigUtils; import org.apache.sqoop.model.ConfigUtils;
import org.apache.sqoop.model.MConfig; import org.apache.sqoop.model.MConfig;
@ -317,7 +317,7 @@ public abstract class Repository {
* method will not call begin, commit, * method will not call begin, commit,
* rollback or close on this transaction. * rollback or close on this transaction.
*/ */
protected abstract void upgradeConnector(MConnector newConnector, RepositoryTransaction tx); protected abstract void upgradeConnectorConfigs(MConnector newConnector, RepositoryTransaction tx);
/** /**
* Upgrade the driver with the new data supplied in the * Upgrade the driver with the new data supplied in the
@ -335,7 +335,7 @@ public abstract class Repository {
* method will not call begin, commit, * method will not call begin, commit,
* rollback or close on this transaction. * rollback or close on this transaction.
*/ */
protected abstract void upgradeDriver(MDriver newDriver, RepositoryTransaction tx); protected abstract void upgradeDriverConfigs(MDriver newDriver, RepositoryTransaction tx);
/** /**
* Delete all inputs for a job * Delete all inputs for a job
@ -388,84 +388,88 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
LOG.info("Upgrading connector: " + oldConnector.getUniqueName()); LOG.info("Upgrading connector: " + oldConnector.getUniqueName());
long connectorID = oldConnector.getPersistenceId(); long connectorID = oldConnector.getPersistenceId();
newConnector.setPersistenceId(connectorID); newConnector.setPersistenceId(connectorID);
/* Algorithms:
* 1. Get an upgrader for the connector.
* 2. Get all links associated with the connector.
* 3. Get all jobs associated with the connector.
* 4. Delete the inputs for all of the jobs and links (in that order)
* 5. Remove all inputs and configs associated with the connector, and
* register the new configs and inputs.
* 6. Create new links and jobs with connector part being the ones
* returned by the upgrader.
* 7. Validate new links and jobs with connector's validator
* 8. If any invalid links or jobs detected, throw an exception
* and stop the bootup of Sqoop server
* 9. Otherwise, Insert the link inputs followed by job inputs (using
* updateJob and updatelink)
*/
RepositoryTransaction tx = null; RepositoryTransaction tx = null;
try { try {
SqoopConnector connector = SqoopConnector connector = ConnectorManager.getInstance().getSqoopConnector(
ConnectorManager.getInstance().getConnector(newConnector newConnector.getUniqueName());
.getUniqueName());
Validator connectorConfigValidator = connector.getConfigValidator(); Validator connectorConfigValidator = connector.getConfigValidator();
boolean upgradeSuccessful = true; boolean upgradeSuccessful = true;
RepositoryUpgrader upgrader = connector.getRepositoryUpgrader(); // 1. Get an upgrader for the connector
List<MLink> linksByConnector = findLinksForConnector(connectorID); ConnectorConfigurableUpgrader upgrader = connector.getConfigurableUpgrader();
List<MJob> jobsByConnector = findJobsForConnector(connectorID); // 2. Get all links associated with the connector.
List<MLink> existingLinksByConnector = findLinksForConnector(connectorID);
// 3. Get all jobs associated with the connector.
List<MJob> existingJobsByConnector = findJobsForConnector(connectorID);
// -- BEGIN TXN -- // -- BEGIN TXN --
tx = getTransaction(); tx = getTransaction();
tx.begin(); tx.begin();
deletelinksAndJobs(linksByConnector, jobsByConnector, tx); // 4. Delete the inputs for all of the jobs and links (in that order) for
upgradeConnector(newConnector, tx); // this connector
for (MLink oldLink : linksByConnector) { deletelinksAndJobs(existingLinksByConnector, existingJobsByConnector, tx);
// Make a new copy of the configs // 5. Delete all inputs and configs associated with the connector, and
List<MConfig> linkConfig = newConnector.getLinkConfig().clone(false).getConfigs(); // insert the new configs and inputs for this connector
MLinkConfig newLinkConfig = new MLinkConfig(linkConfig); upgradeConnectorConfigs(newConnector, tx);
MLinkConfig oldLinkConfig = oldLink.getConnectorLinkConfig(); // 6. Run upgrade logic for the configs related to the link objects
upgrader.upgrade(oldLinkConfig, newLinkConfig); // dont always rely on the repository implementation to return empty list for links
if (existingLinksByConnector != null) {
for (MLink link : existingLinksByConnector) {
// Make a new copy of the configs
List<MConfig> linkConfig = newConnector.getLinkConfig().clone(false).getConfigs();
MLinkConfig newLinkConfig = new MLinkConfig(linkConfig);
MLinkConfig oldLinkConfig = link.getConnectorLinkConfig();
upgrader.upgradeLinkConfig(oldLinkConfig, newLinkConfig);
MLink newlink = new MLink(link, newLinkConfig);
MLink newlink = new MLink(oldLink, newLinkConfig); Object newConfigurationObject = ClassUtils.instantiate(connector
.getLinkConfigurationClass());
Object newConfigurationObject = ClassUtils.instantiate(connector.getLinkConfigurationClass()); ConfigUtils.fromConfigs(newlink.getConnectorLinkConfig().getConfigs(),
ConfigUtils.fromConfigs(newlink.getConnectorLinkConfig().getConfigs(), newConfigurationObject); newConfigurationObject);
// 7. Run link config validation
ConfigValidator configValidator = connectorConfigValidator.validateConfigForLink(newConfigurationObject); ConfigValidator configValidator = connectorConfigValidator
if (configValidator.getStatus().canProceed()) { .validateConfigForLink(newConfigurationObject);
updateLink(newlink, tx); if (configValidator.getStatus().canProceed()) {
} else { updateLink(newlink, tx);
logInvalidModelObject("link", newlink, configValidator); } else {
upgradeSuccessful = false; // If any invalid links or jobs detected, throw an exception
// and stop the bootup of Sqoop server
logInvalidModelObject("link", newlink, configValidator);
upgradeSuccessful = false;
}
} }
} }
for (MJob job : jobsByConnector) { // 8. Run upgrade logic for the configs related to the job objects
// Make a new copy of the configs if (existingJobsByConnector != null) {
// else the values will get set in the configs in the connector for for (MJob job : existingJobsByConnector) {
// each job. // every job has 2 parts, the FROM and the TO links and their
List<MConfig> fromConfig = newConnector.getConfig(Direction.FROM).clone(false).getConfigs(); // corresponding connectors.
List<MConfig> toConfig = newConnector.getConfig(Direction.TO).clone(false).getConfigs(); List<MConfig> fromConfig = newConnector.getFromConfig().clone(false).getConfigs();
if (job.getFromConnectorId() == newConnector.getPersistenceId()) {
// New FROM direction configs, old TO direction configs. MFromConfig newFromConfig = new MFromConfig(fromConfig);
if (job.getConnectorId(Direction.FROM) == newConnector.getPersistenceId()) { MFromConfig oldFromCOnfig = job.getFromJobConfig();
MFromConfig newFromConfig = new MFromConfig(fromConfig); upgrader.upgradeFromJobConfig(oldFromCOnfig, newFromConfig);
MFromConfig oldFromCOnfig = job.getFromJobConfig(); MToConfig oldToConfig = job.getToJobConfig();
upgrader.upgrade(oldFromCOnfig, newFromConfig); // create a job with new FROM direction configs but old TO direction
// configs
MToConfig oldToConfig = job.getToJobConfig(); MJob newJob = new MJob(job, newFromConfig, oldToConfig, job.getDriverConfig());
MJob newJob = new MJob(job, newFromConfig, oldToConfig, job.getDriverConfig()); // TODO( jarcec) : will add the job config validation logic similar
updateJob(newJob, tx); // to the link config validation before updating job
} updateJob(newJob, tx);
}
// Old FROM direction configs, new TO direction configs. List<MConfig> toConfig = newConnector.getToConfig().clone(false).getConfigs();
if (job.getConnectorId(Direction.TO) == newConnector.getPersistenceId()) { if (job.getToConnectorId() == newConnector.getPersistenceId()) {
MToConfig oldToConfig = job.getToJobConfig();
MToConfig oldToConfig = job.getToJobConfig(); MToConfig newToConfig = new MToConfig(toConfig);
MToConfig newToConfig = new MToConfig(toConfig); upgrader.upgradeToJobConfig(oldToConfig, newToConfig);
upgrader.upgrade(oldToConfig, newToConfig); MFromConfig oldFromConfig = job.getFromJobConfig();
MFromConfig oldFromConfig = job.getFromJobConfig(); // create a job with old FROM direction configs but new TO direction
MJob newJob = new MJob(job, oldFromConfig, newToConfig, job.getDriverConfig()); // configs
updateJob(newJob, tx); MJob newJob = new MJob(job, oldFromConfig, newToConfig, job.getDriverConfig());
// TODO( jarcec) : will add the job config validation logic similar
// to the link config validation before updating job
updateJob(newJob, tx);
}
} }
} }
@ -475,20 +479,20 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
throw new SqoopException(RepositoryError.JDBCREPO_0027); throw new SqoopException(RepositoryError.JDBCREPO_0027);
} }
} catch (SqoopException ex) { } catch (SqoopException ex) {
if(tx != null) { if (tx != null) {
tx.rollback(); tx.rollback();
} }
throw ex; throw ex;
} catch (Exception ex) { } catch (Exception ex) {
if(tx != null) { if (tx != null) {
tx.rollback(); tx.rollback();
} }
throw new SqoopException(RepositoryError.JDBCREPO_0000, ex); throw new SqoopException(RepositoryError.JDBCREPO_0000, ex);
} finally { } finally {
if(tx != null) { if (tx != null) {
tx.close(); tx.close();
} }
LOG.info("Metadata upgrade finished for connector: " + oldConnector.getUniqueName()); LOG.info("Connector upgrade finished: " + oldConnector.getUniqueName());
} }
} }
@ -496,31 +500,38 @@ public final void upgradeDriver(MDriver driver) {
LOG.info("Upgrading driver"); LOG.info("Upgrading driver");
RepositoryTransaction tx = null; RepositoryTransaction tx = null;
try { try {
RepositoryUpgrader upgrader = Driver.getInstance().getDriverConfigRepositoryUpgrader(); //1. find upgrader
List<MJob> jobs = findJobs(); DriverUpgrader upgrader = Driver.getInstance().getConfigurableUpgrader();
//2. find all jobs in the system
List<MJob> existingJobs = findJobs();
Validator validator = Driver.getInstance().getValidator(); Validator validator = Driver.getInstance().getValidator();
boolean upgradeSuccessful = true; boolean upgradeSuccessful = true;
// -- BEGIN TXN -- // -- BEGIN TXN --
tx = getTransaction(); tx = getTransaction();
tx.begin(); tx.begin();
deleteJobs(jobs, tx); //3. delete all jobs in the system
upgradeDriver(driver, tx); deleteJobs(existingJobs, tx);
// 4. Delete all inputs and configs associated with the driver, and
// insert the new configs and inputs for this driver
upgradeDriverConfigs(driver, tx);
for (MJob job : jobs) { for (MJob job : existingJobs) {
// Make a new copy of the configs // Make a new copy of the configs
MDriverConfig driverConfig = driver.getDriverConfig().clone(false); MDriverConfig driverConfig = driver.getDriverConfig().clone(false);
MDriver newDriver = new MDriver(driverConfig, DriverBean.CURRENT_DRIVER_VERSION); MDriver newDriver = new MDriver(driverConfig, DriverBean.CURRENT_DRIVER_VERSION);
upgrader.upgrade(job.getDriverConfig(), newDriver.getDriverConfig()); // At this point, the driver only supports JOB config type
upgrader.upgradeJobConfig(job.getDriverConfig(), newDriver.getDriverConfig());
// create a new job with old FROM and TO configs but new driver configs
MJob newJob = new MJob(job, job.getFromJobConfig(), job.getToJobConfig(), newDriver.getDriverConfig()); MJob newJob = new MJob(job, job.getFromJobConfig(), job.getToJobConfig(), newDriver.getDriverConfig());
// Transform config structures to objects for validations Object newConfigurationObject = ClassUtils.instantiate(Driver.getInstance().getDriverJobConfigurationClass());
Object newConfigurationObject = ClassUtils.instantiate(Driver.getInstance().getDriverConfigurationGroupClass());
ConfigUtils.fromConfigs(newJob.getDriverConfig().getConfigs(), newConfigurationObject); ConfigUtils.fromConfigs(newJob.getDriverConfig().getConfigs(), newConfigurationObject);
// 5. validate configs
ConfigValidator validation = validator.validateConfigForJob(newConfigurationObject); ConfigValidator validation = validator.validateConfigForJob(newConfigurationObject);
if (validation.getStatus().canProceed()) { if (validation.getStatus().canProceed()) {
// 6. update job
updateJob(newJob, tx); updateJob(newJob, tx);
} else { } else {
logInvalidModelObject("job", newJob, validation); logInvalidModelObject("job", newJob, validation);

View File

@ -26,6 +26,7 @@
import org.apache.sqoop.model.MConfig; import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MConfigList; import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MInput; import org.apache.sqoop.model.MInput;
import org.apache.sqoop.model.MIntegerInput; import org.apache.sqoop.model.MIntegerInput;
import org.apache.sqoop.model.MStringInput; import org.apache.sqoop.model.MStringInput;
@ -36,30 +37,31 @@
*/ */
public class TestDriverConfigUpgrader { public class TestDriverConfigUpgrader {
DriverConfigUpgrader upgrader; DriverUpgrader upgrader;
@Before @Before
public void initializeUpgrader() { public void initializeUpgrader() {
upgrader = new DriverConfigUpgrader(); upgrader = new DriverUpgrader();
} }
/** /**
* We take the same configs on input and output and we * We take the same configs on input and output and we expect that all values
* expect that all values will be correctly transferred. * will be correctly transferred.
*/ */
@Test @Test
public void testJobConfigTyeUpgrade() { public void testJobConfigTyeUpgrade() {
MConfigList original = job(); MDriverConfig original = job();
MConfigList target = job(); MDriverConfig target = job();
original.getStringInput("f1.s1").setValue("A"); original.getStringInput("f1.s1").setValue("A");
original.getStringInput("f1.s2").setValue("B"); original.getStringInput("f1.s2").setValue("B");
original.getIntegerInput("f1.i").setValue(3); original.getIntegerInput("f1.i").setValue(3);
upgrader.upgrade(original, target); upgrader.upgradeJobConfig(original, target);
assertEquals("A", target.getStringInput("f1.s1").getValue()); assertEquals("A", target.getStringInput("f1.s1").getValue());
assertEquals("B", target.getStringInput("f1.s2").getValue()); assertEquals("B", target.getStringInput("f1.s2").getValue());
assertEquals(3, (long)target.getIntegerInput("f1.i").getValue()); assertEquals(3, (long) target.getIntegerInput("f1.i").getValue());
} }
/** /**
@ -67,54 +69,54 @@ public void testJobConfigTyeUpgrade() {
*/ */
@Test @Test
public void testNonExistingInput() { public void testNonExistingInput() {
MConfigList original = job1(); MDriverConfig original = job1();
MConfigList target = job2(); MDriverConfig target = job2();
original.getStringInput("f1.s1").setValue("A"); original.getStringInput("f1.s1").setValue("A");
original.getStringInput("f1.s2").setValue("B"); original.getStringInput("f1.s2").setValue("B");
original.getIntegerInput("f1.i").setValue(3); original.getIntegerInput("f1.i").setValue(3);
upgrader.upgrade(original, target); upgrader.upgradeJobConfig(original, target);
assertEquals("A", target.getStringInput("f1.s1").getValue()); assertEquals("A", target.getStringInput("f1.s1").getValue());
assertNull(target.getStringInput("f1.s2_").getValue()); assertNull(target.getStringInput("f1.s2_").getValue());
assertEquals(3, (long)target.getIntegerInput("f1.i").getValue()); assertEquals(3, (long) target.getIntegerInput("f1.i").getValue());
} }
/** /**
* Upgrade scenario when entire has been added in the target and * Upgrade scenario when entire has been added in the target and therefore is
* therefore is missing in the original. * missing in the original.
*/ */
@Test @Test
public void testNonExistingConfig() { public void testNonExistingConfig() {
MConfigList original = job1(); MDriverConfig original = job1();
MConfigList target = job3(); MDriverConfig target = job3();
original.getStringInput("f1.s1").setValue("A"); original.getStringInput("f1.s1").setValue("A");
original.getStringInput("f1.s2").setValue("B"); original.getStringInput("f1.s2").setValue("B");
original.getIntegerInput("f1.i").setValue(3); original.getIntegerInput("f1.i").setValue(3);
upgrader.upgrade(original, target); upgrader.upgradeJobConfig(original, target);
assertNull(target.getStringInput("f2.s1").getValue()); assertNull(target.getStringInput("f2.s1").getValue());
assertNull(target.getStringInput("f2.s2").getValue()); assertNull(target.getStringInput("f2.s2").getValue());
assertNull(target.getIntegerInput("f2.i").getValue()); assertNull(target.getIntegerInput("f2.i").getValue());
} }
MConfigList job() { MDriverConfig job() {
return new MConfigList(configs1()); return new MDriverConfig(configs1());
} }
MConfigList job1() { MDriverConfig job1() {
return new MConfigList(configs1()); return new MDriverConfig(configs1());
} }
MConfigList job2() { MDriverConfig job2() {
return new MConfigList(configs2()); return new MDriverConfig(configs2());
} }
MConfigList job3() { MDriverConfig job3() {
return new MConfigList(configs3()); return new MDriverConfig(configs3());
} }
List<MConfig> configs1() { List<MConfig> configs1() {
@ -125,8 +127,8 @@ List<MConfig> configs1() {
List<MInput<?>> inputs1(String formName) { List<MInput<?>> inputs1(String formName) {
List<MInput<?>> list = new LinkedList<MInput<?>>(); List<MInput<?>> list = new LinkedList<MInput<?>>();
list.add(new MStringInput(formName + ".s1", false, (short)30)); list.add(new MStringInput(formName + ".s1", false, (short) 30));
list.add(new MStringInput(formName + ".s2", false, (short)30)); list.add(new MStringInput(formName + ".s2", false, (short) 30));
list.add(new MIntegerInput(formName + ".i", false)); list.add(new MIntegerInput(formName + ".i", false));
return list; return list;
} }
@ -139,8 +141,8 @@ List<MConfig> configs2() {
List<MInput<?>> inputs2(String formName) { List<MInput<?>> inputs2(String formName) {
List<MInput<?>> list = new LinkedList<MInput<?>>(); List<MInput<?>> list = new LinkedList<MInput<?>>();
list.add(new MStringInput(formName + ".s1", false, (short)30)); list.add(new MStringInput(formName + ".s1", false, (short) 30));
list.add(new MStringInput(formName + ".s2_", false, (short)30)); list.add(new MStringInput(formName + ".s2_", false, (short) 30));
list.add(new MIntegerInput(formName + ".i", false)); list.add(new MIntegerInput(formName + ".i", false));
return list; return list;
} }
@ -150,4 +152,4 @@ List<MConfig> configs3() {
list.add(new MConfig("f2", inputs1("f2"))); list.add(new MConfig("f2", inputs1("f2")));
return list; return list;
} }
} }

View File

@ -71,10 +71,10 @@ public void testCreateJobSubmission() {
@Test @Test
public void testGetConnector() { public void testGetConnector() {
when(connectorMgrMock.getConnector(123l)).thenReturn(sqoopConnectorMock); when(connectorMgrMock.getSqoopConnector(123l)).thenReturn(sqoopConnectorMock);
when(sqoopConnectorMock.getSupportedDirections()).thenReturn(getSupportedDirections()); when(sqoopConnectorMock.getSupportedDirections()).thenReturn(getSupportedDirections());
assertEquals(jobManager.getConnector(123l), sqoopConnectorMock); assertEquals(jobManager.getSqoopConnector(123l), sqoopConnectorMock);
verify(connectorMgrMock, times(1)).getConnector(123l); verify(connectorMgrMock, times(1)).getSqoopConnector(123l);
} }
@Test @Test

View File

@ -43,14 +43,14 @@
import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.RepositoryUpgrader; import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.driver.Driver; import org.apache.sqoop.driver.Driver;
import org.apache.sqoop.driver.DriverUpgrader;
import org.apache.sqoop.json.DriverBean; import org.apache.sqoop.json.DriverBean;
import org.apache.sqoop.model.ConfigUtils; import org.apache.sqoop.model.ConfigUtils;
import org.apache.sqoop.model.ConfigurationClass; import org.apache.sqoop.model.ConfigurationClass;
import org.apache.sqoop.model.MConfig; import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MDriver; import org.apache.sqoop.model.MDriver;
import org.apache.sqoop.model.MDriverConfig; import org.apache.sqoop.model.MDriverConfig;
@ -65,7 +65,6 @@
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.InOrder; import org.mockito.InOrder;
import org.mockito.Mockito;
public class TestJdbcRepository { public class TestJdbcRepository {
@ -75,7 +74,8 @@ public class TestJdbcRepository {
private Driver driverMock; private Driver driverMock;
private JdbcRepositoryHandler repoHandlerMock; private JdbcRepositoryHandler repoHandlerMock;
private Validator validatorMock; private Validator validatorMock;
private RepositoryUpgrader upgraderMock; private ConnectorConfigurableUpgrader connectorUpgraderMock;
private DriverUpgrader driverUpgraderMock;
private ConfigValidator validRepoMock; private ConfigValidator validRepoMock;
private ConfigValidator invalidRepoMock; private ConfigValidator invalidRepoMock;
@ -87,7 +87,8 @@ public void setUp() throws Exception {
driverMock = mock(Driver.class); driverMock = mock(Driver.class);
repoHandlerMock = mock(JdbcRepositoryHandler.class); repoHandlerMock = mock(JdbcRepositoryHandler.class);
validatorMock = mock(Validator.class); validatorMock = mock(Validator.class);
upgraderMock = mock(RepositoryUpgrader.class); connectorUpgraderMock = mock(ConnectorConfigurableUpgrader.class);
driverUpgraderMock = mock(DriverUpgrader.class);
repoSpy = spy(new JdbcRepository(repoHandlerMock, null)); repoSpy = spy(new JdbcRepository(repoHandlerMock, null));
// setup transaction and connector manager // setup transaction and connector manager
@ -100,8 +101,15 @@ public void setUp() throws Exception {
invalidRepoMock = mock(ConfigValidator.class); invalidRepoMock = mock(ConfigValidator.class);
when(invalidRepoMock.getStatus()).thenReturn(Status.UNACCEPTABLE); when(invalidRepoMock.getStatus()).thenReturn(Status.UNACCEPTABLE);
doNothing().when(upgraderMock).upgrade(any(MLinkConfig.class), any(MLinkConfig.class)); doNothing().when(connectorUpgraderMock).upgradeLinkConfig(any(MLinkConfig.class),
doNothing().when(upgraderMock).upgrade(any(MFromConfig.class), any(MFromConfig.class)); any(MLinkConfig.class));
doNothing().when(connectorUpgraderMock).upgradeFromJobConfig(any(MFromConfig.class),
any(MFromConfig.class));
doNothing().when(connectorUpgraderMock).upgradeToJobConfig(any(MToConfig.class),
any(MToConfig.class));
doNothing().when(driverUpgraderMock).upgradeJobConfig(any(MDriverConfig.class),
any(MDriverConfig.class));
} }
/** /**
@ -117,14 +125,14 @@ public void testConnectorConfigEnableAutoUpgrade() {
// make the upgradeConnector to throw an exception to prove that it has been called // make the upgradeConnector to throw an exception to prove that it has been called
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
"upgradeConnector() has been called."); "upgradeConnector() has been called.");
doThrow(exception).when(connectorMgrMock).getConnector(anyString()); doThrow(exception).when(connectorMgrMock).getSqoopConnector(anyString());
try { try {
repoSpy.registerConnector(newConnector, true); repoSpy.registerConnector(newConnector, true);
} catch (SqoopException ex) { } catch (SqoopException ex) {
assertEquals(ex.getMessage(), exception.getMessage()); assertEquals(ex.getMessage(), exception.getMessage());
verify(repoHandlerMock, times(1)).findConnector(anyString(), any(Connection.class)); verify(repoHandlerMock, times(1)).findConnector(anyString(), any(Connection.class));
verify(connectorMgrMock, times(1)).getConnector(anyString()); verify(connectorMgrMock, times(1)).getSqoopConnector(anyString());
verifyNoMoreInteractions(repoHandlerMock); verifyNoMoreInteractions(repoHandlerMock);
return ; return ;
} }
@ -218,28 +226,29 @@ public void testConnectorConfigUpgradeWithValidLinksAndJobs() {
when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock); when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock);
when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock); when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock);
when(sqconnector.getConfigValidator()).thenReturn(validatorMock); when(sqconnector.getConfigValidator()).thenReturn(validatorMock);
when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock); when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyLinkConfiguration.class); when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyLinkConfiguration.class);
when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn( when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(
EmptyJobConfiguration.class); EmptyJobConfiguration.class);
when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector); when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
// prepare the links and jobs // prepare the links and jobs
// the connector Id for both are the same
List<MLink> linkList = links(link(1,1), link(2,1)); List<MLink> linkList = links(link(1,1), link(2,1));
List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,2));
// mock necessary methods for upgradeConnector() procedure // mock necessary methods for upgradeConnector() procedure
doReturn(linkList).when(repoSpy).findLinksForConnector(anyLong()); doReturn(linkList).when(repoSpy).findLinksForConnector(anyLong());
doReturn(jobList).when(repoSpy).findJobsForConnector(anyLong()); doReturn(jobList).when(repoSpy).findJobsForConnector(anyLong());
doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class)); doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class));
doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class)); doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
doNothing().when(repoSpy).upgradeConnector(any(MConnector.class), any(RepositoryTransaction.class)); doNothing().when(repoSpy).upgradeConnectorConfigs(any(MConnector.class), any(RepositoryTransaction.class));
repoSpy.upgradeConnector(oldConnector, newConnector); repoSpy.upgradeConnector(oldConnector, newConnector);
InOrder repoOrder = inOrder(repoSpy); InOrder repoOrder = inOrder(repoSpy);
InOrder txOrder = inOrder(repoTransactionMock); InOrder txOrder = inOrder(repoTransactionMock);
InOrder upgraderOrder = inOrder(upgraderMock); InOrder upgraderOrder = inOrder(connectorUpgraderMock);
InOrder validatorOrder = inOrder(validatorMock); InOrder validatorOrder = inOrder(validatorMock);
repoOrder.verify(repoSpy, times(1)).findLinksForConnector(anyLong()); repoOrder.verify(repoSpy, times(1)).findLinksForConnector(anyLong());
@ -249,7 +258,7 @@ public void testConnectorConfigUpgradeWithValidLinksAndJobs() {
repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(1, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(1, repoTransactionMock);
repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(2, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(2, repoTransactionMock);
repoOrder.verify(repoSpy, times(1)).upgradeConnector(any(MConnector.class), any(RepositoryTransaction.class)); repoOrder.verify(repoSpy, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(RepositoryTransaction.class));
repoOrder.verify(repoSpy, times(2)).updateLink(any(MLink.class), any(RepositoryTransaction.class)); repoOrder.verify(repoSpy, times(2)).updateLink(any(MLink.class), any(RepositoryTransaction.class));
repoOrder.verify(repoSpy, times(4)).updateJob(any(MJob.class), any(RepositoryTransaction.class)); repoOrder.verify(repoSpy, times(4)).updateJob(any(MJob.class), any(RepositoryTransaction.class));
repoOrder.verifyNoMoreInteractions(); repoOrder.verifyNoMoreInteractions();
@ -257,8 +266,11 @@ public void testConnectorConfigUpgradeWithValidLinksAndJobs() {
txOrder.verify(repoTransactionMock, times(1)).commit(); txOrder.verify(repoTransactionMock, times(1)).commit();
txOrder.verify(repoTransactionMock, times(1)).close(); txOrder.verify(repoTransactionMock, times(1)).close();
txOrder.verifyNoMoreInteractions(); txOrder.verifyNoMoreInteractions();
upgraderOrder.verify(upgraderMock, times(2)).upgrade(any(MLinkConfig.class), any(MLinkConfig.class)); upgraderOrder.verify(connectorUpgraderMock, times(2)).upgradeLinkConfig(any(MLinkConfig.class), any(MLinkConfig.class));
upgraderOrder.verify(upgraderMock, times(4)).upgrade(any(MFromConfig.class), any(MFromConfig.class)); upgraderOrder.verify(connectorUpgraderMock, times(1)).upgradeFromJobConfig(any(MFromConfig.class), any(MFromConfig.class));
upgraderOrder.verify(connectorUpgraderMock, times(1)).upgradeToJobConfig(any(MToConfig.class), any(MToConfig.class));
upgraderOrder.verify(connectorUpgraderMock, times(1)).upgradeFromJobConfig(any(MFromConfig.class), any(MFromConfig.class));
upgraderOrder.verify(connectorUpgraderMock, times(1)).upgradeToJobConfig(any(MToConfig.class), any(MToConfig.class));
upgraderOrder.verifyNoMoreInteractions(); upgraderOrder.verifyNoMoreInteractions();
validatorOrder.verify(validatorMock, times(2)).validateConfigForLink(anyObject()); validatorOrder.verify(validatorMock, times(2)).validateConfigForLink(anyObject());
// @TODO(Abe): Re-enable job validation? // @TODO(Abe): Re-enable job validation?
@ -277,34 +289,34 @@ public void testDriverConfigUpgradeWithValidJobs() {
when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock); when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock);
when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock); when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock);
when(driverMock.getValidator()).thenReturn(validatorMock); when(driverMock.getValidator()).thenReturn(validatorMock);
when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock); when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock);
when(driverMock.getDriverConfigurationGroupClass()).thenReturn(EmptyJobConfiguration.class); when(driverMock.getDriverJobConfigurationClass()).thenReturn(EmptyJobConfiguration.class);
List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
doReturn(jobList).when(repoSpy).findJobs(); doReturn(jobList).when(repoSpy).findJobs();
doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class)); doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class));
doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class)); doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
doNothing().when(repoSpy).upgradeDriver(any(MDriver.class), any(RepositoryTransaction.class)); doNothing().when(repoSpy).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
repoSpy.upgradeDriver(newDriverConfig); repoSpy.upgradeDriver(newDriverConfig);
InOrder repoOrder = inOrder(repoSpy); InOrder repoOrder = inOrder(repoSpy);
InOrder txOrder = inOrder(repoTransactionMock); InOrder txOrder = inOrder(repoTransactionMock);
InOrder upgraderOrder = inOrder(upgraderMock); InOrder upgraderOrder = inOrder(driverUpgraderMock);
InOrder validatorOrder = inOrder(validatorMock); InOrder validatorOrder = inOrder(validatorMock);
repoOrder.verify(repoSpy, times(1)).findJobs(); repoOrder.verify(repoSpy, times(1)).findJobs();
repoOrder.verify(repoSpy, times(1)).getTransaction(); repoOrder.verify(repoSpy, times(1)).getTransaction();
repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock);
repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
repoOrder.verify(repoSpy, times(1)).upgradeDriver(any(MDriver.class), any(RepositoryTransaction.class)); repoOrder.verify(repoSpy, times(1)).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
repoOrder.verify(repoSpy, times(2)).updateJob(any(MJob.class), any(RepositoryTransaction.class)); repoOrder.verify(repoSpy, times(2)).updateJob(any(MJob.class), any(RepositoryTransaction.class));
repoOrder.verifyNoMoreInteractions(); repoOrder.verifyNoMoreInteractions();
txOrder.verify(repoTransactionMock, times(1)).begin(); txOrder.verify(repoTransactionMock, times(1)).begin();
txOrder.verify(repoTransactionMock, times(1)).commit(); txOrder.verify(repoTransactionMock, times(1)).commit();
txOrder.verify(repoTransactionMock, times(1)).close(); txOrder.verify(repoTransactionMock, times(1)).close();
txOrder.verifyNoMoreInteractions(); txOrder.verifyNoMoreInteractions();
upgraderOrder.verify(upgraderMock, times(2)).upgrade(any(MConfigList.class), any(MConfigList.class)); upgraderOrder.verify(driverUpgraderMock, times(2)).upgradeJobConfig(any(MDriverConfig.class), any(MDriverConfig.class));
upgraderOrder.verifyNoMoreInteractions(); upgraderOrder.verifyNoMoreInteractions();
validatorOrder.verify(validatorMock, times(2)).validateConfigForJob(anyObject()); validatorOrder.verify(validatorMock, times(2)).validateConfigForJob(anyObject());
validatorOrder.verifyNoMoreInteractions(); validatorOrder.verifyNoMoreInteractions();
@ -321,13 +333,13 @@ public void testDriverConfigUpgradeWithInvalidJobs() {
when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(invalidRepoMock); when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(invalidRepoMock);
when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(invalidRepoMock); when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(invalidRepoMock);
when(driverMock.getValidator()).thenReturn(validatorMock); when(driverMock.getValidator()).thenReturn(validatorMock);
when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock); when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock);
when(driverMock.getDriverConfigurationGroupClass()).thenReturn(EmptyJobConfiguration.class); when(driverMock.getDriverJobConfigurationClass()).thenReturn(EmptyJobConfiguration.class);
List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
doReturn(jobList).when(repoSpy).findJobs(); doReturn(jobList).when(repoSpy).findJobs();
doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class)); doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
doNothing().when(repoSpy).upgradeDriver(any(MDriver.class), any(RepositoryTransaction.class)); doNothing().when(repoSpy).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
try { try {
repoSpy.upgradeDriver(newDriverConfig); repoSpy.upgradeDriver(newDriverConfig);
@ -336,20 +348,20 @@ public void testDriverConfigUpgradeWithInvalidJobs() {
InOrder repoOrder = inOrder(repoSpy); InOrder repoOrder = inOrder(repoSpy);
InOrder txOrder = inOrder(repoTransactionMock); InOrder txOrder = inOrder(repoTransactionMock);
InOrder upgraderOrder = inOrder(upgraderMock); InOrder upgraderOrder = inOrder(driverUpgraderMock);
InOrder validatorOrder = inOrder(validatorMock); InOrder validatorOrder = inOrder(validatorMock);
repoOrder.verify(repoSpy, times(1)).findJobs(); repoOrder.verify(repoSpy, times(1)).findJobs();
repoOrder.verify(repoSpy, times(1)).getTransaction(); repoOrder.verify(repoSpy, times(1)).getTransaction();
repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock);
repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
repoOrder.verify(repoSpy, times(1)).upgradeDriver(any(MDriver.class), any(RepositoryTransaction.class)); repoOrder.verify(repoSpy, times(1)).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
repoOrder.verifyNoMoreInteractions(); repoOrder.verifyNoMoreInteractions();
txOrder.verify(repoTransactionMock, times(1)).begin(); txOrder.verify(repoTransactionMock, times(1)).begin();
txOrder.verify(repoTransactionMock, times(1)).rollback(); txOrder.verify(repoTransactionMock, times(1)).rollback();
txOrder.verify(repoTransactionMock, times(1)).close(); txOrder.verify(repoTransactionMock, times(1)).close();
txOrder.verifyNoMoreInteractions(); txOrder.verifyNoMoreInteractions();
upgraderOrder.verify(upgraderMock, times(2)).upgrade(any(MConfigList.class), any(MConfigList.class)); upgraderOrder.verify(driverUpgraderMock, times(2)).upgradeJobConfig(any(MDriverConfig.class), any(MDriverConfig.class));
upgraderOrder.verifyNoMoreInteractions(); upgraderOrder.verifyNoMoreInteractions();
// driver configs are per job. // driver configs are per job.
validatorOrder.verify(validatorMock, times(2)).validateConfigForJob(anyObject()); validatorOrder.verify(validatorMock, times(2)).validateConfigForJob(anyObject());
@ -371,8 +383,8 @@ public void testConnectorConfigUpgradeHandlerWithFindLinksForConnectorError() {
SqoopConnector sqconnector = mock(SqoopConnector.class); SqoopConnector sqconnector = mock(SqoopConnector.class);
when(sqconnector.getConfigValidator()).thenReturn(validatorMock); when(sqconnector.getConfigValidator()).thenReturn(validatorMock);
when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock); when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector); when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
"find links for connector error."); "find links for connector error.");
@ -401,8 +413,8 @@ public void testConnectorConfigUpgradeHandlerWithFindJobsForConnectorError() {
SqoopConnector sqconnector = mock(SqoopConnector.class); SqoopConnector sqconnector = mock(SqoopConnector.class);
when(sqconnector.getConfigValidator()).thenReturn(validatorMock); when(sqconnector.getConfigValidator()).thenReturn(validatorMock);
when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock); when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector); when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
List<MLink> linkList = links(link(1,1), link(2,1)); List<MLink> linkList = links(link(1,1), link(2,1));
doReturn(linkList).when(repoHandlerMock).findLinksForConnector(anyLong(), any(Connection.class)); doReturn(linkList).when(repoHandlerMock).findLinksForConnector(anyLong(), any(Connection.class));
@ -435,8 +447,8 @@ public void testConnectorConfigUpgradeHandlerWithDeleteJobInputsError() {
SqoopConnector sqconnector = mock(SqoopConnector.class); SqoopConnector sqconnector = mock(SqoopConnector.class);
when(sqconnector.getConfigValidator()).thenReturn(validatorMock); when(sqconnector.getConfigValidator()).thenReturn(validatorMock);
when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock); when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector); when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
List<MLink> linkList = links(link(1,1), link(2,1)); List<MLink> linkList = links(link(1,1), link(2,1));
List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
@ -472,8 +484,8 @@ public void testConnectorConfigUpgradeHandlerWithDeleteLinkInputsError() {
SqoopConnector sqconnector = mock(SqoopConnector.class); SqoopConnector sqconnector = mock(SqoopConnector.class);
when(sqconnector.getConfigValidator()).thenReturn(validatorMock); when(sqconnector.getConfigValidator()).thenReturn(validatorMock);
when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock); when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector); when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
List<MLink> linkList = links(link(1,1), link(2,1)); List<MLink> linkList = links(link(1,1), link(2,1));
List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
@ -511,8 +523,8 @@ public void testConnectorConfigUpgradeHandlerWithUpdateConnectorError() {
SqoopConnector sqconnector = mock(SqoopConnector.class); SqoopConnector sqconnector = mock(SqoopConnector.class);
when(sqconnector.getConfigValidator()).thenReturn(validatorMock); when(sqconnector.getConfigValidator()).thenReturn(validatorMock);
when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock); when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector); when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
List<MLink> linkList = links(link(1,1), link(2,1)); List<MLink> linkList = links(link(1,1), link(2,1));
List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
@ -523,7 +535,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateConnectorError() {
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
"update connector error."); "update connector error.");
doThrow(exception).when(repoHandlerMock).upgradeConnector(any(MConnector.class), any(Connection.class)); doThrow(exception).when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
try { try {
repoSpy.upgradeConnector(oldConnector, newConnector); repoSpy.upgradeConnector(oldConnector, newConnector);
@ -533,7 +545,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateConnectorError() {
verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(1)).upgradeConnector(any(MConnector.class), any(Connection.class)); verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
verifyNoMoreInteractions(repoHandlerMock); verifyNoMoreInteractions(repoHandlerMock);
return ; return ;
} }
@ -554,10 +566,10 @@ public void testConnectorConfigUpgradeHandlerWithUpdateLinkError() {
when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock); when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock);
when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock); when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock);
when(sqconnector.getConfigValidator()).thenReturn(validatorMock); when(sqconnector.getConfigValidator()).thenReturn(validatorMock);
when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock); when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyLinkConfiguration.class); when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyLinkConfiguration.class);
when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(EmptyJobConfiguration.class); when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(EmptyJobConfiguration.class);
when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector); when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
List<MLink> linkList = links(link(1,1), link(2,1)); List<MLink> linkList = links(link(1,1), link(2,1));
List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
@ -565,7 +577,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateLinkError() {
doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class)); doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class));
doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class)); doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class));
doNothing().when(repoHandlerMock).upgradeConnector(any(MConnector.class), any(Connection.class)); doNothing().when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class)); doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class));
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
@ -580,7 +592,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateLinkError() {
verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(1)).upgradeConnector(any(MConnector.class), any(Connection.class)); verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
verify(repoHandlerMock, times(1)).existsLink(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(1)).existsLink(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(1)).updateLink(any(MLink.class), any(Connection.class)); verify(repoHandlerMock, times(1)).updateLink(any(MLink.class), any(Connection.class));
verifyNoMoreInteractions(repoHandlerMock); verifyNoMoreInteractions(repoHandlerMock);
@ -603,10 +615,10 @@ public void testConnectorConfigUpgradeHandlerWithUpdateJobError() {
when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock); when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock);
when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock); when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock);
when(sqconnector.getConfigValidator()).thenReturn(validatorMock); when(sqconnector.getConfigValidator()).thenReturn(validatorMock);
when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock); when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyLinkConfiguration.class); when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyLinkConfiguration.class);
when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(EmptyJobConfiguration.class); when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(EmptyJobConfiguration.class);
when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector); when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
List<MLink> linkList = links(link(1,1), link(2,1)); List<MLink> linkList = links(link(1,1), link(2,1));
List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
@ -614,7 +626,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateJobError() {
doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class)); doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class));
doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class)); doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class));
doNothing().when(repoHandlerMock).upgradeConnector(any(MConnector.class), any(Connection.class)); doNothing().when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
doNothing().when(repoHandlerMock).updateLink(any(MLink.class), any(Connection.class)); doNothing().when(repoHandlerMock).updateLink(any(MLink.class), any(Connection.class));
doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class)); doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class));
doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class)); doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class));
@ -631,7 +643,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateJobError() {
verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(1)).upgradeConnector(any(MConnector.class), any(Connection.class)); verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
verify(repoHandlerMock, times(2)).existsLink(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).existsLink(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).updateLink(any(MLink.class), any(Connection.class)); verify(repoHandlerMock, times(2)).updateLink(any(MLink.class), any(Connection.class));
verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class));
@ -652,7 +664,7 @@ public void testDriverConfigUpgradeHandlerWithFindJobsError() {
MDriver newDriverConfig = driver(); MDriver newDriverConfig = driver();
when(driverMock.getValidator()).thenReturn(validatorMock); when(driverMock.getValidator()).thenReturn(validatorMock);
when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock); when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock);
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
"find jobs error."); "find jobs error.");
@ -679,7 +691,7 @@ public void testDriverConfigUpgradeHandlerWithDeleteJobInputsError() {
MDriver newDriverConfig = driver(); MDriver newDriverConfig = driver();
when(driverMock.getValidator()).thenReturn(validatorMock); when(driverMock.getValidator()).thenReturn(validatorMock);
when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock); when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock);
List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class)); doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
@ -710,7 +722,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateDriverConfigError() {
MDriver newDriverConfig = driver(); MDriver newDriverConfig = driver();
when(driverMock.getValidator()).thenReturn(validatorMock); when(driverMock.getValidator()).thenReturn(validatorMock);
when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock); when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock);
List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class)); doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
@ -719,7 +731,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateDriverConfigError() {
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
"update driverConfig entity error."); "update driverConfig entity error.");
doThrow(exception).when(repoHandlerMock).upgradeDriver(any(MDriver.class), any(Connection.class)); doThrow(exception).when(repoHandlerMock).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
try { try {
repoSpy.upgradeDriver(newDriverConfig); repoSpy.upgradeDriver(newDriverConfig);
@ -727,7 +739,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateDriverConfigError() {
assertEquals(ex.getMessage(), exception.getMessage()); assertEquals(ex.getMessage(), exception.getMessage());
verify(repoHandlerMock, times(1)).findJobs(any(Connection.class)); verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(1)).upgradeDriver(any(MDriver.class), any(Connection.class)); verify(repoHandlerMock, times(1)).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
verifyNoMoreInteractions(repoHandlerMock); verifyNoMoreInteractions(repoHandlerMock);
return ; return ;
} }
@ -747,12 +759,12 @@ public void testDriverConfigUpgradeHandlerWithUpdateJobError() {
when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock); when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock);
when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock); when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock);
when(driverMock.getValidator()).thenReturn(validatorMock); when(driverMock.getValidator()).thenReturn(validatorMock);
when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock); when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock);
when(driverMock.getDriverConfigurationGroupClass()).thenReturn(EmptyJobConfiguration.class); when(driverMock.getDriverJobConfigurationClass()).thenReturn(EmptyJobConfiguration.class);
List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class)); doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
doNothing().when(repoHandlerMock).upgradeDriver(any(MDriver.class), any(Connection.class)); doNothing().when(repoHandlerMock).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class)); doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class));
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
@ -765,7 +777,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateJobError() {
assertEquals(ex.getMessage(), exception.getMessage()); assertEquals(ex.getMessage(), exception.getMessage());
verify(repoHandlerMock, times(1)).findJobs(any(Connection.class)); verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(1)).upgradeDriver(any(MDriver.class), any(Connection.class)); verify(repoHandlerMock, times(1)).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(1)).updateJob(any(MJob.class), any(Connection.class)); verify(repoHandlerMock, times(1)).updateJob(any(MJob.class), any(Connection.class));
verifyNoMoreInteractions(repoHandlerMock); verifyNoMoreInteractions(repoHandlerMock);

View File

@ -138,7 +138,7 @@ private void insertConfigsForDriver(MDriver mDriver, Connection conn) {
* repository. The job and connector configs within <code>mc</code> will get * repository. The job and connector configs within <code>mc</code> will get
* updated with the id of the configs when this function returns. * updated with the id of the configs when this function returns.
* @param mc The connector to use for updating configs * @param mc The connector to use for updating configs
* @param conn JDBC link to use for updating the configs * @param conn JDBC connection to use for inserting the configs
*/ */
private void insertConfigsForConnector (MConnector mc, Connection conn) { private void insertConfigsForConnector (MConnector mc, Connection conn) {
long connectorId = mc.getPersistenceId(); long connectorId = mc.getPersistenceId();
@ -151,17 +151,18 @@ private void insertConfigsForConnector (MConnector mc, Connection conn) {
baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE, baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
Statement.RETURN_GENERATED_KEYS); Statement.RETURN_GENERATED_KEYS);
// Register link type config // Register link type config for connector
// NOTE: The direction is null for LINK type
registerConfigs(connectorId, null, mc.getLinkConfig().getConfigs(), registerConfigs(connectorId, null, mc.getLinkConfig().getConfigs(),
MConfigType.LINK.name(), baseConfigStmt, baseInputStmt, conn); MConfigType.LINK.name(), baseConfigStmt, baseInputStmt, conn);
// Register both from/to job type config // Register both from/to job type config for connector
if (mc.getSupportedDirections().isDirectionSupported(Direction.FROM)) { if (mc.getSupportedDirections().isDirectionSupported(Direction.FROM)) {
registerConfigs(connectorId, Direction.FROM, mc.getConfig(Direction.FROM).getConfigs(), registerConfigs(connectorId, Direction.FROM, mc.getFromConfig().getConfigs(),
MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn); MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
} }
if (mc.getSupportedDirections().isDirectionSupported(Direction.TO)) { if (mc.getSupportedDirections().isDirectionSupported(Direction.TO)) {
registerConfigs(connectorId, Direction.TO, mc.getConfig(Direction.TO).getConfigs(), registerConfigs(connectorId, Direction.TO, mc.getToConfig().getConfigs(),
MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn); MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
} }
} catch (SQLException ex) { } catch (SQLException ex) {
@ -752,8 +753,8 @@ protected long registerHdfsConnector(Connection conn) {
for (URL url : connectorConfigs) { for (URL url : connectorConfigs) {
handler = new ConnectorHandler(url); handler = new ConnectorHandler(url);
if (handler.getMetadata().getPersistenceId() != -1) { if (handler.getConnectorConfigurable().getPersistenceId() != -1) {
return handler.getMetadata().getPersistenceId(); return handler.getConnectorConfigurable().getPersistenceId();
} }
if (handler.getUniqueName().equals(CONNECTOR_HDFS)) { if (handler.getUniqueName().equals(CONNECTOR_HDFS)) {
@ -761,8 +762,8 @@ protected long registerHdfsConnector(Connection conn) {
PreparedStatement baseConnectorStmt = conn.prepareStatement( PreparedStatement baseConnectorStmt = conn.prepareStatement(
STMT_INSERT_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS, STMT_INSERT_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS,
Statement.RETURN_GENERATED_KEYS); Statement.RETURN_GENERATED_KEYS);
baseConnectorStmt.setString(1, handler.getMetadata().getUniqueName()); baseConnectorStmt.setString(1, handler.getConnectorConfigurable().getUniqueName());
baseConnectorStmt.setString(2, handler.getMetadata().getClassName()); baseConnectorStmt.setString(2, handler.getConnectorConfigurable().getClassName());
baseConnectorStmt.setString(3, "0"); baseConnectorStmt.setString(3, "0");
if (baseConnectorStmt.executeUpdate() == 1) { if (baseConnectorStmt.executeUpdate() == 1) {
ResultSet rsetConnectorId = baseConnectorStmt.getGeneratedKeys(); ResultSet rsetConnectorId = baseConnectorStmt.getGeneratedKeys();
@ -1229,9 +1230,7 @@ public List<MLink> findLinksForConnector(long connectorID, Connection conn) {
try { try {
stmt = conn.prepareStatement(STMT_SELECT_LINK_FOR_CONNECTOR); stmt = conn.prepareStatement(STMT_SELECT_LINK_FOR_CONNECTOR);
stmt.setLong(1, connectorID); stmt.setLong(1, connectorID);
return loadLinks(stmt, conn); return loadLinks(stmt, conn);
} catch (SQLException ex) { } catch (SQLException ex) {
logException(ex, connectorID); logException(ex, connectorID);
throw new SqoopException(DerbyRepoError.DERBYREPO_0023, ex); throw new SqoopException(DerbyRepoError.DERBYREPO_0023, ex);
@ -1244,7 +1243,12 @@ public List<MLink> findLinksForConnector(long connectorID, Connection conn) {
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public void upgradeConnector(MConnector mConnector, Connection conn) { public void upgradeConnectorConfigs(MConnector mConnector, Connection conn) {
updateConnectorAndDeleteConfigs(mConnector, conn);
insertConfigsForConnector(mConnector, conn);
}
private void updateConnectorAndDeleteConfigs(MConnector mConnector, Connection conn) {
PreparedStatement updateConnectorStatement = null; PreparedStatement updateConnectorStatement = null;
PreparedStatement deleteConfig = null; PreparedStatement deleteConfig = null;
PreparedStatement deleteInput = null; PreparedStatement deleteInput = null;
@ -1271,15 +1275,19 @@ public void upgradeConnector(MConnector mConnector, Connection conn) {
} finally { } finally {
closeStatements(updateConnectorStatement, deleteConfig, deleteInput); closeStatements(updateConnectorStatement, deleteConfig, deleteInput);
} }
insertConfigsForConnector(mConnector, conn);
} }
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public void upgradeDriver(MDriver mDriver, Connection conn) { public void upgradeDriverConfigs(MDriver mDriver, Connection conn) {
updateDriverAndDeleteConfigs(mDriver, conn);
createOrUpdateDriverSystemVersion(conn, mDriver.getVersion());
insertConfigsForDriver(mDriver, conn);
}
private void updateDriverAndDeleteConfigs(MDriver mDriver, Connection conn) {
PreparedStatement deleteConfig = null; PreparedStatement deleteConfig = null;
PreparedStatement deleteInput = null; PreparedStatement deleteInput = null;
try { try {
@ -1295,8 +1303,6 @@ public void upgradeDriver(MDriver mDriver, Connection conn) {
} finally { } finally {
closeStatements(deleteConfig, deleteInput); closeStatements(deleteConfig, deleteInput);
} }
createOrUpdateDriverSystemVersion(conn, mDriver.getVersion());
insertConfigsForDriver(mDriver, conn);
} }
/** /**

View File

@ -125,7 +125,7 @@ public void testDriverVersion() throws Exception {
+ DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION + "'"); + DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION + "'");
assertEquals(lowerVersion, getDriverVersion()); assertEquals(lowerVersion, getDriverVersion());
handler.upgradeDriver(driver, getDerbyDatabaseConnection()); handler.upgradeDriverConfigs(driver, getDerbyDatabaseConnection());
assertEquals(CURRENT_DRIVER_VERSION, driver.getVersion()); assertEquals(CURRENT_DRIVER_VERSION, driver.getVersion());

View File

@ -70,7 +70,7 @@ public JsonBean handleEvent(RequestContext ctx) {
LOG.info("ConnectorRequestHandler handles cid: " + cid); LOG.info("ConnectorRequestHandler handles cid: " + cid);
if (cid.equals("all")) { if (cid.equals("all")) {
// display all connectors // display all connectors
connectors = ConnectorManager.getInstance().getConnectorsMetadata(); connectors = ConnectorManager.getInstance().getConnectorConfigurables();
bundles = ConnectorManager.getInstance().getResourceBundles(locale); bundles = ConnectorManager.getInstance().getResourceBundles(locale);
AuditLoggerManager.getInstance() AuditLoggerManager.getInstance()
@ -87,7 +87,7 @@ public JsonBean handleEvent(RequestContext ctx) {
connectors = new LinkedList<MConnector>(); connectors = new LinkedList<MConnector>();
bundles = new HashMap<Long, ResourceBundle>(); bundles = new HashMap<Long, ResourceBundle>();
connectors.add(ConnectorManager.getInstance().getConnectorConfig(id)); connectors.add(ConnectorManager.getInstance().getConnectorConfigurable(id));
bundles.put(id, ConnectorManager.getInstance().getResourceBundle(id, locale)); bundles.put(id, ConnectorManager.getInstance().getResourceBundle(id, locale));
AuditLoggerManager.getInstance() AuditLoggerManager.getInstance()

View File

@ -164,10 +164,10 @@ private JsonBean createUpdateJob(RequestContext ctx, boolean update) {
// Verify that user is not trying to spoof us // Verify that user is not trying to spoof us
MFromConfig fromConfig = ConnectorManager.getInstance() MFromConfig fromConfig = ConnectorManager.getInstance()
.getConnectorConfig(job.getConnectorId(Direction.FROM)) .getConnectorConfigurable(job.getConnectorId(Direction.FROM))
.getFromConfig(); .getFromConfig();
MToConfig toConfig = ConnectorManager.getInstance() MToConfig toConfig = ConnectorManager.getInstance()
.getConnectorConfig(job.getConnectorId(Direction.TO)) .getConnectorConfigurable(job.getConnectorId(Direction.TO))
.getToConfig(); .getToConfig();
MDriverConfig driverConfig = Driver.getInstance().getDriver().getDriverConfig(); MDriverConfig driverConfig = Driver.getInstance().getDriver().getDriverConfig();
@ -179,8 +179,8 @@ private JsonBean createUpdateJob(RequestContext ctx, boolean update) {
} }
// Corresponding connectors for this // Corresponding connectors for this
SqoopConnector fromConnector = ConnectorManager.getInstance().getConnector(job.getConnectorId(Direction.FROM)); SqoopConnector fromConnector = ConnectorManager.getInstance().getSqoopConnector(job.getConnectorId(Direction.FROM));
SqoopConnector toConnector = ConnectorManager.getInstance().getConnector(job.getConnectorId(Direction.TO)); SqoopConnector toConnector = ConnectorManager.getInstance().getSqoopConnector(job.getConnectorId(Direction.TO));
if (!fromConnector.getSupportedDirections().contains(Direction.FROM)) { if (!fromConnector.getSupportedDirections().contains(Direction.FROM)) {
throw new SqoopException(ServerError.SERVER_0004, "Connector " + fromConnector.getClass().getCanonicalName() throw new SqoopException(ServerError.SERVER_0004, "Connector " + fromConnector.getClass().getCanonicalName()
@ -196,7 +196,7 @@ private JsonBean createUpdateJob(RequestContext ctx, boolean update) {
Object fromConfigObject = ClassUtils.instantiate(fromConnector.getJobConfigurationClass(Direction.FROM)); Object fromConfigObject = ClassUtils.instantiate(fromConnector.getJobConfigurationClass(Direction.FROM));
Object toConfigObject = ClassUtils.instantiate(toConnector.getJobConfigurationClass(Direction.TO)); Object toConfigObject = ClassUtils.instantiate(toConnector.getJobConfigurationClass(Direction.TO));
Object driverConfigObject = ClassUtils.instantiate(Driver.getInstance().getDriverConfigurationGroupClass()); Object driverConfigObject = ClassUtils.instantiate(Driver.getInstance().getDriverJobConfigurationClass());
ConfigUtils.fromConfigs(job.getJobConfig(Direction.FROM).getConfigs(), fromConfigObject); ConfigUtils.fromConfigs(job.getJobConfig(Direction.FROM).getConfigs(), fromConfigObject);
ConfigUtils.fromConfigs(job.getJobConfig(Direction.TO).getConfigs(), toConfigObject); ConfigUtils.fromConfigs(job.getJobConfig(Direction.TO).getConfigs(), toConfigObject);

View File

@ -158,7 +158,7 @@ private JsonBean createUpdateLink(RequestContext ctx, boolean update) {
// Verify that user is not trying to spoof us // Verify that user is not trying to spoof us
MLinkConfig linkConfig = MLinkConfig linkConfig =
ConnectorManager.getInstance().getConnectorConfig(link.getConnectorId()) ConnectorManager.getInstance().getConnectorConfigurable(link.getConnectorId())
.getLinkConfig(); .getLinkConfig();
if(!linkConfig.equals(link.getConnectorLinkConfig())) { if(!linkConfig.equals(link.getConnectorLinkConfig())) {
throw new SqoopException(ServerError.SERVER_0003, throw new SqoopException(ServerError.SERVER_0003,
@ -166,7 +166,7 @@ private JsonBean createUpdateLink(RequestContext ctx, boolean update) {
} }
// Responsible connector for this session // Responsible connector for this session
SqoopConnector connector = ConnectorManager.getInstance().getConnector(link.getConnectorId()); SqoopConnector connector = ConnectorManager.getInstance().getSqoopConnector(link.getConnectorId());
// We need translate configs // We need translate configs
Object connectorLinkConfig = ClassUtils.instantiate(connector.getLinkConfigurationClass()); Object connectorLinkConfig = ClassUtils.instantiate(connector.getLinkConfigurationClass());

View File

@ -1,4 +1,4 @@
/* /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -7,16 +7,15 @@
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, * Unless required by applicable law or agreed to in writing, software
* software distributed under the License is distributed on an * distributed under the License is distributed on an "AS IS" BASIS,
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* KIND, either express or implied. See the License for the * See the License for the specific language governing permissions and
* specific language governing permissions and limitations * limitations under the License.
* under the License.
*/ */
package org.apache.sqoop.connector.hdfs; package org.apache.sqoop.configurable;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -24,37 +23,19 @@
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.spi.RepositoryUpgrader;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MConfig; import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MInput; import org.apache.sqoop.model.MInput;
import org.apache.sqoop.model.MLinkConfig;
public class HdfsConfigUpgrader extends RepositoryUpgrader { public class ConfigurableUpgradeUtil {
private static final Logger LOG = Logger.getLogger(HdfsConfigUpgrader.class);
private static final Logger LOG = Logger.getLogger(ConfigurableUpgradeUtil.class);
/* /*
* For now, there is no real upgrade. So copy all data over, * For now, there is no real upgrade. So copy all data over,
* set the validation messages and error messages to be the same as for the * set the validation messages and error messages to be the same as for the
* inputs in the original one. * inputs in the original one.
*/ */
@Override
public void upgrade(MLinkConfig original, MLinkConfig upgradeTarget) {
doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
}
@Override
public void upgrade(MConfigList original, MConfigList upgradeTarget) {
doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void doUpgrade(List<MConfig> original, List<MConfig> target) { public static void doUpgrade(List<MConfig> original, List<MConfig> target) {
// Easier to find the config in the original list if we use a map.
// Since the constructor takes a list,
// index is not guaranteed to be the same, so we need to look for
// equivalence
Map<String, MConfig> configMap = new HashMap<String, MConfig>(); Map<String, MConfig> configMap = new HashMap<String, MConfig>();
for (MConfig config : original) { for (MConfig config : original) {
configMap.put(config.getName(), config); configMap.put(config.getName(), config);
@ -64,7 +45,7 @@ private void doUpgrade(List<MConfig> original, List<MConfig> target) {
MConfig originalConfig = configMap.get(config.getName()); MConfig originalConfig = configMap.get(config.getName());
if (originalConfig == null) { if (originalConfig == null) {
LOG.warn("Config: '" + config.getName() + "' not present in old " + LOG.warn("Config: '" + config.getName() + "' not present in old " +
"connector. So it and its inputs will not be transferred by the upgrader."); "configurable. So it and its inputs will not be transferred by the upgrader.");
continue; continue;
} }
for (MInput input : inputs) { for (MInput input : inputs) {
@ -73,7 +54,7 @@ private void doUpgrade(List<MConfig> original, List<MConfig> target) {
input.setValue(originalInput.getValue()); input.setValue(originalInput.getValue());
} catch (SqoopException ex) { } catch (SqoopException ex) {
LOG.warn("Input: '" + input.getName() + "' not present in old " + LOG.warn("Input: '" + input.getName() + "' not present in old " +
"connector. So it will not be transferred by the upgrader."); "configurable. So it will not be transferred by the upgrader.");
} }
} }
} }

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector;
import org.apache.sqoop.common.ErrorCode;
public enum ConfigurableError implements ErrorCode {
/** An unknown error has occurred. */
CONFIGURABLE_0001("Link object upgrade called, but no upgrade routine provided for LINK config"),
CONFIGURABLE_0002("Job object upgrade called, but no upgrade routine provided for FROM job config"),
CONFIGURABLE_0003("Job object upgrade called, but no upgrade routine provided for TO job config"),
;
private final String message;
private ConfigurableError(String message) {
this.message = message;
}
public String getCode() {
return name();
}
public String getMessage() {
return message;
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.connector.spi;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConfigurableError;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MToConfig;
/**
* Configurable represents an entity that can provide configurations for the
* support config types {@linkplain ConfigType}
* This api represents the interface that configurable such as the connector/driver
* will implement to upgrade both the config and its corresponding data across different
* versions
*
*/
public abstract class ConnectorConfigurableUpgrader {
/**
* Upgrade the original link config for the given config type and fill into the upgradeTarget. Note
* that any data already in {@code upgradeTarget} maybe overwritten.
* @param original - original config as in the repository
* @param upgradeTarget - the instance that will be filled in with the
* upgraded config
*/
public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) {
// The reasoning for throwing an exception by default is as follows.
// Sqoop calls the upgrade apis for every connector if and only if the
// corresponding link object that the config is associated with exists in the sqoop
// repository. In unexpected scenarios, if a link object is created in the
// sqoop repository without a corresponding upgrade routine for
// the link config, then this exception will be thrown to indicate a
// unexpected code path. In normal circumstances this
// scenario of having a link object for a connector without link config is
// very unlikely to happen. A likely scenario is that a connector will not have a link config and hence
// no link object will be created and thus this method will not be invoked.
throw new SqoopException(ConfigurableError.CONFIGURABLE_0001);
}
/**
* Upgrade the original FROM job config for the given config type and fill into the upgradeTarget. Note
* that any data already in {@code upgradeTarget} maybe overwritten.
* @param original - original config as in the repository
* @param upgradeTarget - the instance that will be filled in with the
* upgraded config
*/
public void upgradeFromJobConfig(MFromConfig original, MFromConfig upgradeTarget) {
// see above for the reasoning behind the exception
throw new SqoopException(ConfigurableError.CONFIGURABLE_0002);
}
/**
* Upgrade the original TO job config for the given config type and fill into the upgradeTarget. Note
* that any data already in {@code upgradeTarget} maybe overwritten.
* @param original - original config as in the repository
* @param upgradeTarget - the instance that will be filled in with the
* upgraded config
*/
public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) {
// see above for the reasoning behind the exception
throw new SqoopException(ConfigurableError.CONFIGURABLE_0003);
}
}

View File

@ -1,51 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.connector.spi;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MLinkConfig;
/**
* Repository represents the sqoop entity store. Sqoop entities include
* the connectors, links, jobs and submissions and corresponding configs.
*
*/
public abstract class RepositoryUpgrader {
/**
* Upgrade the original link config and fill into the upgradeTarget. Note
* that any data already in {@code upgradeTarget} maybe overwritten.
* @param original - original link config as in the repository
* @param upgradeTarget - the instance that will be filled in with the
* upgraded link config.
*/
public abstract void upgrade(MLinkConfig original, MLinkConfig upgradeTarget);
/**
* Upgrade the original job config and fill into the upgradeTarget. Note
* that any config data already in {@code upgradeTarget} maybe overwritten.
* This method must be called only after the link config has
* already been upgraded.
* @param original - original job config as in the repository
* @param upgradeTarget - the instance that will be filled in with the
* upgraded job config.
* NOTE(VB): This api will be revisited to accomodate from and to job config update
*/
public abstract void upgrade(MConfigList original, MConfigList upgradeTarget);
}

View File

@ -60,11 +60,13 @@ public List<Direction> getSupportedDirections() {
/** /**
* @return Get link configuration group class * @return Get link configuration group class
*/ */
@SuppressWarnings("rawtypes")
public abstract Class getLinkConfigurationClass(); public abstract Class getLinkConfigurationClass();
/** /**
* @return Get job configuration group class per direction type or null if not supported * @return Get job configuration group class per direction type or null if not supported
*/ */
@SuppressWarnings("rawtypes")
public abstract Class getJobConfigurationClass(Direction direction); public abstract Class getJobConfigurationClass(Direction direction);
/** /**
@ -85,11 +87,11 @@ public List<Direction> getSupportedDirections() {
public abstract Validator getConfigValidator(); public abstract Validator getConfigValidator();
/** /**
* Returns an {@linkplain RepositoryUpgrader} object that can upgrade the * Returns an {@linkplain ConnectorConfigurableUpgrader} object that can upgrade the
* configs related to the link and job * configs related to the link and job
* @return RespositoryUpgrader object * @return RespositoryUpgrader object
*/ */
public abstract RepositoryUpgrader getRepositoryUpgrader(); public abstract ConnectorConfigurableUpgrader getConfigurableUpgrader();
/** /**
* Returns the {@linkplain IntermediateDataFormat} this connector * Returns the {@linkplain IntermediateDataFormat} this connector

View File

@ -139,7 +139,7 @@ private JSONObject addConnectorName(JSONObject json) {
while (iterator.hasNext()) { while (iterator.hasNext()) {
JSONObject result = iterator.next(); JSONObject result = iterator.next();
Long connectorId = (Long) result.get(JSONConstants.CONNECTOR_ID); Long connectorId = (Long) result.get(JSONConstants.CONNECTOR_ID);
result.put(JSONConstants.CONNECTOR_NAME, connectorManager.getConnectorConfig(connectorId).getUniqueName()); result.put(JSONConstants.CONNECTOR_NAME, connectorManager.getConnectorConfigurable(connectorId).getUniqueName());
} }
return json; return json;

View File

@ -18,6 +18,14 @@
package org.apache.sqoop.tools.tool; package org.apache.sqoop.tools.tool;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.GnuParser;
@ -25,13 +33,15 @@
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException; import org.apache.commons.cli.ParseException;
import org.apache.commons.io.Charsets; import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.VersionInfo; import org.apache.sqoop.common.VersionInfo;
import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.RepositoryUpgrader; import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.driver.Driver; import org.apache.sqoop.driver.Driver;
import org.apache.sqoop.driver.DriverUpgrader;
import org.apache.sqoop.json.JobBean; import org.apache.sqoop.json.JobBean;
import org.apache.sqoop.json.LinkBean; import org.apache.sqoop.json.LinkBean;
import org.apache.sqoop.json.SubmissionBean; import org.apache.sqoop.json.SubmissionBean;
@ -50,16 +60,6 @@
import org.apache.sqoop.repository.Repository; import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager; import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.tools.ConfiguredTool; import org.apache.sqoop.tools.ConfiguredTool;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.sqoop.utils.ClassUtils; import org.apache.sqoop.utils.ClassUtils;
import org.apache.sqoop.validation.ConfigValidationResult; import org.apache.sqoop.validation.ConfigValidationResult;
import org.apache.sqoop.validation.ConfigValidationRunner; import org.apache.sqoop.validation.ConfigValidationRunner;
@ -140,8 +140,7 @@ private boolean load(JSONObject repo) {
Repository repository = RepositoryManager.getInstance().getRepository(); Repository repository = RepositoryManager.getInstance().getRepository();
ConnectorManager.getInstance().initialize(); ConnectorManager.getInstance().initialize();
ConnectorManager connectorManager = ConnectorManager.getInstance();
LOG.info("Loading Connections"); LOG.info("Loading Connections");
JSONObject jsonConns = (JSONObject) repo.get(JSONConstants.LINKS); JSONObject jsonConns = (JSONObject) repo.get(JSONConstants.LINKS);
@ -247,20 +246,21 @@ private long loadLink(MLink link) {
//starting by pretending we have a brand new connection //starting by pretending we have a brand new connection
resetPersistenceId(link); resetPersistenceId(link);
RepositoryUpgrader upgrader = Driver.getInstance().getDriverConfigRepositoryUpgrader();
Repository repository = RepositoryManager.getInstance().getRepository(); Repository repository = RepositoryManager.getInstance().getRepository();
MConnector mConnector = ConnectorManager.getInstance().getConnectorConfig(link.getConnectorId()); MConnector mConnector = ConnectorManager.getInstance().getConnectorConfigurable(link.getConnectorId());
ConnectorConfigurableUpgrader connectorConfigUpgrader = ConnectorManager.getInstance().getSqoopConnector(mConnector.getUniqueName()).getConfigurableUpgrader();
List<MConfig> connectorConfigs = mConnector.getLinkConfig().clone(false).getConfigs(); List<MConfig> connectorConfigs = mConnector.getLinkConfig().clone(false).getConfigs();
MLinkConfig newLinkConfigs = new MLinkConfig(connectorConfigs); MLinkConfig newLinkConfigs = new MLinkConfig(connectorConfigs);
// upgrading the forms to make sure they match the current repository // upgrading the configs to make sure they match the current repository
upgrader.upgrade(link.getConnectorLinkConfig(), newLinkConfigs); connectorConfigUpgrader.upgradeLinkConfig(link.getConnectorLinkConfig(), newLinkConfigs);
MLink newLink = new MLink(link, newLinkConfigs); MLink newLink = new MLink(link, newLinkConfigs);
// Transform config structures to objects for validations // Transform config structures to objects for validations
SqoopConnector connector = SqoopConnector connector = ConnectorManager.getInstance().getSqoopConnector(
ConnectorManager.getInstance().getConnector(link.getConnectorId()); link.getConnectorId());
Object connectorConfig = ClassUtils.instantiate( Object connectorConfig = ClassUtils.instantiate(
connector.getLinkConfigurationClass()); connector.getLinkConfigurationClass());
@ -286,27 +286,32 @@ private long loadLink(MLink link) {
private long loadJob(MJob job) { private long loadJob(MJob job) {
//starting by pretending we have a brand new job //starting by pretending we have a brand new job
resetPersistenceId(job); resetPersistenceId(job);
MConnector mFromConnector = ConnectorManager.getInstance().getConnectorConfigurable(job.getFromConnectorId());
MConnector mToConnector = ConnectorManager.getInstance().getConnectorConfigurable(job.getToConnectorId());
RepositoryUpgrader upgrader = Driver.getInstance().getDriverConfigRepositoryUpgrader(); MFromConfig fromConfig = job.getFromJobConfig();
MToConfig toConfig = job.getToJobConfig();
ConnectorConfigurableUpgrader fromConnectorConfigUpgrader = ConnectorManager.getInstance().getSqoopConnector(mFromConnector.getUniqueName()).getConfigurableUpgrader();
ConnectorConfigurableUpgrader toConnectorConfigUpgrader = ConnectorManager.getInstance().getSqoopConnector(mToConnector.getUniqueName()).getConfigurableUpgrader();
fromConnectorConfigUpgrader.upgradeFromJobConfig(job.getFromJobConfig(), fromConfig);
toConnectorConfigUpgrader.upgradeToJobConfig(job.getToJobConfig(), toConfig);
DriverUpgrader driverConfigUpgrader = Driver.getInstance().getConfigurableUpgrader();
MDriver driver = Driver.getInstance().getDriver(); MDriver driver = Driver.getInstance().getDriver();
Repository repository = RepositoryManager.getInstance().getRepository();
MDriverConfig driverConfigs = driver.getDriverConfig(); MDriverConfig driverConfigs = driver.getDriverConfig();
MFromConfig fromConfigs = job.getFromJobConfig(); driverConfigUpgrader.upgradeJobConfig( job.getDriverConfig(), driverConfigs);
MToConfig toConfigs = job.getToJobConfig();
// upgrading the configs to make sure they match the current repository MJob newJob = new MJob(job, fromConfig, toConfig, driverConfigs);
upgrader.upgrade(job.getDriverConfig(), driverConfigs);
upgrader.upgrade(job.getFromJobConfig(), fromConfigs);
upgrader.upgrade(job.getToJobConfig(), toConfigs);
MJob newJob = new MJob(job, fromConfigs, toConfigs, driverConfigs);
// Transform config structures to objects for validations // Transform config structures to objects for validations
SqoopConnector fromConnector = SqoopConnector fromConnector =
ConnectorManager.getInstance().getConnector( ConnectorManager.getInstance().getSqoopConnector(
job.getConnectorId(Direction.FROM)); job.getConnectorId(Direction.FROM));
SqoopConnector toConnector = SqoopConnector toConnector =
ConnectorManager.getInstance().getConnector( ConnectorManager.getInstance().getSqoopConnector(
job.getConnectorId(Direction.TO)); job.getConnectorId(Direction.TO));
Object fromConnectorConfig = ClassUtils.instantiate( Object fromConnectorConfig = ClassUtils.instantiate(
@ -314,7 +319,7 @@ private long loadJob(MJob job) {
Object toConnectorConfig = ClassUtils.instantiate( Object toConnectorConfig = ClassUtils.instantiate(
toConnector.getJobConfigurationClass(Direction.TO)); toConnector.getJobConfigurationClass(Direction.TO));
Object driverConfig = ClassUtils.instantiate( Object driverConfig = ClassUtils.instantiate(
Driver.getInstance().getDriverConfigurationGroupClass()); Driver.getInstance().getDriverJobConfigurationClass());
ConfigUtils.fromConfigs( ConfigUtils.fromConfigs(
job.getFromJobConfig().getConfigs(), fromConnectorConfig); job.getFromJobConfig().getConfigs(), fromConnectorConfig);
@ -332,7 +337,7 @@ private long loadJob(MJob job) {
toConnectorConfigResult.getStatus(), driverConfigResult.getStatus()); toConnectorConfigResult.getStatus(), driverConfigResult.getStatus());
if (finalStatus.canProceed()) { if (finalStatus.canProceed()) {
repository.createJob(newJob); RepositoryManager.getInstance().getRepository().createJob(newJob);
} else { } else {
LOG.error("Failed to load job:" + job.getName()); LOG.error("Failed to load job:" + job.getName());