SQOOP-1620: Sqoop2: FileSystem should be configurable in HDFS connector
(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
commit 520fc33ca3
parent 24b8107ab1
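
In short: the HDFS connector's link configuration gains a user-supplied HDFS URI, every From/To component (initializer, partitioner, extractor, loader, destroyer) now takes a LinkConfiguration instead of EmptyConfiguration, and the new HdfsUtils.configureURI(...) helper copies that URI into the Hadoop Configuration before the filesystem is touched. A minimal sketch of the resulting flow, using only classes introduced below (the URI value and the wrapper class are illustrative, not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.connector.hdfs.HdfsUtils;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;

// Hypothetical wrapper class, for illustration only.
public class ConfigureUriExample {
  public static void main(String[] args) {
    // Link configuration as a user would fill it in (the URI is a made-up example).
    LinkConfiguration link = new LinkConfiguration();
    link.linkConfig.uri = "hdfs://namenode.example.com:8020";

    // configureURI copies the URI onto both fs.default.name and fs.defaultFS.
    Configuration conf = HdfsUtils.configureURI(new Configuration(), link);

    // Anything that later calls FileSystem.get(conf) now resolves against the
    // configured URI instead of the cluster default.
    System.out.println(conf.get("fs.defaultFS"));
  }
}
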
HdfsConnector.java
@@ -24,8 +24,8 @@
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.common.VersionInfo;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
 import org.apache.sqoop.connector.spi.SqoopConnector;
@@ -75,7 +75,7 @@ public ResourceBundle getBundle(Locale locale) {
   @SuppressWarnings("rawtypes")
   @Override
   public Class getLinkConfigurationClass() {
-    return EmptyConfiguration.class;
+    return LinkConfiguration.class;
   }

   /**

HdfsExtractor.java
@@ -32,8 +32,8 @@
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.etl.io.DataWriter;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
@@ -42,7 +42,7 @@
  * Extract from HDFS.
  * Default field delimiter of a record is comma.
  */
-public class HdfsExtractor extends Extractor<EmptyConfiguration, FromJobConfiguration, HdfsPartition> {
+public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfiguration, HdfsPartition> {

   public static final Logger LOG = Logger.getLogger(HdfsExtractor.class);

@@ -51,10 +51,10 @@ public class HdfsExtractor extends Extractor<EmptyConfiguration, FromJobConfigur
   private long rowRead = 0;

   @Override
-  public void extract(ExtractorContext context, EmptyConfiguration linkConfig,
+  public void extract(ExtractorContext context, LinkConfiguration linkConfiguration,
       FromJobConfiguration jobConfig, HdfsPartition partition) {

-    conf = ((PrefixContext) context.getContext()).getConfiguration();
+    conf = HdfsUtils.configureURI(((PrefixContext) context.getContext()).getConfiguration(), linkConfiguration);
     dataWriter = context.getDataWriter();

     try {

HdfsFromDestroyer.java
@@ -17,12 +17,12 @@
  */
 package org.apache.sqoop.connector.hdfs;

-import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;

-public class HdfsFromDestroyer extends Destroyer<EmptyConfiguration, FromJobConfiguration> {
+public class HdfsFromDestroyer extends Destroyer<LinkConfiguration, FromJobConfiguration> {
   /**
    * Callback to clean up after job execution.
    *
@@ -31,7 +31,7 @@ public class HdfsFromDestroyer extends Destroyer<EmptyConfiguration, FromJobConf
    * @param jobConfig FROM job configuration object
    */
   @Override
-  public void destroy(DestroyerContext context, EmptyConfiguration linkConfig,
+  public void destroy(DestroyerContext context, LinkConfiguration linkConfig,
       FromJobConfiguration jobConfig) {
     // do nothing at this point
   }

HdfsFromInitializer.java
@@ -17,14 +17,14 @@
  */
 package org.apache.sqoop.connector.hdfs;

-import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
 import org.apache.sqoop.schema.Schema;


-public class HdfsFromInitializer extends Initializer<EmptyConfiguration, FromJobConfiguration> {
+public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobConfiguration> {
   /**
    * Initialize new submission based on given configuration properties. Any
    * needed temporary values might be saved to context object and they will be
@@ -35,13 +35,13 @@ public class HdfsFromInitializer extends Initializer<EmptyConfiguration, FromJob
    * @param jobConfig FROM job configuration object
    */
   @Override
-  public void initialize(InitializerContext context, EmptyConfiguration linkConfig,
+  public void initialize(InitializerContext context, LinkConfiguration linkConfig,
       FromJobConfiguration jobConfig) {
     // do nothing at this point
   }

   @Override
-  public Schema getSchema(InitializerContext context, EmptyConfiguration linkConfig,
+  public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig,
       FromJobConfiguration jobConfig) {
     return new Schema("HDFS file");
   }

HdfsLoader.java
@@ -26,7 +26,7 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
@@ -37,22 +37,21 @@
 import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.utils.ClassUtils;

-public class HdfsLoader extends Loader<EmptyConfiguration, ToJobConfiguration> {
+public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
   /**
    * Load data to target.
    *
    * @param context Loader context object
-   * @param linkConfig Link configuration
+   * @param linkConfiguration Link configuration
    * @param toJobConfig Job configuration
    * @throws Exception
    */
   @Override
-  public void load(LoaderContext context, EmptyConfiguration linkConfig, ToJobConfiguration toJobConfig) throws Exception {
+  public void load(LoaderContext context, LinkConfiguration linkConfiguration,
+      ToJobConfiguration toJobConfig) throws Exception {

     DataReader reader = context.getDataReader();

-    Configuration conf = ((PrefixContext)context.getContext()).getConfiguration();
-
+    Configuration conf = HdfsUtils.configureURI(((PrefixContext) context.getContext()).getConfiguration(), linkConfiguration);
     String directoryName = toJobConfig.toJobConfig.outputDirectory;
     String codecname = getCompressionCodecName(toJobConfig);

HdfsPartitioner.java
@@ -40,8 +40,8 @@
 import org.apache.hadoop.net.NodeBase;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;
@@ -50,7 +50,7 @@
 * This class derives mostly from CombineFileInputFormat of Hadoop, i.e.
 * org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.
 */
-public class HdfsPartitioner extends Partitioner<EmptyConfiguration, FromJobConfiguration> {
+public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfiguration> {

   public static final String SPLIT_MINSIZE_PERNODE =
       "mapreduce.input.fileinputformat.split.minsize.per.node";
@@ -68,9 +68,10 @@ public class HdfsPartitioner extends Partitioner<EmptyConfiguration, FromJobConf

   @Override
   public List<Partition> getPartitions(PartitionerContext context,
-      EmptyConfiguration emptyConfig, FromJobConfiguration fromJobConfig) {
+      LinkConfiguration linkConfiguration,
+      FromJobConfiguration fromJobConfig) {

-    Configuration conf = ((PrefixContext)context.getContext()).getConfiguration();
+    Configuration conf = HdfsUtils.configureURI(((PrefixContext) context.getContext()).getConfiguration(), linkConfiguration);

     try {
       long numInputBytes = getInputSize(conf, fromJobConfig.fromJobConfig.inputDirectory);

HdfsToDestroyer.java
@@ -17,12 +17,12 @@
  */
 package org.apache.sqoop.connector.hdfs;

-import org.apache.sqoop.connector.common.EmptyConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;

-public class HdfsToDestroyer extends Destroyer<EmptyConfiguration, ToJobConfiguration> {
+public class HdfsToDestroyer extends Destroyer<LinkConfiguration, ToJobConfiguration> {
   /**
    * Callback to clean up after job execution.
    *
@@ -31,7 +31,7 @@ public class HdfsToDestroyer extends Destroyer<EmptyConfiguration, ToJobConfigur
    * @param jobConfig TO job configuration object
    */
   @Override
-  public void destroy(DestroyerContext context, EmptyConfiguration linkConfig,
+  public void destroy(DestroyerContext context, LinkConfiguration linkConfig,
       ToJobConfiguration jobConfig) {
     // do nothing at this point
   }

HdfsToInitializer.java
@@ -17,14 +17,13 @@
  */
 package org.apache.sqoop.connector.hdfs;

-import org.apache.sqoop.connector.common.EmptyConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
 import org.apache.sqoop.schema.Schema;

-
-public class HdfsToInitializer extends Initializer<EmptyConfiguration, ToJobConfiguration> {
+public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfiguration> {
   /**
    * Initialize new submission based on given configuration properties. Any
    * needed temporary values might be saved to context object and they will be
@@ -35,13 +34,13 @@ public class HdfsToInitializer extends Initializer<EmptyConfiguration, ToJobConf
    * @param jobConfig TO job configuration object
    */
   @Override
-  public void initialize(InitializerContext context, EmptyConfiguration linkConfig,
+  public void initialize(InitializerContext context, LinkConfiguration linkConfig,
       ToJobConfiguration jobConfig) {
     // do nothing at this point
   }

   @Override
-  public Schema getSchema(InitializerContext context, EmptyConfiguration linkConfig,
+  public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig,
       ToJobConfiguration jobConfig) {
     return new Schema("HDFS file");
   }

HdfsUtils.java (new file)
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+
+/**
+ * Utilities for HDFS.
+ */
+public class HdfsUtils {
+
+  /**
+   * Configures the URI to connect to.
+   * @param conf Configuration object to be configured.
+   * @param linkConfiguration LinkConfiguration object that
+   *                          provides configuration.
+   * @return Configuration object.
+   */
+  public static Configuration configureURI(Configuration conf, LinkConfiguration linkConfiguration) {
+    if (linkConfiguration.linkConfig.uri != null) {
+      conf.set("fs.default.name", linkConfiguration.linkConfig.uri);
+      conf.set("fs.defaultFS", linkConfiguration.linkConfig.uri);
+    }
+
+    return conf;
+  }
+}
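
Usage note (not part of the patch): configureURI sets both the deprecated fs.default.name key and its Hadoop 2 replacement fs.defaultFS, so the URI takes effect whichever key downstream code reads, and it returns the Configuration untouched when no URI was supplied. A hedged sketch of both branches, assuming the classes above; variable names and the URI are illustrative:

// No URI configured: nothing is overridden, cluster defaults still apply.
Configuration untouched = HdfsUtils.configureURI(new Configuration(), new LinkConfiguration());

// URI configured: both filesystem keys now point at it.
LinkConfiguration link = new LinkConfiguration();
link.linkConfig.uri = "hdfs://other-cluster:8020";   // made-up URI
Configuration pointed = HdfsUtils.configureURI(new Configuration(), link);
// pointed.get("fs.defaultFS") and pointed.get("fs.default.name") both return the URI.
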
LinkConfig.java (new file)
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.validators.AbstractValidator;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@ConfigClass(validators = {@Validator(LinkConfig.ConfigValidator.class)})
+public class LinkConfig {
+  @Input(size = 255) public String uri;
+
+  public static class ConfigValidator extends AbstractValidator<LinkConfig> {
+    private static final Pattern URI_PATTERN = Pattern.compile("((?<=\\()[A-Za-z][A-Za-z0-9\\+\\.\\-]*:([A-Za-z0-9\\.\\-_~:/\\?#\\[\\]@!\\$&'\\(\\)\\*\\+,;=]|%[A-Fa-f0-9]{2})+(?=\\)))|([A-Za-z][A-Za-z0-9\\+\\.\\-]*:([A-Za-z0-9\\.\\-_~:/\\?#\\[\\]@!\\$&'\\(\\)\\*\\+,;=]|%[A-Fa-f0-9]{2})+)");
+
+    @Override
+    public void validate(LinkConfig config) {
+      if (config.uri != null) {
+        Matcher matcher = URI_PATTERN.matcher(config.uri);
+        if (!matcher.matches()) {
+          addMessage(Status.UNACCEPTABLE,
+            "Invalid URI" + config.uri + ". URI must either be null or a valid URI. Here are a few valid example URIs:"
+            + " hdfs://example.com:8020/, hdfs://example.com/, file:///, file:///tmp, file://localhost/tmp");
+        }
+      }
+    }
+  }
+}
LinkConfiguration.java (new file)
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.Config;
+import org.apache.sqoop.model.ConfigurationClass;
+
+@ConfigurationClass
+public class LinkConfiguration {
+  @Config
+  public LinkConfig linkConfig;
+
+  public LinkConfiguration() {
+    linkConfig = new LinkConfig();
+  }
+}
HDFS connector resource bundle (.properties)
@@ -17,6 +17,13 @@

 ############################

+# Link Config
+linkConfig.label = Link configuration
+linkConfig.help = Here you supply information necessary to connect to HDFS
+
+linkConfig.uri.label = HDFS URI
+linkConfig.uri.help = HDFS URI used to connect to HDFS
+
 # To Job Config
 #
 toJobConfig.label = ToJob configuration

TestExtractor.java
@@ -30,8 +30,8 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.sqoop.common.PrefixContext;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.etl.io.DataWriter;
 import org.apache.sqoop.job.etl.Extractor;
@@ -52,7 +52,7 @@ public class TestExtractor extends TestHdfsBase {
   private ToFormat outputFileType;
   private Class<? extends CompressionCodec> compressionClass;
   private final String inputDirectory;
-  private Extractor<EmptyConfiguration, FromJobConfiguration, HdfsPartition> extractor;
+  private Extractor<LinkConfiguration, FromJobConfiguration, HdfsPartition> extractor;

   public TestExtractor(ToFormat outputFileType,
       Class<? extends CompressionCodec> compressionClass)
@@ -131,7 +131,7 @@ public void writeRecord(Object obj) {
       }
     });

-    EmptyConfiguration emptyLinkConfig = new EmptyConfiguration();
+    LinkConfiguration emptyLinkConfig = new LinkConfiguration();
     FromJobConfiguration emptyJobConfig = new FromJobConfiguration();
     HdfsPartition partition = createPartition(FileUtils.listDir(inputDirectory));

TestHdfsUtils.java (new file)
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class TestHdfsUtils {
+
+  @Test
+  public void testConfigureURI() throws Exception {
+    final String TEST_URI = "hdfs://argggg:1111";
+    LinkConfiguration linkConfiguration = new LinkConfiguration();
+    Configuration conf = new Configuration();
+
+    assertNotEquals(TEST_URI, conf.get("fs.default.name"));
+    assertNotEquals(TEST_URI, conf.get("fs.defaultFS"));
+
+    linkConfiguration.linkConfig.uri = TEST_URI;
+
+    assertEquals(conf, HdfsUtils.configureURI(conf, linkConfiguration));
+    assertEquals(TEST_URI, conf.get("fs.default.name"));
+    assertEquals(TEST_URI, conf.get("fs.defaultFS"));
+  }
+}
TestLinkConfig.java (new file)
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestLinkConfig {
+  @Test
+  public void testValidURI() {
+    String[] URIs = {
+      "hdfs://localhost:8020",
+      "hdfs://localhost:8020/",
+      "hdfs://localhost:8020/test",
+      "hdfs://localhost:8020/test/test",
+      "hdfs://localhost:8020/test/",
+      "hdfs://localhost/",
+      "hdfs://localhost",
+      "hdfs://a:8020",
+      "file:///",
+      "file://localhost/",
+      "file://localhost/tmp",
+      "file://localhost/tmp/"
+    };
+    for (String uri : URIs) {
+      LinkConfig config = new LinkConfig();
+      LinkConfig.ConfigValidator validator = new LinkConfig.ConfigValidator();
+      config.uri = uri;
+      validator.validate(config);
+      assertTrue(uri, validator.getStatus().canProceed());
+    }
+  }
+
+  @Test
+  public void testInvalidURI() {
+    String[] URIs = {
+      "://localhost:8020",
+      ":///",
+      "://",
+      "hdfs:",
+      "hdfs//",
+      "file//localhost/",
+      "-://localhost/"
+    };
+    for (String uri : URIs) {
+      LinkConfig config = new LinkConfig();
+      LinkConfig.ConfigValidator validator = new LinkConfig.ConfigValidator();
+      config.uri = uri;
+      validator.validate(config);
+      assertFalse(uri, validator.getStatus().canProceed());
+    }
+  }
+}
TestLoader.java
@@ -37,7 +37,7 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.sqoop.common.PrefixContext;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToCompression;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
@@ -121,7 +121,7 @@ public Object readContent() {
         return null;
       }
     }, null);
-    EmptyConfiguration linkConf = new EmptyConfiguration();
+    LinkConfiguration linkConf = new LinkConfiguration();
     ToJobConfiguration jobConf = new ToJobConfiguration();
     jobConf.toJobConfig.outputDirectory = outputDirectory;
     jobConf.toJobConfig.compression = compression;

TestPartitioner.java
@@ -31,8 +31,8 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.sqoop.common.PrefixContext;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
@@ -98,7 +98,7 @@ public void testPartitioner() {
     Configuration conf = new Configuration();
     PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
     PartitionerContext context = new PartitionerContext(prefixContext, 5, null);
-    EmptyConfiguration linkConf = new EmptyConfiguration();
+    LinkConfiguration linkConf = new LinkConfiguration();
     FromJobConfiguration jobConf = new FromJobConfiguration();

     jobConf.fromJobConfig.inputDirectory = inputDirectory;