diff --git a/kafkawriter/doc/kafkawriter.md b/kafkawriter/doc/kafkawriter.md
new file mode 100644
index 00000000..b38037fa
--- /dev/null
+++ b/kafkawriter/doc/kafkawriter.md
@@ -0,0 +1,109 @@
+# DataX KafkaWriter
+
+## 1 Introduction
+KafkaWriter is a DataX plugin that writes DataX records to a Kafka topic.
+The plugin is based on the [Apache Kafka client](https://github.com/apache/kafka)
+to produce messages to the queue.
+
+
+## 2 Detailed Implementation
+KafkaWriter fetches the records generated by the DataX reader and, during each write batch,
+iterates over the records, serializes each one into a JSON string according to the supported
+types, and produces the resulting message to Kafka (see the example message after the
+type-conversion table below).
+
+
+## 3 Features
+### 3.1 Example Configurations
+* The following configuration reads data from a Cassandra table and produces it into a Kafka topic.
+
+#### Preparation
+
+#### Configurations
+```json
+{
+  "job": {
+    "content": [
+      {
+        "reader": {
+          "name": "cassandrareader",
+          "parameter": {
+            "host": "",
+            "port": 9042,
+            "username": "",
+            "password": "",
+            "consitancyLevel": "ONE",
+            "useSSL": false,
+            "keyspace": "",
+            "table": "",
+            "column": [
+              "col1",
+              "col2",
+              "col3"
+            ]
+          }
+        },
+        "writer": {
+          "name": "kafkawriter",
+          "parameter": {
+            "isSilence": true,
+            "topic": "",
+            "brokers": "",
+            "column": [
+              "col1",
+              "col2",
+              "col3"
+            ]
+          }
+        }
+      }
+    ],
+    "setting": {
+      "speed": {
+        "channel": "10"
+      }
+    }
+  }
+}
+```
+
+### 3.2 Configuration Description
+* isSilence
+  * Description: whether to suppress the per-record log line that is otherwise printed before each message is sent
+  * Required: no
+  * Default: false
+* topic
+  * Description: the Kafka topic to write messages to
+  * Required: yes
+  * Default: ""
+  * Example: "DEV.MY-TOPIC"
+* brokers
+  * Description: one or more Kafka broker addresses, separated by ","
+  * Required: yes
+  * Default: ""
+  * Example: "addr1,addr2"
+* column
+  * Description: the list of JSON keys used to build each Kafka message, in reader column order
+  * Required: yes
+  * Default: [""]
+  * Example: ["key1", "key2", "key3"]
+
+### 3.3 Type Conversion
+DataX internal data types are converted to the corresponding field types in the Kafka JSON message. The following table shows the correspondence between the two.
+
+| DataX Type | Kafka Message Type |
+|------------|--------------------|
+| LONG       | LONG               |
+| STRING     | STRING             |
+| DOUBLE     | DOUBLE             |
+| BOOL       | BOOL               |
+| DATE       | DATE               |
+| BYTES      | BYTES              |
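+
+For illustration, a record read by the example job above would be serialized into a JSON
+message along the following lines (the field values here are hypothetical):
+
+```json
+{
+  "col1": 12345,
+  "col2": "some text",
+  "col3": true
+}
+```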
+
+
+## 4 Performance Test
+
+
+## 5 Restrictions
+Support for complex data types such as tuples and arrays is currently not available; a record containing such a column fails the task with an invalid-column-type error.
+
+## FAQ
diff --git a/kafkawriter/pom.xml b/kafkawriter/pom.xml
new file mode 100755
index 00000000..0ec20352
--- /dev/null
+++ b/kafkawriter/pom.xml
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.alibaba.datax</groupId>
+        <artifactId>datax-all</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <artifactId>kafkawriter</artifactId>
+    <name>kafkawriter</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <kafka.clients.version>2.3.1</kafka.clients.version>
+        <project-sourceEncoding>UTF-8</project-sourceEncoding>
+    </properties>
+
+    <distributionManagement>
+        <snapshotRepository>
+            <id>nexus-snapshots</id>
+            <url>http://artifactory.zpapps.vn:8081/repository/mep-common-snapshots/</url>
+        </snapshotRepository>
+        <repository>
+            <id>nexus-releases</id>
+            <url>http://artifactory.zpapps.vn:8081/repository/mep-common-releases/</url>
+        </repository>
+    </distributionManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.clients.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+            <version>1.2</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-common</artifactId>
+            <version>${datax-project-version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>plugin-rdbms-util</artifactId>
+            <version>${datax-project-version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${jdk-version}</source>
+                    <target>${jdk-version}</target>
+                    <encoding>${project-sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                    <finalName>datax</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>dwzip</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/kafkawriter/src/main/assembly/package.xml b/kafkawriter/src/main/assembly/package.xml
new file mode 100755
index 00000000..ea9a54da
--- /dev/null
+++ b/kafkawriter/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+<assembly xmlns="http://maven.apache.org/plugin/assembly/assembly/1.0.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugin/assembly/assembly/1.0.0 http://maven.apache.org/xsd/assembly-1.0.0.xsd">
+    <id></id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>plugin.json</include>
+                <include>plugin_job_template.json</include>
+            </includes>
+            <outputDirectory>plugin/writer/kafkawriter</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>kafkawriter-0.0.1-SNAPSHOT.jar</include>
+            </includes>
+            <outputDirectory>plugin/writer/kafkawriter</outputDirectory>
+        </fileSet>
+    </fileSets>
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/writer/kafkawriter/libs</outputDirectory>
+            <scope>runtime</scope>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/kafkawriter/src/main/java/com/alibaba/datax/plugin/writer/kafkawriter/KafkaWriter.java b/kafkawriter/src/main/java/com/alibaba/datax/plugin/writer/kafkawriter/KafkaWriter.java
new file mode 100755
index 00000000..0e9312df
--- /dev/null
+++ b/kafkawriter/src/main/java/com/alibaba/datax/plugin/writer/kafkawriter/KafkaWriter.java
@@ -0,0 +1,161 @@
+package com.alibaba.datax.plugin.writer.kafkawriter;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.fastjson2.JSONObject;
+import kafka.config.KafkaConfig;
+import kafka.producer.BaseKafkaProducer;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
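+/**
+ * DataX writer that serializes each incoming record into a flat JSON object
+ * (keyed by the configured "column" names) and produces it to a Kafka topic.
+ */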
+public class KafkaWriter extends Writer {
+    public static class Job extends Writer.Job {
+        private static final Logger LOG = LoggerFactory.getLogger(Job.class);
+
+        private Configuration originalConfig;
+
+        @Override
+        public void init() {
+            this.originalConfig = super.getPluginJobConf();
+            String topic = this.originalConfig.getString(Key.TOPIC, null);
+            String brokers = this.originalConfig.getString(Key.BROKERS, null);
+            validateKafkaConfig(topic, brokers);
+        }
+
+        private void validateKafkaConfig(String topic, String brokers) {
+            if (StringUtils.isBlank(topic)) {
+                throw DataXException.asDataXException(
+                        KafkaWriterErrorCode.ILLEGAL_VALUE,
+                        String.format("Config topic is invalid: [%s]", topic)
+                );
+            }
+            if (StringUtils.isBlank(brokers)) {
+                throw DataXException.asDataXException(
+                        KafkaWriterErrorCode.ILLEGAL_VALUE,
+                        String.format("Config brokers is invalid: [%s]", brokers)
+                );
+            }
+        }
+
+        @Override
+        public void prepare() {
+        }
+
+        @Override
+        public List<Configuration> split(int mandatoryNumber) {
+            // Kafka needs no table-style sharding: every task gets the same configuration.
+            List<Configuration> writerSplitConfigs = new ArrayList<>();
+            for (int i = 0; i < mandatoryNumber; i++) {
+                writerSplitConfigs.add(this.originalConfig);
+            }
+
+            return writerSplitConfigs;
+        }
+
+        @Override
+        public void post() {
+        }
+
+        @Override
+        public void destroy() {
+        }
+    }
+
+    public static class Task extends Writer.Task {
+        private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+        private Configuration writerSliceConfig;
+
+        private String topic;
+
+        private String brokers;
+
+        private boolean isSilence;
+
+        private List<String> column;
+
+        private BaseKafkaProducer producer;
+
+        @Override
+        public void init() {
+            this.writerSliceConfig = getPluginJobConf();
+
+            this.topic = this.writerSliceConfig.getString(Key.TOPIC, null);
+            this.brokers = this.writerSliceConfig.getString(Key.BROKERS, null);
+            this.isSilence = this.writerSliceConfig.getBool(Key.IS_SILENCE, false);
+
+            this.column = this.writerSliceConfig.getList(Key.COLUMN, String.class);
+
+            KafkaConfig kafkaConfig = new KafkaConfig();
+            kafkaConfig.brokers = brokers;
+            kafkaConfig.topic = topic;
+            this.producer = new BaseKafkaProducer(kafkaConfig);
+        }
+
+        @Override
+        public void prepare() {
+        }
+
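+        /*
+         * Serialization contract: the i-th column of each record is written under
+         * the i-th name of the configured "column" list; a record with an
+         * unsupported column type fails the whole task.
+         */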
context info:{}.", this.topic, raw); + } + this.producer.sendMessage(raw); + } + } catch (Exception e) { + throw DataXException.asDataXException(KafkaWriterErrorCode.RUNTIME_EXCEPTION, e); + } + } + + @Override + public void post() { + + } + + @Override + public void destroy() { + } + } + +} diff --git a/kafkawriter/src/main/java/com/alibaba/datax/plugin/writer/kafkawriter/KafkaWriterErrorCode.java b/kafkawriter/src/main/java/com/alibaba/datax/plugin/writer/kafkawriter/KafkaWriterErrorCode.java new file mode 100755 index 00000000..180ff4a5 --- /dev/null +++ b/kafkawriter/src/main/java/com/alibaba/datax/plugin/writer/kafkawriter/KafkaWriterErrorCode.java @@ -0,0 +1,36 @@ +package com.alibaba.datax.plugin.writer.kafkawriter; + +import com.alibaba.datax.common.spi.ErrorCode; + +public enum KafkaWriterErrorCode implements ErrorCode { + RUNTIME_EXCEPTION("KafkaWriter-00", "Runtime exception"), + ILLEGAL_VALUE("KafkaWriter-01", "Config value invalid."), + CONFIG_INVALID_EXCEPTION("KafkaWriter-02", "Config invalid."), + NETWORK_ERROR_KAFKA_WRITER("NetworkKafkaWriter-03", "Can not connect to Kafka Producer"); + + + + private final String code; + private final String description; + + private KafkaWriterErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s]. ", this.code, + this.description); + } +} diff --git a/kafkawriter/src/main/java/com/alibaba/datax/plugin/writer/kafkawriter/Key.java b/kafkawriter/src/main/java/com/alibaba/datax/plugin/writer/kafkawriter/Key.java new file mode 100755 index 00000000..56bea07a --- /dev/null +++ b/kafkawriter/src/main/java/com/alibaba/datax/plugin/writer/kafkawriter/Key.java @@ -0,0 +1,12 @@ +package com.alibaba.datax.plugin.writer.kafkawriter; + +public class Key { + public static final String IS_SILENCE = "isSilence"; + + public static final String BROKERS = "brokers"; + + public static final String TOPIC = "topic"; + + public static final String COLUMN = "column"; + +} diff --git a/kafkawriter/src/main/java/kafka/config/DNSLookupMode.java b/kafkawriter/src/main/java/kafka/config/DNSLookupMode.java new file mode 100644 index 00000000..56dcea55 --- /dev/null +++ b/kafkawriter/src/main/java/kafka/config/DNSLookupMode.java @@ -0,0 +1,10 @@ +package kafka.config; + +public class DNSLookupMode { + public static final String DEFAULT = "default"; + public static final String USE_ALL_DNS_IPS = "use_all_dns_ips"; + public static final String RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY = "resolve_canonical_bootstrap_servers_only"; + + private DNSLookupMode() { + } +} diff --git a/kafkawriter/src/main/java/kafka/config/KafkaConfig.java b/kafkawriter/src/main/java/kafka/config/KafkaConfig.java new file mode 100644 index 00000000..7669de93 --- /dev/null +++ b/kafkawriter/src/main/java/kafka/config/KafkaConfig.java @@ -0,0 +1,54 @@ +package kafka.config; + + +public class KafkaConfig { + + public String brokers; + public String topic; + + public String groupID = "default"; + public int noComsumer = 8; + public int noProducerPoolSize = 1; + + public int sessionTimoutMs = 30000; + public int commitIntervalMs = 30000; + + //Added by BangDQ + public String acks = "all";//acks=0 not wait, acks=1 , acks=-1 or all + public String key_serializer = 
"org.apache.kafka.common.serialization.StringSerializer";//string key + public String value_serializer = "org.apache.kafka.common.serialization.StringSerializer";//string value + public long buffer_memory = 33554432; + public int retries = 0; + public int batch_size = 200;//0 : disable this function, default = 16384 + public long linger_ms = 0; //0:no delay util batch.size got + //for consumer + public int max_poll_records = 100; //default 500 + + public String dnsLookup = DNSLookupMode.USE_ALL_DNS_IPS; + + public String getKeyString() { + return String.format("%s_%s_%s", brokers, groupID, topic); + } + + @Override + public String toString() { + return "KafkaConfig{" + + "brokers='" + brokers + '\'' + + ", topic='" + topic + '\'' + + ", groupID='" + groupID + '\'' + + ", noComsumer=" + noComsumer + + ", noProducerPoolSize=" + noProducerPoolSize + + ", sessionTimoutMs=" + sessionTimoutMs + + ", commitIntervalMs=" + commitIntervalMs + + ", acks='" + acks + '\'' + + ", key_serializer='" + key_serializer + '\'' + + ", value_serializer='" + value_serializer + '\'' + + ", buffer_memory=" + buffer_memory + + ", retries=" + retries + + ", batch_size=" + batch_size + + ", linger_ms=" + linger_ms + + ", max_poll_records=" + max_poll_records + + ", dnsLookup='" + dnsLookup + '\'' + + '}'; + } +} diff --git a/kafkawriter/src/main/java/kafka/producer/BaseKafkaProducer.java b/kafkawriter/src/main/java/kafka/producer/BaseKafkaProducer.java new file mode 100644 index 00000000..65ae6172 --- /dev/null +++ b/kafkawriter/src/main/java/kafka/producer/BaseKafkaProducer.java @@ -0,0 +1,77 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. 
+ */ +package kafka.producer; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import kafka.config.KafkaConfig; + +import java.util.Properties; + +public class BaseKafkaProducer { + + private static final Log LOGGER = LogFactory.getLog(BaseKafkaProducer.class); + + private KafkaProducer producer; + private KafkaConfig config; + + public BaseKafkaProducer(KafkaConfig config) { + try { + this.config = config; + + Properties prop = createProducerConfig(config); + producer = new KafkaProducer<>(prop); + } catch (Exception ex) { + LOGGER.error(ex.getMessage()); + } + } + + public boolean sendMessage(String msg) { + return sendMessage(config.topic, msg); + } + + public boolean sendMessage(String topic, String msg) { + return sendMessage(new ProducerRecord<>(topic, msg)); + } + + public boolean sendMessage(ProducerRecord pRecord) { + try { + producer.send(pRecord); + return true; + } catch (Exception ex) { + LOGGER.error("KafkaProducer send error!", ex); + return false; + } + } + + private static Properties createProducerConfig(KafkaConfig config) { + Properties props = new Properties(); + props.put("bootstrap.servers", config.brokers); + props.put("acks", config.acks); + props.put("retries", config.retries); + props.put("batch.size", config.batch_size); + props.put("linger.ms", config.linger_ms); + props.put("buffer.memory", config.buffer_memory); + props.put("key.serializer", config.key_serializer); + props.put("value.serializer", config.value_serializer); + props.put("client.dns.lookup", config.dnsLookup); + return props; + } + + public void close() { + try { + if (producer != null) { + producer.flush(); + producer.close(); + } + } catch (Exception ex) { + LOGGER.error("Close producer error!", ex); + } + } +} diff --git a/kafkawriter/src/main/java/kafka/producer/KafkaProducerManager.java b/kafkawriter/src/main/java/kafka/producer/KafkaProducerManager.java new file mode 100644 index 00000000..89bf25ea --- /dev/null +++ b/kafkawriter/src/main/java/kafka/producer/KafkaProducerManager.java @@ -0,0 +1,45 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. 
+ */ +package kafka.producer; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import kafka.config.KafkaConfig; + +import java.util.HashMap; +import java.util.Map; + +public class KafkaProducerManager { + private static final Logger logger = LoggerFactory.getLogger(KafkaProducerManager.class); + private static final Map poolMap = new HashMap<>(); + + private KafkaProducerManager() { + } + + private static String createMapKey(KafkaConfig config) { + return String.format("%s", config.getKeyString()); + } + + public static void initProducer(KafkaConfig config) throws Exception { + KafkaProducerPool producer = new KafkaProducerPool(config); + poolMap.put(createMapKey(config), producer); + } + + public static BaseKafkaProducer getClient(KafkaConfig config) { + return poolMap.get(createMapKey(config)).getClient(); + } + + public static void shutdownALLConnections() { + for (KafkaProducerPool pool : poolMap.values()) { + try { + pool.shutDownALLConnections(); + } catch (Exception ex) { + logger.error("Exception when shutdown {}", ex.getMessage()); + } + } + } +} diff --git a/kafkawriter/src/main/java/kafka/producer/KafkaProducerPool.java b/kafkawriter/src/main/java/kafka/producer/KafkaProducerPool.java new file mode 100644 index 00000000..616cd42f --- /dev/null +++ b/kafkawriter/src/main/java/kafka/producer/KafkaProducerPool.java @@ -0,0 +1,55 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package kafka.producer; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import kafka.config.KafkaConfig; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +public class KafkaProducerPool { + private static final Log LOGGER = LogFactory.getLog(KafkaProducerPool.class); + + private final Map kafkaClientMap = new HashMap<>(); + private final AtomicInteger usingIndex; + private final KafkaConfig kafkaConfig; + + private String createMapKey(KafkaConfig kafkaConfig, int pos) { + return String.format("%s_%d", kafkaConfig.getKeyString(), pos); + } + + public KafkaProducerPool(KafkaConfig config) { + kafkaConfig = config; + usingIndex = new AtomicInteger(0); + for (int i = 0; i < kafkaConfig.noProducerPoolSize; i++) { + BaseKafkaProducer custom = new BaseKafkaProducer(kafkaConfig); + kafkaClientMap.put(createMapKey(kafkaConfig, i), custom); + } + } + + public BaseKafkaProducer getClient() { + int index = usingIndex.incrementAndGet() % kafkaConfig.noProducerPoolSize; + if (index >= 1000) { + usingIndex.set(0); + } + + return kafkaClientMap.get(createMapKey(kafkaConfig, index)); + } + + public void shutDownALLConnections() { + for (BaseKafkaProducer producer : kafkaClientMap.values()) { + try { + producer.close(); + } catch (Exception ex) { + LOGGER.error(String.format("Exception when shutdown grpcClient %s =====>", kafkaConfig.getKeyString()), ex); + } + } + } +} diff --git a/kafkawriter/src/main/java/kafka/utils/KafkaUtils.java b/kafkawriter/src/main/java/kafka/utils/KafkaUtils.java new file mode 100644 index 00000000..95f9a2e9 --- /dev/null +++ b/kafkawriter/src/main/java/kafka/utils/KafkaUtils.java @@ -0,0 +1,39 @@ +package kafka.utils; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import kafka.config.KafkaConfig; +import kafka.producer.BaseKafkaProducer; +import kafka.producer.KafkaProducerManager; + + +/** + * 
+/**
+ * Static helpers for sending messages through the pooled producers managed by
+ * KafkaProducerManager.
+ *
+ * @author huypva
+ */
+public class KafkaUtils {
+    private KafkaUtils() {
+    }
+
+    private static final Log LOGGER = LogFactory.getLog(KafkaUtils.class);
+
+    public static boolean sendMsg(KafkaConfig config, String msg) {
+        try {
+            BaseKafkaProducer producer = KafkaProducerManager.getClient(config);
+            return producer.sendMessage(msg);
+        } catch (Exception ex) {
+            LOGGER.error("Send msg to kafka error! " + ex.getMessage(), ex);
+            return false;
+        }
+    }
+
+    public static boolean sendMsg(KafkaConfig config, String topic, String msg) {
+        try {
+            BaseKafkaProducer producer = KafkaProducerManager.getClient(config);
+            return producer.sendMessage(topic, msg);
+        } catch (Exception ex) {
+            LOGGER.error("Send msg to kafka error! " + ex.getMessage(), ex);
+            return false;
+        }
+    }
+
+}
diff --git a/kafkawriter/src/main/resources/plugin.json b/kafkawriter/src/main/resources/plugin.json
new file mode 100755
index 00000000..441452f3
--- /dev/null
+++ b/kafkawriter/src/main/resources/plugin.json
@@ -0,0 +1,7 @@
+{
+    "name": "kafkawriter",
+    "class": "com.alibaba.datax.plugin.writer.kafkawriter.KafkaWriter",
+    "description": "produce records serialized as JSON strings to Kafka",
+    "developer": "linhdmn",
+    "contributor": "dienvt"
+}
\ No newline at end of file
diff --git a/kafkawriter/src/main/resources/plugin_job_template.json b/kafkawriter/src/main/resources/plugin_job_template.json
new file mode 100644
index 00000000..304b7764
--- /dev/null
+++ b/kafkawriter/src/main/resources/plugin_job_template.json
@@ -0,0 +1,7 @@
+{
+    "name": "kafkawriter",
+    "parameter": {
+        "brokers": "",
+        "topic": ""
+    }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 1b364a75..8452d2ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,6 +112,7 @@
         <module>hdfswriter</module>
         <module>txtfilewriter</module>
         <module>streamwriter</module>
+        <module>kafkawriter</module>
         <module>elasticsearchwriter</module>
         <module>mongodbwriter</module>