feat: support Kafka Writer

LinhDMN 2023-07-27 17:10:17 +07:00 committed by dienvt
parent 3614c2633e
commit 7e3bae2d96
15 changed files with 770 additions and 0 deletions


@@ -0,0 +1,109 @@
# DataX KafkaWriter
## 1 Introduction
KafkaWriter is a DataX plugin that writes DataX records to a Kafka queue.
The plugin is based on the [Apache Kafka client](https://github.com/apache/kafka)
to produce messages to the queue.
## 2 Detailed Implementation
KafkaWriter fetches the records generated by a DataX Reader; during each write batch it
iterates over the records, serializes each one into a JSON string according to the supported types, and produces that message to Kafka.
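For illustration (a sketch, not part of the original plugin docs): with `"column": ["col1", "col2", "col3"]` and a record holding the values `42`, `"hello"`, and `true`, the writer would produce a message roughly like:
```json
{
  "col1": 42,
  "col2": "hello",
  "col3": true
}
```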
## 3 Features
### 3.1 Example Configurations
* The following configuration reads rows from a Cassandra table and produces them into a Kafka topic.
#### Preparation
#### Configurations
```json
{
"job": {
"content": [
{
"reader": {
"name": "cassandrareader",
"parameter": {
"host": "<cassandra-host>",
"port": 9042,
"username": "<username>",
"password": "<pass>",
"consitancyLevel": "ONE",
"useSSL": false,
"keyspace": "<keyspace>",
"table": "<your_table>",
"column": [
"col1",
"col2",
"col3"
]
}
},
"writer": {
"name": "kafkawriter",
"parameter": {
"isSilence": true,
"topic": "<your_topic>",
"brokers": "<your_broker_addresses>",
"column": [
"col1",
"col2",
"col3"
]
}
}
}
],
"setting": {
"speed": {
"channel": "10"
}
}
}
}
```
### 3.2 Configuration Description
* isSilence
  * Description: if true, suppresses the per-record log line that prints each message sent to Kafka.
  * Required: no
  * Default: false
* topic
  * Description: the Kafka topic to produce messages to.
  * Required: yes
  * Default: ""
  * Example: "DEV.MY-TOPIC"
* brokers
  * Description: one or more Kafka broker addresses, separated by ",".
  * Required: yes
  * Default: ""
  * Example: "addr1,addr2"
* column
  * Description: the list of JSON keys in the produced Kafka message; keys are matched to reader columns by position.
  * Required: yes
  * Default: [""]
  * Example: ["key1", "key2", "key3"]
### 3.3 Type Convert
DataX internal data types are converted to the corresponding value types in the Kafka JSON message. The following table shows the correspondence between the two.
| DataX Type | Kafka Message Type |
|------------|--------------------|
| INT | INT |
| LONG | LONG |
| STRING | STRING |
| DOUBLE | FLOAT |
| BOOL | BOOL |
| DATE | DATE |
| BYTES | BYTES |
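As a hedged illustration of the table above (the exact DATE and BYTES encodings depend on fastjson2's default serialization of `java.util.Date` and `byte[]`), one value of each scalar type might appear in the produced JSON along these lines:
```json
{
  "long_col": 1234567890123,
  "string_col": "hello",
  "double_col": 3.14,
  "bool_col": false,
  "date_col": 1690452617000,
  "bytes_col": "aGVsbG8="
}
```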
## 4 Performance Test
## 5 Restrictions
Currently, complex data type support is not stable. If you need complex data types such as tuple or array, please check future releases of this plugin.
## FAQ

kafkawriter/pom.xml (executable file)

@@ -0,0 +1,122 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>kafkawriter</artifactId>
<name>kafkawriter</name>
<packaging>jar</packaging>
<properties>
<log4j.version>1.2.14</log4j.version>
<protobuf.version>3.4.0</protobuf.version>
<kafka.clients.version>2.3.1</kafka.clients.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<distributionManagement>
<snapshotRepository>
<id>nexus-snapshots</id>
<url>http://artifactory.zpapps.vn:8081/repository/mep-common-snapshots/</url>
</snapshotRepository>
<repository>
<id>nexus-releases</id>
<url>http://artifactory.zpapps.vn:8081/repository/mep-common-releases/</url>
</repository>
</distributionManagement>
<dependencies>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.driver.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.clients.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>


@@ -0,0 +1,35 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/kafkawriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>kafkawriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/kafkawriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/kafkawriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>


@@ -0,0 +1,161 @@
package com.alibaba.datax.plugin.writer.kafkawriter;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson2.JSONObject;
import kafka.config.KafkaConfig;
import kafka.producer.BaseKafkaProducer;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
public class KafkaWriter extends Writer {
public static class Job extends Writer.Job {
private static final Logger LOG = LoggerFactory
.getLogger(Job.class);
private Configuration originalConfig;
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
String topic = this.originalConfig.getString(Key.TOPIC, null);
String brokers = this.originalConfig.getString(Key.BROKERS, null);
validateKafkaConfig(topic, brokers);
}
private void validateKafkaConfig(String topic, String brokers) {
    if (StringUtils.isBlank(topic)) {
        throw DataXException.asDataXException(
                KafkaWriterErrorCode.ILLEGAL_VALUE,
                String.format("Config topic is invalid: [%s]", topic)
        );
    }
    if (StringUtils.isBlank(brokers)) {
        throw DataXException.asDataXException(
                KafkaWriterErrorCode.ILLEGAL_VALUE,
                String.format("Config brokers is invalid: [%s]", brokers)
        );
    }
}
@Override
public void prepare() {
}
@Override
public List<Configuration> split(int mandatoryNumber) {
List<Configuration> writerSplitConfigs = new ArrayList<Configuration>();
for (int i = 0; i < mandatoryNumber; i++) {
writerSplitConfigs.add(this.originalConfig);
}
return writerSplitConfigs;
}
@Override
public void post() {
}
@Override
public void destroy() {
}
}
public static class Task extends Writer.Task {
private static final Logger LOG = LoggerFactory
.getLogger(Task.class);
private Configuration writerSliceConfig;
private String topic;
private String brokers;
private boolean isSilence;
private List<String> column;
private BaseKafkaProducer producer;
@Override
public void init() {
this.writerSliceConfig = getPluginJobConf();
this.topic = this.writerSliceConfig.getString(Key.TOPIC, null);
this.brokers = this.writerSliceConfig.getString(Key.BROKERS, null);
this.isSilence = this.writerSliceConfig.getBool(Key.IS_SILENCE, false);
this.column = this.writerSliceConfig.getList(Key.COLUMN, String.class);
KafkaConfig kafkaConfig = new KafkaConfig();
kafkaConfig.brokers = brokers;
kafkaConfig.topic = topic;
this.producer = new BaseKafkaProducer(kafkaConfig);
}
@Override
public void prepare() {
}
@Override
public void startWrite(RecordReceiver recordReceiver) {
try {
Record r;
while ((r = recordReceiver.getFromReader()) != null) {
JSONObject msg = new JSONObject();
for (int i = 0; i < r.getColumnNumber(); i++) {
switch (r.getColumn(i).getType()) {
case LONG:
msg.put(this.column.get(i), r.getColumn(i).asLong());
break;
case BOOL:
msg.put(this.column.get(i), r.getColumn(i).asBoolean());
break;
case DATE:
msg.put(this.column.get(i), r.getColumn(i).asDate());
break;
case DOUBLE:
msg.put(this.column.get(i), r.getColumn(i).asDouble());
break;
case BYTES:
msg.put(this.column.get(i), r.getColumn(i).asBytes());
break;
case STRING:
msg.put(this.column.get(i), r.getColumn(i).asString());
break;
default:
    throw new Exception(String.format("Unsupported column type for column [%s]: %s",
            this.column.get(i), r.getColumn(i).getType()));
}
}
String raw = msg.toJSONString();
if (!isSilence) {
LOG.info("Writing message to Kafka topic [{}]: {}", this.topic, raw);
}
this.producer.sendMessage(raw);
}
} catch (Exception e) {
throw DataXException.asDataXException(KafkaWriterErrorCode.RUNTIME_EXCEPTION, e);
}
}
@Override
public void post() {
}
@Override
public void destroy() {
    // Close the producer so buffered messages are flushed on shutdown.
    if (this.producer != null) {
        this.producer.close();
    }
}
}
}


@@ -0,0 +1,36 @@
package com.alibaba.datax.plugin.writer.kafkawriter;
import com.alibaba.datax.common.spi.ErrorCode;
public enum KafkaWriterErrorCode implements ErrorCode {
RUNTIME_EXCEPTION("KafkaWriter-00", "Runtime exception"),
ILLEGAL_VALUE("KafkaWriter-01", "Config value invalid."),
CONFIG_INVALID_EXCEPTION("KafkaWriter-02", "Config invalid."),
NETWORK_ERROR_KAFKA_WRITER("KafkaWriter-03", "Cannot connect to Kafka producer");
private final String code;
private final String description;
private KafkaWriterErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s]. ", this.code,
this.description);
}
}


@@ -0,0 +1,12 @@
package com.alibaba.datax.plugin.writer.kafkawriter;
public class Key {
public static final String IS_SILENCE = "isSilence";
public static final String BROKERS = "brokers";
public static final String TOPIC = "topic";
public static final String COLUMN = "column";
}


@@ -0,0 +1,10 @@
package kafka.config;
public class DNSLookupMode {
public static final String DEFAULT = "default";
public static final String USE_ALL_DNS_IPS = "use_all_dns_ips";
public static final String RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY = "resolve_canonical_bootstrap_servers_only";
private DNSLookupMode() {
}
}


@@ -0,0 +1,54 @@
package kafka.config;
public class KafkaConfig {
public String brokers;
public String topic;
public String groupID = "default";
public int noConsumer = 8;
public int noProducerPoolSize = 1;
public int sessionTimeoutMs = 30000;
public int commitIntervalMs = 30000;
// Added by BangDQ
public String acks = "all"; // acks=0: don't wait; acks=1: leader only; acks=-1/"all": all in-sync replicas
public String key_serializer = "org.apache.kafka.common.serialization.StringSerializer"; // string key
public String value_serializer = "org.apache.kafka.common.serialization.StringSerializer"; // string value
public long buffer_memory = 33554432;
public int retries = 0;
public int batch_size = 200; // batch.size in bytes; 0 disables batching, Kafka default is 16384
public long linger_ms = 0; // 0: no delay, send as soon as possible instead of waiting for batch.size to fill
// for consumer
public int max_poll_records = 100; // Kafka default is 500
public String dnsLookup = DNSLookupMode.USE_ALL_DNS_IPS;
public String getKeyString() {
return String.format("%s_%s_%s", brokers, groupID, topic);
}
@Override
public String toString() {
    return "KafkaConfig{" +
            "brokers='" + brokers + '\'' +
            ", topic='" + topic + '\'' +
            ", groupID='" + groupID + '\'' +
            ", noConsumer=" + noConsumer +
            ", noProducerPoolSize=" + noProducerPoolSize +
            ", sessionTimeoutMs=" + sessionTimeoutMs +
            ", commitIntervalMs=" + commitIntervalMs +
            ", acks='" + acks + '\'' +
            ", key_serializer='" + key_serializer + '\'' +
            ", value_serializer='" + value_serializer + '\'' +
            ", buffer_memory=" + buffer_memory +
            ", retries=" + retries +
            ", batch_size=" + batch_size +
            ", linger_ms=" + linger_ms +
            ", max_poll_records=" + max_poll_records +
            ", dnsLookup='" + dnsLookup + '\'' +
            '}';
}
}


@@ -0,0 +1,77 @@
package kafka.producer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import kafka.config.KafkaConfig;
import java.util.Properties;
public class BaseKafkaProducer {
private static final Log LOGGER = LogFactory.getLog(BaseKafkaProducer.class);
private KafkaProducer<String, String> producer;
private KafkaConfig config;
public BaseKafkaProducer(KafkaConfig config) {
try {
this.config = config;
Properties prop = createProducerConfig(config);
producer = new KafkaProducer<>(prop);
} catch (Exception ex) {
    LOGGER.error("Create KafkaProducer error!", ex);
}
}
public boolean sendMessage(String msg) {
return sendMessage(config.topic, msg);
}
public boolean sendMessage(String topic, String msg) {
return sendMessage(new ProducerRecord<>(topic, msg));
}
public boolean sendMessage(ProducerRecord<String, String> pRecord) {
try {
producer.send(pRecord);
return true;
} catch (Exception ex) {
LOGGER.error("KafkaProducer send error!", ex);
return false;
}
}
private static Properties createProducerConfig(KafkaConfig config) {
Properties props = new Properties();
props.put("bootstrap.servers", config.brokers);
props.put("acks", config.acks);
props.put("retries", config.retries);
props.put("batch.size", config.batch_size);
props.put("linger.ms", config.linger_ms);
props.put("buffer.memory", config.buffer_memory);
props.put("key.serializer", config.key_serializer);
props.put("value.serializer", config.value_serializer);
props.put("client.dns.lookup", config.dnsLookup);
return props;
}
public void close() {
try {
if (producer != null) {
producer.flush();
producer.close();
}
} catch (Exception ex) {
LOGGER.error("Close producer error!", ex);
}
}
}
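A minimal usage sketch for `BaseKafkaProducer` (the broker address and topic below are placeholders, not values from this commit):
```java
import kafka.config.KafkaConfig;
import kafka.producer.BaseKafkaProducer;

public class ProducerExample {
    public static void main(String[] args) {
        KafkaConfig config = new KafkaConfig();
        config.brokers = "localhost:9092"; // placeholder broker address
        config.topic = "DEV.MY-TOPIC";     // placeholder topic

        BaseKafkaProducer producer = new BaseKafkaProducer(config);
        // sendMessage returns false (and logs) on failure instead of throwing.
        boolean ok = producer.sendMessage("{\"col1\": 1}");
        System.out.println("sent: " + ok);

        producer.close(); // flushes buffered records before closing
    }
}
```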


@@ -0,0 +1,45 @@
package kafka.producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.config.KafkaConfig;
import java.util.HashMap;
import java.util.Map;
public class KafkaProducerManager {
private static final Logger logger = LoggerFactory.getLogger(KafkaProducerManager.class);
private static final Map<String, KafkaProducerPool> poolMap = new HashMap<>();
private KafkaProducerManager() {
}
private static String createMapKey(KafkaConfig config) {
    return config.getKeyString();
}
public static void initProducer(KafkaConfig config) throws Exception {
KafkaProducerPool producer = new KafkaProducerPool(config);
poolMap.put(createMapKey(config), producer);
}
public static BaseKafkaProducer getClient(KafkaConfig config) {
    KafkaProducerPool pool = poolMap.get(createMapKey(config));
    if (pool == null) {
        throw new IllegalStateException("Producer pool is not initialized; call initProducer() first");
    }
    return pool.getClient();
}
public static void shutdownALLConnections() {
for (KafkaProducerPool pool : poolMap.values()) {
try {
pool.shutDownALLConnections();
} catch (Exception ex) {
logger.error("Exception when shutting down producer pool", ex);
}
}
}
}


@@ -0,0 +1,55 @@
package kafka.producer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import kafka.config.KafkaConfig;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class KafkaProducerPool {
private static final Log LOGGER = LogFactory.getLog(KafkaProducerPool.class);
private final Map<String, BaseKafkaProducer> kafkaClientMap = new HashMap<>();
private final AtomicInteger usingIndex;
private final KafkaConfig kafkaConfig;
private String createMapKey(KafkaConfig kafkaConfig, int pos) {
return String.format("%s_%d", kafkaConfig.getKeyString(), pos);
}
public KafkaProducerPool(KafkaConfig config) {
kafkaConfig = config;
usingIndex = new AtomicInteger(0);
for (int i = 0; i < kafkaConfig.noProducerPoolSize; i++) {
BaseKafkaProducer custom = new BaseKafkaProducer(kafkaConfig);
kafkaClientMap.put(createMapKey(kafkaConfig, i), custom);
}
}
public BaseKafkaProducer getClient() {
    // Round-robin over the pool; reset the counter periodically so it never overflows.
    // (The original check tested the post-modulo index, which is always below the
    // pool size and so could never trigger the reset.)
    int counter = usingIndex.incrementAndGet();
    if (counter >= 1000) {
        usingIndex.set(0);
    }
    int index = counter % kafkaConfig.noProducerPoolSize;
    return kafkaClientMap.get(createMapKey(kafkaConfig, index));
}
public void shutDownALLConnections() {
for (BaseKafkaProducer producer : kafkaClientMap.values()) {
try {
producer.close();
} catch (Exception ex) {
LOGGER.error(String.format("Exception when shutting down Kafka producer %s =====>", kafkaConfig.getKeyString()), ex);
}
}
}
}


@@ -0,0 +1,39 @@
package kafka.utils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import kafka.config.KafkaConfig;
import kafka.producer.BaseKafkaProducer;
import kafka.producer.KafkaProducerManager;
/**
* @author huypva
*/
public class KafkaUtils {
private KafkaUtils() {
}
private static final Log LOGGER = LogFactory.getLog(KafkaUtils.class);
public static boolean sendMsg(KafkaConfig config, String msg) {
try {
BaseKafkaProducer producer = KafkaProducerManager.getClient(config);
return producer.sendMessage(msg);
} catch (Exception ex) {
LOGGER.error("Send msg to Kafka error!", ex);
return false;
}
}
public static boolean sendMsg(KafkaConfig config, String topic, String msg) {
try {
BaseKafkaProducer producer = KafkaProducerManager.getClient(config);
return producer.sendMessage(topic, msg);
} catch (Exception ex) {
LOGGER.error("Send msg to Kafka error!", ex);
return false;
}
}
}
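The pooled path ties the pieces together: initialize a `KafkaProducerPool` via `KafkaProducerManager`, then send through `KafkaUtils`. A sketch under the same placeholder broker/topic values as above:
```java
import kafka.config.KafkaConfig;
import kafka.producer.KafkaProducerManager;
import kafka.utils.KafkaUtils;

public class PooledExample {
    public static void main(String[] args) throws Exception {
        KafkaConfig config = new KafkaConfig();
        config.brokers = "localhost:9092"; // placeholder
        config.topic = "DEV.MY-TOPIC";     // placeholder
        config.noProducerPoolSize = 4;     // round-robin over four producers

        KafkaProducerManager.initProducer(config); // must run before getClient/sendMsg
        boolean ok = KafkaUtils.sendMsg(config, "{\"col1\": 1}");
        System.out.println("sent: " + ok);

        KafkaProducerManager.shutdownALLConnections();
    }
}
```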


@@ -0,0 +1,7 @@
{
    "name": "kafkawriter",
    "class": "com.alibaba.datax.plugin.writer.kafkawriter.KafkaWriter",
    "description": "pushes each DataX record to Kafka as a stringified JSON object",
    "developer": "linhdmn",
    "contributor": "dienvt"
}


@@ -0,0 +1,7 @@
{
"name": "kafkawriter",
"parameter": {
"brokers": "",
"topic": ""
}
}


@@ -112,6 +112,7 @@
<module>hdfswriter</module>
<module>txtfilewriter</module>
<module>streamwriter</module>
<module>kafkawriter</module>
<module>elasticsearchwriter</module>
<module>mongodbwriter</module>