diff --git a/core/src/main/job/opentsdb2tdengine.json b/core/src/main/job/opentsdb2tdengine.json
index dd332202..377b98c9 100644
--- a/core/src/main/job/opentsdb2tdengine.json
+++ b/core/src/main/job/opentsdb2tdengine.json
@@ -20,7 +20,8 @@
"port": 6030,
"dbname": "test",
"user": "root",
- "password": "taosdata"
+ "password": "taosdata",
+ "batchSize": 1000
}
}
}
diff --git a/tdenginewriter/doc/tdenginewriter.md b/tdenginewriter/doc/tdenginewriter.md
new file mode 100644
index 00000000..8e55b189
--- /dev/null
+++ b/tdenginewriter/doc/tdenginewriter.md
@@ -0,0 +1,151 @@
+# DataX TDengineWriter
+
+## 1 快速介绍
+
+TDengineWriter 插件实现了写入数据到 TDengine 的功能。 在底层实现上, TDengineWriter 通过 JNI的方式调用libtaos.so/tao.dll中的方法,连接 TDengine
+数据库实例,并执行schemaless的写入。 TDengineWriter 面向ETL开发工程师,他们使用 TDengineWriter 从数仓导入数据到 TDengine。同时,TDengineWriter
+亦可以作为数据迁移工具为DBA等用户提供服务。
+
+## 2 实现原理
+
+TDengineWriter 通过 DataX 框架获取 Reader
+生成的协议数据,根据reader的类型解析数据,通过JNI方式调用libtaos.so(或taos.dll)中的方法,使用schemaless的方式写入到TDengine。
+
+## 3 功能说明
+
+### 3.1 配置样例
+
+* 这里使用一份从OpenTSDB产生到 TDengine 导入的数据。
+
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "opentsdbreader",
+ "parameter": {
+ "endpoint": "http://192.168.1.180:4242",
+ "column": [
+ "weather_temperature"
+ ],
+ "beginDateTime": "2021-01-01 00:00:00",
+ "endDateTime": "2021-01-01 01:00:00"
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "host": "192.168.1.180",
+ "port": 6030,
+ "dbname": "test",
+ "user": "root",
+ "password": "taosdata"
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
+```
+
+### 3.2 参数说明
+
+* **host**
+ * 描述:TDengine实例的host。
+
+ * 必选:是
+
+ * 默认值:无
+* **port**
+ * 描述:TDengine实例的port。
+ * 必选:是
+ * 默认值:无
+* **dbname**
+ * 描述:目的数据库的名称。
+
+ * 必选:是
+
+ * 默认值:无
+* **username**
+ * 描述:TDengine实例的用户名
+ * 必选:是
+ * 默认值:无
+* **password**
+ * 描述:TDengine实例的密码
+ * 必选:是
+ * 默认值:无
+
+### 3.3 类型转换
+
+目前,由于opentsdbreader将opentsdb的数据统一读取为json字符串,TDengineWriter 在做Opentsdb到TDengine的迁移时,按照以下类型进行处理:
+
+| OpenTSDB数据类型 | DataX 内部类型| TDengine 数据类型 |
+| -------- | ----- | -------- |
+| timestamp | Date | timestamp |
+| Integer(value) | Double | double |
+| Float(value) | Double | double |
+| String(value) | String | binary |
+| Integer(tag) | String | binary |
+| Float(tag) | String |binary |
+| String(tag) | String |binary |
+
+## 4 性能报告
+
+### 4.1 环境准备
+
+#### 4.1.1 数据特征
+
+建表语句:
+
+单行记录类似于:
+
+#### 4.1.2 机器参数
+
+* 执行DataX的机器参数为:
+ 1. cpu:
+ 2. mem:
+ 3. net: 千兆双网卡
+ 4. disc: DataX 数据不落磁盘,不统计此项
+
+* TDengine数据库机器参数为:
+ 1. cpu:
+ 2. mem:
+ 3. net: 千兆双网卡
+ 4. disc:
+
+#### 4.1.3 DataX jvm 参数
+
+ -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
+
+### 4.2 测试报告
+
+#### 4.2.1 单表测试报告
+
+| 通道数| DataX速度(Rec/s)|DataX流量(MB/s)| DataX机器网卡流出流量(MB/s)|DataX机器运行负载|DB网卡进入流量(MB/s)|DB运行负载|DB TPS|
+|--------| --------|--------|--------|--------|--------|--------|--------|
+|1| | | | | | | |
+|4| | | | | | | |
+|8| | | | | | | |
+|16| | | | | | | |
+|32| | | | | | | |
+
+说明:
+
+1. 这里的单表,主键类型为 bigint(20),自增。
+2. batchSize 和 通道个数,对性能影响较大。
+3. 16通道,4096批量提交时,出现 full gc 2次。
+
+#### 4.2.4 性能测试小结
+
+1.
+2.
+
+## 5 约束限制
+
+## FAQ
\ No newline at end of file
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java
index a8739c54..b1670633 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java
@@ -18,11 +18,6 @@ public class JniConnection {
}
public JniConnection(Properties props) throws Exception {
- if (this.conn != JNI_NULL_POINTER) {
- close();
- this.conn = JNI_NULL_POINTER;
- }
-
initImp(props.getProperty(PROPERTY_KEY_CONFIG_DIR, null));
String locale = props.getProperty(PROPERTY_KEY_LOCALE);
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/Key.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/Key.java
new file mode 100644
index 00000000..b240bce4
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/Key.java
@@ -0,0 +1,11 @@
+package com.alibaba.datax.plugin.writer;
+
+public class Key {
+ public static final String HOST = "host";
+ public static final String PORT = "port";
+ public static final String DBNAME = "dbname";
+ public static final String USER = "user";
+ public static final String PASSWORD = "password";
+ public static final String BATCH_SIZE = "batchSize";
+
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java
index 28fa4ca5..9dc42d9d 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java
@@ -10,19 +10,12 @@ import com.alibaba.datax.common.util.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.Properties;
+import java.util.*;
public class TDengineWriter extends Writer {
- private static final String HOST = "host";
- private static final String PORT = "port";
- private static final String DBNAME = "dbname";
- private static final String USER = "user";
- private static final String PASSWORD = "password";
private static final String PEER_PLUGIN_NAME = "peerPluginName";
+ private static final String DEFAULT_BATCH_SIZE = "1";
public static class Job extends Writer.Job {
@@ -45,7 +38,6 @@ public class TDengineWriter extends Writer {
for (int i = 0; i < mandatoryNumber; i++) {
writerSplitConfigs.add(this.originalConfig);
}
-
return writerSplitConfigs;
}
}
@@ -53,7 +45,6 @@ public class TDengineWriter extends Writer {
public static class Task extends Writer.Task {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
- private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");
private Configuration writerSliceConfig;
@Override
@@ -68,53 +59,71 @@ public class TDengineWriter extends Writer {
@Override
public void startWrite(RecordReceiver lineReceiver) {
-
-
- String host = this.writerSliceConfig.getString(HOST);
- int port = this.writerSliceConfig.getInt(PORT);
- String dbname = this.writerSliceConfig.getString(DBNAME);
- String user = this.writerSliceConfig.getString(USER);
- String password = this.writerSliceConfig.getString(PASSWORD);
-
+ Set keys = this.writerSliceConfig.getKeys();
Properties properties = new Properties();
- String cfgdir = this.writerSliceConfig.getString(JniConnection.PROPERTY_KEY_CONFIG_DIR);
- if (cfgdir != null && !cfgdir.isEmpty()) {
- properties.setProperty(JniConnection.PROPERTY_KEY_CONFIG_DIR, cfgdir);
- }
- String timezone = this.writerSliceConfig.getString(JniConnection.PROPERTY_KEY_TIME_ZONE);
- if (timezone != null && !timezone.isEmpty()) {
- properties.setProperty(JniConnection.PROPERTY_KEY_TIME_ZONE, timezone);
- }
- String locale = this.writerSliceConfig.getString(JniConnection.PROPERTY_KEY_LOCALE);
- if (locale != null && !locale.isEmpty()) {
- properties.setProperty(JniConnection.PROPERTY_KEY_LOCALE, locale);
- }
- String charset = this.writerSliceConfig.getString(JniConnection.PROPERTY_KEY_CHARSET);
- if (charset != null && !charset.isEmpty()) {
- properties.setProperty(JniConnection.PROPERTY_KEY_CHARSET, charset);
+ for (String key : keys) {
+ String value = this.writerSliceConfig.getString(key);
+ properties.setProperty(key, value);
}
String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
if (peerPluginName.equals("opentsdbreader")) {
+ // opentsdb json protocol use JNI and schemaless API to write
+ String host = properties.getProperty(Key.HOST);
+ int port = Integer.parseInt(properties.getProperty(Key.PORT));
+ String dbname = properties.getProperty(Key.DBNAME);
+ String user = properties.getProperty(Key.USER);
+ String password = properties.getProperty(Key.PASSWORD);
+
+ JniConnection conn = null;
try {
- JniConnection conn = new JniConnection(properties);
+ conn = new JniConnection(properties);
conn.open(host, port, dbname, user, password);
LOG.info("TDengine connection established, host: " + host + ", port: " + port + ", dbname: " + dbname + ", user: " + user);
- writeOpentsdb(lineReceiver, conn);
- conn.close();
- LOG.info("TDengine connection closed");
+ int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, DEFAULT_BATCH_SIZE));
+ writeOpentsdb(lineReceiver, conn, batchSize);
} catch (Exception e) {
LOG.error(e.getMessage());
e.printStackTrace();
+ } finally {
+ try {
+ if (conn != null)
+ conn.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ LOG.info("TDengine connection closed");
}
+ } else {
+ // other
}
}
- private void writeOpentsdb(RecordReceiver lineReceiver, JniConnection conn) {
+ private void writeOpentsdb(RecordReceiver lineReceiver, JniConnection conn, int batchSize) {
try {
Record record;
+ StringBuilder sb = new StringBuilder();
+ long recordIndex = 1;
while ((record = lineReceiver.getFromReader()) != null) {
- String jsonData = recordToString(record);
+ if (batchSize == 1) {
+ String jsonData = recordToString(record);
+ LOG.debug(">>> " + jsonData);
+ conn.insertOpentsdbJson(jsonData);
+ } else if (recordIndex % batchSize == 1) {
+ sb.append("[").append(recordToString(record)).append(",");
+ } else if (recordIndex % batchSize == 0) {
+ sb.append(recordToString(record)).append("]");
+ String jsonData = sb.toString();
+ LOG.debug(">>> " + jsonData);
+ conn.insertOpentsdbJson(jsonData);
+ sb.delete(0, sb.length());
+ } else {
+ sb.append(recordToString(record)).append(",");
+ }
+ recordIndex++;
+ }
+ if (sb.length() != 0 && sb.charAt(0) == '[') {
+ String jsonData = sb.deleteCharAt(sb.length() - 1).append("]").toString();
LOG.debug(">>> " + jsonData);
conn.insertOpentsdbJson(jsonData);
}
@@ -127,7 +136,7 @@ public class TDengineWriter extends Writer {
private String recordToString(Record record) {
int recordLength = record.getColumnNumber();
if (0 == recordLength) {
- return NEWLINE_FLAG;
+ return "";
}
Column column;
StringBuilder sb = new StringBuilder();
@@ -136,7 +145,6 @@ public class TDengineWriter extends Writer {
sb.append(column.asString()).append("\t");
}
sb.setLength(sb.length() - 1);
- sb.append(NEWLINE_FLAG);
return sb.toString();
}
}