diff --git a/common/src/main/java/org/apache/sqoop/model/Configurable.java b/common/src/main/java/org/apache/sqoop/model/Configurable.java new file mode 100644 index 00000000..2033fcb7 --- /dev/null +++ b/common/src/main/java/org/apache/sqoop/model/Configurable.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.model; + +/** + * Marker class that identifies the Configurables in the Sqoop system + */ +public abstract class Configurable extends MPersistableEntity implements MClonable { + +} \ No newline at end of file diff --git a/common/src/main/java/org/apache/sqoop/model/MConfigurableType.java b/common/src/main/java/org/apache/sqoop/model/MConfigurableType.java new file mode 100644 index 00000000..7ab70329 --- /dev/null +++ b/common/src/main/java/org/apache/sqoop/model/MConfigurableType.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.model; + +/** + * Represents the sqoop entities that can own configs + */ +public enum MConfigurableType { + + /** Connector as a owner of config keys */ + CONNECTOR, + + /** Driver as a owner of config keys */ + DRIVER; +} \ No newline at end of file diff --git a/common/src/main/java/org/apache/sqoop/model/MConnector.java b/common/src/main/java/org/apache/sqoop/model/MConnector.java index 2f421914..174d0b9d 100644 --- a/common/src/main/java/org/apache/sqoop/model/MConnector.java +++ b/common/src/main/java/org/apache/sqoop/model/MConnector.java @@ -23,11 +23,11 @@ import org.apache.sqoop.common.SupportedDirections; /** - * Connector entity supports the FROM/TO {@link Transferable} Includes unique id + * Connector entity supports the FROM/TO {@link org.apache.sqoop.job.etl.Transfereable} Includes unique id * that identifies connector in the repository, unique human readable name, * corresponding name and all configs to support the from and to data sources */ -public final class MConnector extends MPersistableEntity implements MClonable { +public final class MConnector extends Configurable { private final String uniqueName; private final String className; diff --git a/common/src/main/java/org/apache/sqoop/model/MDriver.java b/common/src/main/java/org/apache/sqoop/model/MDriver.java index 685439e5..4241a313 100644 --- a/common/src/main/java/org/apache/sqoop/model/MDriver.java +++ b/common/src/main/java/org/apache/sqoop/model/MDriver.java @@ -22,7 +22,7 @@ /** * Describes the configs associated with the {@link Driver} for executing sqoop jobs. */ -public class MDriver extends MPersistableEntity implements MClonable { +public final class MDriver extends Configurable { private final MDriverConfig driverConfig; private final String version; diff --git a/common/src/main/java/org/apache/sqoop/model/MJob.java b/common/src/main/java/org/apache/sqoop/model/MJob.java index b3dec270..935dd181 100644 --- a/common/src/main/java/org/apache/sqoop/model/MJob.java +++ b/common/src/main/java/org/apache/sqoop/model/MJob.java @@ -139,6 +139,14 @@ public long getConnectorId(Direction type) { } } + public long getFromConnectorId() { + return fromConnectorId; + } + + public long getToConnectorId() { + return toConnectorId; + } + public MConfigList getJobConfig(Direction type) { switch(type) { case FROM: diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java index 87ac2af4..84690643 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java @@ -25,7 +25,7 @@ import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration; import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration; -import org.apache.sqoop.connector.spi.RepositoryUpgrader; +import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; import org.apache.sqoop.job.etl.From; import org.apache.sqoop.job.etl.To; import org.apache.sqoop.connector.spi.SqoopConnector; @@ -97,7 +97,7 @@ public Validator getConfigValidator() { } @Override - public RepositoryUpgrader getRepositoryUpgrader() { + public ConnectorConfigurableUpgrader getConfigurableUpgrader() { return new GenericJdbcConnectorUpgrader(); } diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorUpgrader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorUpgrader.java index a069b3e3..fb92a397 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorUpgrader.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorUpgrader.java @@ -18,64 +18,27 @@ */ package org.apache.sqoop.connector.jdbc; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.log4j.Logger; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.spi.RepositoryUpgrader; -import org.apache.sqoop.model.MConfigList; -import org.apache.sqoop.model.MConfig; -import org.apache.sqoop.model.MInput; +import org.apache.sqoop.configurable.ConfigurableUpgradeUtil; +import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; +import org.apache.sqoop.model.MFromConfig; import org.apache.sqoop.model.MLinkConfig; +import org.apache.sqoop.model.MToConfig; -public class GenericJdbcConnectorUpgrader extends RepositoryUpgrader { - private static final Logger LOG = Logger.getLogger(GenericJdbcConnectorUpgrader.class); - - /* - * For now, there is no real upgrade. So copy all data over, - * set the validation messages and error messages to be the same as for the - * inputs in the original one. - */ +// NOTE: All config types have the similar upgrade path at this point +public class GenericJdbcConnectorUpgrader extends ConnectorConfigurableUpgrader { @Override - public void upgrade(MLinkConfig original, MLinkConfig upgradeTarget) { - doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); + public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) { + ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); } @Override - public void upgrade(MConfigList original, MConfigList upgradeTarget) { - doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); + public void upgradeFromJobConfig(MFromConfig original, MFromConfig upgradeTarget) { + ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); } - @SuppressWarnings("unchecked") - private void doUpgrade(List original, List target) { - // Easier to find the config in the original list if we use a map. - // Since the constructor takes a list, - // index is not guaranteed to be the same, so we need to look for - // equivalence - Map configMap = new HashMap(); - for (MConfig config : original) { - configMap.put(config.getName(), config); - } - for (MConfig config : target) { - List> inputs = config.getInputs(); - MConfig orginalConfig = configMap.get(config.getName()); - if (orginalConfig == null) { - LOG.warn("Config: '" + config.getName() + "' not present in old " + - "generic JDBC connector. So it and its inputs will not be transferred by the upgrader."); - continue; - } - for (MInput input : inputs) { - try { - MInput originalInput = orginalConfig.getInput(input.getName()); - input.setValue(originalInput.getValue()); - } catch (SqoopException ex) { - LOG.warn("Input: '" + input.getName() + "' not present in old " + - "generic JDBC connector. So it will not be transferred by the upgrader."); - } - } - } + @Override + public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) { + ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); } } diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java index 606b9fa2..e63e464c 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java @@ -24,7 +24,7 @@ import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; -import org.apache.sqoop.connector.spi.RepositoryUpgrader; +import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.job.etl.From; import org.apache.sqoop.job.etl.To; @@ -123,13 +123,13 @@ public Validator getConfigValidator() { } /** - * Returns an {@linkplain org.apache.sqoop.connector.spi.RepositoryUpgrader} object that can upgrade the + * Returns an {@linkplain org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader} object that can upgrade the * connection and job metadata. * * @return MetadataUpgrader object */ @Override - public RepositoryUpgrader getRepositoryUpgrader() { - return new HdfsConfigUpgrader(); + public ConnectorConfigurableUpgrader getConfigurableUpgrader() { + return new HdfsConnectorUpgrader(); } } diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorUpgrader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorUpgrader.java new file mode 100644 index 00000000..14862ebd --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorUpgrader.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.connector.hdfs; + +import org.apache.sqoop.configurable.ConfigurableUpgradeUtil; +import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; +import org.apache.sqoop.model.MFromConfig; +import org.apache.sqoop.model.MLinkConfig; +import org.apache.sqoop.model.MToConfig; + +//NOTE: All config types have the similar upgrade path at this point +public class HdfsConnectorUpgrader extends ConnectorConfigurableUpgrader { + + @Override + public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) { + ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); + } + + @Override + public void upgradeFromJobConfig(MFromConfig original, MFromConfig upgradeTarget) { + ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); + } + + @Override + public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) { + ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); + } + +} diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java index 54bdd135..1919b4b4 100644 --- a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java +++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java @@ -43,15 +43,14 @@ public final class ConnectorHandler { private final String connectorUniqueName; private final SqoopConnector connector; - private MConnector mConnector; + private MConnector connectorConfigurable; public ConnectorHandler(URL configFileUrl) { connectorUrl = configFileUrl.toString(); try { properties.load(configFileUrl.openStream()); } catch (IOException ex) { - throw new SqoopException(ConnectorError.CONN_0003, - configFileUrl.toString(), ex); + throw new SqoopException(ConnectorError.CONN_0003, configFileUrl.toString(), ex); } LOG.debug("Connector configuration: " + properties); @@ -64,12 +63,9 @@ public ConnectorHandler(URL configFileUrl) { ConfigurationConstants.CONPROP_PROVIDER_CLASS); } + connectorUniqueName = properties.getProperty(ConfigurationConstants.CONNPROP_CONNECTOR_NAME); - connectorUniqueName = properties.getProperty( - ConfigurationConstants.CONNPROP_CONNECTOR_NAME); - - if (connectorUniqueName == null || connectorUniqueName.trim().length() == 0) - { + if (connectorUniqueName == null || connectorUniqueName.trim().length() == 0) { throw new SqoopException(ConnectorError.CONN_0008, connectorClassName); } @@ -103,13 +99,11 @@ public ConnectorHandler(URL configFileUrl) { connector.getJobConfigurationClass(Direction.TO))); } - MLinkConfig connectionForms = new MLinkConfig( + MLinkConfig linkConfig = new MLinkConfig( ConfigUtils.toConfigs(connector.getLinkConfigurationClass())); - String connectorVersion = connector.getVersion(); - - mConnector = new MConnector(connectorUniqueName, connectorClassName, connectorVersion, - connectionForms, fromConfig, toConfig); + connectorConfigurable = new MConnector(connectorUniqueName, connectorClassName, connector.getVersion(), + linkConfig, fromConfig, toConfig); if (LOG.isInfoEnabled()) { LOG.info("Connector [" + connectorClassName + "] initialized."); @@ -133,15 +127,15 @@ public String getConnectorUrl() { return connectorUrl; } - public MConnector getMetadata() { - return mConnector; + public MConnector getConnectorConfigurable() { + return connectorConfigurable; } - public void setMetadata(MConnector connector) { - this.mConnector = connector; + public void setConnectorConfigurable(MConnector mConnector) { + this.connectorConfigurable = mConnector; } - public SqoopConnector getConnector() { + public SqoopConnector getSqoopConnector() { return connector; } -} +} \ No newline at end of file diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java index 52269264..0369b4db 100644 --- a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java +++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java @@ -92,10 +92,10 @@ public static void setInstance(ConnectorManager newInstance) { private Map handlerMap = new HashMap(); - public List getConnectorsMetadata() { + public List getConnectorConfigurables() { List connectors = new LinkedList(); for(ConnectorHandler handler : handlerMap.values()) { - connectors.add(handler.getMetadata()); + connectors.add(handler.getConnectorConfigurable()); } return connectors; } @@ -107,8 +107,8 @@ public Set getConnectorIds() { public Map getResourceBundles(Locale locale) { Map bundles = new HashMap(); for(ConnectorHandler handler : handlerMap.values()) { - long id = handler.getMetadata().getPersistenceId(); - ResourceBundle bundle = handler.getConnector().getBundle(locale); + long id = handler.getConnectorConfigurable().getPersistenceId(); + ResourceBundle bundle = handler.getSqoopConnector().getBundle(locale); bundles.put(id, bundle); } return bundles; @@ -116,25 +116,24 @@ public Map getResourceBundles(Locale locale) { public ResourceBundle getResourceBundle(long connectorId, Locale locale) { ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId)); - return handler.getConnector().getBundle(locale); + return handler.getSqoopConnector().getBundle(locale); } - public MConnector getConnectorConfig(long connectorId) { + public MConnector getConnectorConfigurable(long connectorId) { ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId)); if(handler == null) { return null; } - - return handler.getMetadata(); + return handler.getConnectorConfigurable(); } - public SqoopConnector getConnector(long connectorId) { + public SqoopConnector getSqoopConnector(long connectorId) { ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId)); - return handler.getConnector(); + return handler.getSqoopConnector(); } - public SqoopConnector getConnector(String uniqueName) { - return handlerMap.get(uniqueName).getConnector(); + public SqoopConnector getSqoopConnector(String uniqueName) { + return handlerMap.get(uniqueName).getSqoopConnector(); } public synchronized void initialize() { @@ -182,21 +181,21 @@ private synchronized void registerConnectors(boolean autoUpgrade) { rtx.begin(); for (String name : handlerMap.keySet()) { ConnectorHandler handler = handlerMap.get(name); - MConnector connectorMetadata = handler.getMetadata(); + MConnector connectorMetadata = handler.getConnectorConfigurable(); MConnector registeredMetadata = repository.registerConnector(connectorMetadata, autoUpgrade); // Set registered metadata instead of connector metadata as they will // have filled persistent ids. We should be confident at this point that // there are no differences between those two structures. - handler.setMetadata(registeredMetadata); + handler.setConnectorConfigurable(registeredMetadata); String connectorName = handler.getUniqueName(); - if (!handler.getMetadata().hasPersistenceId()) { + if (!handler.getConnectorConfigurable().hasPersistenceId()) { throw new SqoopException(ConnectorError.CONN_0010, connectorName); } - nameMap.put(handler.getMetadata().getPersistenceId(), connectorName); - LOG.debug("Registered connector: " + handler.getMetadata()); + nameMap.put(handler.getConnectorConfigurable().getPersistenceId(), connectorName); + LOG.debug("Registered connector: " + handler.getConnectorConfigurable()); } rtx.commit(); } catch (Exception ex) { diff --git a/core/src/main/java/org/apache/sqoop/driver/Driver.java b/core/src/main/java/org/apache/sqoop/driver/Driver.java index f1b45bbd..46a16ac9 100644 --- a/core/src/main/java/org/apache/sqoop/driver/Driver.java +++ b/core/src/main/java/org/apache/sqoop/driver/Driver.java @@ -22,12 +22,11 @@ import java.util.ResourceBundle; import org.apache.log4j.Logger; -import org.apache.sqoop.connector.spi.RepositoryUpgrader; import org.apache.sqoop.core.ConfigurationConstants; import org.apache.sqoop.core.Reconfigurable; import org.apache.sqoop.core.SqoopConfiguration; import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener; -import org.apache.sqoop.driver.configuration.DriverConfiguration; +import org.apache.sqoop.driver.configuration.JobConfiguration; import org.apache.sqoop.json.DriverBean; import org.apache.sqoop.model.ConfigUtils; import org.apache.sqoop.model.MConfig; @@ -105,25 +104,26 @@ public static void setInstance(Driver newInstance) { /** * Driver config upgrader instance */ - private final RepositoryUpgrader driverConfigUpgrader; + private final DriverUpgrader driverUpgrader; /** * Default driver config auto upgrade option value */ private static final boolean DEFAULT_AUTO_UPGRADE = false; - public Class getDriverConfigurationGroupClass() { - return DriverConfiguration.class; + @SuppressWarnings("rawtypes") + public Class getDriverJobConfigurationClass() { + return JobConfiguration.class; } public Driver() { - List driverConfig = ConfigUtils.toConfigs(getDriverConfigurationGroupClass()); + List driverConfig = ConfigUtils.toConfigs(getDriverJobConfigurationClass()); mDriver = new MDriver(new MDriverConfig(driverConfig), DriverBean.CURRENT_DRIVER_VERSION); // Build validator driverValidator = new DriverConfigValidator(); // Build upgrader - driverConfigUpgrader = new DriverConfigUpgrader(); + driverUpgrader = new DriverUpgrader(); } public synchronized void initialize() { @@ -150,8 +150,8 @@ public Validator getValidator() { return driverValidator; } - public RepositoryUpgrader getDriverConfigRepositoryUpgrader() { - return driverConfigUpgrader; + public DriverUpgrader getConfigurableUpgrader() { + return driverUpgrader; } public MDriver getDriver() { diff --git a/core/src/main/java/org/apache/sqoop/driver/DriverConfigUpgrader.java b/core/src/main/java/org/apache/sqoop/driver/DriverConfigUpgrader.java deleted file mode 100644 index 847b73d4..00000000 --- a/core/src/main/java/org/apache/sqoop/driver/DriverConfigUpgrader.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sqoop.driver; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.log4j.Logger; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.spi.RepositoryUpgrader; -import org.apache.sqoop.model.MConfigList; -import org.apache.sqoop.model.MConfig; -import org.apache.sqoop.model.MInput; -import org.apache.sqoop.model.MLinkConfig; - -public class DriverConfigUpgrader extends RepositoryUpgrader{ - - private static final Logger LOG = Logger.getLogger(DriverConfigUpgrader.class); - - @Override - public void upgrade(MLinkConfig original, MLinkConfig upgradeTarget) { - // NOTE(VB): There are no link configs anymore for driver, this code remains for previous versions - } - - @Override - public void upgrade(MConfigList original, MConfigList upgradeTarget) { - doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); - } - - @SuppressWarnings("unchecked") - private void doUpgrade(List original, List target) { - // Easier to find the config in the original list if we use a map. - // Since the constructor takes a list, - // index is not guaranteed to be the same, so we need to look for - // equivalence - Map configMap = new HashMap(); - for (MConfig config : original) { - configMap.put(config.getName(), config); - } - for (MConfig config : target) { - List> inputs = config.getInputs(); - MConfig originalConfig = configMap.get(config.getName()); - if(originalConfig == null) { - LOG.warn("Config: " + config.getName() + " not present in old " + - "driver config. So it will not be transferred by the upgrader."); - continue; - } - - for (MInput input : inputs) { - try { - MInput originalInput = originalConfig.getInput(input.getName()); - input.setValue(originalInput.getValue()); - } catch (SqoopException ex) { - LOG.warn("Input: " + input.getName() + " not present in old " + - "driver config. So it will not be transferred by the upgrader."); - } - } - } - } -} diff --git a/core/src/main/java/org/apache/sqoop/driver/DriverConfigValidator.java b/core/src/main/java/org/apache/sqoop/driver/DriverConfigValidator.java index 9c3b660c..0d9a9b8e 100644 --- a/core/src/main/java/org/apache/sqoop/driver/DriverConfigValidator.java +++ b/core/src/main/java/org/apache/sqoop/driver/DriverConfigValidator.java @@ -17,7 +17,7 @@ */ package org.apache.sqoop.driver; -import org.apache.sqoop.driver.configuration.DriverConfiguration; +import org.apache.sqoop.driver.configuration.JobConfiguration; import org.apache.sqoop.driver.configuration.ThrottlingConfig; import org.apache.sqoop.validation.Status; import org.apache.sqoop.validation.ConfigValidator; @@ -26,8 +26,8 @@ public class DriverConfigValidator extends Validator { @Override public ConfigValidator validateConfigForJob(Object jobConfiguration) { - ConfigValidator validation = new ConfigValidator(DriverConfiguration.class); - DriverConfiguration conf = (DriverConfiguration)jobConfiguration; + ConfigValidator validation = new ConfigValidator(JobConfiguration.class); + JobConfiguration conf = (JobConfiguration)jobConfiguration; validateThrottlingConfig(validation,conf.throttlingConfig); return validation; diff --git a/core/src/main/java/org/apache/sqoop/driver/DriverUpgrader.java b/core/src/main/java/org/apache/sqoop/driver/DriverUpgrader.java new file mode 100644 index 00000000..b880d3bb --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/driver/DriverUpgrader.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.driver; + +import org.apache.sqoop.configurable.ConfigurableUpgradeUtil; +import org.apache.sqoop.model.MDriverConfig; + +public class DriverUpgrader { + + public void upgradeJobConfig(MDriverConfig original, MDriverConfig upgradeTarget) { + ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); + } +} diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java index df2a5ab3..51e562cb 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java @@ -30,7 +30,7 @@ import org.apache.sqoop.core.Reconfigurable; import org.apache.sqoop.core.SqoopConfiguration; import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener; -import org.apache.sqoop.driver.configuration.DriverConfiguration; +import org.apache.sqoop.driver.configuration.JobConfiguration; import org.apache.sqoop.job.etl.Destroyer; import org.apache.sqoop.job.etl.DestroyerContext; import org.apache.sqoop.job.etl.Initializer; @@ -306,9 +306,9 @@ private JobRequest createJobRequest(long jobId, MSubmission submission) { MLink toConnection = getLink(job.getLinkId(Direction.TO)); // get from/to connectors for the connection - SqoopConnector fromConnector = getConnector(fromConnection.getConnectorId()); + SqoopConnector fromConnector = getSqoopConnector(fromConnection.getConnectorId()); validateSupportedDirection(fromConnector, Direction.FROM); - SqoopConnector toConnector = getConnector(toConnection.getConnectorId()); + SqoopConnector toConnector = getSqoopConnector(toConnection.getConnectorId()); validateSupportedDirection(toConnector, Direction.TO); // link config for the FROM part of the job @@ -329,7 +329,7 @@ private JobRequest createJobRequest(long jobId, MSubmission submission) { // the only driver config for the job Object driverConfig = ClassUtils - .instantiate(Driver.getInstance().getDriverConfigurationGroupClass()); + .instantiate(Driver.getInstance().getDriverJobConfigurationClass()); ConfigUtils.fromConfigs(job.getDriverConfig().getConfigs(), driverConfig); @@ -402,8 +402,8 @@ MSubmission createJobSubmission(HttpEventContext ctx, long jobId) { return summary; } - SqoopConnector getConnector(long connnectorId) { - return ConnectorManager.getInstance().getConnector(connnectorId); + SqoopConnector getSqoopConnector(long connnectorId) { + return ConnectorManager.getInstance().getSqoopConnector(connnectorId); } void validateSupportedDirection(SqoopConnector connector, Direction direction) { @@ -480,7 +480,7 @@ private InitializerContext getConnectorInitializerContext(JobRequest jobRequest, } void prepareJob(JobRequest request) { - DriverConfiguration jobConfiguration = (DriverConfiguration) request.getDriverConfig(); + JobConfiguration jobConfiguration = (JobConfiguration) request.getDriverConfig(); // We're directly moving configured number of extractors and loaders to // underlying request object. In the future we might need to throttle this // count based on other running jobs to meet our SLAs. diff --git a/core/src/main/java/org/apache/sqoop/driver/configuration/DriverConfiguration.java b/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java similarity index 90% rename from core/src/main/java/org/apache/sqoop/driver/configuration/DriverConfiguration.java rename to core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java index d4e2254f..bf1328ad 100644 --- a/core/src/main/java/org/apache/sqoop/driver/configuration/DriverConfiguration.java +++ b/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java @@ -21,14 +21,14 @@ import org.apache.sqoop.model.Config; /** - * Representing the core job configuration + * Representing the driver job configuration */ @ConfigurationClass -public class DriverConfiguration { +public class JobConfiguration { @Config public ThrottlingConfig throttlingConfig; - public DriverConfiguration() { + public JobConfiguration() { throttlingConfig = new ThrottlingConfig(); } } diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java index f06fd0cd..476830d0 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java @@ -163,10 +163,6 @@ public Object doIt(Connection conn) throws Exception { handler.registerConnector(mConnector, conn); return mConnector; } else { - // Same connector, check if the version is the same. - // For now, use the "string" versions itself - later we should - // probably include a build number or something that is - // monotonically increasing. if (connectorResult.getUniqueName().equals(mConnector.getUniqueName()) && mConnector.getVersion().compareTo(connectorResult.getVersion()) > 0) { if (autoUpgrade) { @@ -652,23 +648,23 @@ public Object doIt(Connection conn) throws Exception { * {@inheritDoc} */ @Override - protected void upgradeConnector(final MConnector newConnector, + protected void upgradeConnectorConfigs(final MConnector newConnector, RepositoryTransaction tx) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) throws Exception { - handler.upgradeConnector(newConnector, conn); + handler.upgradeConnectorConfigs(newConnector, conn); return null; } }, (JdbcRepositoryTransaction) tx); } - protected void upgradeDriver(final MDriver mDriver, RepositoryTransaction tx) { + protected void upgradeDriverConfigs(final MDriver mDriver, RepositoryTransaction tx) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) throws Exception { - handler.upgradeDriver(mDriver, conn); + handler.upgradeDriverConfigs(mDriver, conn); return null; } }, (JdbcRepositoryTransaction) tx); diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java index 5a8e026f..4c5229fd 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java @@ -101,7 +101,7 @@ public abstract List findJobsForConnector(long connectorID, * @param conn JDBC link for querying repository */ - public abstract void upgradeConnector(MConnector mConnector, Connection conn); + public abstract void upgradeConnectorConfigs(MConnector mConnector, Connection conn); /** @@ -117,7 +117,7 @@ public abstract List findJobsForConnector(long connectorID, * the driverConfig. * @param conn JDBC link for querying repository */ - public abstract void upgradeDriver(MDriver mDriver, Connection conn); + public abstract void upgradeDriverConfigs(MDriver mDriver, Connection conn); /** diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java index 74a9e124..8f780526 100644 --- a/core/src/main/java/org/apache/sqoop/repository/Repository.java +++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java @@ -22,12 +22,12 @@ import java.util.Map; import org.apache.log4j.Logger; -import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.ConnectorManager; -import org.apache.sqoop.connector.spi.RepositoryUpgrader; +import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.driver.Driver; +import org.apache.sqoop.driver.DriverUpgrader; import org.apache.sqoop.json.DriverBean; import org.apache.sqoop.model.ConfigUtils; import org.apache.sqoop.model.MConfig; @@ -317,7 +317,7 @@ public abstract class Repository { * method will not call begin, commit, * rollback or close on this transaction. */ - protected abstract void upgradeConnector(MConnector newConnector, RepositoryTransaction tx); + protected abstract void upgradeConnectorConfigs(MConnector newConnector, RepositoryTransaction tx); /** * Upgrade the driver with the new data supplied in the @@ -335,7 +335,7 @@ public abstract class Repository { * method will not call begin, commit, * rollback or close on this transaction. */ - protected abstract void upgradeDriver(MDriver newDriver, RepositoryTransaction tx); + protected abstract void upgradeDriverConfigs(MDriver newDriver, RepositoryTransaction tx); /** * Delete all inputs for a job @@ -388,84 +388,88 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec LOG.info("Upgrading connector: " + oldConnector.getUniqueName()); long connectorID = oldConnector.getPersistenceId(); newConnector.setPersistenceId(connectorID); - /* Algorithms: - * 1. Get an upgrader for the connector. - * 2. Get all links associated with the connector. - * 3. Get all jobs associated with the connector. - * 4. Delete the inputs for all of the jobs and links (in that order) - * 5. Remove all inputs and configs associated with the connector, and - * register the new configs and inputs. - * 6. Create new links and jobs with connector part being the ones - * returned by the upgrader. - * 7. Validate new links and jobs with connector's validator - * 8. If any invalid links or jobs detected, throw an exception - * and stop the bootup of Sqoop server - * 9. Otherwise, Insert the link inputs followed by job inputs (using - * updateJob and updatelink) - */ + RepositoryTransaction tx = null; try { - SqoopConnector connector = - ConnectorManager.getInstance().getConnector(newConnector - .getUniqueName()); + SqoopConnector connector = ConnectorManager.getInstance().getSqoopConnector( + newConnector.getUniqueName()); Validator connectorConfigValidator = connector.getConfigValidator(); boolean upgradeSuccessful = true; - RepositoryUpgrader upgrader = connector.getRepositoryUpgrader(); - List linksByConnector = findLinksForConnector(connectorID); - List jobsByConnector = findJobsForConnector(connectorID); + // 1. Get an upgrader for the connector + ConnectorConfigurableUpgrader upgrader = connector.getConfigurableUpgrader(); + // 2. Get all links associated with the connector. + List existingLinksByConnector = findLinksForConnector(connectorID); + // 3. Get all jobs associated with the connector. + List existingJobsByConnector = findJobsForConnector(connectorID); // -- BEGIN TXN -- tx = getTransaction(); tx.begin(); - deletelinksAndJobs(linksByConnector, jobsByConnector, tx); - upgradeConnector(newConnector, tx); - for (MLink oldLink : linksByConnector) { - // Make a new copy of the configs - List linkConfig = newConnector.getLinkConfig().clone(false).getConfigs(); - MLinkConfig newLinkConfig = new MLinkConfig(linkConfig); - MLinkConfig oldLinkConfig = oldLink.getConnectorLinkConfig(); - upgrader.upgrade(oldLinkConfig, newLinkConfig); + // 4. Delete the inputs for all of the jobs and links (in that order) for + // this connector + deletelinksAndJobs(existingLinksByConnector, existingJobsByConnector, tx); + // 5. Delete all inputs and configs associated with the connector, and + // insert the new configs and inputs for this connector + upgradeConnectorConfigs(newConnector, tx); + // 6. Run upgrade logic for the configs related to the link objects + // dont always rely on the repository implementation to return empty list for links + if (existingLinksByConnector != null) { + for (MLink link : existingLinksByConnector) { + // Make a new copy of the configs + List linkConfig = newConnector.getLinkConfig().clone(false).getConfigs(); + MLinkConfig newLinkConfig = new MLinkConfig(linkConfig); + MLinkConfig oldLinkConfig = link.getConnectorLinkConfig(); + upgrader.upgradeLinkConfig(oldLinkConfig, newLinkConfig); + MLink newlink = new MLink(link, newLinkConfig); - MLink newlink = new MLink(oldLink, newLinkConfig); - - Object newConfigurationObject = ClassUtils.instantiate(connector.getLinkConfigurationClass()); - ConfigUtils.fromConfigs(newlink.getConnectorLinkConfig().getConfigs(), newConfigurationObject); - - ConfigValidator configValidator = connectorConfigValidator.validateConfigForLink(newConfigurationObject); - if (configValidator.getStatus().canProceed()) { - updateLink(newlink, tx); - } else { - logInvalidModelObject("link", newlink, configValidator); - upgradeSuccessful = false; + Object newConfigurationObject = ClassUtils.instantiate(connector + .getLinkConfigurationClass()); + ConfigUtils.fromConfigs(newlink.getConnectorLinkConfig().getConfigs(), + newConfigurationObject); + // 7. Run link config validation + ConfigValidator configValidator = connectorConfigValidator + .validateConfigForLink(newConfigurationObject); + if (configValidator.getStatus().canProceed()) { + updateLink(newlink, tx); + } else { + // If any invalid links or jobs detected, throw an exception + // and stop the bootup of Sqoop server + logInvalidModelObject("link", newlink, configValidator); + upgradeSuccessful = false; + } } } - for (MJob job : jobsByConnector) { - // Make a new copy of the configs - // else the values will get set in the configs in the connector for - // each job. - List fromConfig = newConnector.getConfig(Direction.FROM).clone(false).getConfigs(); - List toConfig = newConnector.getConfig(Direction.TO).clone(false).getConfigs(); - - // New FROM direction configs, old TO direction configs. - if (job.getConnectorId(Direction.FROM) == newConnector.getPersistenceId()) { - MFromConfig newFromConfig = new MFromConfig(fromConfig); - MFromConfig oldFromCOnfig = job.getFromJobConfig(); - upgrader.upgrade(oldFromCOnfig, newFromConfig); - - MToConfig oldToConfig = job.getToJobConfig(); - MJob newJob = new MJob(job, newFromConfig, oldToConfig, job.getDriverConfig()); - updateJob(newJob, tx); - } - - // Old FROM direction configs, new TO direction configs. - if (job.getConnectorId(Direction.TO) == newConnector.getPersistenceId()) { - - MToConfig oldToConfig = job.getToJobConfig(); - MToConfig newToConfig = new MToConfig(toConfig); - upgrader.upgrade(oldToConfig, newToConfig); - MFromConfig oldFromConfig = job.getFromJobConfig(); - MJob newJob = new MJob(job, oldFromConfig, newToConfig, job.getDriverConfig()); - updateJob(newJob, tx); + // 8. Run upgrade logic for the configs related to the job objects + if (existingJobsByConnector != null) { + for (MJob job : existingJobsByConnector) { + // every job has 2 parts, the FROM and the TO links and their + // corresponding connectors. + List fromConfig = newConnector.getFromConfig().clone(false).getConfigs(); + if (job.getFromConnectorId() == newConnector.getPersistenceId()) { + MFromConfig newFromConfig = new MFromConfig(fromConfig); + MFromConfig oldFromCOnfig = job.getFromJobConfig(); + upgrader.upgradeFromJobConfig(oldFromCOnfig, newFromConfig); + MToConfig oldToConfig = job.getToJobConfig(); + // create a job with new FROM direction configs but old TO direction + // configs + MJob newJob = new MJob(job, newFromConfig, oldToConfig, job.getDriverConfig()); + // TODO( jarcec) : will add the job config validation logic similar + // to the link config validation before updating job + updateJob(newJob, tx); + } + List toConfig = newConnector.getToConfig().clone(false).getConfigs(); + if (job.getToConnectorId() == newConnector.getPersistenceId()) { + MToConfig oldToConfig = job.getToJobConfig(); + MToConfig newToConfig = new MToConfig(toConfig); + upgrader.upgradeToJobConfig(oldToConfig, newToConfig); + MFromConfig oldFromConfig = job.getFromJobConfig(); + // create a job with old FROM direction configs but new TO direction + // configs + MJob newJob = new MJob(job, oldFromConfig, newToConfig, job.getDriverConfig()); + // TODO( jarcec) : will add the job config validation logic similar + // to the link config validation before updating job + updateJob(newJob, tx); + } } } @@ -475,20 +479,20 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec throw new SqoopException(RepositoryError.JDBCREPO_0027); } } catch (SqoopException ex) { - if(tx != null) { + if (tx != null) { tx.rollback(); } throw ex; } catch (Exception ex) { - if(tx != null) { + if (tx != null) { tx.rollback(); } throw new SqoopException(RepositoryError.JDBCREPO_0000, ex); } finally { - if(tx != null) { + if (tx != null) { tx.close(); } - LOG.info("Metadata upgrade finished for connector: " + oldConnector.getUniqueName()); + LOG.info("Connector upgrade finished: " + oldConnector.getUniqueName()); } } @@ -496,31 +500,38 @@ public final void upgradeDriver(MDriver driver) { LOG.info("Upgrading driver"); RepositoryTransaction tx = null; try { - RepositoryUpgrader upgrader = Driver.getInstance().getDriverConfigRepositoryUpgrader(); - List jobs = findJobs(); - + //1. find upgrader + DriverUpgrader upgrader = Driver.getInstance().getConfigurableUpgrader(); + //2. find all jobs in the system + List existingJobs = findJobs(); Validator validator = Driver.getInstance().getValidator(); boolean upgradeSuccessful = true; // -- BEGIN TXN -- tx = getTransaction(); tx.begin(); - deleteJobs(jobs, tx); - upgradeDriver(driver, tx); + //3. delete all jobs in the system + deleteJobs(existingJobs, tx); + // 4. Delete all inputs and configs associated with the driver, and + // insert the new configs and inputs for this driver + upgradeDriverConfigs(driver, tx); - for (MJob job : jobs) { + for (MJob job : existingJobs) { // Make a new copy of the configs MDriverConfig driverConfig = driver.getDriverConfig().clone(false); MDriver newDriver = new MDriver(driverConfig, DriverBean.CURRENT_DRIVER_VERSION); - upgrader.upgrade(job.getDriverConfig(), newDriver.getDriverConfig()); + // At this point, the driver only supports JOB config type + upgrader.upgradeJobConfig(job.getDriverConfig(), newDriver.getDriverConfig()); + // create a new job with old FROM and TO configs but new driver configs MJob newJob = new MJob(job, job.getFromJobConfig(), job.getToJobConfig(), newDriver.getDriverConfig()); - // Transform config structures to objects for validations - Object newConfigurationObject = ClassUtils.instantiate(Driver.getInstance().getDriverConfigurationGroupClass()); + Object newConfigurationObject = ClassUtils.instantiate(Driver.getInstance().getDriverJobConfigurationClass()); ConfigUtils.fromConfigs(newJob.getDriverConfig().getConfigs(), newConfigurationObject); + // 5. validate configs ConfigValidator validation = validator.validateConfigForJob(newConfigurationObject); if (validation.getStatus().canProceed()) { + // 6. update job updateJob(newJob, tx); } else { logInvalidModelObject("job", newJob, validation); diff --git a/core/src/test/java/org/apache/sqoop/driver/TestDriverConfigUpgrader.java b/core/src/test/java/org/apache/sqoop/driver/TestDriverConfigUpgrader.java index dc4e8c82..e5201fc0 100644 --- a/core/src/test/java/org/apache/sqoop/driver/TestDriverConfigUpgrader.java +++ b/core/src/test/java/org/apache/sqoop/driver/TestDriverConfigUpgrader.java @@ -26,6 +26,7 @@ import org.apache.sqoop.model.MConfig; import org.apache.sqoop.model.MConfigList; +import org.apache.sqoop.model.MDriverConfig; import org.apache.sqoop.model.MInput; import org.apache.sqoop.model.MIntegerInput; import org.apache.sqoop.model.MStringInput; @@ -36,30 +37,31 @@ */ public class TestDriverConfigUpgrader { - DriverConfigUpgrader upgrader; + DriverUpgrader upgrader; @Before public void initializeUpgrader() { - upgrader = new DriverConfigUpgrader(); + upgrader = new DriverUpgrader(); } + /** - * We take the same configs on input and output and we - * expect that all values will be correctly transferred. + * We take the same configs on input and output and we expect that all values + * will be correctly transferred. */ @Test public void testJobConfigTyeUpgrade() { - MConfigList original = job(); - MConfigList target = job(); + MDriverConfig original = job(); + MDriverConfig target = job(); original.getStringInput("f1.s1").setValue("A"); original.getStringInput("f1.s2").setValue("B"); original.getIntegerInput("f1.i").setValue(3); - upgrader.upgrade(original, target); + upgrader.upgradeJobConfig(original, target); assertEquals("A", target.getStringInput("f1.s1").getValue()); assertEquals("B", target.getStringInput("f1.s2").getValue()); - assertEquals(3, (long)target.getIntegerInput("f1.i").getValue()); + assertEquals(3, (long) target.getIntegerInput("f1.i").getValue()); } /** @@ -67,54 +69,54 @@ public void testJobConfigTyeUpgrade() { */ @Test public void testNonExistingInput() { - MConfigList original = job1(); - MConfigList target = job2(); + MDriverConfig original = job1(); + MDriverConfig target = job2(); original.getStringInput("f1.s1").setValue("A"); original.getStringInput("f1.s2").setValue("B"); original.getIntegerInput("f1.i").setValue(3); - upgrader.upgrade(original, target); + upgrader.upgradeJobConfig(original, target); assertEquals("A", target.getStringInput("f1.s1").getValue()); assertNull(target.getStringInput("f1.s2_").getValue()); - assertEquals(3, (long)target.getIntegerInput("f1.i").getValue()); + assertEquals(3, (long) target.getIntegerInput("f1.i").getValue()); } /** - * Upgrade scenario when entire has been added in the target and - * therefore is missing in the original. + * Upgrade scenario when entire has been added in the target and therefore is + * missing in the original. */ @Test public void testNonExistingConfig() { - MConfigList original = job1(); - MConfigList target = job3(); + MDriverConfig original = job1(); + MDriverConfig target = job3(); original.getStringInput("f1.s1").setValue("A"); original.getStringInput("f1.s2").setValue("B"); original.getIntegerInput("f1.i").setValue(3); - upgrader.upgrade(original, target); + upgrader.upgradeJobConfig(original, target); assertNull(target.getStringInput("f2.s1").getValue()); assertNull(target.getStringInput("f2.s2").getValue()); assertNull(target.getIntegerInput("f2.i").getValue()); } - MConfigList job() { - return new MConfigList(configs1()); + MDriverConfig job() { + return new MDriverConfig(configs1()); } - MConfigList job1() { - return new MConfigList(configs1()); + MDriverConfig job1() { + return new MDriverConfig(configs1()); } - MConfigList job2() { - return new MConfigList(configs2()); + MDriverConfig job2() { + return new MDriverConfig(configs2()); } - MConfigList job3() { - return new MConfigList(configs3()); + MDriverConfig job3() { + return new MDriverConfig(configs3()); } List configs1() { @@ -125,8 +127,8 @@ List configs1() { List> inputs1(String formName) { List> list = new LinkedList>(); - list.add(new MStringInput(formName + ".s1", false, (short)30)); - list.add(new MStringInput(formName + ".s2", false, (short)30)); + list.add(new MStringInput(formName + ".s1", false, (short) 30)); + list.add(new MStringInput(formName + ".s2", false, (short) 30)); list.add(new MIntegerInput(formName + ".i", false)); return list; } @@ -139,8 +141,8 @@ List configs2() { List> inputs2(String formName) { List> list = new LinkedList>(); - list.add(new MStringInput(formName + ".s1", false, (short)30)); - list.add(new MStringInput(formName + ".s2_", false, (short)30)); + list.add(new MStringInput(formName + ".s1", false, (short) 30)); + list.add(new MStringInput(formName + ".s2_", false, (short) 30)); list.add(new MIntegerInput(formName + ".i", false)); return list; } @@ -150,4 +152,4 @@ List configs3() { list.add(new MConfig("f2", inputs1("f2"))); return list; } -} \ No newline at end of file +} diff --git a/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java b/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java index 3b475c6a..5bc1b036 100644 --- a/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java +++ b/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java @@ -71,10 +71,10 @@ public void testCreateJobSubmission() { @Test public void testGetConnector() { - when(connectorMgrMock.getConnector(123l)).thenReturn(sqoopConnectorMock); + when(connectorMgrMock.getSqoopConnector(123l)).thenReturn(sqoopConnectorMock); when(sqoopConnectorMock.getSupportedDirections()).thenReturn(getSupportedDirections()); - assertEquals(jobManager.getConnector(123l), sqoopConnectorMock); - verify(connectorMgrMock, times(1)).getConnector(123l); + assertEquals(jobManager.getSqoopConnector(123l), sqoopConnectorMock); + verify(connectorMgrMock, times(1)).getSqoopConnector(123l); } @Test diff --git a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java index 34bd8a55..ff9e0c33 100644 --- a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java +++ b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java @@ -43,14 +43,14 @@ import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.ConnectorManager; -import org.apache.sqoop.connector.spi.RepositoryUpgrader; +import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.driver.Driver; +import org.apache.sqoop.driver.DriverUpgrader; import org.apache.sqoop.json.DriverBean; import org.apache.sqoop.model.ConfigUtils; import org.apache.sqoop.model.ConfigurationClass; import org.apache.sqoop.model.MConfig; -import org.apache.sqoop.model.MConfigList; import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MDriver; import org.apache.sqoop.model.MDriverConfig; @@ -65,7 +65,6 @@ import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; -import org.mockito.Mockito; public class TestJdbcRepository { @@ -75,7 +74,8 @@ public class TestJdbcRepository { private Driver driverMock; private JdbcRepositoryHandler repoHandlerMock; private Validator validatorMock; - private RepositoryUpgrader upgraderMock; + private ConnectorConfigurableUpgrader connectorUpgraderMock; + private DriverUpgrader driverUpgraderMock; private ConfigValidator validRepoMock; private ConfigValidator invalidRepoMock; @@ -87,7 +87,8 @@ public void setUp() throws Exception { driverMock = mock(Driver.class); repoHandlerMock = mock(JdbcRepositoryHandler.class); validatorMock = mock(Validator.class); - upgraderMock = mock(RepositoryUpgrader.class); + connectorUpgraderMock = mock(ConnectorConfigurableUpgrader.class); + driverUpgraderMock = mock(DriverUpgrader.class); repoSpy = spy(new JdbcRepository(repoHandlerMock, null)); // setup transaction and connector manager @@ -100,8 +101,15 @@ public void setUp() throws Exception { invalidRepoMock = mock(ConfigValidator.class); when(invalidRepoMock.getStatus()).thenReturn(Status.UNACCEPTABLE); - doNothing().when(upgraderMock).upgrade(any(MLinkConfig.class), any(MLinkConfig.class)); - doNothing().when(upgraderMock).upgrade(any(MFromConfig.class), any(MFromConfig.class)); + doNothing().when(connectorUpgraderMock).upgradeLinkConfig(any(MLinkConfig.class), + any(MLinkConfig.class)); + doNothing().when(connectorUpgraderMock).upgradeFromJobConfig(any(MFromConfig.class), + any(MFromConfig.class)); + doNothing().when(connectorUpgraderMock).upgradeToJobConfig(any(MToConfig.class), + any(MToConfig.class)); + doNothing().when(driverUpgraderMock).upgradeJobConfig(any(MDriverConfig.class), + any(MDriverConfig.class)); + } /** @@ -117,14 +125,14 @@ public void testConnectorConfigEnableAutoUpgrade() { // make the upgradeConnector to throw an exception to prove that it has been called SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, "upgradeConnector() has been called."); - doThrow(exception).when(connectorMgrMock).getConnector(anyString()); + doThrow(exception).when(connectorMgrMock).getSqoopConnector(anyString()); try { repoSpy.registerConnector(newConnector, true); } catch (SqoopException ex) { assertEquals(ex.getMessage(), exception.getMessage()); verify(repoHandlerMock, times(1)).findConnector(anyString(), any(Connection.class)); - verify(connectorMgrMock, times(1)).getConnector(anyString()); + verify(connectorMgrMock, times(1)).getSqoopConnector(anyString()); verifyNoMoreInteractions(repoHandlerMock); return ; } @@ -218,28 +226,29 @@ public void testConnectorConfigUpgradeWithValidLinksAndJobs() { when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock); when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock); when(sqconnector.getConfigValidator()).thenReturn(validatorMock); - when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock); + when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock); when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyLinkConfiguration.class); when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn( EmptyJobConfiguration.class); - when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector); + when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector); // prepare the links and jobs + // the connector Id for both are the same List linkList = links(link(1,1), link(2,1)); - List jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); + List jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,2)); // mock necessary methods for upgradeConnector() procedure doReturn(linkList).when(repoSpy).findLinksForConnector(anyLong()); doReturn(jobList).when(repoSpy).findJobsForConnector(anyLong()); doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class)); doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class)); - doNothing().when(repoSpy).upgradeConnector(any(MConnector.class), any(RepositoryTransaction.class)); + doNothing().when(repoSpy).upgradeConnectorConfigs(any(MConnector.class), any(RepositoryTransaction.class)); repoSpy.upgradeConnector(oldConnector, newConnector); InOrder repoOrder = inOrder(repoSpy); InOrder txOrder = inOrder(repoTransactionMock); - InOrder upgraderOrder = inOrder(upgraderMock); + InOrder upgraderOrder = inOrder(connectorUpgraderMock); InOrder validatorOrder = inOrder(validatorMock); repoOrder.verify(repoSpy, times(1)).findLinksForConnector(anyLong()); @@ -249,7 +258,7 @@ public void testConnectorConfigUpgradeWithValidLinksAndJobs() { repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(1, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(2, repoTransactionMock); - repoOrder.verify(repoSpy, times(1)).upgradeConnector(any(MConnector.class), any(RepositoryTransaction.class)); + repoOrder.verify(repoSpy, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(RepositoryTransaction.class)); repoOrder.verify(repoSpy, times(2)).updateLink(any(MLink.class), any(RepositoryTransaction.class)); repoOrder.verify(repoSpy, times(4)).updateJob(any(MJob.class), any(RepositoryTransaction.class)); repoOrder.verifyNoMoreInteractions(); @@ -257,8 +266,11 @@ public void testConnectorConfigUpgradeWithValidLinksAndJobs() { txOrder.verify(repoTransactionMock, times(1)).commit(); txOrder.verify(repoTransactionMock, times(1)).close(); txOrder.verifyNoMoreInteractions(); - upgraderOrder.verify(upgraderMock, times(2)).upgrade(any(MLinkConfig.class), any(MLinkConfig.class)); - upgraderOrder.verify(upgraderMock, times(4)).upgrade(any(MFromConfig.class), any(MFromConfig.class)); + upgraderOrder.verify(connectorUpgraderMock, times(2)).upgradeLinkConfig(any(MLinkConfig.class), any(MLinkConfig.class)); + upgraderOrder.verify(connectorUpgraderMock, times(1)).upgradeFromJobConfig(any(MFromConfig.class), any(MFromConfig.class)); + upgraderOrder.verify(connectorUpgraderMock, times(1)).upgradeToJobConfig(any(MToConfig.class), any(MToConfig.class)); + upgraderOrder.verify(connectorUpgraderMock, times(1)).upgradeFromJobConfig(any(MFromConfig.class), any(MFromConfig.class)); + upgraderOrder.verify(connectorUpgraderMock, times(1)).upgradeToJobConfig(any(MToConfig.class), any(MToConfig.class)); upgraderOrder.verifyNoMoreInteractions(); validatorOrder.verify(validatorMock, times(2)).validateConfigForLink(anyObject()); // @TODO(Abe): Re-enable job validation? @@ -277,34 +289,34 @@ public void testDriverConfigUpgradeWithValidJobs() { when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock); when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock); when(driverMock.getValidator()).thenReturn(validatorMock); - when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock); - when(driverMock.getDriverConfigurationGroupClass()).thenReturn(EmptyJobConfiguration.class); + when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock); + when(driverMock.getDriverJobConfigurationClass()).thenReturn(EmptyJobConfiguration.class); List jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); doReturn(jobList).when(repoSpy).findJobs(); doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class)); doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class)); - doNothing().when(repoSpy).upgradeDriver(any(MDriver.class), any(RepositoryTransaction.class)); + doNothing().when(repoSpy).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class)); repoSpy.upgradeDriver(newDriverConfig); InOrder repoOrder = inOrder(repoSpy); InOrder txOrder = inOrder(repoTransactionMock); - InOrder upgraderOrder = inOrder(upgraderMock); + InOrder upgraderOrder = inOrder(driverUpgraderMock); InOrder validatorOrder = inOrder(validatorMock); repoOrder.verify(repoSpy, times(1)).findJobs(); repoOrder.verify(repoSpy, times(1)).getTransaction(); repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock); - repoOrder.verify(repoSpy, times(1)).upgradeDriver(any(MDriver.class), any(RepositoryTransaction.class)); + repoOrder.verify(repoSpy, times(1)).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class)); repoOrder.verify(repoSpy, times(2)).updateJob(any(MJob.class), any(RepositoryTransaction.class)); repoOrder.verifyNoMoreInteractions(); txOrder.verify(repoTransactionMock, times(1)).begin(); txOrder.verify(repoTransactionMock, times(1)).commit(); txOrder.verify(repoTransactionMock, times(1)).close(); txOrder.verifyNoMoreInteractions(); - upgraderOrder.verify(upgraderMock, times(2)).upgrade(any(MConfigList.class), any(MConfigList.class)); + upgraderOrder.verify(driverUpgraderMock, times(2)).upgradeJobConfig(any(MDriverConfig.class), any(MDriverConfig.class)); upgraderOrder.verifyNoMoreInteractions(); validatorOrder.verify(validatorMock, times(2)).validateConfigForJob(anyObject()); validatorOrder.verifyNoMoreInteractions(); @@ -321,13 +333,13 @@ public void testDriverConfigUpgradeWithInvalidJobs() { when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(invalidRepoMock); when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(invalidRepoMock); when(driverMock.getValidator()).thenReturn(validatorMock); - when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock); - when(driverMock.getDriverConfigurationGroupClass()).thenReturn(EmptyJobConfiguration.class); + when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock); + when(driverMock.getDriverJobConfigurationClass()).thenReturn(EmptyJobConfiguration.class); List jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); doReturn(jobList).when(repoSpy).findJobs(); doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class)); - doNothing().when(repoSpy).upgradeDriver(any(MDriver.class), any(RepositoryTransaction.class)); + doNothing().when(repoSpy).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class)); try { repoSpy.upgradeDriver(newDriverConfig); @@ -336,20 +348,20 @@ public void testDriverConfigUpgradeWithInvalidJobs() { InOrder repoOrder = inOrder(repoSpy); InOrder txOrder = inOrder(repoTransactionMock); - InOrder upgraderOrder = inOrder(upgraderMock); + InOrder upgraderOrder = inOrder(driverUpgraderMock); InOrder validatorOrder = inOrder(validatorMock); repoOrder.verify(repoSpy, times(1)).findJobs(); repoOrder.verify(repoSpy, times(1)).getTransaction(); repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock); - repoOrder.verify(repoSpy, times(1)).upgradeDriver(any(MDriver.class), any(RepositoryTransaction.class)); + repoOrder.verify(repoSpy, times(1)).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class)); repoOrder.verifyNoMoreInteractions(); txOrder.verify(repoTransactionMock, times(1)).begin(); txOrder.verify(repoTransactionMock, times(1)).rollback(); txOrder.verify(repoTransactionMock, times(1)).close(); txOrder.verifyNoMoreInteractions(); - upgraderOrder.verify(upgraderMock, times(2)).upgrade(any(MConfigList.class), any(MConfigList.class)); + upgraderOrder.verify(driverUpgraderMock, times(2)).upgradeJobConfig(any(MDriverConfig.class), any(MDriverConfig.class)); upgraderOrder.verifyNoMoreInteractions(); // driver configs are per job. validatorOrder.verify(validatorMock, times(2)).validateConfigForJob(anyObject()); @@ -371,8 +383,8 @@ public void testConnectorConfigUpgradeHandlerWithFindLinksForConnectorError() { SqoopConnector sqconnector = mock(SqoopConnector.class); when(sqconnector.getConfigValidator()).thenReturn(validatorMock); - when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock); - when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector); + when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock); + when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector); SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, "find links for connector error."); @@ -401,8 +413,8 @@ public void testConnectorConfigUpgradeHandlerWithFindJobsForConnectorError() { SqoopConnector sqconnector = mock(SqoopConnector.class); when(sqconnector.getConfigValidator()).thenReturn(validatorMock); - when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock); - when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector); + when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock); + when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector); List linkList = links(link(1,1), link(2,1)); doReturn(linkList).when(repoHandlerMock).findLinksForConnector(anyLong(), any(Connection.class)); @@ -435,8 +447,8 @@ public void testConnectorConfigUpgradeHandlerWithDeleteJobInputsError() { SqoopConnector sqconnector = mock(SqoopConnector.class); when(sqconnector.getConfigValidator()).thenReturn(validatorMock); - when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock); - when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector); + when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock); + when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector); List linkList = links(link(1,1), link(2,1)); List jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); @@ -472,8 +484,8 @@ public void testConnectorConfigUpgradeHandlerWithDeleteLinkInputsError() { SqoopConnector sqconnector = mock(SqoopConnector.class); when(sqconnector.getConfigValidator()).thenReturn(validatorMock); - when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock); - when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector); + when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock); + when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector); List linkList = links(link(1,1), link(2,1)); List jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); @@ -511,8 +523,8 @@ public void testConnectorConfigUpgradeHandlerWithUpdateConnectorError() { SqoopConnector sqconnector = mock(SqoopConnector.class); when(sqconnector.getConfigValidator()).thenReturn(validatorMock); - when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock); - when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector); + when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock); + when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector); List linkList = links(link(1,1), link(2,1)); List jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); @@ -523,7 +535,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateConnectorError() { SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, "update connector error."); - doThrow(exception).when(repoHandlerMock).upgradeConnector(any(MConnector.class), any(Connection.class)); + doThrow(exception).when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class)); try { repoSpy.upgradeConnector(oldConnector, newConnector); @@ -533,7 +545,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateConnectorError() { verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(1)).upgradeConnector(any(MConnector.class), any(Connection.class)); + verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; } @@ -554,10 +566,10 @@ public void testConnectorConfigUpgradeHandlerWithUpdateLinkError() { when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock); when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock); when(sqconnector.getConfigValidator()).thenReturn(validatorMock); - when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock); + when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock); when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyLinkConfiguration.class); when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(EmptyJobConfiguration.class); - when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector); + when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector); List linkList = links(link(1,1), link(2,1)); List jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); @@ -565,7 +577,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateLinkError() { doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class)); doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class)); - doNothing().when(repoHandlerMock).upgradeConnector(any(MConnector.class), any(Connection.class)); + doNothing().when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class)); doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class)); SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, @@ -580,7 +592,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateLinkError() { verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(1)).upgradeConnector(any(MConnector.class), any(Connection.class)); + verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class)); verify(repoHandlerMock, times(1)).existsLink(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(1)).updateLink(any(MLink.class), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); @@ -603,10 +615,10 @@ public void testConnectorConfigUpgradeHandlerWithUpdateJobError() { when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock); when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock); when(sqconnector.getConfigValidator()).thenReturn(validatorMock); - when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock); + when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock); when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyLinkConfiguration.class); when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(EmptyJobConfiguration.class); - when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector); + when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector); List linkList = links(link(1,1), link(2,1)); List jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); @@ -614,7 +626,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateJobError() { doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class)); doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class)); - doNothing().when(repoHandlerMock).upgradeConnector(any(MConnector.class), any(Connection.class)); + doNothing().when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class)); doNothing().when(repoHandlerMock).updateLink(any(MLink.class), any(Connection.class)); doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class)); doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class)); @@ -631,7 +643,7 @@ public void testConnectorConfigUpgradeHandlerWithUpdateJobError() { verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(1)).upgradeConnector(any(MConnector.class), any(Connection.class)); + verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class)); verify(repoHandlerMock, times(2)).existsLink(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).updateLink(any(MLink.class), any(Connection.class)); verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class)); @@ -652,7 +664,7 @@ public void testDriverConfigUpgradeHandlerWithFindJobsError() { MDriver newDriverConfig = driver(); when(driverMock.getValidator()).thenReturn(validatorMock); - when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock); + when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock); SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, "find jobs error."); @@ -679,7 +691,7 @@ public void testDriverConfigUpgradeHandlerWithDeleteJobInputsError() { MDriver newDriverConfig = driver(); when(driverMock.getValidator()).thenReturn(validatorMock); - when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock); + when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock); List jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class)); @@ -710,7 +722,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateDriverConfigError() { MDriver newDriverConfig = driver(); when(driverMock.getValidator()).thenReturn(validatorMock); - when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock); + when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock); List jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class)); @@ -719,7 +731,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateDriverConfigError() { SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, "update driverConfig entity error."); - doThrow(exception).when(repoHandlerMock).upgradeDriver(any(MDriver.class), any(Connection.class)); + doThrow(exception).when(repoHandlerMock).upgradeDriverConfigs(any(MDriver.class), any(Connection.class)); try { repoSpy.upgradeDriver(newDriverConfig); @@ -727,7 +739,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateDriverConfigError() { assertEquals(ex.getMessage(), exception.getMessage()); verify(repoHandlerMock, times(1)).findJobs(any(Connection.class)); verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(1)).upgradeDriver(any(MDriver.class), any(Connection.class)); + verify(repoHandlerMock, times(1)).upgradeDriverConfigs(any(MDriver.class), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; } @@ -747,12 +759,12 @@ public void testDriverConfigUpgradeHandlerWithUpdateJobError() { when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock); when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock); when(driverMock.getValidator()).thenReturn(validatorMock); - when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock); - when(driverMock.getDriverConfigurationGroupClass()).thenReturn(EmptyJobConfiguration.class); + when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock); + when(driverMock.getDriverJobConfigurationClass()).thenReturn(EmptyJobConfiguration.class); List jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class)); doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); - doNothing().when(repoHandlerMock).upgradeDriver(any(MDriver.class), any(Connection.class)); + doNothing().when(repoHandlerMock).upgradeDriverConfigs(any(MDriver.class), any(Connection.class)); doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class)); SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, @@ -765,7 +777,7 @@ public void testDriverConfigUpgradeHandlerWithUpdateJobError() { assertEquals(ex.getMessage(), exception.getMessage()); verify(repoHandlerMock, times(1)).findJobs(any(Connection.class)); verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(1)).upgradeDriver(any(MDriver.class), any(Connection.class)); + verify(repoHandlerMock, times(1)).upgradeDriverConfigs(any(MDriver.class), any(Connection.class)); verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(1)).updateJob(any(MJob.class), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java index c888910b..aa58850f 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java @@ -138,7 +138,7 @@ private void insertConfigsForDriver(MDriver mDriver, Connection conn) { * repository. The job and connector configs within mc will get * updated with the id of the configs when this function returns. * @param mc The connector to use for updating configs - * @param conn JDBC link to use for updating the configs + * @param conn JDBC connection to use for inserting the configs */ private void insertConfigsForConnector (MConnector mc, Connection conn) { long connectorId = mc.getPersistenceId(); @@ -151,17 +151,18 @@ private void insertConfigsForConnector (MConnector mc, Connection conn) { baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE, Statement.RETURN_GENERATED_KEYS); - // Register link type config + // Register link type config for connector + // NOTE: The direction is null for LINK type registerConfigs(connectorId, null, mc.getLinkConfig().getConfigs(), MConfigType.LINK.name(), baseConfigStmt, baseInputStmt, conn); - // Register both from/to job type config + // Register both from/to job type config for connector if (mc.getSupportedDirections().isDirectionSupported(Direction.FROM)) { - registerConfigs(connectorId, Direction.FROM, mc.getConfig(Direction.FROM).getConfigs(), + registerConfigs(connectorId, Direction.FROM, mc.getFromConfig().getConfigs(), MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn); } if (mc.getSupportedDirections().isDirectionSupported(Direction.TO)) { - registerConfigs(connectorId, Direction.TO, mc.getConfig(Direction.TO).getConfigs(), + registerConfigs(connectorId, Direction.TO, mc.getToConfig().getConfigs(), MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn); } } catch (SQLException ex) { @@ -752,8 +753,8 @@ protected long registerHdfsConnector(Connection conn) { for (URL url : connectorConfigs) { handler = new ConnectorHandler(url); - if (handler.getMetadata().getPersistenceId() != -1) { - return handler.getMetadata().getPersistenceId(); + if (handler.getConnectorConfigurable().getPersistenceId() != -1) { + return handler.getConnectorConfigurable().getPersistenceId(); } if (handler.getUniqueName().equals(CONNECTOR_HDFS)) { @@ -761,8 +762,8 @@ protected long registerHdfsConnector(Connection conn) { PreparedStatement baseConnectorStmt = conn.prepareStatement( STMT_INSERT_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS, Statement.RETURN_GENERATED_KEYS); - baseConnectorStmt.setString(1, handler.getMetadata().getUniqueName()); - baseConnectorStmt.setString(2, handler.getMetadata().getClassName()); + baseConnectorStmt.setString(1, handler.getConnectorConfigurable().getUniqueName()); + baseConnectorStmt.setString(2, handler.getConnectorConfigurable().getClassName()); baseConnectorStmt.setString(3, "0"); if (baseConnectorStmt.executeUpdate() == 1) { ResultSet rsetConnectorId = baseConnectorStmt.getGeneratedKeys(); @@ -1229,9 +1230,7 @@ public List findLinksForConnector(long connectorID, Connection conn) { try { stmt = conn.prepareStatement(STMT_SELECT_LINK_FOR_CONNECTOR); stmt.setLong(1, connectorID); - return loadLinks(stmt, conn); - } catch (SQLException ex) { logException(ex, connectorID); throw new SqoopException(DerbyRepoError.DERBYREPO_0023, ex); @@ -1244,7 +1243,12 @@ public List findLinksForConnector(long connectorID, Connection conn) { * {@inheritDoc} */ @Override - public void upgradeConnector(MConnector mConnector, Connection conn) { + public void upgradeConnectorConfigs(MConnector mConnector, Connection conn) { + updateConnectorAndDeleteConfigs(mConnector, conn); + insertConfigsForConnector(mConnector, conn); + } + + private void updateConnectorAndDeleteConfigs(MConnector mConnector, Connection conn) { PreparedStatement updateConnectorStatement = null; PreparedStatement deleteConfig = null; PreparedStatement deleteInput = null; @@ -1271,15 +1275,19 @@ public void upgradeConnector(MConnector mConnector, Connection conn) { } finally { closeStatements(updateConnectorStatement, deleteConfig, deleteInput); } - insertConfigsForConnector(mConnector, conn); - } /** * {@inheritDoc} */ @Override - public void upgradeDriver(MDriver mDriver, Connection conn) { + public void upgradeDriverConfigs(MDriver mDriver, Connection conn) { + updateDriverAndDeleteConfigs(mDriver, conn); + createOrUpdateDriverSystemVersion(conn, mDriver.getVersion()); + insertConfigsForDriver(mDriver, conn); + } + + private void updateDriverAndDeleteConfigs(MDriver mDriver, Connection conn) { PreparedStatement deleteConfig = null; PreparedStatement deleteInput = null; try { @@ -1295,8 +1303,6 @@ public void upgradeDriver(MDriver mDriver, Connection conn) { } finally { closeStatements(deleteConfig, deleteInput); } - createOrUpdateDriverSystemVersion(conn, mDriver.getVersion()); - insertConfigsForDriver(mDriver, conn); } /** diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java index 95fbe07c..bbf721f2 100644 --- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java +++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java @@ -125,7 +125,7 @@ public void testDriverVersion() throws Exception { + DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION + "'"); assertEquals(lowerVersion, getDriverVersion()); - handler.upgradeDriver(driver, getDerbyDatabaseConnection()); + handler.upgradeDriverConfigs(driver, getDerbyDatabaseConnection()); assertEquals(CURRENT_DRIVER_VERSION, driver.getVersion()); diff --git a/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java index 7109ae51..c50e0297 100644 --- a/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java +++ b/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java @@ -70,7 +70,7 @@ public JsonBean handleEvent(RequestContext ctx) { LOG.info("ConnectorRequestHandler handles cid: " + cid); if (cid.equals("all")) { // display all connectors - connectors = ConnectorManager.getInstance().getConnectorsMetadata(); + connectors = ConnectorManager.getInstance().getConnectorConfigurables(); bundles = ConnectorManager.getInstance().getResourceBundles(locale); AuditLoggerManager.getInstance() @@ -87,7 +87,7 @@ public JsonBean handleEvent(RequestContext ctx) { connectors = new LinkedList(); bundles = new HashMap(); - connectors.add(ConnectorManager.getInstance().getConnectorConfig(id)); + connectors.add(ConnectorManager.getInstance().getConnectorConfigurable(id)); bundles.put(id, ConnectorManager.getInstance().getResourceBundle(id, locale)); AuditLoggerManager.getInstance() diff --git a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java index 462579cc..0cd5acbd 100644 --- a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java +++ b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java @@ -164,10 +164,10 @@ private JsonBean createUpdateJob(RequestContext ctx, boolean update) { // Verify that user is not trying to spoof us MFromConfig fromConfig = ConnectorManager.getInstance() - .getConnectorConfig(job.getConnectorId(Direction.FROM)) + .getConnectorConfigurable(job.getConnectorId(Direction.FROM)) .getFromConfig(); MToConfig toConfig = ConnectorManager.getInstance() - .getConnectorConfig(job.getConnectorId(Direction.TO)) + .getConnectorConfigurable(job.getConnectorId(Direction.TO)) .getToConfig(); MDriverConfig driverConfig = Driver.getInstance().getDriver().getDriverConfig(); @@ -179,8 +179,8 @@ private JsonBean createUpdateJob(RequestContext ctx, boolean update) { } // Corresponding connectors for this - SqoopConnector fromConnector = ConnectorManager.getInstance().getConnector(job.getConnectorId(Direction.FROM)); - SqoopConnector toConnector = ConnectorManager.getInstance().getConnector(job.getConnectorId(Direction.TO)); + SqoopConnector fromConnector = ConnectorManager.getInstance().getSqoopConnector(job.getConnectorId(Direction.FROM)); + SqoopConnector toConnector = ConnectorManager.getInstance().getSqoopConnector(job.getConnectorId(Direction.TO)); if (!fromConnector.getSupportedDirections().contains(Direction.FROM)) { throw new SqoopException(ServerError.SERVER_0004, "Connector " + fromConnector.getClass().getCanonicalName() @@ -196,7 +196,7 @@ private JsonBean createUpdateJob(RequestContext ctx, boolean update) { Object fromConfigObject = ClassUtils.instantiate(fromConnector.getJobConfigurationClass(Direction.FROM)); Object toConfigObject = ClassUtils.instantiate(toConnector.getJobConfigurationClass(Direction.TO)); - Object driverConfigObject = ClassUtils.instantiate(Driver.getInstance().getDriverConfigurationGroupClass()); + Object driverConfigObject = ClassUtils.instantiate(Driver.getInstance().getDriverJobConfigurationClass()); ConfigUtils.fromConfigs(job.getJobConfig(Direction.FROM).getConfigs(), fromConfigObject); ConfigUtils.fromConfigs(job.getJobConfig(Direction.TO).getConfigs(), toConfigObject); diff --git a/server/src/main/java/org/apache/sqoop/handler/LinkRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/LinkRequestHandler.java index 80e65b81..b715ad37 100644 --- a/server/src/main/java/org/apache/sqoop/handler/LinkRequestHandler.java +++ b/server/src/main/java/org/apache/sqoop/handler/LinkRequestHandler.java @@ -158,7 +158,7 @@ private JsonBean createUpdateLink(RequestContext ctx, boolean update) { // Verify that user is not trying to spoof us MLinkConfig linkConfig = - ConnectorManager.getInstance().getConnectorConfig(link.getConnectorId()) + ConnectorManager.getInstance().getConnectorConfigurable(link.getConnectorId()) .getLinkConfig(); if(!linkConfig.equals(link.getConnectorLinkConfig())) { throw new SqoopException(ServerError.SERVER_0003, @@ -166,7 +166,7 @@ private JsonBean createUpdateLink(RequestContext ctx, boolean update) { } // Responsible connector for this session - SqoopConnector connector = ConnectorManager.getInstance().getConnector(link.getConnectorId()); + SqoopConnector connector = ConnectorManager.getInstance().getSqoopConnector(link.getConnectorId()); // We need translate configs Object connectorLinkConfig = ClassUtils.instantiate(connector.getLinkConfigurationClass()); diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConfigUpgrader.java b/spi/src/main/java/org/apache/sqoop/configurable/ConfigurableUpgradeUtil.java similarity index 53% rename from connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConfigUpgrader.java rename to spi/src/main/java/org/apache/sqoop/configurable/ConfigurableUpgradeUtil.java index b17aa21d..715a61cf 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConfigUpgrader.java +++ b/spi/src/main/java/org/apache/sqoop/configurable/ConfigurableUpgradeUtil.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,16 +7,15 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ -package org.apache.sqoop.connector.hdfs; +package org.apache.sqoop.configurable; import java.util.HashMap; import java.util.List; @@ -24,37 +23,19 @@ import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.spi.RepositoryUpgrader; -import org.apache.sqoop.model.MConfigList; import org.apache.sqoop.model.MConfig; import org.apache.sqoop.model.MInput; -import org.apache.sqoop.model.MLinkConfig; -public class HdfsConfigUpgrader extends RepositoryUpgrader { - private static final Logger LOG = Logger.getLogger(HdfsConfigUpgrader.class); +public class ConfigurableUpgradeUtil { + private static final Logger LOG = Logger.getLogger(ConfigurableUpgradeUtil.class); /* * For now, there is no real upgrade. So copy all data over, * set the validation messages and error messages to be the same as for the * inputs in the original one. */ - - @Override - public void upgrade(MLinkConfig original, MLinkConfig upgradeTarget) { - doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); - } - - @Override - public void upgrade(MConfigList original, MConfigList upgradeTarget) { - doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); - } - @SuppressWarnings("unchecked") - private void doUpgrade(List original, List target) { - // Easier to find the config in the original list if we use a map. - // Since the constructor takes a list, - // index is not guaranteed to be the same, so we need to look for - // equivalence + public static void doUpgrade(List original, List target) { Map configMap = new HashMap(); for (MConfig config : original) { configMap.put(config.getName(), config); @@ -64,7 +45,7 @@ private void doUpgrade(List original, List target) { MConfig originalConfig = configMap.get(config.getName()); if (originalConfig == null) { LOG.warn("Config: '" + config.getName() + "' not present in old " + - "connector. So it and its inputs will not be transferred by the upgrader."); + "configurable. So it and its inputs will not be transferred by the upgrader."); continue; } for (MInput input : inputs) { @@ -73,7 +54,7 @@ private void doUpgrade(List original, List target) { input.setValue(originalInput.getValue()); } catch (SqoopException ex) { LOG.warn("Input: '" + input.getName() + "' not present in old " + - "connector. So it will not be transferred by the upgrader."); + "configurable. So it will not be transferred by the upgrader."); } } } diff --git a/spi/src/main/java/org/apache/sqoop/connector/ConfigurableError.java b/spi/src/main/java/org/apache/sqoop/connector/ConfigurableError.java new file mode 100644 index 00000000..4d346915 --- /dev/null +++ b/spi/src/main/java/org/apache/sqoop/connector/ConfigurableError.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector; + +import org.apache.sqoop.common.ErrorCode; + +public enum ConfigurableError implements ErrorCode { + + /** An unknown error has occurred. */ + CONFIGURABLE_0001("Link object upgrade called, but no upgrade routine provided for LINK config"), + CONFIGURABLE_0002("Job object upgrade called, but no upgrade routine provided for FROM job config"), + CONFIGURABLE_0003("Job object upgrade called, but no upgrade routine provided for TO job config"), + ; + private final String message; + + private ConfigurableError(String message) { + this.message = message; + } + + public String getCode() { + return name(); + } + + public String getMessage() { + return message; + } +} + diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/ConnectorConfigurableUpgrader.java b/spi/src/main/java/org/apache/sqoop/connector/spi/ConnectorConfigurableUpgrader.java new file mode 100644 index 00000000..a1123090 --- /dev/null +++ b/spi/src/main/java/org/apache/sqoop/connector/spi/ConnectorConfigurableUpgrader.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.connector.spi; + +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.ConfigurableError; +import org.apache.sqoop.model.MFromConfig; +import org.apache.sqoop.model.MLinkConfig; +import org.apache.sqoop.model.MToConfig; + +/** + * Configurable represents an entity that can provide configurations for the + * support config types {@linkplain ConfigType} + * This api represents the interface that configurable such as the connector/driver + * will implement to upgrade both the config and its corresponding data across different + * versions + * + */ +public abstract class ConnectorConfigurableUpgrader { + + /** + * Upgrade the original link config for the given config type and fill into the upgradeTarget. Note + * that any data already in {@code upgradeTarget} maybe overwritten. + * @param original - original config as in the repository + * @param upgradeTarget - the instance that will be filled in with the + * upgraded config + */ + public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) { + // The reasoning for throwing an exception by default is as follows. + // Sqoop calls the upgrade apis for every connector if and only if the + // corresponding link object that the config is associated with exists in the sqoop + // repository. In unexpected scenarios, if a link object is created in the + // sqoop repository without a corresponding upgrade routine for + // the link config, then this exception will be thrown to indicate a + // unexpected code path. In normal circumstances this + // scenario of having a link object for a connector without link config is + // very unlikely to happen. A likely scenario is that a connector will not have a link config and hence + // no link object will be created and thus this method will not be invoked. + throw new SqoopException(ConfigurableError.CONFIGURABLE_0001); + + } + + /** + * Upgrade the original FROM job config for the given config type and fill into the upgradeTarget. Note + * that any data already in {@code upgradeTarget} maybe overwritten. + * @param original - original config as in the repository + * @param upgradeTarget - the instance that will be filled in with the + * upgraded config + */ + + public void upgradeFromJobConfig(MFromConfig original, MFromConfig upgradeTarget) { + // see above for the reasoning behind the exception + throw new SqoopException(ConfigurableError.CONFIGURABLE_0002); + } + /** + * Upgrade the original TO job config for the given config type and fill into the upgradeTarget. Note + * that any data already in {@code upgradeTarget} maybe overwritten. + * @param original - original config as in the repository + * @param upgradeTarget - the instance that will be filled in with the + * upgraded config + */ + public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) { + // see above for the reasoning behind the exception + throw new SqoopException(ConfigurableError.CONFIGURABLE_0003); + + } + +} diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/RepositoryUpgrader.java b/spi/src/main/java/org/apache/sqoop/connector/spi/RepositoryUpgrader.java deleted file mode 100644 index 879e428f..00000000 --- a/spi/src/main/java/org/apache/sqoop/connector/spi/RepositoryUpgrader.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sqoop.connector.spi; - -import org.apache.sqoop.model.MConfigList; -import org.apache.sqoop.model.MLinkConfig; - -/** - * Repository represents the sqoop entity store. Sqoop entities include - * the connectors, links, jobs and submissions and corresponding configs. - * - */ -public abstract class RepositoryUpgrader { - - /** - * Upgrade the original link config and fill into the upgradeTarget. Note - * that any data already in {@code upgradeTarget} maybe overwritten. - * @param original - original link config as in the repository - * @param upgradeTarget - the instance that will be filled in with the - * upgraded link config. - */ - public abstract void upgrade(MLinkConfig original, MLinkConfig upgradeTarget); - /** - * Upgrade the original job config and fill into the upgradeTarget. Note - * that any config data already in {@code upgradeTarget} maybe overwritten. - * This method must be called only after the link config has - * already been upgraded. - * @param original - original job config as in the repository - * @param upgradeTarget - the instance that will be filled in with the - * upgraded job config. - * NOTE(VB): This api will be revisited to accomodate from and to job config update - */ - public abstract void upgrade(MConfigList original, MConfigList upgradeTarget); -} - diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java index 5315e1fa..6ca6c184 100644 --- a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java +++ b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java @@ -60,11 +60,13 @@ public List getSupportedDirections() { /** * @return Get link configuration group class */ + @SuppressWarnings("rawtypes") public abstract Class getLinkConfigurationClass(); /** * @return Get job configuration group class per direction type or null if not supported */ + @SuppressWarnings("rawtypes") public abstract Class getJobConfigurationClass(Direction direction); /** @@ -85,11 +87,11 @@ public List getSupportedDirections() { public abstract Validator getConfigValidator(); /** - * Returns an {@linkplain RepositoryUpgrader} object that can upgrade the + * Returns an {@linkplain ConnectorConfigurableUpgrader} object that can upgrade the * configs related to the link and job * @return RespositoryUpgrader object */ - public abstract RepositoryUpgrader getRepositoryUpgrader(); + public abstract ConnectorConfigurableUpgrader getConfigurableUpgrader(); /** * Returns the {@linkplain IntermediateDataFormat} this connector diff --git a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java index f89c546a..819cf6ab 100644 --- a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java +++ b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java @@ -139,7 +139,7 @@ private JSONObject addConnectorName(JSONObject json) { while (iterator.hasNext()) { JSONObject result = iterator.next(); Long connectorId = (Long) result.get(JSONConstants.CONNECTOR_ID); - result.put(JSONConstants.CONNECTOR_NAME, connectorManager.getConnectorConfig(connectorId).getUniqueName()); + result.put(JSONConstants.CONNECTOR_NAME, connectorManager.getConnectorConfigurable(connectorId).getUniqueName()); } return json; diff --git a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java index 76ebd3bf..8cf9cf15 100644 --- a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java +++ b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java @@ -18,6 +18,14 @@ package org.apache.sqoop.tools.tool; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; @@ -25,13 +33,15 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.io.Charsets; +import org.apache.commons.io.IOUtils; import org.apache.log4j.Logger; import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.VersionInfo; import org.apache.sqoop.connector.ConnectorManager; -import org.apache.sqoop.connector.spi.RepositoryUpgrader; +import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.driver.Driver; +import org.apache.sqoop.driver.DriverUpgrader; import org.apache.sqoop.json.JobBean; import org.apache.sqoop.json.LinkBean; import org.apache.sqoop.json.SubmissionBean; @@ -50,16 +60,6 @@ import org.apache.sqoop.repository.Repository; import org.apache.sqoop.repository.RepositoryManager; import org.apache.sqoop.tools.ConfiguredTool; - -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.io.IOUtils; import org.apache.sqoop.utils.ClassUtils; import org.apache.sqoop.validation.ConfigValidationResult; import org.apache.sqoop.validation.ConfigValidationRunner; @@ -140,8 +140,7 @@ private boolean load(JSONObject repo) { Repository repository = RepositoryManager.getInstance().getRepository(); ConnectorManager.getInstance().initialize(); - ConnectorManager connectorManager = ConnectorManager.getInstance(); - + LOG.info("Loading Connections"); JSONObject jsonConns = (JSONObject) repo.get(JSONConstants.LINKS); @@ -247,20 +246,21 @@ private long loadLink(MLink link) { //starting by pretending we have a brand new connection resetPersistenceId(link); - RepositoryUpgrader upgrader = Driver.getInstance().getDriverConfigRepositoryUpgrader(); Repository repository = RepositoryManager.getInstance().getRepository(); - MConnector mConnector = ConnectorManager.getInstance().getConnectorConfig(link.getConnectorId()); + MConnector mConnector = ConnectorManager.getInstance().getConnectorConfigurable(link.getConnectorId()); + ConnectorConfigurableUpgrader connectorConfigUpgrader = ConnectorManager.getInstance().getSqoopConnector(mConnector.getUniqueName()).getConfigurableUpgrader(); + List connectorConfigs = mConnector.getLinkConfig().clone(false).getConfigs(); MLinkConfig newLinkConfigs = new MLinkConfig(connectorConfigs); - // upgrading the forms to make sure they match the current repository - upgrader.upgrade(link.getConnectorLinkConfig(), newLinkConfigs); + // upgrading the configs to make sure they match the current repository + connectorConfigUpgrader.upgradeLinkConfig(link.getConnectorLinkConfig(), newLinkConfigs); MLink newLink = new MLink(link, newLinkConfigs); // Transform config structures to objects for validations - SqoopConnector connector = - ConnectorManager.getInstance().getConnector(link.getConnectorId()); + SqoopConnector connector = ConnectorManager.getInstance().getSqoopConnector( + link.getConnectorId()); Object connectorConfig = ClassUtils.instantiate( connector.getLinkConfigurationClass()); @@ -286,27 +286,32 @@ private long loadLink(MLink link) { private long loadJob(MJob job) { //starting by pretending we have a brand new job resetPersistenceId(job); + MConnector mFromConnector = ConnectorManager.getInstance().getConnectorConfigurable(job.getFromConnectorId()); + MConnector mToConnector = ConnectorManager.getInstance().getConnectorConfigurable(job.getToConnectorId()); - RepositoryUpgrader upgrader = Driver.getInstance().getDriverConfigRepositoryUpgrader(); + MFromConfig fromConfig = job.getFromJobConfig(); + MToConfig toConfig = job.getToJobConfig(); + + ConnectorConfigurableUpgrader fromConnectorConfigUpgrader = ConnectorManager.getInstance().getSqoopConnector(mFromConnector.getUniqueName()).getConfigurableUpgrader(); + ConnectorConfigurableUpgrader toConnectorConfigUpgrader = ConnectorManager.getInstance().getSqoopConnector(mToConnector.getUniqueName()).getConfigurableUpgrader(); + + fromConnectorConfigUpgrader.upgradeFromJobConfig(job.getFromJobConfig(), fromConfig); + + toConnectorConfigUpgrader.upgradeToJobConfig(job.getToJobConfig(), toConfig); + + DriverUpgrader driverConfigUpgrader = Driver.getInstance().getConfigurableUpgrader(); MDriver driver = Driver.getInstance().getDriver(); - Repository repository = RepositoryManager.getInstance().getRepository(); - MDriverConfig driverConfigs = driver.getDriverConfig(); - MFromConfig fromConfigs = job.getFromJobConfig(); - MToConfig toConfigs = job.getToJobConfig(); + driverConfigUpgrader.upgradeJobConfig( job.getDriverConfig(), driverConfigs); - // upgrading the configs to make sure they match the current repository - upgrader.upgrade(job.getDriverConfig(), driverConfigs); - upgrader.upgrade(job.getFromJobConfig(), fromConfigs); - upgrader.upgrade(job.getToJobConfig(), toConfigs); - MJob newJob = new MJob(job, fromConfigs, toConfigs, driverConfigs); + MJob newJob = new MJob(job, fromConfig, toConfig, driverConfigs); // Transform config structures to objects for validations SqoopConnector fromConnector = - ConnectorManager.getInstance().getConnector( + ConnectorManager.getInstance().getSqoopConnector( job.getConnectorId(Direction.FROM)); SqoopConnector toConnector = - ConnectorManager.getInstance().getConnector( + ConnectorManager.getInstance().getSqoopConnector( job.getConnectorId(Direction.TO)); Object fromConnectorConfig = ClassUtils.instantiate( @@ -314,7 +319,7 @@ private long loadJob(MJob job) { Object toConnectorConfig = ClassUtils.instantiate( toConnector.getJobConfigurationClass(Direction.TO)); Object driverConfig = ClassUtils.instantiate( - Driver.getInstance().getDriverConfigurationGroupClass()); + Driver.getInstance().getDriverJobConfigurationClass()); ConfigUtils.fromConfigs( job.getFromJobConfig().getConfigs(), fromConnectorConfig); @@ -332,7 +337,7 @@ private long loadJob(MJob job) { toConnectorConfigResult.getStatus(), driverConfigResult.getStatus()); if (finalStatus.canProceed()) { - repository.createJob(newJob); + RepositoryManager.getInstance().getRepository().createJob(newJob); } else { LOG.error("Failed to load job:" + job.getName());