diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocalRunner.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocalRunner.java new file mode 100644 index 00000000..f3268f17 --- /dev/null +++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocalRunner.java @@ -0,0 +1,107 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + limitations under the License. + */ + +package org.apache.sqoop.common.test.kafka; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; +import org.apache.commons.io.FileUtils; +import org.apache.sqoop.common.test.utils.NetworkUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +/** + * A local Kafka server for running unit tests. + * Reference: https://gist.github.com/fjavieralba/7930018/ + */ +public class KafkaLocalRunner extends KafkaRunnerBase { + + public KafkaServerStartable kafka; + public ZooKeeperLocal zookeeperServer; + private KafkaConfig kafkaConfig; + private int kafkaLocalPort = 9022; + private int zkLocalPort = 2188; + private static final Logger logger = LoggerFactory.getLogger(KafkaLocalRunner.class); + + public KafkaLocalRunner() throws IOException, + InterruptedException{ + kafkaLocalPort = NetworkUtils.findAvailablePort(); + zkLocalPort = NetworkUtils.findAvailablePort(); + + logger.info("Starting kafka server with kafka port " + kafkaLocalPort + + " and zookeeper port " + zkLocalPort ); + try { + //start local Zookeeper + zookeeperServer = new ZooKeeperLocal(zkLocalPort); + logger.info("ZooKeeper instance is successfully started on port " + + zkLocalPort); + + Properties kafkaProperties = getKafkaProperties(); + kafkaConfig = new KafkaConfig(kafkaProperties); + + //start local kafka broker + kafka = new KafkaServerStartable(kafkaConfig); + logger.info("Kafka Server is successfully started on port " + + kafkaLocalPort); + + } catch (Exception e) { + logger.error("Error starting the Kafka Server.", e); + } + + } + + Properties getKafkaProperties() { + Properties kafkaProps = new Properties(); + kafkaProps.put("broker.id","0"); + // Kafka expects strings for all properties and KafkaConfig will throw an exception otherwise + kafkaProps.put("port",Integer.toString(kafkaLocalPort)); + kafkaProps.put("log.dirs","target/kafka-logs"); + kafkaProps.put("num.partitions","1"); + kafkaProps.put("zookeeper.connect",zookeeperServer.getConnectString()); + + return kafkaProps; + } + + @Override + public void start() throws Exception { + kafka.startup(); + } + + @Override + public void stop() throws IOException { + kafka.shutdown(); + zookeeperServer.stopZookeeper(); + File dir = new File(kafkaConfig.logDirs().head()).getAbsoluteFile(); + FileUtils.deleteDirectory(dir); + } + + @Override + public String getZkConnectionString() { + return zookeeperServer.getConnectString(); + } + + @Override + public String getKafkaUrl() { + return "localhost:"+kafkaLocalPort; + } + +} \ No newline at end of file diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRealRunner.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRealRunner.java new file mode 100644 index 00000000..cc9c4fb3 --- /dev/null +++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRealRunner.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.common.test.kafka; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * This class encapsulates a real cluster Kafka service and enables tests + * to run kafka tasks against real cluster + */ + +public class KafkaRealRunner extends KafkaRunnerBase { + + private static final Logger logger = LoggerFactory.getLogger(KafkaLocalRunner.class); + private String kafkaServerUrl; + private String zkConnectionString; + private final String KAFKA_SERVER_URL_PROPERTY = "sqoop.kafka.server.url"; + private final String ZK_CONNECTION_STRING_PROPERTY = "sqoop.kafka.zookeeper.url"; + + public KafkaRealRunner() { + logger.info("Setting up kafka to point to real cluster"); + kafkaServerUrl = System.getProperty(KAFKA_SERVER_URL_PROPERTY); + if(kafkaServerUrl == null) { + logger.error("To run against real cluster, sqoop.kafka.server.url must be provided"); + throw new RuntimeException("To run against real cluster, sqoop.kafka.server.url must be provided"); + } + logger.info("Kafka server url: " + kafkaServerUrl); + + zkConnectionString = System.getProperty( + ZK_CONNECTION_STRING_PROPERTY); + if(zkConnectionString == null) { + logger.error("To run against real cluster, sqoop.kafka.zookeeper.url must be provided"); + throw new RuntimeException("To run against real cluster, sqoop.kafka.zookeeper.url must be provided"); + } + logger.info("Zookeeper server connection string: " + zkConnectionString); + } + @Override + public void start() throws Exception { + // nothing to be done for real server + } + + @Override + public void stop() throws IOException { + // nothing to be done for real server + } + + @Override + public String getZkConnectionString() { + return this.zkConnectionString; + } + + @Override + public String getKafkaUrl() { + return this.kafkaServerUrl; + } +} diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRunnerBase.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRunnerBase.java new file mode 100644 index 00000000..025faf22 --- /dev/null +++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRunnerBase.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.common.test.kafka; + +import java.io.IOException; + +/** + * This class provides basic methods which will be overriden by derived classes + * to allow using either local kafka service or a real cluster. + */ +public abstract class KafkaRunnerBase { + public abstract void start() throws Exception; + public abstract void stop() throws IOException; + public abstract String getZkConnectionString(); + public abstract String getKafkaUrl(); +} diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocal.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRunnerFactory.java similarity index 50% rename from common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocal.java rename to common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRunnerFactory.java index b90d14ef..b26ca743 100644 --- a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocal.java +++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRunnerFactory.java @@ -18,40 +18,24 @@ Licensed to the Apache Software Foundation (ASF) under one or more package org.apache.sqoop.common.test.kafka; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServerStartable; -import org.apache.commons.io.FileUtils; - -import java.io.File; import java.io.IOException; -import java.util.Properties; /** - * A local Kafka server for running unit tests. - * Reference: https://gist.github.com/fjavieralba/7930018/ + * This class encapsulates logic behind which Kafka class to instantiate */ -public class KafkaLocal { +public class KafkaRunnerFactory { - public KafkaServerStartable kafka; - public ZooKeeperLocal zookeeper; - private KafkaConfig kafkaConfig; + private static final String KAFKA_CLASS_PROPERTY = "sqoop.kafka.runner.class"; - public KafkaLocal(Properties kafkaProperties) throws IOException, - InterruptedException{ - kafkaConfig = new KafkaConfig(kafkaProperties); - - //start local kafka broker - kafka = new KafkaServerStartable(kafkaConfig); + public static KafkaRunnerBase getKafkaRunner() throws ClassNotFoundException, IllegalAccessException, + InstantiationException, InterruptedException, IOException { + String className = System.getProperty(KAFKA_CLASS_PROPERTY); + if(className == null) { + return new KafkaLocalRunner(); + } else { + Class klass = Class.forName(className); + return (KafkaRunnerBase) klass.newInstance(); + } } - public void start() throws Exception{ - kafka.startup(); - } - - public void stop() throws IOException { - kafka.shutdown(); - File dir = new File(kafkaConfig.logDirs().head()).getAbsoluteFile(); - FileUtils.deleteDirectory(dir); - } - -} \ No newline at end of file +} diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java index 34b8f1e0..09ddcc7a 100644 --- a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java +++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java @@ -19,16 +19,11 @@ Licensed to the Apache Software Foundation (ASF) under one or more package org.apache.sqoop.common.test.kafka; import kafka.message.MessageAndMetadata; -import org.apache.sqoop.common.test.utils.NetworkUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.List; -import java.util.Properties; -import java.util.Random; /** * A utility class for starting/stopping Kafka Server. @@ -38,71 +33,29 @@ public class TestUtil { private static final Logger logger = LoggerFactory.getLogger(TestUtil.class); private static TestUtil instance = new TestUtil(); - private Random randPortGen = new Random(System.currentTimeMillis()); - private KafkaLocal kafkaServer; - private ZooKeeperLocal zookeeperServer; + private KafkaRunnerBase kafkaServer; private KafkaConsumer kafkaConsumer; - private String hostname = "localhost"; - private int kafkaLocalPort = 9022; - private int zkLocalPort = 2188; - private TestUtil() { - init(); - } + private TestUtil() {} public static TestUtil getInstance() { return instance; } - private void init() { - // get the localhost. + private boolean startKafkaServer() throws IOException, InterruptedException, ClassNotFoundException, + IllegalAccessException, InstantiationException { + kafkaServer = KafkaRunnerFactory.getKafkaRunner(); + try { - hostname = InetAddress.getLocalHost().getHostName(); - - } catch (UnknownHostException e) { - logger.warn("Error getting the value of localhost. " + - "Proceeding with 'localhost'.", e); - } - } - - private boolean startKafkaServer() throws IOException { - kafkaLocalPort = NetworkUtils.findAvailablePort(); - zkLocalPort = NetworkUtils.findAvailablePort(); - - logger.info("Starting kafka server with kafka port " + kafkaLocalPort + - " and zookeeper port " + zkLocalPort ); - try { - //start local Zookeeper - zookeeperServer = new ZooKeeperLocal(zkLocalPort); - logger.info("ZooKeeper instance is successfully started on port " + - zkLocalPort); - - Properties kafkaProperties = getKafkaProperties(); - - kafkaServer = new KafkaLocal(kafkaProperties); kafkaServer.start(); - - logger.info("Kafka Server is successfully started on port " + - kafkaLocalPort); - return true; - } catch (Exception e) { logger.error("Error starting the Kafka Server.", e); return false; } + + return true; } - Properties getKafkaProperties() { - Properties kafkaProps = new Properties(); - kafkaProps.put("broker.id","0"); - // Kafka expects strings for all properties and KafkaConfig will throw an exception otherwise - kafkaProps.put("port",Integer.toString(kafkaLocalPort)); - kafkaProps.put("log.dirs","target/kafka-logs"); - kafkaProps.put("num.partitions","1"); - kafkaProps.put("zookeeper.connect",zookeeperServer.getConnectString()); - - return kafkaProps; - } private KafkaConsumer getKafkaConsumer() { synchronized (this) { @@ -121,7 +74,7 @@ public MessageAndMetadata getNextMessageFromConsumer(String topic) { return getKafkaConsumer().getNextMessage(topic); } - public void prepare() throws IOException { + public void prepare() throws Exception { boolean startStatus = startKafkaServer(); if (!startStatus) { throw new RuntimeException("Error starting the server!"); @@ -147,16 +100,14 @@ public void tearDown() throws IOException { } logger.info("Shutting down the kafka Server."); kafkaServer.stop(); - logger.info("Shutting down Zookeeper Server."); - zookeeperServer.stopZookeeper(); logger.info("Completed the tearDown phase."); } public String getZkUrl() { - return zookeeperServer.getConnectString(); + return kafkaServer.getZkConnectionString(); } public String getKafkaServerUrl() { - return "localhost:"+kafkaLocalPort; + return kafkaServer.getKafkaUrl(); } } diff --git a/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java index 46c10570..da2a7087 100644 --- a/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java +++ b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java @@ -41,7 +41,7 @@ public class TestKafkaLoader { private static String TOPIC = "mytopic"; @BeforeClass(alwaysRun = true) - public static void setup() throws IOException { + public static void setup() throws Exception { testUtil.prepare(); List topics = new ArrayList(1); topics.add(TOPIC); diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java index 802677c0..9aa69ede 100644 --- a/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java +++ b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java @@ -42,7 +42,7 @@ public class KafkaConnectorTestCase extends ConnectorTestCase { private static final String TOPIC = "mytopic"; @BeforeClass(alwaysRun = true) - public static void startKafka() throws IOException { + public static void startKafka() throws Exception { // starts Kafka server and its dependent zookeeper testUtil.prepare(); }