mirror of
https://github.com/apache/sqoop.git
synced 2025-05-06 13:50:43 +08:00
SQOOP-2231: Sqoop2: Kite connector should use authority in link configuration
(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
parent
598460f4a5
commit
42d84a84e4
@ -18,17 +18,57 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.sqoop.connector.kite;
|
package org.apache.sqoop.connector.kite;
|
||||||
|
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.apache.sqoop.common.SqoopException;
|
||||||
import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
|
import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
|
||||||
import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
|
import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
|
||||||
|
import org.apache.sqoop.model.MConfig;
|
||||||
import org.apache.sqoop.model.MFromConfig;
|
import org.apache.sqoop.model.MFromConfig;
|
||||||
|
import org.apache.sqoop.model.MInput;
|
||||||
import org.apache.sqoop.model.MLinkConfig;
|
import org.apache.sqoop.model.MLinkConfig;
|
||||||
import org.apache.sqoop.model.MToConfig;
|
import org.apache.sqoop.model.MToConfig;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
public class KiteConnectorUpgrader extends ConnectorConfigurableUpgrader {
|
public class KiteConnectorUpgrader extends ConnectorConfigurableUpgrader {
|
||||||
|
private static final Logger LOG = Logger.getLogger(KiteConnectorUpgrader.class);
|
||||||
|
|
||||||
|
private static final Map<String, String> LINK_CONFIG_MAP;
|
||||||
|
|
||||||
|
static {
|
||||||
|
LINK_CONFIG_MAP = new HashMap<String, String>();
|
||||||
|
LINK_CONFIG_MAP.put("linkConfig.authority", "linkConfig.hdfsHostAndPort");
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) {
|
public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) {
|
||||||
ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
|
Map<String, MConfig> configMap = new HashMap<String, MConfig>();
|
||||||
|
for (MConfig config : original.getConfigs()) {
|
||||||
|
configMap.put(config.getName(), config);
|
||||||
|
}
|
||||||
|
for (MConfig config : upgradeTarget.getConfigs()) {
|
||||||
|
List<MInput<?>> inputs = config.getInputs();
|
||||||
|
MConfig originalConfig = configMap.get(config.getName());
|
||||||
|
if (originalConfig == null) {
|
||||||
|
LOG.warn("Config: '" + config.getName() + "' not present in old " +
|
||||||
|
"configurable. So it and its inputs will not be transferred by the upgrader.");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
for (MInput input : inputs) {
|
||||||
|
try {
|
||||||
|
if (LINK_CONFIG_MAP.containsKey(input.getName())) {
|
||||||
|
input.setValue(originalConfig.getInput(LINK_CONFIG_MAP.get(input.getName())).getValue());
|
||||||
|
} else {
|
||||||
|
input.setValue(originalConfig.getInput(input.getName()).getValue());
|
||||||
|
}
|
||||||
|
} catch (SqoopException ex) {
|
||||||
|
LOG.warn("Input: '" + input.getName() + "' not present in old " +
|
||||||
|
"configurable. So it will not be transferred by the upgrader.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -26,11 +26,11 @@ public class ConfigUtil {
|
|||||||
* Returns a dataset uri, including the filesystem location part, if it is
|
* Returns a dataset uri, including the filesystem location part, if it is
|
||||||
* provided separated,
|
* provided separated,
|
||||||
*/
|
*/
|
||||||
public static String buildDatasetUri(String fsLocation, String uri) {
|
public static String buildDatasetUri(String authority, String uri) {
|
||||||
if (!Strings.isNullOrEmpty(fsLocation) && !uri.contains("://")) {
|
if (!Strings.isNullOrEmpty(authority) && !uri.contains("://")) {
|
||||||
URIBuilder builder = new URIBuilder(uri);
|
URIBuilder builder = new URIBuilder(uri);
|
||||||
|
|
||||||
String[] parts = fsLocation.split(":");
|
String[] parts = authority.split(":");
|
||||||
if (parts.length > 0) {
|
if (parts.length > 0) {
|
||||||
builder.with("auth:host", parts[0]);
|
builder.with("auth:host", parts[0]);
|
||||||
}
|
}
|
||||||
@ -50,7 +50,7 @@ public static String buildDatasetUri(String fsLocation, String uri) {
|
|||||||
*/
|
*/
|
||||||
public static String buildDatasetUri(LinkConfig linkConfig,
|
public static String buildDatasetUri(LinkConfig linkConfig,
|
||||||
ToJobConfig toJobConfig) {
|
ToJobConfig toJobConfig) {
|
||||||
return buildDatasetUri(linkConfig.hdfsHostAndPort, toJobConfig.uri);
|
return buildDatasetUri(linkConfig.authority, toJobConfig.uri);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -29,16 +29,16 @@
|
|||||||
public class LinkConfig {
|
public class LinkConfig {
|
||||||
|
|
||||||
@Input(size = 255)
|
@Input(size = 255)
|
||||||
public String hdfsHostAndPort;
|
public String authority;
|
||||||
|
|
||||||
public static class ConfigValidator extends AbstractValidator<LinkConfig> {
|
public static class ConfigValidator extends AbstractValidator<LinkConfig> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void validate(LinkConfig config) {
|
public void validate(LinkConfig config) {
|
||||||
// TODO: There is no way to declare it as optional (SQOOP-1643), we cannot validate it directly using HostAndPortValidator.
|
// TODO: There is no way to declare it as optional (SQOOP-1643), we cannot validate it directly using HostAndPortValidator.
|
||||||
if (!Strings.isNullOrEmpty(config.hdfsHostAndPort)) {
|
if (!Strings.isNullOrEmpty(config.authority)) {
|
||||||
HostAndPortValidator validator = new HostAndPortValidator();
|
HostAndPortValidator validator = new HostAndPortValidator();
|
||||||
validator.validate(config.hdfsHostAndPort);
|
validator.validate(config.authority);
|
||||||
if (!validator.getStatus().equals(Status.OK)) {
|
if (!validator.getStatus().equals(Status.OK)) {
|
||||||
addMessage(validator.getStatus(), getMessages().toString());
|
addMessage(validator.getStatus(), getMessages().toString());
|
||||||
}
|
}
|
||||||
|
@ -22,8 +22,8 @@ linkConfig.label = Link Configuration
|
|||||||
linkConfig.help = You must supply the information requested in order to create a \
|
linkConfig.help = You must supply the information requested in order to create a \
|
||||||
connection object.
|
connection object.
|
||||||
|
|
||||||
linkConfig.hdfsHostAndPort.label = HDFS host and port
|
linkConfig.authority.label = HDFS host and port
|
||||||
linkConfig.hdfsHostAndPort.help = Optional to override HDFS file system location.
|
linkConfig.authority.help = Optional to override HDFS file system location.
|
||||||
|
|
||||||
# To Job Config
|
# To Job Config
|
||||||
#
|
#
|
||||||
|
@ -0,0 +1,77 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.sqoop.connector.kite;
|
||||||
|
|
||||||
|
import org.apache.sqoop.common.Direction;
|
||||||
|
import org.apache.sqoop.model.ConfigUtils;
|
||||||
|
import org.apache.sqoop.model.MConfig;
|
||||||
|
import org.apache.sqoop.model.MInput;
|
||||||
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
|
import java.util.ResourceBundle;
|
||||||
|
|
||||||
|
import static org.testng.Assert.assertNotNull;
|
||||||
|
import static org.testng.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class TestKiteConnector {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBundleForLink() {
|
||||||
|
KiteConnector connector = new KiteConnector();
|
||||||
|
verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getLinkConfigurationClass());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testBundleForJobToDirection() {
|
||||||
|
KiteConnector connector = new KiteConnector();
|
||||||
|
verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getJobConfigurationClass(Direction.TO));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testBundleForJobFromDirection() {
|
||||||
|
KiteConnector connector = new KiteConnector();
|
||||||
|
verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getJobConfigurationClass(Direction.FROM));
|
||||||
|
}
|
||||||
|
|
||||||
|
void verifyBundleForConfigClass(ResourceBundle bundle, Class klass) {
|
||||||
|
assertNotNull(bundle);
|
||||||
|
assertNotNull(klass);
|
||||||
|
|
||||||
|
List<MConfig> configs = ConfigUtils.toConfigs(klass);
|
||||||
|
|
||||||
|
for(MConfig config : configs) {
|
||||||
|
assertNotNull(config.getHelpKey());
|
||||||
|
assertNotNull(config.getLabelKey());
|
||||||
|
|
||||||
|
assertTrue(bundle.containsKey(config.getHelpKey()), "Can't find help for " + config.getName());
|
||||||
|
assertTrue(bundle.containsKey(config.getLabelKey()), "Can't find label for " + config.getName());
|
||||||
|
|
||||||
|
for(MInput input : config.getInputs()) {
|
||||||
|
assertNotNull(input.getHelpKey());
|
||||||
|
assertNotNull(input.getLabelKey());
|
||||||
|
|
||||||
|
assertTrue(bundle.containsKey(input.getHelpKey()), "Can't find help for " + input.getName());
|
||||||
|
assertTrue(bundle.containsKey(input.getLabelKey()), "Can't find label for " + input.getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,54 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.sqoop.connector.kite;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
|
||||||
|
import org.apache.sqoop.model.ConfigUtils;
|
||||||
|
import org.apache.sqoop.model.InputEditable;
|
||||||
|
import org.apache.sqoop.model.MConfig;
|
||||||
|
import org.apache.sqoop.model.MInput;
|
||||||
|
import org.apache.sqoop.model.MLinkConfig;
|
||||||
|
import org.apache.sqoop.model.MStringInput;
|
||||||
|
import org.testng.annotations.BeforeMethod;
|
||||||
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import java.util.LinkedList;
|
||||||
|
|
||||||
|
import static org.testng.AssertJUnit.assertEquals;
|
||||||
|
|
||||||
|
public class TestKiteConnectorUpgrader {
|
||||||
|
private KiteConnectorUpgrader upgrader;
|
||||||
|
|
||||||
|
@BeforeMethod(alwaysRun = true)
|
||||||
|
public void setup() {
|
||||||
|
upgrader = new KiteConnectorUpgrader();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLinkUpgrade() throws Exception {
|
||||||
|
MLinkConfig originalConfigs = new MLinkConfig(new LinkedList<MConfig>());
|
||||||
|
MLinkConfig newConfigs = new MLinkConfig(ConfigUtils.toConfigs(LinkConfiguration.class));
|
||||||
|
originalConfigs.getConfigs().add(new MConfig("linkConfig", new LinkedList<MInput<?>>()));
|
||||||
|
originalConfigs.getConfigs().get(0).getInputs().add(new MStringInput("linkConfig.hdfsHostAndPort", false, InputEditable.ANY, StringUtils.EMPTY, (short)255));
|
||||||
|
originalConfigs.getInput("linkConfig.hdfsHostAndPort").setValue("test:8020");
|
||||||
|
upgrader.upgradeLinkConfig(originalConfigs, newConfigs);
|
||||||
|
assertEquals("test:8020", newConfigs.getInput("linkConfig.authority").getValue());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user