
SQOOP-1751: Sqoop2: Rearrange LinkConfig and ToJobConfig of Kite Connector

(Qian Xu via Abraham Elmahrek)
Abraham Elmahrek 2015-01-06 00:27:27 -08:00
parent 4ffa806bac
commit ae26b9668e
15 changed files with 315 additions and 84 deletions

View File

@@ -15,25 +15,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kite.util;
package org.apache.sqoop.validation.validators;
import com.google.common.base.Strings;
import org.apache.sqoop.validation.Status;
import java.util.regex.Pattern;
/**
* The helper class arranges to validate user inputs.
* Ensure that given string represents a Kite dataset uri.
*/
public class InputValidation {
public class DatasetURIValidator extends AbstractValidator<String> {
private static Pattern DATASET_URI_PATTERN = Pattern
private static final Pattern DATASET_URI_PATTERN = Pattern
.compile("^dataset:(hive|hdfs|file):.*$");
/**
* Validates the correctness of user input dataset uri.
*/
public static void validateDatasetUriScheme(String uri)
throws IllegalArgumentException {
@Override
public void validate(String uri) {
if (Strings.isNullOrEmpty(uri)) {
addMessage(Status.ERROR, "Cannot be null or empty");
return;
}
if (!DATASET_URI_PATTERN.matcher(uri).matches()) {
throw new IllegalArgumentException("Invalid dataset URI: " + uri);
addMessage(Status.ERROR, "Invalid dataset URI: " + uri);
}
}
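
A quick editorial sketch (not part of the commit; it assumes only the validate/reset/getStatus API visible in the diffs and tests in this change) of how the refactored validator behaves:

    import org.apache.sqoop.validation.validators.DatasetURIValidator;

    public class DatasetURIValidatorSketch {
      public static void main(String[] args) {
        DatasetURIValidator validator = new DatasetURIValidator();

        // "dataset:" prefix plus a hive|hdfs|file scheme matches the pattern.
        validator.validate("dataset:hdfs:/user/me/job");
        System.out.println(validator.getStatus()); // OK

        // A missing "dataset:" prefix now adds an ERROR message rather than
        // throwing IllegalArgumentException as the old helper did.
        validator.reset();
        validator.validate("hdfs://namenode/ds");
        System.out.println(validator.getStatus()); // ERROR
      }
    }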

View File

@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.validation.validators;
import com.google.common.base.Strings;
import org.apache.sqoop.validation.Status;
import java.net.URI;
import java.net.URISyntaxException;
/**
* Ensure that given string represents a hostname or hostname:port.
*/
public class HostAndPortValidator extends AbstractValidator<String> {
@Override
public void validate(String hostPortString) {
if (Strings.isNullOrEmpty(hostPortString)) {
addMessage(Status.ERROR, "Cannot be null or empty");
return;
}
boolean valid = false;
try {
URI uri = new URI("hdfs://" + hostPortString);
valid = uri.getHost() != null &&
(!hostPortString.contains(":") || uri.getPort() > -1);
} catch (URISyntaxException ignored) {
}
if (!valid) {
addMessage(Status.ERROR, "Invalid host and port string: " +
hostPortString);
}
}
}
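
The check above leans on java.net.URI instead of hand-rolled parsing: a dummy "hdfs://" scheme is prepended and the URI grammar decides whether the authority is well formed. A standalone sketch (the demo class is hypothetical; the behavior is the JDK's) of the cases it distinguishes:

    import java.net.URI;
    import java.net.URISyntaxException;

    public class HostPortParsingSketch {
      public static void main(String[] args) throws URISyntaxException {
        // Well-formed authority: host and port are both recognized.
        URI ok = new URI("hdfs://host1:8020");
        System.out.println(ok.getHost() + " / " + ok.getPort()); // host1 / 8020

        // Non-numeric port: URI falls back to a registry-based authority,
        // so getHost() returns null -- exactly what the validator rejects.
        URI bad = new URI("hdfs://host1:invalid_port");
        System.out.println(bad.getHost() + " / " + bad.getPort()); // null / -1

        // Illegal characters (a space) do not parse at all; the validator
        // catches the URISyntaxException and reports ERROR.
        try {
          new URI("hdfs://hostname has space:8020");
        } catch (URISyntaxException e) {
          System.out.println("rejected: " + e.getReason());
        }
      }
    }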

View File

@@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.validation.validators;
import org.apache.sqoop.validation.Status;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class TestHostAndPortValidator {
AbstractValidator<String> validator = new HostAndPortValidator();
@Before
public void setUp() {
validator.reset();
assertEquals(0, validator.getMessages().size());
}
@Test
public void testValidHostAndPort() {
expectValid("host1:8020");
}
@Test
public void testValidHost() {
expectValid("host1");
}
private void expectValid(String input) {
validator.validate(input);
assertEquals(Status.OK, validator.getStatus());
assertEquals(0, validator.getMessages().size());
}
@Test
public void testInvalidPort() {
expectInvalid("host1:invalid_port");
}
@Test
public void testNegativePort() {
expectInvalid("host1:-1");
}
@Test
public void testHostNameWithInvalidChars() {
expectInvalid("hostname has space:8020");
}
private void expectInvalid(String input) {
validator.validate(input);
assertEquals(Status.ERROR, validator.getStatus());
assertEquals(1, validator.getMessages().size());
}
}

View File

@@ -18,15 +18,12 @@
*/
package org.apache.sqoop.connector.kite;
import org.apache.log4j.Logger;
import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MToConfig;
//NOTE: All config types have a similar upgrade path at this point
public class KiteConnectorUpgrader extends ConnectorConfigurableUpgrader {
@Override

View File

@@ -20,6 +20,7 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.log4j.Logger;
import org.apache.sqoop.connector.common.FileFormat;
import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
import org.apache.sqoop.etl.io.DataReader;
@@ -50,9 +51,11 @@ protected KiteDatasetExecutor getExecutor(String uri, Schema schema,
@Override
public void load(LoaderContext context, LinkConfiguration linkConfig,
ToJobConfiguration jobConfig) throws Exception {
KiteDatasetExecutor executor = getExecutor(jobConfig.toJobConfig.uri,
context.getSchema(), linkConfig.linkConfig.fileFormat);
ToJobConfiguration toJobConfig) throws Exception {
String uri = ConfigUtil.buildDatasetUri(
linkConfig.linkConfig, toJobConfig.toJobConfig);
KiteDatasetExecutor executor = getExecutor(
uri, context.getSchema(), toJobConfig.toJobConfig.fileFormat);
LOG.info("Temporary dataset created.");
DataReader reader = context.getDataReader();
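
With this change the loader no longer reads the URI straight off the job config; the link's optional hdfsHostAndPort is merged in first. A hedged sketch of that resolution (field names are taken from the configuration diffs below; the wrapper class is hypothetical):

    import org.apache.sqoop.connector.common.FileFormat;
    import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
    import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
    import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;

    public class LoaderUriSketch {
      public static void main(String[] args) {
        LinkConfiguration linkConfig = new LinkConfiguration();
        linkConfig.linkConfig.hdfsHostAndPort = "namenode:8020";

        ToJobConfiguration toJobConfig = new ToJobConfiguration();
        toJobConfig.toJobConfig.uri = "dataset:hdfs:/user/me/job";
        toJobConfig.toJobConfig.fileFormat = FileFormat.AVRO;

        // The same resolution load() performs before getExecutor():
        String uri = ConfigUtil.buildDatasetUri(
            linkConfig.linkConfig, toJobConfig.toJobConfig);
        System.out.println(uri); // dataset:hdfs://namenode:8020/user/me/job
      }
    }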

View File

@@ -20,6 +20,7 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.log4j.Logger;
import org.apache.sqoop.connector.common.FileFormat;
import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
import org.apache.sqoop.job.etl.Destroyer;
@@ -39,23 +40,23 @@ public class KiteToDestroyer extends Destroyer<LinkConfiguration,
@Override
public void destroy(DestroyerContext context,
LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
LOG.info("Running Kite connector destroyer");
String[] uris = KiteDatasetExecutor.listTemporaryDatasetUris(
jobConfig.toJobConfig.uri);
String uri = ConfigUtil.buildDatasetUri(
linkConfig.linkConfig, toJobConfig.toJobConfig);
String[] tempUris = KiteDatasetExecutor.listTemporaryDatasetUris(uri);
if (context.isSuccess()) {
KiteDatasetExecutor executor = getExecutor(
jobConfig.toJobConfig.uri, context.getSchema(),
linkConfig.linkConfig.fileFormat);
for (String uri : uris) {
executor.mergeDataset(uri);
LOG.info(String.format("Temporary dataset %s has been merged", uri));
uri, context.getSchema(), toJobConfig.toJobConfig.fileFormat);
for (String tempUri : tempUris) {
executor.mergeDataset(tempUri);
LOG.info(String.format("Temporary dataset %s has been merged", tempUri));
}
} else {
for (String uri : uris) {
KiteDatasetExecutor.deleteDataset(uri);
for (String tempUri : tempUris) {
KiteDatasetExecutor.deleteDataset(tempUri);
LOG.warn(String.format("Failed to import. " +
"Temporary dataset %s has been deleted", uri));
"Temporary dataset %s has been deleted", tempUri));
}
}
}

View File

@@ -20,6 +20,7 @@
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.common.FileFormat;
import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
import org.apache.sqoop.job.etl.Initializer;
@@ -42,8 +43,10 @@ public class KiteToInitializer extends Initializer<LinkConfiguration,
@Override
public void initialize(InitializerContext context,
LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
if (KiteDatasetExecutor.datasetExists(jobConfig.toJobConfig.uri)) {
LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
String uri = ConfigUtil.buildDatasetUri(
linkConfig.linkConfig, toJobConfig.toJobConfig);
if (KiteDatasetExecutor.datasetExists(uri)) {
LOG.error("Overwriting an existing dataset is not expected in create mode.");
throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0001);
}
@@ -56,7 +59,7 @@ public Set<String> getJars(InitializerContext context,
jars.add(ClassUtils.jarForClass("org.kitesdk.data.Formats"));
jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.databind.JsonNode"));
jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.core.TreeNode"));
if (FileFormat.CSV.equals(linkConfig.linkConfig.fileFormat)) {
if (FileFormat.CSV.equals(toJobConfig.toJobConfig.fileFormat)) {
jars.add(ClassUtils.jarForClass("au.com.bytecode.opencsv.CSVWriter"));
}
return jars;
@@ -64,7 +67,7 @@ public Set<String> getJars(InitializerContext context,
@Override
public Schema getSchema(InitializerContext context,
LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
return NullSchema.getInstance();
}

View File

@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kite.configuration;
import com.google.common.base.Strings;
public class ConfigUtil {
/**
* Returns a dataset uri, including the filesystem location part, if it is
* provided separately.
*/
public static String buildDatasetUri(String fsLocation, String uri) {
if (!Strings.isNullOrEmpty(fsLocation) && !uri.contains("://")) {
// Add fsLocation after the second colon
int p = uri.indexOf(":", uri.indexOf(":") + 1);
return uri.substring(0, p + 1) + "//" + fsLocation + uri.substring(p + 1);
}
return uri;
}
/**
* Returns a dataset uri, including the filesystem location part, if it is
* provided separately.
*/
public static String buildDatasetUri(LinkConfig linkConfig,
ToJobConfig toJobConfig) {
return buildDatasetUri(linkConfig.hdfsHostAndPort, toJobConfig.uri);
}
}
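
The splice is plain string-index arithmetic, which a worked example makes concrete (mirroring the TestConfigUtil cases at the end of this change): in "dataset:hdfs:/path/to/ds" the second ':' sits at index 12, so the result is substring(0, 13) + "//" + fsLocation + substring(13). A minimal sketch (demo class hypothetical):

    import org.apache.sqoop.connector.kite.configuration.ConfigUtil;

    public class BuildDatasetUriSketch {
      public static void main(String[] args) {
        // "dataset:hdfs:" + "//" + "namenode:8020" + "/path/to/ds"
        System.out.println(ConfigUtil.buildDatasetUri(
            "namenode:8020", "dataset:hdfs:/path/to/ds"));
        // -> dataset:hdfs://namenode:8020/path/to/ds

        // A uri that already contains "://" carries its own authority;
        // fsLocation is ignored and the uri comes back unchanged.
        System.out.println(ConfigUtil.buildDatasetUri(
            "namenode:8020", "dataset:hdfs://other:8020/path/to/ds"));
        // -> dataset:hdfs://other:8020/path/to/ds
      }
    }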

View File

@@ -17,14 +17,33 @@
*/
package org.apache.sqoop.connector.kite.configuration;
import org.apache.sqoop.connector.common.FileFormat;
import com.google.common.base.Strings;
import org.apache.sqoop.model.ConfigClass;
import org.apache.sqoop.model.Input;
import org.apache.sqoop.model.Validator;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.validators.AbstractValidator;
import org.apache.sqoop.validation.validators.HostAndPortValidator;
@ConfigClass
@ConfigClass(validators = {@Validator(LinkConfig.ConfigValidator.class)})
public class LinkConfig {
@Input
public FileFormat fileFormat = FileFormat.AVRO;
@Input(size = 255)
public String hdfsHostAndPort;
public static class ConfigValidator extends AbstractValidator<LinkConfig> {
@Override
public void validate(LinkConfig config) {
// TODO: There is no way to declare an input as optional (SQOOP-1643), so we cannot attach HostAndPortValidator to the input directly.
if (!Strings.isNullOrEmpty(config.hdfsHostAndPort)) {
HostAndPortValidator validator = new HostAndPortValidator();
validator.validate(config.hdfsHostAndPort);
if (!validator.getStatus().equals(Status.OK)) {
addMessage(validator.getStatus(), validator.getMessages().toString());
}
}
}
}
}
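
Since SQOOP-1643 leaves no way to declare the input optional, the config-level validator only delegates when the field is actually set. A sketch of exercising it directly (the demo class is hypothetical; ConfigValidator and the validate/reset/getStatus API come from the diffs in this change):

    import org.apache.sqoop.connector.kite.configuration.LinkConfig;

    public class LinkConfigValidationSketch {
      public static void main(String[] args) {
        LinkConfig.ConfigValidator validator = new LinkConfig.ConfigValidator();

        // Unset host:port means "use the default filesystem" and passes.
        validator.validate(new LinkConfig());
        System.out.println(validator.getStatus()); // OK

        // A malformed value is caught by the delegated HostAndPortValidator.
        LinkConfig config = new LinkConfig();
        config.hdfsHostAndPort = "name node:8020"; // space in hostname
        validator.reset();
        validator.validate(config);
        System.out.println(validator.getStatus()); // ERROR
      }
    }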

View File

@@ -17,30 +17,20 @@
*/
package org.apache.sqoop.connector.kite.configuration;
import org.apache.sqoop.connector.kite.util.InputValidation;
import org.apache.sqoop.connector.common.FileFormat;
import org.apache.sqoop.model.ConfigClass;
import org.apache.sqoop.model.Input;
import org.apache.sqoop.model.Validator;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.validators.AbstractValidator;
import org.apache.sqoop.validation.validators.DatasetURIValidator;
import org.apache.sqoop.validation.validators.NotNull;
@ConfigClass(validators = {@Validator(ToJobConfig.ConfigValidator.class)})
@ConfigClass
public class ToJobConfig {
@Input(size = 255)
@Input(size = 255, validators = {@Validator(DatasetURIValidator.class)})
public String uri;
public static class ConfigValidator extends AbstractValidator<ToJobConfig> {
@Override
public void validate(ToJobConfig config) {
try {
InputValidation.validateDatasetUriScheme(config.uri);
} catch (IllegalArgumentException ex) {
addMessage(Status.ERROR, ex.toString());
}
}
}
@Input(validators = {@Validator(NotNull.class)})
public FileFormat fileFormat;
}

View File

@@ -22,11 +22,8 @@ linkConfig.label = Link Configuration
linkConfig.help = You must supply the information requested in order to create a \
connection object.
linkConfig.fileFormat.label = File format
linkConfig.fileFormat.help = Format in which data should be serialized
linkConfig.compression.label = Compression format
linkConfig.compression.help = Compression that should be used for the data
linkConfig.hdfsHostAndPort.label = HDFS host and port
linkConfig.hdfsHostAndPort.help = Optional; overrides the default HDFS filesystem location.
# To Job Config
#
@@ -36,5 +33,8 @@ toJobConfig.help = You must supply the information requested in order to \
toJobConfig.uri.label = Dataset URI
toJobConfig.uri.help = Location to store dataset (e.g. \
"dataset:hdfs://host:port/user/me/job", \
"dataset:hive://host:port/table")
"dataset:hdfs://<host>[:port]/<path>/<namespace>/<dataset>", \
"dataset:hive://<namespace>/<dataset>")
toJobConfig.fileFormat.label = File format
toJobConfig.fileFormat.help = Storage format used when creating the dataset; it cannot be changed afterwards.

View File

@@ -31,14 +31,13 @@
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.MockitoAnnotations.Mock;
import static org.mockito.MockitoAnnotations.initMocks;
public class TestKiteLoader {
private KiteLoader loader;
@Mock
@org.mockito.Mock
private KiteDatasetExecutor executorMock;
@Before
@@ -84,10 +83,10 @@ public Object readContent() {
};
LoaderContext context = new LoaderContext(null, reader, schema);
LinkConfiguration linkConfig = new LinkConfiguration();
ToJobConfiguration jobConfig = new ToJobConfiguration();
ToJobConfiguration toJobConfig = new ToJobConfiguration();
// exercise
loader.load(context, linkConfig, jobConfig);
loader.load(context, linkConfig, toJobConfig);
// verify
verify(executorMock, times(NUMBER_OF_ROWS)).writeRecord(

View File

@@ -32,7 +32,6 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.Mock;
import static org.mockito.MockitoAnnotations.initMocks;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.verifyStatic;
@@ -45,11 +44,11 @@ public class TestKiteToDestroyer {
private LinkConfiguration linkConfig;
private ToJobConfiguration jobConfig;
private ToJobConfiguration toJobConfig;
private final String[] expectedUris = new String[]{"a", "b"};
@Mock
@org.mockito.Mock
private KiteDatasetExecutor executorMock;
@Before
@@ -66,20 +65,20 @@ protected KiteDatasetExecutor getExecutor(String uri, Schema schema,
};
linkConfig = new LinkConfiguration();
linkConfig.linkConfig.fileFormat = FileFormat.AVRO;
jobConfig = new ToJobConfiguration();
jobConfig.toJobConfig.uri = "dataset:file:/foo/bar";
toJobConfig = new ToJobConfiguration();
toJobConfig.toJobConfig.uri = "dataset:file:/foo/bar";
toJobConfig.toJobConfig.fileFormat = FileFormat.AVRO;
}
@Test
public void testDestroyForSuccessfulJob() {
// setup
DestroyerContext context = new DestroyerContext(null, true, null);
when(KiteDatasetExecutor.listTemporaryDatasetUris(jobConfig.toJobConfig.uri))
when(KiteDatasetExecutor.listTemporaryDatasetUris(toJobConfig.toJobConfig.uri))
.thenReturn(expectedUris);
// exercise
destroyer.destroy(context, linkConfig, jobConfig);
destroyer.destroy(context, linkConfig, toJobConfig);
// verify
for (String uri : expectedUris) {
@@ -91,14 +90,14 @@ public void testDestroyForSuccessfulJob() {
public void testDestroyForFailedJob() {
// setup
DestroyerContext context = new DestroyerContext(null, false, null);
when(KiteDatasetExecutor.listTemporaryDatasetUris(jobConfig.toJobConfig.uri))
when(KiteDatasetExecutor.listTemporaryDatasetUris(toJobConfig.toJobConfig.uri))
.thenReturn(expectedUris);
for (String uri : expectedUris) {
when(KiteDatasetExecutor.deleteDataset(uri)).thenReturn(true);
}
// exercise
destroyer.destroy(context, linkConfig, jobConfig);
destroyer.destroy(context, linkConfig, toJobConfig);
// verify
for (String uri : expectedUris) {

View File

@@ -19,6 +19,7 @@
package org.apache.sqoop.connector.kite;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
import org.apache.sqoop.schema.Schema;
import org.junit.Before;
@@ -30,7 +31,6 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.Mock;
import static org.mockito.MockitoAnnotations.initMocks;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
@@ -40,7 +40,7 @@ public class TestKiteToInitializer {
private KiteToInitializer initializer;
@Mock
@org.mockito.Mock
private KiteDatasetExecutor executorMock;
@Before
@@ -54,25 +54,27 @@ public void setUp() {
@Test
public void testInitializePassed() {
// setup
ToJobConfiguration jobConfig = new ToJobConfiguration();
jobConfig.toJobConfig.uri = "dataset:file:/ds/not/exist";
when(KiteDatasetExecutor.datasetExists(jobConfig.toJobConfig.uri))
LinkConfiguration linkConfig = new LinkConfiguration();
ToJobConfiguration toJobConfig = new ToJobConfiguration();
toJobConfig.toJobConfig.uri = "dataset:file:/ds/not/exist";
when(KiteDatasetExecutor.datasetExists(toJobConfig.toJobConfig.uri))
.thenReturn(false);
// exercise
initializer.initialize(null, null, jobConfig);
initializer.initialize(null, linkConfig, toJobConfig);
}
@Test(expected = SqoopException.class)
public void testInitializeFailed() {
// setup
ToJobConfiguration jobConfig = new ToJobConfiguration();
jobConfig.toJobConfig.uri = "dataset:file:/ds/exist";
when(KiteDatasetExecutor.datasetExists(jobConfig.toJobConfig.uri))
LinkConfiguration linkConfig = new LinkConfiguration();
ToJobConfiguration toJobConfig = new ToJobConfiguration();
toJobConfig.toJobConfig.uri = "dataset:file:/ds/exist";
when(KiteDatasetExecutor.datasetExists(toJobConfig.toJobConfig.uri))
.thenReturn(true);
// exercise
initializer.initialize(null, null, jobConfig);
initializer.initialize(null, linkConfig, toJobConfig);
}
@Test

View File

@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kite.configuration;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
/**
* Test configuration objects.
*/
public class TestConfigUtil {
@Test
public void testBuildDatasetUri() {
String actual = ConfigUtil.buildDatasetUri("namenode:8020",
"dataset:hdfs:/path/to/ds");
assertEquals("dataset:hdfs://namenode:8020/path/to/ds", actual);
}
@Test
public void testBuildDatasetUriHdfsHostPortIgnored() {
String expected = "dataset:hdfs://namenode2:8020/path/to/ds";
String actual = ConfigUtil.buildDatasetUri("namenode:8020", expected);
assertEquals(expected, actual);
}
}