diff --git a/tdenginewriter/doc/tdenginewriter.md b/tdenginewriter/doc/tdenginewriter.md
new file mode 100644
index 00000000..062fac2c
--- /dev/null
+++ b/tdenginewriter/doc/tdenginewriter.md
@@ -0,0 +1,398 @@
+# DataX TDengineWriter
+
+## 1 Quick Introduction
+
+The TDengineWriter plugin writes data into TDengine. It can be used to synchronize data offline from other databases into TDengine.
+
+## 2 How It Works
+
+TDengineWriter receives the protocol data produced by a Reader through the DataX framework and parses it according to the reader's type. There are currently two write paths:
+
+1. For OpenTSDBReader, TDengineWriter calls the TDengine client library (taos.lib or taos.dll) via JNI and writes in [schemaless mode](https://www.taosdata.com/cn/documentation/insert#schemaless).
+
+2. For other data sources, it generates SQL statements from the configuration and writes in batches through [taos-jdbcdriver](https://www.taosdata.com/cn/documentation/connector/java).
+
+The two paths exist because OpenTSDBReader reads OpenTSDB data as JSON strings, so the Writer receives only a single column, whereas other Reader plugins usually spread the data across separate columns.
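+
+The choice between the two paths is made from the name of the peer Reader plugin; a minimal sketch of the dispatch, mirroring the plugin's `DataHandlerFactory` below, looks like this:
+
+```java
+// Pick the write path based on which Reader produced the records.
+public static DataHandler build(String peerPluginName) {
+    if ("opentsdbreader".equals(peerPluginName)) {
+        return new OpentsdbDataHandler(); // JNI + schemaless JSON
+    }
+    return new DefaultDataHandler(); // SQL batch via taos-jdbcdriver
+}
+```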
+
+## 3 Features
+### 3.1 From OpenTSDB to TDengine
+#### 3.1.1 Sample Configuration
+
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "opentsdbreader",
+ "parameter": {
+ "endpoint": "http://192.168.1.180:4242",
+ "column": [
+ "weather_temperature"
+ ],
+ "beginDateTime": "2021-01-01 00:00:00",
+ "endDateTime": "2021-01-01 01:00:00"
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "host": "192.168.1.180",
+ "port": 6030,
+ "dbname": "test",
+ "user": "root",
+ "password": "taosdata"
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
+```
+
+#### 3.1.2 Parameter Description
+
+| Parameter | Description                         | Required | Default  |
+| --------- | ----------------------------------- | -------- | -------- |
+| host      | host of the TDengine instance       | yes      | none     |
+| port      | port of the TDengine instance       | yes      | none     |
+| user      | user name for the TDengine instance | no       | root     |
+| password  | password for the TDengine instance  | no       | taosdata |
+| dbname    | name of the target database         | yes      | none     |
+| batchSize | number of records per batch insert  | no       | 1        |
+
+
+#### 3.1.3 Type Conversion
+
+Because OpenTSDBReader reads OpenTSDB data as JSON strings, TDengineWriter applies the following type mapping when migrating from OpenTSDB to TDengine:
+
+| OpenTSDB type    | DataX internal type | TDengine type     |
+| ---------------- | ------------------- | ----------------- |
+| timestamp | Date | timestamp |
+| Integer(value) | Double | double |
+| Float(value) | Double | double |
+| String(value) | String | binary |
+| Integer(tag) | String | binary |
+| Float(tag) | String | binary |
+| String(tag) | String | binary |
+
+### 3.2 From MongoDB to TDengine
+
+#### 3.2.1 Sample Configuration
+```json
+{
+ "job": {
+ "setting": {
+ "speed": {
+ "channel": 2
+ }
+ },
+ "content": [
+ {
+ "reader": {
+ "name": "mongodbreader",
+ "parameter": {
+ "address": [
+ "127.0.0.1:27017"
+ ],
+ "userName": "user",
+ "mechanism": "SCRAM-SHA-1",
+ "userPassword": "password",
+ "authDb": "admin",
+ "dbName": "test",
+ "collectionName": "stock",
+ "column": [
+ {
+ "name": "stockID",
+ "type": "string"
+ },
+ {
+ "name": "tradeTime",
+ "type": "date"
+ },
+ {
+ "name": "lastPrice",
+ "type": "double"
+ },
+ {
+ "name": "askPrice1",
+ "type": "double"
+ },
+ {
+ "name": "bidPrice1",
+ "type": "double"
+ },
+ {
+ "name": "volume",
+ "type": "int"
+ }
+ ]
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "host": "localhost",
+ "port": 6030,
+ "dbname": "test",
+ "user": "root",
+ "password": "taosdata",
+ "stable": "stock",
+ "tagColumn": {
+ "industry": "energy",
+ "stockID": 0
+ },
+ "fieldColumn": {
+ "lastPrice": 2,
+ "askPrice1": 3,
+ "bidPrice1": 4,
+ "volume": 5
+ },
+ "timestampColumn": {
+ "tradeTime": 1
+ }
+ }
+ }
+ }
+ ]
+ }
+}
+```
+
+**Note: the writer part of this configuration applies equally to relational databases.**
+
+
+#### 3.2.2 Parameter Description
+| Parameter       | Description                                | Required              | Default  | Note                                                                                    |
+| --------------- | ------------------------------------------ | --------------------- | -------- | --------------------------------------------------------------------------------------- |
+| host            | host of the TDengine instance              | yes                   | none     |                                                                                          |
+| port            | port of the TDengine instance              | yes                   | none     |                                                                                          |
+| user            | user name for the TDengine instance        | no                    | root     |                                                                                          |
+| password        | password for the TDengine instance         | no                    | taosdata |                                                                                          |
+| dbname          | name of the target database                | yes                   | none     |                                                                                          |
+| batchSize       | number of records per batch insert         | no                    | 1000     |                                                                                          |
+| stable          | name of the target super table             | yes (except OpenTSDB) | none     |                                                                                          |
+| tagColumn       | name and position of tag columns           | no                    | none     | indexes start from 0; a numeric value is a column index, any other value is a fixed tag |
+| fieldColumn     | name and position of field columns         | no                    | none     |                                                                                          |
+| timestampColumn | name and position of the timestamp column  | no                    | none     | there can be only one timestamp column                                                  |
+
+#### 3.2.3 Automatic Table Creation Rules
+##### 3.2.3.1 Super Table Creation Rule
+
+If tagColumn, fieldColumn and timestampColumn are all configured, the super table is created automatically before the first record is inserted.
+The types of the data columns are inferred from the first batch of records, and tag columns default to `NCHAR(64)`. For the sample configuration above, a statement like the following may be generated:
+
+```sql
+CREATE STABLE IF NOT EXISTS stock (
+    tradetime TIMESTAMP,
+ lastprice DOUBLE,
+ askprice1 DOUBLE,
+ bidprice1 DOUBLE,
+ volume INT
+)
+TAGS(
+ industry NCHAR(64),
+ stockID NCHAR(64)
+);
+```
+
+##### 3.2.3.2 Sub-table Creation Rule
+
+Sub-tables share the structure of the super table. The sub-table name is generated as follows:
+1. Join the tag values into a string of the form `tag_value1!tag_value2!tag_value3`.
+2. Compute the MD5 hash of that string, "md5_val".
+3. Use "t_md5val" as the sub-table name, where "t" is a fixed prefix.
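+
+This is the rule implemented by `SchemaManager.computeTableName` in this plugin; a minimal Java sketch (the tag values in the comment are illustrative):
+
+```java
+import org.apache.commons.codec.digest.DigestUtils;
+
+// Join tag values with '!' and prefix the MD5 hex digest with "t_".
+public static String computeTableName(String[] tagValues) {
+    String joined = String.join("!", tagValues); // e.g. "energy!sz000001"
+    return "t_" + DigestUtils.md5Hex(joined);
+}
+```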
+
+#### 3.2.4 Using a Pre-created Table
+
+If the target super table has already been created, tagColumn, fieldColumn and timestampColumn can all be omitted, and the plugin fetches the table structure by executing `describe stableName`.
+In that case, the column order of the received Records must match the column order returned by `describe stableName`. For example, if `describe stableName` returns:
+```
+ Field | Type | Length | Note |
+=================================================================================
+ ts | TIMESTAMP | 8 | |
+ current | DOUBLE | 8 | |
+ location | BINARY | 10 | TAG |
+```
+then the first column of the data the plugin receives must be the timestamp, the second the current, and the third the location.
+
+#### 3.2.5 Notes
+
+1. tagColumn, fieldColumn and timestampColumn describe the structure of the target table; they must either all be present or all be omitted.
+2. If all three are configured and the target table already exists, the two descriptions must match. **Consistency** is the user's responsibility; the plugin does not check it, and a mismatch may cause insert failures or corrupted data.
+3. The plugin prefers the table structure specified in the configuration file.
+
+#### 3.2.6 Type Conversion
+
+| MongoDB type     | DataX internal type | TDengine type     |
+| ---------------- | ------------------- | ----------------- |
+| int, Long | Long | BIGINT |
+| double | Double | DOUBLE |
+| string, array | String | NCHAR(64) |
+| date | Date | TIMESTAMP |
+| boolean | Boolean | BOOL |
+| bytes | Bytes | BINARY |
+
+### 3.3 From a Relational Database to TDengine
+The writer configuration follows the same rules as the MongoDB example above; here is a MySQL example.
+
+#### 3.3.1 Table Structure in MySQL
+```sql
+CREATE TABLE IF NOT EXISTS weather(
+ station varchar(100),
+ latitude DOUBLE,
+ longtitude DOUBLE,
+ `date` DATE,
+ TMAX int,
+ TMIN int
+)
+```
+
+#### 3.3.2 Sample Configuration File
+
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "mysqlreader",
+ "parameter": {
+ "username": "root",
+ "password": "passw0rd",
+ "column": [
+ "*"
+ ],
+ "splitPk": "station",
+ "connection": [
+ {
+ "table": [
+ "weather"
+ ],
+ "jdbcUrl": [
+ "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8"
+ ]
+ }
+ ]
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "host": "127.0.0.1",
+ "port": 6030,
+ "dbname": "test",
+ "user": "root",
+ "password": "taosdata",
+ "batchSize": 1000,
+ "stable": "weather",
+ "tagColumn": {
+ "station": 0
+ },
+ "fieldColumn": {
+ "latitude": 1,
+ "longtitude": 2,
+ "tmax": 4,
+ "tmin": 5
+ },
+ "timestampColumn":{
+ "date": 3
+ }
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
+```
+
+
+## 4 Performance Report
+
+### 4.1 Environment
+
+#### 4.1.1 Data Characteristics
+
+Table creation statement:
+
+A single row looks like:
+
+#### 4.1.2 Machine Specs
+
+* Machine running DataX:
+    1. cpu:
+    2. mem:
+    3. net: dual gigabit NICs
+    4. disc: DataX data does not touch disk; not measured
+
+* TDengine server:
+    1. cpu:
+    2. mem:
+    3. net: dual gigabit NICs
+    4. disc:
+
+#### 4.1.3 DataX JVM Parameters
+
+    -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
+
+### 4.2 Test Report
+
+#### 4.2.1 Single-table Test Report
+
+| Channels | DataX speed (Rec/s) | DataX throughput (MB/s) | DataX NIC out (MB/s) | DataX machine load | DB NIC in (MB/s) | DB load | DB TPS |
+| ------ | ---------------- | --------------- | --------------------------- | ----------------- | -------------------- | ---------- | ------ |
+| 1 | | | | | | | |
+| 4 | | | | | | | |
+| 8 | | | | | | | |
+| 16 | | | | | | | |
+| 32 | | | | | | | |
+
+Notes:
+
+1. The single table here has a bigint(20) auto-increment primary key.
+2. batchSize and the number of channels both have a large impact on performance.
+3. With 16 channels and batches of 4096, full gc occurred twice.
+
+#### 4.2.2 Performance Test Summary
+
+
+## 5 Constraints and Limitations
+
+1. When this plugin creates the super table automatically, NCHAR columns have a fixed length of 64; data sources containing strings longer than 64 characters are not supported.
+2. Tag columns must not contain null values; records whose tags contain null are filtered out.
+
+## FAQ
+
+### How do I select the range of data to synchronize?
+
+The data range is configured on the Reader side, and the method differs from one Reader plugin to another. For example, mysqlreader can restrict the range with a SQL statement, while opentsdbreader uses the beginDateTime and endDateTime options.
+
+### How do I import several source tables at once?
+
+If the Reader plugin supports reading several tables at once, the Writer plugin can import them at once. If the Reader does not, create one job per table and import them separately; the Writer plugin is only responsible for writing data.
+
+### How many TDengine tables does one source table map to?
+
+That is determined by tagColumn. If all tag columns have the same value in every row, there is only one target table; in general, the target super table gets one sub-table per distinct tag combination in the source table.
+
+### Do the source and target tables have the same column order?
+
+TDengine requires the first column of every table to be the timestamp, followed by the regular fields, with the tag columns last. If the source table uses a different order, the plugin reorders the columns when it creates the table automatically.
+
+### How does the plugin determine each column's data type?
+
+It infers the type of each column from the first batch of records received.
+
+### Why does inserting data from 10 years ago throw `TDengine ERROR (2350): failed to execute batch bind`?
+
+Because a newly created database keeps 10 years of data by default. You can specify a longer retention period manually, e.g. `CREATE DATABASE power KEEP 36500;`.
\ No newline at end of file
diff --git a/tdenginewriter/pom.xml b/tdenginewriter/pom.xml
new file mode 100644
index 00000000..054f2ef8
--- /dev/null
+++ b/tdenginewriter/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>datax-all</artifactId>
+        <groupId>com.alibaba.datax</groupId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.alibaba.datax.tdenginewriter</groupId>
+    <artifactId>tdenginewriter</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.taosdata.jdbc</groupId>
+            <artifactId>taos-jdbcdriver</artifactId>
+            <version>2.0.34</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-common</artifactId>
+            <version>${datax-project-version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit-version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons-lang3-version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${jdk-version}</source>
+                    <target>${jdk-version}</target>
+                    <encoding>${project-sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                    <finalName>datax</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>dwzip</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.12.4</version>
+                <configuration>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                    <testFailureIgnore>true</testFailureIgnore>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/tdenginewriter/src/main/assembly/package.xml b/tdenginewriter/src/main/assembly/package.xml
new file mode 100644
index 00000000..d3b75ea2
--- /dev/null
+++ b/tdenginewriter/src/main/assembly/package.xml
@@ -0,0 +1,34 @@
+<assembly
+        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+    <id></id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>plugin.json</include>
+                <include>plugin_job_template.json</include>
+            </includes>
+            <outputDirectory>plugin/writer/tdenginewriter</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>tdenginewriter-0.0.1-SNAPSHOT.jar</include>
+            </includes>
+            <outputDirectory>plugin/writer/tdenginewriter</outputDirectory>
+        </fileSet>
+    </fileSets>
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/writer/tdenginewriter/libs</outputDirectory>
+            <scope>runtime</scope>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java
new file mode 100644
index 00000000..dcc3ca8c
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java
@@ -0,0 +1,12 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
+
+import java.util.Properties;
+
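+/**
+ * Common entry point for the two write paths (JNI schemaless vs. JDBC batch).
+ * An implementation consumes all records from the receiver and returns the
+ * number of records written.
+ */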
+public interface DataHandler {
+
+ long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector);
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandlerFactory.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandlerFactory.java
new file mode 100644
index 00000000..1f740d7e
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandlerFactory.java
@@ -0,0 +1,10 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+public class DataHandlerFactory {
+
+ public static DataHandler build(String peerPluginName) {
+ if (peerPluginName.equals("opentsdbreader"))
+ return new OpentsdbDataHandler();
+ return new DefaultDataHandler();
+ }
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java
new file mode 100644
index 00000000..91c2b7e3
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java
@@ -0,0 +1,105 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
+import com.taosdata.jdbc.TSDBPreparedStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * Default DataHandler: writes data through the JDBC batch-binding interface.
+ */
+public class DefaultDataHandler implements DataHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
+
+ static {
+ try {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) {
+ SchemaManager schemaManager = new SchemaManager(properties);
+ if (!schemaManager.configValid()) {
+ return 0;
+ }
+
+ try {
+ Connection conn = getTaosConnection(properties);
+ if (conn == null) {
+ return 0;
+ }
+            if (schemaManager.shouldGuessSchema()) {
+                // Table structure not found in the config file; try to fetch it from the database.
+                LOG.info(Msg.get("try_get_schema_fromdb"));
+                boolean success = schemaManager.getFromDB(conn);
+                if (!success) {
+                    return 0;
+                }
+            }
+            int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, "1000"));
+            if (batchSize < 5) {
+                // A batchSize that is too small increases the chance of wrong type inference; increase it and retry.
+                LOG.error(Msg.get("batch_size_too_small"));
+                return 0;
+            }
+ return write(lineReceiver, conn, batchSize, schemaManager, collector);
+        } catch (Exception e) {
+            LOG.error("write failed: " + e.getMessage(), e);
+        }
+ return 0;
+ }
+
+
+ private Connection getTaosConnection(Properties properties) throws SQLException {
+        // Check required parameters
+ String host = properties.getProperty(Key.HOST);
+ String port = properties.getProperty(Key.PORT);
+ String dbname = properties.getProperty(Key.DBNAME);
+ String user = properties.getProperty(Key.USER);
+ String password = properties.getProperty(Key.PASSWORD);
+ if (host == null || port == null || dbname == null || user == null || password == null) {
+ String keys = String.join(" ", Key.HOST, Key.PORT, Key.DBNAME, Key.USER, Key.PASSWORD);
+ LOG.error("Required options missing, please check: " + keys);
+ return null;
+ }
+        String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", host, port, dbname, user, password);
+        Connection conn = DriverManager.getConnection(jdbcUrl);
+        LOG.info("TDengine connection established, host:{} port:{} dbname:{} user:{}", host, port, dbname, user);
+        return conn;
+ }
+
+    /**
+     * Batch write via SQL parameter binding.
+     *
+     * @return number of records written successfully
+     * @throws SQLException
+     */
+ private long write(RecordReceiver lineReceiver, Connection conn, int batchSize, SchemaManager scm, TaskPluginCollector collector) throws SQLException {
+ Record record = lineReceiver.getFromReader();
+ if (record == null) {
+ return 0;
+ }
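+        // TDengine parameter-binding syntax: the sub-table name is bound through the '?' after
+        // INSERT INTO, and the sub-table is auto-created from the super table if it does not exist.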
+ String pq = String.format("INSERT INTO ? USING %s TAGS(%s) (%s) values (%s)", scm.getStable(), scm.getTagValuesPlaceHolder(), scm.getJoinedFieldNames(), scm.getFieldValuesPlaceHolder());
+ LOG.info("Prepared SQL: {}", pq);
+ try (TSDBPreparedStatement stmt = (TSDBPreparedStatement) conn.prepareStatement(pq)) {
+ JDBCBatchWriter batchWriter = new JDBCBatchWriter(conn, stmt, scm, batchSize, collector);
+ do {
+ batchWriter.append(record);
+ } while ((record = lineReceiver.getFromReader()) != null);
+ batchWriter.flush();
+ return batchWriter.getCount();
+ }
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java
new file mode 100644
index 00000000..53ab9bb9
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java
@@ -0,0 +1,244 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
+import com.taosdata.jdbc.TSDBPreparedStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Batch writer built on the native JDBC parameter-binding interface.
+ * Two constraints make the batching logic complex enough to deserve its own class:
+ * 1. The records of one batch must be collected in an ArrayList up front.
+ * 2. All records in one batch must target the same table.
+ * The implementation therefore:
+ * 1. Buffers incoming Records per sub-table.
+ * 2. Automatically executes a batch write once a buffer reaches the batchSize threshold.
+ * 3. Requires an explicit flush() to write the final partial batches.
+ */
+public class JDBCBatchWriter {
+ public static final Logger LOG = LoggerFactory.getLogger(JDBCBatchWriter.class);
+ private TSDBPreparedStatement stmt;
+ private SchemaManager scm;
+ private Connection conn;
+ private int batchSize;
+ private TaskPluginCollector collector;
+
+    // Buffered Records, keyed by sub-table name
+    Map<String, List<Record>> buf = new HashMap<>();
+    // Tag values per sub-table, keyed by sub-table name
+    Map<String, String[]> tableTagValues = new HashMap<>();
+    private long sucCount = 0;
+    private final int tsColIndex;
+    private List<String> fieldList;
+    // Minimum number of columns each record must contain, used for validation
+    private int minColNum = 0;
+    private Map<String, Integer> fieldIndexMap;
+    private List<Column.Type> fieldTypes = null;
+
+ public JDBCBatchWriter(Connection conn, TSDBPreparedStatement stmt, SchemaManager scm, int batchSize, TaskPluginCollector collector) {
+ this.conn = conn;
+ this.stmt = stmt;
+ this.scm = scm;
+ this.batchSize = batchSize;
+ this.collector = collector;
+ this.tsColIndex = scm.getTsColIndex();
+ this.fieldList = scm.getFieldList();
+ this.fieldIndexMap = scm.getFieldIndexMap();
+ this.minColNum = 1 + fieldList.size() + scm.getDynamicTagCount();
+
+ }
+
+    public void initFieldTypesAndTargetTable(List<Record> records) throws SQLException {
+ if (fieldTypes != null) {
+ return;
+ }
+ guessFieldTypes(records);
+ if (scm.shouldCreateTable()) {
+ scm.createSTable(conn, fieldTypes);
+ }
+ }
+
+ public void append(Record record) throws SQLException {
+ int columnNum = record.getColumnNumber();
+ if (columnNum < minColNum) {
+            // actual column count is less than the expected minimum
+ collector.collectDirtyRecord(record, Msg.get("column_number_error"));
+ return;
+ }
+ String[] tagValues = scm.getTagValuesFromRecord(record);
+ if (tagValues == null) {
+            // tag columns contain null
+ collector.collectDirtyRecord(record, Msg.get("tag_value_error"));
+ return;
+ }
+ if (!scm.hasTimestamp(record)) {
+            // timestamp column is null or has the wrong type
+ collector.collectDirtyRecord(record, Msg.get("ts_value_error"));
+ return;
+ }
+ String tableName = scm.computeTableName(tagValues);
+ if (buf.containsKey(tableName)) {
+            List<Record> lis = buf.get(tableName);
+ lis.add(record);
+ if (lis.size() == batchSize) {
+ if (fieldTypes == null) {
+                    initFieldTypesAndTargetTable(lis);
+ }
+ executeBatch(tableName);
+ lis.clear();
+ }
+ } else {
+            List<Record> lis = new ArrayList<>(batchSize);
+ lis.add(record);
+ buf.put(tableName, lis);
+ tableTagValues.put(tableName, tagValues);
+ }
+ }
+
+    /**
+     * String needs special handling: a column whose sampled values are all null is reported
+     * as String type, so a Column of type String does not necessarily mean the column really
+     * holds strings.
+     *
+     * @param records sample records used to infer the field types
+     */
+    private void guessFieldTypes(List<Record> records) {
+ fieldTypes = new ArrayList<>(fieldList.size());
+ for (int i = 0; i < fieldList.size(); ++i) {
+ int colIndex = fieldIndexMap.get(fieldList.get(i));
+ boolean ok = false;
+ for (int j = 0; j < records.size() && !ok; ++j) {
+ Column column = records.get(j).getColumn(colIndex);
+ Column.Type type = column.getType();
+ switch (type) {
+ case LONG:
+ case DOUBLE:
+ case DATE:
+ case BOOL:
+ case BYTES:
+ if (column.getRawData() != null) {
+ fieldTypes.add(type);
+ ok = true;
+ }
+ break;
+ case STRING:
+                        // only a non-null, non-empty String column is really treated as String
+ String value = column.asString();
+ if (value != null && !"".equals(value)) {
+ fieldTypes.add(type);
+ ok = true;
+ }
+ break;
+ default:
+                        throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, type.toString());
+ }
+ }
+ if (!ok) {
+                // based on the sampled records, the type of this column cannot be inferred
+ throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, String.format(Msg.get("infer_column_type_error"), records.size(), i + 1));
+ }
+ }
+ LOG.info("Field Types: {}", fieldTypes);
+ }
+
+    /**
+     * Executes a batch write for a single sub-table.
+     *
+     * @param tableName name of the target sub-table
+     * @throws SQLException
+     */
+ private void executeBatch(String tableName) throws SQLException {
+        // target sub-table name
+        stmt.setTableName(tableName);
+        List<Record> records = buf.get(tableName);
+        // tags
+        String[] tagValues = tableTagValues.get(tableName);
+        LOG.debug("executeBatch {}", String.join(",", tagValues));
+        for (int i = 0; i < tagValues.length; ++i) {
+            stmt.setTagNString(i, tagValues[i]);
+        }
+        // timestamps
+        ArrayList<Long> tsList = records.stream().map(r -> r.getColumn(tsColIndex).asDate().getTime()).collect(Collectors.toCollection(ArrayList::new));
+        stmt.setTimestamp(0, tsList);
+        // fields
+ for (int i = 0; i < fieldList.size(); ) {
+ String fieldName = fieldList.get(i);
+ int index = fieldIndexMap.get(fieldName);
+ switch (fieldTypes.get(i)) {
+                case LONG:
+                    ArrayList<Long> lisLong = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new));
+                    stmt.setLong(++i, lisLong);
+                    break;
+                case DOUBLE:
+                    ArrayList<Double> lisDouble = records.stream().map(r -> r.getColumn(index).asDouble()).collect(Collectors.toCollection(ArrayList::new));
+                    stmt.setDouble(++i, lisDouble);
+                    break;
+                case STRING:
+                    ArrayList<String> lisString = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new));
+                    stmt.setNString(++i, lisString, 64);
+                    break;
+                case DATE:
+                    ArrayList<Long> lisTs = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new));
+                    stmt.setTimestamp(++i, lisTs);
+                    break;
+                case BOOL:
+                    ArrayList<Boolean> lisBool = records.stream().map(r -> r.getColumn(index).asBoolean()).collect(Collectors.toCollection(ArrayList::new));
+                    stmt.setBoolean(++i, lisBool);
+                    break;
+                case BYTES:
+                    ArrayList<String> lisBytes = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new));
+                    stmt.setString(++i, lisBytes, 64);
+                    break;
+                default:
+                    throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, fieldTypes.get(i).toString());
+            }
+        }
+        // execute the batch
+        stmt.columnDataAddBatch();
+        stmt.columnDataExecuteBatch();
+        // update the success counter
+        sucCount += records.size();
+ }
+
+    /**
+     * Writes all buffered Records. If the field types have not been inferred yet
+     * (no buffer ever reached batchSize), a sample of the buffered records is used first.
+     */
+ public void flush() throws SQLException {
+        if (fieldTypes == null) {
+            List<Record> records = new ArrayList<>();
+            for (List<Record> lis : buf.values()) {
+ records.addAll(lis);
+ if (records.size() > 100) {
+ break;
+ }
+ }
+ if (records.size() > 0) {
+                initFieldTypesAndTargetTable(records);
+ } else {
+ return;
+ }
+ }
+ for (String tabName : buf.keySet()) {
+ if (buf.get(tabName).size() > 0) {
+ executeBatch(tabName);
+ }
+ }
+ stmt.columnDataCloseBatch();
+ }
+
+    /**
+     * @return number of records written successfully
+     */
+ public long getCount() {
+ return sucCount;
+ }
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnection.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnection.java
new file mode 100644
index 00000000..0aabe32a
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnection.java
@@ -0,0 +1,89 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import java.util.Properties;
+
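+/**
+ * Thin JNI wrapper around the TDengine client library (taos),
+ * used by OpentsdbDataHandler for schemaless OpenTSDB-JSON writes.
+ */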
+public class JniConnection {
+
+ private static final long JNI_NULL_POINTER = 0L;
+ private static final int JNI_SUCCESSFUL = 0;
+ public static final String PROPERTY_KEY_CONFIG_DIR = "cfgdir";
+ public static final String PROPERTY_KEY_LOCALE = "locale";
+ public static final String PROPERTY_KEY_CHARSET = "charset";
+ public static final String PROPERTY_KEY_TIME_ZONE = "timezone";
+
+ private long conn;
+
+ static {
+ System.loadLibrary("taos");
+ }
+
+ public JniConnection(Properties props) throws Exception {
+ initImp(props.getProperty(PROPERTY_KEY_CONFIG_DIR, null));
+
+        String locale = props.getProperty(PROPERTY_KEY_LOCALE);
+        if (setOptions(0, locale) < 0) {
+            throw new Exception("Failed to set locale: " + locale);
+        }
+        String charset = props.getProperty(PROPERTY_KEY_CHARSET);
+        if (setOptions(1, charset) < 0) {
+            throw new Exception("Failed to set charset: " + charset);
+        }
+        String timezone = props.getProperty(PROPERTY_KEY_TIME_ZONE);
+        if (setOptions(2, timezone) < 0) {
+            throw new Exception("Failed to set timezone: " + timezone);
+        }
+ }
+
+ public void open(String host, int port, String dbname, String user, String password) throws Exception {
+ if (this.conn != JNI_NULL_POINTER) {
+ close();
+ this.conn = JNI_NULL_POINTER;
+ }
+
+ this.conn = connectImp(host, port, dbname, user, password);
+ if (this.conn == JNI_NULL_POINTER) {
+ String errMsg = getErrMsgImp(0);
+ throw new Exception(errMsg);
+ }
+ }
+
+ public void insertOpentsdbJson(String json) throws Exception {
+ if (this.conn == JNI_NULL_POINTER) {
+ throw new Exception("JNI connection is NULL");
+ }
+
+ long result = insertOpentsdbJson(json, this.conn);
+ int errCode = getErrCodeImp(this.conn, result);
+ if (errCode != JNI_SUCCESSFUL) {
+ String errMsg = getErrMsgImp(result);
+ freeResultSetImp(this.conn, result);
+ throw new Exception(errMsg);
+ }
+ freeResultSetImp(this.conn, result);
+ }
+
+ public void close() throws Exception {
+ int code = this.closeConnectionImp(this.conn);
+ if (code != 0) {
+ throw new Exception("JNI closeConnection failed");
+ }
+ this.conn = JNI_NULL_POINTER;
+ }
+
+ private static native void initImp(String configDir);
+
+ private static native int setOptions(int optionIndex, String optionValue);
+
+ private native long connectImp(String host, int port, String dbName, String user, String password);
+
+ private native int getErrCodeImp(long connection, long pSql);
+
+ private native String getErrMsgImp(long pSql);
+
+ private native void freeResultSetImp(long connection, long pSql);
+
+ private native int closeConnectionImp(long connection);
+
+ private native long insertOpentsdbJson(String json, long pSql);
+
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java
new file mode 100644
index 00000000..090a7999
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java
@@ -0,0 +1,14 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+public class Key {
+ public static final String HOST = "host";
+ public static final String PORT = "port";
+ public static final String DBNAME = "dbname";
+ public static final String USER = "user";
+ public static final String PASSWORD = "password";
+ public static final String BATCH_SIZE = "batchSize";
+ public static final String STABLE = "stable";
+ public static final String TAG_COLUMN = "tagColumn";
+ public static final String FIELD_COLUMN = "fieldColumn";
+ public static final String TIMESTAMP_COLUMN = "timestampColumn";
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Msg.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Msg.java
new file mode 100644
index 00000000..89730d35
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Msg.java
@@ -0,0 +1,20 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+/**
+ * i18n message util
+ */
+public class Msg {
+ private static ResourceBundle bundle;
+
+ static {
+ bundle = ResourceBundle.getBundle("tdenginewritermsg", Locale.getDefault());
+ }
+
+ public static String get(String key) {
+ return bundle.getString(key);
+ }
+
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java
new file mode 100644
index 00000000..e1b8f5dd
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java
@@ -0,0 +1,99 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+public class OpentsdbDataHandler implements DataHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(OpentsdbDataHandler.class);
+ private static final String DEFAULT_BATCH_SIZE = "1";
+
+ @Override
+ public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) {
+        // OpenTSDB JSON data is written via JNI using the schemaless API.
+ String host = properties.getProperty(Key.HOST);
+ int port = Integer.parseInt(properties.getProperty(Key.PORT));
+ String dbname = properties.getProperty(Key.DBNAME);
+ String user = properties.getProperty(Key.USER);
+ String password = properties.getProperty(Key.PASSWORD);
+
+ JniConnection conn = null;
+ long count = 0;
+ try {
+ conn = new JniConnection(properties);
+ conn.open(host, port, dbname, user, password);
+ LOG.info("TDengine connection established, host: " + host + ", port: " + port + ", dbname: " + dbname + ", user: " + user);
+ int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, DEFAULT_BATCH_SIZE));
+ count = writeOpentsdb(lineReceiver, conn, batchSize);
+ } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+ } finally {
+ try {
+ if (conn != null)
+ conn.close();
+ } catch (Exception e) {
+                LOG.error(e.getMessage(), e);
+ }
+ LOG.info("TDengine connection closed");
+ }
+
+ return count;
+ }
+
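+    /**
+     * Writes records as OpenTSDB-JSON through the schemaless interface. With batchSize == 1,
+     * every record is inserted on its own; otherwise records are collected into a JSON array
+     * of batchSize elements per insert, and the tail partial batch is flushed after the loop.
+     */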
+ private long writeOpentsdb(RecordReceiver lineReceiver, JniConnection conn, int batchSize) {
+ long recordIndex = 1;
+ try {
+ Record record;
+ StringBuilder sb = new StringBuilder();
+ while ((record = lineReceiver.getFromReader()) != null) {
+ if (batchSize == 1) {
+ String jsonData = recordToString(record);
+ LOG.debug(">>> " + jsonData);
+ conn.insertOpentsdbJson(jsonData);
+ } else if (recordIndex % batchSize == 1) {
+ sb.append("[").append(recordToString(record)).append(",");
+ } else if (recordIndex % batchSize == 0) {
+ sb.append(recordToString(record)).append("]");
+ String jsonData = sb.toString();
+ LOG.debug(">>> " + jsonData);
+ conn.insertOpentsdbJson(jsonData);
+ sb.delete(0, sb.length());
+ } else {
+ sb.append(recordToString(record)).append(",");
+ }
+ recordIndex++;
+ }
+ if (sb.length() != 0 && sb.charAt(0) == '[') {
+ String jsonData = sb.deleteCharAt(sb.length() - 1).append("]").toString();
+ LOG.debug(">>> " + jsonData);
+ conn.insertOpentsdbJson(jsonData);
+ }
+ } catch (Exception e) {
+ LOG.error("TDengineWriter ERROR: " + e.getMessage());
+ throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e);
+ }
+ return recordIndex - 1;
+ }
+
+ private String recordToString(Record record) {
+ int recordLength = record.getColumnNumber();
+ if (0 == recordLength) {
+ return "";
+ }
+ Column column;
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < recordLength; i++) {
+ column = record.getColumn(i);
+ sb.append(column.asString()).append("\t");
+ }
+ sb.setLength(sb.length() - 1);
+ return sb.toString();
+ }
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java
new file mode 100644
index 00000000..d67a6585
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java
@@ -0,0 +1,271 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class SchemaManager {
+ private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class);
+
+    private String stable; // target super table name
+    private Map<String, String> fixedTagValue = new HashMap<>(); // fixed tag values: tag name -> tag value
+    private Map<String, Integer> tagIndexMap = new HashMap<>(); // dynamic tag values: tag name -> column index
+    private Map<String, Integer> fieldIndexMap = new HashMap<>(); // field name -> column index
+    private String tsColName; // timestamp column name
+    private int tsColIndex = -1; // timestamp column index
+    private List<String> fieldList = new ArrayList<>();
+    private List<String> tagList = new ArrayList<>();
+    private boolean canInferSchemaFromConfig = false;
+
+
+ public SchemaManager() {
+ }
+
+ public SchemaManager(Properties properties) {
+ getFromConfig(properties);
+ }
+
+ private String mapDataxType(Column.Type type) {
+ switch (type) {
+ case LONG:
+ return "BIGINT";
+ case DOUBLE:
+ return "DOUBLE";
+ case STRING:
+ return "NCHAR(64)";
+ case DATE:
+ return "TIMESTAMP";
+ case BOOL:
+ return "BOOL";
+ case BYTES:
+ return "BINARY(64)";
+ default:
+ throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, type.toString());
+ }
+ }
+
+    public void setStable(String stable) {
+        this.stable = stable;
+    }
+
+ public String getStable() {
+ return stable;
+ }
+
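+    /**
+     * Parses the flattened DataX configuration keys (e.g. "tagColumn.industry" -> "0").
+     * A numeric value is treated as a column index; anything else as a fixed tag value.
+     */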
+ private void getFromConfig(Properties properties) {
+ stable = properties.getProperty(Key.STABLE);
+ if (stable == null) {
+ LOG.error("Config error: no stable");
+ return;
+ }
+ for (Object key : properties.keySet()) {
+ String k = (String) key;
+ String v = properties.getProperty(k);
+
+ String[] ps = k.split("\\.");
+ if (ps.length == 1) {
+ continue;
+ }
+ if (k.startsWith(Key.TAG_COLUMN)) {
+ String tagName = ps[1];
+ try {
+ Integer tagIndex = Integer.parseInt(v);
+ this.tagIndexMap.put(tagName, tagIndex);
+ tagList.add(tagName);
+ } catch (NumberFormatException e) {
+ fixedTagValue.put(tagName, v);
+ tagList.add(tagName);
+ }
+            } else if (k.startsWith(Key.FIELD_COLUMN)) {
+                String fieldName = ps[1];
+                Integer fieldIndex = Integer.parseInt(v);
+                fieldIndexMap.put(fieldName, fieldIndex);
+ } else if (k.startsWith(Key.TIMESTAMP_COLUMN)) {
+ tsColName = ps[1];
+ tsColIndex = Integer.parseInt(v);
+ }
+ }
+        List<String> sortedFieldName = fieldIndexMap.entrySet().stream().sorted((x, y) -> x.getValue().compareTo(y.getValue())).map(e -> e.getKey()).collect(Collectors.toList());
+        fieldList.addAll(sortedFieldName); // sorting keeps the auto-created table's column order consistent with the input data
+        canInferSchemaFromConfig = tsColIndex > -1 && !(fixedTagValue.isEmpty() && tagIndexMap.isEmpty()) && !fieldIndexMap.isEmpty();
+        LOG.info("Config file parsed result: fixedTags=[{}], tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", fixedTagValue.keySet()), String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex);
+ }
+
+ public boolean shouldGuessSchema() {
+ return !canInferSchemaFromConfig;
+ }
+
+ public boolean shouldCreateTable() {
+ return canInferSchemaFromConfig;
+ }
+
+ public boolean configValid() {
+ boolean valid = (tagList.size() > 0 && fieldList.size() > 0 && tsColIndex > -1) || (tagList.size() == 0 && fieldList.size() == 0 && tsColIndex == -1);
+ if (!valid) {
+ LOG.error("Config error: tagColumn, fieldColumn and timestampColumn must be present together or absent together.");
+ }
+ return valid;
+ }
+
+    /**
+     * Fetches the table schema by executing `describe dbname.stable`.
+     * The describe command returns four columns: Field, Type, Length, Note.
+     *
+     * @return true on success; false if the super table does not exist or another error occurs
+     */
+ public boolean getFromDB(Connection conn) {
+ try {
+            List<String> stables = getSTables(conn);
+ if (!stables.contains(stable)) {
+ LOG.error("super table {} not exist, fail to get schema from database.", stable);
+ return false;
+ }
+ } catch (SQLException e) {
+ LOG.error(e.getMessage());
+ e.printStackTrace();
+ return false;
+ }
+ try (Statement stmt = conn.createStatement()) {
+ ResultSet rs = stmt.executeQuery("describe " + stable);
+ int colIndex = 0;
+ while (rs.next()) {
+ String name = rs.getString(1);
+ String type = rs.getString(2);
+ String note = rs.getString(4);
+ if ("TIMESTAMP".equals(type)) {
+ tsColName = name;
+ tsColIndex = colIndex;
+ } else if ("TAG".equals(note)) {
+ tagIndexMap.put(name, colIndex);
+ tagList.add(name);
+ } else {
+ fieldIndexMap.put(name, colIndex);
+ fieldList.add(name);
+ }
+ colIndex++;
+ }
+ LOG.info("table info:tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex);
+ return true;
+ } catch (SQLException e) {
+ LOG.error(e.getMessage());
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+    public static List<String> getSTables(Connection conn) throws SQLException {
+        List<String> stables = new ArrayList<>();
+ try (Statement stmt = conn.createStatement()) {
+ ResultSet rs = stmt.executeQuery("show stables");
+ while (rs.next()) {
+ String name = rs.getString(1);
+ stables.add(name);
+ }
+ }
+ return stables;
+ }
+
+ public void createSTable(Connection conn, List fieldTypes) throws SQLException {
+ StringBuilder sb = new StringBuilder();
+ sb.append("CREATE STABLE IF NOT EXISTS ").append(stable).append("(");
+ sb.append(tsColName).append(" ").append("TIMESTAMP,");
+ for (int i = 0; i < fieldList.size(); ++i) {
+ String fieldName = fieldList.get(i);
+ Column.Type dxType = fieldTypes.get(i);
+ sb.append(fieldName).append(' ');
+ String tdType = mapDataxType(dxType);
+ sb.append(tdType).append(',');
+ }
+ sb.deleteCharAt(sb.length() - 1);
+ sb.append(") TAGS(");
+ for (String tagName : tagList) {
+ sb.append(tagName).append(" NCHAR(64),");
+ }
+ sb.deleteCharAt(sb.length() - 1);
+ sb.append(")");
+ String q = sb.toString();
+ LOG.info("run sql:" + q);
+ try (Statement stmt = conn.createStatement()) {
+ stmt.execute(q);
+ }
+ }
+
+ public String[] getTagValuesFromRecord(Record record) {
+ String[] tagValues = new String[tagList.size()];
+ for (int i = 0; i < tagList.size(); ++i) {
+ if (fixedTagValue.containsKey(tagList.get(i))) {
+ tagValues[i] = fixedTagValue.get(tagList.get(i));
+ } else {
+ int tagIndex = tagIndexMap.get(tagList.get(i));
+ tagValues[i] = record.getColumn(tagIndex).asString();
+ }
+ if (tagValues[i] == null) {
+ return null;
+ }
+ }
+ return tagValues;
+ }
+
+    public boolean hasTimestamp(Record record) {
+        Column column = record.getColumn(tsColIndex);
+        return column.getType() == Column.Type.DATE && column.asDate() != null;
+    }
+
+    public Map<String, Integer> getFieldIndexMap() {
+ return fieldIndexMap;
+ }
+
+    public List<String> getFieldList() {
+ return fieldList;
+ }
+
+ public String getJoinedFieldNames() {
+ return tsColName + ", " + String.join(", ", fieldList);
+ }
+
+ public int getTsColIndex() {
+ return tsColIndex;
+ }
+
+ public String getTagValuesPlaceHolder() {
+ return tagList.stream().map(x -> "?").collect(Collectors.joining(","));
+ }
+
+ public String getFieldValuesPlaceHolder() {
+ return "?, " + fieldList.stream().map(x -> "?").collect(Collectors.joining(", "));
+ }
+
+    /**
+     * Computes the sub-table name:
+     * - join the tag values into a string of the form tag_value1!tag_value2!tag_value3,
+     * - compute the MD5 hash of that string, "md5_val",
+     * - use "t_md5val" as the sub-table name, where "t" is a fixed prefix.
+     *
+     * @param tagValues tag values of the record
+     * @return sub-table name
+     */
+ public String computeTableName(String[] tagValues) {
+ String s = String.join("!", tagValues);
+ return "t_" + DigestUtils.md5Hex(s);
+ }
+
+ public int getDynamicTagCount() {
+ return tagIndexMap.size();
+ }
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java
new file mode 100644
index 00000000..79e5238c
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java
@@ -0,0 +1,91 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+public class TDengineWriter extends Writer {
+
+ private static final String PEER_PLUGIN_NAME = "peerPluginName";
+
+ static {
+ try {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static class Job extends Writer.Job {
+
+ private Configuration originalConfig;
+
+ @Override
+ public void init() {
+ this.originalConfig = super.getPluginJobConf();
+ this.originalConfig.set(PEER_PLUGIN_NAME, getPeerPluginName());
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+
+ @Override
+        public List<Configuration> split(int mandatoryNumber) {
+            List<Configuration> writerSplitConfigs = new ArrayList<>();
+ for (int i = 0; i < mandatoryNumber; i++) {
+ writerSplitConfigs.add(this.originalConfig);
+ }
+ return writerSplitConfigs;
+ }
+ }
+
+ public static class Task extends Writer.Task {
+        private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+ private Configuration writerSliceConfig;
+
+ @Override
+ public void init() {
+ this.writerSliceConfig = getPluginJobConf();
+
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+
+ @Override
+ public void startWrite(RecordReceiver lineReceiver) {
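+            // Flatten the writer configuration into Properties; nested keys such as
+            // "tagColumn.industry" arrive as dotted paths (parsed by SchemaManager.getFromConfig).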
+            Set<String> keys = this.writerSliceConfig.getKeys();
+ Properties properties = new Properties();
+ for (String key : keys) {
+ String value = this.writerSliceConfig.getString(key);
+ properties.setProperty(key, value);
+ }
+ if (!keys.contains(Key.USER)) {
+ properties.setProperty(Key.USER, "root");
+ }
+ if (!keys.contains(Key.PASSWORD)) {
+ properties.setProperty(Key.PASSWORD, "taosdata");
+ }
+ LOG.debug("========================properties==========================\n" + properties);
+ String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
+ LOG.debug("start to handle record from: " + peerPluginName);
+ DataHandler handler = DataHandlerFactory.build(peerPluginName);
+ long records = handler.handle(lineReceiver, properties, getTaskPluginCollector());
+ LOG.debug("handle data finished, records: " + records);
+ }
+
+ }
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java
new file mode 100644
index 00000000..994f1e89
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java
@@ -0,0 +1,32 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.common.spi.ErrorCode;
+
+public enum TDengineWriterErrorCode implements ErrorCode {
+    RUNTIME_EXCEPTION("TDengineWriter-00", "runtime exception"),
+    TYPE_ERROR("TDengineWriter-01", "DataX type cannot be mapped to a TDengine type");
+
+ private final String code;
+ private final String description;
+
+ private TDengineWriterErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return this.code;
+ }
+
+ @Override
+ public String getDescription() {
+ return this.description;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Code:[%s], Description:[%s]. ", this.code,
+ this.description);
+ }
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/com_alibaba_datax_plugin_writer_tdenginewriter_JniConnection.h b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/com_alibaba_datax_plugin_writer_tdenginewriter_JniConnection.h
new file mode 100644
index 00000000..4bdf3639
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/com_alibaba_datax_plugin_writer_tdenginewriter_JniConnection.h
@@ -0,0 +1,105 @@
+/* DO NOT EDIT THIS FILE - it is machine generated */
+#include <jni.h>
+/* Header for class com_alibaba_datax_plugin_writer_JniConnection */
+
+#ifndef _Included_com_alibaba_datax_plugin_writer_JniConnection
+#define _Included_com_alibaba_datax_plugin_writer_JniConnection
+#ifdef __cplusplus
+extern "C" {
+#endif
+#undef com_alibaba_datax_plugin_writer_JniConnection_JNI_NULL_POINTER
+#define com_alibaba_datax_plugin_writer_JniConnection_JNI_NULL_POINTER 0LL
+#undef com_alibaba_datax_plugin_writer_JniConnection_JNI_SUCCESSFUL
+#define com_alibaba_datax_plugin_writer_JniConnection_JNI_SUCCESSFUL 0L
+/*
+ * Class: com_alibaba_datax_plugin_writer_JniConnection
+ * Method: initImp
+ * Signature: (Ljava/lang/String;)V
+ */
+JNIEXPORT void JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_initImp
+ (JNIEnv *, jclass, jstring);
+
+/*
+ * Class: com_alibaba_datax_plugin_writer_JniConnection
+ * Method: setOptions
+ * Signature: (ILjava/lang/String;)I
+ */
+JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_setOptions
+ (JNIEnv *, jclass, jint, jstring);
+
+/*
+ * Class: com_alibaba_datax_plugin_writer_JniConnection
+ * Method: getTsCharset
+ * Signature: ()Ljava/lang/String;
+ */
+JNIEXPORT jstring JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getTsCharset
+ (JNIEnv *, jclass);
+
+/*
+ * Class: com_alibaba_datax_plugin_writer_JniConnection
+ * Method: connectImp
+ * Signature: (Ljava/lang/String;ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;)J
+ */
+JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_connectImp
+ (JNIEnv *, jobject, jstring, jint, jstring, jstring, jstring);
+
+/*
+ * Class: com_alibaba_datax_plugin_writer_JniConnection
+ * Method: executeQueryImp
+ * Signature: ([BJ)J
+ */
+JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_executeQueryImp
+ (JNIEnv *, jobject, jbyteArray, jlong);
+
+/*
+ * Class: com_alibaba_datax_plugin_writer_JniConnection
+ * Method: getErrCodeImp
+ * Signature: (JJ)I
+ */
+JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrCodeImp
+ (JNIEnv *, jobject, jlong, jlong);
+
+/*
+ * Class: com_alibaba_datax_plugin_writer_JniConnection
+ * Method: getErrMsgImp
+ * Signature: (J)Ljava/lang/String;
+ */
+JNIEXPORT jstring JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrMsgImp
+ (JNIEnv *, jobject, jlong);
+
+/*
+ * Class: com_alibaba_datax_plugin_writer_JniConnection
+ * Method: getErrMsgByCode
+ * Signature: (J)Ljava/lang/String;
+ */
+JNIEXPORT jstring JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrMsgByCode
+ (JNIEnv *, jobject, jlong);
+
+/*
+ * Class: com_alibaba_datax_plugin_writer_JniConnection
+ * Method: getAffectedRowsImp
+ * Signature: (JJ)I
+ */
+JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getAffectedRowsImp
+ (JNIEnv *, jobject, jlong, jlong);
+
+/*
+ * Class: com_alibaba_datax_plugin_writer_JniConnection
+ * Method: closeConnectionImp
+ * Signature: (J)I
+ */
+JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_closeConnectionImp
+ (JNIEnv *, jobject, jlong);
+
+/*
+ * Class: com_alibaba_datax_plugin_writer_JniConnection
+ * Method: insertOpentsdbJson
+ * Signature: (Ljava/lang/String;J)J
+ */
+JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_insertOpentsdbJson
+ (JNIEnv *, jobject, jstring, jlong);
+
+#ifdef __cplusplus
+}
+#endif
+#endif
diff --git a/tdenginewriter/src/main/resources/plugin.json b/tdenginewriter/src/main/resources/plugin.json
new file mode 100644
index 00000000..e54f65ff
--- /dev/null
+++ b/tdenginewriter/src/main/resources/plugin.json
@@ -0,0 +1,9 @@
+{
+ "name": "tdenginewriter",
+ "class": "com.alibaba.datax.plugin.writer.tdenginewriter.TDengineWriter",
+ "description": {
+ "useScene": "data migration to tdengine",
+ "mechanism": "use JNI or taos-jdbc to write data to tdengine."
+ },
+ "developer": "zyyang-taosdata"
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/main/resources/plugin_job_template.json b/tdenginewriter/src/main/resources/plugin_job_template.json
new file mode 100644
index 00000000..5482b26e
--- /dev/null
+++ b/tdenginewriter/src/main/resources/plugin_job_template.json
@@ -0,0 +1,10 @@
+{
+ "name": "tdenginewriter",
+ "parameter": {
+ "host": "",
+ "port": 6030,
+ "db": "",
+ "user": "",
+ "password": ""
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/main/resources/tdenginewritermsg.properties b/tdenginewriter/src/main/resources/tdenginewritermsg.properties
new file mode 100644
index 00000000..4aaa220b
--- /dev/null
+++ b/tdenginewriter/src/main/resources/tdenginewritermsg.properties
@@ -0,0 +1,6 @@
+try_get_schema_fromdb=failed to get the structure of the target table from the configuration file, will try to get it from the database
+batch_size_too_small='batchSize' is too small, please increase it and try again
+column_number_error=number of columns is less than expected
+tag_value_error=tag columns contain null values
+ts_value_error=timestamp column is null or has the wrong type
+infer_column_type_error=unable to infer column types: sample count %d, column index %d
\ No newline at end of file
diff --git a/tdenginewriter/src/main/resources/tdenginewritermsg_en_US.properties b/tdenginewriter/src/main/resources/tdenginewritermsg_en_US.properties
new file mode 100644
index 00000000..4aaa220b
--- /dev/null
+++ b/tdenginewriter/src/main/resources/tdenginewritermsg_en_US.properties
@@ -0,0 +1,6 @@
+try_get_schema_fromdb=failed to get the structure of the target table from the configuration file, will try to get it from the database
+batch_size_too_small='batchSize' is too small, please increase it and try again
+column_number_error=number of columns is less than expected
+tag_value_error=tag columns contain null values
+ts_value_error=timestamp column is null or has the wrong type
+infer_column_type_error=unable to infer column types: sample count %d, column index %d
\ No newline at end of file
diff --git a/tdenginewriter/src/main/resources/tdenginewritermsg_zh_CN.properties b/tdenginewriter/src/main/resources/tdenginewritermsg_zh_CN.properties
new file mode 100644
index 00000000..4b9552fd
--- /dev/null
+++ b/tdenginewriter/src/main/resources/tdenginewritermsg_zh_CN.properties
@@ -0,0 +1,6 @@
+try_get_schema_fromdb=\u65e0\u6cd5\u4ece\u914d\u7f6e\u6587\u4ef6\u83b7\u53d6\u8868\u7ed3\u6784\u4fe1\u606f\uff0c\u5c1d\u8bd5\u4ece\u6570\u636e\u5e93\u83b7\u53d6
+batch_size_too_small=batchSize\u592a\u5c0f\uff0c\u4f1a\u589e\u52a0\u81ea\u52a8\u7c7b\u578b\u63a8\u65ad\u9519\u8bef\u7684\u6982\u7387\uff0c\u5efa\u8bae\u6539\u5927\u540e\u91cd\u8bd5
+column_number_error=\u5b9e\u9645\u5217\u6570\u5c0f\u4e8e\u671f\u671b\u5217\u6570
+tag_value_error=\u6807\u7b7e\u5217\u5305\u542bnull
+ts_value_error=\u65f6\u95f4\u6233\u5217\u4e3anull\u6216\u7c7b\u578b\u9519\u8bef
+infer_column_type_error=\u6839\u636e\u91c7\u6837\u7684%d\u6761\u6570\u636e\uff0c\u65e0\u6cd5\u63a8\u65ad\u7b2c%d\u5217\u7684\u6570\u636e\u7c7b\u578b
diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnectionTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnectionTest.java
new file mode 100644
index 00000000..09c3df26
--- /dev/null
+++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnectionTest.java
@@ -0,0 +1,21 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+public class JniConnectionTest {
+
+ @Test
+ public void test() throws Exception {
+ JniConnection connection = new JniConnection(new Properties());
+
+ connection.open("192.168.56.105", 6030, "test", "root", "taosdata");
+
+ String json = "{\"metric\":\"weather_temperature\",\"timestamp\":1609430400000,\"value\":123,\"tags\":{\"location\":\"beijing\",\"id\":\"t123\"}}";
+ connection.insertOpentsdbJson(json);
+
+ connection.close();
+ }
+
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/MessageTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/MessageTest.java
new file mode 100644
index 00000000..b1b7ddd8
--- /dev/null
+++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/MessageTest.java
@@ -0,0 +1,25 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import org.junit.Test;
+
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+import org.junit.Assert;
+
+public class MessageTest {
+ @Test
+ public void testChineseMessage() {
+ Locale local = new Locale("zh", "CN");
+ ResourceBundle bundle = ResourceBundle.getBundle("tdenginewritermsg", local);
+ String msg = bundle.getString("try_get_schema_fromdb");
+ Assert.assertEquals("无法从配置文件获取表结构信息,尝试从数据库获取", msg);
+ }
+
+ @Test
+ public void testDefaultMessage() {
+ ResourceBundle bundle = ResourceBundle.getBundle("tdenginewritermsg", Locale.getDefault());
+ String msg = bundle.getString("try_get_schema_fromdb");
+ System.out.println(msg);
+ }
+}
diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java
new file mode 100644
index 00000000..62bf7040
--- /dev/null
+++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java
@@ -0,0 +1,31 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class TDengineWriterTest {
+
+
+ @Test
+ public void testGetSchema() throws ClassNotFoundException, SQLException {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", "wozai.fun", "6030", "test", "root", "taosdata");
+ Connection conn = DriverManager.getConnection(jdbcUrl);
+ SchemaManager schemaManager = new SchemaManager();
+ schemaManager.setStable("test1");
+ schemaManager.getFromDB(conn);
+ }
+
+ @Test
+ public void dropTestTable() throws ClassNotFoundException, SQLException {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", "wozai.fun", "6030", "test", "root", "taosdata");
+ Connection conn = DriverManager.getConnection(jdbcUrl);
+ Statement stmt = conn.createStatement();
+ stmt.execute("drop table market_snapshot");
+ }
+}