5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-13 23:41:56 +08:00

SQOOP-2322: Sqoop2: Kafka topic should vary on a per method basis

(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2015-04-26 12:09:29 -07:00
parent 8f82051188
commit 1857086722
3 changed files with 12 additions and 9 deletions

View File

@ -18,14 +18,13 @@
package org.apache.sqoop.test.testcases; package org.apache.sqoop.test.testcases;
import kafka.message.MessageAndMetadata; import kafka.message.MessageAndMetadata;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.model.MConfigList; import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink; import org.apache.sqoop.model.MLink;
import org.testng.annotations.AfterClass;
import org.testng.Assert; import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.apache.sqoop.common.test.kafka.TestUtil; import org.apache.sqoop.common.test.kafka.TestUtil;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import java.io.IOException; import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
@ -39,16 +38,16 @@
public class KafkaConnectorTestCase extends ConnectorTestCase { public class KafkaConnectorTestCase extends ConnectorTestCase {
private static TestUtil testUtil = TestUtil.getInstance(); private static TestUtil testUtil = TestUtil.getInstance();
private static final String TOPIC = "mytopic"; protected String topic;
@BeforeClass(alwaysRun = true) @BeforeClass(alwaysRun = true)
public static void startKafka() throws Exception { public void startKafka() throws Exception {
// starts Kafka server and its dependent zookeeper // starts Kafka server and its dependent zookeeper
testUtil.prepare(); testUtil.prepare();
} }
@AfterClass(alwaysRun = true) @AfterClass(alwaysRun = true)
public static void stopKafka() throws IOException { public void stopKafka() throws IOException {
testUtil.tearDown(); testUtil.tearDown();
} }
@ -61,9 +60,9 @@ protected void fillKafkaLinkConfig(MLink link) {
protected void fillKafkaToConfig(MJob job){ protected void fillKafkaToConfig(MJob job){
MConfigList toConfig = job.getToJobConfig(); MConfigList toConfig = job.getToJobConfig();
toConfig.getStringInput("toJobConfig.topic").setValue(TOPIC); toConfig.getStringInput("toJobConfig.topic").setValue(topic);
List<String> topics = new ArrayList<String>(1); List<String> topics = new ArrayList<String>(1);
topics.add(TOPIC); topics.add(topic);
testUtil.initTopicList(topics); testUtil.initTopicList(topics);
} }
@ -79,7 +78,7 @@ protected void validateContent(String[] content) throws UnsupportedEncodingExcep
for(String str: content) { for(String str: content) {
MessageAndMetadata<byte[],byte[]> fetchedMsg = MessageAndMetadata<byte[],byte[]> fetchedMsg =
testUtil.getNextMessageFromConsumer(TOPIC); testUtil.getNextMessageFromConsumer(topic);
outputSet.add(toText(new String(fetchedMsg.message(), "UTF-8"))); outputSet.add(toText(new String(fetchedMsg.message(), "UTF-8")));
} }

View File

@ -34,6 +34,8 @@ public class FromHDFSToKafkaTest extends KafkaConnectorTestCase {
}; };
@Test @Test
public void testBasic() throws Exception { public void testBasic() throws Exception {
topic = getTestName();
createFromFile("input-0001",input); createFromFile("input-0001",input);
// Create Kafka link // Create Kafka link

View File

@ -37,6 +37,8 @@ public class FromRDBMSToKafkaTest extends KafkaConnectorTestCase {
@Test @Test
public void testBasic() throws Exception { public void testBasic() throws Exception {
topic = getTestName();
createAndLoadTableCities(); createAndLoadTableCities();
// Kafka link // Kafka link