change doc-CN

This commit is contained in:
zyyang 2022-02-20 15:20:19 +08:00
parent fad2760ca2
commit c96256c9a2

View File

@ -4,21 +4,40 @@
## 1 快速介绍
TDengineWriter插件实现了写入数据到TDengine数据库功能。可用于离线同步其它数据库的数据到TDengine。
TDengineWriter插件实现了写入数据到TDengine数据库目标表的功能。底层实现上TDengineWriter通过JDBC连接TDengine按照TDengine的SQL语法执行insert语句/schemaless语句将数据写入TDengine。
TDengineWriter可以作为数据迁移工具供DBA将其它数据库的数据导入到TDengine。
## 2 实现原理
TDengineWriter 通过 DataX 框架获取 Reader生成的协议数据根据reader的类型解析数据。目前有两种写入方式
TDengineWriter 通过 DataX 框架获取 Reader生成的协议数据通过JDBC Driver连接TDengine执行insert语句/schemaless语句将数据写入TDengine。
1. 对于OpenTSDBReader, TDengineWriter通过JNI方式调用TDengine客户端库文件taos.lib或taos.dll中的方法使用[schemaless的方式](https://www.taosdata.com/cn/documentation/insert#schemaless)写入。
在TDengine中table可以分成超级表、子表、普通表三种类型超级表和子表包括colum和tag子表的tag列的值为固定值普通表与关系型数据库中表的概念一致。详细请参考[数据模型](https://www.taosdata.com/docs/cn/v2.0/architecture#model)
TDengineWriter支持向超级表、子表、普通表中写入数据按照table的类型和column参数中是否包含tbname使用以下方法进行写入
1. table为超级表column中指定tbname使用自动建表的insert语句使用tbname作为子表的名称。
2. table为超级表column中未指定tbname使用schemaless写入TDengine会根据超级表名、tag值计算一个子表名称。
3. table为子表使用insert语句写入ignoreTagUnmatched参数为true时忽略record中tag值与table的tag值不一致的数据。
4. table为普通表使用insert语句写入。
2. 对于其它数据源,会根据配置生成SQL语句, 通过[taos-jdbcdriver](https://www.taosdata.com/cn/documentation/connector/java)批量写入。
这样区分的原因是OpenTSDBReader将opentsdb的数据统一读取为json字符串Writer端接收到的数据只有1列。而其它Reader插件一般会把数据放在不同列。
## 3 功能说明
### 3.1 从OpenTSDB到TDengine
#### 3.1.1 配置样例
### 3.1 配置样例
配置一个写入TDengine的作业
先在TDengine上创建超级表
```sql
create database if not exists test;
create table test.weather (ts timestamp, temperature int, humidity double) tags(is_normal bool, device_id binary(100), address nchar(100));
```
使用下面的Job配置将数据写入TDengine
```json
{
@ -26,286 +45,65 @@ TDengineWriter 通过 DataX 框架获取 Reader生成的协议数据根据rea
"content": [
{
"reader": {
"name": "opentsdbreader",
"name": "streamreader",
"parameter": {
"endpoint": "http://192.168.1.180:4242",
"column": [
"weather_temperature"
{
"type": "string",
"value": "tb1"
},
{
"type": "date",
"value": "2022-02-20 12:00:01"
},
{
"type": "long",
"random": "0, 10"
},
{
"type": "double",
"random": "0, 10"
},
{
"type": "bool",
"random": "0, 50"
},
{
"type": "bytes",
"value": "abcABC123"
},
{
"type": "string",
"value": "北京朝阳望京"
}
],
"beginDateTime": "2021-01-01 00:00:00",
"endDateTime": "2021-01-01 01:00:00"
"sliceRecordCount": 1
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"host": "192.168.1.180",
"port": 6030,
"dbName": "test",
"username": "root",
"password": "taosdata"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
```
#### 3.1.2 参数说明
| 参数 | 描述 | 是否必选 | 默认值 |
| --------- | -------------------- | -------- | -------- |
| host | TDengine实例的host | 是 | 无 |
| port | TDengine实例的port | 是 | 无 |
| username | TDengine实例的用户名 | 否 | root |
| password | TDengine实例的密码 | 否 | taosdata |
| dbName | 目的数据库的名称 | 是 | 无 |
| batchSize | 每次批量插入多少记录 | 否 | 1 |
#### 3.1.3 类型转换
目前由于OpenTSDBReader将opentsdb的数据统一读取为json字符串TDengineWriter 在做Opentsdb到TDengine的迁移时按照以下类型进行处理
| OpenTSDB数据类型 | DataX 内部类型 | TDengine 数据类型 |
| ---------------- | -------------- | ----------------- |
| timestamp | Date | timestamp |
| Integervalue | Double | double |
| Floatvalue | Double | double |
| Stringvalue | String | binary |
| Integertag | String | binary |
| Floattag | String | binary |
| Stringtag | String | binary |
### 3.2 从MongoDB到TDengine
#### 3.2.1 配置样例
```json
{
"job": {
"setting": {
"speed": {
"channel": 2
}
},
"content": [
{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": [
"127.0.0.1:27017"
],
"userName": "user",
"mechanism": "SCRAM-SHA-1",
"userPassword": "password",
"authDb": "admin",
"dbName": "test",
"collectionName": "stock",
"column": [
{
"name": "stockID",
"type": "string"
},
{
"name": "tradeTime",
"type": "date"
},
{
"name": "lastPrice",
"type": "double"
},
{
"name": "askPrice1",
"type": "double"
},
{
"name": "bidPrice1",
"type": "double"
},
{
"name": "volume",
"type": "int"
}
]
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"host": "localhost",
"port": 6030,
"dbName": "test",
"username": "root",
"password": "taosdata",
"stable": "stock",
"tagColumn": {
"industry": "energy",
"stockID": 0
},
"fieldColumn": {
"lastPrice": 2,
"askPrice1": 3,
"bidPrice1": 4,
"volume": 5
},
"timestampColumn": {
"tradeTime": 1
}
}
}
}
]
}
}
```
**注本配置的writer部分同样适用于关系型数据库**
#### 3.2.2 参数说明
| 参数 | 描述 | 是否必选 | 默认值 | 备注 |
| --------------- | -------------------- | ---------------- | -------- | ------------------ |
| host | TDengine实例的host | 是 | 无 |
| port | TDengine实例的port | 是 | 无 |
| username | TDengine实例的用户名 | 否 | root |
| password | TDengine实例的密码 | 否 | taosdata |
| dbName | 目的数据库的名称 | 是 | 无 |
| batchSize | 每次批量插入多少记录 | 否 | 1000 |
| stable | 目标超级表的名称 | 是(OpenTSDB除外) | 无 |
| tagColumn | 格式:{tagName1: tagInd1, tagName2: tagInd2}, 标签列在写插件收到的Record中的位置和列名 | 否 | 无 | 位置索引均从0开始, tagInd如果为字符串, 表示固定标签值,不需要从源数据中获取 |
| fieldColumn | 格式:{fdName1: fdInd1, fdName2: fdInd2}, 字段列在写插件收到的Record中的位置和列名 | 否 | 无 | |
| timestampColumn | 格式:{tsColName: tsColIndex}, 时间戳列在写插件收到的Record中的位置和列名 | 否 | 无 | 时间戳列只能有一个 |
示例配置中tagColumn有一个industry它的值是一个固定的字符串“energy”, 作用是给导入的所有数据加一个值为"energy"的固定标签industry。这个应用场景可以是在源库中有多个设备采集的数据分表存储设备名就是表名可以用这个机制把设备名称转化为标签。
#### 3.2.3 自动建表规则
##### 3.2.3.1 超级表创建规则
如果配置了tagColumn、 fieldColumn和timestampColumn将会在插入第一条数据前自动创建超级表。<br>
数据列的类型从第1条记录自动推断, 标签列默认类型为`NCHAR(64)`, 比如示例配置,可能生成以下建表语句:
```sql
CREATE STABLE IF NOT EXISTS market_snapshot (
tadetime TIMESTAMP,
lastprice DOUBLE,
askprice1 DOUBLE,
bidprice1 DOUBLE,
volume INT
)
TAGS(
industry NCHAR(64),
stockID NCHAR(64)
);
```
##### 3.2.3.2 子表创建规则
子表结构与超级表相同,子表表名生成规则:
1. 将标签的value 组合成为如下的字符串: `tag_value1!tag_value2!tag_value3`
2. 计算该字符串的 MD5 散列值 "md5_val"。
3. "t_md5val"作为子表名。其中的 "t" 是固定的前缀。
#### 3.2.4 用户提前建表
如果你已经创建好目标超级表那么tagColumn、 fieldColumn和timestampColumn三个字段均可省略, 插件将通过执行通过`describe stableName`获取表结构的信息。
此时要求接收到的Record中Column的顺序和执行`describe stableName`返回的列顺序相同, 比如通过`describe stableName`返回以下内容:
```
Field | Type | Length | Note |
=================================================================================
ts | TIMESTAMP | 8 | |
current | DOUBLE | 8 | |
location | BINARY | 10 | TAG |
```
那么插件收到的数据第1列必须代表时间戳第2列必须代表电流第3列必须代表位置。
#### 3.2.5 注意事项
1. tagColumn、 fieldColumn和timestampColumn三个字段用于描述目标表的结构信息这三个配置字段必须同时存在或同时省略。
2. 如果存在以上三个配置,且目标表也已经存在,则两者必须一致。**一致性**由用户自己保证,插件不做检查。不一致可能会导致插入失败或插入数据错乱。
#### 3.2.6 类型转换
| MongoDB 数据类型 | DataX 内部类型 | TDengine 数据类型 |
| ---------------- | -------------- | ----------------- |
| int, Long | Long | BIGINT |
| double | Double | DOUBLE |
| string, array | String | NCHAR(64) |
| date | Date | TIMESTAMP |
| boolean | Boolean | BOOL |
| bytes | Bytes | BINARY(64) |
### 3.3 从关系型数据库到TDengine
writer部分的配置规则和上述MongoDB的示例是一样的这里给出一个MySQL的示例。
#### 3.3.1 MySQL中表结构
```sql
CREATE TABLE IF NOT EXISTS weather(
station varchar(100),
latitude DOUBLE,
longtitude DOUBLE,
`date` DATE,
TMAX int,
TMIN int
)
```
#### 3.3.2 配置文件示例
```json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "passw0rd",
"password": "taosdata",
"column": [
"*"
"tbname",
"ts",
"temperature",
"humidity",
"is_normal",
"device_id",
"address"
],
"splitPk": "station",
"connection": [
{
"table": [
"weather"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8"
]
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/test"
}
]
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"host": "127.0.0.1",
"port": 6030,
"dbName": "test",
"username": "root",
"password": "taosdata",
"batchSize": 1000,
"stable": "weather",
"tagColumn": {
"station": 0
},
"fieldColumn": {
"latitude": 1,
"longtitude": 2,
"tmax": 4,
"tmin": 5
},
"timestampColumn":{
"date": 3
}
],
"batchSize": 100,
"ignoreTagsUnmatched": true
}
}
}
@ -319,6 +117,72 @@ CREATE TABLE IF NOT EXISTS weather(
}
```
### 3.2 参数说明
* jdbcUrl
* 描述数据源的JDBC连接信息TDengine的JDBC信息请参考[Java连接器的使用](https://www.taosdata.com/docs/cn/v2.0/connector/java#url)
* 必选:是
* 默认值:无
* username
* 描述:用户名
* 必选:是
* 默认值:无
* password
* 描述:用户名的密码
* 必选:是
* 默认值:无
* table
* 描述表名的集合table应该包含column参数中的所有列tbname除外。注意column中的tbname会被当作TDengine中子表名使用。
* 必选:是
* 默认值:无
* column
* 描述字段名的集合字段的顺序应该与record中column的
* 必选:是
* 默认值:无
* batchSize
* 描述:
* 必选:否
* 默认值1
* ignoreTagsUnmatched
* 描述当table为TDengine中的一张子表table具有tag值。如果数据的tag值与table的tag值不想等数据不写入到table中。
* 必选:否
* 默认值false
### 3.3 类型转换
datax中的数据类型可以映射到TDengine的数据类型
| DataX 内部类型 | TDengine 数据类型 |
| -------------- | ----------------------------------------- |
| INT | TINYINT, SMALLINT, INT |
| LONG | TIMESTAMP, TINYINT, SMALLINT, INT, BIGINT |
| DOUBLE | FLOAT, DOUBLE |
| STRING | TIMESTAMP, BINARY, NCHAR |
| BOOL | BOOL |
| DATE | TIMESTAMP |
| BYTES | BINARY |
### 3.4 各数据源到TDengine的参考示例
下面是一些数据源到TDengine进行数据迁移的示例
| 数据迁移示例 | 配置的示例 |
| ------------------ | ------------------------------------------------------------ |
| TDengine到TDengine | [超级表到超级表指定tbname](../src/test/resources/t2t-1.json) |
| TDengine到TDengine | [超级表到超级表不指定tbname](../src/test/resources/t2t-2.json) |
| TDengine到TDengine | [超级表到子表](../src/test/resources/t2t-3.json) |
| TDengine到TDengine | [普通表到普通表](../src/test/resources/t2t-4.json) |
| RDBMS到TDengine | [普通表到超级表指定tbname](../src/test/resources/dm2t-1.json) |
| RDBMS到TDengine | [普通表到超级表不指定tbname](../src/test/resources/dm2t-2.json) |
| RDBMS到TDengine | [普通表到子表](../src/test/resources/dm2t-3.json) |
| RDBMS到TDengine | [普通表到普通表](../src/test/resources/dm2t-4.json) |
| OpenTSDB到TDengine | [metric到普通表](../src/test/resources/o2t-1.json) |
## 4 性能报告
@ -362,44 +226,29 @@ CREATE TABLE IF NOT EXISTS weather(
说明:
1. 这里的单表,主键类型为 bigint(20),自增。
2. batchSize 和 通道个数,对性能影响较大。
3. 16通道4096批量提交时出现 full gc 2次。
1.
#### 4.2.4 性能测试小结
## 5 约束限制
1. 本插件自动创建超级表时NCHAR类型的长度固定为64对于包含长度大于64的字符串的数据源将不支持。
2. 标签列不能包含null值如果包含会被过滤掉。
1.
## FAQ
### 如何选取要同步的数据的范围?
数据范围的选取在Reader插件端配置对于不同的Reader插件配置方法往往不同。比如对于mysqlreader 可以用sql语句指定数据范围。对于opentsdbreader, 用beginDateTime和endDateTime两个配置项指定数据范围。
### 如何一次导入多张源表?
如果Reader插件支持一次读多张表Writer插件就能一次导入多张表。如果Reader不支持多多张表可以建多个job分别导入。Writer插件只负责写数据。
### 一张源表导入之后对应TDengine中多少张表
这是由tagColumn决定的如果所有tag列的值都相同那么目标表只有一个。源表有多少不同的tag组合目标超级表就有多少子表。
### 源表和目标表的字段顺序一致吗?
TDengine要求每个表第一列是时间戳列后边是普通字段最后是标签列。如果源表不是这个顺序插件在自动建表时会自动调整
是的TDengineWriter按照column中字段的顺序解析来自datax的数据。
### 插件如何确定各列的数据类型?
根据收到的第一批数据自动推断各列的类型。
### 为什么插入10年前的数据会抛异常`TDengine ERROR (2350): failed to execute batch bind` ?
因为创建数据库的时候默认保留10年的数据。可以手动指定要保留多长时间的数据比如:`CREATE DATABASE power KEEP 36500;`。
### 如果编译的时候某些插件的依赖找不到怎么办?
如果这个插件不是必须的可以注释掉根目录下的pom.xml中的对应插件。