Merge pull request #1234 from taosdata/merge_to_upstream

Add TDengine writer plugin
This commit is contained in:
jtchen-study 2021-12-16 17:24:32 +08:00 committed by GitHub
commit 6980d89883
27 changed files with 2134 additions and 0 deletions

View File

@ -127,6 +127,7 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据通过主控的J
* address: MongoDB address information. Because MongoDB may be a cluster, the ip:port entries must be given as a JSON array. [Required]
* userName: MongoDB user name. [Optional]
* userPassword: MongoDB password. [Optional]
* authDb: MongoDB authentication database. [Optional]
* collectionName: MongoDB collection name. [Required]
* column: document column definitions of MongoDB. [Required]
* name: name of the column. [Required]

View File

@ -189,6 +189,13 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>tdenginewriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>drdswriter/target/datax/</directory>
<includes>

View File

@ -73,6 +73,7 @@
<!-- writer -->
<module>mysqlwriter</module>
<module>tdenginewriter</module>
<module>drdswriter</module>
<module>odpswriter</module>
<module>txtfilewriter</module>

View File

@ -0,0 +1,405 @@
# DataX TDengineWriter
Simplified Chinese | [English](./tdenginewriter.md)
## 1 Quick Introduction
The TDengineWriter plugin writes data into the TDengine database. It can be used to synchronize data offline from other databases to TDengine.
## 2 Implementation
TDengineWriter receives the protocol data generated by a Reader through the DataX framework and parses it according to the reader type. Two write paths are currently supported:
1. For OpenTSDBReader, TDengineWriter calls the TDengine client library (taos.lib or taos.dll) via JNI and writes with the [schemaless interface](https://www.taosdata.com/cn/documentation/insert#schemaless).
2. For other data sources, SQL statements are generated from the configuration and written in batches via [taos-jdbcdriver](https://www.taosdata.com/cn/documentation/connector/java).
The distinction exists because OpenTSDBReader reads OpenTSDB data as JSON strings, so the writer receives records with only one column, while other reader plugins usually put data into separate columns.
## 3 Features
### 3.1 From OpenTSDB to TDengine
#### 3.1.1 Sample Configuration
```json
{
"job": {
"content": [
{
"reader": {
"name": "opentsdbreader",
"parameter": {
"endpoint": "http://192.168.1.180:4242",
"column": [
"weather_temperature"
],
"beginDateTime": "2021-01-01 00:00:00",
"endDateTime": "2021-01-01 01:00:00"
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"host": "192.168.1.180",
"port": 6030,
"dbName": "test",
"username": "root",
"password": "taosdata"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
```
#### 3.1.2 Parameters
| Parameter | Description | Required | Default |
| --------- | -------------------- | -------- | -------- |
| host | host of the TDengine instance | Yes | |
| port | port of the TDengine instance | Yes | |
| username | user name of the TDengine instance | No | root |
| password | password of the TDengine instance | No | taosdata |
| dbName | name of the target database | Yes | |
| batchSize | number of records per batch insert | No | 1 |
#### 3.1.3 Type Conversion
Because OpenTSDBReader reads OpenTSDB data as JSON strings, TDengineWriter maps types as follows when migrating from OpenTSDB to TDengine:
| OpenTSDB Type | DataX Internal Type | TDengine Type |
| ---------------- | -------------- | ----------------- |
| timestamp | Date | timestamp |
| Integer (value) | Double | double |
| Float (value) | Double | double |
| String (value) | String | binary |
| Integer (tag) | String | binary |
| Float (tag) | String | binary |
| String (tag) | String | binary |
### 3.2 From MongoDB to TDengine
#### 3.2.1 Sample Configuration
```json
{
"job": {
"setting": {
"speed": {
"channel": 2
}
},
"content": [
{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": [
"127.0.0.1:27017"
],
"userName": "user",
"mechanism": "SCRAM-SHA-1",
"userPassword": "password",
"authDb": "admin",
"dbName": "test",
"collectionName": "stock",
"column": [
{
"name": "stockID",
"type": "string"
},
{
"name": "tradeTime",
"type": "date"
},
{
"name": "lastPrice",
"type": "double"
},
{
"name": "askPrice1",
"type": "double"
},
{
"name": "bidPrice1",
"type": "double"
},
{
"name": "volume",
"type": "int"
}
]
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"host": "localhost",
"port": 6030,
"dbName": "test",
"username": "root",
"password": "taosdata",
"stable": "stock",
"tagColumn": {
"industry": "energy",
"stockID": 0
},
"fieldColumn": {
"lastPrice": 2,
"askPrice1": 3,
"bidPrice1": 4,
"volume": 5
},
"timestampColumn": {
"tradeTime": 1
}
}
}
}
]
}
}
```
**Note: the writer part of this configuration also applies to relational databases.**
#### 3.2.2 Parameters
| Parameter | Description | Required | Default | Remark |
| --------------- | -------------------- | ---------------- | -------- | ------------------ |
| host | host of the TDengine instance | Yes | | |
| port | port of the TDengine instance | Yes | | |
| username | user name of the TDengine instance | No | root | |
| password | password of the TDengine instance | No | taosdata | |
| dbName | name of the target database | Yes | | |
| batchSize | number of records per batch insert | No | 1000 | |
| stable | name of the target super table | Yes (except for OpenTSDB) | | |
| tagColumn | names and positions of the tag columns in the records received by the writer, format: {tagName1: tagInd1, tagName2: tagInd2} | No | | indexes start at 0; if tagInd is a string, it is taken as a fixed tag value and is not read from the source data |
| fieldColumn | names and positions of the field columns in the records received by the writer, format: {fdName1: fdInd1, fdName2: fdInd2} | No | | |
| timestampColumn | name and position of the timestamp column in the records received by the writer, format: {tsColName: tsColIndex} | No | | only one timestamp column is allowed |
In the sample configuration, tagColumn contains an entry industry whose value is the fixed string "energy"; it attaches a fixed tag industry="energy" to every imported row. A typical use case: the source database stores the data of each collecting device in its own table, with the device name as the table name; this mechanism turns the device name into a tag.
#### 3.2.3 Automatic Table Creation
##### 3.2.3.1 Super Table Creation Rules
If tagColumn, fieldColumn and timestampColumn are all configured, the super table is created automatically before the first record is inserted.<br>
The types of the data columns are inferred from the first record, and tag columns default to `NCHAR(64)`. For the sample configuration above, a statement like the following may be generated:
```sql
CREATE STABLE IF NOT EXISTS stock (
tradetime TIMESTAMP,
lastprice DOUBLE,
askprice1 DOUBLE,
bidprice1 DOUBLE,
volume INT
)
TAGS(
industry NCHAR(64),
stockID NCHAR(64)
);
```
##### 3.2.3.2 Sub-table Creation Rules
Sub-tables have the same structure as the super table. Sub-table names are generated as follows (a code sketch follows the list):
1. Join the tag values into a string of the form `tag_value1!tag_value2!tag_value3`.
2. Compute the MD5 hash of that string, called "md5_val".
3. Use "t_md5val" as the sub-table name, where "t" is a fixed prefix.
#### 3.2.4 Using a Pre-created Table
If you have already created the target super table, tagColumn, fieldColumn and timestampColumn can all be omitted; the plugin obtains the table structure by executing `describe stableName`.
In that case the column order of the records received by the plugin must match the column order returned by `describe stableName`. For example, if `describe stableName` returns the following:
```
Field | Type | Length | Note |
=================================================================================
ts | TIMESTAMP | 8 | |
current | DOUBLE | 8 | |
location | BINARY | 10 | TAG |
```
then the first column received by the plugin must be the timestamp, the second the current, and the third the location.
#### 3.2.5 Notes
1. tagColumn, fieldColumn and timestampColumn describe the structure of the target table; they must either all be present or all be omitted.
2. If these three settings are present and the target table already exists, the two must match. **Consistency** is the user's responsibility; the plugin does not check it. A mismatch may cause inserts to fail or data to be written incorrectly.
#### 3.2.6 Type Conversion
| DataX Internal Type | TDengine Type |
|-------------- | ----------------- |
|Long | BIGINT |
|Double | DOUBLE |
|String | NCHAR(64) |
|Date | TIMESTAMP |
|Boolean | BOOL |
|Bytes | BINARY(64) |
### 3.3 From a Relational Database to TDengine
The writer configuration follows the same rules as the MongoDB example above; here is a MySQL example.
#### 3.3.1 Table Structure in MySQL
```sql
CREATE TABLE IF NOT EXISTS weather(
station varchar(100),
latitude DOUBLE,
longtitude DOUBLE,
`date` DATE,
TMAX int,
TMIN int
)
```
#### 3.3.2 Sample Configuration
```json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "passw0rd",
"column": [
"*"
],
"splitPk": "station",
"connection": [
{
"table": [
"weather"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8"
]
}
]
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"host": "127.0.0.1",
"port": 6030,
"dbName": "test",
"username": "root",
"password": "taosdata",
"batchSize": 1000,
"stable": "weather",
"tagColumn": {
"station": 0
},
"fieldColumn": {
"latitude": 1,
"longtitude": 2,
"tmax": 4,
"tmin": 5
},
"timestampColumn":{
"date": 3
}
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
```
## 4 Performance Report
### 4.1 Environment
#### 4.1.1 Data Characteristics
Table creation statement:
A single record looks like:
#### 4.1.2 Machine Specifications
* Machine running DataX:
1. cpu:
2. mem:
3. net: dual gigabit NICs
4. disk: DataX does not write data to disk, so this item is not measured
* Machine running TDengine:
1. cpu:
2. mem:
3. net: dual gigabit NICs
4. disk:
#### 4.1.3 DataX JVM Parameters
-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
### 4.2 Test Report
#### 4.2.1 Single Table Test Report
| Channels | DataX speed (Rec/s) | DataX throughput (MB/s) | DataX NIC outbound traffic (MB/s) | DataX machine load | DB NIC inbound traffic (MB/s) | DB load | DB TPS |
| ------ | ---------------- | --------------- | --------------------------- | ----------------- | -------------------- | ---------- | ------ |
| 1 | | | | | | | |
| 4 | | | | | | | |
| 8 | | | | | | | |
| 16 | | | | | | | |
| 32 | | | | | | | |
Notes:
1. The single table here has a bigint(20) auto-increment primary key.
2. batchSize and the number of channels have a large impact on performance.
3. With 16 channels and a batch size of 4096, full GC occurred twice.
#### 4.2.4 Performance Test Summary
## 5 Restrictions
1. When the plugin creates the super table automatically, NCHAR columns have a fixed length of 64; data sources containing strings longer than 64 characters are not supported.
2. Tag columns must not contain null values; rows with null tags are filtered out.
## FAQ
### How do I select the range of data to synchronize?
The data range is configured on the Reader side, and the method differs between reader plugins. For example, mysqlreader can use an SQL statement to specify the range, while opentsdbreader uses the beginDateTime and endDateTime settings.
### How do I import multiple source tables at once?
If the Reader plugin supports reading multiple tables at once, the Writer plugin can import them in a single job. If the Reader does not, create several jobs and import the tables separately; the Writer plugin only writes data.
### How many TDengine tables does one source table map to?
This is determined by tagColumn. If all tag values are identical, there is only one target table; the super table gets one sub-table for each distinct combination of tag values in the source table.
### Must the source and target tables have the same column order?
No. TDengine requires the first column of every table to be the timestamp, followed by the regular fields and then the tag columns. If the source table uses a different order, the plugin reorders the columns when it creates the table automatically.
### How does the plugin determine the data type of each column?
It infers the column types from the first batch of records it receives.
### Why does inserting data from 10 years ago throw `TDengine ERROR (2350): failed to execute batch bind`?
Because the database keeps only 10 years of data by default. You can specify how long data is kept when creating the database, for example: `CREATE DATABASE power KEEP 36500;`.
### What should I do if some plugin dependencies cannot be resolved at build time?
If the plugin is not needed, comment out the corresponding module in pom.xml in the project root.

View File

@ -0,0 +1,356 @@
# DataX TDengineWriter
[Simplified Chinese](./tdenginewriter-CN.md) | English
## 1 Quick Introduction
TDengineWriter Plugin writes data to [TDengine](https://www.taosdata.com/en/). It can be used to offline synchronize data from other databases to TDengine.
## 2 Implementation
TDengineWriter receives records generated on the reader side from the DataX framework. It has two writing strategies:
1. For data from OpenTSDBReader, which is in JSON format, we use JNI to call functions in `taos.lib` or `taos.dll` to leverage [schemaless writing](https://www.taosdata.com/cn/documentation/insert#schemaless), a TDengine server feature that writes JSON data directly. (The feature was not included in taos-jdbcdriver until version 2.0.36.)
2. For other data sources, we use [taos-jdbcdriver](https://www.taosdata.com/cn/documentation/connector/java) to write data. If the target table does not exist beforehand, it will be created automatically according to your configuration.
## 3 Features Introduction
### 3.1 From OpenTSDB to TDengine
#### 3.1.1 Sample Setting
```json
{
"job": {
"content": [
{
"reader": {
"name": "opentsdbreader",
"parameter": {
"endpoint": "http://192.168.1.180:4242",
"column": [
"weather_temperature"
],
"beginDateTime": "2021-01-01 00:00:00",
"endDateTime": "2021-01-01 01:00:00"
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"host": "192.168.1.180",
"port": 6030,
"dbName": "test",
"username": "root",
"password": "taosdata"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
```
#### 3.1.2 Configuration
| Parameter | Description | Required | Default |
| --------- | ------------------------------ | -------- | -------- |
| host | host of TDengine | Yes | |
| port | port of TDengine | Yes | |
| username | user name of TDengine | No | root |
| password | password of TDengine | No | taosdata |
| dbName | name of target database | Yes | |
| batchSize | batch size of insert operation | No | 1 |
#### 3.1.3 Type Convert
| OpenTSDB Type | DataX Type | TDengine Type |
| ---------------- | ---------- | ------------- |
| timestamp | Date | timestamp |
| Integer (value) | Double | double |
| Float (value) | Double | double |
| String (value) | String | binary |
| Integer (tag) | String | binary |
| Float (tag) | String | binary |
| String (tag) | String | binary |
### 3.2 From MongoDB to TDengine
#### 3.2.1 Sample Setting
```json
{
"job": {
"setting": {
"speed": {
"channel": 2
}
},
"content": [
{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": [
"127.0.0.1:27017"
],
"userName": "user",
"mechanism": "SCRAM-SHA-1",
"userPassword": "password",
"authDb": "admin",
"dbName": "test",
"collectionName": "stock",
"column": [
{
"name": "stockID",
"type": "string"
},
{
"name": "tradeTime",
"type": "date"
},
{
"name": "lastPrice",
"type": "double"
},
{
"name": "askPrice1",
"type": "double"
},
{
"name": "bidPrice1",
"type": "double"
},
{
"name": "volume",
"type": "int"
}
]
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"host": "localhost",
"port": 6030,
"dbName": "test",
"username": "root",
"password": "taosdata",
"stable": "stock",
"tagColumn": {
"industry": "energy",
"stockID": 0
},
"fieldColumn": {
"lastPrice": 2,
"askPrice1": 3,
"bidPrice1": 4,
"volume": 5
},
"timestampColumn": {
"tradeTime": 1
}
}
}
}
]
}
}
```
**Note: the writer part of this setting also applies to data sources other than OpenTSDB.**
#### 3.2.2 Configuration
| Parameter | Description | Required | Default | Remark |
| --------------- | --------------------------------------------------------------- | ------------------------ | -------- | ------------------- |
| host | host of TDengine | Yes | | |
| port | port of TDengine | Yes | |
| username | username of TDengine | No | root |
| password | password of TDengine | No | taosdata |
| dbName | name of target database | Yes | |
| batchSize | batch size of insert operation | No | 1000 |
| stable | name of target super table | Yes (except for OpenTSDB) | | |
| tagColumn | name and position of tag columns in the record from reader, format: {tagName1: tagInd1, tagName2: tagInd2} | No | | indexes start at 0; a string value is treated as a fixed tag value that is not read from the source data |
| fieldColumn | name and position of data columns in the record from reader, format: {fdName1: fdInd1, fdName2: fdInd2} | No | | |
| timestampColumn | name and position of the timestamp column in the record from reader, format: {tsColName: tsColIndex} | No | | only one timestamp column is allowed |
**Note**: The value of tagColumn "industry" is a fixed string; this is a useful feature of the plugin. Consider this scenario: you have many tables with the same structure, one table per device, and you want to use the device name as a tag in the target super table. This feature is designed for exactly that.
#### 3.2.3 Auto table creating
##### 3.2.3.1 Rules
If all of `tagColumn`, `fieldColumn` and `timestampColumn` are offered in the writer configuration, then the target super table will be created automatically.
The types of the data columns are inferred from the first record, and tag columns are always `NCHAR(64)`. The sample setting above may produce the following SQL:
```sql
CREATE STABLE IF NOT EXISTS stock (
tradetime TIMESTAMP,
lastprice DOUBLE,
askprice1 DOUBLE,
bidprice1 DOUBLE,
volume INT
)
TAGS(
industry NCHAR(64),
stockID NCHAR(64)
);
```
##### 3.2.3.2 Sub-table Creating Rules
The structure of a sub-table is the same as that of the super table. Sub-table names are generated by the rules below:
1. Combine the tag values into a string of the form `tag_value1!tag_value2!tag_value3`.
2. Compute the MD5 hex hash of that string, called `md5val`.
3. Use "t_md5val" as the sub-table name, in which "t" is a fixed prefix.
#### 3.2.4 Use Pre-created Table
If you have already created the super table, then all of tagColumn, fieldColumn and timestampColumn can be omitted. The writer plugin will get the table schema by executing `describe stableName`.
The order of the columns of the records received by this plugin must be the same as the order of the columns returned by `describe stableName`. For example, if you have a super table as below:
```
Field | Type | Length | Note |
=================================================================================
ts | TIMESTAMP | 8 | |
current | DOUBLE | 8 | |
location | BINARY | 10 | TAG |
```
Then the first column received by this writer plugin must represent the timestamp, the second column must represent the current (double), and the third column must represent the location (string).
#### 3.2.5 Remarks
1. The config keys tagColumn, fieldColumn and timestampColumn must all be present or all be omitted.
2. If the three config keys above are present and the target table also exists, the column order defined in the config file must match the existing table.
#### 3.2.6 Type Convert
|DataX Type | TDengine Type |
|-------------- | ----------------- |
|Long | BIGINT |
|Double | DOUBLE |
|String | NCHAR(64) |
|Date | TIMESTAMP |
|Boolean | BOOL |
|Bytes | BINARY(64) |
### 3.3 From Relational Database to TDengine
Take MySQL as an example.
#### 3.3.1 Table Structure in MySQL
```sql
CREATE TABLE IF NOT EXISTS weather(
station varchar(100),
latitude DOUBLE,
longtitude DOUBLE,
`date` DATE,
TMAX int,
TMIN int
)
```
#### 3.3.2 Sample Setting
```json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "passw0rd",
"column": [
"*"
],
"splitPk": "station",
"connection": [
{
"table": [
"weather"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8"
]
}
]
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"host": "127.0.0.1",
"port": 6030,
"dbName": "test",
"username": "root",
"password": "taosdata",
"batchSize": 1000,
"stable": "weather",
"tagColumn": {
"station": 0
},
"fieldColumn": {
"latitude": 1,
"longtitude": 2,
"tmax": 4,
"tmin": 5
},
"timestampColumn":{
"date": 3
}
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
```
## 4 Performance Test
## 5 Restriction
1. NCHAR columns have a fixed length of 64 when the super table is created automatically; data sources containing longer strings are not supported.
2. Rows with null tag values will be dropped.
## FAQ
### How to filter on source table
This is configured on the reader side, and the method differs between reader plugins; for example, mysqlreader can use an SQL statement to limit the range, while opentsdbreader uses beginDateTime and endDateTime.
### How to import multiple source tables at once
It depends on the reader plugin. If the reader plugin supports reading multiple tables at once, they can be imported in a single job; otherwise create one job per table.
### How many sub-tables will be produced?
The number of sub-tables is determined by tagColumn; it equals the number of distinct combinations of tag values.
### Do columns in source table and columns in target table must be in the same order?
No. TDengine requires the first column of a table to be the timestamp, followed by the data columns and then the tag columns. The writer plugin creates the super table in this column order, regardless of the original column order.
### How does the plugin infer the data type of incoming data?
By the first batch of records it received.
### Why does inserting data from 10 years ago fail with `TDengine ERROR (2350): failed to execute batch bind`?
Because the database keeps only 10 years of data by default. Create the database with a longer retention period, for example `CREATE DATABASE power KEEP 36500;`, to keep 100 years of data.
### What should I do if some dependencies of a plugin can't be found?
If this plugin is not necessary for you, just remove the corresponding module from pom.xml in the project's root directory.

tdenginewriter/pom.xml Normal file
View File

@ -0,0 +1,107 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.alibaba.datax.tdenginewriter</groupId>
<artifactId>tdenginewriter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.34</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3-version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12.4</version>
<configuration>
<!-- test cases to include -->
<includes>
<include>**/*Test.java</include>
</includes>
<!-- test cases to exclude -->
<excludes>
</excludes>
<testFailureIgnore>true</testFailureIgnore>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,34 @@
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/tdenginewriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>tdenginewriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/tdenginewriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/tdenginewriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,12 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import java.util.Properties;
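/**
 * A DataHandler consumes records from the upstream reader, writes them to TDengine,
 * and returns the number of records handled.
 */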
public interface DataHandler {
long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector);
}

View File

@ -0,0 +1,10 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
public class DataHandlerFactory {
public static DataHandler build(String peerPluginName) {
if (peerPluginName.equals("opentsdbreader"))
return new OpentsdbDataHandler();
return new DefaultDataHandler();
}
}

View File

@ -0,0 +1,108 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBPreparedStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
/**
* Default DataHandler: writes records to TDengine in batches via taos-jdbcdriver.
*/
public class DefaultDataHandler implements DataHandler {
private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
static {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
@Override
public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) {
SchemaManager schemaManager = new SchemaManager(properties);
if (!schemaManager.configValid()) {
return 0;
}
try {
Connection conn = getTaosConnection(properties);
if (conn == null) {
return 0;
}
if (schemaManager.shouldGuessSchema()) {
// Table structure not available from the config file; try to get it from the database.
LOG.info(Msg.get("try_get_schema_fromdb"));
boolean success = schemaManager.getFromDB(conn);
if (!success) {
return 0;
}
} else {
}
int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, "1000"));
if (batchSize < 5) {
// batchSize is too small; this increases the chance of wrong automatic type inference. Increase it and retry.
LOG.error(Msg.get("batch_size_too_small"));
return 0;
}
return write(lineReceiver, conn, batchSize, schemaManager, collector);
} catch (Exception e) {
LOG.error("write failed " + e.getMessage());
e.printStackTrace();
}
return 0;
}
private Connection getTaosConnection(Properties properties) throws SQLException {
// check required parameters
String host = properties.getProperty(Key.HOST);
String port = properties.getProperty(Key.PORT);
String dbname = properties.getProperty(Key.DBNAME);
String user = properties.getProperty(Key.USER);
String password = properties.getProperty(Key.PASSWORD);
if (host == null || port == null || dbname == null || user == null || password == null) {
String keys = String.join(" ", Key.HOST, Key.PORT, Key.DBNAME, Key.USER, Key.PASSWORD);
LOG.error("Required options missing, please check: " + keys);
return null;
}
String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", host, port, dbname, user, password);
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
LOG.info("TDengine connection established, host:{} port:{} dbname:{} user:{}", host, port, dbname, user);
return DriverManager.getConnection(jdbcUrl, connProps);
}
/**
* Batch write using SQL.<br/>
*
* @return number of records written successfully
* @throws SQLException
*/
private long write(RecordReceiver lineReceiver, Connection conn, int batchSize, SchemaManager scm, TaskPluginCollector collector) throws SQLException {
Record record = lineReceiver.getFromReader();
if (record == null) {
return 0;
}
String pq = String.format("INSERT INTO ? USING %s TAGS(%s) (%s) values (%s)", scm.getStable(), scm.getTagValuesPlaceHolder(), scm.getJoinedFieldNames(), scm.getFieldValuesPlaceHolder());
LOG.info("Prepared SQL: {}", pq);
try (TSDBPreparedStatement stmt = (TSDBPreparedStatement) conn.prepareStatement(pq)) {
JDBCBatchWriter batchWriter = new JDBCBatchWriter(conn, stmt, scm, batchSize, collector);
do {
batchWriter.append(record);
} while ((record = lineReceiver.getFromReader()) != null);
batchWriter.flush();
return batchWriter.getCount();
}
}
}

View File

@ -0,0 +1,244 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.taosdata.jdbc.TSDBPreparedStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Batch writing via the native JDBC parameter-binding interface.<br/>
* Two constraints make the batching logic complex enough to deserve its own class:<br/>
* 1. The caller must collect the records of a batch into an ArrayList in advance.
* 2. All records in one batch must target the same table.
* The implementation logic is:
* 1. Records belonging to the same sub-table are buffered together.
* 2. When a buffer reaches the batchSize threshold, a batch write is executed automatically.
* 3. The last (partial) batch is only written when the caller calls flush().
*/
public class JDBCBatchWriter {
public static final Logger LOG = LoggerFactory.getLogger(JDBCBatchWriter.class);
private TSDBPreparedStatement stmt;
private SchemaManager scm;
private Connection conn;
private int batchSize;
private TaskPluginCollector collector;
// buffered records, keyed by table name
Map<String, List<Record>> buf = new HashMap<>();
// cached tag values per table, keyed by table name
Map<String, String[]> tableTagValues = new HashMap<>();
private long sucCount = 0;
private final int tsColIndex;
private List<String> fieldList;
// minimum number of columns each record should contain, used for validation
private int minColNum = 0;
private Map<String, Integer> fieldIndexMap;
private List<Column.Type> fieldTypes = null;
public JDBCBatchWriter(Connection conn, TSDBPreparedStatement stmt, SchemaManager scm, int batchSize, TaskPluginCollector collector) {
this.conn = conn;
this.stmt = stmt;
this.scm = scm;
this.batchSize = batchSize;
this.collector = collector;
this.tsColIndex = scm.getTsColIndex();
this.fieldList = scm.getFieldList();
this.fieldIndexMap = scm.getFieldIndexMap();
this.minColNum = 1 + fieldList.size() + scm.getDynamicTagCount();
}
public void initFiledTypesAndTargetTable(List<Record> records) throws SQLException {
if (fieldTypes != null) {
return;
}
guessFieldTypes(records);
if (scm.shouldCreateTable()) {
scm.createSTable(conn, fieldTypes);
}
}
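/**
* Buffer one record under its computed sub-table name; when a sub-table's buffer reaches
* batchSize, that batch is written out and the buffer is cleared. Records with too few
* columns, null tag values or an invalid timestamp are reported as dirty records and skipped.
*/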
public void append(Record record) throws SQLException {
int columnNum = record.getColumnNumber();
if (columnNum < minColNum) {
// actual column count is less than expected
collector.collectDirtyRecord(record, Msg.get("column_number_error"));
return;
}
String[] tagValues = scm.getTagValuesFromRecord(record);
if (tagValues == null) {
// tag columns contain null
collector.collectDirtyRecord(record, Msg.get("tag_value_error"));
return;
}
if (!scm.hasTimestamp(record)) {
// timestamp column is null or has a wrong type
collector.collectDirtyRecord(record, Msg.get("ts_value_error"));
return;
}
String tableName = scm.computeTableName(tagValues);
if (buf.containsKey(tableName)) {
List<Record> lis = buf.get(tableName);
lis.add(record);
if (lis.size() == batchSize) {
if (fieldTypes == null) {
initFiledTypesAndTargetTable(lis);
}
executeBatch(tableName);
lis.clear();
}
} else {
List<Record> lis = new ArrayList<>(batchSize);
lis.add(record);
buf.put(tableName, lis);
tableTagValues.put(tableName, tagValues);
}
}
/**
* Only the String type needs special handling: testing shows that columns whose value is null are converted to the String type, so a Column of type String does not necessarily mean the column really is a String.
*
* @param records sample records used for type inference
*/
private void guessFieldTypes(List<Record> records) {
fieldTypes = new ArrayList<>(fieldList.size());
for (int i = 0; i < fieldList.size(); ++i) {
int colIndex = fieldIndexMap.get(fieldList.get(i));
boolean ok = false;
for (int j = 0; j < records.size() && !ok; ++j) {
Column column = records.get(j).getColumn(colIndex);
Column.Type type = column.getType();
switch (type) {
case LONG:
case DOUBLE:
case DATE:
case BOOL:
case BYTES:
if (column.getRawData() != null) {
fieldTypes.add(type);
ok = true;
}
break;
case STRING:
// only non-null, non-empty String columns are really treated as String
String value = column.asString();
if (value != null && !"".equals(value)) {
fieldTypes.add(type);
ok = true;
}
break;
default:
throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, type.toString());
}
}
if (!ok) {
// based on the %d sampled records, the type of column %d cannot be inferred
throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, String.format(Msg.get("infer_column_type_error"), records.size(), i + 1));
}
}
LOG.info("Field Types: {}", fieldTypes);
}
/**
* Execute a batch write for a single sub-table.
*
* @param tableName name of the sub-table to write
* @throws SQLException
*/
private void executeBatch(String tableName) throws SQLException {
// table name
stmt.setTableName(tableName);
List<Record> records = buf.get(tableName);
// tags
String[] tagValues = tableTagValues.get(tableName);
LOG.debug("executeBatch {}", String.join(",", tagValues));
for (int i = 0; i < tagValues.length; ++i) {
stmt.setTagNString(i, tagValues[i]);
}
// timestamps
ArrayList<Long> tsList = records.stream().map(r -> r.getColumn(tsColIndex).asDate().getTime()).collect(Collectors.toCollection(ArrayList::new));
stmt.setTimestamp(0, tsList);
// fields
for (int i = 0; i < fieldList.size(); ) {
String fieldName = fieldList.get(i);
int index = fieldIndexMap.get(fieldName);
switch (fieldTypes.get(i)) {
case LONG:
ArrayList<Long> lisLong = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new));
stmt.setLong(++i, lisLong);
break;
case DOUBLE:
ArrayList<Double> lisDouble = records.stream().map(r -> r.getColumn(index).asDouble()).collect(Collectors.toCollection(ArrayList::new));
stmt.setDouble(++i, lisDouble);
break;
case STRING:
ArrayList<String> lisString = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new));
stmt.setNString(++i, lisString, 64);
break;
case DATE:
ArrayList<Long> lisTs = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new));
stmt.setTimestamp(++i, lisTs);
break;
case BOOL:
ArrayList<Boolean> lisBool = records.stream().map(r -> r.getColumn(index).asBoolean()).collect(Collectors.toCollection(ArrayList::new));
stmt.setBoolean(++i, lisBool);
break;
case BYTES:
ArrayList<String> lisBytes = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new));
stmt.setString(++i, lisBytes, 64);
break;
default:
throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, fieldTypes.get(i).toString());
}
}
// execute
stmt.columnDataAddBatch();
stmt.columnDataExecuteBatch();
// update counter
sucCount += records.size();
}
/**
* Write out all buffered records.
*/
public void flush() throws SQLException {
if (fieldTypes == null) {
List<Record> records = new ArrayList<>();
for (List<Record> lis : buf.values()) {
records.addAll(lis);
if (records.size() > 100) {
break;
}
}
if (records.size() > 0) {
initFiledTypesAndTargetTable(records);
} else {
return;
}
}
for (String tabName : buf.keySet()) {
if (buf.get(tabName).size() > 0) {
executeBatch(tabName);
}
}
stmt.columnDataCloseBatch();
}
/**
* @return number of records written successfully
*/
public long getCount() {
return sucCount;
}
}

View File

@ -0,0 +1,89 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import java.util.Properties;
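/**
* Thin JNI wrapper around the native TDengine client library (loaded as "taos"),
* used by OpentsdbDataHandler for schemaless insertion of OpenTSDB-style JSON.
*/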
public class JniConnection {
private static final long JNI_NULL_POINTER = 0L;
private static final int JNI_SUCCESSFUL = 0;
public static final String PROPERTY_KEY_CONFIG_DIR = "cfgdir";
public static final String PROPERTY_KEY_LOCALE = "locale";
public static final String PROPERTY_KEY_CHARSET = "charset";
public static final String PROPERTY_KEY_TIME_ZONE = "timezone";
private long conn;
static {
System.loadLibrary("taos");
}
public JniConnection(Properties props) throws Exception {
initImp(props.getProperty(PROPERTY_KEY_CONFIG_DIR, null));
String locale = props.getProperty(PROPERTY_KEY_LOCALE);
if (setOptions(0, locale) < 0) {
throw new Exception("Failed to set locale: " + locale + ". System default will be used.");
}
String charset = props.getProperty(PROPERTY_KEY_CHARSET);
if (setOptions(1, charset) < 0) {
throw new Exception("Failed to set charset: " + charset + ". System default will be used.");
}
String timezone = props.getProperty(PROPERTY_KEY_TIME_ZONE);
if (setOptions(2, timezone) < 0) {
throw new Exception("Failed to set timezone: " + timezone + ". System default will be used.");
}
}
public void open(String host, int port, String dbname, String user, String password) throws Exception {
if (this.conn != JNI_NULL_POINTER) {
close();
this.conn = JNI_NULL_POINTER;
}
this.conn = connectImp(host, port, dbname, user, password);
if (this.conn == JNI_NULL_POINTER) {
String errMsg = getErrMsgImp(0);
throw new Exception(errMsg);
}
}
public void insertOpentsdbJson(String json) throws Exception {
if (this.conn == JNI_NULL_POINTER) {
throw new Exception("JNI connection is NULL");
}
long result = insertOpentsdbJson(json, this.conn);
int errCode = getErrCodeImp(this.conn, result);
if (errCode != JNI_SUCCESSFUL) {
String errMsg = getErrMsgImp(result);
freeResultSetImp(this.conn, result);
throw new Exception(errMsg);
}
freeResultSetImp(this.conn, result);
}
public void close() throws Exception {
int code = this.closeConnectionImp(this.conn);
if (code != 0) {
throw new Exception("JNI closeConnection failed");
}
this.conn = JNI_NULL_POINTER;
}
private static native void initImp(String configDir);
private static native int setOptions(int optionIndex, String optionValue);
private native long connectImp(String host, int port, String dbName, String user, String password);
private native int getErrCodeImp(long connection, long pSql);
private native String getErrMsgImp(long pSql);
private native void freeResultSetImp(long connection, long pSql);
private native int closeConnectionImp(long connection);
private native long insertOpentsdbJson(String json, long pSql);
}

View File

@ -0,0 +1,14 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
public class Key {
public static final String HOST = "host";
public static final String PORT = "port";
public static final String DBNAME = "dbName";
public static final String USER = "username";
public static final String PASSWORD = "password";
public static final String BATCH_SIZE = "batchSize";
public static final String STABLE = "stable";
public static final String TAG_COLUMN = "tagColumn";
public static final String FIELD_COLUMN = "fieldColumn";
public static final String TIMESTAMP_COLUMN = "timestampColumn";
}

View File

@ -0,0 +1,20 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import java.util.Locale;
import java.util.ResourceBundle;
/**
* i18n message util
*/
public class Msg {
private static ResourceBundle bundle;
static {
bundle = ResourceBundle.getBundle("tdenginewritermsg", Locale.getDefault());
}
public static String get(String key) {
return bundle.getString(key);
}
}

View File

@ -0,0 +1,99 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class OpentsdbDataHandler implements DataHandler {
private static final Logger LOG = LoggerFactory.getLogger(OpentsdbDataHandler.class);
private static final String DEFAULT_BATCH_SIZE = "1";
@Override
public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) {
// OpenTSDB JSON data is written via JNI using the schemaless API
String host = properties.getProperty(Key.HOST);
int port = Integer.parseInt(properties.getProperty(Key.PORT));
String dbname = properties.getProperty(Key.DBNAME);
String user = properties.getProperty(Key.USER);
String password = properties.getProperty(Key.PASSWORD);
JniConnection conn = null;
long count = 0;
try {
conn = new JniConnection(properties);
conn.open(host, port, dbname, user, password);
LOG.info("TDengine connection established, host: " + host + ", port: " + port + ", dbname: " + dbname + ", user: " + user);
int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, DEFAULT_BATCH_SIZE));
count = writeOpentsdb(lineReceiver, conn, batchSize);
} catch (Exception e) {
LOG.error(e.getMessage());
e.printStackTrace();
} finally {
try {
if (conn != null)
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
LOG.info("TDengine connection closed");
}
return count;
}
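/**
* Write records as OpenTSDB-style JSON through the schemaless JNI interface.
* With batchSize == 1 each record is sent individually; otherwise records are accumulated
* into a JSON array ("[rec1,rec2,...]") and flushed every batchSize records, with any
* remainder flushed at the end.
*/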
private long writeOpentsdb(RecordReceiver lineReceiver, JniConnection conn, int batchSize) {
long recordIndex = 1;
try {
Record record;
StringBuilder sb = new StringBuilder();
while ((record = lineReceiver.getFromReader()) != null) {
if (batchSize == 1) {
String jsonData = recordToString(record);
LOG.debug(">>> " + jsonData);
conn.insertOpentsdbJson(jsonData);
} else if (recordIndex % batchSize == 1) {
sb.append("[").append(recordToString(record)).append(",");
} else if (recordIndex % batchSize == 0) {
sb.append(recordToString(record)).append("]");
String jsonData = sb.toString();
LOG.debug(">>> " + jsonData);
conn.insertOpentsdbJson(jsonData);
sb.delete(0, sb.length());
} else {
sb.append(recordToString(record)).append(",");
}
recordIndex++;
}
if (sb.length() != 0 && sb.charAt(0) == '[') {
String jsonData = sb.deleteCharAt(sb.length() - 1).append("]").toString();
LOG.debug(">>> " + jsonData);
conn.insertOpentsdbJson(jsonData);
}
} catch (Exception e) {
LOG.error("TDengineWriter ERROR: " + e.getMessage());
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e);
}
return recordIndex - 1;
}
private String recordToString(Record record) {
int recordLength = record.getColumnNumber();
if (0 == recordLength) {
return "";
}
Column column;
StringBuilder sb = new StringBuilder();
for (int i = 0; i < recordLength; i++) {
column = record.getColumn(i);
sb.append(column.asString()).append("\t");
}
sb.setLength(sb.length() - 1);
return sb.toString();
}
}

View File

@ -0,0 +1,271 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
import java.util.stream.Collectors;
public class SchemaManager {
private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class);
private String stable; // target super table name
private Map<String, String> fixedTagValue = new HashMap<>(); // fixed tag values: tag name -> tag value
private Map<String, Integer> tagIndexMap = new HashMap<>(); // dynamic tag values: tag name -> column index
private Map<String, Integer> fieldIndexMap = new HashMap<>(); // field name -> column index
private String tsColName; // timestamp column name
private int tsColIndex = -1; // timestamp column index
private List<String> fieldList = new ArrayList<>();
private List<String> tagList = new ArrayList<>();
private boolean canInferSchemaFromConfig = false;
public SchemaManager() {
}
public SchemaManager(Properties properties) {
getFromConfig(properties);
}
private String mapDataxType(Column.Type type) {
switch (type) {
case LONG:
return "BIGINT";
case DOUBLE:
return "DOUBLE";
case STRING:
return "NCHAR(64)";
case DATE:
return "TIMESTAMP";
case BOOL:
return "BOOL";
case BYTES:
return "BINARY(64)";
default:
throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, type.toString());
}
}
public void setStable(String stable) {
this.stable = stable;
}
public String getStable() {
return stable;
}
private void getFromConfig(Properties properties) {
stable = properties.getProperty(Key.STABLE);
if (stable == null) {
LOG.error("Config error: no stable");
return;
}
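// The DataX job configuration is flattened into Properties, so nested JSON such as
// {"tagColumn": {"industry": "energy", "stockID": 0}} arrives as keys like
// "tagColumn.industry" -> "energy" and "tagColumn.stockID" -> "0".
// A numeric value is taken as a column index, any other value as a fixed tag value.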
for (Object key : properties.keySet()) {
String k = (String) key;
String v = properties.getProperty(k);
String[] ps = k.split("\\.");
if (ps.length == 1) {
continue;
}
if (k.startsWith(Key.TAG_COLUMN)) {
String tagName = ps[1];
try {
Integer tagIndex = Integer.parseInt(v);
this.tagIndexMap.put(tagName, tagIndex);
tagList.add(tagName);
} catch (NumberFormatException e) {
fixedTagValue.put(tagName, v);
tagList.add(tagName);
}
} else if (k.startsWith(Key.FIELD_COLUMN)) {
String fieldName = ps[1];
Integer fileIndex = Integer.parseInt(v);
fieldIndexMap.put(fieldName, fileIndex);
} else if (k.startsWith(Key.TIMESTAMP_COLUMN)) {
tsColName = ps[1];
tsColIndex = Integer.parseInt(v);
}
}
List<String> sortedFieldName = fieldIndexMap.entrySet().stream().sorted((x, y) -> x.getValue().compareTo(y.getValue())).map(e -> e.getKey()).collect(Collectors.toList());
fieldList.addAll(sortedFieldName); // sorting keeps the column order of the auto-created table consistent with the column order of the input data
canInferSchemaFromConfig = tsColIndex > -1 && !(fixedTagValue.isEmpty() && tagIndexMap.isEmpty()) && !fieldIndexMap.isEmpty();
LOG.info("Config file parsed resultfixedTags=[{}] ,tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", fixedTagValue.keySet()), String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex);
}
public boolean shouldGuessSchema() {
return !canInferSchemaFromConfig;
}
public boolean shouldCreateTable() {
return canInferSchemaFromConfig;
}
public boolean configValid() {
boolean valid = (tagList.size() > 0 && fieldList.size() > 0 && tsColIndex > -1) || (tagList.size() == 0 && fieldList.size() == 0 && tsColIndex == -1);
if (!valid) {
LOG.error("Config error: tagColumn, fieldColumn and timestampColumn must be present together or absent together.");
}
return valid;
}
/**
* Get the table schema by executing `describe dbname.stable`.<br/>
* The describe command returns four columns: Field, Type, Length, Note.<br/>
*
* @return true on success; false if the super table does not exist or another error occurs
*/
public boolean getFromDB(Connection conn) {
try {
List<String> stables = getSTables(conn);
if (!stables.contains(stable)) {
LOG.error("super table {} not exist fail to get schema from database.", stable);
return false;
}
} catch (SQLException e) {
LOG.error(e.getMessage());
e.printStackTrace();
return false;
}
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("describe " + stable);
int colIndex = 0;
while (rs.next()) {
String name = rs.getString(1);
String type = rs.getString(2);
String note = rs.getString(4);
if ("TIMESTAMP".equals(type)) {
tsColName = name;
tsColIndex = colIndex;
} else if ("TAG".equals(note)) {
tagIndexMap.put(name, colIndex);
tagList.add(name);
} else {
fieldIndexMap.put(name, colIndex);
fieldList.add(name);
}
colIndex++;
}
LOG.info("table infotags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex);
return true;
} catch (SQLException e) {
LOG.error(e.getMessage());
e.printStackTrace();
return false;
}
}
public static List<String> getSTables(Connection conn) throws SQLException {
List<String> stables = new ArrayList<>();
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("show stables");
while (rs.next()) {
String name = rs.getString(1);
stables.add(name);
}
}
return stables;
}
public void createSTable(Connection conn, List<Column.Type> fieldTypes) throws SQLException {
StringBuilder sb = new StringBuilder();
sb.append("CREATE STABLE IF NOT EXISTS ").append(stable).append("(");
sb.append(tsColName).append(" ").append("TIMESTAMP,");
for (int i = 0; i < fieldList.size(); ++i) {
String fieldName = fieldList.get(i);
Column.Type dxType = fieldTypes.get(i);
sb.append(fieldName).append(' ');
String tdType = mapDataxType(dxType);
sb.append(tdType).append(',');
}
sb.deleteCharAt(sb.length() - 1);
sb.append(") TAGS(");
for (String tagName : tagList) {
sb.append(tagName).append(" NCHAR(64),");
}
sb.deleteCharAt(sb.length() - 1);
sb.append(")");
String q = sb.toString();
LOG.info("run sql" + q);
try (Statement stmt = conn.createStatement()) {
stmt.execute(q);
}
}
public String[] getTagValuesFromRecord(Record record) {
String[] tagValues = new String[tagList.size()];
for (int i = 0; i < tagList.size(); ++i) {
if (fixedTagValue.containsKey(tagList.get(i))) {
tagValues[i] = fixedTagValue.get(tagList.get(i));
} else {
int tagIndex = tagIndexMap.get(tagList.get(i));
tagValues[i] = record.getColumn(tagIndex).asString();
}
if (tagValues[i] == null) {
return null;
}
}
return tagValues;
}
public boolean hasTimestamp(Record record) {
Column column = record.getColumn(tsColIndex);
if (column.getType() == Column.Type.DATE && column.asDate() != null) {
return true;
} else {
return false;
}
}
public Map<String, Integer> getFieldIndexMap() {
return fieldIndexMap;
}
public List<String> getFieldList() {
return fieldList;
}
public String getJoinedFieldNames() {
return tsColName + ", " + String.join(", ", fieldList);
}
public int getTsColIndex() {
return tsColIndex;
}
public String getTagValuesPlaceHolder() {
return tagList.stream().map(x -> "?").collect(Collectors.joining(","));
}
public String getFieldValuesPlaceHolder() {
return "?, " + fieldList.stream().map(x -> "?").collect(Collectors.joining(", "));
}
/**
* Compute the sub-table name.
* <ol>
* <li>Join the tag values into a string of the form tag_value1!tag_value2!tag_value3</li>
* <li>Compute the MD5 hash "md5_val" of that string</li>
* <li>Use "t_md5val" as the sub-table name, where "t" is a fixed prefix</li>
* </ol>
*
* @param tagValues tag values of the record
* @return sub-table name
*/
public String computeTableName(String[] tagValues) {
String s = String.join("!", tagValues);
return "t_" + DigestUtils.md5Hex(s);
}
public int getDynamicTagCount() {
return tagIndexMap.size();
}
}

View File

@ -0,0 +1,91 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
public class TDengineWriter extends Writer {
private static final String PEER_PLUGIN_NAME = "peerPluginName";
static {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
public static class Job extends Writer.Job {
private Configuration originalConfig;
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
this.originalConfig.set(PEER_PLUGIN_NAME, getPeerPluginName());
}
@Override
public void destroy() {
}
@Override
public List<Configuration> split(int mandatoryNumber) {
List<Configuration> writerSplitConfigs = new ArrayList<>();
for (int i = 0; i < mandatoryNumber; i++) {
writerSplitConfigs.add(this.originalConfig);
}
return writerSplitConfigs;
}
}
public static class Task extends Writer.Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private Configuration writerSliceConfig;
@Override
public void init() {
this.writerSliceConfig = getPluginJobConf();
}
@Override
public void destroy() {
}
@Override
public void startWrite(RecordReceiver lineReceiver) {
Set<String> keys = this.writerSliceConfig.getKeys();
Properties properties = new Properties();
for (String key : keys) {
String value = this.writerSliceConfig.getString(key);
properties.setProperty(key, value);
}
if (!keys.contains(Key.USER)) {
properties.setProperty(Key.USER, "root");
}
if (!keys.contains(Key.PASSWORD)) {
properties.setProperty(Key.PASSWORD, "taosdata");
}
LOG.debug("========================properties==========================\n" + properties);
String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
LOG.debug("start to handle record from: " + peerPluginName);
DataHandler handler = DataHandlerFactory.build(peerPluginName);
long records = handler.handle(lineReceiver, properties, getTaskPluginCollector());
LOG.debug("handle data finished, records: " + records);
}
}
}

View File

@ -0,0 +1,32 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.spi.ErrorCode;
public enum TDengineWriterErrorCode implements ErrorCode {
RUNTIME_EXCEPTION("TDengineWriter-00", "runtime exception"),
TYPE_ERROR("TDengineWriter-01", "DataX type cannot be mapped to a TDengine type");
private final String code;
private final String description;
private TDengineWriterErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s]. ", this.code,
this.description);
}
}

View File

@ -0,0 +1,105 @@
/* DO NOT EDIT THIS FILE - it is machine generated */
#include <jni.h>
/* Header for class com_alibaba_datax_plugin_writer_JniConnection */
#ifndef _Included_com_alibaba_datax_plugin_writer_JniConnection
#define _Included_com_alibaba_datax_plugin_writer_JniConnection
#ifdef __cplusplus
extern "C" {
#endif
#undef com_alibaba_datax_plugin_writer_JniConnection_JNI_NULL_POINTER
#define com_alibaba_datax_plugin_writer_JniConnection_JNI_NULL_POINTER 0LL
#undef com_alibaba_datax_plugin_writer_JniConnection_JNI_SUCCESSFUL
#define com_alibaba_datax_plugin_writer_JniConnection_JNI_SUCCESSFUL 0L
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: initImp
* Signature: (Ljava/lang/String;)V
*/
JNIEXPORT void JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_initImp
(JNIEnv *, jclass, jstring);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: setOptions
* Signature: (ILjava/lang/String;)I
*/
JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_setOptions
(JNIEnv *, jclass, jint, jstring);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: getTsCharset
* Signature: ()Ljava/lang/String;
*/
JNIEXPORT jstring JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getTsCharset
(JNIEnv *, jclass);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: connectImp
* Signature: (Ljava/lang/String;ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;)J
*/
JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_connectImp
(JNIEnv *, jobject, jstring, jint, jstring, jstring, jstring);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: executeQueryImp
* Signature: ([BJ)J
*/
JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_executeQueryImp
(JNIEnv *, jobject, jbyteArray, jlong);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: getErrCodeImp
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrCodeImp
(JNIEnv *, jobject, jlong, jlong);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: getErrMsgImp
* Signature: (J)Ljava/lang/String;
*/
JNIEXPORT jstring JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrMsgImp
(JNIEnv *, jobject, jlong);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: getErrMsgByCode
* Signature: (J)Ljava/lang/String;
*/
JNIEXPORT jstring JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrMsgByCode
(JNIEnv *, jobject, jlong);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: getAffectedRowsImp
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getAffectedRowsImp
(JNIEnv *, jobject, jlong, jlong);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: closeConnectionImp
* Signature: (J)I
*/
JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_closeConnectionImp
(JNIEnv *, jobject, jlong);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: insertOpentsdbJson
* Signature: (Ljava/lang/String;J)J
*/
JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_insertOpentsdbJson
(JNIEnv *, jobject, jstring, jlong);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -0,0 +1,9 @@
{
"name": "tdenginewriter",
"class": "com.alibaba.datax.plugin.writer.tdenginewriter.TDengineWriter",
"description": {
"useScene": "data migration to tdengine",
"mechanism": "use JNI or taos-jdbc to write data to tdengine."
},
"developer": "zyyang-taosdata"
}

View File

@ -0,0 +1,24 @@
{
"name": "tdenginewriter",
"parameter": {
"host": "127.0.0.1",
"port": 6030,
"dbname": "test",
"user": "root",
"password": "taosdata",
"batchSize": 1000,
"stable": "weather",
"tagColumn": {
"station": 0
},
"fieldColumn": {
"latitude": 1,
"longtitude": 2,
"tmax": 4,
"tmin": 5
},
"timestampColumn":{
"date": 3
}
}
}

View File

@ -0,0 +1,6 @@
try_get_schema_fromdb=fail to get structure info of target table from configure file and will try to get it from database
batch_size_too_small='batchSize' is too small, please increase it and try again
column_number_error=number of columns is less than expected
tag_value_error=tag columns include 'null' value
ts_value_error=timestamp column type error or null
infer_column_type_error=fail to infer column type: sample count %d, column index %d

View File

@ -0,0 +1,6 @@
try_get_schema_fromdb=fail to get structure info of target table from configure file and will try to get it from database
batch_size_too_small='batchSize' is too small, please increase it and try again
column_number_error=number of columns is less than expected
tag_value_error=tag columns include 'null' value
ts_value_error=timestamp column type error or null
infer_column_type_error=fail to infer column type: sample count %d, column index %d

View File

@ -0,0 +1,6 @@
try_get_schema_fromdb=\u65e0\u6cd5\u4ece\u914d\u7f6e\u6587\u4ef6\u83b7\u53d6\u8868\u7ed3\u6784\u4fe1\u606f\uff0c\u5c1d\u8bd5\u4ece\u6570\u636e\u5e93\u83b7\u53d6
batch_size_too_small=batchSize\u592a\u5c0f\uff0c\u4f1a\u589e\u52a0\u81ea\u52a8\u7c7b\u578b\u63a8\u65ad\u9519\u8bef\u7684\u6982\u7387\uff0c\u5efa\u8bae\u6539\u5927\u540e\u91cd\u8bd5
column_number_error=\u5b9e\u9645\u5217\u6570\u5c0f\u4e8e\u671f\u671b\u5217\u6570
tag_value_error=\u6807\u7b7e\u5217\u5305\u542bnull
ts_value_error=\u65f6\u95f4\u6233\u5217\u4e3anull\u6216\u7c7b\u578b\u9519\u8bef
infer_column_type_error=\u6839\u636e\u91c7\u6837\u7684%d\u6761\u6570\u636e\uff0c\u65e0\u6cd5\u63a8\u65ad\u7b2c%d\u5217\u7684\u6570\u636e\u7c7b\u578b

View File

@ -0,0 +1,21 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import org.junit.Test;
import java.util.Properties;
public class JniConnectionTest {
@Test
public void test() throws Exception {
JniConnection connection = new JniConnection(new Properties());
connection.open("192.168.56.105", 6030, "test", "root", "taosdata");
String json = "{\"metric\":\"weather_temperature\",\"timestamp\":1609430400000,\"value\":123,\"tags\":{\"location\":\"beijing\",\"id\":\"t123\"}}";
connection.insertOpentsdbJson(json);
connection.close();
}
}

View File

@ -0,0 +1,25 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import org.junit.Test;
import java.util.Locale;
import java.util.ResourceBundle;
import org.junit.Assert;
public class MessageTest {
@Test
public void testChineseMessage() {
Locale local = new Locale("zh", "CN");
ResourceBundle bundle = ResourceBundle.getBundle("tdenginewritermsg", local);
String msg = bundle.getString("try_get_schema_fromdb");
Assert.assertEquals("无法从配置文件获取表结构信息,尝试从数据库获取", msg);
}
@Test
public void testDefaultMessage() {
ResourceBundle bundle = ResourceBundle.getBundle("tdenginewritermsg", Locale.getDefault());
String msg = bundle.getString("try_get_schema_fromdb");
System.out.println(msg);
}
}

View File

@ -0,0 +1,31 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class TDengineWriterTest {
@Test
public void testGetSchema() throws ClassNotFoundException, SQLException {
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", "wozai.fun", "6030", "test", "root", "taosdata");
Connection conn = DriverManager.getConnection(jdbcUrl);
SchemaManager schemaManager = new SchemaManager();
schemaManager.setStable("test1");
schemaManager.getFromDB(conn);
}
@Test
public void dropTestTable() throws ClassNotFoundException, SQLException {
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", "wozai.fun", "6030", "test", "root", "taosdata");
Connection conn = DriverManager.getConnection(jdbcUrl);
Statement stmt = conn.createStatement();
stmt.execute("drop table market_snapshot");
}
}