From e60948b604f8887addc806160eb89984bd477333 Mon Sep 17 00:00:00 2001 From: dingbo Date: Thu, 11 Nov 2021 15:24:09 +0800 Subject: [PATCH 1/9] add option authDB to mongodb reader doc --- mongodbreader/doc/mongodbreader.md | 1 + 1 file changed, 1 insertion(+) diff --git a/mongodbreader/doc/mongodbreader.md b/mongodbreader/doc/mongodbreader.md index b61493e6..99d25731 100644 --- a/mongodbreader/doc/mongodbreader.md +++ b/mongodbreader/doc/mongodbreader.md @@ -127,6 +127,7 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据,通过主控的J * address: MongoDB的数据地址信息,因为MonogDB可能是个集群,则ip端口信息需要以Json数组的形式给出。【必填】 * userName:MongoDB的用户名。【选填】 * userPassword: MongoDB的密码。【选填】 +* authDb: MongoDB认证数据库【选填】 * collectionName: MonogoDB的集合名。【必填】 * column:MongoDB的文档列名。【必填】 * name:Column的名字。【必填】 From 3de5a8f71517268afe77415f71f6b5511d81a8b6 Mon Sep 17 00:00:00 2001 From: dingbo Date: Thu, 11 Nov 2021 17:38:14 +0800 Subject: [PATCH 2/9] add test job configure --- core/src/main/job/mongodb2tdengine.json | 75 +++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 core/src/main/job/mongodb2tdengine.json diff --git a/core/src/main/job/mongodb2tdengine.json b/core/src/main/job/mongodb2tdengine.json new file mode 100644 index 00000000..0667bddd --- /dev/null +++ b/core/src/main/job/mongodb2tdengine.json @@ -0,0 +1,75 @@ +{ + "job": { + "setting": { + "speed": { + "channel": 2 + } + }, + "content": [ + { + "reader": { + "name": "mongodbreader", + "parameter": { + "address": [ + "123.56.104.14:27017" + ], + "userName": "admin678", + "mechanism": "SCRAM-SHA-1", + "userPassword": "huwG86123", + "authDb": "admin", + "dbName": "test", + "collectionName": "cu_market_data", + "column": [ + { + "name": "instrumentID", + "type": "string" + }, + { + "name": "tradeTime", + "type": "date" + }, + { + "name": "lastPrice", + "type": "double" + }, + { + "name": "askPrice1", + "type": "double" + }, + { + "name": "bidPrice1", + "type": "double" + }, + { + "name": "volume", + "type": "int" + } + ] + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "host": "123.56.104.14", + "port": 6030, + "dbname": "test", + "user": "root", + "password": "taosdata", + "measurement": "market_snapshot", + "tag_set": { + "product": "cu", + "instrumentID": 0 + }, + "field_set": { + "lastPrice": 2, + "askPrice1": 3, + "bidPrice1": 4, + "volume": 5 + }, + "timestamp": 1 + } + } + } + ] + } +} \ No newline at end of file From bf01999222ff86d0275c1f7112db9846e814b729 Mon Sep 17 00:00:00 2001 From: dingbo Date: Thu, 18 Nov 2021 10:28:20 +0800 Subject: [PATCH 3/9] mongodb2tdengine support --- core/src/main/job/mongodb2tdengine.json | 11 +- tdenginewriter/doc/tdenginewriter.md | 260 ++++++++++++++---- tdenginewriter/pom.xml | 11 +- .../plugin/writer/DefaultDataHandler.java | 34 --- .../{ => tdenginewriter}/DataHandler.java | 2 +- .../DataHandlerFactory.java | 2 +- .../tdenginewriter/DefaultDataHandler.java | 101 +++++++ .../tdenginewriter/JDBCBatchWriter.java | 149 ++++++++++ .../{ => tdenginewriter}/JniConnection.java | 2 +- .../writer/{ => tdenginewriter}/Key.java | 7 +- .../OpentsdbDataHandler.java | 2 +- .../writer/tdenginewriter/SchemaManager.java | 255 +++++++++++++++++ .../{ => tdenginewriter}/TDengineWriter.java | 10 +- .../TDengineWriterErrorCode.java | 5 +- ...gin_writer_tdenginewriter_JniConnection.h} | 0 tdenginewriter/src/main/resources/plugin.json | 4 +- .../JniConnectionTest.java | 2 +- .../tdenginewriter/TDengineWriterTest.java | 21 ++ 18 files changed, 775 insertions(+), 103 deletions(-) delete mode 100644 
tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DefaultDataHandler.java rename tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/{ => tdenginewriter}/DataHandler.java (77%) rename tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/{ => tdenginewriter}/DataHandlerFactory.java (81%) create mode 100644 tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java create mode 100644 tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java rename tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/{ => tdenginewriter}/JniConnection.java (98%) rename tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/{ => tdenginewriter}/Key.java (52%) rename tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/{ => tdenginewriter}/OpentsdbDataHandler.java (98%) create mode 100644 tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java rename tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/{ => tdenginewriter}/TDengineWriter.java (84%) rename tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/{ => tdenginewriter}/TDengineWriterErrorCode.java (75%) rename tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/{com_alibaba_datax_plugin_writer_JniConnection.h => tdenginewriter/com_alibaba_datax_plugin_writer_tdenginewriter_JniConnection.h} (100%) rename tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/{ => tdenginewriter}/JniConnectionTest.java (90%) create mode 100644 tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java diff --git a/core/src/main/job/mongodb2tdengine.json b/core/src/main/job/mongodb2tdengine.json index 0667bddd..4cfc987e 100644 --- a/core/src/main/job/mongodb2tdengine.json +++ b/core/src/main/job/mongodb2tdengine.json @@ -55,18 +55,21 @@ "dbname": "test", "user": "root", "password": "taosdata", - "measurement": "market_snapshot", - "tag_set": { + "stable": "market_snapshot", + "batchSize": 35, + "tagColumn": { "product": "cu", "instrumentID": 0 }, - "field_set": { + "fieldColumn": { "lastPrice": 2, "askPrice1": 3, "bidPrice1": 4, "volume": 5 }, - "timestamp": 1 + "timestampColumn": { + "tradeTime": 1 + } } } } diff --git a/tdenginewriter/doc/tdenginewriter.md b/tdenginewriter/doc/tdenginewriter.md index 8e55b189..c9c222a2 100644 --- a/tdenginewriter/doc/tdenginewriter.md +++ b/tdenginewriter/doc/tdenginewriter.md @@ -2,20 +2,21 @@ ## 1 快速介绍 -TDengineWriter 插件实现了写入数据到 TDengine 的功能。 在底层实现上, TDengineWriter 通过 JNI的方式调用libtaos.so/tao.dll中的方法,连接 TDengine -数据库实例,并执行schemaless的写入。 TDengineWriter 面向ETL开发工程师,他们使用 TDengineWriter 从数仓导入数据到 TDengine。同时,TDengineWriter -亦可以作为数据迁移工具为DBA等用户提供服务。 +TDengineWriter插件实现了写入数据到TDengine数据库功能。可用于离线同步其它数据库的数据到TDengine。 ## 2 实现原理 -TDengineWriter 通过 DataX 框架获取 Reader -生成的协议数据,根据reader的类型解析数据,通过JNI方式调用libtaos.so(或taos.dll)中的方法,使用schemaless的方式写入到TDengine。 +TDengineWriter 通过 DataX 框架获取 Reader生成的协议数据,根据reader的类型解析数据。目前有两种写入方式: + +1. 对于OpenTSDBReader, TDengineWriter通过JNI方式调用TDengine客户端库文件(taos.lib或taos.dll)中的方法,使用[schemaless的方式](https://www.taosdata.com/cn/documentation/insert#schemaless)写入。 + +2. 
对于其它数据源,会根据配置生成SQL语句, 通过[taos-jdbcdriver](https://www.taosdata.com/cn/documentation/connector/java)批量写入。 + +这样区分的原因是OpenTSDBReader将opentsdb的数据统一读取为json字符串,Writer端接收到的数据只有1列。而其它Reader插件一般会把数据放在不同列。 ## 3 功能说明 - -### 3.1 配置样例 - -* 这里使用一份从OpenTSDB产生到 TDengine 导入的数据。 +### 3.1 从OpenTSDB到TDengine +#### 3.1.1 配置样例 ```json { @@ -54,46 +55,189 @@ TDengineWriter 通过 DataX 框架获取 Reader } ``` -### 3.2 参数说明 +#### 3.1.2 参数说明 -* **host** - * 描述:TDengine实例的host。 +| 参数 | 描述 | 是否必选 | 默认值 | +| --------- | -------------------- | -------- | -------- | +| host | TDengine实例的host | 是 | 无 | +| port | TDengine实例的port | 是 | 无 | +| user | TDengine实例的用户名 | 否 | root | +| password | TDengine实例的密码 | 否 | taosdata | +| dbname | 目的数据库的名称 | 是 | 无 | +| batchSize | 每次批量插入多少记录 | 否 | 1 | - * 必选:是
- * 默认值:无
-* **port**
- * 描述:TDengine实例的port。
- * 必选:是
- * 默认值:无
-* **dbname**
- * 描述:目的数据库的名称。
+#### 3.1.3 类型转换
- * 必选:是
+目前,由于OpenTSDBReader将opentsdb的数据统一读取为json字符串,TDengineWriter 在做Opentsdb到TDengine的迁移时,按照以下类型进行处理:
- * 默认值:无
-* **username**
- * 描述:TDengine实例的用户名
- * 必选:是
- * 默认值:无
-* **password**
- * 描述:TDengine实例的密码
- * 必选:是
- * 默认值:无
- -### 3.3 类型转换 - -目前,由于opentsdbreader将opentsdb的数据统一读取为json字符串,TDengineWriter 在做Opentsdb到TDengine的迁移时,按照以下类型进行处理: - -| OpenTSDB数据类型 | DataX 内部类型| TDengine 数据类型 | -| -------- | ----- | -------- | +| OpenTSDB数据类型 | DataX 内部类型 | TDengine 数据类型 | +| ---------------- | -------------- | ----------------- | | timestamp | Date | timestamp | | Integer(value) | Double | double | -| Float(value) | Double | double | -| String(value) | String | binary | +| Float(value) | Double | double | +| String(value) | String | binary | | Integer(tag) | String | binary | -| Float(tag) | String |binary | -| String(tag) | String |binary | +| Float(tag) | String | binary | +| String(tag) | String | binary | + +### 3.2 从MongoDB到TDengine + +#### 3.2.1 配置样例 +```json +{ + "job": { + "setting": { + "speed": { + "channel": 2 + } + }, + "content": [ + { + "reader": { + "name": "mongodbreader", + "parameter": { + "address": [ + "127.0.0.1:27017" + ], + "userName": "user", + "mechanism": "SCRAM-SHA-1", + "userPassword": "password", + "authDb": "admin", + "dbName": "test", + "collectionName": "stock", + "column": [ + { + "name": "stockID", + "type": "string" + }, + { + "name": "tradeTime", + "type": "date" + }, + { + "name": "lastPrice", + "type": "double" + }, + { + "name": "askPrice1", + "type": "double" + }, + { + "name": "bidPrice1", + "type": "double" + }, + { + "name": "volume", + "type": "int" + } + ] + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "host": "localhost", + "port": 6030, + "dbname": "test", + "user": "root", + "password": "taosdata", + "stable": "stock", + "tagColumn": { + "industry": "energy", + "stockID": 0 + }, + "fieldColumn": { + "lastPrice": 2, + "askPrice1": 3, + "bidPrice1": 4, + "volume": 5 + }, + "timestampColumn": { + "tradeTime": 1 + } + } + } + } + ] + } +} +``` + +**注:本配置的writer部分同样适用于关系型数据库** + + +#### 3.2.2 参数说明 +| 参数 | 描述 | 是否必选 | 默认值 | 备注 | +| --------------- | -------------------- | ---------------- | -------- | ------------------ | +| host | TDengine实例的host | 是 | 无 | +| port | TDengine实例的port | 是 | 无 | +| user | TDengine实例的用户名 | 否 | root | +| password | TDengine实例的密码 | 否 | taosdata | +| dbname | 目的数据库的名称 | 是 | 无 | +| batchSize | 每次批量插入多少记录 | 否 | 1000 | +| stable | 目标超级表的名称 | 是(OpenTSDB除外) | 无 | +| tagColumn | 标签列的列名和位置 | 否 | 无 | 位置索引均从0开始 | +| fieldColumn | 字段列的列名和位置 | 否 | 无 | | +| timestampColumn | 时间戳列的列名和位置 | 否 | 无 | 时间戳列只能有一个 | + +#### 3.3.3 自动建表规则 +##### 3.3.3.1 超级表创建规则 + +如果配置了tagColumn、 fieldColumn和timestampColumn将会在插入第一条数据前,自动创建超级表。
+数据列的类型从第1条记录自动推断, 标签列默认类型为`NCHAR(64)`, 比如示例配置,可能生成以下建表语句: + +```sql +CREATE STABLE IF NOT EXISTS market_snapshot ( + tadetime TIMESTAMP, + lastprice DOUBLE, + askprice1 DOUBLE, + bidprice1 DOUBLE, + volume INT +) +TAGS( + industry NCHAR(64), + stockID NCHAR(64 +); +``` + +##### 3.3.3.2 子表创建规则 + +子表结果与超表相同,子表表名生成规则: +1. 将标签的value 组合成为如下的字符串: `tag_value1!tag_value2!tag_value3`。 +2. 计算该字符串的 MD5 散列值 "md5_val"。 +3. "t_md5val"作为子表名。其中的 "t" 是固定的前缀。 + +#### 3.3.4 用户提前建表 + +如果你已经创建好目标超级表,那么tagColumn、 fieldColumn和timestampColumn三个字段均可省略, 插件将通过执行通过`describe stableName`获取表结构的信息。 +此时要求接收到的Record中Column的顺序和执行`describe stableName`返回的列顺序相同, 比如通过`describe stableName`返回以下内容: +``` + Field | Type | Length | Note | +================================================================================= + ts | TIMESTAMP | 8 | | + current | DOUBLE | 8 | | + location | BINARY | 10 | TAG | +``` +那么插件收到的数据第1列必须代表时间戳,第2列必须代表电流,第3列必须代表位置。 + +#### 3.3.5 注意事项 + +1. tagColumn、 fieldColumn和timestampColumn三个字段用于描述目标表的结构信息,这三个配置字段必须同时存在或同时省略。 +2. 如果存在以上三个配置,且目标表也已经存在,则两者必须一致。**一致性**由用户自己保证,插件不做检查。不一致可能会导致插入失败或插入数据错乱。 +3. 插件优先使用配置文件中指定的表结构。 + +#### 3.3.6 类型转换 + +| MongoDB 数据类型 | DataX 内部类型 | TDengine 数据类型 | +| ---------------- | -------------- | ----------------- | +| int, Long | Long | BIGINT | +| double | Double | DOUBLE | +| string, array | String | NCHAR(64) | +| date | Date | TIMESTAMP | +| boolean | Boolean | BOOL | +| bytes | Bytes | BINARY | + ## 4 性能报告 @@ -127,13 +271,13 @@ TDengineWriter 通过 DataX 框架获取 Reader #### 4.2.1 单表测试报告 -| 通道数| DataX速度(Rec/s)|DataX流量(MB/s)| DataX机器网卡流出流量(MB/s)|DataX机器运行负载|DB网卡进入流量(MB/s)|DB运行负载|DB TPS| -|--------| --------|--------|--------|--------|--------|--------|--------| -|1| | | | | | | | -|4| | | | | | | | -|8| | | | | | | | -|16| | | | | | | | -|32| | | | | | | | +| 通道数 | DataX速度(Rec/s) | DataX流量(MB/s) | DataX机器网卡流出流量(MB/s) | DataX机器运行负载 | DB网卡进入流量(MB/s) | DB运行负载 | DB TPS | +| ------ | ---------------- | --------------- | --------------------------- | ----------------- | -------------------- | ---------- | ------ | +| 1 | | | | | | | | +| 4 | | | | | | | | +| 8 | | | | | | | | +| 16 | | | | | | | | +| 32 | | | | | | | | 说明: @@ -143,9 +287,23 @@ TDengineWriter 通过 DataX 框架获取 Reader #### 4.2.4 性能测试小结 -1. -2. ## 5 约束限制 -## FAQ \ No newline at end of file +## FAQ + +### 如何选取要同步的数据的范围? + +数据范围的选取在Reader插件端配置,对于不同的Reader插件配置方法往往不同。比如对于mysqlreader, 可以用sql语句指定数据范围。对于opentsdbreader, 用beginDateTime和endDateTime两个配置项指定数据范围。 + +### 如何一次导入多张源表? + +如果Reader插件支持一次读多张表,Writer插件就能一次导入多张表。如果Reader不支持多多张表,可以建多个job,分别导入。Writer插件只负责写数据。 + +### 1张源表导入之后对应TDengine中多少张表? + +这是又tagColumn决定的,如果所有tag列的值都相同,目标表也只有一个。源表有多少不同的tag组合,目标超表就会有多少子表。 + +### 源表和目标表的字段顺序一致吗? 
+ +TDengine要求每个表第一列是时间戳列,后边是普通字段,最后是标签列。如果源表不是这个顺序,插件在自动建表是自动调整。 \ No newline at end of file diff --git a/tdenginewriter/pom.xml b/tdenginewriter/pom.xml index d658d4a2..8eb94b33 100644 --- a/tdenginewriter/pom.xml +++ b/tdenginewriter/pom.xml @@ -19,6 +19,11 @@ + + com.taosdata.jdbc + taos-jdbcdriver + 2.0.34 + com.alibaba.datax datax-common @@ -37,7 +42,11 @@ ${junit-version} test - + + org.apache.commons + commons-lang3 + ${commons-lang3-version} + diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DefaultDataHandler.java deleted file mode 100644 index a1d52d75..00000000 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DefaultDataHandler.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.alibaba.datax.plugin.writer; - -import com.alibaba.datax.common.element.Column; -import com.alibaba.datax.common.element.Record; -import com.alibaba.datax.common.plugin.RecordReceiver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Properties; - -public class DefaultDataHandler implements DataHandler { - private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class); - - @Override - public long handle(RecordReceiver lineReceiver, Properties properties) { - long count = 0; - Record record; - while ((record = lineReceiver.getFromReader()) != null) { - - int recordLength = record.getColumnNumber(); - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < recordLength; i++) { - Column column = record.getColumn(i); - sb.append(column.asString()).append("\t"); - } - sb.setLength(sb.length() - 1); - LOG.debug(sb.toString()); - - count++; - } - return count; - } - -} \ No newline at end of file diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java similarity index 77% rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandler.java rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java index 94d1db30..686ac27b 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java @@ -1,4 +1,4 @@ -package com.alibaba.datax.plugin.writer; +package com.alibaba.datax.plugin.writer.tdenginewriter; import com.alibaba.datax.common.plugin.RecordReceiver; diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandlerFactory.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandlerFactory.java similarity index 81% rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandlerFactory.java rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandlerFactory.java index a488e7d5..1f740d7e 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandlerFactory.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandlerFactory.java @@ -1,4 +1,4 @@ -package com.alibaba.datax.plugin.writer; +package com.alibaba.datax.plugin.writer.tdenginewriter; public class DataHandlerFactory { diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java 
b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java new file mode 100644 index 00000000..733f49c5 --- /dev/null +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java @@ -0,0 +1,101 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.taosdata.jdbc.TSDBPreparedStatement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Properties; + +/** + * 默认DataHandler + */ +public class DefaultDataHandler implements DataHandler { + private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class); + + static { + try { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } + } + + @Override + public long handle(RecordReceiver lineReceiver, Properties properties) { + SchemaManager schemaManager = new SchemaManager(properties); + if (!schemaManager.configValid()) { + return 0; + } + + try { + Connection conn = getTaosConnection(properties); + if (conn == null) { + return 0; + } + if (schemaManager.shouldGuessSchema()) { + LOG.info("无法从配置文件获取表结构信息,尝试从数据库获取"); + boolean success = schemaManager.getFromDB(conn); + if (!success) { + return 0; + } + } else { + + } + int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, "1000")); + return write(lineReceiver, conn, batchSize, schemaManager); + } catch (Exception e) { + LOG.error("write failed " + e.getMessage()); + e.printStackTrace(); + } + return 0; + } + + + private Connection getTaosConnection(Properties properties) throws SQLException { + // 检查必要参数 + String host = properties.getProperty(Key.HOST); + String port = properties.getProperty(Key.PORT); + String dbname = properties.getProperty(Key.DBNAME); + String user = properties.getProperty(Key.USER); + String password = properties.getProperty(Key.PASSWORD); + if (host == null || port == null || dbname == null || user == null || password == null) { + String keys = String.join(" ", Key.HOST, Key.PORT, Key.DBNAME, Key.USER, Key.PASSWORD); + LOG.error("Required options missing, please check: " + keys); + return null; + } + String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", host, port, dbname, user, password); + LOG.info("TDengine connection established, host:{} port:{} dbname:{} user:{}", host, port, dbname, user); + return DriverManager.getConnection(jdbcUrl); + } + + /** + * 使用SQL批量写入
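+     * 生成的预编译SQL形如(示意, 超级表名与列名取自上文mongodb2tdengine示例配置, 非固定写法):
+     * INSERT INTO ? USING market_snapshot TAGS(?,?) (tradetime, lastprice, askprice1, bidprice1, volume) values (?, ?, ?, ?, ?)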
+ * + * @return 成功写入记录数 + * @throws SQLException + */ + private long write(RecordReceiver lineReceiver, Connection conn, int batchSize, SchemaManager scm) throws SQLException { + Record record = lineReceiver.getFromReader(); + if (record == null) { + return 0; + } + if (scm.shouldCreateTable()) { + scm.createSTable(conn, record); + } + String pq = String.format("INSERT INTO ? USING %s TAGS(%s) (%s) values (%s)", scm.getStable(), scm.getTagValuesPlaceHolder(), scm.getJoinedFieldNames(), scm.getFieldValuesPlaceHolder()); + LOG.info("Prepared SQL: {}", pq); + try (TSDBPreparedStatement stmt = (TSDBPreparedStatement) conn.prepareStatement(pq)) { + JDBCBatchWriter batchWriter = new JDBCBatchWriter(stmt, scm, batchSize); + do { + batchWriter.append(record); + } while ((record = lineReceiver.getFromReader()) != null); + batchWriter.flush(); + return batchWriter.getCount(); + } + } +} \ No newline at end of file diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java new file mode 100644 index 00000000..17023d03 --- /dev/null +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java @@ -0,0 +1,149 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import com.taosdata.jdbc.TSDBPreparedStatement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * 使用JDBC原生写入接口批量写入。
+ * 有两个限制条件导致批量写入的代码逻辑过于复杂,以至于需要开发新的类来封装。
+ * 1. 用户必须提前把需要批量写入的数据搜集到ArrayList中 + * 2. 每批写入的表名必须相同。 + * 这个类的实现逻辑是: + * 1. 先把属于同一子表的Record缓存起来 + * 2. 缓存的数量达到batchSize阈值,自动执行一次批量写入 + * 3. 最后一批数据需要用户手动flush才能写入 + */ +public class JDBCBatchWriter { + public static final Logger LOG = LoggerFactory.getLogger(JDBCBatchWriter.class); + + private TSDBPreparedStatement stmt; + private SchemaManager scm; + private int batchSize; + // 缓存Record, key为tableName + Map> buf = new HashMap<>(); + // 缓存表的标签值, key为tableName + Map tableTagValues = new HashMap<>(); + private long sucCount = 0; + private final int tsColIndex; + private List fieldList; + private Map fieldIndexMap; + + public JDBCBatchWriter(TSDBPreparedStatement stmt, SchemaManager scm, int batchSize) { + this.stmt = stmt; + this.scm = scm; + this.batchSize = batchSize; + this.tsColIndex = scm.getTsColIndex(); + this.fieldList = scm.getFieldList(); + this.fieldIndexMap = scm.getFieldIndexMap(); + } + + + public void append(Record record) throws SQLException { + String[] tagValues = scm.getTagValuesFromRecord(record); + String tableName = scm.computeTableName(tagValues); + if (buf.containsKey(tableName)) { + List lis = buf.get(tableName); + lis.add(record); + if (lis.size() == batchSize) { + executeBatch(tableName); + lis.clear(); + } + } else { + List lis = new ArrayList<>(batchSize); + lis.add(record); + buf.put(tableName, lis); + tableTagValues.put(tableName, tagValues); + } + } + + /** + * 执行单表批量写入 + * + * @param tableName + * @throws SQLException + */ + private void executeBatch(String tableName) throws SQLException { + // 表名 + stmt.setTableName(tableName); + List records = buf.get(tableName); + // 标签 + String[] tagValues = tableTagValues.get(tableName); + LOG.debug("executeBatch {}", String.join(",", tagValues)); + for (int i = 0; i < tagValues.length; ++i) { + stmt.setTagNString(i, tagValues[i]); + } + // 时间戳 + ArrayList tsList = records.stream().map(r -> r.getColumn(tsColIndex).asDate().getTime()).collect(Collectors.toCollection(ArrayList::new)); + stmt.setTimestamp(0, tsList); + // 字段 + Record record = records.get(0); + for (int i = 0; i < fieldList.size(); ) { + String fieldName = fieldList.get(i); + int index = fieldIndexMap.get(fieldName); + Column column = record.getColumn(index); + switch (column.getType()) { + case LONG: + ArrayList lisLong = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new)); + stmt.setLong(++i, lisLong); + break; + case DOUBLE: + ArrayList lisDouble = records.stream().map(r -> r.getColumn(index).asDouble()).collect(Collectors.toCollection(ArrayList::new)); + stmt.setDouble(++i, lisDouble); + break; + case STRING: + ArrayList lisString = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new)); + stmt.setNString(++i, lisString, 64); + break; + case DATE: + ArrayList lisTs = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new)); + stmt.setTimestamp(++i, lisTs); + break; + case BOOL: + ArrayList lisBool = records.stream().map(r -> r.getColumn(index).asBoolean()).collect(Collectors.toCollection(ArrayList::new)); + stmt.setBoolean(++i, lisBool); + break; + case BYTES: + ArrayList lisBytes = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new)); + stmt.setString(++i, lisBytes, 64); + break; + default: + throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, column.getType().toString()); + } + } + // 执行 + 
stmt.columnDataAddBatch(); + stmt.columnDataExecuteBatch(); + // 更新计数器 + sucCount += records.size(); + } + + /** + * 把缓存的Record全部写入 + */ + public void flush() throws SQLException { + for (String tabName : buf.keySet()) { + if (buf.get(tabName).size() > 0) { + executeBatch(tabName); + } + } + stmt.columnDataCloseBatch(); + } + + /** + * @return 成功写入的数据量 + */ + public long getCount() { + return sucCount; + } +} diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnection.java similarity index 98% rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnection.java index 3ce786e5..0aabe32a 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnection.java @@ -1,4 +1,4 @@ -package com.alibaba.datax.plugin.writer; +package com.alibaba.datax.plugin.writer.tdenginewriter; import java.util.Properties; diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/Key.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java similarity index 52% rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/Key.java rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java index b240bce4..090a7999 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/Key.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java @@ -1,4 +1,4 @@ -package com.alibaba.datax.plugin.writer; +package com.alibaba.datax.plugin.writer.tdenginewriter; public class Key { public static final String HOST = "host"; @@ -7,5 +7,8 @@ public class Key { public static final String USER = "user"; public static final String PASSWORD = "password"; public static final String BATCH_SIZE = "batchSize"; - + public static final String STABLE = "stable"; + public static final String TAG_COLUMN = "tagColumn"; + public static final String FIELD_COLUMN = "fieldColumn"; + public static final String TIMESTAMP_COLUMN = "timestampColumn"; } diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/OpentsdbDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java similarity index 98% rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/OpentsdbDataHandler.java rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java index 599e5f3e..52f1aa7a 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/OpentsdbDataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java @@ -1,4 +1,4 @@ -package com.alibaba.datax.plugin.writer; +package com.alibaba.datax.plugin.writer.tdenginewriter; import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.Record; diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java new file mode 100644 index 00000000..b3d7b7e3 --- /dev/null +++ 
b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java @@ -0,0 +1,255 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import org.apache.commons.codec.digest.DigestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.*; +import java.util.stream.Collectors; + +public class SchemaManager { + private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class); + + private String stable; // 目标超表名 + private Map fixedTagValue = new HashMap<>(); // 固定标签值 标签名 -> 标签值 + private Map tagIndexMap = new HashMap<>(); // 动态标签值 标签名 -> 列索引 + private Map fieldIndexMap = new HashMap<>(); // 字段名 -> 字段索引 + private String tsColName; // 时间戳列名 + private int tsColIndex = -1; // 时间戳列索引 + private List fieldList = new ArrayList<>(); + private List tagList = new ArrayList<>(); + private boolean canInferSchemaFromConfig = false; + + + public SchemaManager() { + } + + public SchemaManager(Properties properties) { + getFromConfig(properties); + } + + private String mapDataxType(Column.Type type) { + switch (type) { + case LONG: + return "BIGINT"; + case DOUBLE: + return "DOUBLE"; + case STRING: + return "NCHAR(64)"; + case DATE: + return "TIMESTAMP"; + case BOOL: + return "BOOL"; + case BYTES: + return "BINARY"; + default: + throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, type.toString()); + } + } + + public void setStable(String stable) { + stable = stable; + } + + public String getStable() { + return stable; + } + + private void getFromConfig(Properties properties) { + stable = properties.getProperty(Key.STABLE); + if (stable == null) { + LOG.error("配置错误: no stable"); + return; + } + for (Object key : properties.keySet()) { + String k = (String) key; + String v = properties.getProperty(k); + + String[] ps = k.split("\\."); + if (ps.length == 1) { + continue; + } + if (k.startsWith(Key.TAG_COLUMN)) { + String tagName = ps[1]; + try { + Integer tagIndex = Integer.parseInt(v); + this.tagIndexMap.put(tagName, tagIndex); + tagList.add(tagName); + } catch (NumberFormatException e) { + fixedTagValue.put(tagName, v); + tagList.add(tagName); + } + } else if (k.startsWith(Key.FIELD_COLUMN)) { + String fieldName = ps[1]; + Integer fileIndex = Integer.parseInt(v); + fieldIndexMap.put(fieldName, fileIndex); + } else if (k.startsWith(Key.TIMESTAMP_COLUMN)) { + tsColName = ps[1]; + tsColIndex = Integer.parseInt(v); + } + } + List sortedFieldName = fieldIndexMap.entrySet().stream().sorted((x, y) -> x.getValue().compareTo(y.getValue())).map(e -> e.getKey()).collect(Collectors.toList()); + fieldList.addAll(sortedFieldName); // 排序的目的是保证自动建表时列的顺序和输入数据的列的顺序保持一致 + canInferSchemaFromConfig = tsColIndex > -1 && !(fixedTagValue.isEmpty() && tagIndexMap.isEmpty()) && !fieldIndexMap.isEmpty(); + LOG.info("配置文件解析结果:fixedTags=[{}] ,tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", fixedTagValue.keySet()), String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex); + } + + public boolean shouldGuessSchema() { + return !canInferSchemaFromConfig; + } + + public boolean shouldCreateTable() { + return canInferSchemaFromConfig; + } + + public boolean configValid() { + boolean valid = (tagList.size() > 0 && 
fieldList.size() > 0 && tsColIndex > -1) || (tagList.size() == 0 && fieldList.size() == 0 && tsColIndex == -1); + if (!valid) { + LOG.error("配置错误. tag_columns,field_columns,timestamp_column必须同时存在或同时省略,当前解析结果: tag_columns: {}, field_columns:{}, timestamp_column:{} tsColIndex:{}", + (fixedTagValue.size() + tagIndexMap.size()), fieldIndexMap.size(), tsColName, tsColIndex); + } + return valid; + } + + /** + * 通过执行`describe dbname.stable`命令,获取表的schema.
+ * describe命令返回4列内容,分别是:Field,Type,Length,Note
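+ * 例如(示意, 取自本文档3.3.4节的describe输出示例): location | BINARY | 10 | TAG 表示 location 是标签列。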
+ * + * @return 成功返回true,如果超表不存在或其他错误则返回false + */ + public boolean getFromDB(Connection conn) { + try { + List stables = getSTables(conn); + if (!stables.contains(stable)) { + LOG.error("超级表{}不存在,无法从数据库获取表结构信息.", stable); + return false; + } + } catch (SQLException e) { + LOG.error(e.getMessage()); + e.printStackTrace(); + return false; + } + try (Statement stmt = conn.createStatement()) { + ResultSet rs = stmt.executeQuery("describe " + stable); + int colIndex = 0; + while (rs.next()) { + String name = rs.getString(1); + String type = rs.getString(2); + String note = rs.getString(4); + if ("TIMESTAMP".equals(type)) { + tsColName = name; + tsColIndex = colIndex; + } else if ("TAG".equals(note)) { + tagIndexMap.put(name, colIndex); + tagList.add(name); + } else { + fieldIndexMap.put(name, colIndex); + fieldList.add(name); + } + colIndex++; + } + LOG.info("从数据库获取的表结构概要:tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex); + return true; + } catch (SQLException e) { + LOG.error(e.getMessage()); + e.printStackTrace(); + return false; + } + } + + public static List getSTables(Connection conn) throws SQLException { + List stables = new ArrayList<>(); + try (Statement stmt = conn.createStatement()) { + ResultSet rs = stmt.executeQuery("show stables"); + while (rs.next()) { + String name = rs.getString(1); + stables.add(name); + } + } + return stables; + } + + public void createSTable(Connection conn, Record record) throws SQLException { + StringBuilder sb = new StringBuilder(); + sb.append("CREATE STABLE IF NOT EXISTS ").append(stable).append("("); + sb.append(tsColName).append(" ").append("TIMESTAMP,"); + for (String fieldName : fieldList) { + sb.append(fieldName).append(' '); + Column col = record.getColumn(fieldIndexMap.get(fieldName)); + String tdType = mapDataxType(col.getType()); + sb.append(tdType).append(','); + } + sb.deleteCharAt(sb.length() - 1); + sb.append(") TAGS("); + for (String tagName : tagList) { + sb.append(tagName).append(" NCHAR(64),"); + } + sb.deleteCharAt(sb.length() - 1); + sb.append(")"); + String q = sb.toString(); + LOG.info("自动创建超级表:" + q); + try (Statement stmt = conn.createStatement()) { + stmt.execute(q); + } + } + + public String[] getTagValuesFromRecord(Record record) { + String[] tagValues = new String[tagList.size()]; + for (int i = 0; i < tagList.size(); ++i) { + if (fixedTagValue.containsKey(tagList.get(i))) { + tagValues[i] = fixedTagValue.get(tagList.get(i)); + } else { + int tagIndex = tagIndexMap.get(tagList.get(i)); + tagValues[i] = record.getColumn(tagIndex).asString(); + } + } + return tagValues; + } + + public Map getFieldIndexMap() { + return fieldIndexMap; + } + + public List getFieldList() { + return fieldList; + } + + public String getJoinedFieldNames() { + return tsColName + ", " + String.join(", ", fieldList); + } + + public int getTsColIndex() { + return tsColIndex; + } + + public String getTagValuesPlaceHolder() { + return tagList.stream().map(x -> "?").collect(Collectors.joining(",")); + } + + public String getFieldValuesPlaceHolder() { + return "?, " + fieldList.stream().map(x -> "?").collect(Collectors.joining(", ")); + } + + /** + * 计算子表表名 + *
+ *
+ * 1. 将标签的value 组合成为如下的字符串: tag_value1!tag_value2!tag_value3。
+ * 2. 计算该字符串的 MD5 散列值 "md5_val"。
+ * 3. "t_md5val"作为子表名。其中的 "t" 是固定的前缀。
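+ * 例如(示意): 标签值为 ["cu", "600519"] 时, 先拼接为 "cu!600519", 子表名即为 "t_" + DigestUtils.md5Hex("cu!600519")。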
+ * + * @param tagValues + * @return + */ + public String computeTableName(String[] tagValues) { + String s = String.join("!", tagValues); + return "t_" + DigestUtils.md5Hex(s); + } +} diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java similarity index 84% rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java index 84600802..70ea5737 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java @@ -1,4 +1,4 @@ -package com.alibaba.datax.plugin.writer; +package com.alibaba.datax.plugin.writer.tdenginewriter; import com.alibaba.datax.common.plugin.RecordReceiver; @@ -64,7 +64,13 @@ public class TDengineWriter extends Writer { String value = this.writerSliceConfig.getString(key); properties.setProperty(key, value); } - + if (!keys.contains(Key.USER)) { + properties.setProperty(Key.USER, "root"); + } + if (!keys.contains(Key.PASSWORD)) { + properties.setProperty(Key.PASSWORD, "taosdata"); + } + LOG.debug("========================properties==========================\n" + properties.toString()); String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME); LOG.debug("start to handle record from: " + peerPluginName); DataHandler handler = DataHandlerFactory.build(peerPluginName); diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriterErrorCode.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java similarity index 75% rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriterErrorCode.java rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java index 02e87079..994f1e89 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriterErrorCode.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java @@ -1,9 +1,10 @@ -package com.alibaba.datax.plugin.writer; +package com.alibaba.datax.plugin.writer.tdenginewriter; import com.alibaba.datax.common.spi.ErrorCode; public enum TDengineWriterErrorCode implements ErrorCode { - RUNTIME_EXCEPTION("TDengineWriter-00", "运行时异常"); + RUNTIME_EXCEPTION("TDengineWriter-00", "运行时异常"), + TYPE_ERROR("TDengineWriter-00", "Datax类型无法正确映射到TDengine类型"); private final String code; private final String description; diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/com_alibaba_datax_plugin_writer_JniConnection.h b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/com_alibaba_datax_plugin_writer_tdenginewriter_JniConnection.h similarity index 100% rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/com_alibaba_datax_plugin_writer_JniConnection.h rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/com_alibaba_datax_plugin_writer_tdenginewriter_JniConnection.h diff --git a/tdenginewriter/src/main/resources/plugin.json b/tdenginewriter/src/main/resources/plugin.json index 6c900a15..e54f65ff 100755 --- a/tdenginewriter/src/main/resources/plugin.json +++ 
b/tdenginewriter/src/main/resources/plugin.json @@ -1,9 +1,9 @@ { "name": "tdenginewriter", - "class": "com.alibaba.datax.plugin.writer.TDengineWriter", + "class": "com.alibaba.datax.plugin.writer.tdenginewriter.TDengineWriter", "description": { "useScene": "data migration to tdengine", - "mechanism": "use JNI to write data to tdengine." + "mechanism": "use JNI or taos-jdbc to write data to tdengine." }, "developer": "zyyang-taosdata" } \ No newline at end of file diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/JniConnectionTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnectionTest.java similarity index 90% rename from tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/JniConnectionTest.java rename to tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnectionTest.java index 040cf34c..09c3df26 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/JniConnectionTest.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnectionTest.java @@ -1,4 +1,4 @@ -package com.alibaba.datax.plugin.writer; +package com.alibaba.datax.plugin.writer.tdenginewriter; import org.junit.Test; diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java new file mode 100644 index 00000000..43928db9 --- /dev/null +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java @@ -0,0 +1,21 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +public class TDengineWriterTest { + + + @Test + public void testGetSchema() throws ClassNotFoundException, SQLException { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", "wozai.fun", "6030", "test", "root", "taosdata"); + Connection conn = DriverManager.getConnection(jdbcUrl); + SchemaManager schemaManager = new SchemaManager(); + schemaManager.setStable("test1"); + schemaManager.getFromDB(conn); + } +} From ca1851fb995a3200bd4c186539ff1930b7162e8d Mon Sep 17 00:00:00 2001 From: dingbo Date: Thu, 18 Nov 2021 18:06:10 +0800 Subject: [PATCH 4/9] mongodb2tdengine test and refine --- core/src/main/job/mongodb2tdengine.json | 4 +- tdenginewriter/doc/tdenginewriter.md | 11 +- .../writer/tdenginewriter/DataHandler.java | 3 +- .../tdenginewriter/DefaultDataHandler.java | 17 +-- .../tdenginewriter/JDBCBatchWriter.java | 100 ++++++++++++++++-- .../tdenginewriter/OpentsdbDataHandler.java | 3 +- .../writer/tdenginewriter/SchemaManager.java | 25 ++++- .../writer/tdenginewriter/TDengineWriter.java | 2 +- .../tdenginewriter/TDengineWriterTest.java | 10 ++ 9 files changed, 150 insertions(+), 25 deletions(-) diff --git a/core/src/main/job/mongodb2tdengine.json b/core/src/main/job/mongodb2tdengine.json index 4cfc987e..45e5a640 100644 --- a/core/src/main/job/mongodb2tdengine.json +++ b/core/src/main/job/mongodb2tdengine.json @@ -11,7 +11,7 @@ "name": "mongodbreader", "parameter": { "address": [ - "123.56.104.14:27017" + "127.0.0.1:27017" ], "userName": "admin678", "mechanism": "SCRAM-SHA-1", @@ -50,7 +50,7 @@ "writer": { "name": "tdenginewriter", "parameter": { - "host": "123.56.104.14", + "host": "127.0.0.1", "port": 6030, 
"dbname": "test", "user": "root", diff --git a/tdenginewriter/doc/tdenginewriter.md b/tdenginewriter/doc/tdenginewriter.md index c9c222a2..9ab64a2d 100644 --- a/tdenginewriter/doc/tdenginewriter.md +++ b/tdenginewriter/doc/tdenginewriter.md @@ -290,6 +290,9 @@ TAGS( ## 5 约束限制 +1. 本插件自动创建超级表时NCHAR类型的长度固定为64,对于包含长度大于64的字符串的数据源,将不支持。 +2. 标签列不能包含null值,如果包含会被过滤掉。 + ## FAQ ### 如何选取要同步的数据的范围? @@ -300,10 +303,14 @@ TAGS( 如果Reader插件支持一次读多张表,Writer插件就能一次导入多张表。如果Reader不支持多多张表,可以建多个job,分别导入。Writer插件只负责写数据。 -### 1张源表导入之后对应TDengine中多少张表? +### 一张源表导入之后对应TDengine中多少张表? 这是又tagColumn决定的,如果所有tag列的值都相同,目标表也只有一个。源表有多少不同的tag组合,目标超表就会有多少子表。 ### 源表和目标表的字段顺序一致吗? -TDengine要求每个表第一列是时间戳列,后边是普通字段,最后是标签列。如果源表不是这个顺序,插件在自动建表是自动调整。 \ No newline at end of file +TDengine要求每个表第一列是时间戳列,后边是普通字段,最后是标签列。如果源表不是这个顺序,插件在自动建表是自动调整。 + +### 插件如何确定各列的数据类型? + +抽样收到的第一批数据自动推断各列的类型。schema是从数据来的,因此要保障“好的”数据占大多数。 \ No newline at end of file diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java index 686ac27b..421c2fe4 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java @@ -1,10 +1,11 @@ package com.alibaba.datax.plugin.writer.tdenginewriter; import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; import java.util.Properties; public interface DataHandler { - long handle(RecordReceiver lineReceiver, Properties properties); + long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector); } diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java index 733f49c5..9250910a 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java @@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.writer.tdenginewriter; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.taosdata.jdbc.TSDBPreparedStatement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,7 +17,6 @@ import java.util.Properties; */ public class DefaultDataHandler implements DataHandler { private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class); - static { try { Class.forName("com.taosdata.jdbc.TSDBDriver"); @@ -26,7 +26,7 @@ public class DefaultDataHandler implements DataHandler { } @Override - public long handle(RecordReceiver lineReceiver, Properties properties) { + public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) { SchemaManager schemaManager = new SchemaManager(properties); if (!schemaManager.configValid()) { return 0; @@ -47,7 +47,11 @@ public class DefaultDataHandler implements DataHandler { } int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, "1000")); - return write(lineReceiver, conn, batchSize, schemaManager); + if (batchSize < 5) { + LOG.error("batchSize太小,会增加自动类型推断错误的概率,建议改大后重试"); + return 0; + } + return write(lineReceiver, conn, batchSize, schemaManager, 
collector); } catch (Exception e) { LOG.error("write failed " + e.getMessage()); e.printStackTrace(); @@ -79,18 +83,15 @@ public class DefaultDataHandler implements DataHandler { * @return 成功写入记录数 * @throws SQLException */ - private long write(RecordReceiver lineReceiver, Connection conn, int batchSize, SchemaManager scm) throws SQLException { + private long write(RecordReceiver lineReceiver, Connection conn, int batchSize, SchemaManager scm, TaskPluginCollector collector) throws SQLException { Record record = lineReceiver.getFromReader(); if (record == null) { return 0; } - if (scm.shouldCreateTable()) { - scm.createSTable(conn, record); - } String pq = String.format("INSERT INTO ? USING %s TAGS(%s) (%s) values (%s)", scm.getStable(), scm.getTagValuesPlaceHolder(), scm.getJoinedFieldNames(), scm.getFieldValuesPlaceHolder()); LOG.info("Prepared SQL: {}", pq); try (TSDBPreparedStatement stmt = (TSDBPreparedStatement) conn.prepareStatement(pq)) { - JDBCBatchWriter batchWriter = new JDBCBatchWriter(stmt, scm, batchSize); + JDBCBatchWriter batchWriter = new JDBCBatchWriter(conn, stmt, scm, batchSize, collector); do { batchWriter.append(record); } while ((record = lineReceiver.getFromReader()) != null); diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java index 17023d03..21974e93 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java @@ -3,10 +3,13 @@ package com.alibaba.datax.plugin.writer.tdenginewriter; import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.AbstractTaskPlugin; +import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.taosdata.jdbc.TSDBPreparedStatement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; @@ -26,10 +29,12 @@ import java.util.stream.Collectors; */ public class JDBCBatchWriter { public static final Logger LOG = LoggerFactory.getLogger(JDBCBatchWriter.class); - private TSDBPreparedStatement stmt; private SchemaManager scm; + private Connection conn; private int batchSize; + private TaskPluginCollector collector; + // 缓存Record, key为tableName Map> buf = new HashMap<>(); // 缓存表的标签值, key为tableName @@ -37,25 +42,57 @@ public class JDBCBatchWriter { private long sucCount = 0; private final int tsColIndex; private List fieldList; + // 每个record至少应该包含的列数,用于检测数据 + private int minColNum = 0; private Map fieldIndexMap; + private List fieldTypes = null; - public JDBCBatchWriter(TSDBPreparedStatement stmt, SchemaManager scm, int batchSize) { + public JDBCBatchWriter(Connection conn, TSDBPreparedStatement stmt, SchemaManager scm, int batchSize, TaskPluginCollector collector) { + this.conn = conn; this.stmt = stmt; this.scm = scm; this.batchSize = batchSize; + this.collector = collector; this.tsColIndex = scm.getTsColIndex(); this.fieldList = scm.getFieldList(); this.fieldIndexMap = scm.getFieldIndexMap(); + this.minColNum = 1 + fieldList.size() + scm.getDynamicTagCount(); + } + public void initFiledTypesAndTargetTable(List records) throws SQLException { + if (fieldTypes != null) { + return; + } + 
guessFieldTypes(records); + if (scm.shouldCreateTable()) { + scm.createSTable(conn, fieldTypes); + } + } public void append(Record record) throws SQLException { + int columnNum = record.getColumnNumber(); + if (columnNum < minColNum) { + collector.collectDirtyRecord(record, "实际列数小于期望列数"); + return; + } String[] tagValues = scm.getTagValuesFromRecord(record); + if (tagValues == null) { + collector.collectDirtyRecord(record, "标签列包含null"); + return; + } + if (!scm.hasTimestamp(record)) { + collector.collectDirtyRecord(record, "时间戳列为null或类型错误"); + return; + } String tableName = scm.computeTableName(tagValues); if (buf.containsKey(tableName)) { List lis = buf.get(tableName); lis.add(record); if (lis.size() == batchSize) { + if (fieldTypes == null) { + initFiledTypesAndTargetTable(lis); + } executeBatch(tableName); lis.clear(); } @@ -67,6 +104,49 @@ public class JDBCBatchWriter { } } + /** + * 只有String类型比较特别,测试发现值为null的列会转成String类型。所以Column的类型为String并不代表这一列的类型真的是String。 + * + * @param records + */ + private void guessFieldTypes(List records) { + fieldTypes = new ArrayList<>(fieldList.size()); + for (int i = 0; i < fieldList.size(); ++i) { + int colIndex = fieldIndexMap.get(fieldList.get(i)); + boolean ok = false; + for (int j = 0; j < records.size() && !ok; ++j) { + Column column = records.get(j).getColumn(colIndex); + Column.Type type = column.getType(); + switch (type) { + case LONG: + case DOUBLE: + case DATE: + case BOOL: + case BYTES: + if (column.getRawData() != null) { + fieldTypes.add(type); + ok = true; + } + break; + case STRING: + // 只有非null且非空的String列,才会被真的当作String类型。 + String value = column.asString(); + if (value != null && !"".equals(value)) { + fieldTypes.add(type); + ok = true; + } + break; + default: + throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, fieldTypes.get(i).toString()); + } + } + if (!ok) { + throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, String.format("根据采样的%d条数据,无法推断第%d列的数据类型", records.size(), i + 1)); + } + } + LOG.info("Field Types: {}", fieldTypes); + } + /** * 执行单表批量写入 * @@ -87,12 +167,10 @@ public class JDBCBatchWriter { ArrayList tsList = records.stream().map(r -> r.getColumn(tsColIndex).asDate().getTime()).collect(Collectors.toCollection(ArrayList::new)); stmt.setTimestamp(0, tsList); // 字段 - Record record = records.get(0); for (int i = 0; i < fieldList.size(); ) { String fieldName = fieldList.get(i); int index = fieldIndexMap.get(fieldName); - Column column = record.getColumn(index); - switch (column.getType()) { + switch (fieldTypes.get(i)) { case LONG: ArrayList lisLong = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new)); stmt.setLong(++i, lisLong); @@ -118,7 +196,7 @@ public class JDBCBatchWriter { stmt.setString(++i, lisBytes, 64); break; default: - throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, column.getType().toString()); + throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, fieldTypes.get(i).toString()); } } // 执行 @@ -132,6 +210,16 @@ public class JDBCBatchWriter { * 把缓存的Record全部写入 */ public void flush() throws SQLException { + if (fieldTypes == null) { + List records = new ArrayList<>(); + for (List lis : buf.values()) { + records.addAll(lis); + if (records.size() > 100) { + break; + } + } + initFiledTypesAndTargetTable(records); + } for (String tabName : buf.keySet()) { if (buf.get(tabName).size() > 0) { executeBatch(tabName); diff --git 
a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java index 52f1aa7a..e1b8f5dd 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java @@ -4,6 +4,7 @@ import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,7 +15,7 @@ public class OpentsdbDataHandler implements DataHandler { private static final String DEFAULT_BATCH_SIZE = "1"; @Override - public long handle(RecordReceiver lineReceiver, Properties properties) { + public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) { // opentsdb json protocol use JNI and schemaless API to write String host = properties.getProperty(Key.HOST); int port = Integer.parseInt(properties.getProperty(Key.PORT)); diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java index b3d7b7e3..21b8ef01 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java @@ -176,14 +176,15 @@ public class SchemaManager { return stables; } - public void createSTable(Connection conn, Record record) throws SQLException { + public void createSTable(Connection conn, List fieldTypes) throws SQLException { StringBuilder sb = new StringBuilder(); sb.append("CREATE STABLE IF NOT EXISTS ").append(stable).append("("); sb.append(tsColName).append(" ").append("TIMESTAMP,"); - for (String fieldName : fieldList) { + for (int i = 0; i < fieldList.size(); ++i) { + String fieldName = fieldList.get(i); + Column.Type dxType = fieldTypes.get(i); sb.append(fieldName).append(' '); - Column col = record.getColumn(fieldIndexMap.get(fieldName)); - String tdType = mapDataxType(col.getType()); + String tdType = mapDataxType(dxType); sb.append(tdType).append(','); } sb.deleteCharAt(sb.length() - 1); @@ -209,10 +210,22 @@ public class SchemaManager { int tagIndex = tagIndexMap.get(tagList.get(i)); tagValues[i] = record.getColumn(tagIndex).asString(); } + if (tagValues[i] == null) { + return null; + } } return tagValues; } + public boolean hasTimestamp(Record record) { + Column column = record.getColumn(tsColIndex); + if (column.getType() == Column.Type.DATE && column.asDate() != null) { + return true; + } else { + return false; + } + } + public Map getFieldIndexMap() { return fieldIndexMap; } @@ -252,4 +265,8 @@ public class SchemaManager { String s = String.join("!", tagValues); return "t_" + DigestUtils.md5Hex(s); } + + public int getDynamicTagCount() { + return tagIndexMap.size(); + } } diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java index 70ea5737..cd223792 100644 --- 
a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java @@ -74,7 +74,7 @@ public class TDengineWriter extends Writer { String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME); LOG.debug("start to handle record from: " + peerPluginName); DataHandler handler = DataHandlerFactory.build(peerPluginName); - long records = handler.handle(lineReceiver, properties); + long records = handler.handle(lineReceiver, properties, getTaskPluginCollector()); LOG.debug("handle data finished, records: " + records); } diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java index 43928db9..62bf7040 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java @@ -5,6 +5,7 @@ import org.junit.Test; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import java.sql.Statement; public class TDengineWriterTest { @@ -18,4 +19,13 @@ public class TDengineWriterTest { schemaManager.setStable("test1"); schemaManager.getFromDB(conn); } + + @Test + public void dropTestTable() throws ClassNotFoundException, SQLException { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", "wozai.fun", "6030", "test", "root", "taosdata"); + Connection conn = DriverManager.getConnection(jdbcUrl); + Statement stmt = conn.createStatement(); + stmt.execute("drop table market_snapshot"); + } } From 75d4f7e101bc17bf0427acc7efcdafb4648642ff Mon Sep 17 00:00:00 2001 From: dingbo Date: Thu, 18 Nov 2021 19:08:19 +0800 Subject: [PATCH 5/9] mongodb2tdengine check records count before flush --- .../datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java index 21974e93..3b1f860b 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java @@ -218,7 +218,11 @@ public class JDBCBatchWriter { break; } } - initFiledTypesAndTargetTable(records); + if (records.size() > 0) { + initFiledTypesAndTargetTable(records); + } else { + return; + } } for (String tabName : buf.keySet()) { if (buf.get(tabName).size() > 0) { From 485d2d881593aaadc0ad39c95aa215e88d2f38db Mon Sep 17 00:00:00 2001 From: dingbo Date: Thu, 18 Nov 2021 20:18:40 +0800 Subject: [PATCH 6/9] mongodb2tdengine typo --- tdenginewriter/doc/tdenginewriter.md | 2 +- .../datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java | 1 - .../datax/plugin/writer/tdenginewriter/SchemaManager.java | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/tdenginewriter/doc/tdenginewriter.md b/tdenginewriter/doc/tdenginewriter.md index 9ab64a2d..715080c1 100644 --- a/tdenginewriter/doc/tdenginewriter.md +++ b/tdenginewriter/doc/tdenginewriter.md @@ -197,7 +197,7 @@ CREATE STABLE IF NOT EXISTS market_snapshot ( ) 
TAGS( industry NCHAR(64), - stockID NCHAR(64 + stockID NCHAR(64) ); ``` diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java index 3b1f860b..279e6ed3 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java @@ -3,7 +3,6 @@ package com.alibaba.datax.plugin.writer.tdenginewriter; import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.exception.DataXException; -import com.alibaba.datax.common.plugin.AbstractTaskPlugin; import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.taosdata.jdbc.TSDBPreparedStatement; import org.slf4j.Logger; diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java index 21b8ef01..22c8a44f 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java @@ -48,7 +48,7 @@ public class SchemaManager { case BOOL: return "BOOL"; case BYTES: - return "BINARY"; + return "BINARY(64)"; default: throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, type.toString()); } From be78295e116f6b2c98e65bb76c3d5cf2b71f2578 Mon Sep 17 00:00:00 2001 From: dingbo Date: Thu, 18 Nov 2021 22:57:53 +0800 Subject: [PATCH 7/9] mongodb2tdengine typo --- tdenginewriter/doc/tdenginewriter.md | 4 ++-- .../datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tdenginewriter/doc/tdenginewriter.md b/tdenginewriter/doc/tdenginewriter.md index 715080c1..432b1fb2 100644 --- a/tdenginewriter/doc/tdenginewriter.md +++ b/tdenginewriter/doc/tdenginewriter.md @@ -305,7 +305,7 @@ TAGS( ### 一张源表导入之后对应TDengine中多少张表? -这是又tagColumn决定的,如果所有tag列的值都相同,目标表也只有一个。源表有多少不同的tag组合,目标超表就会有多少子表。 +这是由tagColumn决定的,如果所有tag列的值都相同,那么目标表只有一个。源表有多少不同的tag组合,目标超表就有多少子表。 ### 源表和目标表的字段顺序一致吗? @@ -313,4 +313,4 @@ TDengine要求每个表第一列是时间戳列,后边是普通字段,最后 ### 插件如何确定各列的数据类型? 
-抽样收到的第一批数据自动推断各列的类型。schema是从数据来的,因此要保障“好的”数据占大多数。 \ No newline at end of file +根据收到的第一批数据自动推断各列的类型。 \ No newline at end of file diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java index 279e6ed3..20065a70 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java @@ -41,7 +41,7 @@ public class JDBCBatchWriter { private long sucCount = 0; private final int tsColIndex; private List<String> fieldList; - // 每个record至少应该包含的列数,用于检测数据 + // 每个record至少应该包含的列数,用于校验数据 private int minColNum = 0; private Map<String, Integer> fieldIndexMap; private List<Column.Type> fieldTypes = null; From e5c3fed1a939e8006911a423f7559ff5b7428a1c Mon Sep 17 00:00:00 2001 From: dingbo Date: Fri, 19 Nov 2021 14:20:37 +0800 Subject: [PATCH 8/9] i18n support for some important log messages --- .../tdenginewriter/DefaultDataHandler.java | 7 ++++-- .../tdenginewriter/JDBCBatchWriter.java | 12 ++++++--- .../plugin/writer/tdenginewriter/Msg.java | 20 +++++++++++++++ .../writer/tdenginewriter/SchemaManager.java | 13 +++++----- .../resources/tdenginewritermsg.properties | 6 +++++ .../tdenginewritermsg_en_US.properties | 6 +++++ .../tdenginewritermsg_zh_CN.properties | 6 +++++ .../writer/tdenginewriter/MessageTest.java | 25 +++++++++++++++++++ 8 files changed, 82 insertions(+), 13 deletions(-) create mode 100644 tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Msg.java create mode 100644 tdenginewriter/src/main/resources/tdenginewritermsg.properties create mode 100644 tdenginewriter/src/main/resources/tdenginewritermsg_en_US.properties create mode 100644 tdenginewriter/src/main/resources/tdenginewritermsg_zh_CN.properties create mode 100644 tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/MessageTest.java diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java index 9250910a..91c2b7e3 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java @@ -17,6 +17,7 @@ import java.util.Properties; */ public class DefaultDataHandler implements DataHandler { private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class); + static { try { Class.forName("com.taosdata.jdbc.TSDBDriver"); @@ -38,7 +39,8 @@ public class DefaultDataHandler implements DataHandler { return 0; } if (schemaManager.shouldGuessSchema()) { - LOG.info("无法从配置文件获取表结构信息,尝试从数据库获取"); + // 无法从配置文件获取表结构信息,尝试从数据库获取 + LOG.info(Msg.get("try_get_schema_fromdb")); boolean success = schemaManager.getFromDB(conn); if (!success) { return 0; @@ -48,7 +50,8 @@ public class DefaultDataHandler implements DataHandler { int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, "1000")); if (batchSize < 5) { - LOG.error("batchSize太小,会增加自动类型推断错误的概率,建议改大后重试"); + // batchSize太小,会增加自动类型推断错误的概率,建议改大后重试 + LOG.error(Msg.get("batch_size_too_small")); return 0; } return write(lineReceiver, conn, batchSize, schemaManager, collector); diff --git
a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java index 20065a70..53ab9bb9 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java @@ -72,16 +72,19 @@ public class JDBCBatchWriter { public void append(Record record) throws SQLException { int columnNum = record.getColumnNumber(); if (columnNum < minColNum) { - collector.collectDirtyRecord(record, "实际列数小于期望列数"); + // 实际列数小于期望列数 + collector.collectDirtyRecord(record, Msg.get("column_number_error")); return; } String[] tagValues = scm.getTagValuesFromRecord(record); if (tagValues == null) { - collector.collectDirtyRecord(record, "标签列包含null"); + // 标签列包含null + collector.collectDirtyRecord(record, Msg.get("tag_value_error")); return; } if (!scm.hasTimestamp(record)) { - collector.collectDirtyRecord(record, "时间戳列为null或类型错误"); + // 时间戳列为null或类型错误 + collector.collectDirtyRecord(record, Msg.get("ts_value_error")); return; } String tableName = scm.computeTableName(tagValues); @@ -140,7 +143,8 @@ } } if (!ok) { - throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, String.format("根据采样的%d条数据,无法推断第%d列的数据类型", records.size(), i + 1)); + // 根据采样的%d条数据,无法推断第%d列的数据类型 + throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, String.format(Msg.get("infer_column_type_error"), records.size(), i + 1)); } } LOG.info("Field Types: {}", fieldTypes); diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Msg.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Msg.java new file mode 100644 index 00000000..89730d35 --- /dev/null +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Msg.java @@ -0,0 +1,20 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import java.util.Locale; +import java.util.ResourceBundle; + +/** + * i18n message util + */ +public class Msg { + private static ResourceBundle bundle; + + static { + bundle = ResourceBundle.getBundle("tdenginewritermsg", Locale.getDefault()); + } + + public static String get(String key) { + return bundle.getString(key); + } + +} diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java index 22c8a44f..d67a6585 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java @@ -65,7 +65,7 @@ public class SchemaManager { private void getFromConfig(Properties properties) { stable = properties.getProperty(Key.STABLE); if (stable == null) { - LOG.error("配置错误: no stable"); + LOG.error("Config error: no stable"); return; } for (Object key : properties.keySet()) { @@ -98,7 +98,7 @@ public class SchemaManager { List<String> sortedFieldName = fieldIndexMap.entrySet().stream().sorted((x, y) -> x.getValue().compareTo(y.getValue())).map(e -> e.getKey()).collect(Collectors.toList()); fieldList.addAll(sortedFieldName); // 排序的目的是保证自动建表时列的顺序和输入数据的列的顺序保持一致 canInferSchemaFromConfig = tsColIndex > -1 && !(fixedTagValue.isEmpty() && tagIndexMap.isEmpty()) && !fieldIndexMap.isEmpty(); -
LOG.info("配置文件解析结果:fixedTags=[{}] ,tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", fixedTagValue.keySet()), String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex); + LOG.info("Config file parsed result:fixedTags=[{}] ,tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", fixedTagValue.keySet()), String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex); } public boolean shouldGuessSchema() { @@ -112,8 +112,7 @@ public class SchemaManager { public boolean configValid() { boolean valid = (tagList.size() > 0 && fieldList.size() > 0 && tsColIndex > -1) || (tagList.size() == 0 && fieldList.size() == 0 && tsColIndex == -1); if (!valid) { - LOG.error("配置错误. tag_columns,field_columns,timestamp_column必须同时存在或同时省略,当前解析结果: tag_columns: {}, field_columns:{}, timestamp_column:{} tsColIndex:{}", - (fixedTagValue.size() + tagIndexMap.size()), fieldIndexMap.size(), tsColName, tsColIndex); + LOG.error("Config error: tagColumn, fieldColumn and timestampColumn must be present together or absent together."); } return valid; } @@ -128,7 +127,7 @@ public class SchemaManager { try { List stables = getSTables(conn); if (!stables.contains(stable)) { - LOG.error("超级表{}不存在,无法从数据库获取表结构信息.", stable); + LOG.error("super table {} not exist, fail to get schema from database.", stable); return false; } } catch (SQLException e) { @@ -155,7 +154,7 @@ public class SchemaManager { } colIndex++; } - LOG.info("从数据库获取的表结构概要:tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex); + LOG.info("table info:tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex); return true; } catch (SQLException e) { LOG.error(e.getMessage()); @@ -195,7 +194,7 @@ public class SchemaManager { sb.deleteCharAt(sb.length() - 1); sb.append(")"); String q = sb.toString(); - LOG.info("自动创建超级表:" + q); + LOG.info("run sql:" + q); try (Statement stmt = conn.createStatement()) { stmt.execute(q); } diff --git a/tdenginewriter/src/main/resources/tdenginewritermsg.properties b/tdenginewriter/src/main/resources/tdenginewritermsg.properties new file mode 100644 index 00000000..4aaa220b --- /dev/null +++ b/tdenginewriter/src/main/resources/tdenginewritermsg.properties @@ -0,0 +1,6 @@ +try_get_schema_fromdb=fail to get structure info of target table from configure file and will try to get it from database +batch_size_too_small='batchSize' is too small, please increase it and try again +column_number_error=number of columns is less than expected +tag_value_error=tag columns include 'null' value +ts_value_error=timestamp column type error or null +infer_column_type_error=fail to infer column type: sample count %d, column index %d \ No newline at end of file diff --git a/tdenginewriter/src/main/resources/tdenginewritermsg_en_US.properties b/tdenginewriter/src/main/resources/tdenginewritermsg_en_US.properties new file mode 100644 index 00000000..4aaa220b --- /dev/null +++ b/tdenginewriter/src/main/resources/tdenginewritermsg_en_US.properties @@ -0,0 +1,6 @@ +try_get_schema_fromdb=fail to get structure info of target table from configure file and will try to get it from database +batch_size_too_small='batchSize' is too small, please increase it and try again +column_number_error=number of columns is less than expected +tag_value_error=tag columns include 'null' value +ts_value_error=timestamp 
column type error or null +infer_column_type_error=failed to infer column type: sample count %d, column index %d \ No newline at end of file diff --git a/tdenginewriter/src/main/resources/tdenginewritermsg_zh_CN.properties b/tdenginewriter/src/main/resources/tdenginewritermsg_zh_CN.properties new file mode 100644 index 00000000..4b9552fd --- /dev/null +++ b/tdenginewriter/src/main/resources/tdenginewritermsg_zh_CN.properties @@ -0,0 +1,6 @@ +try_get_schema_fromdb=\u65e0\u6cd5\u4ece\u914d\u7f6e\u6587\u4ef6\u83b7\u53d6\u8868\u7ed3\u6784\u4fe1\u606f\uff0c\u5c1d\u8bd5\u4ece\u6570\u636e\u5e93\u83b7\u53d6 +batch_size_too_small=batchSize\u592a\u5c0f\uff0c\u4f1a\u589e\u52a0\u81ea\u52a8\u7c7b\u578b\u63a8\u65ad\u9519\u8bef\u7684\u6982\u7387\uff0c\u5efa\u8bae\u6539\u5927\u540e\u91cd\u8bd5 +column_number_error=\u5b9e\u9645\u5217\u6570\u5c0f\u4e8e\u671f\u671b\u5217\u6570 +tag_value_error=\u6807\u7b7e\u5217\u5305\u542bnull +ts_value_error=\u65f6\u95f4\u6233\u5217\u4e3anull\u6216\u7c7b\u578b\u9519\u8bef +infer_column_type_error=\u6839\u636e\u91c7\u6837\u7684%d\u6761\u6570\u636e\uff0c\u65e0\u6cd5\u63a8\u65ad\u7b2c%d\u5217\u7684\u6570\u636e\u7c7b\u578b diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/MessageTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/MessageTest.java new file mode 100644 index 00000000..b1b7ddd8 --- /dev/null +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/MessageTest.java @@ -0,0 +1,25 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import org.junit.Test; + +import java.util.Locale; +import java.util.ResourceBundle; + +import org.junit.Assert; + +public class MessageTest { + @Test + public void testChineseMessage() { + Locale locale = new Locale("zh", "CN"); + ResourceBundle bundle = ResourceBundle.getBundle("tdenginewritermsg", locale); + String msg = bundle.getString("try_get_schema_fromdb"); + Assert.assertEquals("无法从配置文件获取表结构信息,尝试从数据库获取", msg); + } + + @Test + public void testDefaultMessage() { + ResourceBundle bundle = ResourceBundle.getBundle("tdenginewritermsg", Locale.getDefault()); + String msg = bundle.getString("try_get_schema_fromdb"); + System.out.println(msg); + } +} From b47cdaf217dafa3ff84b8a289b13fc5e190901b5 Mon Sep 17 00:00:00 2001 From: dingbo Date: Fri, 19 Nov 2021 14:23:24 +0800 Subject: [PATCH 9/9] use plain user name in demo config file --- core/src/main/job/mongodb2tdengine.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/job/mongodb2tdengine.json b/core/src/main/job/mongodb2tdengine.json index 45e5a640..49e04c11 100644 --- a/core/src/main/job/mongodb2tdengine.json +++ b/core/src/main/job/mongodb2tdengine.json @@ -13,9 +13,9 @@ "address": [ "127.0.0.1:27017" ], - "userName": "admin678", + "userName": "mongouser", "mechanism": "SCRAM-SHA-1", - "userPassword": "huwG86123", + "userPassword": "mongopass", "authDb": "admin", "dbName": "test", "collectionName": "cu_market_data",
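
As a companion to the FAQ entry above on how many sub-tables one source table produces, below is a minimal standalone sketch of the sub-table naming scheme used by SchemaManager.computeTableName. The class name and sample tag values are illustrative assumptions; commons-codec is assumed to be on the classpath, since the plugin itself calls DigestUtils.

```java
import org.apache.commons.codec.digest.DigestUtils;

import java.util.Arrays;
import java.util.List;

public class SubTableNameSketch {
    // Same scheme as SchemaManager.computeTableName: join the record's tag
    // values with '!' and prefix the MD5 hex digest with "t_". Each distinct
    // tag combination therefore maps to exactly one sub-table of the super table.
    public static String computeTableName(List<String> tagValues) {
        String joined = String.join("!", tagValues);
        return "t_" + DigestUtils.md5Hex(joined);
    }

    public static void main(String[] args) {
        // Hypothetical (product, instrumentID) tag combination.
        System.out.println(computeTableName(Arrays.asList("cu", "cu2112")));
    }
}
```

Because the name is a deterministic digest of the joined tag values, records that share a tag combination always land in the same sub-table, which is also why a record with a null tag value has to be rejected as dirty before this step.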
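Similarly, here is a simplified sketch of the CREATE STABLE statement that SchemaManager.createSTable assembles: the timestamp column first, then the fields with types inferred from the sampled batch, then the tag columns, following the NCHAR(64) tag style of the market_snapshot example in the doc. The helper and the sample columns are assumptions for illustration, not the plugin's exact code.

```java
import java.util.Arrays;
import java.util.List;

public class CreateStableSketch {
    // Builds a statement shaped like the one SchemaManager.createSTable runs:
    // CREATE STABLE IF NOT EXISTS <stable>(<ts> TIMESTAMP, <fields>...) TAGS(...)
    static String buildCreateStable(String stable, String tsColName,
                                    List<String> fields, List<String> tdTypes,
                                    List<String> tags) {
        StringBuilder sb = new StringBuilder("CREATE STABLE IF NOT EXISTS " + stable + "(");
        sb.append(tsColName).append(" TIMESTAMP");
        for (int i = 0; i < fields.size(); i++) {
            sb.append(",").append(fields.get(i)).append(" ").append(tdTypes.get(i));
        }
        sb.append(") TAGS(");
        for (int i = 0; i < tags.size(); i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(tags.get(i)).append(" NCHAR(64)");
        }
        sb.append(")");
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildCreateStable("market_snapshot", "tradeTime",
                Arrays.asList("lastPrice", "volume"), Arrays.asList("DOUBLE", "INT"),
                Arrays.asList("product", "instrumentID")));
    }
}
```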
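Finally, the Msg utility added in PATCH 8/9 is a thin wrapper over standard java.util.ResourceBundle resolution: tdenginewritermsg_zh_CN.properties or tdenginewritermsg_en_US.properties is selected when it matches the JVM locale, and tdenginewritermsg.properties is the fallback. A small usage sketch, assuming the three properties files are on the classpath:

```java
import java.util.Locale;
import java.util.ResourceBundle;

public class MsgDemo {
    public static void main(String[] args) {
        // Default locale, as Msg does: a zh_CN JVM picks the Chinese bundle,
        // any locale without a matching bundle falls back to the base file.
        ResourceBundle bundle = ResourceBundle.getBundle("tdenginewritermsg", Locale.getDefault());
        System.out.println(bundle.getString("batch_size_too_small"));

        // Forcing a locale explicitly, as MessageTest does for zh_CN.
        ResourceBundle zh = ResourceBundle.getBundle("tdenginewritermsg", new Locale("zh", "CN"));
        System.out.println(zh.getString("try_get_schema_fromdb"));
    }
}
```

Keeping the bundle keys (for example try_get_schema_fromdb) identical across the three properties files and the call sites is what makes the lookup locale-safe; a key used in the code but missing from the bundles would throw MissingResourceException at runtime.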