diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java index 8237e51e..b35c9572 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java @@ -30,7 +30,6 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.util.LineReader; import org.apache.log4j.Logger; -import org.apache.sqoop.common.PrefixContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.common.SqoopIDFUtils; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; @@ -49,16 +48,14 @@ public class HdfsExtractor extends Extractor { @Override public void load(LoaderContext context, LinkConfiguration linkConfiguration, ToJobConfiguration toJobConfig) throws Exception { + Configuration conf = new Configuration(); + HdfsUtils.contextToConfiguration(context.getContext(), conf); DataReader reader = context.getDataReader(); - Configuration conf = HdfsUtils.configureURI(((PrefixContext) context.getContext()).getConfiguration(), linkConfiguration); String directoryName = toJobConfig.toJobConfig.outputDirectory; String codecname = getCompressionCodecName(toJobConfig); diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java index 78fd60ab..dcc11573 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java @@ -38,7 +38,6 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NodeBase; -import org.apache.sqoop.common.PrefixContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; @@ -71,8 +70,8 @@ public class HdfsPartitioner extends Partitioner getPartitions(PartitionerContext context, LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfig) { - - Configuration conf = HdfsUtils.configureURI(((PrefixContext) context.getContext()).getConfiguration(), linkConfiguration); + Configuration conf = new Configuration(); + HdfsUtils.contextToConfiguration(context.getContext(), conf); try { long numInputBytes = getInputSize(conf, fromJobConfig.fromJobConfig.inputDirectory); diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java index 991e6c99..ad500c2f 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java @@ -17,6 +17,7 @@ */ package org.apache.sqoop.connector.hdfs; +import org.apache.hadoop.conf.Configuration; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; import org.apache.sqoop.job.etl.Initializer; @@ -33,8 +34,8 @@ public class HdfsToInitializer extends Initializer entry : configuration) { + context.setString(entry.getKey(), entry.getValue()); + } + } + + public static void contextToConfiguration(ImmutableContext context, Configuration configuration) { + for (Map.Entry entry : context) { + configuration.set(entry.getKey(), entry.getValue()); + } + } + /** * Configures the URI to connect to. * @param conf Configuration object to be configured. diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/TestHdfsConnector.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/TestHdfsConnector.java new file mode 100644 index 00000000..b41bd5a1 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/TestHdfsConnector.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs; + +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.model.ConfigUtils; +import org.apache.sqoop.model.MConfig; +import org.apache.sqoop.model.MInput; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Locale; +import java.util.ResourceBundle; + +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +/** + */ +public class TestHdfsConnector { + + @Test + public void testBundleForLink() { + HdfsConnector connector = new HdfsConnector(); + verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getLinkConfigurationClass()); + } + + @Test + void testBundleForJobToDirection() { + HdfsConnector connector = new HdfsConnector(); + verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getJobConfigurationClass(Direction.TO)); + } + + @Test + void testBundleForJobFromDirection() { + HdfsConnector connector = new HdfsConnector(); + verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getJobConfigurationClass(Direction.FROM)); + } + + void verifyBundleForConfigClass(ResourceBundle bundle, Class klass) { + assertNotNull(bundle); + assertNotNull(klass); + + List configs = ConfigUtils.toConfigs(klass); + + for(MConfig config : configs) { + assertNotNull(config.getHelpKey()); + assertNotNull(config.getLabelKey()); + + assertTrue(bundle.containsKey(config.getHelpKey()), "Can't find help for " + config.getName()); + assertTrue(bundle.containsKey(config.getLabelKey()), "Can't find label for " + config.getName()); + + for(MInput input : config.getInputs()) { + assertNotNull(input.getHelpKey()); + assertNotNull(input.getLabelKey()); + + assertTrue(bundle.containsKey(input.getHelpKey()), "Can't find help for " + input.getName()); + assertTrue(bundle.containsKey(input.getLabelKey()), "Can't find label for " + input.getName()); + } + } + } +} diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java index 146c3b17..b54ad159 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java @@ -22,6 +22,7 @@ import org.apache.sqoop.model.Validator; import org.apache.sqoop.validation.Status; import org.apache.sqoop.validation.validators.AbstractValidator; +import org.apache.sqoop.validation.validators.DirectoryExistsValidator; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -30,6 +31,9 @@ public class LinkConfig { @Input(size = 255) public String uri; + @Input(size = 255, validators = { @Validator(DirectoryExistsValidator.class)}) + public String confDir; + public static class ConfigValidator extends AbstractValidator { private static final Pattern URI_PATTERN = Pattern.compile("((?<=\\()[A-Za-z][A-Za-z0-9\\+\\.\\-]*:([A-Za-z0-9\\.\\-_~:/\\?#\\[\\]@!\\$&'\\(\\)\\*\\+,;=]|%[A-Fa-f0-9]{2})+(?=\\)))|([A-Za-z][A-Za-z0-9\\+\\.\\-]*:([A-Za-z0-9\\.\\-_~:/\\?#\\[\\]@!\\$&'\\(\\)\\*\\+,;=]|%[A-Fa-f0-9]{2})+)"); diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties index 3904856d..8d5a562c 100644 --- a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties +++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties @@ -24,6 +24,9 @@ linkConfig.help = Here you supply information necessary to connect to HDFS linkConfig.uri.label = HDFS URI linkConfig.uri.help = HDFS URI used to connect to HDFS +linkConfig.confDir.label = Hadoop conf directory: +linkConfig.confDir.help = Directory with Hadoop configuration files. The connector will load all -site.xml files. + # To Job Config # toJobConfig.label = ToJob configuration diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java index ce6af6ea..80f21cc3 100644 --- a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java +++ b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java @@ -144,6 +144,11 @@ protected void fillRdbmsLinkConfig(MLink link) { configs.getStringInput("linkConfig.password").setValue(provider.getConnectionPassword()); } + protected void fillHdfsLink(MLink link) { + MConfigList configs = link.getConnectorLinkConfig(); + configs.getStringInput("linkConfig.confDir").setValue(getCluster().getConfigurationPath()); + } + /** * Fill TO config with specific storage and output type. * diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/TomcatTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/TomcatTestCase.java index 2ef971da..94164733 100644 --- a/test/src/main/java/org/apache/sqoop/test/testcases/TomcatTestCase.java +++ b/test/src/main/java/org/apache/sqoop/test/testcases/TomcatTestCase.java @@ -30,6 +30,7 @@ import org.apache.sqoop.test.hadoop.HadoopRunner; import org.apache.sqoop.test.hadoop.HadoopRunnerFactory; import org.apache.sqoop.test.hadoop.HadoopLocalRunner; +import org.apache.sqoop.test.minicluster.SqoopMiniCluster; import org.apache.sqoop.test.minicluster.TomcatSqoopMiniCluster; import org.apache.sqoop.test.utils.HdfsUtils; import org.testng.ITest; @@ -159,6 +160,10 @@ public SqoopClient getClient() { return client; } + public SqoopMiniCluster getCluster() { + return cluster; + } + public String getTemporaryPath() { return tmpPath; } diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/AllTypesTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/AllTypesTest.java index ac90eac8..6823ed26 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/AllTypesTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/AllTypesTest.java @@ -77,6 +77,7 @@ public void testFrom() throws Exception { // HDFS link MLink hdfsConnection = getClient().createLink("hdfs-connector"); + fillHdfsLink(hdfsConnection); saveLink(hdfsConnection); // Job creation @@ -120,6 +121,7 @@ public void testTo() throws Exception { // HDFS link MLink hdfsLink = getClient().createLink("hdfs-connector"); + fillHdfsLink(hdfsLink); saveLink(hdfsLink); // Job creation diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java index a21e4a16..034ae431 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java @@ -60,6 +60,7 @@ public void testBasic() throws Exception { // HDFS link MLink hdfsLink = getClient().createLink("hdfs-connector"); + fillHdfsLink(hdfsLink); saveLink(hdfsLink); // Job creation diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java index 5552e04f..6e1e031f 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java @@ -45,6 +45,7 @@ public void testCities() throws Exception { // HDFS link MLink hdfsConnection = getClient().createLink("hdfs-connector"); + fillHdfsLink(hdfsConnection); saveLink(hdfsConnection); // Job creation @@ -88,6 +89,7 @@ public void testStories() throws Exception { // HDFS link MLink hdfsLink = getClient().createLink("hdfs-connector"); + fillHdfsLink(hdfsLink); saveLink(hdfsLink); // Job creation @@ -133,6 +135,7 @@ public void testColumns() throws Exception { // HDFS link MLink hdfsLink = getClient().createLink("hdfs-connector"); + fillHdfsLink(hdfsLink); saveLink(hdfsLink); // Job creation @@ -179,6 +182,7 @@ public void testSql() throws Exception { // HDFS link MLink hdfsLink = getClient().createLink("hdfs-connector"); + fillHdfsLink(hdfsLink); saveLink(hdfsLink); // Job creation @@ -225,6 +229,7 @@ public void testDuplicateColumns() throws Exception { // HDFS link MLink hdfsLink = getClient().createLink("hdfs-connector"); + fillHdfsLink(hdfsLink); saveLink(hdfsLink); // Job creation diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java index 716de307..b37cdb44 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java @@ -71,6 +71,7 @@ public void testTable() throws Exception { // HDFS link MLink hdfsLink = getClient().createLink("hdfs-connector"); + fillHdfsLink(hdfsLink); saveLink(hdfsLink); // Job creation @@ -121,6 +122,7 @@ public void testQuery() throws Exception { // HDFS link MLink hdfsLink = getClient().createLink("hdfs-connector"); + fillHdfsLink(hdfsLink); saveLink(hdfsLink); // Job creation diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/PartitionerTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/PartitionerTest.java index f69f08ce..ef9720a3 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/PartitionerTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/PartitionerTest.java @@ -88,6 +88,7 @@ public void testSplitter() throws Exception { // HDFS link MLink hdfsLink = getClient().createLink("hdfs-connector"); + fillHdfsLink(hdfsLink); saveLink(hdfsLink); // Job creation diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java index f8507778..5ef7c8f6 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java @@ -51,6 +51,7 @@ public void testStagedTransfer() throws Exception { // HDFS link MLink hdfsLink = getClient().createLink("hdfs-connector"); + fillHdfsLink(hdfsLink); saveLink(hdfsLink); // Job creation diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java index 83273f1b..88db2f29 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java @@ -43,6 +43,7 @@ public void testBasic() throws Exception { // HDFS link MLink hdfsLink = getClient().createLink("hdfs-connector"); + fillHdfsLink(hdfsLink); saveLink(hdfsLink); // Job creation diff --git a/test/src/test/java/org/apache/sqoop/integration/server/SubmissionWithDisabledModelObjectsTest.java b/test/src/test/java/org/apache/sqoop/integration/server/SubmissionWithDisabledModelObjectsTest.java index 38235839..5f9f41d1 100644 --- a/test/src/test/java/org/apache/sqoop/integration/server/SubmissionWithDisabledModelObjectsTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/server/SubmissionWithDisabledModelObjectsTest.java @@ -70,6 +70,7 @@ public void testWithDisabledObjects() throws Exception { // HDFS link MLink hdfsLink = getClient().createLink("hdfs-connector"); + fillHdfsLink(hdfsLink); saveLink(hdfsLink); // Job creation