mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-03 02:50:49 +08:00
mongodb2tdengine support
This commit is contained in:
parent
3de5a8f715
commit
bf01999222
@ -55,18 +55,21 @@
|
||||
"dbname": "test",
|
||||
"user": "root",
|
||||
"password": "taosdata",
|
||||
"measurement": "market_snapshot",
|
||||
"tag_set": {
|
||||
"stable": "market_snapshot",
|
||||
"batchSize": 35,
|
||||
"tagColumn": {
|
||||
"product": "cu",
|
||||
"instrumentID": 0
|
||||
},
|
||||
"field_set": {
|
||||
"fieldColumn": {
|
||||
"lastPrice": 2,
|
||||
"askPrice1": 3,
|
||||
"bidPrice1": 4,
|
||||
"volume": 5
|
||||
},
|
||||
"timestamp": 1
|
||||
"timestampColumn": {
|
||||
"tradeTime": 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2,20 +2,21 @@
|
||||
|
||||
## 1 快速介绍
|
||||
|
||||
TDengineWriter 插件实现了写入数据到 TDengine 的功能。 在底层实现上, TDengineWriter 通过 JNI的方式调用libtaos.so/tao.dll中的方法,连接 TDengine
|
||||
数据库实例,并执行schemaless的写入。 TDengineWriter 面向ETL开发工程师,他们使用 TDengineWriter 从数仓导入数据到 TDengine。同时,TDengineWriter
|
||||
亦可以作为数据迁移工具为DBA等用户提供服务。
|
||||
TDengineWriter插件实现了写入数据到TDengine数据库功能。可用于离线同步其它数据库的数据到TDengine。
|
||||
|
||||
## 2 实现原理
|
||||
|
||||
TDengineWriter 通过 DataX 框架获取 Reader
|
||||
生成的协议数据,根据reader的类型解析数据,通过JNI方式调用libtaos.so(或taos.dll)中的方法,使用schemaless的方式写入到TDengine。
|
||||
TDengineWriter 通过 DataX 框架获取 Reader生成的协议数据,根据reader的类型解析数据。目前有两种写入方式:
|
||||
|
||||
1. 对于OpenTSDBReader, TDengineWriter通过JNI方式调用TDengine客户端库文件(taos.lib或taos.dll)中的方法,使用[schemaless的方式](https://www.taosdata.com/cn/documentation/insert#schemaless)写入。
|
||||
|
||||
2. 对于其它数据源,会根据配置生成SQL语句, 通过[taos-jdbcdriver](https://www.taosdata.com/cn/documentation/connector/java)批量写入。
|
||||
|
||||
这样区分的原因是OpenTSDBReader将opentsdb的数据统一读取为json字符串,Writer端接收到的数据只有1列。而其它Reader插件一般会把数据放在不同列。
|
||||
|
||||
## 3 功能说明
|
||||
|
||||
### 3.1 配置样例
|
||||
|
||||
* 这里使用一份从OpenTSDB产生到 TDengine 导入的数据。
|
||||
### 3.1 从OpenTSDB到TDengine
|
||||
#### 3.1.1 配置样例
|
||||
|
||||
```json
|
||||
{
|
||||
@ -54,46 +55,189 @@ TDengineWriter 通过 DataX 框架获取 Reader
|
||||
}
|
||||
```
|
||||
|
||||
### 3.2 参数说明
|
||||
#### 3.1.2 参数说明
|
||||
|
||||
* **host**
|
||||
* 描述:TDengine实例的host。
|
||||
| 参数 | 描述 | 是否必选 | 默认值 |
|
||||
| --------- | -------------------- | -------- | -------- |
|
||||
| host | TDengine实例的host | 是 | 无 |
|
||||
| port | TDengine实例的port | 是 | 无 |
|
||||
| user | TDengine实例的用户名 | 否 | root |
|
||||
| password | TDengine实例的密码 | 否 | taosdata |
|
||||
| dbname | 目的数据库的名称 | 是 | 无 |
|
||||
| batchSize | 每次批量插入多少记录 | 否 | 1 |
|
||||
|
||||
* 必选:是 <br />
|
||||
|
||||
* 默认值:无 <br />
|
||||
* **port**
|
||||
* 描述:TDengine实例的port。
|
||||
* 必选:是 <br />
|
||||
* 默认值:无 <br />
|
||||
* **dbname**
|
||||
* 描述:目的数据库的名称。
|
||||
#### 3.1.3 类型转换
|
||||
|
||||
* 必选:是 <br />
|
||||
目前,由于OpenTSDBReader将opentsdb的数据统一读取为json字符串,TDengineWriter 在做Opentsdb到TDengine的迁移时,按照以下类型进行处理:
|
||||
|
||||
* 默认值:无 <br />
|
||||
* **username**
|
||||
* 描述:TDengine实例的用户名 <br />
|
||||
* 必选:是 <br />
|
||||
* 默认值:无 <br />
|
||||
* **password**
|
||||
* 描述:TDengine实例的密码 <br />
|
||||
* 必选:是 <br />
|
||||
* 默认值:无 <br />
|
||||
|
||||
### 3.3 类型转换
|
||||
|
||||
目前,由于opentsdbreader将opentsdb的数据统一读取为json字符串,TDengineWriter 在做Opentsdb到TDengine的迁移时,按照以下类型进行处理:
|
||||
|
||||
| OpenTSDB数据类型 | DataX 内部类型| TDengine 数据类型 |
|
||||
| -------- | ----- | -------- |
|
||||
| OpenTSDB数据类型 | DataX 内部类型 | TDengine 数据类型 |
|
||||
| ---------------- | -------------- | ----------------- |
|
||||
| timestamp | Date | timestamp |
|
||||
| Integer(value) | Double | double |
|
||||
| Float(value) | Double | double |
|
||||
| String(value) | String | binary |
|
||||
| Float(value) | Double | double |
|
||||
| String(value) | String | binary |
|
||||
| Integer(tag) | String | binary |
|
||||
| Float(tag) | String |binary |
|
||||
| String(tag) | String |binary |
|
||||
| Float(tag) | String | binary |
|
||||
| String(tag) | String | binary |
|
||||
|
||||
### 3.2 从MongoDB到TDengine
|
||||
|
||||
#### 3.2.1 配置样例
|
||||
```json
|
||||
{
|
||||
"job": {
|
||||
"setting": {
|
||||
"speed": {
|
||||
"channel": 2
|
||||
}
|
||||
},
|
||||
"content": [
|
||||
{
|
||||
"reader": {
|
||||
"name": "mongodbreader",
|
||||
"parameter": {
|
||||
"address": [
|
||||
"127.0.0.1:27017"
|
||||
],
|
||||
"userName": "user",
|
||||
"mechanism": "SCRAM-SHA-1",
|
||||
"userPassword": "password",
|
||||
"authDb": "admin",
|
||||
"dbName": "test",
|
||||
"collectionName": "stock",
|
||||
"column": [
|
||||
{
|
||||
"name": "stockID",
|
||||
"type": "string"
|
||||
},
|
||||
{
|
||||
"name": "tradeTime",
|
||||
"type": "date"
|
||||
},
|
||||
{
|
||||
"name": "lastPrice",
|
||||
"type": "double"
|
||||
},
|
||||
{
|
||||
"name": "askPrice1",
|
||||
"type": "double"
|
||||
},
|
||||
{
|
||||
"name": "bidPrice1",
|
||||
"type": "double"
|
||||
},
|
||||
{
|
||||
"name": "volume",
|
||||
"type": "int"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"writer": {
|
||||
"name": "tdenginewriter",
|
||||
"parameter": {
|
||||
"host": "localhost",
|
||||
"port": 6030,
|
||||
"dbname": "test",
|
||||
"user": "root",
|
||||
"password": "taosdata",
|
||||
"stable": "stock",
|
||||
"tagColumn": {
|
||||
"industry": "energy",
|
||||
"stockID": 0
|
||||
},
|
||||
"fieldColumn": {
|
||||
"lastPrice": 2,
|
||||
"askPrice1": 3,
|
||||
"bidPrice1": 4,
|
||||
"volume": 5
|
||||
},
|
||||
"timestampColumn": {
|
||||
"tradeTime": 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**注:本配置的writer部分同样适用于关系型数据库**
|
||||
|
||||
|
||||
#### 3.2.2 参数说明
|
||||
| 参数 | 描述 | 是否必选 | 默认值 | 备注 |
|
||||
| --------------- | -------------------- | ---------------- | -------- | ------------------ |
|
||||
| host | TDengine实例的host | 是 | 无 |
|
||||
| port | TDengine实例的port | 是 | 无 |
|
||||
| user | TDengine实例的用户名 | 否 | root |
|
||||
| password | TDengine实例的密码 | 否 | taosdata |
|
||||
| dbname | 目的数据库的名称 | 是 | 无 |
|
||||
| batchSize | 每次批量插入多少记录 | 否 | 1000 |
|
||||
| stable | 目标超级表的名称 | 是(OpenTSDB除外) | 无 |
|
||||
| tagColumn | 标签列的列名和位置 | 否 | 无 | 位置索引均从0开始 |
|
||||
| fieldColumn | 字段列的列名和位置 | 否 | 无 | |
|
||||
| timestampColumn | 时间戳列的列名和位置 | 否 | 无 | 时间戳列只能有一个 |
|
||||
|
||||
#### 3.3.3 自动建表规则
|
||||
##### 3.3.3.1 超级表创建规则
|
||||
|
||||
如果配置了tagColumn、 fieldColumn和timestampColumn将会在插入第一条数据前,自动创建超级表。<br>
|
||||
数据列的类型从第1条记录自动推断, 标签列默认类型为`NCHAR(64)`, 比如示例配置,可能生成以下建表语句:
|
||||
|
||||
```sql
|
||||
CREATE STABLE IF NOT EXISTS market_snapshot (
|
||||
tadetime TIMESTAMP,
|
||||
lastprice DOUBLE,
|
||||
askprice1 DOUBLE,
|
||||
bidprice1 DOUBLE,
|
||||
volume INT
|
||||
)
|
||||
TAGS(
|
||||
industry NCHAR(64),
|
||||
stockID NCHAR(64
|
||||
);
|
||||
```
|
||||
|
||||
##### 3.3.3.2 子表创建规则
|
||||
|
||||
子表结果与超表相同,子表表名生成规则:
|
||||
1. 将标签的value 组合成为如下的字符串: `tag_value1!tag_value2!tag_value3`。
|
||||
2. 计算该字符串的 MD5 散列值 "md5_val"。
|
||||
3. "t_md5val"作为子表名。其中的 "t" 是固定的前缀。
|
||||
|
||||
#### 3.3.4 用户提前建表
|
||||
|
||||
如果你已经创建好目标超级表,那么tagColumn、 fieldColumn和timestampColumn三个字段均可省略, 插件将通过执行通过`describe stableName`获取表结构的信息。
|
||||
此时要求接收到的Record中Column的顺序和执行`describe stableName`返回的列顺序相同, 比如通过`describe stableName`返回以下内容:
|
||||
```
|
||||
Field | Type | Length | Note |
|
||||
=================================================================================
|
||||
ts | TIMESTAMP | 8 | |
|
||||
current | DOUBLE | 8 | |
|
||||
location | BINARY | 10 | TAG |
|
||||
```
|
||||
那么插件收到的数据第1列必须代表时间戳,第2列必须代表电流,第3列必须代表位置。
|
||||
|
||||
#### 3.3.5 注意事项
|
||||
|
||||
1. tagColumn、 fieldColumn和timestampColumn三个字段用于描述目标表的结构信息,这三个配置字段必须同时存在或同时省略。
|
||||
2. 如果存在以上三个配置,且目标表也已经存在,则两者必须一致。**一致性**由用户自己保证,插件不做检查。不一致可能会导致插入失败或插入数据错乱。
|
||||
3. 插件优先使用配置文件中指定的表结构。
|
||||
|
||||
#### 3.3.6 类型转换
|
||||
|
||||
| MongoDB 数据类型 | DataX 内部类型 | TDengine 数据类型 |
|
||||
| ---------------- | -------------- | ----------------- |
|
||||
| int, Long | Long | BIGINT |
|
||||
| double | Double | DOUBLE |
|
||||
| string, array | String | NCHAR(64) |
|
||||
| date | Date | TIMESTAMP |
|
||||
| boolean | Boolean | BOOL |
|
||||
| bytes | Bytes | BINARY |
|
||||
|
||||
|
||||
## 4 性能报告
|
||||
|
||||
@ -127,13 +271,13 @@ TDengineWriter 通过 DataX 框架获取 Reader
|
||||
|
||||
#### 4.2.1 单表测试报告
|
||||
|
||||
| 通道数| DataX速度(Rec/s)|DataX流量(MB/s)| DataX机器网卡流出流量(MB/s)|DataX机器运行负载|DB网卡进入流量(MB/s)|DB运行负载|DB TPS|
|
||||
|--------| --------|--------|--------|--------|--------|--------|--------|
|
||||
|1| | | | | | | |
|
||||
|4| | | | | | | |
|
||||
|8| | | | | | | |
|
||||
|16| | | | | | | |
|
||||
|32| | | | | | | |
|
||||
| 通道数 | DataX速度(Rec/s) | DataX流量(MB/s) | DataX机器网卡流出流量(MB/s) | DataX机器运行负载 | DB网卡进入流量(MB/s) | DB运行负载 | DB TPS |
|
||||
| ------ | ---------------- | --------------- | --------------------------- | ----------------- | -------------------- | ---------- | ------ |
|
||||
| 1 | | | | | | | |
|
||||
| 4 | | | | | | | |
|
||||
| 8 | | | | | | | |
|
||||
| 16 | | | | | | | |
|
||||
| 32 | | | | | | | |
|
||||
|
||||
说明:
|
||||
|
||||
@ -143,9 +287,23 @@ TDengineWriter 通过 DataX 框架获取 Reader
|
||||
|
||||
#### 4.2.4 性能测试小结
|
||||
|
||||
1.
|
||||
2.
|
||||
|
||||
## 5 约束限制
|
||||
|
||||
## FAQ
|
||||
## FAQ
|
||||
|
||||
### 如何选取要同步的数据的范围?
|
||||
|
||||
数据范围的选取在Reader插件端配置,对于不同的Reader插件配置方法往往不同。比如对于mysqlreader, 可以用sql语句指定数据范围。对于opentsdbreader, 用beginDateTime和endDateTime两个配置项指定数据范围。
|
||||
|
||||
### 如何一次导入多张源表?
|
||||
|
||||
如果Reader插件支持一次读多张表,Writer插件就能一次导入多张表。如果Reader不支持多多张表,可以建多个job,分别导入。Writer插件只负责写数据。
|
||||
|
||||
### 1张源表导入之后对应TDengine中多少张表?
|
||||
|
||||
这是又tagColumn决定的,如果所有tag列的值都相同,目标表也只有一个。源表有多少不同的tag组合,目标超表就会有多少子表。
|
||||
|
||||
### 源表和目标表的字段顺序一致吗?
|
||||
|
||||
TDengine要求每个表第一列是时间戳列,后边是普通字段,最后是标签列。如果源表不是这个顺序,插件在自动建表是自动调整。
|
@ -19,6 +19,11 @@
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.taosdata.jdbc</groupId>
|
||||
<artifactId>taos-jdbcdriver</artifactId>
|
||||
<version>2.0.34</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.datax</groupId>
|
||||
<artifactId>datax-common</artifactId>
|
||||
@ -37,7 +42,11 @@
|
||||
<version>${junit-version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
<version>${commons-lang3-version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@ -1,34 +0,0 @@
|
||||
package com.alibaba.datax.plugin.writer;
|
||||
|
||||
import com.alibaba.datax.common.element.Column;
|
||||
import com.alibaba.datax.common.element.Record;
|
||||
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
public class DefaultDataHandler implements DataHandler {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
|
||||
|
||||
@Override
|
||||
public long handle(RecordReceiver lineReceiver, Properties properties) {
|
||||
long count = 0;
|
||||
Record record;
|
||||
while ((record = lineReceiver.getFromReader()) != null) {
|
||||
|
||||
int recordLength = record.getColumnNumber();
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (int i = 0; i < recordLength; i++) {
|
||||
Column column = record.getColumn(i);
|
||||
sb.append(column.asString()).append("\t");
|
||||
}
|
||||
sb.setLength(sb.length() - 1);
|
||||
LOG.debug(sb.toString());
|
||||
|
||||
count++;
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package com.alibaba.datax.plugin.writer;
|
||||
package com.alibaba.datax.plugin.writer.tdenginewriter;
|
||||
|
||||
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||
|
@ -1,4 +1,4 @@
|
||||
package com.alibaba.datax.plugin.writer;
|
||||
package com.alibaba.datax.plugin.writer.tdenginewriter;
|
||||
|
||||
public class DataHandlerFactory {
|
||||
|
@ -0,0 +1,101 @@
|
||||
package com.alibaba.datax.plugin.writer.tdenginewriter;
|
||||
|
||||
import com.alibaba.datax.common.element.Record;
|
||||
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||
import com.taosdata.jdbc.TSDBPreparedStatement;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* 默认DataHandler
|
||||
*/
|
||||
public class DefaultDataHandler implements DataHandler {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
|
||||
|
||||
static {
|
||||
try {
|
||||
Class.forName("com.taosdata.jdbc.TSDBDriver");
|
||||
} catch (ClassNotFoundException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long handle(RecordReceiver lineReceiver, Properties properties) {
|
||||
SchemaManager schemaManager = new SchemaManager(properties);
|
||||
if (!schemaManager.configValid()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
try {
|
||||
Connection conn = getTaosConnection(properties);
|
||||
if (conn == null) {
|
||||
return 0;
|
||||
}
|
||||
if (schemaManager.shouldGuessSchema()) {
|
||||
LOG.info("无法从配置文件获取表结构信息,尝试从数据库获取");
|
||||
boolean success = schemaManager.getFromDB(conn);
|
||||
if (!success) {
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
|
||||
}
|
||||
int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, "1000"));
|
||||
return write(lineReceiver, conn, batchSize, schemaManager);
|
||||
} catch (Exception e) {
|
||||
LOG.error("write failed " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
private Connection getTaosConnection(Properties properties) throws SQLException {
|
||||
// 检查必要参数
|
||||
String host = properties.getProperty(Key.HOST);
|
||||
String port = properties.getProperty(Key.PORT);
|
||||
String dbname = properties.getProperty(Key.DBNAME);
|
||||
String user = properties.getProperty(Key.USER);
|
||||
String password = properties.getProperty(Key.PASSWORD);
|
||||
if (host == null || port == null || dbname == null || user == null || password == null) {
|
||||
String keys = String.join(" ", Key.HOST, Key.PORT, Key.DBNAME, Key.USER, Key.PASSWORD);
|
||||
LOG.error("Required options missing, please check: " + keys);
|
||||
return null;
|
||||
}
|
||||
String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", host, port, dbname, user, password);
|
||||
LOG.info("TDengine connection established, host:{} port:{} dbname:{} user:{}", host, port, dbname, user);
|
||||
return DriverManager.getConnection(jdbcUrl);
|
||||
}
|
||||
|
||||
/**
|
||||
* 使用SQL批量写入<br/>
|
||||
*
|
||||
* @return 成功写入记录数
|
||||
* @throws SQLException
|
||||
*/
|
||||
private long write(RecordReceiver lineReceiver, Connection conn, int batchSize, SchemaManager scm) throws SQLException {
|
||||
Record record = lineReceiver.getFromReader();
|
||||
if (record == null) {
|
||||
return 0;
|
||||
}
|
||||
if (scm.shouldCreateTable()) {
|
||||
scm.createSTable(conn, record);
|
||||
}
|
||||
String pq = String.format("INSERT INTO ? USING %s TAGS(%s) (%s) values (%s)", scm.getStable(), scm.getTagValuesPlaceHolder(), scm.getJoinedFieldNames(), scm.getFieldValuesPlaceHolder());
|
||||
LOG.info("Prepared SQL: {}", pq);
|
||||
try (TSDBPreparedStatement stmt = (TSDBPreparedStatement) conn.prepareStatement(pq)) {
|
||||
JDBCBatchWriter batchWriter = new JDBCBatchWriter(stmt, scm, batchSize);
|
||||
do {
|
||||
batchWriter.append(record);
|
||||
} while ((record = lineReceiver.getFromReader()) != null);
|
||||
batchWriter.flush();
|
||||
return batchWriter.getCount();
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,149 @@
|
||||
package com.alibaba.datax.plugin.writer.tdenginewriter;
|
||||
|
||||
import com.alibaba.datax.common.element.Column;
|
||||
import com.alibaba.datax.common.element.Record;
|
||||
import com.alibaba.datax.common.exception.DataXException;
|
||||
import com.taosdata.jdbc.TSDBPreparedStatement;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 使用JDBC原生写入接口批量写入。<br/>
|
||||
* 有两个限制条件导致批量写入的代码逻辑过于复杂,以至于需要开发新的类来封装。<br/>
|
||||
* 1. 用户必须提前把需要批量写入的数据搜集到ArrayList中
|
||||
* 2. 每批写入的表名必须相同。
|
||||
* 这个类的实现逻辑是:
|
||||
* 1. 先把属于同一子表的Record缓存起来
|
||||
* 2. 缓存的数量达到batchSize阈值,自动执行一次批量写入
|
||||
* 3. 最后一批数据需要用户手动flush才能写入
|
||||
*/
|
||||
public class JDBCBatchWriter {
|
||||
public static final Logger LOG = LoggerFactory.getLogger(JDBCBatchWriter.class);
|
||||
|
||||
private TSDBPreparedStatement stmt;
|
||||
private SchemaManager scm;
|
||||
private int batchSize;
|
||||
// 缓存Record, key为tableName
|
||||
Map<String, List<Record>> buf = new HashMap<>();
|
||||
// 缓存表的标签值, key为tableName
|
||||
Map<String, String[]> tableTagValues = new HashMap<>();
|
||||
private long sucCount = 0;
|
||||
private final int tsColIndex;
|
||||
private List<String> fieldList;
|
||||
private Map<String, Integer> fieldIndexMap;
|
||||
|
||||
public JDBCBatchWriter(TSDBPreparedStatement stmt, SchemaManager scm, int batchSize) {
|
||||
this.stmt = stmt;
|
||||
this.scm = scm;
|
||||
this.batchSize = batchSize;
|
||||
this.tsColIndex = scm.getTsColIndex();
|
||||
this.fieldList = scm.getFieldList();
|
||||
this.fieldIndexMap = scm.getFieldIndexMap();
|
||||
}
|
||||
|
||||
|
||||
public void append(Record record) throws SQLException {
|
||||
String[] tagValues = scm.getTagValuesFromRecord(record);
|
||||
String tableName = scm.computeTableName(tagValues);
|
||||
if (buf.containsKey(tableName)) {
|
||||
List<Record> lis = buf.get(tableName);
|
||||
lis.add(record);
|
||||
if (lis.size() == batchSize) {
|
||||
executeBatch(tableName);
|
||||
lis.clear();
|
||||
}
|
||||
} else {
|
||||
List<Record> lis = new ArrayList<>(batchSize);
|
||||
lis.add(record);
|
||||
buf.put(tableName, lis);
|
||||
tableTagValues.put(tableName, tagValues);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行单表批量写入
|
||||
*
|
||||
* @param tableName
|
||||
* @throws SQLException
|
||||
*/
|
||||
private void executeBatch(String tableName) throws SQLException {
|
||||
// 表名
|
||||
stmt.setTableName(tableName);
|
||||
List<Record> records = buf.get(tableName);
|
||||
// 标签
|
||||
String[] tagValues = tableTagValues.get(tableName);
|
||||
LOG.debug("executeBatch {}", String.join(",", tagValues));
|
||||
for (int i = 0; i < tagValues.length; ++i) {
|
||||
stmt.setTagNString(i, tagValues[i]);
|
||||
}
|
||||
// 时间戳
|
||||
ArrayList<Long> tsList = records.stream().map(r -> r.getColumn(tsColIndex).asDate().getTime()).collect(Collectors.toCollection(ArrayList::new));
|
||||
stmt.setTimestamp(0, tsList);
|
||||
// 字段
|
||||
Record record = records.get(0);
|
||||
for (int i = 0; i < fieldList.size(); ) {
|
||||
String fieldName = fieldList.get(i);
|
||||
int index = fieldIndexMap.get(fieldName);
|
||||
Column column = record.getColumn(index);
|
||||
switch (column.getType()) {
|
||||
case LONG:
|
||||
ArrayList<Long> lisLong = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new));
|
||||
stmt.setLong(++i, lisLong);
|
||||
break;
|
||||
case DOUBLE:
|
||||
ArrayList<Double> lisDouble = records.stream().map(r -> r.getColumn(index).asDouble()).collect(Collectors.toCollection(ArrayList::new));
|
||||
stmt.setDouble(++i, lisDouble);
|
||||
break;
|
||||
case STRING:
|
||||
ArrayList<String> lisString = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new));
|
||||
stmt.setNString(++i, lisString, 64);
|
||||
break;
|
||||
case DATE:
|
||||
ArrayList<Long> lisTs = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new));
|
||||
stmt.setTimestamp(++i, lisTs);
|
||||
break;
|
||||
case BOOL:
|
||||
ArrayList<Boolean> lisBool = records.stream().map(r -> r.getColumn(index).asBoolean()).collect(Collectors.toCollection(ArrayList::new));
|
||||
stmt.setBoolean(++i, lisBool);
|
||||
break;
|
||||
case BYTES:
|
||||
ArrayList<String> lisBytes = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new));
|
||||
stmt.setString(++i, lisBytes, 64);
|
||||
break;
|
||||
default:
|
||||
throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, column.getType().toString());
|
||||
}
|
||||
}
|
||||
// 执行
|
||||
stmt.columnDataAddBatch();
|
||||
stmt.columnDataExecuteBatch();
|
||||
// 更新计数器
|
||||
sucCount += records.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* 把缓存的Record全部写入
|
||||
*/
|
||||
public void flush() throws SQLException {
|
||||
for (String tabName : buf.keySet()) {
|
||||
if (buf.get(tabName).size() > 0) {
|
||||
executeBatch(tabName);
|
||||
}
|
||||
}
|
||||
stmt.columnDataCloseBatch();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return 成功写入的数据量
|
||||
*/
|
||||
public long getCount() {
|
||||
return sucCount;
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package com.alibaba.datax.plugin.writer;
|
||||
package com.alibaba.datax.plugin.writer.tdenginewriter;
|
||||
|
||||
import java.util.Properties;
|
||||
|
@ -1,4 +1,4 @@
|
||||
package com.alibaba.datax.plugin.writer;
|
||||
package com.alibaba.datax.plugin.writer.tdenginewriter;
|
||||
|
||||
public class Key {
|
||||
public static final String HOST = "host";
|
||||
@ -7,5 +7,8 @@ public class Key {
|
||||
public static final String USER = "user";
|
||||
public static final String PASSWORD = "password";
|
||||
public static final String BATCH_SIZE = "batchSize";
|
||||
|
||||
public static final String STABLE = "stable";
|
||||
public static final String TAG_COLUMN = "tagColumn";
|
||||
public static final String FIELD_COLUMN = "fieldColumn";
|
||||
public static final String TIMESTAMP_COLUMN = "timestampColumn";
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package com.alibaba.datax.plugin.writer;
|
||||
package com.alibaba.datax.plugin.writer.tdenginewriter;
|
||||
|
||||
import com.alibaba.datax.common.element.Column;
|
||||
import com.alibaba.datax.common.element.Record;
|
@ -0,0 +1,255 @@
|
||||
package com.alibaba.datax.plugin.writer.tdenginewriter;
|
||||
|
||||
import com.alibaba.datax.common.element.Column;
|
||||
import com.alibaba.datax.common.element.Record;
|
||||
import com.alibaba.datax.common.exception.DataXException;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class SchemaManager {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class);
|
||||
|
||||
private String stable; // 目标超表名
|
||||
private Map<String, String> fixedTagValue = new HashMap<>(); // 固定标签值 标签名 -> 标签值
|
||||
private Map<String, Integer> tagIndexMap = new HashMap<>(); // 动态标签值 标签名 -> 列索引
|
||||
private Map<String, Integer> fieldIndexMap = new HashMap<>(); // 字段名 -> 字段索引
|
||||
private String tsColName; // 时间戳列名
|
||||
private int tsColIndex = -1; // 时间戳列索引
|
||||
private List<String> fieldList = new ArrayList<>();
|
||||
private List<String> tagList = new ArrayList<>();
|
||||
private boolean canInferSchemaFromConfig = false;
|
||||
|
||||
|
||||
public SchemaManager() {
|
||||
}
|
||||
|
||||
public SchemaManager(Properties properties) {
|
||||
getFromConfig(properties);
|
||||
}
|
||||
|
||||
private String mapDataxType(Column.Type type) {
|
||||
switch (type) {
|
||||
case LONG:
|
||||
return "BIGINT";
|
||||
case DOUBLE:
|
||||
return "DOUBLE";
|
||||
case STRING:
|
||||
return "NCHAR(64)";
|
||||
case DATE:
|
||||
return "TIMESTAMP";
|
||||
case BOOL:
|
||||
return "BOOL";
|
||||
case BYTES:
|
||||
return "BINARY";
|
||||
default:
|
||||
throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, type.toString());
|
||||
}
|
||||
}
|
||||
|
||||
public void setStable(String stable) {
|
||||
stable = stable;
|
||||
}
|
||||
|
||||
public String getStable() {
|
||||
return stable;
|
||||
}
|
||||
|
||||
private void getFromConfig(Properties properties) {
|
||||
stable = properties.getProperty(Key.STABLE);
|
||||
if (stable == null) {
|
||||
LOG.error("配置错误: no stable");
|
||||
return;
|
||||
}
|
||||
for (Object key : properties.keySet()) {
|
||||
String k = (String) key;
|
||||
String v = properties.getProperty(k);
|
||||
|
||||
String[] ps = k.split("\\.");
|
||||
if (ps.length == 1) {
|
||||
continue;
|
||||
}
|
||||
if (k.startsWith(Key.TAG_COLUMN)) {
|
||||
String tagName = ps[1];
|
||||
try {
|
||||
Integer tagIndex = Integer.parseInt(v);
|
||||
this.tagIndexMap.put(tagName, tagIndex);
|
||||
tagList.add(tagName);
|
||||
} catch (NumberFormatException e) {
|
||||
fixedTagValue.put(tagName, v);
|
||||
tagList.add(tagName);
|
||||
}
|
||||
} else if (k.startsWith(Key.FIELD_COLUMN)) {
|
||||
String fieldName = ps[1];
|
||||
Integer fileIndex = Integer.parseInt(v);
|
||||
fieldIndexMap.put(fieldName, fileIndex);
|
||||
} else if (k.startsWith(Key.TIMESTAMP_COLUMN)) {
|
||||
tsColName = ps[1];
|
||||
tsColIndex = Integer.parseInt(v);
|
||||
}
|
||||
}
|
||||
List<String> sortedFieldName = fieldIndexMap.entrySet().stream().sorted((x, y) -> x.getValue().compareTo(y.getValue())).map(e -> e.getKey()).collect(Collectors.toList());
|
||||
fieldList.addAll(sortedFieldName); // 排序的目的是保证自动建表时列的顺序和输入数据的列的顺序保持一致
|
||||
canInferSchemaFromConfig = tsColIndex > -1 && !(fixedTagValue.isEmpty() && tagIndexMap.isEmpty()) && !fieldIndexMap.isEmpty();
|
||||
LOG.info("配置文件解析结果:fixedTags=[{}] ,tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", fixedTagValue.keySet()), String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex);
|
||||
}
|
||||
|
||||
public boolean shouldGuessSchema() {
|
||||
return !canInferSchemaFromConfig;
|
||||
}
|
||||
|
||||
public boolean shouldCreateTable() {
|
||||
return canInferSchemaFromConfig;
|
||||
}
|
||||
|
||||
public boolean configValid() {
|
||||
boolean valid = (tagList.size() > 0 && fieldList.size() > 0 && tsColIndex > -1) || (tagList.size() == 0 && fieldList.size() == 0 && tsColIndex == -1);
|
||||
if (!valid) {
|
||||
LOG.error("配置错误. tag_columns,field_columns,timestamp_column必须同时存在或同时省略,当前解析结果: tag_columns: {}, field_columns:{}, timestamp_column:{} tsColIndex:{}",
|
||||
(fixedTagValue.size() + tagIndexMap.size()), fieldIndexMap.size(), tsColName, tsColIndex);
|
||||
}
|
||||
return valid;
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过执行`describe dbname.stable`命令,获取表的schema.<br/>
|
||||
* describe命名返回有4列内容,分布是:Field,Type,Length,Note<br/>
|
||||
*
|
||||
* @return 成功返回true,如果超表不存在或其他错误则返回false
|
||||
*/
|
||||
public boolean getFromDB(Connection conn) {
|
||||
try {
|
||||
List<String> stables = getSTables(conn);
|
||||
if (!stables.contains(stable)) {
|
||||
LOG.error("超级表{}不存在,无法从数据库获取表结构信息.", stable);
|
||||
return false;
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
LOG.error(e.getMessage());
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
}
|
||||
try (Statement stmt = conn.createStatement()) {
|
||||
ResultSet rs = stmt.executeQuery("describe " + stable);
|
||||
int colIndex = 0;
|
||||
while (rs.next()) {
|
||||
String name = rs.getString(1);
|
||||
String type = rs.getString(2);
|
||||
String note = rs.getString(4);
|
||||
if ("TIMESTAMP".equals(type)) {
|
||||
tsColName = name;
|
||||
tsColIndex = colIndex;
|
||||
} else if ("TAG".equals(note)) {
|
||||
tagIndexMap.put(name, colIndex);
|
||||
tagList.add(name);
|
||||
} else {
|
||||
fieldIndexMap.put(name, colIndex);
|
||||
fieldList.add(name);
|
||||
}
|
||||
colIndex++;
|
||||
}
|
||||
LOG.info("从数据库获取的表结构概要:tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex);
|
||||
return true;
|
||||
} catch (SQLException e) {
|
||||
LOG.error(e.getMessage());
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public static List<String> getSTables(Connection conn) throws SQLException {
|
||||
List<String> stables = new ArrayList<>();
|
||||
try (Statement stmt = conn.createStatement()) {
|
||||
ResultSet rs = stmt.executeQuery("show stables");
|
||||
while (rs.next()) {
|
||||
String name = rs.getString(1);
|
||||
stables.add(name);
|
||||
}
|
||||
}
|
||||
return stables;
|
||||
}
|
||||
|
||||
public void createSTable(Connection conn, Record record) throws SQLException {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("CREATE STABLE IF NOT EXISTS ").append(stable).append("(");
|
||||
sb.append(tsColName).append(" ").append("TIMESTAMP,");
|
||||
for (String fieldName : fieldList) {
|
||||
sb.append(fieldName).append(' ');
|
||||
Column col = record.getColumn(fieldIndexMap.get(fieldName));
|
||||
String tdType = mapDataxType(col.getType());
|
||||
sb.append(tdType).append(',');
|
||||
}
|
||||
sb.deleteCharAt(sb.length() - 1);
|
||||
sb.append(") TAGS(");
|
||||
for (String tagName : tagList) {
|
||||
sb.append(tagName).append(" NCHAR(64),");
|
||||
}
|
||||
sb.deleteCharAt(sb.length() - 1);
|
||||
sb.append(")");
|
||||
String q = sb.toString();
|
||||
LOG.info("自动创建超级表:" + q);
|
||||
try (Statement stmt = conn.createStatement()) {
|
||||
stmt.execute(q);
|
||||
}
|
||||
}
|
||||
|
||||
public String[] getTagValuesFromRecord(Record record) {
|
||||
String[] tagValues = new String[tagList.size()];
|
||||
for (int i = 0; i < tagList.size(); ++i) {
|
||||
if (fixedTagValue.containsKey(tagList.get(i))) {
|
||||
tagValues[i] = fixedTagValue.get(tagList.get(i));
|
||||
} else {
|
||||
int tagIndex = tagIndexMap.get(tagList.get(i));
|
||||
tagValues[i] = record.getColumn(tagIndex).asString();
|
||||
}
|
||||
}
|
||||
return tagValues;
|
||||
}
|
||||
|
||||
public Map<String, Integer> getFieldIndexMap() {
|
||||
return fieldIndexMap;
|
||||
}
|
||||
|
||||
public List<String> getFieldList() {
|
||||
return fieldList;
|
||||
}
|
||||
|
||||
public String getJoinedFieldNames() {
|
||||
return tsColName + ", " + String.join(", ", fieldList);
|
||||
}
|
||||
|
||||
public int getTsColIndex() {
|
||||
return tsColIndex;
|
||||
}
|
||||
|
||||
public String getTagValuesPlaceHolder() {
|
||||
return tagList.stream().map(x -> "?").collect(Collectors.joining(","));
|
||||
}
|
||||
|
||||
public String getFieldValuesPlaceHolder() {
|
||||
return "?, " + fieldList.stream().map(x -> "?").collect(Collectors.joining(", "));
|
||||
}
|
||||
|
||||
/**
|
||||
* 计算子表表名
|
||||
* <ol>
|
||||
* <li>将标签的value 组合成为如下的字符串: tag_value1!tag_value2!tag_value3。</li>
|
||||
* <li>计算该字符串的 MD5 散列值 "md5_val"。</li>
|
||||
* <li>"t_md5val"作为子表名。其中的 "t" 是固定的前缀。</li>
|
||||
* </ol>
|
||||
*
|
||||
* @param tagValues
|
||||
* @return
|
||||
*/
|
||||
public String computeTableName(String[] tagValues) {
|
||||
String s = String.join("!", tagValues);
|
||||
return "t_" + DigestUtils.md5Hex(s);
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package com.alibaba.datax.plugin.writer;
|
||||
package com.alibaba.datax.plugin.writer.tdenginewriter;
|
||||
|
||||
|
||||
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||
@ -64,7 +64,13 @@ public class TDengineWriter extends Writer {
|
||||
String value = this.writerSliceConfig.getString(key);
|
||||
properties.setProperty(key, value);
|
||||
}
|
||||
|
||||
if (!keys.contains(Key.USER)) {
|
||||
properties.setProperty(Key.USER, "root");
|
||||
}
|
||||
if (!keys.contains(Key.PASSWORD)) {
|
||||
properties.setProperty(Key.PASSWORD, "taosdata");
|
||||
}
|
||||
LOG.debug("========================properties==========================\n" + properties.toString());
|
||||
String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
|
||||
LOG.debug("start to handle record from: " + peerPluginName);
|
||||
DataHandler handler = DataHandlerFactory.build(peerPluginName);
|
@ -1,9 +1,10 @@
|
||||
package com.alibaba.datax.plugin.writer;
|
||||
package com.alibaba.datax.plugin.writer.tdenginewriter;
|
||||
|
||||
import com.alibaba.datax.common.spi.ErrorCode;
|
||||
|
||||
public enum TDengineWriterErrorCode implements ErrorCode {
|
||||
RUNTIME_EXCEPTION("TDengineWriter-00", "运行时异常");
|
||||
RUNTIME_EXCEPTION("TDengineWriter-00", "运行时异常"),
|
||||
TYPE_ERROR("TDengineWriter-00", "Datax类型无法正确映射到TDengine类型");
|
||||
|
||||
private final String code;
|
||||
private final String description;
|
@ -1,9 +1,9 @@
|
||||
{
|
||||
"name": "tdenginewriter",
|
||||
"class": "com.alibaba.datax.plugin.writer.TDengineWriter",
|
||||
"class": "com.alibaba.datax.plugin.writer.tdenginewriter.TDengineWriter",
|
||||
"description": {
|
||||
"useScene": "data migration to tdengine",
|
||||
"mechanism": "use JNI to write data to tdengine."
|
||||
"mechanism": "use JNI or taos-jdbc to write data to tdengine."
|
||||
},
|
||||
"developer": "zyyang-taosdata"
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package com.alibaba.datax.plugin.writer;
|
||||
package com.alibaba.datax.plugin.writer.tdenginewriter;
|
||||
|
||||
import org.junit.Test;
|
||||
|
@ -0,0 +1,21 @@
|
||||
package com.alibaba.datax.plugin.writer.tdenginewriter;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.SQLException;
|
||||
|
||||
public class TDengineWriterTest {
|
||||
|
||||
|
||||
@Test
|
||||
public void testGetSchema() throws ClassNotFoundException, SQLException {
|
||||
Class.forName("com.taosdata.jdbc.TSDBDriver");
|
||||
String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", "wozai.fun", "6030", "test", "root", "taosdata");
|
||||
Connection conn = DriverManager.getConnection(jdbcUrl);
|
||||
SchemaManager schemaManager = new SchemaManager();
|
||||
schemaManager.setStable("test1");
|
||||
schemaManager.getFromDB(conn);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user