From 520fc33ca33a5faa446c360efa48c6caa08f043d Mon Sep 17 00:00:00 2001
From: Jarek Jarcec Cecho
Date: Mon, 27 Oct 2014 13:31:51 -0700
Subject: [PATCH] SQOOP-1620: Sqoop2: FileSystem should be configurable in HDFS connector (Abraham Elmahrek via Jarek Jarcec Cecho)

---
 .../sqoop/connector/hdfs/HdfsConnector.java   |  4 +-
 .../sqoop/connector/hdfs/HdfsExtractor.java   |  8 +--
 .../connector/hdfs/HdfsFromDestroyer.java     |  6 +-
 .../connector/hdfs/HdfsFromInitializer.java   |  8 +--
 .../sqoop/connector/hdfs/HdfsLoader.java      | 15 ++--
 .../sqoop/connector/hdfs/HdfsPartitioner.java |  9 +--
 .../sqoop/connector/hdfs/HdfsToDestroyer.java |  6 +-
 .../connector/hdfs/HdfsToInitializer.java     |  9 ++-
 .../sqoop/connector/hdfs/HdfsUtils.java       | 43 +++++++++++
 .../hdfs/configuration/LinkConfig.java        | 48 +++++++++++++
 .../hdfs/configuration/LinkConfiguration.java | 31 ++++++++
 .../hdfs-connector-config.properties          |  7 ++
 .../sqoop/connector/hdfs/TestExtractor.java   |  6 +-
 .../sqoop/connector/hdfs/TestHdfsUtils.java   | 44 ++++++++++++
 .../sqoop/connector/hdfs/TestLinkConfig.java  | 71 +++++++++++++++++++
 .../sqoop/connector/hdfs/TestLoader.java      |  4 +-
 .../sqoop/connector/hdfs/TestPartitioner.java |  4 +-
 17 files changed, 283 insertions(+), 40 deletions(-)
 create mode 100644 connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java
 create mode 100644 connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java
 create mode 100644 connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java
 create mode 100644 connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java
 create mode 100644 connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLinkConfig.java

diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
index b5f1f770..1640f80c 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
@@ -24,8 +24,8 @@
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.common.VersionInfo;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
 import org.apache.sqoop.connector.spi.SqoopConnector;
@@ -75,7 +75,7 @@ public ResourceBundle getBundle(Locale locale) {
   @SuppressWarnings("rawtypes")
   @Override
   public Class getLinkConfigurationClass() {
-    return EmptyConfiguration.class;
+    return LinkConfiguration.class;
   }

   /**
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index 31b0a995..2586f945 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -32,8 +32,8 @@
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.etl.io.DataWriter;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
@@ -42,7 +42,7 @@
  * Extract from HDFS.
  * Default field delimiter of a record is comma.
  */
-public class HdfsExtractor extends Extractor<EmptyConfiguration, FromJobConfiguration, HdfsPartition> {
+public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfiguration, HdfsPartition> {

   public static final Logger LOG = Logger.getLogger(HdfsExtractor.class);

@@ -51,10 +51,10 @@ public class HdfsExtractor extends Extractor
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
-public class HdfsFromDestroyer extends Destroyer<EmptyConfiguration, FromJobConfiguration> {
+public class HdfsFromDestroyer extends Destroyer<LinkConfiguration, FromJobConfiguration> {
   /**
    * Callback to clean up after job execution.
    *
@@ -31,7 +31,7 @@ public class HdfsFromDestroyer extends Destroyer
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
-public class HdfsFromInitializer extends Initializer<EmptyConfiguration, FromJobConfiguration> {
+public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobConfiguration> {
   /**
    * Initialize new submission based on given configuration properties. Any
    * needed temporary values might be saved to context object and they will be
@@ -35,13 +35,13 @@ public class HdfsFromInitializer extends Initializer
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
-public class HdfsLoader extends Loader<EmptyConfiguration, ToJobConfiguration> {
+public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
   /**
    * Load data to target.
    *
    * @param context Loader context object
-   * @param linkConfig Link configuration
-   * @param toJobConfig Job configuration
+   * @param linkConfiguration Link configuration
+   * @param toJobConfig Job configuration
    * @throws Exception
    */
   @Override
-  public void load(LoaderContext context, EmptyConfiguration linkConfig, ToJobConfiguration toJobConfig) throws Exception {
+  public void load(LoaderContext context, LinkConfiguration linkConfiguration,
+      ToJobConfiguration toJobConfig) throws Exception {
     DataReader reader = context.getDataReader();
-
-    Configuration conf = ((PrefixContext)context.getContext()).getConfiguration();
-
+    Configuration conf = HdfsUtils.configureURI(((PrefixContext) context.getContext()).getConfiguration(), linkConfiguration);
     String directoryName = toJobConfig.toJobConfig.outputDirectory;
     String codecname = getCompressionCodecName(toJobConfig);
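HdfsUtils.java is created by this patch and is called from the load() hunk above, but its own hunk does not survive in this excerpt. Judging from the call sites in HdfsLoader and HdfsPartitioner, and from TestHdfsUtils further down (which expects configureURI to return the same Configuration instance with both fs.default.name and fs.defaultFS pointing at the link URI), a minimal sketch could look like this; the null guard is an assumption:

    package org.apache.sqoop.connector.hdfs;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;

    public class HdfsUtils {
      /**
       * Copies the link-configured filesystem URI into the job's Hadoop
       * configuration, under both the deprecated key (fs.default.name)
       * and its current replacement (fs.defaultFS).
       */
      public static Configuration configureURI(Configuration conf, LinkConfiguration linkConfiguration) {
        if (linkConfiguration.linkConfig.uri != null) {
          conf.set("fs.default.name", linkConfiguration.linkConfig.uri);
          conf.set("fs.defaultFS", linkConfiguration.linkConfig.uri);
        }
        return conf;
      }
    }
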
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
index daa7fe26..181528cc 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
@@ -40,8 +40,8 @@
 import org.apache.hadoop.net.NodeBase;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;
@@ -50,7 +50,7 @@
  * This class derives mostly from CombineFileInputFormat of Hadoop, i.e.
  * org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.
  */
-public class HdfsPartitioner extends Partitioner<EmptyConfiguration, FromJobConfiguration> {
+public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfiguration> {

   public static final String SPLIT_MINSIZE_PERNODE =
       "mapreduce.input.fileinputformat.split.minsize.per.node";

@@ -68,9 +68,10 @@ public class HdfsPartitioner extends Partitioner
   @Override
   public List<Partition> getPartitions(PartitionerContext context,
-      EmptyConfiguration emptyConfig, FromJobConfiguration fromJobConfig) {
+      LinkConfiguration linkConfiguration,
+      FromJobConfiguration fromJobConfig) {
-    Configuration conf = ((PrefixContext)context.getContext()).getConfiguration();
+    Configuration conf = HdfsUtils.configureURI(((PrefixContext) context.getContext()).getConfiguration(), linkConfiguration);
     try {
       long numInputBytes = getInputSize(conf, fromJobConfig.fromJobConfig.inputDirectory);
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
index 8bfd7272..3c85be89 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
@@ -17,12 +17,12 @@
  */
 package org.apache.sqoop.connector.hdfs;

-import org.apache.sqoop.connector.common.EmptyConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;

-public class HdfsToDestroyer extends Destroyer<EmptyConfiguration, ToJobConfiguration> {
+public class HdfsToDestroyer extends Destroyer<LinkConfiguration, ToJobConfiguration> {
   /**
    * Callback to clean up after job execution.
    *
@@ -31,7 +31,7 @@ public class HdfsToDestroyer extends Destroyer
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
-public class HdfsToInitializer extends Initializer<EmptyConfiguration, ToJobConfiguration> {
+public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfiguration> {
   /**
    * Initialize new submission based on given configuration properties. Any
    * needed temporary values might be saved to context object and they will be
@@ -35,13 +34,13 @@ public class HdfsToInitializer extends Initializer
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java
new file mode 100644
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java
+    private static final Pattern URI_PATTERN = Pattern.compile("((?<=\\()[A-Za-z][A-Za-z0-9\\+\\.\\-]*:([A-Za-z0-9\\.\\-_~:/\\?#\\[\\]@!\\$&'\\(\\)\\*\\+,;=]|%[A-Fa-f0-9]{2})+(?=\\)))|([A-Za-z][A-Za-z0-9\\+\\.\\-]*:([A-Za-z0-9\\.\\-_~:/\\?#\\[\\]@!\\$&'\\(\\)\\*\\+,;=]|%[A-Fa-f0-9]{2})+)");
+
+    @Override
+    public void validate(LinkConfig config) {
+      if (config.uri != null) {
+        Matcher matcher = URI_PATTERN.matcher(config.uri);
+        if (!matcher.matches()) {
+          addMessage(Status.UNACCEPTABLE,
+              "Invalid URI " + config.uri + ". URI must either be null or a valid URI. Here are a few valid example URIs:"
+              + " hdfs://example.com:8020/, hdfs://example.com/, file:///, file:///tmp, file://localhost/tmp");
+        }
+      }
+    }
+  }
+}
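The head of LinkConfig.java (license, package declaration, annotations, and the uri input itself) is likewise missing from this excerpt; only the validator body survives above. Given that LinkConfiguration exposes a public LinkConfig linkConfig field, that the properties file labels linkConfig.uri, and that the validator reads config.uri, the file plausibly looks like the sketch below. The @ConfigClass/@Input wiring and the input size follow Sqoop 2 model conventions and are assumptions; the validator body is copied from the hunk above:

    package org.apache.sqoop.connector.hdfs.configuration;

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    import org.apache.sqoop.model.ConfigClass;
    import org.apache.sqoop.model.Input;
    import org.apache.sqoop.model.Validator;
    import org.apache.sqoop.validation.Status;
    import org.apache.sqoop.validation.validators.AbstractValidator;

    @ConfigClass(validators = {@Validator(LinkConfig.ConfigValidator.class)})
    public class LinkConfig {
      @Input(size = 255)
      public String uri;

      public static class ConfigValidator extends AbstractValidator<LinkConfig> {
        // Accepts scheme:rest-of-URI, optionally wrapped in parentheses.
        private static final Pattern URI_PATTERN = Pattern.compile("((?<=\\()[A-Za-z][A-Za-z0-9\\+\\.\\-]*:([A-Za-z0-9\\.\\-_~:/\\?#\\[\\]@!\\$&'\\(\\)\\*\\+,;=]|%[A-Fa-f0-9]{2})+(?=\\)))|([A-Za-z][A-Za-z0-9\\+\\.\\-]*:([A-Za-z0-9\\.\\-_~:/\\?#\\[\\]@!\\$&'\\(\\)\\*\\+,;=]|%[A-Fa-f0-9]{2})+)");

        @Override
        public void validate(LinkConfig config) {
          if (config.uri != null) {
            Matcher matcher = URI_PATTERN.matcher(config.uri);
            if (!matcher.matches()) {
              addMessage(Status.UNACCEPTABLE,
                  "Invalid URI " + config.uri + ". URI must either be null or a valid URI.");
            }
          }
        }
      }
    }
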
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java
new file mode 100644
index 00000000..29063a81
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.Config;
+import org.apache.sqoop.model.ConfigurationClass;
+
+@ConfigurationClass
+public class LinkConfiguration {
+  @Config
+  public LinkConfig linkConfig;
+
+  public LinkConfiguration() {
+    linkConfig = new LinkConfig();
+  }
+}
diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
index 90bc8bc0..3d088d07 100644
--- a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
+++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
@@ -17,6 +17,13 @@

 ############################

+# Link Config
+linkConfig.label = Link configuration
+linkConfig.help = Here you supply information necessary to connect to HDFS
+
+linkConfig.uri.label = HDFS URI
+linkConfig.uri.help = HDFS URI used to connect to HDFS
+
 # To Job Config
 #
 toJobConfig.label = ToJob configuration
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
index 124c3df1..0a6369fb 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
@@ -30,8 +30,8 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.sqoop.common.PrefixContext;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.etl.io.DataWriter;
 import org.apache.sqoop.job.etl.Extractor;
@@ -52,7 +52,7 @@ public class TestExtractor extends TestHdfsBase {
   private ToFormat outputFileType;
   private Class compressionClass;
   private final String inputDirectory;
-  private Extractor<EmptyConfiguration, FromJobConfiguration, HdfsPartition> extractor;
+  private Extractor<LinkConfiguration, FromJobConfiguration, HdfsPartition> extractor;

   public TestExtractor(ToFormat outputFileType, Class compressionClass)
@@ -131,7 +131,7 @@ public void writeRecord(Object obj) {
       }
     });

-    EmptyConfiguration emptyLinkConfig = new EmptyConfiguration();
+    LinkConfiguration emptyLinkConfig = new LinkConfiguration();
     FromJobConfiguration emptyJobConfig = new FromJobConfiguration();

     HdfsPartition partition = createPartition(FileUtils.listDir(inputDirectory));
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java
new file mode 100644
index 00000000..63e14ae2
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class TestHdfsUtils {
+
+  @Test
+  public void testConfigureURI() throws Exception {
+    final String TEST_URI = "hdfs://argggg:1111";
+    LinkConfiguration linkConfiguration = new LinkConfiguration();
+    Configuration conf = new Configuration();
+
+    assertNotEquals(TEST_URI, conf.get("fs.default.name"));
+    assertNotEquals(TEST_URI, conf.get("fs.defaultFS"));
+
+    linkConfiguration.linkConfig.uri = TEST_URI;
+
+    assertEquals(conf, HdfsUtils.configureURI(conf, linkConfiguration));
+    assertEquals(TEST_URI, conf.get("fs.default.name"));
+    assertEquals(TEST_URI, conf.get("fs.defaultFS"));
+  }
+}
\ No newline at end of file
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLinkConfig.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLinkConfig.java
new file mode 100644
index 00000000..176d0dfe
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLinkConfig.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestLinkConfig {
+  @Test
+  public void testValidURI() {
+    String[] URIs = {
+        "hdfs://localhost:8020",
+        "hdfs://localhost:8020/",
+        "hdfs://localhost:8020/test",
+        "hdfs://localhost:8020/test/test",
+        "hdfs://localhost:8020/test/",
+        "hdfs://localhost/",
+        "hdfs://localhost",
+        "hdfs://a:8020",
+        "file:///",
+        "file://localhost/",
+        "file://localhost/tmp",
+        "file://localhost/tmp/"
+    };
+    for (String uri : URIs) {
+      LinkConfig config = new LinkConfig();
+      LinkConfig.ConfigValidator validator = new LinkConfig.ConfigValidator();
+      config.uri = uri;
+      validator.validate(config);
+      assertTrue(uri, validator.getStatus().canProceed());
+    }
+  }
+
+  @Test
+  public void testInvalidURI() {
+    String[] URIs = {
+        "://localhost:8020",
+        ":///",
+        "://",
+        "hdfs:",
+        "hdfs//",
+        "file//localhost/",
+        "-://localhost/"
+    };
+    for (String uri : URIs) {
+      LinkConfig config = new LinkConfig();
+      LinkConfig.ConfigValidator validator = new LinkConfig.ConfigValidator();
+      config.uri = uri;
+      validator.validate(config);
+      assertFalse(uri, validator.getStatus().canProceed());
+    }
+  }
+}
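The URI pattern can also be probed outside the test harness. This throwaway driver reuses the regex from the LinkConfig hunk above; everything else (class and variable names) is illustration:

    import java.util.regex.Pattern;

    public class UriPatternProbe {
      public static void main(String[] args) {
        Pattern uriPattern = Pattern.compile(
            "((?<=\\()[A-Za-z][A-Za-z0-9\\+\\.\\-]*:([A-Za-z0-9\\.\\-_~:/\\?#\\[\\]@!\\$&'\\(\\)\\*\\+,;=]|%[A-Fa-f0-9]{2})+(?=\\)))"
            + "|([A-Za-z][A-Za-z0-9\\+\\.\\-]*:([A-Za-z0-9\\.\\-_~:/\\?#\\[\\]@!\\$&'\\(\\)\\*\\+,;=]|%[A-Fa-f0-9]{2})+)");
        // Mirrors a few accepted/rejected cases from TestLinkConfig above.
        for (String uri : new String[] {"hdfs://localhost:8020", "file:///", "hdfs:", "://"}) {
          System.out.println(uri + " -> " + uriPattern.matcher(uri).matches());
        }
      }
    }
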
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
index 8429e155..a30d410b 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
@@ -37,7 +37,7 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.sqoop.common.PrefixContext;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToCompression;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
@@ -121,7 +121,7 @@ public Object readContent() {
         return null;
       }
     }, null);
-    EmptyConfiguration linkConf = new EmptyConfiguration();
+    LinkConfiguration linkConf = new LinkConfiguration();
     ToJobConfiguration jobConf = new ToJobConfiguration();
     jobConf.toJobConfig.outputDirectory = outputDirectory;
     jobConf.toJobConfig.compression = compression;
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
index bef19847..04e09cd1 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
@@ -31,8 +31,8 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.sqoop.common.PrefixContext;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
@@ -98,7 +98,7 @@ public void testPartitioner() {
     Configuration conf = new Configuration();
     PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
     PartitionerContext context = new PartitionerContext(prefixContext, 5, null);
-    EmptyConfiguration linkConf = new EmptyConfiguration();
+    LinkConfiguration linkConf = new LinkConfiguration();
     FromJobConfiguration jobConf = new FromJobConfiguration();
     jobConf.fromJobConfig.inputDirectory = inputDirectory;
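Taken together, the change means the HDFS connector no longer inherits whatever filesystem the Sqoop server was started against: each From/To job first routes its Hadoop Configuration through HdfsUtils.configureURI, so the link's URI wins. A stand-alone illustration of that effect; the namenode host is made up, and FileSystem.get needs the HDFS client on the classpath:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.sqoop.connector.hdfs.HdfsUtils;
    import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;

    public class LinkUriDemo {
      public static void main(String[] args) throws Exception {
        LinkConfiguration link = new LinkConfiguration();
        link.linkConfig.uri = "hdfs://namenode.example.com:8020";

        // Same call the Loader/Partitioner now make before touching HDFS.
        Configuration conf = HdfsUtils.configureURI(new Configuration(), link);

        // Resolves against the link URI, not the server-wide default.
        FileSystem fs = FileSystem.get(conf);
        System.out.println(fs.getUri()); // hdfs://namenode.example.com:8020
      }
    }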