diff --git a/core/src/main/job/mongodb2tdengine.json b/core/src/main/job/mongodb2tdengine.json
new file mode 100644
index 00000000..49e04c11
--- /dev/null
+++ b/core/src/main/job/mongodb2tdengine.json
@@ -0,0 +1,78 @@
+{
+ "job": {
+ "setting": {
+ "speed": {
+ "channel": 2
+ }
+ },
+ "content": [
+ {
+ "reader": {
+ "name": "mongodbreader",
+ "parameter": {
+ "address": [
+ "127.0.0.1:27017"
+ ],
+ "userName": "mongouser",
+ "mechanism": "SCRAM-SHA-1",
+ "userPassword": "mongopass",
+ "authDb": "admin",
+ "dbName": "test",
+ "collectionName": "cu_market_data",
+ "column": [
+ {
+ "name": "instrumentID",
+ "type": "string"
+ },
+ {
+ "name": "tradeTime",
+ "type": "date"
+ },
+ {
+ "name": "lastPrice",
+ "type": "double"
+ },
+ {
+ "name": "askPrice1",
+ "type": "double"
+ },
+ {
+ "name": "bidPrice1",
+ "type": "double"
+ },
+ {
+ "name": "volume",
+ "type": "int"
+ }
+ ]
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "host": "127.0.0.1",
+ "port": 6030,
+ "dbname": "test",
+ "user": "root",
+ "password": "taosdata",
+ "stable": "market_snapshot",
+ "batchSize": 35,
+ "tagColumn": {
+ "product": "cu",
+ "instrumentID": 0
+ },
+ "fieldColumn": {
+ "lastPrice": 2,
+ "askPrice1": 3,
+ "bidPrice1": 4,
+ "volume": 5
+ },
+ "timestampColumn": {
+ "tradeTime": 1
+ }
+ }
+ }
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git a/mongodbreader/doc/mongodbreader.md b/mongodbreader/doc/mongodbreader.md
index b61493e6..99d25731 100644
--- a/mongodbreader/doc/mongodbreader.md
+++ b/mongodbreader/doc/mongodbreader.md
@@ -127,6 +127,7 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据,通过主控的J
* address: MongoDB address information. Since MongoDB may be deployed as a cluster, the ip:port pairs must be given as a JSON array. Required.
* userName: MongoDB user name. Optional.
* userPassword: MongoDB password. Optional.
+* authDb: MongoDB authentication database. Optional.
* collectionName: MongoDB collection name. Required.
* column: MongoDB document column names. Required.
* name: name of the column. Required.
diff --git a/tdenginewriter/doc/tdenginewriter.md b/tdenginewriter/doc/tdenginewriter.md
index 8e55b189..432b1fb2 100644
--- a/tdenginewriter/doc/tdenginewriter.md
+++ b/tdenginewriter/doc/tdenginewriter.md
@@ -2,20 +2,21 @@
## 1 Quick Introduction
-The TDengineWriter plugin implements writing data into TDengine. Under the hood, TDengineWriter uses JNI to call methods in libtaos.so/taos.dll, connects to a TDengine
-database instance, and performs schemaless writes. TDengineWriter is aimed at ETL engineers who use it to load data from a warehouse into TDengine; it
-can also serve DBAs and other users as a data migration tool.
+The TDengineWriter plugin implements writing data into TDengine. It can be used to synchronize data from other databases into TDengine offline.
## 2 Implementation
-TDengineWriter receives the protocol data generated by a Reader through the DataX framework,
-parses it according to the reader's type, and writes it to TDengine in schemaless mode by calling methods in libtaos.so (or taos.dll) via JNI.
+TDengineWriter receives the protocol data generated by a Reader through the DataX framework and parses it according to the reader's type. There are currently two write paths:
+
+1. For OpenTSDBReader, TDengineWriter calls methods in the TDengine client library (libtaos.so or taos.dll) via JNI and writes using the [schemaless interface](https://www.taosdata.com/cn/documentation/insert#schemaless).
+
+2. For other data sources, SQL statements are generated from the configuration and written in batches via [taos-jdbcdriver](https://www.taosdata.com/cn/documentation/connector/java) (see the example statement below).
+
+The reason for this distinction is that OpenTSDBReader reads OpenTSDB data uniformly as JSON strings, so the Writer receives only one column, while other Reader plugins usually spread the data across multiple columns.
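+
+For the SQL path, each batch uses a parameterized statement of the following shape (a sketch derived from the sample configuration in section 3.2 below; the subtable name, tag values, timestamp and field values are all bound as parameters):
+
+```sql
+INSERT INTO ? USING stock TAGS(?, ?) (tradetime, lastprice, askprice1, bidprice1, volume) VALUES (?, ?, ?, ?, ?)
+```
+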
## 3 Function Description
-
-### 3.1 Sample Configuration
-
-* This example uses data produced from OpenTSDB and imported into TDengine.
+### 3.1 From OpenTSDB to TDengine
+#### 3.1.1 Sample Configuration
```json
{
@@ -54,46 +55,189 @@ TDengineWriter 通过 DataX 框架获取 Reader
}
```
-### 3.2 Parameters
+#### 3.1.2 Parameters
-* **host**
-  * Description: host of the TDengine instance.
+| Parameter | Description                         | Required | Default  |
+| --------- | ----------------------------------- | -------- | -------- |
+| host      | host of the TDengine instance       | yes      | none     |
+| port      | port of the TDengine instance       | yes      | none     |
+| user      | user name for the TDengine instance | no       | root     |
+| password  | password for the TDengine instance  | no       | taosdata |
+| dbname    | name of the destination database    | yes      | none     |
+| batchSize | number of records per batch insert  | no       | 1        |
-  * Required: yes
-  * Default: none
-* **port**
-  * Description: port of the TDengine instance.
-  * Required: yes
-  * Default: none
-* **dbname**
-  * Description: name of the destination database.
+#### 3.1.3 Type Conversion
-  * Required: yes
+Currently, since OpenTSDBReader reads OpenTSDB data uniformly as JSON strings, TDengineWriter applies the following type mapping when migrating from OpenTSDB to TDengine:
-  * Default: none
-* **username**
-  * Description: user name for the TDengine instance
-  * Required: yes
-  * Default: none
-* **password**
-  * Description: password for the TDengine instance
-  * Required: yes
-  * Default: none
-
-### 3.3 Type Conversion
-
-Currently, since opentsdbreader reads OpenTSDB data uniformly as JSON strings, TDengineWriter applies the following type mapping when migrating from OpenTSDB to TDengine:
-
-| OpenTSDB type | DataX internal type | TDengine type |
-| -------- | ----- | -------- |
+| OpenTSDB type    | DataX internal type | TDengine type     |
+| ---------------- | ------------------- | ----------------- |
| timestamp | Date | timestamp |
| Integer(value) | Double | double |
-| Float(value) | Double | double |
-| String(value) | String | binary |
+| Float(value) | Double | double |
+| String(value) | String | binary |
| Integer(tag) | String | binary |
-| Float(tag) | String |binary |
-| String(tag) | String |binary |
+| Float(tag) | String | binary |
+| String(tag) | String | binary |
+
+### 3.2 From MongoDB to TDengine
+
+#### 3.2.1 Sample Configuration
+```json
+{
+ "job": {
+ "setting": {
+ "speed": {
+ "channel": 2
+ }
+ },
+ "content": [
+ {
+ "reader": {
+ "name": "mongodbreader",
+ "parameter": {
+ "address": [
+ "127.0.0.1:27017"
+ ],
+ "userName": "user",
+ "mechanism": "SCRAM-SHA-1",
+ "userPassword": "password",
+ "authDb": "admin",
+ "dbName": "test",
+ "collectionName": "stock",
+ "column": [
+ {
+ "name": "stockID",
+ "type": "string"
+ },
+ {
+ "name": "tradeTime",
+ "type": "date"
+ },
+ {
+ "name": "lastPrice",
+ "type": "double"
+ },
+ {
+ "name": "askPrice1",
+ "type": "double"
+ },
+ {
+ "name": "bidPrice1",
+ "type": "double"
+ },
+ {
+ "name": "volume",
+ "type": "int"
+ }
+ ]
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "host": "localhost",
+ "port": 6030,
+ "dbname": "test",
+ "user": "root",
+ "password": "taosdata",
+ "stable": "stock",
+ "tagColumn": {
+ "industry": "energy",
+ "stockID": 0
+ },
+ "fieldColumn": {
+ "lastPrice": 2,
+ "askPrice1": 3,
+ "bidPrice1": 4,
+ "volume": 5
+ },
+ "timestampColumn": {
+ "tradeTime": 1
+ }
+ }
+ }
+ }
+ ]
+ }
+}
+```
+
+**Note: the writer part of this configuration also applies to relational databases.**
+
+
+#### 3.2.2 Parameters
+| Parameter       | Description                            | Required              | Default  | Notes                             |
+| --------------- | -------------------------------------- | --------------------- | -------- | --------------------------------- |
+| host            | host of the TDengine instance          | yes                   | none     |
+| port            | port of the TDengine instance          | yes                   | none     |
+| user            | user name for the TDengine instance    | no                    | root     |
+| password        | password for the TDengine instance     | no                    | taosdata |
+| dbname          | name of the destination database       | yes                   | none     |
+| batchSize       | number of records per batch insert     | no                    | 1000     |
+| stable          | name of the target super table         | yes (except OpenTSDB) | none     |
+| tagColumn       | names and positions of tag columns     | no                    | none     | position indexes start from 0     |
+| fieldColumn     | names and positions of field columns   | no                    | none     |                                   |
+| timestampColumn | name and position of timestamp column  | no                    | none     | only one timestamp column allowed |
+
+#### 3.2.3 Automatic Table Creation Rules
+##### 3.2.3.1 Super Table Creation Rules
+
+If tagColumn, fieldColumn and timestampColumn are all configured, the super table is created automatically before the first record is inserted.
+The types of the data columns are inferred from the first record, and tag columns default to `NCHAR(64)`. For the sample configuration above, a statement like the following may be generated:
+
+```sql
+CREATE STABLE IF NOT EXISTS stock (
+ tradetime TIMESTAMP,
+ lastprice DOUBLE,
+ askprice1 DOUBLE,
+ bidprice1 DOUBLE,
+ volume INT
+)
+TAGS(
+ industry NCHAR(64),
+ stockID NCHAR(64)
+);
+```
+
+##### 3.2.3.2 Subtable Creation Rules
+
+Subtables have the same structure as the super table. The subtable name is generated as follows (see the sketch after this list):
+1. Join the tag values into a string of the form `tag_value1!tag_value2!tag_value3`.
+2. Compute the MD5 hash of that string, "md5_val".
+3. Use "t_md5val" as the subtable name, where "t" is a fixed prefix.
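+
+The rule can be reproduced with the same helper the plugin itself uses (`DigestUtils` from commons-codec); the tag values below are made up for illustration:
+
+```java
+import org.apache.commons.codec.digest.DigestUtils;
+
+public class SubTableNameDemo {
+    public static void main(String[] args) {
+        // step 1: join the tag values with '!'
+        String joined = String.join("!", "energy", "600100"); // "energy!600100"
+        // steps 2 and 3: MD5 hex digest plus the fixed prefix "t_"
+        String tableName = "t_" + DigestUtils.md5Hex(joined);
+        System.out.println(tableName); // "t_" followed by a 32-character hex digest
+    }
+}
+```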
+
+#### 3.2.4 Using a Pre-created Table
+
+If you have already created the target super table, then tagColumn, fieldColumn and timestampColumn can all be omitted, and the plugin obtains the table structure by executing `describe stableName`.
+In that case the column order of the received Records must match the column order returned by `describe stableName`. For example, if `describe stableName` returns the following:
+```
+ Field | Type | Length | Note |
+=================================================================================
+ ts | TIMESTAMP | 8 | |
+ current | DOUBLE | 8 | |
+ location | BINARY | 10 | TAG |
+```
+then the first column of the data received by the plugin must be the timestamp, the second column the current, and the third column the location.
+
+#### 3.2.5 Notes
+
+1. tagColumn, fieldColumn and timestampColumn describe the structure of the target table; the three settings must be present together or absent together.
+2. If all three are configured and the target table already exists, the two descriptions must be consistent. **Consistency** is the user's responsibility and is not checked by the plugin; an inconsistency may cause inserts to fail or data to be written incorrectly.
+3. The plugin prefers the table structure specified in the configuration file.
+
+#### 3.2.6 Type Conversion
+
+| MongoDB type     | DataX internal type | TDengine type     |
+| ---------------- | ------------------- | ----------------- |
+| int, Long | Long | BIGINT |
+| double | Double | DOUBLE |
+| string, array | String | NCHAR(64) |
+| date | Date | TIMESTAMP |
+| boolean | Boolean | BOOL |
+| bytes | Bytes | BINARY |
+
## 4 性能报告
@@ -127,13 +271,13 @@ TDengineWriter 通过 DataX 框架获取 Reader
#### 4.2.1 Single-table Test Report
-| Channels | DataX speed (Rec/s) | DataX traffic (MB/s) | DataX NIC outbound traffic (MB/s) | DataX machine load | DB NIC inbound traffic (MB/s) | DB load | DB TPS |
-|--------| --------|--------|--------|--------|--------|--------|--------|
-|1| | | | | | | |
-|4| | | | | | | |
-|8| | | | | | | |
-|16| | | | | | | |
-|32| | | | | | | |
+| Channels | DataX speed (Rec/s) | DataX traffic (MB/s) | DataX NIC outbound traffic (MB/s) | DataX machine load | DB NIC inbound traffic (MB/s) | DB load | DB TPS |
+| -------- | ------------------- | -------------------- | --------------------------------- | ------------------ | ----------------------------- | ------- | ------ |
+| 1        |                     |                      |                                   |                    |                               |         |        |
+| 4        |                     |                      |                                   |                    |                               |         |        |
+| 8        |                     |                      |                                   |                    |                               |         |        |
+| 16       |                     |                      |                                   |                    |                               |         |        |
+| 32       |                     |                      |                                   |                    |                               |         |        |
Notes:
@@ -143,9 +287,30 @@ TDengineWriter 通过 DataX 框架获取 Reader
#### 4.2.4 Performance Test Summary
-1.
-2.
## 5 Constraints and Limitations
-## FAQ
\ No newline at end of file
+1. When this plugin creates the super table automatically, the length of NCHAR columns is fixed at 64; data sources containing strings longer than 64 characters are not supported.
+2. Tag columns must not contain null values; records whose tags contain null are filtered out.
+
+## FAQ
+
+### How do I select the range of data to synchronize?
+
+The data range is selected in the Reader plugin's configuration, and the method differs from Reader to Reader. For example, mysqlreader can restrict the range with a SQL statement, while opentsdbreader uses the beginDateTime and endDateTime settings, as sketched below.
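+
+A minimal reader fragment of this shape would restrict the time range (a sketch; option names other than beginDateTime/endDateTime are assumptions, check the opentsdbreader documentation):
+
+```json
+"reader": {
+  "name": "opentsdbreader",
+  "parameter": {
+    "beginDateTime": "2021-01-01 00:00:00",
+    "endDateTime": "2021-01-02 00:00:00"
+  }
+}
+```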
+
+### How do I import multiple source tables at once?
+
+If the Reader plugin supports reading multiple tables at once, the Writer plugin can import them in a single job. If the Reader does not, create multiple jobs and import the tables separately. The Writer plugin is only responsible for writing data.
+
+### How many TDengine tables does one source table map to?
+
+That is determined by tagColumn. If all tag columns have the same value, there is exactly one target table; in general, the target super table gets one subtable per distinct tag combination in the source table.
+
+### Do the source and target tables have the same column order?
+
+TDengine requires the first column of every table to be the timestamp column, followed by the regular fields, with the tag columns last. If the source table uses a different order, the plugin adjusts it automatically when creating the table.
+
+### How does the plugin determine the data type of each column?
+
+The column types are inferred automatically from the first batch of records received.
\ No newline at end of file
diff --git a/tdenginewriter/pom.xml b/tdenginewriter/pom.xml
index d658d4a2..8eb94b33 100644
--- a/tdenginewriter/pom.xml
+++ b/tdenginewriter/pom.xml
@@ -19,6 +19,11 @@
+        <dependency>
+            <groupId>com.taosdata.jdbc</groupId>
+            <artifactId>taos-jdbcdriver</artifactId>
+            <version>2.0.34</version>
+        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
@@ -37,7 +42,11 @@
            <version>${junit-version}</version>
            <scope>test</scope>
-
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons-lang3-version}</version>
+        </dependency>
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DefaultDataHandler.java
deleted file mode 100644
index a1d52d75..00000000
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DefaultDataHandler.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package com.alibaba.datax.plugin.writer;
-
-import com.alibaba.datax.common.element.Column;
-import com.alibaba.datax.common.element.Record;
-import com.alibaba.datax.common.plugin.RecordReceiver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
-public class DefaultDataHandler implements DataHandler {
- private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
-
- @Override
- public long handle(RecordReceiver lineReceiver, Properties properties) {
- long count = 0;
- Record record;
- while ((record = lineReceiver.getFromReader()) != null) {
-
- int recordLength = record.getColumnNumber();
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < recordLength; i++) {
- Column column = record.getColumn(i);
- sb.append(column.asString()).append("\t");
- }
- sb.setLength(sb.length() - 1);
- LOG.debug(sb.toString());
-
- count++;
- }
- return count;
- }
-
-}
\ No newline at end of file
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java
similarity index 54%
rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandler.java
rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java
index 94d1db30..421c2fe4 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandler.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java
@@ -1,10 +1,11 @@
-package com.alibaba.datax.plugin.writer;
+package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
import java.util.Properties;
public interface DataHandler {
- long handle(RecordReceiver lineReceiver, Properties properties);
+ long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector);
}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandlerFactory.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandlerFactory.java
similarity index 81%
rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandlerFactory.java
rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandlerFactory.java
index a488e7d5..1f740d7e 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandlerFactory.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandlerFactory.java
@@ -1,4 +1,4 @@
-package com.alibaba.datax.plugin.writer;
+package com.alibaba.datax.plugin.writer.tdenginewriter;
public class DataHandlerFactory {
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java
new file mode 100644
index 00000000..91c2b7e3
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java
@@ -0,0 +1,105 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
+import com.taosdata.jdbc.TSDBPreparedStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * Default DataHandler: writes data via the JDBC interface.
+ */
+public class DefaultDataHandler implements DataHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
+
+ static {
+ try {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) {
+ SchemaManager schemaManager = new SchemaManager(properties);
+ if (!schemaManager.configValid()) {
+ return 0;
+ }
+
+ try {
+ Connection conn = getTaosConnection(properties);
+ if (conn == null) {
+ return 0;
+ }
+ if (schemaManager.shouldGuessSchema()) {
+ // cannot get the table schema from the config file, try to get it from the database
+ LOG.info(Msg.get("try_get_schema_fromdb"));
+ boolean success = schemaManager.getFromDB(conn);
+ if (!success) {
+ return 0;
+ }
+ }
+ int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, "1000"));
+ if (batchSize < 5) {
+ // a batchSize this small increases the chance of wrong automatic type inference; increase it and retry
+ LOG.error(Msg.get("batch_size_too_small"));
+ return 0;
+ }
+ return write(lineReceiver, conn, batchSize, schemaManager, collector);
+ } catch (Exception e) {
+ LOG.error("write failed " + e.getMessage());
+ e.printStackTrace();
+ }
+ return 0;
+ }
+
+
+ private Connection getTaosConnection(Properties properties) throws SQLException {
+ // check required parameters
+ String host = properties.getProperty(Key.HOST);
+ String port = properties.getProperty(Key.PORT);
+ String dbname = properties.getProperty(Key.DBNAME);
+ String user = properties.getProperty(Key.USER);
+ String password = properties.getProperty(Key.PASSWORD);
+ if (host == null || port == null || dbname == null || user == null || password == null) {
+ String keys = String.join(" ", Key.HOST, Key.PORT, Key.DBNAME, Key.USER, Key.PASSWORD);
+ LOG.error("Required options missing, please check: " + keys);
+ return null;
+ }
+ String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", host, port, dbname, user, password);
+ LOG.info("TDengine connection established, host:{} port:{} dbname:{} user:{}", host, port, dbname, user);
+ return DriverManager.getConnection(jdbcUrl);
+ }
+
+ /**
+ * Write in batches using SQL.
+ *
+ * @return number of records written successfully
+ * @throws SQLException
+ */
+ private long write(RecordReceiver lineReceiver, Connection conn, int batchSize, SchemaManager scm, TaskPluginCollector collector) throws SQLException {
+ Record record = lineReceiver.getFromReader();
+ if (record == null) {
+ return 0;
+ }
+ String pq = String.format("INSERT INTO ? USING %s TAGS(%s) (%s) values (%s)", scm.getStable(), scm.getTagValuesPlaceHolder(), scm.getJoinedFieldNames(), scm.getFieldValuesPlaceHolder());
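+ // the '?' after INSERT INTO is TDengine's bind placeholder for the target subtable name;
+ // JDBCBatchWriter binds it per batch via TSDBPreparedStatement.setTableName()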
+ LOG.info("Prepared SQL: {}", pq);
+ try (TSDBPreparedStatement stmt = (TSDBPreparedStatement) conn.prepareStatement(pq)) {
+ JDBCBatchWriter batchWriter = new JDBCBatchWriter(conn, stmt, scm, batchSize, collector);
+ do {
+ batchWriter.append(record);
+ } while ((record = lineReceiver.getFromReader()) != null);
+ batchWriter.flush();
+ return batchWriter.getCount();
+ }
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java
new file mode 100644
index 00000000..53ab9bb9
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java
@@ -0,0 +1,244 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
+import com.taosdata.jdbc.TSDBPreparedStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Batch writing via the native JDBC interface.
+ * Two constraints make the batching logic complex enough to need a dedicated class:
+ * 1. the data of a batch must be collected into ArrayLists up front;
+ * 2. every record in a batch must target the same table.
+ * The logic of this class:
+ * 1. Records belonging to the same subtable are buffered;
+ * 2. when a buffer reaches the batchSize threshold, a batch write is executed automatically;
+ * 3. the final partial batch is only written when the caller invokes flush().
+ */
+public class JDBCBatchWriter {
+ public static final Logger LOG = LoggerFactory.getLogger(JDBCBatchWriter.class);
+ private TSDBPreparedStatement stmt;
+ private SchemaManager scm;
+ private Connection conn;
+ private int batchSize;
+ private TaskPluginCollector collector;
+
+ // buffered Records, keyed by target table name
+ Map<String, List<Record>> buf = new HashMap<>();
+ // tag values of each table, keyed by table name
+ Map<String, String[]> tableTagValues = new HashMap<>();
+ private long sucCount = 0;
+ private final int tsColIndex;
+ private List<String> fieldList;
+ // minimum number of columns each record must contain, used for validation
+ private int minColNum = 0;
+ private Map<String, Integer> fieldIndexMap;
+ private List<Column.Type> fieldTypes = null;
+
+ public JDBCBatchWriter(Connection conn, TSDBPreparedStatement stmt, SchemaManager scm, int batchSize, TaskPluginCollector collector) {
+ this.conn = conn;
+ this.stmt = stmt;
+ this.scm = scm;
+ this.batchSize = batchSize;
+ this.collector = collector;
+ this.tsColIndex = scm.getTsColIndex();
+ this.fieldList = scm.getFieldList();
+ this.fieldIndexMap = scm.getFieldIndexMap();
+ this.minColNum = 1 + fieldList.size() + scm.getDynamicTagCount();
+
+ }
+
+ public void initFieldTypesAndTargetTable(List<Record> records) throws SQLException {
+ if (fieldTypes != null) {
+ return;
+ }
+ guessFieldTypes(records);
+ if (scm.shouldCreateTable()) {
+ scm.createSTable(conn, fieldTypes);
+ }
+ }
+
+ public void append(Record record) throws SQLException {
+ int columnNum = record.getColumnNumber();
+ if (columnNum < minColNum) {
+ // actual column count is less than expected
+ collector.collectDirtyRecord(record, Msg.get("column_number_error"));
+ return;
+ }
+ String[] tagValues = scm.getTagValuesFromRecord(record);
+ if (tagValues == null) {
+ // tag columns contain null
+ collector.collectDirtyRecord(record, Msg.get("tag_value_error"));
+ return;
+ }
+ if (!scm.hasTimestamp(record)) {
+ // timestamp column is null or has the wrong type
+ collector.collectDirtyRecord(record, Msg.get("ts_value_error"));
+ return;
+ }
+ String tableName = scm.computeTableName(tagValues);
+ if (buf.containsKey(tableName)) {
+ List<Record> lis = buf.get(tableName);
+ lis.add(record);
+ if (lis.size() == batchSize) {
+ if (fieldTypes == null) {
+ initFieldTypesAndTargetTable(lis);
+ }
+ executeBatch(tableName);
+ lis.clear();
+ }
+ } else {
+ List lis = new ArrayList<>(batchSize);
+ lis.add(record);
+ buf.put(tableName, lis);
+ tableTagValues.put(tableName, tagValues);
+ }
+ }
+
+ /**
+ * Only the String type is special: testing shows that columns whose value is null are converted to String,
+ * so a Column of type String does not necessarily mean the column's real type is String.
+ *
+ * @param records sample records used for type inference
+ */
+ private void guessFieldTypes(List<Record> records) {
+ fieldTypes = new ArrayList<>(fieldList.size());
+ for (int i = 0; i < fieldList.size(); ++i) {
+ int colIndex = fieldIndexMap.get(fieldList.get(i));
+ boolean ok = false;
+ for (int j = 0; j < records.size() && !ok; ++j) {
+ Column column = records.get(j).getColumn(colIndex);
+ Column.Type type = column.getType();
+ switch (type) {
+ case LONG:
+ case DOUBLE:
+ case DATE:
+ case BOOL:
+ case BYTES:
+ if (column.getRawData() != null) {
+ fieldTypes.add(type);
+ ok = true;
+ }
+ break;
+ case STRING:
+ // only non-null, non-empty String columns are really treated as String
+ String value = column.asString();
+ if (value != null && !"".equals(value)) {
+ fieldTypes.add(type);
+ ok = true;
+ }
+ break;
+ default:
+ throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, type.toString());
+ }
+ }
+ if (!ok) {
+ // based on the %d sampled records, the type of column %d cannot be inferred
+ throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, String.format(Msg.get("infer_column_type_error"), records.size(), i + 1));
+ }
+ }
+ LOG.info("Field Types: {}", fieldTypes);
+ }
+
+ /**
+ * Execute one batch write for a single table.
+ *
+ * @param tableName
+ * @throws SQLException
+ */
+ private void executeBatch(String tableName) throws SQLException {
+ // table name
+ stmt.setTableName(tableName);
+ List<Record> records = buf.get(tableName);
+ // tags
+ String[] tagValues = tableTagValues.get(tableName);
+ LOG.debug("executeBatch {}", String.join(",", tagValues));
+ for (int i = 0; i < tagValues.length; ++i) {
+ stmt.setTagNString(i, tagValues[i]);
+ }
+ // timestamp
+ ArrayList<Long> tsList = records.stream().map(r -> r.getColumn(tsColIndex).asDate().getTime()).collect(Collectors.toCollection(ArrayList::new));
+ stmt.setTimestamp(0, tsList);
+ // fields
+ for (int i = 0; i < fieldList.size(); ) {
+ String fieldName = fieldList.get(i);
+ int index = fieldIndexMap.get(fieldName);
+ switch (fieldTypes.get(i)) {
+ case LONG:
+ ArrayList<Long> lisLong = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new));
+ stmt.setLong(++i, lisLong);
+ break;
+ case DOUBLE:
+ ArrayList<Double> lisDouble = records.stream().map(r -> r.getColumn(index).asDouble()).collect(Collectors.toCollection(ArrayList::new));
+ stmt.setDouble(++i, lisDouble);
+ break;
+ case STRING:
+ ArrayList<String> lisString = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new));
+ stmt.setNString(++i, lisString, 64);
+ break;
+ case DATE:
+ ArrayList<Long> lisTs = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new));
+ stmt.setTimestamp(++i, lisTs);
+ break;
+ case BOOL:
+ ArrayList<Boolean> lisBool = records.stream().map(r -> r.getColumn(index).asBoolean()).collect(Collectors.toCollection(ArrayList::new));
+ stmt.setBoolean(++i, lisBool);
+ break;
+ case BYTES:
+ ArrayList<String> lisBytes = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new));
+ stmt.setString(++i, lisBytes, 64);
+ break;
+ default:
+ throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, fieldTypes.get(i).toString());
+ }
+ }
+ // execute the batch
+ stmt.columnDataAddBatch();
+ stmt.columnDataExecuteBatch();
+ // update the counter
+ sucCount += records.size();
+ }
+
+ /**
+ * Write out all buffered Records.
+ */
+ public void flush() throws SQLException {
+ if (fieldTypes == null) {
+ List records = new ArrayList<>();
+ for (List lis : buf.values()) {
+ records.addAll(lis);
+ if (records.size() > 100) {
+ break;
+ }
+ }
+ if (records.size() > 0) {
+ initFieldTypesAndTargetTable(records);
+ } else {
+ return;
+ }
+ }
+ for (String tabName : buf.keySet()) {
+ if (buf.get(tabName).size() > 0) {
+ executeBatch(tabName);
+ }
+ }
+ stmt.columnDataCloseBatch();
+ }
+
+ /**
+ * @return number of records written successfully
+ */
+ public long getCount() {
+ return sucCount;
+ }
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnection.java
similarity index 98%
rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java
rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnection.java
index 3ce786e5..0aabe32a 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnection.java
@@ -1,4 +1,4 @@
-package com.alibaba.datax.plugin.writer;
+package com.alibaba.datax.plugin.writer.tdenginewriter;
import java.util.Properties;
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/Key.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java
similarity index 52%
rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/Key.java
rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java
index b240bce4..090a7999 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/Key.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java
@@ -1,4 +1,4 @@
-package com.alibaba.datax.plugin.writer;
+package com.alibaba.datax.plugin.writer.tdenginewriter;
public class Key {
public static final String HOST = "host";
@@ -7,5 +7,8 @@ public class Key {
public static final String USER = "user";
public static final String PASSWORD = "password";
public static final String BATCH_SIZE = "batchSize";
-
+ public static final String STABLE = "stable";
+ public static final String TAG_COLUMN = "tagColumn";
+ public static final String FIELD_COLUMN = "fieldColumn";
+ public static final String TIMESTAMP_COLUMN = "timestampColumn";
}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Msg.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Msg.java
new file mode 100644
index 00000000..89730d35
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Msg.java
@@ -0,0 +1,20 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+/**
+ * i18n message util
+ */
+public class Msg {
+ private static ResourceBundle bundle;
+
+ static {
+ bundle = ResourceBundle.getBundle("tdenginewritermsg", Locale.getDefault());
+ }
+
+ public static String get(String key) {
+ return bundle.getString(key);
+ }
+
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/OpentsdbDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java
similarity index 96%
rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/OpentsdbDataHandler.java
rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java
index 599e5f3e..e1b8f5dd 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/OpentsdbDataHandler.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java
@@ -1,9 +1,10 @@
-package com.alibaba.datax.plugin.writer;
+package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -14,7 +15,7 @@ public class OpentsdbDataHandler implements DataHandler {
private static final String DEFAULT_BATCH_SIZE = "1";
@Override
- public long handle(RecordReceiver lineReceiver, Properties properties) {
+ public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) {
// the OpenTSDB JSON protocol path uses JNI and the schemaless API to write
String host = properties.getProperty(Key.HOST);
int port = Integer.parseInt(properties.getProperty(Key.PORT));
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java
new file mode 100644
index 00000000..d67a6585
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java
@@ -0,0 +1,271 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class SchemaManager {
+ private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class);
+
+ private String stable; // target super table name
+ private Map<String, String> fixedTagValue = new HashMap<>(); // fixed tag values: tag name -> tag value
+ private Map<String, Integer> tagIndexMap = new HashMap<>(); // dynamic tag values: tag name -> column index
+ private Map<String, Integer> fieldIndexMap = new HashMap<>(); // field name -> column index
+ private String tsColName; // timestamp column name
+ private int tsColIndex = -1; // timestamp column index
+ private List<String> fieldList = new ArrayList<>();
+ private List<String> tagList = new ArrayList<>();
+ private boolean canInferSchemaFromConfig = false;
+
+
+ public SchemaManager() {
+ }
+
+ public SchemaManager(Properties properties) {
+ getFromConfig(properties);
+ }
+
+ private String mapDataxType(Column.Type type) {
+ switch (type) {
+ case LONG:
+ return "BIGINT";
+ case DOUBLE:
+ return "DOUBLE";
+ case STRING:
+ return "NCHAR(64)";
+ case DATE:
+ return "TIMESTAMP";
+ case BOOL:
+ return "BOOL";
+ case BYTES:
+ return "BINARY(64)";
+ default:
+ throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, type.toString());
+ }
+ }
+
+ public void setStable(String stable) {
+ this.stable = stable;
+ }
+
+ public String getStable() {
+ return stable;
+ }
+
+ private void getFromConfig(Properties properties) {
+ stable = properties.getProperty(Key.STABLE);
+ if (stable == null) {
+ LOG.error("Config error: no stable");
+ return;
+ }
+ for (Object key : properties.keySet()) {
+ String k = (String) key;
+ String v = properties.getProperty(k);
+
+ String[] ps = k.split("\\.");
+ if (ps.length == 1) {
+ continue;
+ }
+ if (k.startsWith(Key.TAG_COLUMN)) {
+ String tagName = ps[1];
+ try {
+ Integer tagIndex = Integer.parseInt(v);
+ this.tagIndexMap.put(tagName, tagIndex);
+ tagList.add(tagName);
+ } catch (NumberFormatException e) {
+ fixedTagValue.put(tagName, v);
+ tagList.add(tagName);
+ }
+ } else if (k.startsWith(Key.FIELD_COLUMN)) {
+ String fieldName = ps[1];
+ Integer fieldIndex = Integer.parseInt(v);
+ fieldIndexMap.put(fieldName, fieldIndex);
+ } else if (k.startsWith(Key.TIMESTAMP_COLUMN)) {
+ tsColName = ps[1];
+ tsColIndex = Integer.parseInt(v);
+ }
+ }
+ List<String> sortedFieldName = fieldIndexMap.entrySet().stream().sorted((x, y) -> x.getValue().compareTo(y.getValue())).map(e -> e.getKey()).collect(Collectors.toList());
+ fieldList.addAll(sortedFieldName); // sorting ensures the column order used at auto table creation matches the column order of the input data
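+ // the schema counts as inferable from config only when a timestamp column, at least one tag and at least one field are all configured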
+ canInferSchemaFromConfig = tsColIndex > -1 && !(fixedTagValue.isEmpty() && tagIndexMap.isEmpty()) && !fieldIndexMap.isEmpty();
+ LOG.info("Config file parsed result:fixedTags=[{}] ,tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", fixedTagValue.keySet()), String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex);
+ }
+
+ public boolean shouldGuessSchema() {
+ return !canInferSchemaFromConfig;
+ }
+
+ public boolean shouldCreateTable() {
+ return canInferSchemaFromConfig;
+ }
+
+ public boolean configValid() {
+ boolean valid = (tagList.size() > 0 && fieldList.size() > 0 && tsColIndex > -1) || (tagList.size() == 0 && fieldList.size() == 0 && tsColIndex == -1);
+ if (!valid) {
+ LOG.error("Config error: tagColumn, fieldColumn and timestampColumn must be present together or absent together.");
+ }
+ return valid;
+ }
+
+ /**
+ * Get the table schema by executing the `describe dbname.stable` command.
+ * The describe command returns 4 columns: Field, Type, Length, Note.
+ *
+ * @return true on success; false if the super table does not exist or another error occurs
+ */
+ public boolean getFromDB(Connection conn) {
+ try {
+ List<String> stables = getSTables(conn);
+ if (!stables.contains(stable)) {
+ LOG.error("super table {} not exist, fail to get schema from database.", stable);
+ return false;
+ }
+ } catch (SQLException e) {
+ LOG.error(e.getMessage());
+ e.printStackTrace();
+ return false;
+ }
+ try (Statement stmt = conn.createStatement()) {
+ ResultSet rs = stmt.executeQuery("describe " + stable);
+ int colIndex = 0;
+ while (rs.next()) {
+ String name = rs.getString(1);
+ String type = rs.getString(2);
+ String note = rs.getString(4);
+ if ("TIMESTAMP".equals(type)) {
+ tsColName = name;
+ tsColIndex = colIndex;
+ } else if ("TAG".equals(note)) {
+ tagIndexMap.put(name, colIndex);
+ tagList.add(name);
+ } else {
+ fieldIndexMap.put(name, colIndex);
+ fieldList.add(name);
+ }
+ colIndex++;
+ }
+ LOG.info("table info:tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex);
+ return true;
+ } catch (SQLException e) {
+ LOG.error(e.getMessage());
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ public static List<String> getSTables(Connection conn) throws SQLException {
+ List<String> stables = new ArrayList<>();
+ try (Statement stmt = conn.createStatement()) {
+ ResultSet rs = stmt.executeQuery("show stables");
+ while (rs.next()) {
+ String name = rs.getString(1);
+ stables.add(name);
+ }
+ }
+ return stables;
+ }
+
+ public void createSTable(Connection conn, List<Column.Type> fieldTypes) throws SQLException {
+ StringBuilder sb = new StringBuilder();
+ sb.append("CREATE STABLE IF NOT EXISTS ").append(stable).append("(");
+ sb.append(tsColName).append(" ").append("TIMESTAMP,");
+ for (int i = 0; i < fieldList.size(); ++i) {
+ String fieldName = fieldList.get(i);
+ Column.Type dxType = fieldTypes.get(i);
+ sb.append(fieldName).append(' ');
+ String tdType = mapDataxType(dxType);
+ sb.append(tdType).append(',');
+ }
+ sb.deleteCharAt(sb.length() - 1);
+ sb.append(") TAGS(");
+ for (String tagName : tagList) {
+ sb.append(tagName).append(" NCHAR(64),");
+ }
+ sb.deleteCharAt(sb.length() - 1);
+ sb.append(")");
+ String q = sb.toString();
+ LOG.info("run sql:" + q);
+ try (Statement stmt = conn.createStatement()) {
+ stmt.execute(q);
+ }
+ }
+
+ public String[] getTagValuesFromRecord(Record record) {
+ String[] tagValues = new String[tagList.size()];
+ for (int i = 0; i < tagList.size(); ++i) {
+ if (fixedTagValue.containsKey(tagList.get(i))) {
+ tagValues[i] = fixedTagValue.get(tagList.get(i));
+ } else {
+ int tagIndex = tagIndexMap.get(tagList.get(i));
+ tagValues[i] = record.getColumn(tagIndex).asString();
+ }
+ if (tagValues[i] == null) {
+ return null;
+ }
+ }
+ return tagValues;
+ }
+
+ public boolean hasTimestamp(Record record) {
+ Column column = record.getColumn(tsColIndex);
+ return column.getType() == Column.Type.DATE && column.asDate() != null;
+ }
+
+ public Map<String, Integer> getFieldIndexMap() {
+ return fieldIndexMap;
+ }
+
+ public List<String> getFieldList() {
+ return fieldList;
+ }
+
+ public String getJoinedFieldNames() {
+ return tsColName + ", " + String.join(", ", fieldList);
+ }
+
+ public int getTsColIndex() {
+ return tsColIndex;
+ }
+
+ public String getTagValuesPlaceHolder() {
+ return tagList.stream().map(x -> "?").collect(Collectors.joining(","));
+ }
+
+ public String getFieldValuesPlaceHolder() {
+ return "?, " + fieldList.stream().map(x -> "?").collect(Collectors.joining(", "));
+ }
+
+ /**
+ * Compute the subtable name.
+ *
+ * - Join the tag values into a string of the form tag_value1!tag_value2!tag_value3.
+ * - Compute the MD5 hash of that string, "md5_val".
+ * - Use "t_md5val" as the subtable name, where "t" is a fixed prefix.
+ *
+ * @param tagValues tag values of the record
+ * @return subtable name
+ */
+ public String computeTableName(String[] tagValues) {
+ String s = String.join("!", tagValues);
+ return "t_" + DigestUtils.md5Hex(s);
+ }
+
+ public int getDynamicTagCount() {
+ return tagIndexMap.size();
+ }
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java
similarity index 83%
rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java
rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java
index 84600802..cd223792 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java
@@ -1,4 +1,4 @@
-package com.alibaba.datax.plugin.writer;
+package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.plugin.RecordReceiver;
@@ -64,11 +64,17 @@ public class TDengineWriter extends Writer {
String value = this.writerSliceConfig.getString(key);
properties.setProperty(key, value);
}
-
+ if (!keys.contains(Key.USER)) {
+ properties.setProperty(Key.USER, "root");
+ }
+ if (!keys.contains(Key.PASSWORD)) {
+ properties.setProperty(Key.PASSWORD, "taosdata");
+ }
+ LOG.debug("========================properties==========================\n" + properties.toString());
String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
LOG.debug("start to handle record from: " + peerPluginName);
DataHandler handler = DataHandlerFactory.build(peerPluginName);
- long records = handler.handle(lineReceiver, properties);
+ long records = handler.handle(lineReceiver, properties, getTaskPluginCollector());
LOG.debug("handle data finished, records: " + records);
}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriterErrorCode.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java
similarity index 75%
rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriterErrorCode.java
rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java
index 02e87079..994f1e89 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriterErrorCode.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java
@@ -1,9 +1,10 @@
-package com.alibaba.datax.plugin.writer;
+package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.spi.ErrorCode;
public enum TDengineWriterErrorCode implements ErrorCode {
- RUNTIME_EXCEPTION("TDengineWriter-00", "运行时异常");
+ RUNTIME_EXCEPTION("TDengineWriter-00", "运行时异常"),
+ TYPE_ERROR("TDengineWriter-00", "Datax类型无法正确映射到TDengine类型");
private final String code;
private final String description;
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/com_alibaba_datax_plugin_writer_JniConnection.h b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/com_alibaba_datax_plugin_writer_tdenginewriter_JniConnection.h
similarity index 100%
rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/com_alibaba_datax_plugin_writer_JniConnection.h
rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/com_alibaba_datax_plugin_writer_tdenginewriter_JniConnection.h
diff --git a/tdenginewriter/src/main/resources/plugin.json b/tdenginewriter/src/main/resources/plugin.json
index 6c900a15..e54f65ff 100755
--- a/tdenginewriter/src/main/resources/plugin.json
+++ b/tdenginewriter/src/main/resources/plugin.json
@@ -1,9 +1,9 @@
{
"name": "tdenginewriter",
- "class": "com.alibaba.datax.plugin.writer.TDengineWriter",
+ "class": "com.alibaba.datax.plugin.writer.tdenginewriter.TDengineWriter",
"description": {
"useScene": "data migration to tdengine",
- "mechanism": "use JNI to write data to tdengine."
+ "mechanism": "use JNI or taos-jdbc to write data to tdengine."
},
"developer": "zyyang-taosdata"
}
\ No newline at end of file
diff --git a/tdenginewriter/src/main/resources/tdenginewritermsg.properties b/tdenginewriter/src/main/resources/tdenginewritermsg.properties
new file mode 100644
index 00000000..4aaa220b
--- /dev/null
+++ b/tdenginewriter/src/main/resources/tdenginewritermsg.properties
@@ -0,0 +1,6 @@
+try_get_schema_fromdb=failed to get the structure of the target table from the config file, trying to get it from the database
+batch_size_too_small='batchSize' is too small, please increase it and try again
+column_number_error=number of columns is less than expected
+tag_value_error=tag columns contain null values
+ts_value_error=timestamp column is null or has the wrong type
+infer_column_type_error=failed to infer column type: sample count %d, column index %d
\ No newline at end of file
diff --git a/tdenginewriter/src/main/resources/tdenginewritermsg_en_US.properties b/tdenginewriter/src/main/resources/tdenginewritermsg_en_US.properties
new file mode 100644
index 00000000..4aaa220b
--- /dev/null
+++ b/tdenginewriter/src/main/resources/tdenginewritermsg_en_US.properties
@@ -0,0 +1,6 @@
+try_get_schema_fromdb=failed to get the structure of the target table from the config file, trying to get it from the database
+batch_size_too_small='batchSize' is too small, please increase it and try again
+column_number_error=number of columns is less than expected
+tag_value_error=tag columns contain null values
+ts_value_error=timestamp column is null or has the wrong type
+infer_column_type_error=failed to infer column type: sample count %d, column index %d
\ No newline at end of file
diff --git a/tdenginewriter/src/main/resources/tdenginewritermsg_zh_CN.properties b/tdenginewriter/src/main/resources/tdenginewritermsg_zh_CN.properties
new file mode 100644
index 00000000..4b9552fd
--- /dev/null
+++ b/tdenginewriter/src/main/resources/tdenginewritermsg_zh_CN.properties
@@ -0,0 +1,6 @@
+try_get_schema_fromdb=\u65e0\u6cd5\u4ece\u914d\u7f6e\u6587\u4ef6\u83b7\u53d6\u8868\u7ed3\u6784\u4fe1\u606f\uff0c\u5c1d\u8bd5\u4ece\u6570\u636e\u5e93\u83b7\u53d6
+batch_size_too_small=batchSize\u592a\u5c0f\uff0c\u4f1a\u589e\u52a0\u81ea\u52a8\u7c7b\u578b\u63a8\u65ad\u9519\u8bef\u7684\u6982\u7387\uff0c\u5efa\u8bae\u6539\u5927\u540e\u91cd\u8bd5
+column_number_error=\u5b9e\u9645\u5217\u6570\u5c0f\u4e8e\u671f\u671b\u5217\u6570
+tag_value_error=\u6807\u7b7e\u5217\u5305\u542bnull
+ts_value_error=\u65f6\u95f4\u6233\u5217\u4e3anull\u6216\u7c7b\u578b\u9519\u8bef
+infer_column_type_error=\u6839\u636e\u91c7\u6837\u7684%d\u6761\u6570\u636e\uff0c\u65e0\u6cd5\u63a8\u65ad\u7b2c%d\u5217\u7684\u6570\u636e\u7c7b\u578b
diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/JniConnectionTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnectionTest.java
similarity index 90%
rename from tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/JniConnectionTest.java
rename to tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnectionTest.java
index 040cf34c..09c3df26 100644
--- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/JniConnectionTest.java
+++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnectionTest.java
@@ -1,4 +1,4 @@
-package com.alibaba.datax.plugin.writer;
+package com.alibaba.datax.plugin.writer.tdenginewriter;
import org.junit.Test;
diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/MessageTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/MessageTest.java
new file mode 100644
index 00000000..b1b7ddd8
--- /dev/null
+++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/MessageTest.java
@@ -0,0 +1,25 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import org.junit.Test;
+
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+import org.junit.Assert;
+
+public class MessageTest {
+ @Test
+ public void testChineseMessage() {
+ Locale local = new Locale("zh", "CN");
+ ResourceBundle bundle = ResourceBundle.getBundle("tdenginewritermsg", local);
+ String msg = bundle.getString("try_get_schema_fromdb");
+ Assert.assertEquals("无法从配置文件获取表结构信息,尝试从数据库获取", msg);
+ }
+
+ @Test
+ public void testDefaultMessage() {
+ ResourceBundle bundle = ResourceBundle.getBundle("tdenginewritermsg", Locale.getDefault());
+ String msg = bundle.getString("try_get_schema_fromdb");
+ System.out.println(msg);
+ }
+}
diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java
new file mode 100644
index 00000000..62bf7040
--- /dev/null
+++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java
@@ -0,0 +1,31 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class TDengineWriterTest {
+
+
+ @Test
+ public void testGetSchema() throws ClassNotFoundException, SQLException {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", "wozai.fun", "6030", "test", "root", "taosdata");
+ Connection conn = DriverManager.getConnection(jdbcUrl);
+ SchemaManager schemaManager = new SchemaManager();
+ schemaManager.setStable("test1");
+ schemaManager.getFromDB(conn);
+ }
+
+ @Test
+ public void dropTestTable() throws ClassNotFoundException, SQLException {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", "wozai.fun", "6030", "test", "root", "taosdata");
+ Connection conn = DriverManager.getConnection(jdbcUrl);
+ Statement stmt = conn.createStatement();
+ stmt.execute("drop table market_snapshot");
+ }
+}