From e2e5aa8bf531115f8ce4055184a3b8892ce6beb4 Mon Sep 17 00:00:00 2001 From: Abraham Elmahrek Date: Thu, 11 Dec 2014 17:53:48 -0600 Subject: [PATCH] SQOOP-1852: Sqoop2: Kafka connector supporting TO direction (Gwen Shapira via Abraham Elmahrek) --- common-test/pom.xml | 15 ++ .../common/test/kafka/KafkaConsumer.java | 98 +++++++++++ .../sqoop/common/test/kafka/KafkaLocal.java | 57 ++++++ .../sqoop/common/test/kafka/TestUtil.java | 162 ++++++++++++++++++ .../common/test/kafka/ZooKeeperLocal.java | 72 ++++++++ connector/connector-kafka/pom.xml | 35 ++++ .../sqoop/connector/kafka/KafkaConnector.java | 115 +++++++++++++ .../connector/kafka/KafkaConnectorErrors.java | 46 +++++ .../sqoop/connector/kafka/KafkaConstants.java | 42 +++++ .../sqoop/connector/kafka/KafkaLoader.java | 106 ++++++++++++ .../connector/kafka/KafkaToDestroyer.java | 35 ++++ .../connector/kafka/KafkaToInitializer.java | 51 ++++++ .../kafka/configuration/LinkConfig.java | 49 ++++++ .../configuration/LinkConfiguration.java | 31 ++++ .../kafka/configuration/ToJobConfig.java | 28 +++ .../configuration/ToJobConfiguration.java | 31 ++++ .../kafka-connector-config.properties | 38 ++++ .../main/resources/sqoopconnector.properties | 18 ++ .../connector/kafka/TestConfigValidator.java | 59 +++++++ .../connector/kafka/TestKafkaLoader.java | 96 +++++++++++ connector/pom.xml | 9 +- pom.xml | 23 +++ server/pom.xml | 5 + .../sqoop/handler/JobRequestHandler.java | 1 + test/pom.xml | 5 + .../testcases/KafkaConnectorTestCase.java | 78 +++++++++ .../connector/kafka/FromRDBMSToKafkaTest.java | 75 ++++++++ 27 files changed, 1376 insertions(+), 4 deletions(-) create mode 100644 common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaConsumer.java create mode 100644 common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocal.java create mode 100644 common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java create mode 100644 common-test/src/main/java/org/apache/sqoop/common/test/kafka/ZooKeeperLocal.java create mode 100644 connector/connector-kafka/pom.xml create mode 100644 connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnector.java create mode 100644 connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnectorErrors.java create mode 100644 connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConstants.java create mode 100644 connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java create mode 100644 connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToDestroyer.java create mode 100644 connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java create mode 100644 connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfig.java create mode 100644 connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfiguration.java create mode 100644 connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfig.java create mode 100644 connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfiguration.java create mode 100644 connector/connector-kafka/src/main/resources/kafka-connector-config.properties create mode 100644 connector/connector-kafka/src/main/resources/sqoopconnector.properties create mode 100644 connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestConfigValidator.java create mode 100644 connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java create mode 100644 test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java create mode 100644 test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java diff --git a/common-test/pom.xml b/common-test/pom.xml index 9fd671c6..609a8754 100644 --- a/common-test/pom.xml +++ b/common-test/pom.xml @@ -60,6 +60,21 @@ limitations under the License. postgresql + + org.apache.kafka + kafka_2.10 + + + + org.apache.zookeeper + zookeeper + + + + commons-io + commons-io + + diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaConsumer.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaConsumer.java new file mode 100644 index 00000000..78d651b3 --- /dev/null +++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaConsumer.java @@ -0,0 +1,98 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + limitations under the License. + */ + +package org.apache.sqoop.common.test.kafka; + +import kafka.consumer.ConsumerConfig; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.ConsumerTimeoutException; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.message.MessageAndMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * A Kafka Consumer implementation. This uses the current thread to fetch the + * next message from the queue and doesn't use a multi threaded implementation. + * So this implements a synchronous blocking call. + * To avoid infinite waiting, a timeout is implemented to wait only for + * 1 second before concluding that the message will not be available. + */ +public class KafkaConsumer { + + private static final Logger logger = LoggerFactory.getLogger( + KafkaConsumer.class); + + private final ConsumerConnector consumer; + Map>> consumerMap; + + public KafkaConsumer() { + consumer = kafka.consumer.Consumer.createJavaConsumerConnector( + createConsumerConfig(TestUtil.getInstance().getZkUrl(), "group_1")); + } + + private static ConsumerConfig createConsumerConfig(String zkUrl, + String groupId) { + Properties props = new Properties(); + props.put("zookeeper.connect", zkUrl); + props.put("group.id", groupId); + props.put("zookeeper.session.timeout.ms", "1000"); + props.put("zookeeper.sync.time.ms", "200"); + props.put("auto.commit.interval.ms", "1000"); + props.put("auto.offset.reset", "smallest"); + props.put("consumer.timeout.ms","1000"); + return new ConsumerConfig(props); + } + + public void initTopicList(List topics) { + Map topicCountMap = new HashMap(); + for (String topic : topics) { + // we need only single threaded consumers + topicCountMap.put(topic, new Integer(1)); + } + consumerMap = consumer.createMessageStreams(topicCountMap); + } + + public MessageAndMetadata getNextMessage(String topic) { + List> streams = consumerMap.get(topic); + // it has only a single stream, because there is only one consumer + KafkaStream stream = streams.get(0); + final ConsumerIterator it = stream.iterator(); + int counter = 0; + try { + if (it.hasNext()) { + return it.next(); + } else { + return null; + } + } catch (ConsumerTimeoutException e) { + logger.error("0 messages available to fetch for the topic " + topic); + return null; + } + } + + public void shutdown() { + consumer.shutdown(); + } +} diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocal.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocal.java new file mode 100644 index 00000000..b90d14ef --- /dev/null +++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocal.java @@ -0,0 +1,57 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + limitations under the License. + */ + +package org.apache.sqoop.common.test.kafka; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; +import org.apache.commons.io.FileUtils; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +/** + * A local Kafka server for running unit tests. + * Reference: https://gist.github.com/fjavieralba/7930018/ + */ +public class KafkaLocal { + + public KafkaServerStartable kafka; + public ZooKeeperLocal zookeeper; + private KafkaConfig kafkaConfig; + + public KafkaLocal(Properties kafkaProperties) throws IOException, + InterruptedException{ + kafkaConfig = new KafkaConfig(kafkaProperties); + + //start local kafka broker + kafka = new KafkaServerStartable(kafkaConfig); + } + + public void start() throws Exception{ + kafka.startup(); + } + + public void stop() throws IOException { + kafka.shutdown(); + File dir = new File(kafkaConfig.logDirs().head()).getAbsoluteFile(); + FileUtils.deleteDirectory(dir); + } + +} \ No newline at end of file diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java new file mode 100644 index 00000000..34b8f1e0 --- /dev/null +++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java @@ -0,0 +1,162 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + limitations under the License. + */ + +package org.apache.sqoop.common.test.kafka; + +import kafka.message.MessageAndMetadata; +import org.apache.sqoop.common.test.utils.NetworkUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.List; +import java.util.Properties; +import java.util.Random; + +/** + * A utility class for starting/stopping Kafka Server. + */ +public class TestUtil { + + private static final Logger logger = LoggerFactory.getLogger(TestUtil.class); + private static TestUtil instance = new TestUtil(); + + private Random randPortGen = new Random(System.currentTimeMillis()); + private KafkaLocal kafkaServer; + private ZooKeeperLocal zookeeperServer; + private KafkaConsumer kafkaConsumer; + private String hostname = "localhost"; + private int kafkaLocalPort = 9022; + private int zkLocalPort = 2188; + + private TestUtil() { + init(); + } + + public static TestUtil getInstance() { + return instance; + } + + private void init() { + // get the localhost. + try { + hostname = InetAddress.getLocalHost().getHostName(); + + } catch (UnknownHostException e) { + logger.warn("Error getting the value of localhost. " + + "Proceeding with 'localhost'.", e); + } + } + + private boolean startKafkaServer() throws IOException { + kafkaLocalPort = NetworkUtils.findAvailablePort(); + zkLocalPort = NetworkUtils.findAvailablePort(); + + logger.info("Starting kafka server with kafka port " + kafkaLocalPort + + " and zookeeper port " + zkLocalPort ); + try { + //start local Zookeeper + zookeeperServer = new ZooKeeperLocal(zkLocalPort); + logger.info("ZooKeeper instance is successfully started on port " + + zkLocalPort); + + Properties kafkaProperties = getKafkaProperties(); + + kafkaServer = new KafkaLocal(kafkaProperties); + kafkaServer.start(); + + logger.info("Kafka Server is successfully started on port " + + kafkaLocalPort); + return true; + + } catch (Exception e) { + logger.error("Error starting the Kafka Server.", e); + return false; + } + } + + Properties getKafkaProperties() { + Properties kafkaProps = new Properties(); + kafkaProps.put("broker.id","0"); + // Kafka expects strings for all properties and KafkaConfig will throw an exception otherwise + kafkaProps.put("port",Integer.toString(kafkaLocalPort)); + kafkaProps.put("log.dirs","target/kafka-logs"); + kafkaProps.put("num.partitions","1"); + kafkaProps.put("zookeeper.connect",zookeeperServer.getConnectString()); + + return kafkaProps; + } + + private KafkaConsumer getKafkaConsumer() { + synchronized (this) { + if (kafkaConsumer == null) { + kafkaConsumer = new KafkaConsumer(); + } + } + return kafkaConsumer; + } + + public void initTopicList(List topics) { + getKafkaConsumer().initTopicList(topics); + } + + public MessageAndMetadata getNextMessageFromConsumer(String topic) { + return getKafkaConsumer().getNextMessage(topic); + } + + public void prepare() throws IOException { + boolean startStatus = startKafkaServer(); + if (!startStatus) { + throw new RuntimeException("Error starting the server!"); + } + try { + Thread.sleep(3 * 1000); // add this sleep time to + // ensure that the server is fully started before proceeding with tests. + } catch (InterruptedException e) { + // ignore + } + getKafkaConsumer(); + logger.info("Completed the prepare phase."); + } + + public void tearDown() throws IOException { + logger.info("Shutting down the Kafka Consumer."); + getKafkaConsumer().shutdown(); + try { + Thread.sleep(3 * 1000); // add this sleep time to + // ensure that the server is fully started before proceeding with tests. + } catch (InterruptedException e) { + // ignore + } + logger.info("Shutting down the kafka Server."); + kafkaServer.stop(); + logger.info("Shutting down Zookeeper Server."); + zookeeperServer.stopZookeeper(); + logger.info("Completed the tearDown phase."); + } + + public String getZkUrl() { + return zookeeperServer.getConnectString(); + } + + public String getKafkaServerUrl() { + return "localhost:"+kafkaLocalPort; + } +} diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/ZooKeeperLocal.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/ZooKeeperLocal.java new file mode 100644 index 00000000..27660bf3 --- /dev/null +++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/ZooKeeperLocal.java @@ -0,0 +1,72 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + limitations under the License. + */ + +package org.apache.sqoop.common.test.kafka; + +import org.apache.commons.io.FileUtils; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Properties; + +public class ZooKeeperLocal { + private int zkPort; + private ZooKeeperServer zookeeper; + private NIOServerCnxnFactory factory; + File dir; + + + public ZooKeeperLocal(int zkPort){ + int numConnections = 5000; + int tickTime = 2000; + + this.zkPort = zkPort; + + String dataDirectory = "target"; + dir = new File(dataDirectory, "zookeeper").getAbsoluteFile(); + + try { + this.zookeeper = new ZooKeeperServer(dir,dir,tickTime); + this.factory = new NIOServerCnxnFactory(); + factory.configure(new InetSocketAddress("127.0.0.1",zkPort),0); + factory.startup(zookeeper); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public void stopZookeeper() throws IOException { + zookeeper.shutdown(); + factory.shutdown(); + FileUtils.deleteDirectory(dir); + } + + public String getConnectString() { + return "127.0.0.1:"+zkPort; + } +} diff --git a/connector/connector-kafka/pom.xml b/connector/connector-kafka/pom.xml new file mode 100644 index 00000000..8786bffd --- /dev/null +++ b/connector/connector-kafka/pom.xml @@ -0,0 +1,35 @@ + + + + connector + org.apache.sqoop + 2.0.0-SNAPSHOT + + 4.0.0 + + org.apache.sqoop.connector + sqoop-connector-kafka + Sqoop Kafka Connector + + + + org.apache.sqoop + sqoop-spi + + + org.apache.sqoop + connector-sdk + + + org.apache.kafka + kafka_2.10 + + + org.apache.sqoop + sqoop-common-test + + + + \ No newline at end of file diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnector.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnector.java new file mode 100644 index 00000000..84b4be8a --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnector.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.connector.kafka; + +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.common.VersionInfo; +import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration; +import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; +import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.job.etl.From; +import org.apache.sqoop.job.etl.To; + + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.ResourceBundle; + +public class KafkaConnector extends SqoopConnector { + + private static final To TO = new To( + KafkaToInitializer.class, + KafkaLoader.class, + KafkaToDestroyer.class); + + /** + * Retrieve connector version. + * + * @return Version encoded as a string + */ + @Override + public String getVersion() { + return VersionInfo.getBuildVersion(); + } + + /** + * @param locale + * @return the resource bundle associated with the given locale. + */ + @Override + public ResourceBundle getBundle(Locale locale) { + return ResourceBundle.getBundle(KafkaConstants.RESOURCE_BUNDLE_NAME, locale); + } + + /** + * @return Get link configuration group class + */ + @Override + public Class getLinkConfigurationClass() { + return LinkConfiguration.class; + } + + /** + * @param direction + * @return Get job configuration group class per direction type or null if + * not supported + */ + @Override + public Class getJobConfigurationClass(Direction direction) { + return ToJobConfiguration.class; + } + + @Override + public List getSupportedDirections() { + // TODO: Remove when we add the FROM part of the connector (SQOOP-1583) + return Arrays.asList(Direction.TO); + } + + /** + * @return an From that provides classes for performing import. + */ + @Override + public From getFrom() { + //TODO: SQOOP-1583 + return null; + } + + /** + * @return an To that provides classes for performing export.n + */ + @Override + public To getTo() { + return TO; + } + + /** + * Returns an {@linkplain org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader} object that can upgrade the + * configs related to the link and job + * + * @return RespositoryUpgrader object + */ + @Override + public ConnectorConfigurableUpgrader getConfigurableUpgrader() { + // Nothing to upgrade at this point + return null; + } + +} diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnectorErrors.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnectorErrors.java new file mode 100644 index 00000000..f94efea6 --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnectorErrors.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kafka; + +import org.apache.sqoop.common.ErrorCode; + +public enum KafkaConnectorErrors implements ErrorCode { + + KAFKA_CONNECTOR_0000("Unknown error occurred."), + KAFKA_CONNECTOR_0001("Error occurred while sending data to Kafka") + ; + + private final String message; + + private KafkaConnectorErrors(String message) { + this.message = message; + } + + @Override + public String getCode() { + return name(); + } + + /** + * @return the message associated with error code. + */ + @Override + public String getMessage() { + return message; + } +} diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConstants.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConstants.java new file mode 100644 index 00000000..9d3877db --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConstants.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kafka; + +import org.apache.sqoop.job.Constants; + +public class KafkaConstants extends Constants { + // Resource bundle name + public static final String RESOURCE_BUNDLE_NAME = "kafka-connector-config"; + + // Kafka properties keys + public static final String MESSAGE_SERIALIZER_KEY = "serializer.class"; + public static final String KEY_SERIALIZER_KEY = "key.serializer.class"; + public static final String BROKER_LIST_KEY = "metadata.broker.list"; + public static final String REQUIRED_ACKS_KEY = "request.required.acks"; + public static final String PRODUCER_TYPE = "producer.type"; + + // Kafka properties default values + public static final String DEFAULT_MESSAGE_SERIALIZER = + "kafka.serializer.StringEncoder"; + public static final String DEFAULT_KEY_SERIALIZER = + "kafka.serializer.StringEncoder"; + public static final String DEFAULT_REQUIRED_ACKS = "-1"; + public static final String DEFAULT_PRODUCER_TYPE = "sync"; + public static final int DEFAULT_BATCH_SIZE = 100; + +} diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java new file mode 100644 index 00000000..5d795164 --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kafka; + +import kafka.producer.KeyedMessage; +import kafka.javaapi.producer.Producer; +import kafka.producer.ProducerConfig; +import org.apache.log4j.Logger; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration; +import org.apache.sqoop.job.etl.Loader; +import org.apache.sqoop.job.etl.LoaderContext; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +public class KafkaLoader extends Loader { + private static final Logger LOG = Logger.getLogger(KafkaLoader.class); + + private List> messageList = + new ArrayList>(KafkaConstants.DEFAULT_BATCH_SIZE); + private Producer producer; + + @Override + public void load(LoaderContext context,LinkConfiguration linkConfiguration, ToJobConfiguration jobConfiguration) throws + Exception { + producer = getProducer(linkConfiguration); + LOG.info("got producer"); + String topic = jobConfiguration.toJobConfig.topic; + LOG.info("topic is:"+topic); + String batchUUID = UUID.randomUUID().toString(); + String record; + + while ((record = context.getDataReader().readTextRecord()) != null) { + // create a message and add to buffer + KeyedMessage data = new KeyedMessage + (topic, null, batchUUID, record); + messageList.add(data); + // If we have enough messages, send the batch to Kafka + if (messageList.size() >= KafkaConstants.DEFAULT_BATCH_SIZE) { + sendToKafka(messageList); + } + } + + if (messageList.size() > 0) { + sendToKafka(messageList); + } + + producer.close(); + } + + private void sendToKafka(List> messageList) { + try { + producer.send(messageList); + messageList.clear(); + } catch (Exception ex) { + throw new SqoopException(KafkaConnectorErrors.KAFKA_CONNECTOR_0001); + } + } + + /** + * Initialize a Kafka producer using configs in LinkConfiguration + * @param linkConfiguration + * @return + */ + Producer getProducer(LinkConfiguration linkConfiguration) { + Properties kafkaProps = generateDefaultKafkaProps(); + kafkaProps.put(KafkaConstants.BROKER_LIST_KEY, linkConfiguration.linkConfig.brokerList); + ProducerConfig config = new ProducerConfig(kafkaProps); + return new Producer(config); + } + + /** + * Generate producer properties object with some defaults + * @return + */ + private Properties generateDefaultKafkaProps() { + Properties props = new Properties(); + props.put(KafkaConstants.MESSAGE_SERIALIZER_KEY, + KafkaConstants.DEFAULT_MESSAGE_SERIALIZER); + props.put(KafkaConstants.KEY_SERIALIZER_KEY, + KafkaConstants.DEFAULT_KEY_SERIALIZER); + props.put(KafkaConstants.REQUIRED_ACKS_KEY, + KafkaConstants.DEFAULT_REQUIRED_ACKS); + props.put(KafkaConstants.PRODUCER_TYPE,KafkaConstants.DEFAULT_PRODUCER_TYPE); + return props; + } +} diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToDestroyer.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToDestroyer.java new file mode 100644 index 00000000..c522d917 --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToDestroyer.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kafka; + +import org.apache.log4j.Logger; +import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration; +import org.apache.sqoop.job.etl.Destroyer; +import org.apache.sqoop.job.etl.DestroyerContext; + +public class KafkaToDestroyer extends Destroyer { + + private static final Logger LOG = Logger.getLogger(KafkaToDestroyer.class); + + + public void destroy(DestroyerContext context, LinkConfiguration linkConfiguration, ToJobConfiguration jobConfiguration) { + LOG.info("Running Kafka Connector destroyer. This does nothing except log this message."); + + } +} diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java new file mode 100644 index 00000000..e1b065ab --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kafka; + +import org.apache.log4j.Logger; +import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration; +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.InitializerContext; +import org.apache.sqoop.utils.ClassUtils; + +import java.util.List; + +public class KafkaToInitializer extends Initializer { + + private static final Logger LOG = Logger.getLogger(KafkaToInitializer.class); + + @Override + public void initialize(InitializerContext context,LinkConfiguration linkConfiguration, ToJobConfiguration jobConfiguration) { + LOG.info("Running Kafka Connector initializer. This does nothing except log this message."); + } + + + @Override + public List getJars(InitializerContext context, LinkConfiguration + linkConfiguration, ToJobConfiguration toJobConfiguration) { + List jars = super.getJars(context, linkConfiguration, toJobConfiguration); + // Jars for Kafka, Scala and Yammer (required by Kafka) + jars.add(ClassUtils.jarForClass("kafka.javaapi.producer.Producer")); + jars.add(ClassUtils.jarForClass("scala.collection.immutable.StringLike")); + jars.add(ClassUtils.jarForClass("com.yammer.metrics.Metrics")); + return jars; + } + + +} diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfig.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfig.java new file mode 100644 index 00000000..98112e7e --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfig.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kafka.configuration; + +import org.apache.sqoop.model.ConfigClass; +import org.apache.sqoop.model.Input; +import org.apache.sqoop.model.Validator; +import org.apache.sqoop.validation.Status; +import org.apache.sqoop.validation.validators.AbstractValidator; + +@ConfigClass +public class LinkConfig { + @Input(size=1024, validators = { @Validator(CSVURIValidator.class) }) public String brokerList; + @Input(size=255, validators = { @Validator(CSVURIValidator.class) }) public String zookeeperConnect; + + public static class CSVURIValidator extends AbstractValidator { + + // validate that given string is a comma-separated list of host:port + @Override + public void validate(String str) { + if(str == null || str !="") { + String[] pairs = str.split("\\s*,\\s*"); + for (String pair: pairs) { + String[] parts = pair.split("\\s*:\\s*"); + if (parts.length == 1) { + addMessage(Status.ERROR,"can't parse into host:port pairs"); + } + } + } else { + addMessage(Status.ERROR, "Can't be null nor empty"); + } + } + } +} diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfiguration.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfiguration.java new file mode 100644 index 00000000..34713075 --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfiguration.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kafka.configuration; + +import org.apache.sqoop.model.Config; +import org.apache.sqoop.model.ConfigurationClass; + +@ConfigurationClass +public class LinkConfiguration { + @Config + public LinkConfig linkConfig; + + public LinkConfiguration() { + linkConfig = new LinkConfig(); + } +} diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfig.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfig.java new file mode 100644 index 00000000..04387d92 --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfig.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kafka.configuration; + +import org.apache.sqoop.model.ConfigClass; +import org.apache.sqoop.model.Input; +import org.apache.sqoop.model.Validator; +import org.apache.sqoop.validation.validators.NotEmpty; + +@ConfigClass +public class ToJobConfig { + @Input(size=255, validators = { @Validator(NotEmpty.class) }) public String topic; +} diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfiguration.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfiguration.java new file mode 100644 index 00000000..d294b273 --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfiguration.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kafka.configuration; + +import org.apache.sqoop.model.Config; +import org.apache.sqoop.model.ConfigurationClass; + +@ConfigurationClass +public class ToJobConfiguration { + @Config + public ToJobConfig toJobConfig; + + public ToJobConfiguration() { + toJobConfig = new ToJobConfig(); + } +} diff --git a/connector/connector-kafka/src/main/resources/kafka-connector-config.properties b/connector/connector-kafka/src/main/resources/kafka-connector-config.properties new file mode 100644 index 00000000..0d6fca35 --- /dev/null +++ b/connector/connector-kafka/src/main/resources/kafka-connector-config.properties @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Generic Kafka Connector Resources + +############################ + +# Link Config +linkConfig.label = Link configuration +linkConfig.help = Here you supply information necessary to connect to Kafka + +linkConfig.brokerList.label = List of Kafka brokers +linkConfig.brokerList.help = Comma-separated list of Kafka brokers in the form of host:port. \ + It doesn't need to contain all brokers, but at least two are recommended for high availability + +linkConfig.zookeeperConnect.label = Zookeeper address +linkConfig.zookeeperConnect.help = Address of Zookeeper used by the Kafka cluster. Usually host:port. \ + Multiple zookeeper nodes are supported. If Kafka is stored in its own znode \ + use host:port\kafka +# To Job Config +# +toJobConfig.label = ToJob configuration +toJobConfig.help = Configuration necessary when writing data to Kafka + +toJobConfig.topic.label = Kafka topic +toJobConfig.topic.help = Name of Kafka topic where we'll send the data diff --git a/connector/connector-kafka/src/main/resources/sqoopconnector.properties b/connector/connector-kafka/src/main/resources/sqoopconnector.properties new file mode 100644 index 00000000..90d5dd65 --- /dev/null +++ b/connector/connector-kafka/src/main/resources/sqoopconnector.properties @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Generic Kafka Connector Properties +org.apache.sqoop.connector.class = org.apache.sqoop.connector.kafka.KafkaConnector +org.apache.sqoop.connector.name = kafka-connector \ No newline at end of file diff --git a/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestConfigValidator.java b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestConfigValidator.java new file mode 100644 index 00000000..b61d9790 --- /dev/null +++ b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestConfigValidator.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kafka; + +import org.apache.sqoop.connector.kafka.configuration.LinkConfig; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestConfigValidator { + @Test + public void testValidURI() { + String[] URI = { + "broker1:9092", + "broker1:9092,broker2:9092", + "zk1:2181/kafka", + "zk1:2181,zk2:2181/kafka" + }; + + for (String uri: URI) { + LinkConfig.CSVURIValidator validator = new LinkConfig.CSVURIValidator(); + validator.validate(uri); + assertTrue(validator.getStatus().canProceed()); + } + } + + @Test + public void testInvalidURI() { + String[] URI = { + "", + "broker", + "broker1:9092,broker" + }; + for (String uri: URI) { + LinkConfig.CSVURIValidator validator = new LinkConfig.CSVURIValidator(); + validator.validate(uri); + assertFalse(validator.getStatus().canProceed()); + } + + + } + +} diff --git a/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java new file mode 100644 index 00000000..f896e9e6 --- /dev/null +++ b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kafka; + +import kafka.message.MessageAndMetadata; +import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration; +import org.apache.sqoop.common.test.kafka.TestUtil; +import org.apache.sqoop.etl.io.DataReader; +import org.apache.sqoop.job.etl.LoaderContext; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class TestKafkaLoader { + + private static TestUtil testUtil = TestUtil.getInstance(); + private static final int NUMBER_OF_ROWS = 1000; + private static KafkaLoader loader; + private static String TOPIC = "mytopic"; + + @BeforeClass + public static void setup() throws IOException { + testUtil.prepare(); + List topics = new ArrayList(1); + topics.add(TOPIC); + testUtil.initTopicList(topics); + loader = new KafkaLoader(); + } + + @AfterClass + public static void tearDown() throws IOException { + testUtil.tearDown(); + } + + @Test + public void testLoader() throws Exception { + LoaderContext context = new LoaderContext(null, new DataReader() { + private long index = 0L; + + @Override + public Object[] readArrayRecord() { + return null; + } + + @Override + public String readTextRecord() { + if (index++ < NUMBER_OF_ROWS) { + return index + "," + (double)index + ",'" + index + "'"; + } else { + return null; + } + } + + @Override + public Object readContent() { + return null; + } + }, null); + LinkConfiguration linkConf = new LinkConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + linkConf.linkConfig.brokerList = testUtil.getKafkaServerUrl(); + linkConf.linkConfig.zookeeperConnect = testUtil.getZkUrl(); + jobConf.toJobConfig.topic = TOPIC; + + loader.load(context, linkConf, jobConf); + + for(int i=1;i<=NUMBER_OF_ROWS;i++) { + MessageAndMetadata fetchedMsg = + testUtil.getNextMessageFromConsumer(TOPIC); + Assert.assertEquals(i + "," + (double) i + "," + "'" + i + "'", + new String((byte[]) fetchedMsg.message(), "UTF-8")); + } + } + +} diff --git a/connector/pom.xml b/connector/pom.xml index da4ed3e7..dfa7e884 100644 --- a/connector/pom.xml +++ b/connector/pom.xml @@ -37,10 +37,11 @@ limitations under the License. connector-generic-jdbc connector-hdfs connector-kite - + connector-kafka + diff --git a/pom.xml b/pom.xml index e1821761..efb96591 100644 --- a/pom.xml +++ b/pom.xml @@ -116,6 +116,8 @@ limitations under the License. 2.4 0.17.0 1.6.1 + 3.4.6 + 0.8.1.1 @@ -351,6 +353,17 @@ limitations under the License. test-jar ${project.version} + + org.apache.sqoop.connector + sqoop-connector-kafka + ${project.version} + + + org.apache.sqoop.connector + sqoop-connector-kafka + ${project.version} + test-jar + org.apache.sqoop.connector sqoop-connector-mysql-jdbc @@ -527,6 +540,16 @@ limitations under the License. ${tomcat.version} provided + + org.apache.zookeeper + zookeeper + ${zookeeper.version} + + + org.apache.kafka + kafka_2.10 + ${kafka.version} + diff --git a/server/pom.xml b/server/pom.xml index 1adcca04..77477eee 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -84,6 +84,11 @@ limitations under the License. sqoop-connector-kite + + org.apache.sqoop.connector + sqoop-connector-kafka + +