From 1857086722ef5900bc2ecea1475a009a043a6ba2 Mon Sep 17 00:00:00 2001 From: Jarek Jarcec Cecho Date: Sun, 26 Apr 2015 12:09:29 -0700 Subject: [PATCH] SQOOP-2322: Sqoop2: Kafka topic should vary on a per method basis (Abraham Elmahrek via Jarek Jarcec Cecho) --- .../test/testcases/KafkaConnectorTestCase.java | 17 ++++++++--------- .../connector/kafka/FromHDFSToKafkaTest.java | 2 ++ .../connector/kafka/FromRDBMSToKafkaTest.java | 2 ++ 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java index 9aa69ede..f15c07ec 100644 --- a/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java +++ b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java @@ -18,14 +18,13 @@ package org.apache.sqoop.test.testcases; import kafka.message.MessageAndMetadata; -import org.apache.sqoop.common.Direction; import org.apache.sqoop.model.MConfigList; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; -import org.testng.annotations.AfterClass; import org.testng.Assert; -import org.testng.annotations.BeforeClass; import org.apache.sqoop.common.test.kafka.TestUtil; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import java.io.IOException; import java.io.UnsupportedEncodingException; @@ -39,16 +38,16 @@ public class KafkaConnectorTestCase extends ConnectorTestCase { private static TestUtil testUtil = TestUtil.getInstance(); - private static final String TOPIC = "mytopic"; + protected String topic; @BeforeClass(alwaysRun = true) - public static void startKafka() throws Exception { + public void startKafka() throws Exception { // starts Kafka server and its dependent zookeeper testUtil.prepare(); } @AfterClass(alwaysRun = true) - public static void stopKafka() throws IOException { + public void stopKafka() throws IOException { testUtil.tearDown(); } @@ -61,9 +60,9 @@ protected void fillKafkaLinkConfig(MLink link) { protected void fillKafkaToConfig(MJob job){ MConfigList toConfig = job.getToJobConfig(); - toConfig.getStringInput("toJobConfig.topic").setValue(TOPIC); + toConfig.getStringInput("toJobConfig.topic").setValue(topic); List topics = new ArrayList(1); - topics.add(TOPIC); + topics.add(topic); testUtil.initTopicList(topics); } @@ -79,7 +78,7 @@ protected void validateContent(String[] content) throws UnsupportedEncodingExcep for(String str: content) { MessageAndMetadata fetchedMsg = - testUtil.getNextMessageFromConsumer(TOPIC); + testUtil.getNextMessageFromConsumer(topic); outputSet.add(toText(new String(fetchedMsg.message(), "UTF-8"))); } diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java index 88db2f29..9ec4e8f1 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java @@ -34,6 +34,8 @@ public class FromHDFSToKafkaTest extends KafkaConnectorTestCase { }; @Test public void testBasic() throws Exception { + topic = getTestName(); + createFromFile("input-0001",input); // Create Kafka link diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java index 92a52b8a..dc1a80f2 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java @@ -37,6 +37,8 @@ public class FromRDBMSToKafkaTest extends KafkaConnectorTestCase { @Test public void testBasic() throws Exception { + topic = getTestName(); + createAndLoadTableCities(); // Kafka link