mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-03 03:59:07 +08:00
Merge pull request #2 from taosdata/feature/TD-10698
[TD-10698]: tdenginewriter support batch write
This commit is contained in:
commit
66ab68c00c
@ -20,7 +20,8 @@
|
|||||||
"port": 6030,
|
"port": 6030,
|
||||||
"dbname": "test",
|
"dbname": "test",
|
||||||
"user": "root",
|
"user": "root",
|
||||||
"password": "taosdata"
|
"password": "taosdata",
|
||||||
|
"batchSize": 1000
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
151
tdenginewriter/doc/tdenginewriter.md
Normal file
151
tdenginewriter/doc/tdenginewriter.md
Normal file
@ -0,0 +1,151 @@
|
|||||||
|
# DataX TDengineWriter
|
||||||
|
|
||||||
|
## 1 快速介绍
|
||||||
|
|
||||||
|
TDengineWriter 插件实现了写入数据到 TDengine 的功能。 在底层实现上, TDengineWriter 通过 JNI的方式调用libtaos.so/taos.dll中的方法,连接 TDengine
|
||||||
|
数据库实例,并执行schemaless的写入。 TDengineWriter 面向ETL开发工程师,他们使用 TDengineWriter 从数仓导入数据到 TDengine。同时,TDengineWriter
|
||||||
|
亦可以作为数据迁移工具为DBA等用户提供服务。
|
||||||
|
|
||||||
|
## 2 实现原理
|
||||||
|
|
||||||
|
TDengineWriter 通过 DataX 框架获取 Reader
|
||||||
|
生成的协议数据,根据reader的类型解析数据,通过JNI方式调用libtaos.so(或taos.dll)中的方法,使用schemaless的方式写入到TDengine。
|
||||||
|
|
||||||
|
## 3 功能说明
|
||||||
|
|
||||||
|
### 3.1 配置样例
|
||||||
|
|
||||||
|
* 这里使用一份从 OpenTSDB 读取数据并导入到 TDengine 的作业配置作为样例。
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"job": {
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"reader": {
|
||||||
|
"name": "opentsdbreader",
|
||||||
|
"parameter": {
|
||||||
|
"endpoint": "http://192.168.1.180:4242",
|
||||||
|
"column": [
|
||||||
|
"weather_temperature"
|
||||||
|
],
|
||||||
|
"beginDateTime": "2021-01-01 00:00:00",
|
||||||
|
"endDateTime": "2021-01-01 01:00:00"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"writer": {
|
||||||
|
"name": "tdenginewriter",
|
||||||
|
"parameter": {
|
||||||
|
"host": "192.168.1.180",
|
||||||
|
"port": 6030,
|
||||||
|
"dbname": "test",
|
||||||
|
"user": "root",
|
||||||
|
"password": "taosdata"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"setting": {
|
||||||
|
"speed": {
|
||||||
|
"channel": 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 3.2 参数说明
|
||||||
|
|
||||||
|
* **host**
|
||||||
|
* 描述:TDengine实例的host。
|
||||||
|
|
||||||
|
* 必选:是 <br />
|
||||||
|
|
||||||
|
* 默认值:无 <br />
|
||||||
|
* **port**
|
||||||
|
* 描述:TDengine实例的port。
|
||||||
|
* 必选:是 <br />
|
||||||
|
* 默认值:无 <br />
|
||||||
|
* **dbname**
|
||||||
|
* 描述:目的数据库的名称。
|
||||||
|
|
||||||
|
* 必选:是 <br />
|
||||||
|
|
||||||
|
* 默认值:无 <br />
|
||||||
|
* **user**
|
||||||
|
* 描述:TDengine实例的用户名 <br />
|
||||||
|
* 必选:是 <br />
|
||||||
|
* 默认值:无 <br />
|
||||||
|
* **password**
|
||||||
|
* 描述:TDengine实例的密码 <br />
|
||||||
|
* 必选:是 <br />
|
||||||
|
* 默认值:无 <br />
|
||||||
|
|
||||||
|
### 3.3 类型转换
|
||||||
|
|
||||||
|
目前,由于opentsdbreader将opentsdb的数据统一读取为json字符串,TDengineWriter 在做Opentsdb到TDengine的迁移时,按照以下类型进行处理:
|
||||||
|
|
||||||
|
| OpenTSDB数据类型 | DataX 内部类型| TDengine 数据类型 |
|
||||||
|
| -------- | ----- | -------- |
|
||||||
|
| timestamp | Date | timestamp |
|
||||||
|
| Integer(value) | Double | double |
|
||||||
|
| Float(value) | Double | double |
|
||||||
|
| String(value) | String | binary |
|
||||||
|
| Integer(tag) | String | binary |
|
||||||
|
| Float(tag) | String |binary |
|
||||||
|
| String(tag) | String |binary |
|
||||||
|
|
||||||
|
## 4 性能报告
|
||||||
|
|
||||||
|
### 4.1 环境准备
|
||||||
|
|
||||||
|
#### 4.1.1 数据特征
|
||||||
|
|
||||||
|
建表语句:
|
||||||
|
|
||||||
|
单行记录类似于:
|
||||||
|
|
||||||
|
#### 4.1.2 机器参数
|
||||||
|
|
||||||
|
* 执行DataX的机器参数为:
|
||||||
|
1. cpu:
|
||||||
|
2. mem:
|
||||||
|
3. net: 千兆双网卡
|
||||||
|
4. disc: DataX 数据不落磁盘,不统计此项
|
||||||
|
|
||||||
|
* TDengine数据库机器参数为:
|
||||||
|
1. cpu:
|
||||||
|
2. mem:
|
||||||
|
3. net: 千兆双网卡
|
||||||
|
4. disc:
|
||||||
|
|
||||||
|
#### 4.1.3 DataX jvm 参数
|
||||||
|
|
||||||
|
-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
|
||||||
|
|
||||||
|
### 4.2 测试报告
|
||||||
|
|
||||||
|
#### 4.2.1 单表测试报告
|
||||||
|
|
||||||
|
| 通道数| DataX速度(Rec/s)|DataX流量(MB/s)| DataX机器网卡流出流量(MB/s)|DataX机器运行负载|DB网卡进入流量(MB/s)|DB运行负载|DB TPS|
|
||||||
|
|--------| --------|--------|--------|--------|--------|--------|--------|
|
||||||
|
|1| | | | | | | |
|
||||||
|
|4| | | | | | | |
|
||||||
|
|8| | | | | | | |
|
||||||
|
|16| | | | | | | |
|
||||||
|
|32| | | | | | | |
|
||||||
|
|
||||||
|
说明:
|
||||||
|
|
||||||
|
1. 这里的单表,主键类型为 bigint(20),自增。
|
||||||
|
2. batchSize 和 通道个数,对性能影响较大。
|
||||||
|
3. 16通道,4096批量提交时,出现 full gc 2次。
|
||||||
|
|
||||||
|
#### 4.2.4 性能测试小结
|
||||||
|
|
||||||
|
1.
|
||||||
|
2.
|
||||||
|
|
||||||
|
## 5 约束限制
|
||||||
|
|
||||||
|
## FAQ
|
@ -18,11 +18,6 @@ public class JniConnection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public JniConnection(Properties props) throws Exception {
|
public JniConnection(Properties props) throws Exception {
|
||||||
if (this.conn != JNI_NULL_POINTER) {
|
|
||||||
close();
|
|
||||||
this.conn = JNI_NULL_POINTER;
|
|
||||||
}
|
|
||||||
|
|
||||||
initImp(props.getProperty(PROPERTY_KEY_CONFIG_DIR, null));
|
initImp(props.getProperty(PROPERTY_KEY_CONFIG_DIR, null));
|
||||||
|
|
||||||
String locale = props.getProperty(PROPERTY_KEY_LOCALE);
|
String locale = props.getProperty(PROPERTY_KEY_LOCALE);
|
||||||
|
@ -0,0 +1,11 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer;
|
||||||
|
|
||||||
|
public class Key {
|
||||||
|
public static final String HOST = "host";
|
||||||
|
public static final String PORT = "port";
|
||||||
|
public static final String DBNAME = "dbname";
|
||||||
|
public static final String USER = "user";
|
||||||
|
public static final String PASSWORD = "password";
|
||||||
|
public static final String BATCH_SIZE = "batchSize";
|
||||||
|
|
||||||
|
}
|
@ -10,19 +10,12 @@ import com.alibaba.datax.common.util.Configuration;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
public class TDengineWriter extends Writer {
|
public class TDengineWriter extends Writer {
|
||||||
|
|
||||||
private static final String HOST = "host";
|
|
||||||
private static final String PORT = "port";
|
|
||||||
private static final String DBNAME = "dbname";
|
|
||||||
private static final String USER = "user";
|
|
||||||
private static final String PASSWORD = "password";
|
|
||||||
private static final String PEER_PLUGIN_NAME = "peerPluginName";
|
private static final String PEER_PLUGIN_NAME = "peerPluginName";
|
||||||
|
private static final String DEFAULT_BATCH_SIZE = "1";
|
||||||
|
|
||||||
public static class Job extends Writer.Job {
|
public static class Job extends Writer.Job {
|
||||||
|
|
||||||
@ -45,7 +38,6 @@ public class TDengineWriter extends Writer {
|
|||||||
for (int i = 0; i < mandatoryNumber; i++) {
|
for (int i = 0; i < mandatoryNumber; i++) {
|
||||||
writerSplitConfigs.add(this.originalConfig);
|
writerSplitConfigs.add(this.originalConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
return writerSplitConfigs;
|
return writerSplitConfigs;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -53,7 +45,6 @@ public class TDengineWriter extends Writer {
|
|||||||
public static class Task extends Writer.Task {
|
public static class Task extends Writer.Task {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
|
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
|
||||||
|
|
||||||
private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");
|
|
||||||
private Configuration writerSliceConfig;
|
private Configuration writerSliceConfig;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -68,53 +59,71 @@ public class TDengineWriter extends Writer {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void startWrite(RecordReceiver lineReceiver) {
|
public void startWrite(RecordReceiver lineReceiver) {
|
||||||
|
Set<String> keys = this.writerSliceConfig.getKeys();
|
||||||
|
|
||||||
String host = this.writerSliceConfig.getString(HOST);
|
|
||||||
int port = this.writerSliceConfig.getInt(PORT);
|
|
||||||
String dbname = this.writerSliceConfig.getString(DBNAME);
|
|
||||||
String user = this.writerSliceConfig.getString(USER);
|
|
||||||
String password = this.writerSliceConfig.getString(PASSWORD);
|
|
||||||
|
|
||||||
Properties properties = new Properties();
|
Properties properties = new Properties();
|
||||||
String cfgdir = this.writerSliceConfig.getString(JniConnection.PROPERTY_KEY_CONFIG_DIR);
|
for (String key : keys) {
|
||||||
if (cfgdir != null && !cfgdir.isEmpty()) {
|
String value = this.writerSliceConfig.getString(key);
|
||||||
properties.setProperty(JniConnection.PROPERTY_KEY_CONFIG_DIR, cfgdir);
|
properties.setProperty(key, value);
|
||||||
}
|
|
||||||
String timezone = this.writerSliceConfig.getString(JniConnection.PROPERTY_KEY_TIME_ZONE);
|
|
||||||
if (timezone != null && !timezone.isEmpty()) {
|
|
||||||
properties.setProperty(JniConnection.PROPERTY_KEY_TIME_ZONE, timezone);
|
|
||||||
}
|
|
||||||
String locale = this.writerSliceConfig.getString(JniConnection.PROPERTY_KEY_LOCALE);
|
|
||||||
if (locale != null && !locale.isEmpty()) {
|
|
||||||
properties.setProperty(JniConnection.PROPERTY_KEY_LOCALE, locale);
|
|
||||||
}
|
|
||||||
String charset = this.writerSliceConfig.getString(JniConnection.PROPERTY_KEY_CHARSET);
|
|
||||||
if (charset != null && !charset.isEmpty()) {
|
|
||||||
properties.setProperty(JniConnection.PROPERTY_KEY_CHARSET, charset);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
|
String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
|
||||||
if (peerPluginName.equals("opentsdbreader")) {
|
if (peerPluginName.equals("opentsdbreader")) {
|
||||||
|
// opentsdb json protocol use JNI and schemaless API to write
|
||||||
|
String host = properties.getProperty(Key.HOST);
|
||||||
|
int port = Integer.parseInt(properties.getProperty(Key.PORT));
|
||||||
|
String dbname = properties.getProperty(Key.DBNAME);
|
||||||
|
String user = properties.getProperty(Key.USER);
|
||||||
|
String password = properties.getProperty(Key.PASSWORD);
|
||||||
|
|
||||||
|
JniConnection conn = null;
|
||||||
try {
|
try {
|
||||||
JniConnection conn = new JniConnection(properties);
|
conn = new JniConnection(properties);
|
||||||
conn.open(host, port, dbname, user, password);
|
conn.open(host, port, dbname, user, password);
|
||||||
LOG.info("TDengine connection established, host: " + host + ", port: " + port + ", dbname: " + dbname + ", user: " + user);
|
LOG.info("TDengine connection established, host: " + host + ", port: " + port + ", dbname: " + dbname + ", user: " + user);
|
||||||
writeOpentsdb(lineReceiver, conn);
|
int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, DEFAULT_BATCH_SIZE));
|
||||||
conn.close();
|
writeOpentsdb(lineReceiver, conn, batchSize);
|
||||||
LOG.info("TDengine connection closed");
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error(e.getMessage());
|
LOG.error(e.getMessage());
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
if (conn != null)
|
||||||
|
conn.close();
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
LOG.info("TDengine connection closed");
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// other
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeOpentsdb(RecordReceiver lineReceiver, JniConnection conn) {
|
private void writeOpentsdb(RecordReceiver lineReceiver, JniConnection conn, int batchSize) {
|
||||||
try {
|
try {
|
||||||
Record record;
|
Record record;
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
long recordIndex = 1;
|
||||||
while ((record = lineReceiver.getFromReader()) != null) {
|
while ((record = lineReceiver.getFromReader()) != null) {
|
||||||
String jsonData = recordToString(record);
|
if (batchSize == 1) {
|
||||||
|
String jsonData = recordToString(record);
|
||||||
|
LOG.debug(">>> " + jsonData);
|
||||||
|
conn.insertOpentsdbJson(jsonData);
|
||||||
|
} else if (recordIndex % batchSize == 1) {
|
||||||
|
sb.append("[").append(recordToString(record)).append(",");
|
||||||
|
} else if (recordIndex % batchSize == 0) {
|
||||||
|
sb.append(recordToString(record)).append("]");
|
||||||
|
String jsonData = sb.toString();
|
||||||
|
LOG.debug(">>> " + jsonData);
|
||||||
|
conn.insertOpentsdbJson(jsonData);
|
||||||
|
sb.delete(0, sb.length());
|
||||||
|
} else {
|
||||||
|
sb.append(recordToString(record)).append(",");
|
||||||
|
}
|
||||||
|
recordIndex++;
|
||||||
|
}
|
||||||
|
if (sb.length() != 0 && sb.charAt(0) == '[') {
|
||||||
|
String jsonData = sb.deleteCharAt(sb.length() - 1).append("]").toString();
|
||||||
LOG.debug(">>> " + jsonData);
|
LOG.debug(">>> " + jsonData);
|
||||||
conn.insertOpentsdbJson(jsonData);
|
conn.insertOpentsdbJson(jsonData);
|
||||||
}
|
}
|
||||||
@ -127,7 +136,7 @@ public class TDengineWriter extends Writer {
|
|||||||
private String recordToString(Record record) {
|
private String recordToString(Record record) {
|
||||||
int recordLength = record.getColumnNumber();
|
int recordLength = record.getColumnNumber();
|
||||||
if (0 == recordLength) {
|
if (0 == recordLength) {
|
||||||
return NEWLINE_FLAG;
|
return "";
|
||||||
}
|
}
|
||||||
Column column;
|
Column column;
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
@ -136,7 +145,6 @@ public class TDengineWriter extends Writer {
|
|||||||
sb.append(column.asString()).append("\t");
|
sb.append(column.asString()).append("\t");
|
||||||
}
|
}
|
||||||
sb.setLength(sb.length() - 1);
|
sb.setLength(sb.length() - 1);
|
||||||
sb.append(NEWLINE_FLAG);
|
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user