增加IoTDB读写文档并修改了一些配置

This commit is contained in:
李浩然 2024-06-24 19:28:16 +08:00
parent 4d514fb1df
commit 1235819d62
5 changed files with 408 additions and 12 deletions

View File

@ -1,4 +1,220 @@
# DataX IoTDBReader
## 1 快速介绍
## 快速介绍
iotdbreader用来读取iotdb中的数据然后传输到其他数据库。
IoTDBReader 插件实现了 IoTDB 读取数据的功能。
## 2 实现原理
IoTDBReader 通过 IoTDB 的 原生java session 查询获取数据。
## 3 功能说明
### 3.1 配置样例
* 配置一个从 IoTDB 抽取数据作业:
```json
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "iotdbreader",
"parameter": {
"username": "root",
"password": "root",
"host": "192.168.150.100",
"port": 6667,
"fetchSize": 10000,
"version": "V_1_0",
"timeColumnPosition": 0,
"finalSqls":[
],
"device": "root.cgn.device",
"measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU",
"beginDateTime": "2023-03-07 12:00:00",
"endDateTime": "2024-03-07 19:00:00",
"where": ""
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "toy123",
"writeMode": "insert",
"#需要提前建表": "CREATE TABLE device (`time` BIGINT,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);",
"column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from device"
],
"connection": [
{
"table": [
"device"
],
"#": "下面的URL需要把中括号去掉否则报错mysqlreader的bug未修改",
"jdbcUrl": "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
}
]
}
}
}
]
}
}
```
* 配置一个自定义 SQL 的数据抽取作业:
```json
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "iotdbreader",
"parameter": {
"username": "root",
"password": "root",
"host": "192.168.150.100",
"port": 6667,
"fetchSize": 10000,
"version": "V_1_0",
"timeColumnPosition": 0,
"finalSqls":[
"select * from root.cgn.device",
"select A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU from root.cgn.device"
],
"device": "",
"measurements": "",
"beginDateTime": "",
"endDateTime": "",
"where": ""
}
},
"writer": {
"name": "txtfilewriter",
"parameter": {
"path": "D:/下载",
"fileName": "txtText",
"writeMode": "truncate",
"dateFormat": "yyyy-MM-dd"
}
}
}
]
}
}
```
### 3.2 参数说明
* username
* 描述:用户名
* 必选:是
* 默认值:无
* password
* 描述:用户名的密码
* 必选:是
* 默认值:无
* host
* 描述连接iotdb数据库的主机地址
* 必选:是
* 默认值:无
* port
* 描述:端口
* 必选:是
* 默认值:无
* version
* 描述iotdb版本
* 必选:是
* 默认值:无
* timeColumnPosition
* 描述时间列在Record中列的位置
* 必选:否
* 默认值0
* finalSqls
* 描述直接写多行SQL可以并行读取此时下面的参数失效。
* 必选:否
* 默认值:
* device
* 描述IoTDB中的概念可理解为mysql中的表。
* 必选finalSqls为空时必选
* 默认值:无
* measurements
* 描述IoTDB中的概念可理解为mysql中的字段。
* 必选finalSqls为空时必选
* 默认值:无
* beginDateTime
* 描述SQL查询时的数据的开始时间
* 必选finalSqls为空时必选
* 默认值:无
* measurements
* 描述SQL查询时的数据的结束时间
* 必选:否
* 默认值:无
* where
* 描述:额外的条件
* 必选:否
* 默认值:无
### 3.3 类型转换
| IoTDB 数据类型 | DataX 内部类型 |
|-----------------|------------|
| INT32 | Int |
| INT64,TIMESTAMP | Long |
| FLOAT | FLOAT |
| DOUBLE | Double |
| BOOLEAN | Bool |
| DATE | Date |
| STRING,TEXT | String |
## 4 性能报告
### 4.1 环境准备
#### 4.1.1 数据特征
#### 4.1.2 机器参数
#### 4.1.3 DataX jvm 参数
-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
### 4.2 测试报告
#### 4.2.1 单表测试报告
| 通道数| DataX速度(Rec/s)|DataX流量(MB/s)| DataX机器网卡流出流量(MB/s)|DataX机器运行负载|DB网卡进入流量(MB/s)|DB运行负载|DB TPS|
|--------| --------|--------|--------|--------|--------|--------|--------|
|1| | | | | | | |
|4| | | | | | | |
|8| | | | | | | |
|16| | | | | | | |
|32| | | | | | | |
说明:
#### 4.2.4 性能测试小结
1.
2.
## 5 约束限制
## FAQ

View File

@ -0,0 +1,189 @@
# DataX IoTDBWriter
## 1 快速介绍
IoTDBWriter插件实现了写入数据到IoTDB数据库目标表(设备)的功能。
底层实现上IoTDBWriter通过iotdb.session连接IoTDB按照IoTDB的SQL语法
执行session.insertRecordsOfOneDevice语句将数据写入IoTDB。
IoTDBWriter可以作为数据迁移工具供DBA将其它数据库的数据导入到IoTDB。
## 2 实现原理
IoTDBWriter 通过 DataX 框架获取 Reader 生成的协议数据Record通过Session连接IoTDB执行insert语句将数据写入IoTDB。
IoTDB中设备与列的概念见IoTDB官方文档。
## 3 功能说明
### 3.1 配置样例
配置一个MySQL数据写入IoTDB的作业
使用下面的Job配置将数据写入IoTDB
```json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "toy123",
"column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
"splitPk": "",
"connection": [
{
"table": [
"device"
],
"jdbcUrl": [
"jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
]
}
]
}
},
"writer": {
"name": "iotdbwriter",
"parameter": {
"username": "root",
"password": "root",
"host": "192.168.150.100",
"port": 6667,
"version": "V_1_0",
"##": "Reader中时间列的位置默认0列",
"timeColumnPosition": 0,
"insertBatchSize": 1000,
"device": "root.cgn.device",
"measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU",
"deleteExistTimeseries": false
}
}
}
],
"setting": {
"speed": {
"channel": 3
}
}
}
}
```
### 3.2 参数说明
* username
* 描述:用户名
* 必选:是
* 默认值:无
* password
* 描述:用户名的密码
* 必选:是
* 默认值:无
* host
* 描述连接iotdb数据库的主机地址
* 必选:是
* 默认值:无
* port
* 描述:端口
* 必选:是
* 默认值:无
* version
* 描述iotdb版本
* 必选:是
* 默认值:无
* timeColumnPosition
* 描述时间列在Record中列的位置
* 必选:否
* 默认值0
* device
* 描述iotdb中的概念对应mysql中的表名
* 必选:是
* 默认值:无
* measurements
* 描述iotdb中的概念对应mysql中的字段集合顺序应该与record中column的顺序相同
* 必选:是
* 默认值:无
* batchSize
* 描述每batchSize条record为一个batch进行写入
* 必选:否
* 默认值1000
* deleteExistTimeseries
* 描述插入前是否删除该device下的所有数据
* 必选:否
* 默认值false
### 3.3 类型转换
datax中的数据类型映射到IoTDB的数据类型
| DataX 内部类型 | IoTDB 数据类型 |
| -------------- |------------------|
| INT | INT32 |
| LONG | TIMESTAMP, INT64 |
| DOUBLE | DOUBLE |
| STRING | STRING |
| BOOL | BOOL |
| DATE | TIMESTAMP,DATE |
| BYTES | BINARY |
### 3.4 各数据源到IoTDB的参考示例
见datax-example/datax-example-iotdb
## 4 性能报告
### 4.1 环境准备
#### 4.1.1 数据特征
建表语句:
单行记录类似于:
#### 4.1.2 机器参数
* 执行DataX的机器参数为:
1. cpu:
2. mem:
3. net: 千兆双网卡
4. disc: DataX 数据不落磁盘,不统计此项
* IoTDB数据库机器参数为:
1. cpu:
2. mem:
3. net: 千兆双网卡
4. disc:
#### 4.1.3 DataX jvm 参数
-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
### 4.2 测试报告
#### 4.2.1 单表测试报告
| 通道数 | DataX速度(Rec/s) | DataX流量(MB/s) | DataX机器网卡流出流量(MB/s) | DataX机器运行负载 | DB网卡进入流量(MB/s) | DB运行负载 | DB TPS |
| ------ | ---------------- | --------------- | --------------------------- | ----------------- | -------------------- | ---------- | ------ |
| 1 | | | | | | | |
| 4 | | | | | | | |
| 8 | | | | | | | |
| 16 | | | | | | | |
| 32 | | | | | | | |
#### 4.2.4 性能测试小结
## 5 约束限制

View File

@ -45,10 +45,6 @@ public class IoTDBWriter extends Writer {
if (port == null || port.isEmpty()) {
throw DataXException.asDataXException(IoTDBWriterErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PORT + "] is not set.");
}
String fetchSize = this.jobConf.getString(Key.FETCH_SIZE);
if (fetchSize == null || fetchSize.isEmpty()) {
throw DataXException.asDataXException(IoTDBWriterErrorCode.REQUIRED_VALUE, "The parameter [" + Key.FETCH_SIZE + "] is not set.");
}
// 还有一部分参数没检查没必要了
}
@ -119,9 +115,6 @@ public class IoTDBWriter extends Writer {
throw new RuntimeException(e);
}
// set session fetchSize
session.setFetchSize(taskConf.getInt(Key.FETCH_SIZE));
// 获取参数否则默认值
insertBatchSize = (taskConf.getInt(Key.INSERT_BATCH_SIZE) == null) ? 1000 : taskConf.getInt(Key.INSERT_BATCH_SIZE);
timeColumnPosition = (taskConf.getInt(Key.TIME_COLUMN_POSITION) == null) ? 0 : taskConf.getInt(Key.TIME_COLUMN_POSITION);
@ -144,7 +137,7 @@ public class IoTDBWriter extends Writer {
throw new RuntimeException(e);
}
// TODO 是否创建测点时间序列
// 是否创建测点时间序列不需要IoTDB会自动创建时间序列
}
@Override

View File

@ -5,7 +5,6 @@ public class Key {
public static final String PASSWORD = "password";
public static final String HOST = "host";
public static final String PORT = "port";
public static final String FETCH_SIZE = "fetchSize";
public static final String VERSION = "version";
public static final String TIME_COLUMN_POSITION = "timeColumnPosition";
public static final String DEVICE = "device";

View File

@ -5,7 +5,6 @@
"password": "root",
"host": "192.168.150.100",
"port": 6667,
"fetchSize": 10000,
"version": "V_1_0",
"##": "注意是Reader插件读取到的数据中时间列的位置不是该插件默认0列",
"timeColumnPosition": 0,