5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-06 20:02:04 +08:00

SQOOP-2196: Sqoop2: Add Runner factory for Kafka

(Syed A. Hashmi via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2015-04-16 18:50:00 -07:00
parent 36711af547
commit 31b2a1ae45
7 changed files with 239 additions and 92 deletions

View File

@ -0,0 +1,107 @@
/**
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
limitations under the License.
*/
package org.apache.sqoop.common.test.kafka;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.commons.io.FileUtils;
import org.apache.sqoop.common.test.utils.NetworkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
/**
* A local Kafka server for running unit tests.
* Reference: https://gist.github.com/fjavieralba/7930018/
*/
public class KafkaLocalRunner extends KafkaRunnerBase {
public KafkaServerStartable kafka;
public ZooKeeperLocal zookeeperServer;
private KafkaConfig kafkaConfig;
private int kafkaLocalPort = 9022;
private int zkLocalPort = 2188;
private static final Logger logger = LoggerFactory.getLogger(KafkaLocalRunner.class);
public KafkaLocalRunner() throws IOException,
InterruptedException{
kafkaLocalPort = NetworkUtils.findAvailablePort();
zkLocalPort = NetworkUtils.findAvailablePort();
logger.info("Starting kafka server with kafka port " + kafkaLocalPort +
" and zookeeper port " + zkLocalPort );
try {
//start local Zookeeper
zookeeperServer = new ZooKeeperLocal(zkLocalPort);
logger.info("ZooKeeper instance is successfully started on port " +
zkLocalPort);
Properties kafkaProperties = getKafkaProperties();
kafkaConfig = new KafkaConfig(kafkaProperties);
//start local kafka broker
kafka = new KafkaServerStartable(kafkaConfig);
logger.info("Kafka Server is successfully started on port " +
kafkaLocalPort);
} catch (Exception e) {
logger.error("Error starting the Kafka Server.", e);
}
}
Properties getKafkaProperties() {
Properties kafkaProps = new Properties();
kafkaProps.put("broker.id","0");
// Kafka expects strings for all properties and KafkaConfig will throw an exception otherwise
kafkaProps.put("port",Integer.toString(kafkaLocalPort));
kafkaProps.put("log.dirs","target/kafka-logs");
kafkaProps.put("num.partitions","1");
kafkaProps.put("zookeeper.connect",zookeeperServer.getConnectString());
return kafkaProps;
}
@Override
public void start() throws Exception {
kafka.startup();
}
@Override
public void stop() throws IOException {
kafka.shutdown();
zookeeperServer.stopZookeeper();
File dir = new File(kafkaConfig.logDirs().head()).getAbsoluteFile();
FileUtils.deleteDirectory(dir);
}
@Override
public String getZkConnectionString() {
return zookeeperServer.getConnectString();
}
@Override
public String getKafkaUrl() {
return "localhost:"+kafkaLocalPort;
}
}

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.common.test.kafka;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* This class encapsulates a real cluster Kafka service and enables tests
* to run kafka tasks against real cluster
*/
public class KafkaRealRunner extends KafkaRunnerBase {
private static final Logger logger = LoggerFactory.getLogger(KafkaLocalRunner.class);
private String kafkaServerUrl;
private String zkConnectionString;
private final String KAFKA_SERVER_URL_PROPERTY = "sqoop.kafka.server.url";
private final String ZK_CONNECTION_STRING_PROPERTY = "sqoop.kafka.zookeeper.url";
public KafkaRealRunner() {
logger.info("Setting up kafka to point to real cluster");
kafkaServerUrl = System.getProperty(KAFKA_SERVER_URL_PROPERTY);
if(kafkaServerUrl == null) {
logger.error("To run against real cluster, sqoop.kafka.server.url must be provided");
throw new RuntimeException("To run against real cluster, sqoop.kafka.server.url must be provided");
}
logger.info("Kafka server url: " + kafkaServerUrl);
zkConnectionString = System.getProperty(
ZK_CONNECTION_STRING_PROPERTY);
if(zkConnectionString == null) {
logger.error("To run against real cluster, sqoop.kafka.zookeeper.url must be provided");
throw new RuntimeException("To run against real cluster, sqoop.kafka.zookeeper.url must be provided");
}
logger.info("Zookeeper server connection string: " + zkConnectionString);
}
@Override
public void start() throws Exception {
// nothing to be done for real server
}
@Override
public void stop() throws IOException {
// nothing to be done for real server
}
@Override
public String getZkConnectionString() {
return this.zkConnectionString;
}
@Override
public String getKafkaUrl() {
return this.kafkaServerUrl;
}
}

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.common.test.kafka;
import java.io.IOException;
/**
* This class provides basic methods which will be overriden by derived classes
* to allow using either local kafka service or a real cluster.
*/
public abstract class KafkaRunnerBase {
public abstract void start() throws Exception;
public abstract void stop() throws IOException;
public abstract String getZkConnectionString();
public abstract String getKafkaUrl();
}

View File

@ -18,40 +18,24 @@ Licensed to the Apache Software Foundation (ASF) under one or more
package org.apache.sqoop.common.test.kafka;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
/**
* A local Kafka server for running unit tests.
* Reference: https://gist.github.com/fjavieralba/7930018/
* This class encapsulates logic behind which Kafka class to instantiate
*/
public class KafkaLocal {
public class KafkaRunnerFactory {
public KafkaServerStartable kafka;
public ZooKeeperLocal zookeeper;
private KafkaConfig kafkaConfig;
private static final String KAFKA_CLASS_PROPERTY = "sqoop.kafka.runner.class";
public KafkaLocal(Properties kafkaProperties) throws IOException,
InterruptedException{
kafkaConfig = new KafkaConfig(kafkaProperties);
//start local kafka broker
kafka = new KafkaServerStartable(kafkaConfig);
public static KafkaRunnerBase getKafkaRunner() throws ClassNotFoundException, IllegalAccessException,
InstantiationException, InterruptedException, IOException {
String className = System.getProperty(KAFKA_CLASS_PROPERTY);
if(className == null) {
return new KafkaLocalRunner();
} else {
Class<?> klass = Class.forName(className);
return (KafkaRunnerBase) klass.newInstance();
}
}
public void start() throws Exception{
kafka.startup();
}
public void stop() throws IOException {
kafka.shutdown();
File dir = new File(kafkaConfig.logDirs().head()).getAbsoluteFile();
FileUtils.deleteDirectory(dir);
}
}
}

View File

@ -19,16 +19,11 @@ Licensed to the Apache Software Foundation (ASF) under one or more
package org.apache.sqoop.common.test.kafka;
import kafka.message.MessageAndMetadata;
import org.apache.sqoop.common.test.utils.NetworkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Properties;
import java.util.Random;
/**
* A utility class for starting/stopping Kafka Server.
@ -38,71 +33,29 @@ public class TestUtil {
private static final Logger logger = LoggerFactory.getLogger(TestUtil.class);
private static TestUtil instance = new TestUtil();
private Random randPortGen = new Random(System.currentTimeMillis());
private KafkaLocal kafkaServer;
private ZooKeeperLocal zookeeperServer;
private KafkaRunnerBase kafkaServer;
private KafkaConsumer kafkaConsumer;
private String hostname = "localhost";
private int kafkaLocalPort = 9022;
private int zkLocalPort = 2188;
private TestUtil() {
init();
}
private TestUtil() {}
public static TestUtil getInstance() {
return instance;
}
private void init() {
// get the localhost.
private boolean startKafkaServer() throws IOException, InterruptedException, ClassNotFoundException,
IllegalAccessException, InstantiationException {
kafkaServer = KafkaRunnerFactory.getKafkaRunner();
try {
hostname = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
logger.warn("Error getting the value of localhost. " +
"Proceeding with 'localhost'.", e);
}
}
private boolean startKafkaServer() throws IOException {
kafkaLocalPort = NetworkUtils.findAvailablePort();
zkLocalPort = NetworkUtils.findAvailablePort();
logger.info("Starting kafka server with kafka port " + kafkaLocalPort +
" and zookeeper port " + zkLocalPort );
try {
//start local Zookeeper
zookeeperServer = new ZooKeeperLocal(zkLocalPort);
logger.info("ZooKeeper instance is successfully started on port " +
zkLocalPort);
Properties kafkaProperties = getKafkaProperties();
kafkaServer = new KafkaLocal(kafkaProperties);
kafkaServer.start();
logger.info("Kafka Server is successfully started on port " +
kafkaLocalPort);
return true;
} catch (Exception e) {
logger.error("Error starting the Kafka Server.", e);
return false;
}
return true;
}
Properties getKafkaProperties() {
Properties kafkaProps = new Properties();
kafkaProps.put("broker.id","0");
// Kafka expects strings for all properties and KafkaConfig will throw an exception otherwise
kafkaProps.put("port",Integer.toString(kafkaLocalPort));
kafkaProps.put("log.dirs","target/kafka-logs");
kafkaProps.put("num.partitions","1");
kafkaProps.put("zookeeper.connect",zookeeperServer.getConnectString());
return kafkaProps;
}
private KafkaConsumer getKafkaConsumer() {
synchronized (this) {
@ -121,7 +74,7 @@ public MessageAndMetadata getNextMessageFromConsumer(String topic) {
return getKafkaConsumer().getNextMessage(topic);
}
public void prepare() throws IOException {
public void prepare() throws Exception {
boolean startStatus = startKafkaServer();
if (!startStatus) {
throw new RuntimeException("Error starting the server!");
@ -147,16 +100,14 @@ public void tearDown() throws IOException {
}
logger.info("Shutting down the kafka Server.");
kafkaServer.stop();
logger.info("Shutting down Zookeeper Server.");
zookeeperServer.stopZookeeper();
logger.info("Completed the tearDown phase.");
}
public String getZkUrl() {
return zookeeperServer.getConnectString();
return kafkaServer.getZkConnectionString();
}
public String getKafkaServerUrl() {
return "localhost:"+kafkaLocalPort;
return kafkaServer.getKafkaUrl();
}
}

View File

@ -41,7 +41,7 @@ public class TestKafkaLoader {
private static String TOPIC = "mytopic";
@BeforeClass(alwaysRun = true)
public static void setup() throws IOException {
public static void setup() throws Exception {
testUtil.prepare();
List<String> topics = new ArrayList<String>(1);
topics.add(TOPIC);

View File

@ -42,7 +42,7 @@ public class KafkaConnectorTestCase extends ConnectorTestCase {
private static final String TOPIC = "mytopic";
@BeforeClass(alwaysRun = true)
public static void startKafka() throws IOException {
public static void startKafka() throws Exception {
// starts Kafka server and its dependent zookeeper
testUtil.prepare();
}