
SQOOP-1852: Sqoop2: Kafka connector supporting TO direction

(Gwen Shapira via Abraham Elmahrek)
Abraham Elmahrek 2014-12-11 17:53:48 -06:00
parent 9151d305af
commit e2e5aa8bf5
27 changed files with 1376 additions and 4 deletions

View File

@@ -60,6 +60,21 @@ limitations under the License.
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
</dependencies>
<profiles>

View File

@@ -0,0 +1,98 @@
/**
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.sqoop.common.test.kafka;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* A Kafka consumer implementation that fetches the next message from the
* queue on the current thread rather than from a pool of worker threads,
* so every call is a synchronous, blocking call.
* To avoid waiting indefinitely, a one-second timeout is applied before
* concluding that no message is available.
*/
public class KafkaConsumer {
private static final Logger logger = LoggerFactory.getLogger(
KafkaConsumer.class);
private final ConsumerConnector consumer;
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap;
public KafkaConsumer() {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig(TestUtil.getInstance().getZkUrl(), "group_1"));
}
private static ConsumerConfig createConsumerConfig(String zkUrl,
String groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", zkUrl);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "1000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
props.put("consumer.timeout.ms","1000");
return new ConsumerConfig(props);
}
public void initTopicList(List<String> topics) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
for (String topic : topics) {
// we need only single threaded consumers
topicCountMap.put(topic, new Integer(1));
}
consumerMap = consumer.createMessageStreams(topicCountMap);
}
public MessageAndMetadata getNextMessage(String topic) {
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// it has only a single stream, because there is only one consumer
KafkaStream<byte[], byte[]> stream = streams.get(0);
final ConsumerIterator<byte[], byte[]> it = stream.iterator();
try {
if (it.hasNext()) {
return it.next();
} else {
return null;
}
} catch (ConsumerTimeoutException e) {
logger.error("No messages available to fetch for topic " + topic);
return null;
}
}
public void shutdown() {
consumer.shutdown();
}
}
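
A minimal usage sketch, assuming TestUtil (below) has already started the local ZooKeeper and Kafka servers so that getZkUrl() resolves; the topic name is a placeholder:

import java.util.Collections;
import kafka.message.MessageAndMetadata;
import org.apache.sqoop.common.test.kafka.KafkaConsumer;

public class KafkaConsumerSketch {
  public static void main(String[] args) {
    KafkaConsumer consumer = new KafkaConsumer();
    // Streams must be registered before fetching; one single-threaded stream per topic.
    consumer.initTopicList(Collections.singletonList("mytopic")); // placeholder topic
    // Blocks for at most one second (consumer.timeout.ms), then returns null.
    MessageAndMetadata message = consumer.getNextMessage("mytopic");
    if (message != null) {
      System.out.println(new String((byte[]) message.message()));
    }
    consumer.shutdown();
  }
}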

View File

@@ -0,0 +1,57 @@
/**
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.sqoop.common.test.kafka;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
/**
* A local Kafka server for running unit tests.
* Reference: https://gist.github.com/fjavieralba/7930018/
*/
public class KafkaLocal {
public KafkaServerStartable kafka;
public ZooKeeperLocal zookeeper;
private KafkaConfig kafkaConfig;
public KafkaLocal(Properties kafkaProperties) throws IOException,
InterruptedException {
kafkaConfig = new KafkaConfig(kafkaProperties);
// start the local Kafka broker
kafka = new KafkaServerStartable(kafkaConfig);
}
public void start() throws Exception {
kafka.startup();
}
public void stop() throws IOException {
kafka.shutdown();
File dir = new File(kafkaConfig.logDirs().head()).getAbsoluteFile();
FileUtils.deleteDirectory(dir);
}
}
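
A usage sketch for the embedded broker; the property values mirror TestUtil.getKafkaProperties() further down in this commit and are illustrative, and a ZooKeeperLocal instance is assumed to be listening at the zookeeper.connect address:

import java.util.Properties;
import org.apache.sqoop.common.test.kafka.KafkaLocal;

public class KafkaLocalSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("broker.id", "0");
    props.put("port", "9092"); // any free port works
    props.put("log.dirs", "target/kafka-logs");
    props.put("zookeeper.connect", "127.0.0.1:2181"); // assumes a running ZooKeeperLocal
    KafkaLocal broker = new KafkaLocal(props);
    broker.start();
    // ... exercise the broker on localhost:9092 ...
    broker.stop(); // shuts the broker down and deletes its log directory
  }
}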

View File

@@ -0,0 +1,162 @@
/**
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.sqoop.common.test.kafka;
import kafka.message.MessageAndMetadata;
import org.apache.sqoop.common.test.utils.NetworkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Properties;
/**
* A utility class for starting/stopping Kafka Server.
*/
public class TestUtil {
private static final Logger logger = LoggerFactory.getLogger(TestUtil.class);
private static TestUtil instance = new TestUtil();
private KafkaLocal kafkaServer;
private ZooKeeperLocal zookeeperServer;
private KafkaConsumer kafkaConsumer;
private String hostname = "localhost";
private int kafkaLocalPort = 9022;
private int zkLocalPort = 2188;
private TestUtil() {
init();
}
public static TestUtil getInstance() {
return instance;
}
private void init() {
// get the localhost.
try {
hostname = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
logger.warn("Error getting the value of localhost. " +
"Proceeding with 'localhost'.", e);
}
}
private boolean startKafkaServer() throws IOException {
kafkaLocalPort = NetworkUtils.findAvailablePort();
zkLocalPort = NetworkUtils.findAvailablePort();
logger.info("Starting Kafka server with Kafka port " + kafkaLocalPort +
" and ZooKeeper port " + zkLocalPort);
try {
//start local Zookeeper
zookeeperServer = new ZooKeeperLocal(zkLocalPort);
logger.info("ZooKeeper instance is successfully started on port " +
zkLocalPort);
Properties kafkaProperties = getKafkaProperties();
kafkaServer = new KafkaLocal(kafkaProperties);
kafkaServer.start();
logger.info("Kafka Server is successfully started on port " +
kafkaLocalPort);
return true;
} catch (Exception e) {
logger.error("Error starting the Kafka Server.", e);
return false;
}
}
Properties getKafkaProperties() {
Properties kafkaProps = new Properties();
kafkaProps.put("broker.id","0");
// Kafka expects strings for all properties and KafkaConfig will throw an exception otherwise
kafkaProps.put("port",Integer.toString(kafkaLocalPort));
kafkaProps.put("log.dirs","target/kafka-logs");
kafkaProps.put("num.partitions","1");
kafkaProps.put("zookeeper.connect",zookeeperServer.getConnectString());
return kafkaProps;
}
private KafkaConsumer getKafkaConsumer() {
synchronized (this) {
if (kafkaConsumer == null) {
kafkaConsumer = new KafkaConsumer();
}
}
return kafkaConsumer;
}
public void initTopicList(List<String> topics) {
getKafkaConsumer().initTopicList(topics);
}
public MessageAndMetadata getNextMessageFromConsumer(String topic) {
return getKafkaConsumer().getNextMessage(topic);
}
public void prepare() throws IOException {
boolean startStatus = startKafkaServer();
if (!startStatus) {
throw new RuntimeException("Error starting the server!");
}
try {
Thread.sleep(3 * 1000); // add this sleep time to
// ensure that the server is fully started before proceeding with tests.
} catch (InterruptedException e) {
// ignore
}
getKafkaConsumer();
logger.info("Completed the prepare phase.");
}
public void tearDown() throws IOException {
logger.info("Shutting down the Kafka Consumer.");
getKafkaConsumer().shutdown();
try {
Thread.sleep(3 * 1000); // add this sleep time to
// ensure that the consumer has fully shut down before stopping the servers.
} catch (InterruptedException e) {
// ignore
}
logger.info("Shutting down the Kafka server.");
kafkaServer.stop();
logger.info("Shutting down the ZooKeeper server.");
zookeeperServer.stopZookeeper();
logger.info("Completed the tearDown phase.");
}
public String getZkUrl() {
return zookeeperServer.getConnectString();
}
public String getKafkaServerUrl() {
return "localhost:"+kafkaLocalPort;
}
}
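
The intended lifecycle, as TestKafkaLoader later in this commit exercises it; the topic name is a placeholder:

import java.util.Arrays;
import kafka.message.MessageAndMetadata;
import org.apache.sqoop.common.test.kafka.TestUtil;

public class TestUtilSketch {
  public static void main(String[] args) throws Exception {
    TestUtil testUtil = TestUtil.getInstance();
    testUtil.prepare();                               // boots ZooKeeper and Kafka on free ports
    testUtil.initTopicList(Arrays.asList("mytopic")); // register consumer streams up front
    // ... the code under test produces records to "mytopic" ...
    MessageAndMetadata msg = testUtil.getNextMessageFromConsumer("mytopic");
    System.out.println(msg != null ? new String((byte[]) msg.message()) : "no message");
    testUtil.tearDown();                              // stops consumer, broker, then ZooKeeper
  }
}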

View File

@@ -0,0 +1,72 @@
/**
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.sqoop.common.test.kafka;
import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Properties;
public class ZooKeeperLocal {
private static final Logger logger =
LoggerFactory.getLogger(ZooKeeperLocal.class);
private int zkPort;
private ZooKeeperServer zookeeper;
private NIOServerCnxnFactory factory;
File dir;
public ZooKeeperLocal(int zkPort) {
int tickTime = 2000;
this.zkPort = zkPort;
String dataDirectory = "target";
dir = new File(dataDirectory, "zookeeper").getAbsoluteFile();
try {
this.zookeeper = new ZooKeeperServer(dir, dir, tickTime);
this.factory = new NIOServerCnxnFactory();
factory.configure(new InetSocketAddress("127.0.0.1", zkPort), 0);
factory.startup(zookeeper);
} catch (IOException e) {
logger.error("Error starting the local ZooKeeper server.", e);
} catch (InterruptedException e) {
logger.error("Interrupted while starting the local ZooKeeper server.", e);
}
}
public void stopZookeeper() throws IOException {
zookeeper.shutdown();
factory.shutdown();
FileUtils.deleteDirectory(dir);
}
public String getConnectString() {
return "127.0.0.1:"+zkPort;
}
}
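
A short sketch of the pairing that TestUtil.startKafkaServer() above performs, with a placeholder port:

import org.apache.sqoop.common.test.kafka.ZooKeeperLocal;

public class ZooKeeperLocalSketch {
  public static void main(String[] args) throws Exception {
    ZooKeeperLocal zk = new ZooKeeperLocal(2181);   // placeholder port
    String connect = zk.getConnectString();        // "127.0.0.1:2181"
    System.out.println(connect);
    // pass connect to the broker's "zookeeper.connect" property, run tests, then:
    zk.stopZookeeper();                            // shuts down and deletes target/zookeeper
  }
}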

View File

@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>connector</artifactId>
<groupId>org.apache.sqoop</groupId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-kafka</artifactId>
<name>Sqoop Kafka Connector</name>
<dependencies>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-spi</artifactId>
</dependency>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>connector-sdk</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-common-test</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kafka;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.VersionInfo;
import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration;
import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.job.etl.From;
import org.apache.sqoop.job.etl.To;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.ResourceBundle;
public class KafkaConnector extends SqoopConnector {
private static final To TO = new To(
KafkaToInitializer.class,
KafkaLoader.class,
KafkaToDestroyer.class);
/**
* Retrieve connector version.
*
* @return Version encoded as a string
*/
@Override
public String getVersion() {
return VersionInfo.getBuildVersion();
}
/**
* @param locale the locale for which a resource bundle is requested
* @return the resource bundle associated with the given locale.
*/
@Override
public ResourceBundle getBundle(Locale locale) {
return ResourceBundle.getBundle(KafkaConstants.RESOURCE_BUNDLE_NAME, locale);
}
/**
* @return the link configuration group class
*/
@Override
public Class getLinkConfigurationClass() {
return LinkConfiguration.class;
}
/**
* @param direction the transfer direction
* @return the job configuration group class for the given direction, or null
* if the direction is not supported
*/
@Override
public Class getJobConfigurationClass(Direction direction) {
return ToJobConfiguration.class;
}
@Override
public List<Direction> getSupportedDirections() {
// TODO: Remove when we add the FROM part of the connector (SQOOP-1583)
return Arrays.asList(Direction.TO);
}
/**
* @return a <tt>From</tt> that provides classes for performing import.
*/
@Override
public From getFrom() {
//TODO: SQOOP-1583
return null;
}
/**
* @return a <tt>To</tt> that provides classes for performing export.
*/
@Override
public To getTo() {
return TO;
}
/**
* Returns an {@linkplain org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader} object that can upgrade the
* configs related to the link and job
*
* @return a ConnectorConfigurableUpgrader object
*/
@Override
public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
// Nothing to upgrade at this point
return null;
}
}
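
A sketch of how the Sqoop 2 framework might interrogate this connector; in practice the class is loaded through the sqoopconnector.properties file shown later, so the direct instantiation here is illustrative only:

import java.util.List;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.connector.kafka.KafkaConnector;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.job.etl.To;

public class ConnectorSketch {
  public static void main(String[] args) {
    SqoopConnector connector = new KafkaConnector();
    // Only the TO direction is supported until SQOOP-1583 adds FROM.
    List<Direction> directions = connector.getSupportedDirections(); // [TO]
    System.out.println(directions);
    System.out.println(connector.getFrom());                         // null for now
    To to = connector.getTo(); // KafkaToInitializer / KafkaLoader / KafkaToDestroyer
    System.out.println(to);
    System.out.println(connector.getLinkConfigurationClass());       // LinkConfiguration
  }
}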

View File

@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kafka;
import org.apache.sqoop.common.ErrorCode;
public enum KafkaConnectorErrors implements ErrorCode {
KAFKA_CONNECTOR_0000("Unknown error occurred."),
KAFKA_CONNECTOR_0001("Error occurred while sending data to Kafka")
;
private final String message;
private KafkaConnectorErrors(String message) {
this.message = message;
}
@Override
public String getCode() {
return name();
}
/**
* @return the message associated with error code.
*/
@Override
public String getMessage() {
return message;
}
}

View File

@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kafka;
import org.apache.sqoop.job.Constants;
public class KafkaConstants extends Constants {
// Resource bundle name
public static final String RESOURCE_BUNDLE_NAME = "kafka-connector-config";
// Kafka properties keys
public static final String MESSAGE_SERIALIZER_KEY = "serializer.class";
public static final String KEY_SERIALIZER_KEY = "key.serializer.class";
public static final String BROKER_LIST_KEY = "metadata.broker.list";
public static final String REQUIRED_ACKS_KEY = "request.required.acks";
public static final String PRODUCER_TYPE = "producer.type";
// Kafka properties default values
public static final String DEFAULT_MESSAGE_SERIALIZER =
"kafka.serializer.StringEncoder";
public static final String DEFAULT_KEY_SERIALIZER =
"kafka.serializer.StringEncoder";
public static final String DEFAULT_REQUIRED_ACKS = "-1";
public static final String DEFAULT_PRODUCER_TYPE = "sync";
public static final int DEFAULT_BATCH_SIZE = 100;
}

View File

@@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kafka;
import kafka.producer.KeyedMessage;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
public class KafkaLoader extends Loader<LinkConfiguration,ToJobConfiguration> {
private static final Logger LOG = Logger.getLogger(KafkaLoader.class);
private List<KeyedMessage<String, String>> messageList =
new ArrayList<KeyedMessage<String, String>>(KafkaConstants.DEFAULT_BATCH_SIZE);
private Producer<String, String> producer;
@Override
public void load(LoaderContext context,LinkConfiguration linkConfiguration, ToJobConfiguration jobConfiguration) throws
Exception {
producer = getProducer(linkConfiguration);
LOG.info("Got producer");
String topic = jobConfiguration.toJobConfig.topic;
LOG.info("Topic is: " + topic);
String batchUUID = UUID.randomUUID().toString();
String record;
while ((record = context.getDataReader().readTextRecord()) != null) {
// create a message and add to buffer
KeyedMessage<String, String> data = new KeyedMessage<String, String>
(topic, null, batchUUID, record);
messageList.add(data);
// If we have enough messages, send the batch to Kafka
if (messageList.size() >= KafkaConstants.DEFAULT_BATCH_SIZE) {
sendToKafka(messageList);
}
}
if (messageList.size() > 0) {
sendToKafka(messageList);
}
producer.close();
}
private void sendToKafka(List<KeyedMessage<String,String>> messageList) {
try {
producer.send(messageList);
messageList.clear();
} catch (Exception ex) {
throw new SqoopException(KafkaConnectorErrors.KAFKA_CONNECTOR_0001, ex);
}
}
/**
* Initialize a Kafka producer using configs in LinkConfiguration
* @param linkConfiguration link configuration carrying the Kafka broker list
* @return a producer configured with the connector defaults and the broker list
*/
Producer<String, String> getProducer(LinkConfiguration linkConfiguration) {
Properties kafkaProps = generateDefaultKafkaProps();
kafkaProps.put(KafkaConstants.BROKER_LIST_KEY, linkConfiguration.linkConfig.brokerList);
ProducerConfig config = new ProducerConfig(kafkaProps);
return new Producer<String, String>(config);
}
/**
* Generate a producer properties object pre-populated with the connector defaults
* @return the default producer properties
*/
private Properties generateDefaultKafkaProps() {
Properties props = new Properties();
props.put(KafkaConstants.MESSAGE_SERIALIZER_KEY,
KafkaConstants.DEFAULT_MESSAGE_SERIALIZER);
props.put(KafkaConstants.KEY_SERIALIZER_KEY,
KafkaConstants.DEFAULT_KEY_SERIALIZER);
props.put(KafkaConstants.REQUIRED_ACKS_KEY,
KafkaConstants.DEFAULT_REQUIRED_ACKS);
props.put(KafkaConstants.PRODUCER_TYPE,KafkaConstants.DEFAULT_PRODUCER_TYPE);
return props;
}
}
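
For context, a self-contained sketch of the legacy Kafka 0.8 javaapi producer that KafkaLoader drives; the broker address, topic, and payload are placeholders:

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("metadata.broker.list", "broker1:9092"); // placeholder broker
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "-1");          // wait for all in-sync replicas
    Producer<String, String> producer =
        new Producer<String, String>(new ProducerConfig(props));
    producer.send(new KeyedMessage<String, String>("mytopic", "a record"));
    producer.close();
  }
}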

View File

@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kafka;
import org.apache.log4j.Logger;
import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
public class KafkaToDestroyer extends Destroyer<LinkConfiguration,ToJobConfiguration> {
private static final Logger LOG = Logger.getLogger(KafkaToDestroyer.class);
@Override
public void destroy(DestroyerContext context, LinkConfiguration linkConfiguration, ToJobConfiguration jobConfiguration) {
LOG.info("Running Kafka Connector destroyer. This does nothing except log this message.");
}
}

View File

@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kafka;
import org.apache.log4j.Logger;
import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.utils.ClassUtils;
import java.util.List;
public class KafkaToInitializer extends Initializer<LinkConfiguration,ToJobConfiguration> {
private static final Logger LOG = Logger.getLogger(KafkaToInitializer.class);
@Override
public void initialize(InitializerContext context,LinkConfiguration linkConfiguration, ToJobConfiguration jobConfiguration) {
LOG.info("Running Kafka Connector initializer. This does nothing except log this message.");
}
@Override
public List<String> getJars(InitializerContext context, LinkConfiguration
linkConfiguration, ToJobConfiguration toJobConfiguration) {
List<String> jars = super.getJars(context, linkConfiguration, toJobConfiguration);
// Jars for Kafka, Scala and Yammer (required by Kafka)
jars.add(ClassUtils.jarForClass("kafka.javaapi.producer.Producer"));
jars.add(ClassUtils.jarForClass("scala.collection.immutable.StringLike"));
jars.add(ClassUtils.jarForClass("com.yammer.metrics.Metrics"));
return jars;
}
}

View File

@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kafka.configuration;
import org.apache.sqoop.model.ConfigClass;
import org.apache.sqoop.model.Input;
import org.apache.sqoop.model.Validator;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.validators.AbstractValidator;
@ConfigClass
public class LinkConfig {
@Input(size=1024, validators = { @Validator(CSVURIValidator.class) }) public String brokerList;
@Input(size=255, validators = { @Validator(CSVURIValidator.class) }) public String zookeeperConnect;
public static class CSVURIValidator extends AbstractValidator<String> {
// validate that given string is a comma-separated list of host:port
@Override
public void validate(String str) {
if (str != null && !str.isEmpty()) {
String[] pairs = str.split("\\s*,\\s*");
for (String pair : pairs) {
String[] parts = pair.split("\\s*:\\s*");
if (parts.length == 1) {
addMessage(Status.ERROR, "Can't parse into host:port pairs");
}
}
} else {
addMessage(Status.ERROR, "Can't be null nor empty");
}
}
}
}

View File

@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kafka.configuration;
import org.apache.sqoop.model.Config;
import org.apache.sqoop.model.ConfigurationClass;
@ConfigurationClass
public class LinkConfiguration {
@Config
public LinkConfig linkConfig;
public LinkConfiguration() {
linkConfig = new LinkConfig();
}
}

View File

@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kafka.configuration;
import org.apache.sqoop.model.ConfigClass;
import org.apache.sqoop.model.Input;
import org.apache.sqoop.model.Validator;
import org.apache.sqoop.validation.validators.NotEmpty;
@ConfigClass
public class ToJobConfig {
@Input(size=255, validators = { @Validator(NotEmpty.class) }) public String topic;
}

View File

@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kafka.configuration;
import org.apache.sqoop.model.Config;
import org.apache.sqoop.model.ConfigurationClass;
@ConfigurationClass
public class ToJobConfiguration {
@Config
public ToJobConfig toJobConfig;
public ToJobConfiguration() {
toJobConfig = new ToJobConfig();
}
}

View File

@@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Generic Kafka Connector Resources
############################
# Link Config
linkConfig.label = Link configuration
linkConfig.help = Here you supply information necessary to connect to Kafka
linkConfig.brokerList.label = List of Kafka brokers
linkConfig.brokerList.help = Comma-separated list of Kafka brokers in the form of host:port. \
It doesn't need to contain all brokers, but at least two are recommended for high availability
linkConfig.zookeeperConnect.label = ZooKeeper address
linkConfig.zookeeperConnect.help = Address of the ZooKeeper used by the Kafka cluster, usually host:port. \
Multiple ZooKeeper nodes are supported. If Kafka is stored in its own znode, \
use host:port/kafka
# To Job Config
#
toJobConfig.label = ToJob configuration
toJobConfig.help = Configuration necessary when writing data to Kafka
toJobConfig.topic.label = Kafka topic
toJobConfig.topic.help = Name of Kafka topic where we'll send the data
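
KafkaConnector.getBundle() resolves labels and help texts through this bundle; a minimal lookup sketch, with the bundle name taken from KafkaConstants.RESOURCE_BUNDLE_NAME:

import java.util.Locale;
import java.util.ResourceBundle;

public class BundleSketch {
  public static void main(String[] args) {
    // Bundle name matches KafkaConstants.RESOURCE_BUNDLE_NAME.
    ResourceBundle bundle =
        ResourceBundle.getBundle("kafka-connector-config", Locale.getDefault());
    System.out.println(bundle.getString("toJobConfig.topic.label")); // "Kafka topic"
  }
}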

View File

@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Generic Kafka Connector Properties
org.apache.sqoop.connector.class = org.apache.sqoop.connector.kafka.KafkaConnector
org.apache.sqoop.connector.name = kafka-connector

View File

@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kafka;
import org.apache.sqoop.connector.kafka.configuration.LinkConfig;
import org.junit.Test;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestConfigValidator {
@Test
public void testValidURI() {
String[] URI = {
"broker1:9092",
"broker1:9092,broker2:9092",
"zk1:2181/kafka",
"zk1:2181,zk2:2181/kafka"
};
for (String uri: URI) {
LinkConfig.CSVURIValidator validator = new LinkConfig.CSVURIValidator();
validator.validate(uri);
assertTrue(validator.getStatus().canProceed());
}
}
@Test
public void testInvalidURI() {
String[] URI = {
"",
"broker",
"broker1:9092,broker"
};
for (String uri: URI) {
LinkConfig.CSVURIValidator validator = new LinkConfig.CSVURIValidator();
validator.validate(uri);
assertFalse(validator.getStatus().canProceed());
}
}
}

View File

@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.kafka;
import kafka.message.MessageAndMetadata;
import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration;
import org.apache.sqoop.common.test.kafka.TestUtil;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class TestKafkaLoader {
private static TestUtil testUtil = TestUtil.getInstance();
private static final int NUMBER_OF_ROWS = 1000;
private static KafkaLoader loader;
private static String TOPIC = "mytopic";
@BeforeClass
public static void setup() throws IOException {
testUtil.prepare();
List<String> topics = new ArrayList<String>(1);
topics.add(TOPIC);
testUtil.initTopicList(topics);
loader = new KafkaLoader();
}
@AfterClass
public static void tearDown() throws IOException {
testUtil.tearDown();
}
@Test
public void testLoader() throws Exception {
LoaderContext context = new LoaderContext(null, new DataReader() {
private long index = 0L;
@Override
public Object[] readArrayRecord() {
return null;
}
@Override
public String readTextRecord() {
if (index++ < NUMBER_OF_ROWS) {
return index + "," + (double)index + ",'" + index + "'";
} else {
return null;
}
}
@Override
public Object readContent() {
return null;
}
}, null);
LinkConfiguration linkConf = new LinkConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
linkConf.linkConfig.brokerList = testUtil.getKafkaServerUrl();
linkConf.linkConfig.zookeeperConnect = testUtil.getZkUrl();
jobConf.toJobConfig.topic = TOPIC;
loader.load(context, linkConf, jobConf);
for(int i=1;i<=NUMBER_OF_ROWS;i++) {
MessageAndMetadata<byte[],byte[]> fetchedMsg =
testUtil.getNextMessageFromConsumer(TOPIC);
Assert.assertEquals(i + "," + (double) i + "," + "'" + i + "'",
new String((byte[]) fetchedMsg.message(), "UTF-8"));
}
}
}

View File

@@ -37,10 +37,11 @@ limitations under the License.
<module>connector-generic-jdbc</module>
<module>connector-hdfs</module>
<module>connector-kite</module>
-<!-- Uncomment and finish connectors after sqoop framework will become stable
-<module>connector-mysql-jdbc</module>
-<module>connector-mysql-fastpath</module>
--->
+<module>connector-kafka</module>
+<!-- Uncomment and finish connectors after sqoop framework will become stable
+<module>connector-mysql-jdbc</module>
+<module>connector-mysql-fastpath</module>
+-->
</modules>
</project>

pom.xml
View File

@@ -116,6 +116,8 @@ limitations under the License.
<joda.version>2.4</joda.version>
<kitesdk.version>0.17.0</kitesdk.version>
<slf4j.version>1.6.1</slf4j.version>
<zookeeper.version>3.4.6</zookeeper.version>
<kafka.version>0.8.1.1</kafka.version>
</properties>
<dependencies>
@@ -351,6 +353,17 @@ limitations under the License.
<type>test-jar</type>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-kafka</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-mysql-jdbc</artifactId>
@@ -527,6 +540,16 @@ limitations under the License.
<version>${tomcat.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@@ -84,6 +84,11 @@ limitations under the License.
<artifactId>sqoop-connector-kite</artifactId>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-kafka</artifactId>
</dependency>
<!--
<dependency>
<groupId>org.apache.sqoop.connector</groupId>

View File

@@ -92,6 +92,7 @@ public JobRequestHandler() {
@Override
public JsonBean handleEvent(RequestContext ctx) {
LOG.info("Got job request");
switch (ctx.getMethod()) {
case GET:
if (STATUS.equals(ctx.getLastURLElement())) {

View File

@@ -87,6 +87,11 @@ limitations under the License.
<artifactId>sqoop-connector-kite</artifactId>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.cargo</groupId>
<artifactId>cargo-core-container-tomcat</artifactId>

View File

@@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.test.testcases;
import kafka.message.MessageAndMetadata;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.apache.sqoop.common.test.kafka.TestUtil;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
public class KafkaConnectorTestCase extends ConnectorTestCase {
private static TestUtil testUtil = TestUtil.getInstance();
private static final String TOPIC = "mytopic";
@BeforeClass
public static void startKafka() throws IOException {
// starts Kafka server and its dependent zookeeper
testUtil.prepare();
}
@AfterClass
public static void stopKafka() throws IOException {
testUtil.tearDown();
}
protected void fillKafkaLinkConfig(MLink link) {
MConfigList configs = link.getConnectorLinkConfig();
configs.getStringInput("linkConfig.brokerList").setValue(testUtil.getKafkaServerUrl());
configs.getStringInput("linkConfig.zookeeperConnect").setValue(testUtil.getZkUrl());
}
protected void fillKafkaToConfig(MJob job){
MConfigList toConfig = job.getJobConfig(Direction.TO);
toConfig.getStringInput("toJobConfig.topic").setValue(TOPIC);
List<String> topics = new ArrayList<String>(1);
topics.add(TOPIC);
testUtil.initTopicList(topics);
}
/**
* Compare the strings in content to the messages in the Kafka topic
* @param content expected messages, in the order they were written
* @throws UnsupportedEncodingException
*/
protected void validateContent(String[] content) throws UnsupportedEncodingException {
for(String str: content) {
MessageAndMetadata<byte[],byte[]> fetchedMsg =
testUtil.getNextMessageFromConsumer(TOPIC);
Assert.assertEquals(str,
new String(fetchedMsg.message(), "UTF-8"));
}
}
}

View File

@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.integration.connector.kafka;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.testcases.KafkaConnectorTestCase;
import org.junit.Test;
public class FromRDBMSToKafkaTest extends KafkaConnectorTestCase {
private static final String[] input = {
"1,'USA','San Francisco'",
"2,'USA','Sunnyvale'",
"3,'Czech Republic','Brno'",
"4,'USA','Palo Alto'"
};
@Test
public void testBasic() throws Exception {
createAndLoadTableCities();
// Kafka link
MLink kafkaLink = getClient().createLink("kafka-connector");
fillKafkaLinkConfig(kafkaLink);
saveLink(kafkaLink);
// RDBMS link
MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
fillRdbmsLinkConfig(rdbmsLink);
saveLink(rdbmsLink);
// Job creation
MJob job = getClient().createJob(rdbmsLink.getPersistenceId(), kafkaLink.getPersistenceId());
// set rdbms "FROM" job config
MConfigList fromConfig = job.getJobConfig(Direction.FROM);
fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName()));
fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
// set Kafka "TO" job config
fillKafkaToConfig(job);
// driver config
MDriverConfig driverConfig = job.getDriverConfig();
driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(3);
saveJob(job);
executeJob(job);
// this will assert the content of the array matches the content of the topic
validateContent(input);
}
}