
SQOOP-2201: Sqoop2: Add possibility to read Hadoop configuration files to HDFS connector

(Jarek Jarcec Cecho via Abraham Elmahrek)
Abraham Elmahrek 2015-03-16 22:43:23 -07:00
parent 639fdbe0a2
commit 6ca31c5054
19 changed files with 184 additions and 17 deletions
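
The change in a nutshell: the FROM/TO initializers now build a Hadoop Configuration from the link's configuration directory and flatten it into the job context as string key/value pairs, and the extractor, loader and partitioner rebuild an equivalent Configuration from that context instead of reaching into the MapReduce PrefixContext. Below is a minimal, hedged sketch of that round trip; a plain HashMap stands in for Sqoop's MutableContext/ImmutableContext, while the real code uses HdfsUtils.configurationToContext() and HdfsUtils.contextToConfiguration(), shown further down in this diff.

import org.apache.hadoop.conf.Configuration;

import java.util.HashMap;
import java.util.Map;

public class ConfigRoundTripSketch {
  public static void main(String[] args) {
    // Initializer side: a Configuration loaded from the conf dir...
    Configuration loaded = new Configuration();
    loaded.set("fs.defaultFS", "hdfs://namenode:8020"); // illustrative value only

    // ...is flattened into the job context as plain string pairs.
    Map<String, String> context = new HashMap<String, String>();
    for (Map.Entry<String, String> entry : loaded) {
      context.put(entry.getKey(), entry.getValue());
    }

    // Extractor/Loader/Partitioner side: an equivalent Configuration is rebuilt.
    Configuration rebuilt = new Configuration();
    for (Map.Entry<String, String> entry : context.entrySet()) {
      rebuilt.set(entry.getKey(), entry.getValue());
    }
    System.out.println(rebuilt.get("fs.defaultFS")); // prints hdfs://namenode:8020
  }
}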

View File

@@ -30,7 +30,6 @@
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.LineReader;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
@@ -49,16 +48,14 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
public static final Logger LOG = Logger.getLogger(HdfsExtractor.class);
private Configuration conf;
private Configuration conf = new Configuration();
private DataWriter dataWriter;
private Schema schema;
private long rowsRead = 0;
@Override
public void extract(ExtractorContext context, LinkConfiguration linkConfiguration,
FromJobConfiguration jobConfiguration, HdfsPartition partition) {
conf = HdfsUtils.configureURI(((PrefixContext) context.getContext()).getConfiguration(), linkConfiguration);
public void extract(ExtractorContext context, LinkConfiguration linkConfiguration, FromJobConfiguration jobConfiguration, HdfsPartition partition) {
HdfsUtils.contextToConfiguration(context.getContext(), conf);
dataWriter = context.getDataWriter();
schema = context.getSchema();

View File

@@ -17,6 +17,7 @@
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.job.etl.Initializer;
@@ -34,8 +35,8 @@ public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobC
* @param jobConfig FROM job configuration object
*/
@Override
public void initialize(InitializerContext context, LinkConfiguration linkConfig,
FromJobConfiguration jobConfig) {
// do nothing at this point
public void initialize(InitializerContext context, LinkConfiguration linkConfig, FromJobConfiguration jobConfig) {
Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
HdfsUtils.configurationToContext(configuration, context.getContext());
}
}

View File

@@ -24,7 +24,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
@@ -54,9 +53,10 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
@Override
public void load(LoaderContext context, LinkConfiguration linkConfiguration,
ToJobConfiguration toJobConfig) throws Exception {
Configuration conf = new Configuration();
HdfsUtils.contextToConfiguration(context.getContext(), conf);
DataReader reader = context.getDataReader();
Configuration conf = HdfsUtils.configureURI(((PrefixContext) context.getContext()).getConfiguration(), linkConfiguration);
String directoryName = toJobConfig.toJobConfig.outputDirectory;
String codecname = getCompressionCodecName(toJobConfig);

View File

@@ -38,7 +38,6 @@
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
@@ -71,8 +70,8 @@ public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfi
public List<Partition> getPartitions(PartitionerContext context,
LinkConfiguration linkConfiguration,
FromJobConfiguration fromJobConfig) {
Configuration conf = HdfsUtils.configureURI(((PrefixContext) context.getContext()).getConfiguration(), linkConfiguration);
Configuration conf = new Configuration();
HdfsUtils.contextToConfiguration(context.getContext(), conf);
try {
long numInputBytes = getInputSize(conf, fromJobConfig.fromJobConfig.inputDirectory);

View File

@@ -17,6 +17,7 @@
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
import org.apache.sqoop.job.etl.Initializer;
@@ -33,8 +34,8 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi
* @param jobConfig TO job configuration object
*/
@Override
public void initialize(InitializerContext context, LinkConfiguration linkConfig,
ToJobConfiguration jobConfig) {
// do nothing at this point
public void initialize(InitializerContext context, LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
HdfsUtils.configurationToContext(configuration, context.getContext());
}
}

View File

@@ -17,16 +17,77 @@
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
import java.io.File;
import java.io.FilenameFilter;
import java.net.MalformedURLException;
import java.util.Map;
/**
* Utilities for HDFS.
*/
public class HdfsUtils {
public static final String DEFAULT_HADOOP_CONF_DIR = "/etc/hadoop/conf";
private static final Logger LOG = Logger.getLogger(HdfsUtils.class);
/**
* Create Hadoop configuration object
*/
public static Configuration createConfiguration(LinkConfiguration linkConfig) {
Configuration configuration = new Configuration();
String confDir = linkConfig.linkConfig.confDir;
// If the configuration directory wasn't specified, fall back to the default
if (StringUtils.isBlank(confDir)) {
confDir = DEFAULT_HADOOP_CONF_DIR;
}
// If the configuration directory is a valid directory, load all -site.xml config files from it
File dir = new File(confDir);
if (dir.exists() && dir.isDirectory()) {
String[] files = dir.list(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.endsWith("-site.xml");
}
});
if (files != null) {
for (String file : files) {
LOG.info("Found Hadoop configuration file " + file);
try {
configuration.addResource(new File(confDir, file).toURI().toURL());
} catch (MalformedURLException e) {
LOG.warn("Can't load configuration file: " + file, e);
}
}
}
}
return configureURI(configuration, linkConfig);
}
public static void configurationToContext(Configuration configuration, MutableContext context) {
for (Map.Entry<String, String> entry : configuration) {
context.setString(entry.getKey(), entry.getValue());
}
}
public static void contextToConfiguration(ImmutableContext context, Configuration configuration) {
for (Map.Entry<String, String> entry : context) {
configuration.set(entry.getKey(), entry.getValue());
}
}
/**
* Configures the URI to connect to.
* @param conf Configuration object to be configured.
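
For reference, a small self-contained sketch of the file-selection rule used by createConfiguration() above: only files ending in -site.xml are loaded, and DEFAULT_HADOOP_CONF_DIR (/etc/hadoop/conf) is used when linkConfig.confDir is blank. The file names below are invented for illustration.

import java.io.File;
import java.io.FilenameFilter;

public class ConfDirScanSketch {
  public static void main(String[] args) {
    File confDir = new File("/etc/hadoop/conf"); // the default when linkConfig.confDir is blank
    // Same predicate the connector applies while scanning the directory.
    FilenameFilter siteFileFilter = new FilenameFilter() {
      @Override
      public boolean accept(File dir, String name) {
        return name.endsWith("-site.xml");
      }
    };
    // Hypothetical directory contents, to show what would and would not be loaded.
    String[] candidates = {"core-site.xml", "hdfs-site.xml", "yarn-site.xml",
        "log4j.properties", "hdfs-site.xml.bak"};
    for (String name : candidates) {
      System.out.println(name + (siteFileFilter.accept(confDir, name) ? " -> loaded" : " -> skipped"));
    }
  }
}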

View File

@@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.model.ConfigUtils;
import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MInput;
import org.testng.annotations.Test;
import java.util.List;
import java.util.Locale;
import java.util.ResourceBundle;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
/**
*/
public class TestHdfsConnector {
@Test
public void testBundleForLink() {
HdfsConnector connector = new HdfsConnector();
verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getLinkConfigurationClass());
}
@Test
void testBundleForJobToDirection() {
HdfsConnector connector = new HdfsConnector();
verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getJobConfigurationClass(Direction.TO));
}
@Test
void testBundleForJobFromDirection() {
HdfsConnector connector = new HdfsConnector();
verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getJobConfigurationClass(Direction.FROM));
}
void verifyBundleForConfigClass(ResourceBundle bundle, Class klass) {
assertNotNull(bundle);
assertNotNull(klass);
List<MConfig> configs = ConfigUtils.toConfigs(klass);
for(MConfig config : configs) {
assertNotNull(config.getHelpKey());
assertNotNull(config.getLabelKey());
assertTrue(bundle.containsKey(config.getHelpKey()), "Can't find help for " + config.getName());
assertTrue(bundle.containsKey(config.getLabelKey()), "Can't find label for " + config.getName());
for(MInput input : config.getInputs()) {
assertNotNull(input.getHelpKey());
assertNotNull(input.getLabelKey());
assertTrue(bundle.containsKey(input.getHelpKey()), "Can't find help for " + input.getName());
assertTrue(bundle.containsKey(input.getLabelKey()), "Can't find label for " + input.getName());
}
}
}
}

View File

@@ -22,6 +22,7 @@
import org.apache.sqoop.model.Validator;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.validators.AbstractValidator;
import org.apache.sqoop.validation.validators.DirectoryExistsValidator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -30,6 +31,9 @@
public class LinkConfig {
@Input(size = 255) public String uri;
@Input(size = 255, validators = { @Validator(DirectoryExistsValidator.class)})
public String confDir;
public static class ConfigValidator extends AbstractValidator<LinkConfig> {
private static final Pattern URI_PATTERN = Pattern.compile("((?<=\\()[A-Za-z][A-Za-z0-9\\+\\.\\-]*:([A-Za-z0-9\\.\\-_~:/\\?#\\[\\]@!\\$&'\\(\\)\\*\\+,;=]|%[A-Fa-f0-9]{2})+(?=\\)))|([A-Za-z][A-Za-z0-9\\+\\.\\-]*:([A-Za-z0-9\\.\\-_~:/\\?#\\[\\]@!\\$&'\\(\\)\\*\\+,;=]|%[A-Fa-f0-9]{2})+)");

View File

@@ -24,6 +24,9 @@ linkConfig.help = Here you supply information necessary to connect to HDFS
linkConfig.uri.label = HDFS URI
linkConfig.uri.help = HDFS URI used to connect to HDFS
linkConfig.confDir.label = Hadoop conf directory:
linkConfig.confDir.help = Directory with Hadoop configuration files. The connector will load all -site.xml files.
# To Job Config
#
toJobConfig.label = ToJob configuration

View File

@@ -144,6 +144,11 @@ protected void fillRdbmsLinkConfig(MLink link) {
configs.getStringInput("linkConfig.password").setValue(provider.getConnectionPassword());
}
protected void fillHdfsLink(MLink link) {
MConfigList configs = link.getConnectorLinkConfig();
configs.getStringInput("linkConfig.confDir").setValue(getCluster().getConfigurationPath());
}
/**
* Fill TO config with specific storage and output type.
*

View File

@@ -30,6 +30,7 @@
import org.apache.sqoop.test.hadoop.HadoopRunner;
import org.apache.sqoop.test.hadoop.HadoopRunnerFactory;
import org.apache.sqoop.test.hadoop.HadoopLocalRunner;
import org.apache.sqoop.test.minicluster.SqoopMiniCluster;
import org.apache.sqoop.test.minicluster.TomcatSqoopMiniCluster;
import org.apache.sqoop.test.utils.HdfsUtils;
import org.testng.ITest;
@@ -159,6 +160,10 @@ public SqoopClient getClient() {
return client;
}
public SqoopMiniCluster getCluster() {
return cluster;
}
public String getTemporaryPath() {
return tmpPath;
}

View File

@@ -77,6 +77,7 @@ public void testFrom() throws Exception {
// HDFS link
MLink hdfsConnection = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsConnection);
saveLink(hdfsConnection);
// Job creation
@@ -120,6 +121,7 @@ public void testTo() throws Exception {
// HDFS link
MLink hdfsLink = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsLink);
saveLink(hdfsLink);
// Job creation

View File

@@ -60,6 +60,7 @@ public void testBasic() throws Exception {
// HDFS link
MLink hdfsLink = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsLink);
saveLink(hdfsLink);
// Job creation

View File

@@ -45,6 +45,7 @@ public void testCities() throws Exception {
// HDFS link
MLink hdfsConnection = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsConnection);
saveLink(hdfsConnection);
// Job creation
@@ -88,6 +89,7 @@ public void testStories() throws Exception {
// HDFS link
MLink hdfsLink = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsLink);
saveLink(hdfsLink);
// Job creation
@@ -133,6 +135,7 @@ public void testColumns() throws Exception {
// HDFS link
MLink hdfsLink = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsLink);
saveLink(hdfsLink);
// Job creation
@@ -179,6 +182,7 @@ public void testSql() throws Exception {
// HDFS link
MLink hdfsLink = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsLink);
saveLink(hdfsLink);
// Job creation
@@ -225,6 +229,7 @@ public void testDuplicateColumns() throws Exception {
// HDFS link
MLink hdfsLink = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsLink);
saveLink(hdfsLink);
// Job creation

View File

@@ -71,6 +71,7 @@ public void testTable() throws Exception {
// HDFS link
MLink hdfsLink = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsLink);
saveLink(hdfsLink);
// Job creation
@@ -121,6 +122,7 @@ public void testQuery() throws Exception {
// HDFS link
MLink hdfsLink = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsLink);
saveLink(hdfsLink);
// Job creation

View File

@@ -88,6 +88,7 @@ public void testSplitter() throws Exception {
// HDFS link
MLink hdfsLink = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsLink);
saveLink(hdfsLink);
// Job creation

View File

@@ -51,6 +51,7 @@ public void testStagedTransfer() throws Exception {
// HDFS link
MLink hdfsLink = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsLink);
saveLink(hdfsLink);
// Job creation

View File

@@ -43,6 +43,7 @@ public void testBasic() throws Exception {
// HDFS link
MLink hdfsLink = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsLink);
saveLink(hdfsLink);
// Job creation

View File

@@ -70,6 +70,7 @@ public void testWithDisabledObjects() throws Exception {
// HDFS link
MLink hdfsLink = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsLink);
saveLink(hdfsLink);
// Job creation