Merge pull request #5 from taosdata/feature/TD-10941

TD-10941: [OPPO] DataX MongoDB migration to TDengine
This commit is contained in:
Shuduo Sang 2021-11-20 10:57:04 +08:00 committed by GitHub
commit 801b1a0a8e
24 changed files with 1047 additions and 102 deletions

View File

@ -0,0 +1,78 @@
{
"job": {
"setting": {
"speed": {
"channel": 2
}
},
"content": [
{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": [
"127.0.0.1:27017"
],
"userName": "mongouser",
"mechanism": "SCRAM-SHA-1",
"userPassword": "mongopass",
"authDb": "admin",
"dbName": "test",
"collectionName": "cu_market_data",
"column": [
{
"name": "instrumentID",
"type": "string"
},
{
"name": "tradeTime",
"type": "date"
},
{
"name": "lastPrice",
"type": "double"
},
{
"name": "askPrice1",
"type": "double"
},
{
"name": "bidPrice1",
"type": "double"
},
{
"name": "volume",
"type": "int"
}
]
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"host": "127.0.0.1",
"port": 6030,
"dbname": "test",
"user": "root",
"password": "taosdata",
"stable": "market_snapshot",
"batchSize": 35,
"tagColumn": {
"product": "cu",
"instrumentID": 0
},
"fieldColumn": {
"lastPrice": 2,
"askPrice1": 3,
"bidPrice1": 4,
"volume": 5
},
"timestampColumn": {
"tradeTime": 1
}
}
}
}
]
}
}

View File

@ -127,6 +127,7 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据通过主控的J
* address: MongoDB connection addresses. Because MongoDB may be a cluster, the ip:port entries must be given as a JSON array. [Required]
* userName: MongoDB user name. [Optional]
* userPassword: MongoDB password. [Optional]
* authDb: MongoDB authentication database. [Optional]
* collectionName: MongoDB collection name. [Required]
* column: document column names in MongoDB. [Required]
* name: name of the column. [Required]

View File

@ -2,20 +2,21 @@
## 1 Quick introduction
The TDengineWriter plugin implements writing data into TDengine. It can be used to synchronize data from other databases into TDengine offline.
## 2 How it works
TDengineWriter obtains the protocol data generated by a Reader through the DataX framework and parses it according to the reader type. There are currently two write paths:
1. For OpenTSDBReader, TDengineWriter calls the TDengine client library (libtaos.so or taos.dll) via JNI and writes with the [schemaless interface](https://www.taosdata.com/cn/documentation/insert#schemaless).
2. For other data sources, SQL statements are generated from the configuration and the data is written in batches via [taos-jdbcdriver](https://www.taosdata.com/cn/documentation/connector/java).
The distinction exists because OpenTSDBReader reads OpenTSDB data uniformly as JSON strings, so the Writer receives only one column, while other Reader plugins usually put the data into separate columns.
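For the second path, each batch is bound to one sub-table through `TSDBPreparedStatement` from taos-jdbcdriver, as the `JDBCBatchWriter` class added in this commit does. A minimal standalone sketch of that binding pattern (the connection URL, database `test` and super table `stock` from the sample configuration in section 3.2.1 are assumptions for illustration, not part of the plugin):
```java
import com.taosdata.jdbc.TSDBPreparedStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.ArrayList;

public class ParameterBindingSketch {
    public static void main(String[] args) throws Exception {
        Class.forName("com.taosdata.jdbc.TSDBDriver");
        // Assumption: TDengine runs locally, and database "test" with super table "stock" already exists.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:TAOS://127.0.0.1:6030/test?user=root&password=taosdata")) {
            String sql = "INSERT INTO ? USING stock TAGS(?, ?) (tradeTime, lastPrice) VALUES(?, ?)";
            try (TSDBPreparedStatement stmt = (TSDBPreparedStatement) conn.prepareStatement(sql)) {
                stmt.setTableName("t_demo");       // one sub-table per batch
                stmt.setTagNString(0, "energy");   // tag 0: industry
                stmt.setTagNString(1, "600001");   // tag 1: stockID
                ArrayList<Long> timestamps = new ArrayList<>();
                timestamps.add(System.currentTimeMillis());
                stmt.setTimestamp(0, timestamps);  // column 0: timestamps of the batch
                ArrayList<Double> lastPrices = new ArrayList<>();
                lastPrices.add(3.45);
                stmt.setDouble(1, lastPrices);     // column 1: lastPrice values of the batch
                stmt.columnDataAddBatch();
                stmt.columnDataExecuteBatch();
                stmt.columnDataCloseBatch();
            }
        }
    }
}
```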
## 3 Features
### 3.1 From OpenTSDB to TDengine
#### 3.1.1 Sample configuration
```json
{
@ -54,46 +55,189 @@ TDengineWriter 通过 DataX 框架获取 Reader
}
```
#### 3.1.2 Parameter description
| Parameter | Description                        | Required | Default  |
| --------- | ---------------------------------- | -------- | -------- |
| host      | Host of the TDengine instance      | Yes      | none     |
| port      | Port of the TDengine instance      | Yes      | none     |
| user      | User name of the TDengine instance | No       | root     |
| password  | Password of the TDengine instance  | No       | taosdata |
| dbname    | Name of the destination database   | Yes      | none     |
| batchSize | Number of records per batch insert | No       | 1        |
#### 3.1.3 Type conversion
Because OpenTSDBReader reads OpenTSDB data uniformly as JSON strings, TDengineWriter applies the following type mapping when migrating from OpenTSDB to TDengine:
| OpenTSDB type   | DataX internal type | TDengine type |
| --------------- | ------------------- | ------------- |
| timestamp       | Date                | timestamp     |
| Integer (value) | Double              | double        |
| Float (value)   | Double              | double        |
| String (value)  | String              | binary        |
| Integer (tag)   | String              | binary        |
| Float (tag)     | String              | binary        |
| String (tag)    | String              | binary        |
### 3.2 From MongoDB to TDengine
#### 3.2.1 Sample configuration
```json
{
"job": {
"setting": {
"speed": {
"channel": 2
}
},
"content": [
{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": [
"127.0.0.1:27017"
],
"userName": "user",
"mechanism": "SCRAM-SHA-1",
"userPassword": "password",
"authDb": "admin",
"dbName": "test",
"collectionName": "stock",
"column": [
{
"name": "stockID",
"type": "string"
},
{
"name": "tradeTime",
"type": "date"
},
{
"name": "lastPrice",
"type": "double"
},
{
"name": "askPrice1",
"type": "double"
},
{
"name": "bidPrice1",
"type": "double"
},
{
"name": "volume",
"type": "int"
}
]
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"host": "localhost",
"port": 6030,
"dbname": "test",
"user": "root",
"password": "taosdata",
"stable": "stock",
"tagColumn": {
"industry": "energy",
"stockID": 0
},
"fieldColumn": {
"lastPrice": 2,
"askPrice1": 3,
"bidPrice1": 4,
"volume": 5
},
"timestampColumn": {
"tradeTime": 1
}
}
}
}
]
}
}
```
**Note: the writer part of this configuration also applies to relational databases.**
#### 3.2.2 Parameter description
| Parameter       | Description                                | Required              | Default  | Notes                         |
| --------------- | ------------------------------------------ | --------------------- | -------- | ----------------------------- |
| host            | Host of the TDengine instance              | Yes                   | none     |                               |
| port            | Port of the TDengine instance              | Yes                   | none     |                               |
| user            | User name of the TDengine instance         | No                    | root     |                               |
| password        | Password of the TDengine instance          | No                    | taosdata |                               |
| dbname          | Name of the destination database           | Yes                   | none     |                               |
| batchSize       | Number of records per batch insert         | No                    | 1000     |                               |
| stable          | Name of the target super table             | Yes (except OpenTSDB) | none     |                               |
| tagColumn       | Names and positions of the tag columns     | No                    | none     | Position indexes start at 0   |
| fieldColumn     | Names and positions of the field columns   | No                    | none     |                               |
| timestampColumn | Name and position of the timestamp column  | No                    | none     | Only one timestamp column     |
#### 3.2.3 Automatic table creation rules
##### 3.2.3.1 Super table creation rule
If tagColumn, fieldColumn and timestampColumn are all configured, the super table is created automatically before the first record is inserted.<br>
The types of the data columns are inferred from the first batch of records, and tag columns default to `NCHAR(64)`. For the sample configuration above, the generated CREATE statement might look like this:
```sql
CREATE STABLE IF NOT EXISTS stock (
tradetime TIMESTAMP,
lastprice DOUBLE,
askprice1 DOUBLE,
bidprice1 DOUBLE,
volume BIGINT
)
TAGS(
industry NCHAR(64),
stockID NCHAR(64)
);
```
##### 3.2.3.2 Sub-table creation rule
The schema of a sub-table is the same as that of the super table. Sub-table names are generated as follows:
1. Join the tag values into a string of the form: `tag_value1!tag_value2!tag_value3`
2. Compute the MD5 hash of that string, "md5_val".
3. Use "t_md5val" as the sub-table name, where "t" is a fixed prefix.
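This is the rule implemented by `SchemaManager.computeTableName` in this commit. A minimal standalone sketch of the same computation (the class name and the sample tag values are illustrative only):
```java
import org.apache.commons.codec.digest.DigestUtils;

public class SubTableNameSketch {
    // Join the tag values with '!', hash the string with MD5, and prefix with "t_".
    static String computeTableName(String... tagValues) {
        String joined = String.join("!", tagValues);
        return "t_" + DigestUtils.md5Hex(joined);
    }

    public static void main(String[] args) {
        // Records with tags (industry="energy", stockID="600001") always land in the same sub-table.
        System.out.println(computeTableName("energy", "600001"));
    }
}
```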
#### 3.2.4 Using a pre-created table
If the target super table already exists, tagColumn, fieldColumn and timestampColumn can all be omitted; the plugin obtains the table structure by executing `describe stableName`.
In that case, the column order of the received Records must match the column order returned by `describe stableName`. For example, if `describe stableName` returns the following:
```
Field | Type | Length | Note |
=================================================================================
ts | TIMESTAMP | 8 | |
current | DOUBLE | 8 | |
location | BINARY | 10 | TAG |
```
then the first column of the data received by the plugin must be the timestamp, the second column the current, and the third column the location.
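A minimal sketch of inspecting the target table's column order with `describe` over taos-jdbcdriver, mirroring `SchemaManager.getFromDB` in this commit (the connection settings and the table name `meters` are assumptions for illustration):
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DescribeOrderSketch {
    public static void main(String[] args) throws Exception {
        Class.forName("com.taosdata.jdbc.TSDBDriver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:TAOS://127.0.0.1:6030/test?user=root&password=taosdata");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("describe meters")) {
            int index = 0;
            while (rs.next()) {
                // describe returns four columns: Field, Type, Length, Note ("TAG" marks tag columns).
                System.out.printf("%d: %s %s %s%n", index++, rs.getString(1), rs.getString(2), rs.getString(4));
            }
        }
    }
}
```
Each record column the plugin receives is mapped to the position printed here, which is why the reader output must follow exactly this order.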
#### 3.2.5 Notes
1. tagColumn, fieldColumn and timestampColumn describe the structure of the target table; these three settings must either all be present or all be omitted.
2. If all three settings are present and the target table also already exists, the two must be consistent. **Consistency** is the user's responsibility; the plugin does not check it. Inconsistency may cause inserts to fail or data to be written incorrectly.
3. The plugin prefers the table structure specified in the configuration file.
#### 3.2.6 Type conversion
| MongoDB type   | DataX internal type | TDengine type |
| -------------- | ------------------- | ------------- |
| int, Long | Long | BIGINT |
| double | Double | DOUBLE |
| string, array | String | NCHAR(64) |
| date | Date | TIMESTAMP |
| boolean | Boolean | BOOL |
| bytes | Bytes | BINARY |
## 4 Performance report
@ -127,13 +271,13 @@ TDengineWriter 通过 DataX 框架获取 Reader
#### 4.2.1 Single-table test report
| Channels | DataX speed (Rec/s) | DataX traffic (MB/s) | DataX host NIC outbound (MB/s) | DataX host load | DB NIC inbound (MB/s) | DB load | DB TPS |
| -------- | ------------------- | -------------------- | ------------------------------ | --------------- | --------------------- | ------- | ------ |
| 1        |                     |                      |                                |                 |                       |         |        |
| 4        |                     |                      |                                |                 |                       |         |        |
| 8        |                     |                      |                                |                 |                       |         |        |
| 16       |                     |                      |                                |                 |                       |         |        |
| 32       |                     |                      |                                |                 |                       |         |        |
Notes:
@ -143,9 +287,30 @@ TDengineWriter 通过 DataX 框架获取 Reader
#### 4.2.4 Performance test summary
1.
2.
## 5 Constraints and limitations
1. When this plugin creates the super table automatically, NCHAR columns have a fixed length of 64, so data sources containing strings longer than 64 characters are not supported.
2. Tag columns must not contain null values; records whose tag values are null are filtered out.
## FAQ
### How do I select the range of data to synchronize?
The data range is configured on the Reader side, and the method differs between Reader plugins. For example, mysqlreader can use a SQL statement to specify the data range, while opentsdbreader uses the beginDateTime and endDateTime settings.
### How do I import multiple source tables at once?
If the Reader plugin supports reading multiple tables at once, the Writer plugin can import them at once. If the Reader does not, create multiple jobs and import the tables separately. The Writer plugin is only responsible for writing data.
### How many TDengine tables does one source table map to?
This is determined by tagColumn. If all tag columns have the same value, the target is a single table; otherwise the target super table has as many sub-tables as there are distinct tag combinations in the source table.
### Do the source and target tables have the same column order?
TDengine requires the first column of every table to be the timestamp column, followed by the regular fields, with the tag columns last. If the source table does not follow this order, the plugin reorders the columns when it creates the table automatically.
### How does the plugin determine the data type of each column?
The types are inferred automatically from the first batch of received data.

View File

@ -19,6 +19,11 @@
</properties>
<dependencies>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.34</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
@ -37,7 +42,11 @@
<version>${junit-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3-version}</version>
</dependency>
</dependencies>
<build>

View File

@ -1,34 +0,0 @@
package com.alibaba.datax.plugin.writer;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class DefaultDataHandler implements DataHandler {
private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
@Override
public long handle(RecordReceiver lineReceiver, Properties properties) {
long count = 0;
Record record;
while ((record = lineReceiver.getFromReader()) != null) {
int recordLength = record.getColumnNumber();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < recordLength; i++) {
Column column = record.getColumn(i);
sb.append(column.asString()).append("\t");
}
sb.setLength(sb.length() - 1);
LOG.debug(sb.toString());
count++;
}
return count;
}
}

View File

@ -1,10 +1,11 @@
package com.alibaba.datax.plugin.writer;
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import java.util.Properties;
public interface DataHandler {
long handle(RecordReceiver lineReceiver, Properties properties);
long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector);
}

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer;
package com.alibaba.datax.plugin.writer.tdenginewriter;
public class DataHandlerFactory {

View File

@ -0,0 +1,105 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.taosdata.jdbc.TSDBPreparedStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
/**
* Default DataHandler
*/
public class DefaultDataHandler implements DataHandler {
private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
static {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
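/**
 * Parse the target schema from the job configuration (falling back to the database when needed),
 * then batch-write all records received from the reader.
 */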
@Override
public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) {
SchemaManager schemaManager = new SchemaManager(properties);
if (!schemaManager.configValid()) {
return 0;
}
try {
Connection conn = getTaosConnection(properties);
if (conn == null) {
return 0;
}
if (schemaManager.shouldGuessSchema()) {
// The table structure could not be obtained from the config file; try to get it from the database
LOG.info(Msg.get("try_get_schema_fromdb"));
boolean success = schemaManager.getFromDB(conn);
if (!success) {
return 0;
}
}
int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, "1000"));
if (batchSize < 5) {
// A batchSize that is too small increases the chance of wrong automatic type inference; increase it and retry
LOG.error(Msg.get("batch_size_too_small"));
return 0;
}
return write(lineReceiver, conn, batchSize, schemaManager, collector);
} catch (Exception e) {
LOG.error("write failed " + e.getMessage());
e.printStackTrace();
}
return 0;
}
private Connection getTaosConnection(Properties properties) throws SQLException {
// Check required parameters
String host = properties.getProperty(Key.HOST);
String port = properties.getProperty(Key.PORT);
String dbname = properties.getProperty(Key.DBNAME);
String user = properties.getProperty(Key.USER);
String password = properties.getProperty(Key.PASSWORD);
if (host == null || port == null || dbname == null || user == null || password == null) {
String keys = String.join(" ", Key.HOST, Key.PORT, Key.DBNAME, Key.USER, Key.PASSWORD);
LOG.error("Required options missing, please check: " + keys);
return null;
}
String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", host, port, dbname, user, password);
Connection conn = DriverManager.getConnection(jdbcUrl);
LOG.info("TDengine connection established, host: {}, port: {}, dbname: {}, user: {}", host, port, dbname, user);
return conn;
}
/**
* Batch write using SQL<br/>
*
* @return number of records written successfully
* @throws SQLException
*/
private long write(RecordReceiver lineReceiver, Connection conn, int batchSize, SchemaManager scm, TaskPluginCollector collector) throws SQLException {
Record record = lineReceiver.getFromReader();
if (record == null) {
return 0;
}
String pq = String.format("INSERT INTO ? USING %s TAGS(%s) (%s) values (%s)", scm.getStable(), scm.getTagValuesPlaceHolder(), scm.getJoinedFieldNames(), scm.getFieldValuesPlaceHolder());
LOG.info("Prepared SQL: {}", pq);
try (TSDBPreparedStatement stmt = (TSDBPreparedStatement) conn.prepareStatement(pq)) {
JDBCBatchWriter batchWriter = new JDBCBatchWriter(conn, stmt, scm, batchSize, collector);
do {
batchWriter.append(record);
} while ((record = lineReceiver.getFromReader()) != null);
batchWriter.flush();
return batchWriter.getCount();
}
}
}

View File

@ -0,0 +1,244 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.taosdata.jdbc.TSDBPreparedStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Batch write using the native JDBC parameter-binding interface<br/>
* Two restrictions make the batching logic complex enough to deserve its own class:<br/>
* 1. The caller must collect the records of one batch into an ArrayList in advance
* 2. Every batch must target a single table
* The class works as follows:
* 1. Records belonging to the same sub-table are buffered
* 2. Once a buffer reaches the batchSize threshold, a batch write is executed automatically
* 3. The last batch must be written by calling flush explicitly
*/
public class JDBCBatchWriter {
public static final Logger LOG = LoggerFactory.getLogger(JDBCBatchWriter.class);
private TSDBPreparedStatement stmt;
private SchemaManager scm;
private Connection conn;
private int batchSize;
private TaskPluginCollector collector;
// Buffered Records, keyed by tableName
Map<String, List<Record>> buf = new HashMap<>();
// Tag values of each table, keyed by tableName
Map<String, String[]> tableTagValues = new HashMap<>();
private long sucCount = 0;
private final int tsColIndex;
private List<String> fieldList;
// Minimum number of columns each record should contain, used to validate the data
private int minColNum = 0;
private Map<String, Integer> fieldIndexMap;
private List<Column.Type> fieldTypes = null;
public JDBCBatchWriter(Connection conn, TSDBPreparedStatement stmt, SchemaManager scm, int batchSize, TaskPluginCollector collector) {
this.conn = conn;
this.stmt = stmt;
this.scm = scm;
this.batchSize = batchSize;
this.collector = collector;
this.tsColIndex = scm.getTsColIndex();
this.fieldList = scm.getFieldList();
this.fieldIndexMap = scm.getFieldIndexMap();
this.minColNum = 1 + fieldList.size() + scm.getDynamicTagCount();
}
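/**
 * Infer the field types from a sample of records and, when the full schema was given
 * in the configuration, create the super table before the first write.
 */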
public void initFiledTypesAndTargetTable(List<Record> records) throws SQLException {
if (fieldTypes != null) {
return;
}
guessFieldTypes(records);
if (scm.shouldCreateTable()) {
scm.createSTable(conn, fieldTypes);
}
}
public void append(Record record) throws SQLException {
int columnNum = record.getColumnNumber();
if (columnNum < minColNum) {
// The actual column count is less than expected
collector.collectDirtyRecord(record, Msg.get("column_number_error"));
return;
}
String[] tagValues = scm.getTagValuesFromRecord(record);
if (tagValues == null) {
// A tag column contains null
collector.collectDirtyRecord(record, Msg.get("tag_value_error"));
return;
}
if (!scm.hasTimestamp(record)) {
// The timestamp column is null or has the wrong type
collector.collectDirtyRecord(record, Msg.get("ts_value_error"));
return;
}
String tableName = scm.computeTableName(tagValues);
if (buf.containsKey(tableName)) {
List<Record> lis = buf.get(tableName);
lis.add(record);
if (lis.size() == batchSize) {
if (fieldTypes == null) {
initFiledTypesAndTargetTable(lis);
}
executeBatch(tableName);
lis.clear();
}
} else {
List<Record> lis = new ArrayList<>(batchSize);
lis.add(record);
buf.put(tableName, lis);
tableTagValues.put(tableName, tagValues);
}
}
/**
* Only the String type needs special handling: testing shows that columns whose value is null are converted to the String type, so a Column of type String does not necessarily mean the real column type is String.
*
* @param records
*/
private void guessFieldTypes(List<Record> records) {
fieldTypes = new ArrayList<>(fieldList.size());
for (int i = 0; i < fieldList.size(); ++i) {
int colIndex = fieldIndexMap.get(fieldList.get(i));
boolean ok = false;
for (int j = 0; j < records.size() && !ok; ++j) {
Column column = records.get(j).getColumn(colIndex);
Column.Type type = column.getType();
switch (type) {
case LONG:
case DOUBLE:
case DATE:
case BOOL:
case BYTES:
if (column.getRawData() != null) {
fieldTypes.add(type);
ok = true;
}
break;
case STRING:
// Only non-null, non-empty String columns are really treated as String
String value = column.asString();
if (value != null && !"".equals(value)) {
fieldTypes.add(type);
ok = true;
}
break;
default:
throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, type.toString());
}
}
if (!ok) {
// Based on the %d sampled records, the type of column %d cannot be inferred
throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, String.format(Msg.get("infer_column_type_error"), records.size(), i + 1));
}
}
LOG.info("Field Types: {}", fieldTypes);
}
/**
* Execute a batch write for a single table
*
* @param tableName
* @throws SQLException
*/
private void executeBatch(String tableName) throws SQLException {
// Table name
stmt.setTableName(tableName);
List<Record> records = buf.get(tableName);
// Tags
String[] tagValues = tableTagValues.get(tableName);
LOG.debug("executeBatch {}", String.join(",", tagValues));
for (int i = 0; i < tagValues.length; ++i) {
stmt.setTagNString(i, tagValues[i]);
}
// Timestamps
ArrayList<Long> tsList = records.stream().map(r -> r.getColumn(tsColIndex).asDate().getTime()).collect(Collectors.toCollection(ArrayList::new));
stmt.setTimestamp(0, tsList);
// Fields
for (int i = 0; i < fieldList.size(); ) {
String fieldName = fieldList.get(i);
int index = fieldIndexMap.get(fieldName);
switch (fieldTypes.get(i)) {
case LONG:
ArrayList<Long> lisLong = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new));
stmt.setLong(++i, lisLong);
break;
case DOUBLE:
ArrayList<Double> lisDouble = records.stream().map(r -> r.getColumn(index).asDouble()).collect(Collectors.toCollection(ArrayList::new));
stmt.setDouble(++i, lisDouble);
break;
case STRING:
ArrayList<String> lisString = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new));
stmt.setNString(++i, lisString, 64);
break;
case DATE:
ArrayList<Long> lisTs = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new));
stmt.setTimestamp(++i, lisTs);
break;
case BOOL:
ArrayList<Boolean> lisBool = records.stream().map(r -> r.getColumn(index).asBoolean()).collect(Collectors.toCollection(ArrayList::new));
stmt.setBoolean(++i, lisBool);
break;
case BYTES:
ArrayList<String> lisBytes = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new));
stmt.setString(++i, lisBytes, 64);
break;
default:
throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, fieldTypes.get(i).toString());
}
}
// Execute
stmt.columnDataAddBatch();
stmt.columnDataExecuteBatch();
// Update the counter
sucCount += records.size();
}
/**
* Write all buffered Records
*/
public void flush() throws SQLException {
if (fieldTypes == null) {
List<Record> records = new ArrayList<>();
for (List<Record> lis : buf.values()) {
records.addAll(lis);
if (records.size() > 100) {
break;
}
}
if (records.size() > 0) {
initFiledTypesAndTargetTable(records);
} else {
return;
}
}
for (String tabName : buf.keySet()) {
if (buf.get(tabName).size() > 0) {
executeBatch(tabName);
}
}
stmt.columnDataCloseBatch();
}
/**
* @return number of records written successfully
*/
public long getCount() {
return sucCount;
}
}

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer;
package com.alibaba.datax.plugin.writer.tdenginewriter;
import java.util.Properties;

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer;
package com.alibaba.datax.plugin.writer.tdenginewriter;
public class Key {
public static final String HOST = "host";
@ -7,5 +7,8 @@ public class Key {
public static final String USER = "user";
public static final String PASSWORD = "password";
public static final String BATCH_SIZE = "batchSize";
public static final String STABLE = "stable";
public static final String TAG_COLUMN = "tagColumn";
public static final String FIELD_COLUMN = "fieldColumn";
public static final String TIMESTAMP_COLUMN = "timestampColumn";
}

View File

@ -0,0 +1,20 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import java.util.Locale;
import java.util.ResourceBundle;
/**
* i18n message util
*/
public class Msg {
private static ResourceBundle bundle;
static {
bundle = ResourceBundle.getBundle("tdenginewritermsg", Locale.getDefault());
}
public static String get(String key) {
return bundle.getString(key);
}
}

View File

@ -1,9 +1,10 @@
package com.alibaba.datax.plugin.writer;
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -14,7 +15,7 @@ public class OpentsdbDataHandler implements DataHandler {
private static final String DEFAULT_BATCH_SIZE = "1";
@Override
public long handle(RecordReceiver lineReceiver, Properties properties) {
public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) {
// opentsdb json protocol use JNI and schemaless API to write
String host = properties.getProperty(Key.HOST);
int port = Integer.parseInt(properties.getProperty(Key.PORT));

View File

@ -0,0 +1,271 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
import java.util.stream.Collectors;
public class SchemaManager {
private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class);
private String stable; // target super table name
private Map<String, String> fixedTagValue = new HashMap<>(); // fixed tag values: tag name -> tag value
private Map<String, Integer> tagIndexMap = new HashMap<>(); // dynamic tag values: tag name -> column index
private Map<String, Integer> fieldIndexMap = new HashMap<>(); // field name -> field index
private String tsColName; // timestamp column name
private int tsColIndex = -1; // timestamp column index
private List<String> fieldList = new ArrayList<>();
private List<String> tagList = new ArrayList<>();
private boolean canInferSchemaFromConfig = false;
public SchemaManager() {
}
public SchemaManager(Properties properties) {
getFromConfig(properties);
}
private String mapDataxType(Column.Type type) {
switch (type) {
case LONG:
return "BIGINT";
case DOUBLE:
return "DOUBLE";
case STRING:
return "NCHAR(64)";
case DATE:
return "TIMESTAMP";
case BOOL:
return "BOOL";
case BYTES:
return "BINARY(64)";
default:
throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, type.toString());
}
}
public void setStable(String stable) {
this.stable = stable;
}
public String getStable() {
return stable;
}
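/**
 * Build the schema from the writer configuration: keys prefixed with
 * tagColumn./fieldColumn./timestampColumn. map a column name either to a fixed
 * tag value or to its index in the incoming record.
 */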
private void getFromConfig(Properties properties) {
stable = properties.getProperty(Key.STABLE);
if (stable == null) {
LOG.error("Config error: no stable");
return;
}
for (Object key : properties.keySet()) {
String k = (String) key;
String v = properties.getProperty(k);
String[] ps = k.split("\\.");
if (ps.length == 1) {
continue;
}
if (k.startsWith(Key.TAG_COLUMN)) {
String tagName = ps[1];
try {
Integer tagIndex = Integer.parseInt(v);
this.tagIndexMap.put(tagName, tagIndex);
tagList.add(tagName);
} catch (NumberFormatException e) {
fixedTagValue.put(tagName, v);
tagList.add(tagName);
}
} else if (k.startsWith(Key.FIELD_COLUMN)) {
String fieldName = ps[1];
Integer fieldIndex = Integer.parseInt(v);
fieldIndexMap.put(fieldName, fieldIndex);
} else if (k.startsWith(Key.TIMESTAMP_COLUMN)) {
tsColName = ps[1];
tsColIndex = Integer.parseInt(v);
}
}
List<String> sortedFieldName = fieldIndexMap.entrySet().stream().sorted((x, y) -> x.getValue().compareTo(y.getValue())).map(e -> e.getKey()).collect(Collectors.toList());
fieldList.addAll(sortedFieldName); // sorted so that the column order of the auto-created table matches the column order of the input data
canInferSchemaFromConfig = tsColIndex > -1 && !(fixedTagValue.isEmpty() && tagIndexMap.isEmpty()) && !fieldIndexMap.isEmpty();
LOG.info("Config file parsed result: fixedTags=[{}], tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", fixedTagValue.keySet()), String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex);
}
public boolean shouldGuessSchema() {
return !canInferSchemaFromConfig;
}
public boolean shouldCreateTable() {
return canInferSchemaFromConfig;
}
public boolean configValid() {
boolean valid = (tagList.size() > 0 && fieldList.size() > 0 && tsColIndex > -1) || (tagList.size() == 0 && fieldList.size() == 0 && tsColIndex == -1);
if (!valid) {
LOG.error("Config error: tagColumn, fieldColumn and timestampColumn must be present together or absent together.");
}
return valid;
}
/**
* Get the table schema by executing the `describe dbname.stable` command.<br/>
* The describe command returns 4 columns: Field, Type, Length, Note.<br/>
*
* @return true on success; false if the super table does not exist or another error occurs
*/
public boolean getFromDB(Connection conn) {
try {
List<String> stables = getSTables(conn);
if (!stables.contains(stable)) {
LOG.error("super table {} does not exist, failed to get schema from database.", stable);
return false;
}
} catch (SQLException e) {
LOG.error(e.getMessage());
e.printStackTrace();
return false;
}
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("describe " + stable);
int colIndex = 0;
while (rs.next()) {
String name = rs.getString(1);
String type = rs.getString(2);
String note = rs.getString(4);
if ("TIMESTAMP".equals(type)) {
tsColName = name;
tsColIndex = colIndex;
} else if ("TAG".equals(note)) {
tagIndexMap.put(name, colIndex);
tagList.add(name);
} else {
fieldIndexMap.put(name, colIndex);
fieldList.add(name);
}
colIndex++;
}
LOG.info("table info: tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex);
return true;
} catch (SQLException e) {
LOG.error(e.getMessage());
e.printStackTrace();
return false;
}
}
public static List<String> getSTables(Connection conn) throws SQLException {
List<String> stables = new ArrayList<>();
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("show stables");
while (rs.next()) {
String name = rs.getString(1);
stables.add(name);
}
}
return stables;
}
public void createSTable(Connection conn, List<Column.Type> fieldTypes) throws SQLException {
StringBuilder sb = new StringBuilder();
sb.append("CREATE STABLE IF NOT EXISTS ").append(stable).append("(");
sb.append(tsColName).append(" ").append("TIMESTAMP,");
for (int i = 0; i < fieldList.size(); ++i) {
String fieldName = fieldList.get(i);
Column.Type dxType = fieldTypes.get(i);
sb.append(fieldName).append(' ');
String tdType = mapDataxType(dxType);
sb.append(tdType).append(',');
}
sb.deleteCharAt(sb.length() - 1);
sb.append(") TAGS(");
for (String tagName : tagList) {
sb.append(tagName).append(" NCHAR(64),");
}
sb.deleteCharAt(sb.length() - 1);
sb.append(")");
String q = sb.toString();
LOG.info("run sql: " + q);
try (Statement stmt = conn.createStatement()) {
stmt.execute(q);
}
}
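/**
 * Collect the tag values of one record: fixed tags come from the configuration,
 * dynamic tags are read from the record by column index.
 * Returns null if any tag value is null, so the record can be reported as dirty.
 */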
public String[] getTagValuesFromRecord(Record record) {
String[] tagValues = new String[tagList.size()];
for (int i = 0; i < tagList.size(); ++i) {
if (fixedTagValue.containsKey(tagList.get(i))) {
tagValues[i] = fixedTagValue.get(tagList.get(i));
} else {
int tagIndex = tagIndexMap.get(tagList.get(i));
tagValues[i] = record.getColumn(tagIndex).asString();
}
if (tagValues[i] == null) {
return null;
}
}
return tagValues;
}
public boolean hasTimestamp(Record record) {
Column column = record.getColumn(tsColIndex);
if (column.getType() == Column.Type.DATE && column.asDate() != null) {
return true;
} else {
return false;
}
}
public Map<String, Integer> getFieldIndexMap() {
return fieldIndexMap;
}
public List<String> getFieldList() {
return fieldList;
}
public String getJoinedFieldNames() {
return tsColName + ", " + String.join(", ", fieldList);
}
public int getTsColIndex() {
return tsColIndex;
}
public String getTagValuesPlaceHolder() {
return tagList.stream().map(x -> "?").collect(Collectors.joining(","));
}
public String getFieldValuesPlaceHolder() {
return "?, " + fieldList.stream().map(x -> "?").collect(Collectors.joining(", "));
}
/**
* Compute the sub-table name
* <ol>
* <li>Join the tag values into a string of the form: tag_value1!tag_value2!tag_value3</li>
* <li>Compute the MD5 hash of that string, "md5_val"</li>
* <li>Use "t_md5val" as the sub-table name, where "t" is a fixed prefix</li>
* </ol>
*
* @param tagValues
* @return
*/
public String computeTableName(String[] tagValues) {
String s = String.join("!", tagValues);
return "t_" + DigestUtils.md5Hex(s);
}
public int getDynamicTagCount() {
return tagIndexMap.size();
}
}

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer;
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.plugin.RecordReceiver;
@ -64,11 +64,17 @@ public class TDengineWriter extends Writer {
String value = this.writerSliceConfig.getString(key);
properties.setProperty(key, value);
}
if (!keys.contains(Key.USER)) {
properties.setProperty(Key.USER, "root");
}
if (!keys.contains(Key.PASSWORD)) {
properties.setProperty(Key.PASSWORD, "taosdata");
}
LOG.debug("========================properties==========================\n" + properties.toString());
String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
LOG.debug("start to handle record from: " + peerPluginName);
DataHandler handler = DataHandlerFactory.build(peerPluginName);
long records = handler.handle(lineReceiver, properties);
long records = handler.handle(lineReceiver, properties, getTaskPluginCollector());
LOG.debug("handle data finished, records: " + records);
}

View File

@ -1,9 +1,10 @@
package com.alibaba.datax.plugin.writer;
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.spi.ErrorCode;
public enum TDengineWriterErrorCode implements ErrorCode {
RUNTIME_EXCEPTION("TDengineWriter-00", "运行时异常");
RUNTIME_EXCEPTION("TDengineWriter-00", "运行时异常"),
TYPE_ERROR("TDengineWriter-00", "Datax类型无法正确映射到TDengine类型");
private final String code;
private final String description;

View File

@ -1,9 +1,9 @@
{
"name": "tdenginewriter",
"class": "com.alibaba.datax.plugin.writer.TDengineWriter",
"class": "com.alibaba.datax.plugin.writer.tdenginewriter.TDengineWriter",
"description": {
"useScene": "data migration to tdengine",
"mechanism": "use JNI to write data to tdengine."
"mechanism": "use JNI or taos-jdbc to write data to tdengine."
},
"developer": "zyyang-taosdata"
}

View File

@ -0,0 +1,6 @@
try_get_schema_fromdb=fail to get structure info of target table from configure file and will try to get it from database
batch_size_too_small='batchSize' is too small, please increase it and try again
column_number_error=number of columns is less than expected
tag_value_error=tag columns include 'null' value
ts_value_error=timestamp column type error or null
infer_column_type_error=fail to infer column type: sample count %d, column index %d

View File

@ -0,0 +1,6 @@
try_get_schema_fromdb=fail to get structure info of target table from configure file and will try to get it from database
batch_size_too_small='batchSize' is too small, please increase it and try again
column_number_error=number of columns is less than expected
tag_value_error=tag columns include 'null' value
ts_value_error=timestamp column type error or null
infer_column_type_error=fail to infer column type: sample count %d, column index %d

View File

@ -0,0 +1,6 @@
try_get_schema_fromdb=\u65e0\u6cd5\u4ece\u914d\u7f6e\u6587\u4ef6\u83b7\u53d6\u8868\u7ed3\u6784\u4fe1\u606f\uff0c\u5c1d\u8bd5\u4ece\u6570\u636e\u5e93\u83b7\u53d6
batch_size_too_small=batchSize\u592a\u5c0f\uff0c\u4f1a\u589e\u52a0\u81ea\u52a8\u7c7b\u578b\u63a8\u65ad\u9519\u8bef\u7684\u6982\u7387\uff0c\u5efa\u8bae\u6539\u5927\u540e\u91cd\u8bd5
column_number_error=\u5b9e\u9645\u5217\u6570\u5c0f\u4e8e\u671f\u671b\u5217\u6570
tag_value_error=\u6807\u7b7e\u5217\u5305\u542bnull
ts_value_error=\u65f6\u95f4\u6233\u5217\u4e3anull\u6216\u7c7b\u578b\u9519\u8bef
infer_column_type_error=\u6839\u636e\u91c7\u6837\u7684%d\u6761\u6570\u636e\uff0c\u65e0\u6cd5\u63a8\u65ad\u7b2c%d\u5217\u7684\u6570\u636e\u7c7b\u578b

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer;
package com.alibaba.datax.plugin.writer.tdenginewriter;
import org.junit.Test;

View File

@ -0,0 +1,25 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import org.junit.Test;
import java.util.Locale;
import java.util.ResourceBundle;
import org.junit.Assert;
public class MessageTest {
@Test
public void testChineseMessage() {
Locale local = new Locale("zh", "CN");
ResourceBundle bundle = ResourceBundle.getBundle("tdenginewritermsg", local);
String msg = bundle.getString("try_get_schema_fromdb");
Assert.assertEquals("无法从配置文件获取表结构信息,尝试从数据库获取", msg);
}
@Test
public void testDefaultMessage() {
ResourceBundle bundle = ResourceBundle.getBundle("tdenginewritermsg", Locale.getDefault());
String msg = bundle.getString("try_get_schema_fromdb");
System.out.println(msg);
}
}

View File

@ -0,0 +1,31 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class TDengineWriterTest {
@Test
public void testGetSchema() throws ClassNotFoundException, SQLException {
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", "wozai.fun", "6030", "test", "root", "taosdata");
Connection conn = DriverManager.getConnection(jdbcUrl);
SchemaManager schemaManager = new SchemaManager();
schemaManager.setStable("test1");
schemaManager.getFromDB(conn);
}
@Test
public void dropTestTable() throws ClassNotFoundException, SQLException {
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", "wozai.fun", "6030", "test", "root", "taosdata");
Connection conn = DriverManager.getConnection(jdbcUrl);
Statement stmt = conn.createStatement();
stmt.execute("drop table market_snapshot");
}
}