diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java index 9aa69ede..f15c07ec 100644 --- a/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java +++ b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java @@ -18,14 +18,13 @@ package org.apache.sqoop.test.testcases; import kafka.message.MessageAndMetadata; -import org.apache.sqoop.common.Direction; import org.apache.sqoop.model.MConfigList; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; -import org.testng.annotations.AfterClass; import org.testng.Assert; -import org.testng.annotations.BeforeClass; import org.apache.sqoop.common.test.kafka.TestUtil; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import java.io.IOException; import java.io.UnsupportedEncodingException; @@ -39,16 +38,16 @@ public class KafkaConnectorTestCase extends ConnectorTestCase { private static TestUtil testUtil = TestUtil.getInstance(); - private static final String TOPIC = "mytopic"; + protected String topic; @BeforeClass(alwaysRun = true) - public static void startKafka() throws Exception { + public void startKafka() throws Exception { // starts Kafka server and its dependent zookeeper testUtil.prepare(); } @AfterClass(alwaysRun = true) - public static void stopKafka() throws IOException { + public void stopKafka() throws IOException { testUtil.tearDown(); } @@ -61,9 +60,9 @@ protected void fillKafkaLinkConfig(MLink link) { protected void fillKafkaToConfig(MJob job){ MConfigList toConfig = job.getToJobConfig(); - toConfig.getStringInput("toJobConfig.topic").setValue(TOPIC); + toConfig.getStringInput("toJobConfig.topic").setValue(topic); List topics = new ArrayList(1); - topics.add(TOPIC); + topics.add(topic); testUtil.initTopicList(topics); } @@ -79,7 +78,7 @@ protected void validateContent(String[] content) throws UnsupportedEncodingExcep for(String str: content) { MessageAndMetadata fetchedMsg = - testUtil.getNextMessageFromConsumer(TOPIC); + testUtil.getNextMessageFromConsumer(topic); outputSet.add(toText(new String(fetchedMsg.message(), "UTF-8"))); } diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java index 88db2f29..9ec4e8f1 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java @@ -34,6 +34,8 @@ public class FromHDFSToKafkaTest extends KafkaConnectorTestCase { }; @Test public void testBasic() throws Exception { + topic = getTestName(); + createFromFile("input-0001",input); // Create Kafka link diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java index 92a52b8a..dc1a80f2 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java @@ -37,6 +37,8 @@ public class FromRDBMSToKafkaTest extends KafkaConnectorTestCase { @Test public void testBasic() throws Exception { + topic = getTestName(); + createAndLoadTableCities(); // Kafka link