diff --git a/core/src/main/job/mongodb2tdengine.json b/core/src/main/job/mongodb2tdengine.json
index 0667bddd..4cfc987e 100644
--- a/core/src/main/job/mongodb2tdengine.json
+++ b/core/src/main/job/mongodb2tdengine.json
@@ -55,18 +55,21 @@
"dbname": "test",
"user": "root",
"password": "taosdata",
- "measurement": "market_snapshot",
- "tag_set": {
+ "stable": "market_snapshot",
+ "batchSize": 35,
+ "tagColumn": {
"product": "cu",
"instrumentID": 0
},
- "field_set": {
+ "fieldColumn": {
"lastPrice": 2,
"askPrice1": 3,
"bidPrice1": 4,
"volume": 5
},
- "timestamp": 1
+ "timestampColumn": {
+ "tradeTime": 1
+ }
}
}
}
diff --git a/tdenginewriter/doc/tdenginewriter.md b/tdenginewriter/doc/tdenginewriter.md
index 8e55b189..c9c222a2 100644
--- a/tdenginewriter/doc/tdenginewriter.md
+++ b/tdenginewriter/doc/tdenginewriter.md
@@ -2,20 +2,21 @@
## 1 快速介绍
-TDengineWriter 插件实现了写入数据到 TDengine 的功能。 在底层实现上, TDengineWriter 通过 JNI的方式调用libtaos.so/tao.dll中的方法,连接 TDengine
-数据库实例,并执行schemaless的写入。 TDengineWriter 面向ETL开发工程师,他们使用 TDengineWriter 从数仓导入数据到 TDengine。同时,TDengineWriter
-亦可以作为数据迁移工具为DBA等用户提供服务。
+TDengineWriter插件实现了写入数据到TDengine数据库功能。可用于离线同步其它数据库的数据到TDengine。
## 2 实现原理
-TDengineWriter 通过 DataX 框架获取 Reader
-生成的协议数据,根据reader的类型解析数据,通过JNI方式调用libtaos.so(或taos.dll)中的方法,使用schemaless的方式写入到TDengine。
+TDengineWriter 通过 DataX 框架获取 Reader生成的协议数据,根据reader的类型解析数据。目前有两种写入方式:
+
+1. 对于OpenTSDBReader, TDengineWriter通过JNI方式调用TDengine客户端库文件(taos.lib或taos.dll)中的方法,使用[schemaless的方式](https://www.taosdata.com/cn/documentation/insert#schemaless)写入。
+
+2. 对于其它数据源,会根据配置生成SQL语句, 通过[taos-jdbcdriver](https://www.taosdata.com/cn/documentation/connector/java)批量写入。
+
+这样区分的原因是OpenTSDBReader将opentsdb的数据统一读取为json字符串,Writer端接收到的数据只有1列。而其它Reader插件一般会把数据放在不同列。
## 3 功能说明
-
-### 3.1 配置样例
-
-* 这里使用一份从OpenTSDB产生到 TDengine 导入的数据。
+### 3.1 从OpenTSDB到TDengine
+#### 3.1.1 配置样例
```json
{
@@ -54,46 +55,189 @@ TDengineWriter 通过 DataX 框架获取 Reader
}
```
-### 3.2 参数说明
+#### 3.1.2 参数说明
-* **host**
- * 描述:TDengine实例的host。
+| 参数 | 描述 | 是否必选 | 默认值 |
+| --------- | -------------------- | -------- | -------- |
+| host | TDengine实例的host | 是 | 无 |
+| port | TDengine实例的port | 是 | 无 |
+| user | TDengine实例的用户名 | 否 | root |
+| password | TDengine实例的密码 | 否 | taosdata |
+| dbname | 目的数据库的名称 | 是 | 无 |
+| batchSize | 每次批量插入多少记录 | 否 | 1 |
- * 必选:是
- * 默认值:无
-* **port**
- * 描述:TDengine实例的port。
- * 必选:是
- * 默认值:无
-* **dbname**
- * 描述:目的数据库的名称。
+#### 3.1.3 类型转换
- * 必选:是
+目前,由于OpenTSDBReader将opentsdb的数据统一读取为json字符串,TDengineWriter 在做Opentsdb到TDengine的迁移时,按照以下类型进行处理:
- * 默认值:无
-* **username**
- * 描述:TDengine实例的用户名
- * 必选:是
- * 默认值:无
-* **password**
- * 描述:TDengine实例的密码
- * 必选:是
- * 默认值:无
-
-### 3.3 类型转换
-
-目前,由于opentsdbreader将opentsdb的数据统一读取为json字符串,TDengineWriter 在做Opentsdb到TDengine的迁移时,按照以下类型进行处理:
-
-| OpenTSDB数据类型 | DataX 内部类型| TDengine 数据类型 |
-| -------- | ----- | -------- |
+| OpenTSDB数据类型 | DataX 内部类型 | TDengine 数据类型 |
+| ---------------- | -------------- | ----------------- |
| timestamp | Date | timestamp |
| Integer(value) | Double | double |
-| Float(value) | Double | double |
-| String(value) | String | binary |
+| Float(value) | Double | double |
+| String(value) | String | binary |
| Integer(tag) | String | binary |
-| Float(tag) | String |binary |
-| String(tag) | String |binary |
+| Float(tag) | String | binary |
+| String(tag) | String | binary |
+
+### 3.2 从MongoDB到TDengine
+
+#### 3.2.1 配置样例
+```json
+{
+ "job": {
+ "setting": {
+ "speed": {
+ "channel": 2
+ }
+ },
+ "content": [
+ {
+ "reader": {
+ "name": "mongodbreader",
+ "parameter": {
+ "address": [
+ "127.0.0.1:27017"
+ ],
+ "userName": "user",
+ "mechanism": "SCRAM-SHA-1",
+ "userPassword": "password",
+ "authDb": "admin",
+ "dbName": "test",
+ "collectionName": "stock",
+ "column": [
+ {
+ "name": "stockID",
+ "type": "string"
+ },
+ {
+ "name": "tradeTime",
+ "type": "date"
+ },
+ {
+ "name": "lastPrice",
+ "type": "double"
+ },
+ {
+ "name": "askPrice1",
+ "type": "double"
+ },
+ {
+ "name": "bidPrice1",
+ "type": "double"
+ },
+ {
+ "name": "volume",
+ "type": "int"
+ }
+ ]
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "host": "localhost",
+ "port": 6030,
+ "dbname": "test",
+ "user": "root",
+ "password": "taosdata",
+ "stable": "stock",
+ "tagColumn": {
+ "industry": "energy",
+ "stockID": 0
+ },
+ "fieldColumn": {
+ "lastPrice": 2,
+ "askPrice1": 3,
+ "bidPrice1": 4,
+ "volume": 5
+ },
+ "timestampColumn": {
+ "tradeTime": 1
+ }
+ }
+ }
+ }
+ ]
+ }
+}
+```
+
+**注:本配置的writer部分同样适用于关系型数据库**
+
+
+#### 3.2.2 参数说明
+| 参数 | 描述 | 是否必选 | 默认值 | 备注 |
+| --------------- | -------------------- | ---------------- | -------- | ------------------ |
+| host | TDengine实例的host | 是 | 无 |
+| port | TDengine实例的port | 是 | 无 |
+| user | TDengine实例的用户名 | 否 | root |
+| password | TDengine实例的密码 | 否 | taosdata |
+| dbname | 目的数据库的名称 | 是 | 无 |
+| batchSize | 每次批量插入多少记录 | 否 | 1000 |
+| stable | 目标超级表的名称 | 是(OpenTSDB除外) | 无 |
+| tagColumn | 标签列的列名和位置 | 否 | 无 | 位置索引均从0开始 |
+| fieldColumn | 字段列的列名和位置 | 否 | 无 | |
+| timestampColumn | 时间戳列的列名和位置 | 否 | 无 | 时间戳列只能有一个 |
+
+#### 3.3.3 自动建表规则
+##### 3.3.3.1 超级表创建规则
+
+如果配置了tagColumn、 fieldColumn和timestampColumn将会在插入第一条数据前,自动创建超级表。
+数据列的类型从第1条记录自动推断, 标签列默认类型为`NCHAR(64)`, 比如示例配置,可能生成以下建表语句:
+
+```sql
+CREATE STABLE IF NOT EXISTS market_snapshot (
+ tadetime TIMESTAMP,
+ lastprice DOUBLE,
+ askprice1 DOUBLE,
+ bidprice1 DOUBLE,
+ volume INT
+)
+TAGS(
+ industry NCHAR(64),
+ stockID NCHAR(64
+);
+```
+
+##### 3.3.3.2 子表创建规则
+
+子表结果与超表相同,子表表名生成规则:
+1. 将标签的value 组合成为如下的字符串: `tag_value1!tag_value2!tag_value3`。
+2. 计算该字符串的 MD5 散列值 "md5_val"。
+3. "t_md5val"作为子表名。其中的 "t" 是固定的前缀。
+
+#### 3.3.4 用户提前建表
+
+如果你已经创建好目标超级表,那么tagColumn、 fieldColumn和timestampColumn三个字段均可省略, 插件将通过执行通过`describe stableName`获取表结构的信息。
+此时要求接收到的Record中Column的顺序和执行`describe stableName`返回的列顺序相同, 比如通过`describe stableName`返回以下内容:
+```
+ Field | Type | Length | Note |
+=================================================================================
+ ts | TIMESTAMP | 8 | |
+ current | DOUBLE | 8 | |
+ location | BINARY | 10 | TAG |
+```
+那么插件收到的数据第1列必须代表时间戳,第2列必须代表电流,第3列必须代表位置。
+
+#### 3.3.5 注意事项
+
+1. tagColumn、 fieldColumn和timestampColumn三个字段用于描述目标表的结构信息,这三个配置字段必须同时存在或同时省略。
+2. 如果存在以上三个配置,且目标表也已经存在,则两者必须一致。**一致性**由用户自己保证,插件不做检查。不一致可能会导致插入失败或插入数据错乱。
+3. 插件优先使用配置文件中指定的表结构。
+
+#### 3.3.6 类型转换
+
+| MongoDB 数据类型 | DataX 内部类型 | TDengine 数据类型 |
+| ---------------- | -------------- | ----------------- |
+| int, Long | Long | BIGINT |
+| double | Double | DOUBLE |
+| string, array | String | NCHAR(64) |
+| date | Date | TIMESTAMP |
+| boolean | Boolean | BOOL |
+| bytes | Bytes | BINARY |
+
## 4 性能报告
@@ -127,13 +271,13 @@ TDengineWriter 通过 DataX 框架获取 Reader
#### 4.2.1 单表测试报告
-| 通道数| DataX速度(Rec/s)|DataX流量(MB/s)| DataX机器网卡流出流量(MB/s)|DataX机器运行负载|DB网卡进入流量(MB/s)|DB运行负载|DB TPS|
-|--------| --------|--------|--------|--------|--------|--------|--------|
-|1| | | | | | | |
-|4| | | | | | | |
-|8| | | | | | | |
-|16| | | | | | | |
-|32| | | | | | | |
+| 通道数 | DataX速度(Rec/s) | DataX流量(MB/s) | DataX机器网卡流出流量(MB/s) | DataX机器运行负载 | DB网卡进入流量(MB/s) | DB运行负载 | DB TPS |
+| ------ | ---------------- | --------------- | --------------------------- | ----------------- | -------------------- | ---------- | ------ |
+| 1 | | | | | | | |
+| 4 | | | | | | | |
+| 8 | | | | | | | |
+| 16 | | | | | | | |
+| 32 | | | | | | | |
说明:
@@ -143,9 +287,23 @@ TDengineWriter 通过 DataX 框架获取 Reader
#### 4.2.4 性能测试小结
-1.
-2.
## 5 约束限制
-## FAQ
\ No newline at end of file
+## FAQ
+
+### 如何选取要同步的数据的范围?
+
+数据范围的选取在Reader插件端配置,对于不同的Reader插件配置方法往往不同。比如对于mysqlreader, 可以用sql语句指定数据范围。对于opentsdbreader, 用beginDateTime和endDateTime两个配置项指定数据范围。
+
+### 如何一次导入多张源表?
+
+如果Reader插件支持一次读多张表,Writer插件就能一次导入多张表。如果Reader不支持多多张表,可以建多个job,分别导入。Writer插件只负责写数据。
+
+### 1张源表导入之后对应TDengine中多少张表?
+
+这是又tagColumn决定的,如果所有tag列的值都相同,目标表也只有一个。源表有多少不同的tag组合,目标超表就会有多少子表。
+
+### 源表和目标表的字段顺序一致吗?
+
+TDengine要求每个表第一列是时间戳列,后边是普通字段,最后是标签列。如果源表不是这个顺序,插件在自动建表是自动调整。
\ No newline at end of file
diff --git a/tdenginewriter/pom.xml b/tdenginewriter/pom.xml
index d658d4a2..8eb94b33 100644
--- a/tdenginewriter/pom.xml
+++ b/tdenginewriter/pom.xml
@@ -19,6 +19,11 @@
+
+ com.taosdata.jdbc
+ taos-jdbcdriver
+ 2.0.34
+
com.alibaba.datax
datax-common
@@ -37,7 +42,11 @@
${junit-version}
test
-
+
+ org.apache.commons
+ commons-lang3
+ ${commons-lang3-version}
+
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DefaultDataHandler.java
deleted file mode 100644
index a1d52d75..00000000
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DefaultDataHandler.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package com.alibaba.datax.plugin.writer;
-
-import com.alibaba.datax.common.element.Column;
-import com.alibaba.datax.common.element.Record;
-import com.alibaba.datax.common.plugin.RecordReceiver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
-public class DefaultDataHandler implements DataHandler {
- private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
-
- @Override
- public long handle(RecordReceiver lineReceiver, Properties properties) {
- long count = 0;
- Record record;
- while ((record = lineReceiver.getFromReader()) != null) {
-
- int recordLength = record.getColumnNumber();
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < recordLength; i++) {
- Column column = record.getColumn(i);
- sb.append(column.asString()).append("\t");
- }
- sb.setLength(sb.length() - 1);
- LOG.debug(sb.toString());
-
- count++;
- }
- return count;
- }
-
-}
\ No newline at end of file
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java
similarity index 77%
rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandler.java
rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java
index 94d1db30..686ac27b 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandler.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java
@@ -1,4 +1,4 @@
-package com.alibaba.datax.plugin.writer;
+package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.plugin.RecordReceiver;
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandlerFactory.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandlerFactory.java
similarity index 81%
rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandlerFactory.java
rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandlerFactory.java
index a488e7d5..1f740d7e 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/DataHandlerFactory.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandlerFactory.java
@@ -1,4 +1,4 @@
-package com.alibaba.datax.plugin.writer;
+package com.alibaba.datax.plugin.writer.tdenginewriter;
public class DataHandlerFactory {
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java
new file mode 100644
index 00000000..733f49c5
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java
@@ -0,0 +1,101 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.taosdata.jdbc.TSDBPreparedStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * 默认DataHandler
+ */
+public class DefaultDataHandler implements DataHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
+
+ static {
+ try {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public long handle(RecordReceiver lineReceiver, Properties properties) {
+ SchemaManager schemaManager = new SchemaManager(properties);
+ if (!schemaManager.configValid()) {
+ return 0;
+ }
+
+ try {
+ Connection conn = getTaosConnection(properties);
+ if (conn == null) {
+ return 0;
+ }
+ if (schemaManager.shouldGuessSchema()) {
+ LOG.info("无法从配置文件获取表结构信息,尝试从数据库获取");
+ boolean success = schemaManager.getFromDB(conn);
+ if (!success) {
+ return 0;
+ }
+ } else {
+
+ }
+ int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, "1000"));
+ return write(lineReceiver, conn, batchSize, schemaManager);
+ } catch (Exception e) {
+ LOG.error("write failed " + e.getMessage());
+ e.printStackTrace();
+ }
+ return 0;
+ }
+
+
+ private Connection getTaosConnection(Properties properties) throws SQLException {
+ // 检查必要参数
+ String host = properties.getProperty(Key.HOST);
+ String port = properties.getProperty(Key.PORT);
+ String dbname = properties.getProperty(Key.DBNAME);
+ String user = properties.getProperty(Key.USER);
+ String password = properties.getProperty(Key.PASSWORD);
+ if (host == null || port == null || dbname == null || user == null || password == null) {
+ String keys = String.join(" ", Key.HOST, Key.PORT, Key.DBNAME, Key.USER, Key.PASSWORD);
+ LOG.error("Required options missing, please check: " + keys);
+ return null;
+ }
+ String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", host, port, dbname, user, password);
+ LOG.info("TDengine connection established, host:{} port:{} dbname:{} user:{}", host, port, dbname, user);
+ return DriverManager.getConnection(jdbcUrl);
+ }
+
+ /**
+ * 使用SQL批量写入
+ *
+ * @return 成功写入记录数
+ * @throws SQLException
+ */
+ private long write(RecordReceiver lineReceiver, Connection conn, int batchSize, SchemaManager scm) throws SQLException {
+ Record record = lineReceiver.getFromReader();
+ if (record == null) {
+ return 0;
+ }
+ if (scm.shouldCreateTable()) {
+ scm.createSTable(conn, record);
+ }
+ String pq = String.format("INSERT INTO ? USING %s TAGS(%s) (%s) values (%s)", scm.getStable(), scm.getTagValuesPlaceHolder(), scm.getJoinedFieldNames(), scm.getFieldValuesPlaceHolder());
+ LOG.info("Prepared SQL: {}", pq);
+ try (TSDBPreparedStatement stmt = (TSDBPreparedStatement) conn.prepareStatement(pq)) {
+ JDBCBatchWriter batchWriter = new JDBCBatchWriter(stmt, scm, batchSize);
+ do {
+ batchWriter.append(record);
+ } while ((record = lineReceiver.getFromReader()) != null);
+ batchWriter.flush();
+ return batchWriter.getCount();
+ }
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java
new file mode 100644
index 00000000..17023d03
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java
@@ -0,0 +1,149 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.taosdata.jdbc.TSDBPreparedStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * 使用JDBC原生写入接口批量写入。
+ * 有两个限制条件导致批量写入的代码逻辑过于复杂,以至于需要开发新的类来封装。
+ * 1. 用户必须提前把需要批量写入的数据搜集到ArrayList中
+ * 2. 每批写入的表名必须相同。
+ * 这个类的实现逻辑是:
+ * 1. 先把属于同一子表的Record缓存起来
+ * 2. 缓存的数量达到batchSize阈值,自动执行一次批量写入
+ * 3. 最后一批数据需要用户手动flush才能写入
+ */
+public class JDBCBatchWriter {
+ public static final Logger LOG = LoggerFactory.getLogger(JDBCBatchWriter.class);
+
+ private TSDBPreparedStatement stmt;
+ private SchemaManager scm;
+ private int batchSize;
+ // 缓存Record, key为tableName
+ Map> buf = new HashMap<>();
+ // 缓存表的标签值, key为tableName
+ Map tableTagValues = new HashMap<>();
+ private long sucCount = 0;
+ private final int tsColIndex;
+ private List fieldList;
+ private Map fieldIndexMap;
+
+ public JDBCBatchWriter(TSDBPreparedStatement stmt, SchemaManager scm, int batchSize) {
+ this.stmt = stmt;
+ this.scm = scm;
+ this.batchSize = batchSize;
+ this.tsColIndex = scm.getTsColIndex();
+ this.fieldList = scm.getFieldList();
+ this.fieldIndexMap = scm.getFieldIndexMap();
+ }
+
+
+ public void append(Record record) throws SQLException {
+ String[] tagValues = scm.getTagValuesFromRecord(record);
+ String tableName = scm.computeTableName(tagValues);
+ if (buf.containsKey(tableName)) {
+ List lis = buf.get(tableName);
+ lis.add(record);
+ if (lis.size() == batchSize) {
+ executeBatch(tableName);
+ lis.clear();
+ }
+ } else {
+ List lis = new ArrayList<>(batchSize);
+ lis.add(record);
+ buf.put(tableName, lis);
+ tableTagValues.put(tableName, tagValues);
+ }
+ }
+
+ /**
+ * 执行单表批量写入
+ *
+ * @param tableName
+ * @throws SQLException
+ */
+ private void executeBatch(String tableName) throws SQLException {
+ // 表名
+ stmt.setTableName(tableName);
+ List records = buf.get(tableName);
+ // 标签
+ String[] tagValues = tableTagValues.get(tableName);
+ LOG.debug("executeBatch {}", String.join(",", tagValues));
+ for (int i = 0; i < tagValues.length; ++i) {
+ stmt.setTagNString(i, tagValues[i]);
+ }
+ // 时间戳
+ ArrayList tsList = records.stream().map(r -> r.getColumn(tsColIndex).asDate().getTime()).collect(Collectors.toCollection(ArrayList::new));
+ stmt.setTimestamp(0, tsList);
+ // 字段
+ Record record = records.get(0);
+ for (int i = 0; i < fieldList.size(); ) {
+ String fieldName = fieldList.get(i);
+ int index = fieldIndexMap.get(fieldName);
+ Column column = record.getColumn(index);
+ switch (column.getType()) {
+ case LONG:
+ ArrayList lisLong = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new));
+ stmt.setLong(++i, lisLong);
+ break;
+ case DOUBLE:
+ ArrayList lisDouble = records.stream().map(r -> r.getColumn(index).asDouble()).collect(Collectors.toCollection(ArrayList::new));
+ stmt.setDouble(++i, lisDouble);
+ break;
+ case STRING:
+ ArrayList lisString = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new));
+ stmt.setNString(++i, lisString, 64);
+ break;
+ case DATE:
+ ArrayList lisTs = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new));
+ stmt.setTimestamp(++i, lisTs);
+ break;
+ case BOOL:
+ ArrayList lisBool = records.stream().map(r -> r.getColumn(index).asBoolean()).collect(Collectors.toCollection(ArrayList::new));
+ stmt.setBoolean(++i, lisBool);
+ break;
+ case BYTES:
+ ArrayList lisBytes = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new));
+ stmt.setString(++i, lisBytes, 64);
+ break;
+ default:
+ throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, column.getType().toString());
+ }
+ }
+ // 执行
+ stmt.columnDataAddBatch();
+ stmt.columnDataExecuteBatch();
+ // 更新计数器
+ sucCount += records.size();
+ }
+
+ /**
+ * 把缓存的Record全部写入
+ */
+ public void flush() throws SQLException {
+ for (String tabName : buf.keySet()) {
+ if (buf.get(tabName).size() > 0) {
+ executeBatch(tabName);
+ }
+ }
+ stmt.columnDataCloseBatch();
+ }
+
+ /**
+ * @return 成功写入的数据量
+ */
+ public long getCount() {
+ return sucCount;
+ }
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnection.java
similarity index 98%
rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java
rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnection.java
index 3ce786e5..0aabe32a 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnection.java
@@ -1,4 +1,4 @@
-package com.alibaba.datax.plugin.writer;
+package com.alibaba.datax.plugin.writer.tdenginewriter;
import java.util.Properties;
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/Key.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java
similarity index 52%
rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/Key.java
rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java
index b240bce4..090a7999 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/Key.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java
@@ -1,4 +1,4 @@
-package com.alibaba.datax.plugin.writer;
+package com.alibaba.datax.plugin.writer.tdenginewriter;
public class Key {
public static final String HOST = "host";
@@ -7,5 +7,8 @@ public class Key {
public static final String USER = "user";
public static final String PASSWORD = "password";
public static final String BATCH_SIZE = "batchSize";
-
+ public static final String STABLE = "stable";
+ public static final String TAG_COLUMN = "tagColumn";
+ public static final String FIELD_COLUMN = "fieldColumn";
+ public static final String TIMESTAMP_COLUMN = "timestampColumn";
}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/OpentsdbDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java
similarity index 98%
rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/OpentsdbDataHandler.java
rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java
index 599e5f3e..52f1aa7a 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/OpentsdbDataHandler.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java
@@ -1,4 +1,4 @@
-package com.alibaba.datax.plugin.writer;
+package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java
new file mode 100644
index 00000000..b3d7b7e3
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java
@@ -0,0 +1,255 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class SchemaManager {
+ private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class);
+
+ private String stable; // 目标超表名
+ private Map fixedTagValue = new HashMap<>(); // 固定标签值 标签名 -> 标签值
+ private Map tagIndexMap = new HashMap<>(); // 动态标签值 标签名 -> 列索引
+ private Map fieldIndexMap = new HashMap<>(); // 字段名 -> 字段索引
+ private String tsColName; // 时间戳列名
+ private int tsColIndex = -1; // 时间戳列索引
+ private List fieldList = new ArrayList<>();
+ private List tagList = new ArrayList<>();
+ private boolean canInferSchemaFromConfig = false;
+
+
+ public SchemaManager() {
+ }
+
+ public SchemaManager(Properties properties) {
+ getFromConfig(properties);
+ }
+
+ private String mapDataxType(Column.Type type) {
+ switch (type) {
+ case LONG:
+ return "BIGINT";
+ case DOUBLE:
+ return "DOUBLE";
+ case STRING:
+ return "NCHAR(64)";
+ case DATE:
+ return "TIMESTAMP";
+ case BOOL:
+ return "BOOL";
+ case BYTES:
+ return "BINARY";
+ default:
+ throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, type.toString());
+ }
+ }
+
+ public void setStable(String stable) {
+ stable = stable;
+ }
+
+ public String getStable() {
+ return stable;
+ }
+
+ private void getFromConfig(Properties properties) {
+ stable = properties.getProperty(Key.STABLE);
+ if (stable == null) {
+ LOG.error("配置错误: no stable");
+ return;
+ }
+ for (Object key : properties.keySet()) {
+ String k = (String) key;
+ String v = properties.getProperty(k);
+
+ String[] ps = k.split("\\.");
+ if (ps.length == 1) {
+ continue;
+ }
+ if (k.startsWith(Key.TAG_COLUMN)) {
+ String tagName = ps[1];
+ try {
+ Integer tagIndex = Integer.parseInt(v);
+ this.tagIndexMap.put(tagName, tagIndex);
+ tagList.add(tagName);
+ } catch (NumberFormatException e) {
+ fixedTagValue.put(tagName, v);
+ tagList.add(tagName);
+ }
+ } else if (k.startsWith(Key.FIELD_COLUMN)) {
+ String fieldName = ps[1];
+ Integer fileIndex = Integer.parseInt(v);
+ fieldIndexMap.put(fieldName, fileIndex);
+ } else if (k.startsWith(Key.TIMESTAMP_COLUMN)) {
+ tsColName = ps[1];
+ tsColIndex = Integer.parseInt(v);
+ }
+ }
+ List sortedFieldName = fieldIndexMap.entrySet().stream().sorted((x, y) -> x.getValue().compareTo(y.getValue())).map(e -> e.getKey()).collect(Collectors.toList());
+ fieldList.addAll(sortedFieldName); // 排序的目的是保证自动建表时列的顺序和输入数据的列的顺序保持一致
+ canInferSchemaFromConfig = tsColIndex > -1 && !(fixedTagValue.isEmpty() && tagIndexMap.isEmpty()) && !fieldIndexMap.isEmpty();
+ LOG.info("配置文件解析结果:fixedTags=[{}] ,tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", fixedTagValue.keySet()), String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex);
+ }
+
+ public boolean shouldGuessSchema() {
+ return !canInferSchemaFromConfig;
+ }
+
+ public boolean shouldCreateTable() {
+ return canInferSchemaFromConfig;
+ }
+
+ public boolean configValid() {
+ boolean valid = (tagList.size() > 0 && fieldList.size() > 0 && tsColIndex > -1) || (tagList.size() == 0 && fieldList.size() == 0 && tsColIndex == -1);
+ if (!valid) {
+ LOG.error("配置错误. tag_columns,field_columns,timestamp_column必须同时存在或同时省略,当前解析结果: tag_columns: {}, field_columns:{}, timestamp_column:{} tsColIndex:{}",
+ (fixedTagValue.size() + tagIndexMap.size()), fieldIndexMap.size(), tsColName, tsColIndex);
+ }
+ return valid;
+ }
+
+ /**
+ * 通过执行`describe dbname.stable`命令,获取表的schema.
+ * describe命名返回有4列内容,分布是:Field,Type,Length,Note
+ *
+ * @return 成功返回true,如果超表不存在或其他错误则返回false
+ */
+ public boolean getFromDB(Connection conn) {
+ try {
+ List stables = getSTables(conn);
+ if (!stables.contains(stable)) {
+ LOG.error("超级表{}不存在,无法从数据库获取表结构信息.", stable);
+ return false;
+ }
+ } catch (SQLException e) {
+ LOG.error(e.getMessage());
+ e.printStackTrace();
+ return false;
+ }
+ try (Statement stmt = conn.createStatement()) {
+ ResultSet rs = stmt.executeQuery("describe " + stable);
+ int colIndex = 0;
+ while (rs.next()) {
+ String name = rs.getString(1);
+ String type = rs.getString(2);
+ String note = rs.getString(4);
+ if ("TIMESTAMP".equals(type)) {
+ tsColName = name;
+ tsColIndex = colIndex;
+ } else if ("TAG".equals(note)) {
+ tagIndexMap.put(name, colIndex);
+ tagList.add(name);
+ } else {
+ fieldIndexMap.put(name, colIndex);
+ fieldList.add(name);
+ }
+ colIndex++;
+ }
+ LOG.info("从数据库获取的表结构概要:tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex);
+ return true;
+ } catch (SQLException e) {
+ LOG.error(e.getMessage());
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ public static List getSTables(Connection conn) throws SQLException {
+ List stables = new ArrayList<>();
+ try (Statement stmt = conn.createStatement()) {
+ ResultSet rs = stmt.executeQuery("show stables");
+ while (rs.next()) {
+ String name = rs.getString(1);
+ stables.add(name);
+ }
+ }
+ return stables;
+ }
+
+ public void createSTable(Connection conn, Record record) throws SQLException {
+ StringBuilder sb = new StringBuilder();
+ sb.append("CREATE STABLE IF NOT EXISTS ").append(stable).append("(");
+ sb.append(tsColName).append(" ").append("TIMESTAMP,");
+ for (String fieldName : fieldList) {
+ sb.append(fieldName).append(' ');
+ Column col = record.getColumn(fieldIndexMap.get(fieldName));
+ String tdType = mapDataxType(col.getType());
+ sb.append(tdType).append(',');
+ }
+ sb.deleteCharAt(sb.length() - 1);
+ sb.append(") TAGS(");
+ for (String tagName : tagList) {
+ sb.append(tagName).append(" NCHAR(64),");
+ }
+ sb.deleteCharAt(sb.length() - 1);
+ sb.append(")");
+ String q = sb.toString();
+ LOG.info("自动创建超级表:" + q);
+ try (Statement stmt = conn.createStatement()) {
+ stmt.execute(q);
+ }
+ }
+
+ public String[] getTagValuesFromRecord(Record record) {
+ String[] tagValues = new String[tagList.size()];
+ for (int i = 0; i < tagList.size(); ++i) {
+ if (fixedTagValue.containsKey(tagList.get(i))) {
+ tagValues[i] = fixedTagValue.get(tagList.get(i));
+ } else {
+ int tagIndex = tagIndexMap.get(tagList.get(i));
+ tagValues[i] = record.getColumn(tagIndex).asString();
+ }
+ }
+ return tagValues;
+ }
+
+ public Map getFieldIndexMap() {
+ return fieldIndexMap;
+ }
+
+ public List getFieldList() {
+ return fieldList;
+ }
+
+ public String getJoinedFieldNames() {
+ return tsColName + ", " + String.join(", ", fieldList);
+ }
+
+ public int getTsColIndex() {
+ return tsColIndex;
+ }
+
+ public String getTagValuesPlaceHolder() {
+ return tagList.stream().map(x -> "?").collect(Collectors.joining(","));
+ }
+
+ public String getFieldValuesPlaceHolder() {
+ return "?, " + fieldList.stream().map(x -> "?").collect(Collectors.joining(", "));
+ }
+
+ /**
+ * 计算子表表名
+ *
+ * - 将标签的value 组合成为如下的字符串: tag_value1!tag_value2!tag_value3。
+ * - 计算该字符串的 MD5 散列值 "md5_val"。
+ * - "t_md5val"作为子表名。其中的 "t" 是固定的前缀。
+ *
+ *
+ * @param tagValues
+ * @return
+ */
+ public String computeTableName(String[] tagValues) {
+ String s = String.join("!", tagValues);
+ return "t_" + DigestUtils.md5Hex(s);
+ }
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java
similarity index 84%
rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java
rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java
index 84600802..70ea5737 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriter.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java
@@ -1,4 +1,4 @@
-package com.alibaba.datax.plugin.writer;
+package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.plugin.RecordReceiver;
@@ -64,7 +64,13 @@ public class TDengineWriter extends Writer {
String value = this.writerSliceConfig.getString(key);
properties.setProperty(key, value);
}
-
+ if (!keys.contains(Key.USER)) {
+ properties.setProperty(Key.USER, "root");
+ }
+ if (!keys.contains(Key.PASSWORD)) {
+ properties.setProperty(Key.PASSWORD, "taosdata");
+ }
+ LOG.debug("========================properties==========================\n" + properties.toString());
String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
LOG.debug("start to handle record from: " + peerPluginName);
DataHandler handler = DataHandlerFactory.build(peerPluginName);
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriterErrorCode.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java
similarity index 75%
rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriterErrorCode.java
rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java
index 02e87079..994f1e89 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/TDengineWriterErrorCode.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java
@@ -1,9 +1,10 @@
-package com.alibaba.datax.plugin.writer;
+package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.spi.ErrorCode;
public enum TDengineWriterErrorCode implements ErrorCode {
- RUNTIME_EXCEPTION("TDengineWriter-00", "运行时异常");
+ RUNTIME_EXCEPTION("TDengineWriter-00", "运行时异常"),
+ TYPE_ERROR("TDengineWriter-00", "Datax类型无法正确映射到TDengine类型");
private final String code;
private final String description;
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/com_alibaba_datax_plugin_writer_JniConnection.h b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/com_alibaba_datax_plugin_writer_tdenginewriter_JniConnection.h
similarity index 100%
rename from tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/com_alibaba_datax_plugin_writer_JniConnection.h
rename to tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/com_alibaba_datax_plugin_writer_tdenginewriter_JniConnection.h
diff --git a/tdenginewriter/src/main/resources/plugin.json b/tdenginewriter/src/main/resources/plugin.json
index 6c900a15..e54f65ff 100755
--- a/tdenginewriter/src/main/resources/plugin.json
+++ b/tdenginewriter/src/main/resources/plugin.json
@@ -1,9 +1,9 @@
{
"name": "tdenginewriter",
- "class": "com.alibaba.datax.plugin.writer.TDengineWriter",
+ "class": "com.alibaba.datax.plugin.writer.tdenginewriter.TDengineWriter",
"description": {
"useScene": "data migration to tdengine",
- "mechanism": "use JNI to write data to tdengine."
+ "mechanism": "use JNI or taos-jdbc to write data to tdengine."
},
"developer": "zyyang-taosdata"
}
\ No newline at end of file
diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/JniConnectionTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnectionTest.java
similarity index 90%
rename from tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/JniConnectionTest.java
rename to tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnectionTest.java
index 040cf34c..09c3df26 100644
--- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/JniConnectionTest.java
+++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnectionTest.java
@@ -1,4 +1,4 @@
-package com.alibaba.datax.plugin.writer;
+package com.alibaba.datax.plugin.writer.tdenginewriter;
import org.junit.Test;
diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java
new file mode 100644
index 00000000..43928db9
--- /dev/null
+++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java
@@ -0,0 +1,21 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+public class TDengineWriterTest {
+
+
+ @Test
+ public void testGetSchema() throws ClassNotFoundException, SQLException {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", "wozai.fun", "6030", "test", "root", "taosdata");
+ Connection conn = DriverManager.getConnection(jdbcUrl);
+ SchemaManager schemaManager = new SchemaManager();
+ schemaManager.setStable("test1");
+ schemaManager.getFromDB(conn);
+ }
+}