From c2027ed102085f6e789f77ef1342a7df1dc750f4 Mon Sep 17 00:00:00 2001
From: dingbo
Date: Fri, 19 Nov 2021 16:38:55 +0800
Subject: [PATCH 1/9] migrate datax.py to python3 and keep it compatible with
 python2

---
 core/src/main/bin/datax.py | 114 +++++++++++++++++++++----------------
 userGuid.md                |   2 +-
 2 files changed, 65 insertions(+), 51 deletions(-)

diff --git a/core/src/main/bin/datax.py b/core/src/main/bin/datax.py
index 1099ed3a..4811ae8d 100755
--- a/core/src/main/bin/datax.py
+++ b/core/src/main/bin/datax.py
@@ -1,23 +1,26 @@
 #!/usr/bin/env python
 # -*- coding:utf-8 -*-
 
+# print() with file= and multiple arguments only works on Python 2 with this import
+from __future__ import print_function
+
-import sys
-import os
-import signal
-import subprocess
-import time
-import re
-import socket
-import json
-from optparse import OptionParser
-from optparse import OptionGroup
-from string import Template
 import codecs
+import json
+import os
 import platform
+import re
+import signal
+import socket
+import subprocess
+import sys
+import time
+from optparse import OptionGroup
+from optparse import OptionParser
+from string import Template
+
+# sys.version_info[0] also works on Python 2.6, where version_info has no .major
+ispy2 = sys.version_info[0] == 2
 
 
 def isWindows():
     return platform.system() == 'Windows'
 
+
 DATAX_HOME = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 
 DATAX_VERSION = 'DATAX-OPENSOURCE-3.0'
@@ -52,13 +55,19 @@ def getLocalIp():
 
 def suicide(signum, e):
     global child_process
-    print >> sys.stderr, "[Error] DataX receive unexpected signal %d, starts to suicide." % (signum)
+    # note: print(msg, sys.stderr) would print to stdout; file= is required to reach stderr
+    print("[Error] DataX received unexpected signal %d, starting to exit." % (signum), file=sys.stderr)
 
     if child_process:
         child_process.send_signal(signal.SIGQUIT)
         time.sleep(1)
         child_process.kill()
-    print >> sys.stderr, "DataX Process was killed ! you did ?"
+    print("DataX process was killed. Did you do it?", file=sys.stderr)
     sys.exit(RET_STATE["KILL"])
 
 
@@ -92,10 +101,10 @@ def getOptionParser():
                                      'if you have multiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".'
                                     'Note: you should config in your job tableName with ${tableName}.')
     prodEnvOptionGroup.add_option("-r", "--reader", metavar="",
-                                  action="store", dest="reader",type="string",
+                                  action="store", dest="reader", type="string",
                                   help='View job config[reader] template, eg: mysqlreader,streamreader')
     prodEnvOptionGroup.add_option("-w", "--writer", metavar="",
-                                  action="store", dest="writer",type="string",
+                                  action="store", dest="writer", type="string",
                                   help='View job config[writer] template, eg: mysqlwriter,streamwriter')
     parser.add_option_group(prodEnvOptionGroup)
 
@@ -108,45 +117,50 @@ def getOptionParser():
     parser.add_option_group(devEnvOptionGroup)
     return parser
 
+
 def generateJobConfigTemplate(reader, writer):
-    readerRef = "Please refer to the %s document:\n     https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n" % (reader,reader,reader)
-    writerRef = "Please refer to the %s document:\n     https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n " % (writer,writer,writer)
-    print readerRef
-    print writerRef
+    readerRef = "Please refer to the %s document:\n     https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n" % (
+        reader, reader, reader)
+    writerRef = "Please refer to the %s document:\n     https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n " % (
+        writer, writer, writer)
+    print(readerRef)
+    print(writerRef)
     jobGuid = 'Please save the following configuration as a json file and  use\n     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json \nto run the job.\n'
-    print jobGuid
-    jobTemplate={
-      "job": {
-        "setting": {
-          "speed": {
-            "channel": ""
-          }
-        },
-        "content": [
-          {
-            "reader": {},
-            "writer": {}
-          }
-        ]
-      }
-    }
+    print(jobGuid)
+    jobTemplate = {
+        "job": {
+            "setting": {
+                "speed": {
+                    "channel": ""
+                }
+            },
+            "content": [
+                {
+                    "reader": {},
+                    "writer": {}
+                }
+            ]
+        }
+    }
-    readerTemplatePath = "%s/plugin/reader/%s/plugin_job_template.json" % (DATAX_HOME,reader)
-    writerTemplatePath = "%s/plugin/writer/%s/plugin_job_template.json" % (DATAX_HOME,writer)
+    readerTemplatePath = "%s/plugin/reader/%s/plugin_job_template.json" % (DATAX_HOME, reader)
+    writerTemplatePath = "%s/plugin/writer/%s/plugin_job_template.json" % (DATAX_HOME, writer)
     try:
-        readerPar = readPluginTemplate(readerTemplatePath);
-    except Exception, e:
-        print "Read reader[%s] template error: can\'t find file %s" % (reader,readerTemplatePath)
+        readerPar = readPluginTemplate(readerTemplatePath)
+    except Exception:
+        # a bare "except:" would also swallow SystemExit and KeyboardInterrupt
+        print("Read reader[%s] template error: can't find file %s" % (reader, readerTemplatePath))
     try:
-        writerPar = readPluginTemplate(writerTemplatePath);
-    except Exception, e:
-        print "Read writer[%s] template error: : can\'t find file %s" % (writer,writerTemplatePath)
+        writerPar = readPluginTemplate(writerTemplatePath)
+    except Exception:
+        print("Read writer[%s] template error: can't find file %s" % (writer, writerTemplatePath))
-    jobTemplate['job']['content'][0]['reader'] = readerPar;
-    jobTemplate['job']['content'][0]['writer'] = writerPar;
-    print json.dumps(jobTemplate, indent=4, sort_keys=True)
+    jobTemplate['job']['content'][0]['reader'] = readerPar
+    jobTemplate['job']['content'][0]['writer'] = writerPar
+    print(json.dumps(jobTemplate, indent=4, sort_keys=True))
+
 
 def readPluginTemplate(plugin):
     with open(plugin, 'r') as f:
-       return json.load(f)
+        return json.load(f)
+
 
 def isUrl(path):
     if not path:
@@ -168,7 +182,7 @@ def buildStartCommand(options, args):
 
     if options.remoteDebug:
         tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
-        print 'local ip: ', getLocalIp()
+        print('local ip: ', getLocalIp())
 
     if options.loglevel:
         tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))
@@ -198,11 +212,11 @@ def buildStartCommand(options, args):
 
 def printCopyright():
-    print '''
+    print('''
 DataX (%s), From Alibaba !
 Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
 
-''' % DATAX_VERSION
+''' % DATAX_VERSION)
     sys.stdout.flush()
 
 
@@ -211,7 +225,7 @@ if __name__ == "__main__":
     parser = getOptionParser()
     options, args = parser.parse_args(sys.argv[1:])
     if options.reader is not None and options.writer is not None:
-        generateJobConfigTemplate(options.reader,options.writer)
+        generateJobConfigTemplate(options.reader, options.writer)
         sys.exit(RET_STATE['OK'])
     if len(args) != 1:
         parser.print_help()
diff --git a/userGuid.md b/userGuid.md
index 153c8111..16771a5e 100644
--- a/userGuid.md
+++ b/userGuid.md
@@ -10,7 +10,7 @@ DataX本身作为数据同步框架,将不同数据源的同步抽象为从源
 
 - Linux
 - [JDK(1.8以上,推荐1.8) ](http://www.oracle.com/technetwork/cn/java/javase/downloads/index.html)
-- [Python(推荐Python2.6.X) ](https://www.python.org/downloads/)
+- [Python (either 2 or 3 works) ](https://www.python.org/downloads/)
 - [Apache Maven 3.x](https://maven.apache.org/download.cgi) (Compile DataX)
 
 # Quick Start

From 9587f9b93be0833b35d512649fced60c9593c4a2 Mon Sep 17 00:00:00 2001
From: dingbo
Date: Thu, 11 Nov 2021 15:24:09 +0800
Subject: [PATCH 2/9] add option authDb to mongodb reader doc

---
 mongodbreader/doc/mongodbreader.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/mongodbreader/doc/mongodbreader.md b/mongodbreader/doc/mongodbreader.md
index b61493e6..99d25731 100644
--- a/mongodbreader/doc/mongodbreader.md
+++ b/mongodbreader/doc/mongodbreader.md
@@ -127,6 +127,7 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据,通过主控的J
 * address: MongoDB的数据地址信息,因为MonogDB可能是个集群,则ip端口信息需要以Json数组的形式给出。【必填】
 * userName:MongoDB的用户名。【选填】
 * userPassword: MongoDB的密码。【选填】
+* authDb: the MongoDB authentication database.【选填】
 * collectionName: MonogoDB的集合名。【必填】
 * column:MongoDB的文档列名。【必填】
 * name:Column的名字。【必填】

From ef6c53e7e22c74efc2b03dd1babad21bef501e18 Mon Sep 17 00:00:00 2001
From: dingbo
Date: Mon, 29 Nov 2021 13:15:49 +0800
Subject: [PATCH 3/9] add tdengine writer

---
 tdenginewriter/doc/tdenginewriter.md          | 398 ++++++++++++++++++
 tdenginewriter/pom.xml                        | 107 +++++
 tdenginewriter/src/main/assembly/package.xml  |  34 ++
 .../writer/tdenginewriter/DataHandler.java    |  12 +
 .../tdenginewriter/DataHandlerFactory.java    |  10 +
 .../tdenginewriter/DefaultDataHandler.java    | 105 +++++
 .../tdenginewriter/JDBCBatchWriter.java       | 244 +++++++++++
 .../writer/tdenginewriter/JniConnection.java  |  89 ++++
 .../plugin/writer/tdenginewriter/Key.java     |  14 +
 .../plugin/writer/tdenginewriter/Msg.java     |  20 +
 .../tdenginewriter/OpentsdbDataHandler.java   |  99 +++++
 .../writer/tdenginewriter/SchemaManager.java  | 271 ++++++++++++
 .../writer/tdenginewriter/TDengineWriter.java |  91 ++++
 .../TDengineWriterErrorCode.java              |  32 ++
 ...ugin_writer_tdenginewriter_JniConnection.h | 105 +++++
 tdenginewriter/src/main/resources/plugin.json |   9 +
 .../main/resources/plugin_job_template.json   |  10 +
 .../resources/tdenginewritermsg.properties    |   6 +
 .../tdenginewritermsg_en_US.properties        |   6 +
 .../tdenginewritermsg_zh_CN.properties        |   6 +
 .../tdenginewriter/JniConnectionTest.java     |  21 +
 .../writer/tdenginewriter/MessageTest.java    |  25 ++
 .../tdenginewriter/TDengineWriterTest.java    |  31 ++
 23 files changed, 1745 insertions(+)
 create mode 100644 tdenginewriter/doc/tdenginewriter.md
 create mode 100644 tdenginewriter/pom.xml
 create mode 100644 tdenginewriter/src/main/assembly/package.xml
 create mode 100644 tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java
 create mode 100644 tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandlerFactory.java
 create mode 100644 tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java
 create mode 100644 tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java
 create mode 100644 tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnection.java
 create mode 100644 tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java
 create mode 100644 tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Msg.java
 create mode 100644 tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java
 create mode 100644 tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java
 create mode 100644 tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java
 create mode 100644 tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java
 create mode 100644 tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/com_alibaba_datax_plugin_writer_tdenginewriter_JniConnection.h
 create mode 100644 tdenginewriter/src/main/resources/plugin.json
 create mode 100644 tdenginewriter/src/main/resources/plugin_job_template.json
 create mode 100644 tdenginewriter/src/main/resources/tdenginewritermsg.properties
 create mode 100644 tdenginewriter/src/main/resources/tdenginewritermsg_en_US.properties
 create mode 100644 tdenginewriter/src/main/resources/tdenginewritermsg_zh_CN.properties
 create mode 100644 tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnectionTest.java
 create mode 100644 tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/MessageTest.java
 create mode 100644 tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java

diff --git a/tdenginewriter/doc/tdenginewriter.md b/tdenginewriter/doc/tdenginewriter.md
new file mode 100644
index 00000000..062fac2c
--- /dev/null
+++ b/tdenginewriter/doc/tdenginewriter.md
@@ -0,0 +1,398 @@
+# DataX TDengineWriter
+
+## 1 Quick Introduction
+
+TDengineWriter writes data into TDengine. It can be used to synchronize data from other databases into TDengine offline.
+
+## 2 Implementation
+
+TDengineWriter receives the protocol data produced by a Reader through the DataX framework and parses it according to the reader type. There are currently two write paths:
+
+1. For OpenTSDBReader, TDengineWriter calls the TDengine client library (taos.lib or taos.dll) over JNI and writes through the [schemaless API](https://www.taosdata.com/cn/documentation/insert#schemaless).
+2. For all other data sources, it builds SQL statements from the job configuration and writes in batches through [taos-jdbcdriver](https://www.taosdata.com/cn/documentation/connector/java).
+
+The two paths exist because OpenTSDBReader reads OpenTSDB data as a single JSON string, so the writer receives only one column, while other reader plugins normally spread the data over several columns.
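+For the JDBC path, the prepared statement has the shape sketched below. This is a minimal illustration: `weather` and the column lists are made-up placeholders, the real names come from the `stable`, `tagColumn`, `fieldColumn` and `timestampColumn` options described in section 3.2.2.
+
+```java
+public class InsertSqlSketch {
+    public static void main(String[] args) {
+        // Shape of the parameterized multi-row insert used on the JDBC path;
+        // the leading '?' is later bound to the computed sub-table name.
+        String sql = String.format("INSERT INTO ? USING %s TAGS(%s) (%s) values (%s)",
+                "weather",         // super table, taken from the "stable" option
+                "?",               // one '?' per tag column
+                "ts, tmax, tmin",  // timestamp column followed by the field columns
+                "?, ?, ?");        // one '?' per written column
+        System.out.println(sql);
+    }
+}
+```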
+## 3 Features
+
+### 3.1 From OpenTSDB to TDengine
+
+#### 3.1.1 Sample configuration
+
+```json
+{
+  "job": {
+    "content": [
+      {
+        "reader": {
+          "name": "opentsdbreader",
+          "parameter": {
+            "endpoint": "http://192.168.1.180:4242",
+            "column": [
+              "weather_temperature"
+            ],
+            "beginDateTime": "2021-01-01 00:00:00",
+            "endDateTime": "2021-01-01 01:00:00"
+          }
+        },
+        "writer": {
+          "name": "tdenginewriter",
+          "parameter": {
+            "host": "192.168.1.180",
+            "port": 6030,
+            "dbname": "test",
+            "user": "root",
+            "password": "taosdata"
+          }
+        }
+      }
+    ],
+    "setting": {
+      "speed": {
+        "channel": 1
+      }
+    }
+  }
+}
+```
+
+#### 3.1.2 Options
+
+| Option    | Description                          | Required | Default  |
+| --------- | ------------------------------------ | -------- | -------- |
+| host      | host of the TDengine instance        | yes      | none     |
+| port      | port of the TDengine instance        | yes      | none     |
+| user      | user name for the TDengine instance  | no       | root     |
+| password  | password for the TDengine instance   | no       | taosdata |
+| dbname    | name of the target database          | yes      | none     |
+| batchSize | number of records per batched insert | no       | 1        |
+
+#### 3.1.3 Type conversion
+
+Because OpenTSDBReader reads OpenTSDB data as one JSON string, TDengineWriter applies the following mapping when migrating from OpenTSDB to TDengine:
+
+| OpenTSDB type  | DataX internal type | TDengine type |
+| -------------- | ------------------- | ------------- |
+| timestamp      | Date                | timestamp     |
+| Integer(value) | Double              | double        |
+| Float(value)   | Double              | double        |
+| String(value)  | String              | binary        |
+| Integer(tag)   | String              | binary        |
+| Float(tag)     | String              | binary        |
+| String(tag)    | String              | binary        |
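+Under the hood this path drives an internal JNI wrapper rather than JDBC. A minimal sketch of the call sequence, modelled on the plugin's own JniConnectionTest; the host, database and credentials are placeholders:
+
+```java
+import java.util.Properties;
+
+// assumes this sketch lives in the plugin package next to JniConnection
+public class JniSketch {
+    public static void main(String[] args) throws Exception {
+        JniConnection conn = new JniConnection(new Properties());
+        conn.open("127.0.0.1", 6030, "test", "root", "taosdata");
+        // one OpenTSDB-style JSON data point, written through the schemaless API
+        String json = "{\"metric\":\"weather_temperature\",\"timestamp\":1609430400000,"
+                + "\"value\":123,\"tags\":{\"location\":\"beijing\",\"id\":\"t123\"}}";
+        conn.insertOpentsdbJson(json);
+        conn.close();
+    }
+}
+```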
+### 3.2 From MongoDB to TDengine
+
+#### 3.2.1 Sample configuration
+
+```json
+{
+  "job": {
+    "setting": {
+      "speed": {
+        "channel": 2
+      }
+    },
+    "content": [
+      {
+        "reader": {
+          "name": "mongodbreader",
+          "parameter": {
+            "address": [
+              "127.0.0.1:27017"
+            ],
+            "userName": "user",
+            "mechanism": "SCRAM-SHA-1",
+            "userPassword": "password",
+            "authDb": "admin",
+            "dbName": "test",
+            "collectionName": "stock",
+            "column": [
+              {
+                "name": "stockID",
+                "type": "string"
+              },
+              {
+                "name": "tradeTime",
+                "type": "date"
+              },
+              {
+                "name": "lastPrice",
+                "type": "double"
+              },
+              {
+                "name": "askPrice1",
+                "type": "double"
+              },
+              {
+                "name": "bidPrice1",
+                "type": "double"
+              },
+              {
+                "name": "volume",
+                "type": "int"
+              }
+            ]
+          }
+        },
+        "writer": {
+          "name": "tdenginewriter",
+          "parameter": {
+            "host": "localhost",
+            "port": 6030,
+            "dbname": "test",
+            "user": "root",
+            "password": "taosdata",
+            "stable": "stock",
+            "tagColumn": {
+              "industry": "energy",
+              "stockID": 0
+            },
+            "fieldColumn": {
+              "lastPrice": 2,
+              "askPrice1": 3,
+              "bidPrice1": 4,
+              "volume": 5
+            },
+            "timestampColumn": {
+              "tradeTime": 1
+            }
+          }
+        }
+      }
+    ]
+  }
+}
+```
+
+**Note: the writer part of this configuration applies to relational databases as well.**
+
+#### 3.2.2 Options
+
+| Option          | Description                                | Required              | Default  | Notes                        |
+| --------------- | ------------------------------------------ | --------------------- | -------- | ---------------------------- |
+| host            | host of the TDengine instance              | yes                   | none     |                              |
+| port            | port of the TDengine instance              | yes                   | none     |                              |
+| user            | user name for the TDengine instance        | no                    | root     |                              |
+| password        | password for the TDengine instance         | no                    | taosdata |                              |
+| dbname          | name of the target database                | yes                   | none     |                              |
+| batchSize       | number of records per batched insert       | no                    | 1000     |                              |
+| stable          | name of the target super table             | yes (except OpenTSDB) | none     |                              |
+| tagColumn       | names and positions of tag columns         | no                    | none     | position indexes start at 0  |
+| fieldColumn     | names and positions of field columns       | no                    | none     |                              |
+| timestampColumn | name and position of the timestamp column  | no                    | none     | at most one timestamp column |
+
+#### 3.2.3 Automatic table creation
+
+##### 3.2.3.1 Super table creation rules
+
+If tagColumn, fieldColumn and timestampColumn are all configured, the super table is created automatically before the first record is inserted.
+Data column types are inferred from the first record; tag columns default to `NCHAR(64)`. For the sample configuration above, roughly the following DDL is generated:
+
+```sql
+CREATE STABLE IF NOT EXISTS stock (
+    tradetime TIMESTAMP,
+    lastprice DOUBLE,
+    askprice1 DOUBLE,
+    bidprice1 DOUBLE,
+    volume BIGINT
+)
+TAGS(
+    industry NCHAR(64),
+    stockID NCHAR(64)
+);
+```
+
+##### 3.2.3.2 Sub-table creation rules
+
+Sub-tables have the same structure as the super table. Their names are derived as follows (see the sketch after this list):
+
+1. Join the tag values into a string of the form `tag_value1!tag_value2!tag_value3`.
+2. Compute the MD5 hex digest "md5_val" of that string.
+3. Use "t_md5val" as the sub-table name, where "t" is a fixed prefix.
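+A minimal Java equivalent of this naming rule; the tag values below are made up for illustration:
+
+```java
+import org.apache.commons.codec.digest.DigestUtils;
+
+public class SubTableNameSketch {
+    // same rule as SchemaManager.computeTableName: join tag values with '!',
+    // then prefix the MD5 hex digest with "t_"
+    public static String computeTableName(String[] tagValues) {
+        return "t_" + DigestUtils.md5Hex(String.join("!", tagValues));
+    }
+
+    public static void main(String[] args) {
+        System.out.println(computeTableName(new String[]{"energy", "stock001"}));
+    }
+}
+```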
+#### 3.2.4 Using a pre-created table
+
+If you have already created the target super table, tagColumn, fieldColumn and timestampColumn can all be omitted; the plugin then fetches the table structure by running `describe stableName`.
+In that case the column order of the incoming Records must match the column order returned by `describe stableName`. For example, if `describe stableName` returns:
+
+```
+ Field     | Type      | Length | Note |
+=================================================================================
+ ts        | TIMESTAMP |      8 |      |
+ current   | DOUBLE    |      8 |      |
+ location  | BINARY    |     10 | TAG  |
+```
+
+then the first column the plugin receives must hold the timestamp, the second the current, and the third the location.
+
+#### 3.2.5 Caveats
+
+1. tagColumn, fieldColumn and timestampColumn describe the structure of the target table; the three options must be present together or omitted together.
+2. If all three are configured and the target table already exists, the two structures must match. This **consistency** is the user's responsibility; the plugin does not verify it, and a mismatch can make inserts fail or scramble the inserted data.
+3. The plugin prefers the table structure given in the configuration file.
+
+#### 3.2.6 Type conversion
+
+| MongoDB type  | DataX internal type | TDengine type |
+| ------------- | ------------------- | ------------- |
+| int, Long     | Long                | BIGINT        |
+| double        | Double              | DOUBLE        |
+| string, array | String              | NCHAR(64)     |
+| date          | Date                | TIMESTAMP     |
+| boolean       | Boolean             | BOOL          |
+| bytes         | Bytes               | BINARY        |
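+When the plugin creates tables itself, this mapping is applied while generating the DDL; a sketch equivalent to the logic in the plugin's SchemaManager:
+
+```java
+import com.alibaba.datax.common.element.Column;
+
+public class TypeMappingSketch {
+    // DataX internal type -> TDengine DDL type, as in the table above
+    public static String toTDengineType(Column.Type type) {
+        switch (type) {
+            case LONG:   return "BIGINT";
+            case DOUBLE: return "DOUBLE";
+            case STRING: return "NCHAR(64)";
+            case DATE:   return "TIMESTAMP";
+            case BOOL:   return "BOOL";
+            case BYTES:  return "BINARY(64)";
+            default:     throw new IllegalArgumentException(type.toString());
+        }
+    }
+}
+```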
+### 3.3 From a relational database to TDengine
+
+The writer configuration follows the same rules as the MongoDB example above; here is a MySQL example.
+
+#### 3.3.1 MySQL table structure
+
+```sql
+CREATE TABLE IF NOT EXISTS weather(
+    station varchar(100),
+    latitude DOUBLE,
+    longitude DOUBLE,
+    `date` DATE,
+    TMAX int,
+    TMIN int
+)
+```
+
+#### 3.3.2 Sample configuration
+
+```json
+{
+  "job": {
+    "content": [
+      {
+        "reader": {
+          "name": "mysqlreader",
+          "parameter": {
+            "username": "root",
+            "password": "passw0rd",
+            "column": [
+              "*"
+            ],
+            "splitPk": "station",
+            "connection": [
+              {
+                "table": [
+                  "weather"
+                ],
+                "jdbcUrl": [
+                  "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8"
+                ]
+              }
+            ]
+          }
+        },
+        "writer": {
+          "name": "tdenginewriter",
+          "parameter": {
+            "host": "127.0.0.1",
+            "port": 6030,
+            "dbname": "test",
+            "user": "root",
+            "password": "taosdata",
+            "batchSize": 1000,
+            "stable": "weather",
+            "tagColumn": {
+              "station": 0
+            },
+            "fieldColumn": {
+              "latitude": 1,
+              "longitude": 2,
+              "tmax": 4,
+              "tmin": 5
+            },
+            "timestampColumn": {
+              "date": 3
+            }
+          }
+        }
+      }
+    ],
+    "setting": {
+      "speed": {
+        "channel": 1
+      }
+    }
+  }
+}
+```
+
+## 4 Performance Report
+
+### 4.1 Environment
+
+#### 4.1.1 Data characteristics
+
+DDL:
+
+A single row looks like:
+
+#### 4.1.2 Machine specs
+
+* Machine running DataX:
+  1. cpu:
+  2. mem:
+  3. net: dual gigabit NICs
+  4. disc: DataX data does not touch disk, not measured
+
+* Machine running TDengine:
+  1. cpu:
+  2. mem:
+  3. net: dual gigabit NICs
+  4. disc:
+
+#### 4.1.3 DataX JVM options
+
+	-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
+
+### 4.2 Test Report
+
+#### 4.2.1 Single-table report
+
+| Channels | DataX speed (Rec/s) | DataX traffic (MB/s) | DataX NIC out (MB/s) | DataX machine load | DB NIC in (MB/s) | DB load | DB TPS |
+| -------- | ------------------- | -------------------- | -------------------- | ------------------ | ---------------- | ------- | ------ |
+| 1        |                     |                      |                      |                    |                  |         |        |
+| 4        |                     |                      |                      |                    |                  |         |        |
+| 8        |                     |                      |                      |                    |                  |         |        |
+| 16       |                     |                      |                      |                    |                  |         |        |
+| 32       |                     |                      |                      |                    |                  |         |        |
+
+Notes:
+
+1. The single table here has a bigint(20) auto-increment primary key.
+2. batchSize and channel count have a large impact on throughput.
+3. With 16 channels and batches of 4096, full gc occurred twice.
+
+#### 4.2.2 Summary
+
+
+## 5 Constraints
+
+1. When the plugin creates the super table itself, NCHAR columns have a fixed length of 64; sources containing strings longer than 64 characters are not supported.
+2. Tag columns must not contain null values; records with null tags are filtered out.
+
+## FAQ
+
+### How do I select the range of data to synchronize?
+
+The data range is configured on the Reader side, and how to do so differs per reader plugin. For mysqlreader you can select the range with an SQL statement; for opentsdbreader you use the beginDateTime and endDateTime options.
+
+### How do I import several source tables at once?
+
+If the Reader plugin can read several tables in one job, the Writer plugin can import them in one job. If the Reader cannot, create one job per table. The writer only writes whatever data it receives.
+
+### How many TDengine tables does one source table map to?
+
+That is determined by tagColumn. If all tag values are identical there is exactly one target sub-table; in general, the super table gets one sub-table per distinct tag combination in the source.
+
+### Must the source and target tables have the same column order?
+
+TDengine requires the first column of every table to be the timestamp, followed by the field columns and finally the tag columns. If the source is ordered differently, the plugin reorders the columns when it creates the table automatically.
+
+### How does the plugin determine each column's data type?
+
+The types are inferred from the first batch of received data.
+
+### Why does inserting data from 10 years ago throw `TDengine ERROR (2350): failed to execute batch bind`?
+
+Because a newly created database keeps 10 years of data by default. You can set the retention explicitly, for example: `CREATE DATABASE power KEEP 36500;`.
\ No newline at end of file
diff --git a/tdenginewriter/pom.xml b/tdenginewriter/pom.xml
new file mode 100644
index 00000000..054f2ef8
--- /dev/null
+++ b/tdenginewriter/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>datax-all</artifactId>
+        <groupId>com.alibaba.datax</groupId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.alibaba.datax.tdenginewriter</groupId>
+    <artifactId>tdenginewriter</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.taosdata.jdbc</groupId>
+            <artifactId>taos-jdbcdriver</artifactId>
+            <version>2.0.34</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-common</artifactId>
+            <version>${datax-project-version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit-version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons-lang3-version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${jdk-version}</source>
+                    <target>${jdk-version}</target>
+                    <encoding>${project-sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                    <finalName>datax</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>dwzip</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.12.4</version>
+                <configuration>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                    <skipTests>true</skipTests>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/tdenginewriter/src/main/assembly/package.xml b/tdenginewriter/src/main/assembly/package.xml
new file mode 100644
index 00000000..d3b75ea2
--- /dev/null
+++ b/tdenginewriter/src/main/assembly/package.xml
@@ -0,0 +1,34 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+    <id></id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>plugin.json</include>
+                <include>plugin_job_template.json</include>
+            </includes>
+            <outputDirectory>plugin/writer/tdenginewriter</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>tdenginewriter-0.0.1-SNAPSHOT.jar</include>
+            </includes>
+            <outputDirectory>plugin/writer/tdenginewriter</outputDirectory>
+        </fileSet>
+    </fileSets>
+
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/writer/tdenginewriter/libs</outputDirectory>
+            <scope>runtime</scope>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java
new file mode 100644
index 00000000..dcc3ca8c
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java
@@ -0,0 +1,12 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
+
+import java.util.Properties;
+
+public interface DataHandler {
+
+    long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector);
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandlerFactory.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandlerFactory.java
new file mode 100644
index 00000000..1f740d7e
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandlerFactory.java
@@ -0,0 +1,10 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+public class DataHandlerFactory {
+
+    public static DataHandler build(String peerPluginName) {
+        if (peerPluginName.equals("opentsdbreader"))
+            return new OpentsdbDataHandler();
+        return new DefaultDataHandler();
+    }
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java
new file mode 100644
index 00000000..91c2b7e3
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java
@@ -0,0 +1,105 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
+import com.taosdata.jdbc.TSDBPreparedStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * Default DataHandler: writes through JDBC.
+ */
+public class DefaultDataHandler implements DataHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
+
+    static {
+        try {
+            Class.forName("com.taosdata.jdbc.TSDBDriver");
+        } catch (ClassNotFoundException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) {
+        SchemaManager schemaManager = new SchemaManager(properties);
+        if (!schemaManager.configValid()) {
+            return 0;
+        }
+
+        // try-with-resources so the connection is closed on every exit path
+        try (Connection conn = getTaosConnection(properties)) {
+            if (conn == null) {
+                return 0;
+            }
+            if (schemaManager.shouldGuessSchema()) {
+                // the table structure is not in the configuration file; try the database
+                LOG.info(Msg.get("try_get_schema_fromdb"));
+                if (!schemaManager.getFromDB(conn)) {
+                    return 0;
+                }
+            }
+            int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, "1000"));
+            if (batchSize < 5) {
+                // a tiny batchSize raises the chance of wrong automatic type inference
+                LOG.error(Msg.get("batch_size_too_small"));
+                return 0;
+            }
+            return write(lineReceiver, conn, batchSize, schemaManager, collector);
+        } catch (Exception e) {
+            LOG.error("write failed: " + e.getMessage(), e);
+        }
+        return 0;
+    }
+
+    private Connection getTaosConnection(Properties properties) throws SQLException {
+        // check the required options
+        String host = properties.getProperty(Key.HOST);
+        String port = properties.getProperty(Key.PORT);
+        String dbname = properties.getProperty(Key.DBNAME);
+        String user = properties.getProperty(Key.USER);
+        String password = properties.getProperty(Key.PASSWORD);
+        if (host == null || port == null || dbname == null || user == null || password == null) {
+            String keys = String.join(" ", Key.HOST, Key.PORT, Key.DBNAME, Key.USER, Key.PASSWORD);
+            LOG.error("Required options missing, please check: " + keys);
+            return null;
+        }
+        String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", host, port, dbname, user, password);
+        LOG.info("TDengine connection established, host:{} port:{} dbname:{} user:{}", host, port, dbname, user);
+        return DriverManager.getConnection(jdbcUrl);
+    }
+    /**
+     * Batch write via SQL.
+     *
+     * @return number of records written successfully
+     * @throws SQLException on JDBC errors
+     */
+    private long write(RecordReceiver lineReceiver, Connection conn, int batchSize, SchemaManager scm, TaskPluginCollector collector) throws SQLException {
+        Record record = lineReceiver.getFromReader();
+        if (record == null) {
+            return 0;
+        }
+        String pq = String.format("INSERT INTO ? USING %s TAGS(%s) (%s) values (%s)", scm.getStable(), scm.getTagValuesPlaceHolder(), scm.getJoinedFieldNames(), scm.getFieldValuesPlaceHolder());
+        LOG.info("Prepared SQL: {}", pq);
+        try (TSDBPreparedStatement stmt = (TSDBPreparedStatement) conn.prepareStatement(pq)) {
+            JDBCBatchWriter batchWriter = new JDBCBatchWriter(conn, stmt, scm, batchSize, collector);
+            do {
+                batchWriter.append(record);
+            } while ((record = lineReceiver.getFromReader()) != null);
+            batchWriter.flush();
+            return batchWriter.getCount();
+        }
+    }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java
new file mode 100644
index 00000000..53ab9bb9
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java
@@ -0,0 +1,244 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
+import com.taosdata.jdbc.TSDBPreparedStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Batch writer on top of the native JDBC bulk-bind API.
+ * Two constraints of that API make the batching logic complex enough to need its own class:
+ * 1. the data of a batch must be collected in ArrayLists up front, and
+ * 2. every row of a batch must target the same table.
+ * The implementation therefore:
+ * 1. buffers the Records belonging to the same sub-table,
+ * 2. executes a batch automatically when a buffer reaches batchSize, and
+ * 3. requires a final flush() from the caller to write the last partial batches.
+ */
+public class JDBCBatchWriter {
+    public static final Logger LOG = LoggerFactory.getLogger(JDBCBatchWriter.class);
+    private TSDBPreparedStatement stmt;
+    private SchemaManager scm;
+    private Connection conn;
+    private int batchSize;
+    private TaskPluginCollector collector;
+
+    // buffered Records, keyed by sub-table name
+    Map<String, List<Record>> buf = new HashMap<>();
+    // tag values per table, keyed by sub-table name
+    Map<String, String[]> tableTagValues = new HashMap<>();
+    private long sucCount = 0;
+    private final int tsColIndex;
+    private List<String> fieldList;
+    // minimum number of columns each record must contain, used for validation
+    private int minColNum = 0;
+    private Map<String, Integer> fieldIndexMap;
+    private List<Column.Type> fieldTypes = null;
+
+    public JDBCBatchWriter(Connection conn, TSDBPreparedStatement stmt, SchemaManager scm, int batchSize, TaskPluginCollector collector) {
+        this.conn = conn;
+        this.stmt = stmt;
+        this.scm = scm;
+        this.batchSize = batchSize;
+        this.collector = collector;
+        this.tsColIndex = scm.getTsColIndex();
+        this.fieldList = scm.getFieldList();
+        this.fieldIndexMap = scm.getFieldIndexMap();
+        this.minColNum = 1 + fieldList.size() + scm.getDynamicTagCount();
+    }
+
+    public void initFieldTypesAndTargetTable(List<Record> records) throws SQLException {
+        if (fieldTypes != null) {
+            return;
+        }
+        guessFieldTypes(records);
+        if (scm.shouldCreateTable()) {
+            scm.createSTable(conn, fieldTypes);
+        }
+    }
+
+    public void append(Record record) throws SQLException {
+        int columnNum = record.getColumnNumber();
+        if (columnNum < minColNum) {
+            // fewer columns than expected
+            collector.collectDirtyRecord(record, Msg.get("column_number_error"));
+            return;
+        }
+        String[] tagValues = scm.getTagValuesFromRecord(record);
+        if (tagValues == null) {
+            // a tag column contains null
+            collector.collectDirtyRecord(record, Msg.get("tag_value_error"));
+            return;
+        }
+        if (!scm.hasTimestamp(record)) {
+            // timestamp column is null or has the wrong type
+            collector.collectDirtyRecord(record, Msg.get("ts_value_error"));
+            return;
+        }
+        String tableName = scm.computeTableName(tagValues);
+        if (buf.containsKey(tableName)) {
+            List<Record> lis = buf.get(tableName);
+            lis.add(record);
+            if (lis.size() == batchSize) {
+                if (fieldTypes == null) {
+                    initFieldTypesAndTargetTable(lis);
+                }
+                executeBatch(tableName);
+                lis.clear();
+            }
+        } else {
+            List<Record> lis = new ArrayList<>(batchSize);
+            lis.add(record);
+            buf.put(tableName, lis);
+            tableTagValues.put(tableName, tagValues);
+        }
+    }
+    /**
+     * Only String is special: a column whose value is null is read back as a String column,
+     * so a String-typed Column does not necessarily mean the column really is a String.
+     *
+     * @param records sample records used for type inference
+     */
+    private void guessFieldTypes(List<Record> records) {
+        fieldTypes = new ArrayList<>(fieldList.size());
+        for (int i = 0; i < fieldList.size(); ++i) {
+            int colIndex = fieldIndexMap.get(fieldList.get(i));
+            boolean ok = false;
+            for (int j = 0; j < records.size() && !ok; ++j) {
+                Column column = records.get(j).getColumn(colIndex);
+                Column.Type type = column.getType();
+                switch (type) {
+                    case LONG:
+                    case DOUBLE:
+                    case DATE:
+                    case BOOL:
+                    case BYTES:
+                        if (column.getRawData() != null) {
+                            fieldTypes.add(type);
+                            ok = true;
+                        }
+                        break;
+                    case STRING:
+                        // only a non-null, non-empty String column really counts as a String
+                        String value = column.asString();
+                        if (value != null && !"".equals(value)) {
+                            fieldTypes.add(type);
+                            ok = true;
+                        }
+                        break;
+                    default:
+                        // report the offending type; fieldTypes.get(i) is not filled in yet at this point
+                        throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, type.toString());
+                }
+            }
+            if (!ok) {
+                // the type of column i + 1 could not be inferred from the records.size() sampled records
+                throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR,
+                        String.format(Msg.get("infer_column_type_error"), records.size(), i + 1));
+            }
+        }
+        LOG.info("Field Types: {}", fieldTypes);
+    }
+
+    /**
+     * Execute one batched insert for a single sub-table.
+     *
+     * @param tableName sub-table to write
+     * @throws SQLException on JDBC errors
+     */
+    private void executeBatch(String tableName) throws SQLException {
+        // table name
+        stmt.setTableName(tableName);
+        List<Record> records = buf.get(tableName);
+        // tags
+        String[] tagValues = tableTagValues.get(tableName);
+        LOG.debug("executeBatch {}", String.join(",", tagValues));
+        for (int i = 0; i < tagValues.length; ++i) {
+            stmt.setTagNString(i, tagValues[i]);
+        }
+        // timestamps
+        ArrayList<Long> tsList = records.stream().map(r -> r.getColumn(tsColIndex).asDate().getTime()).collect(Collectors.toCollection(ArrayList::new));
+        stmt.setTimestamp(0, tsList);
+        // fields: bind slot 0 holds the timestamp, so field i is bound to slot i + 1 via ++i
+        for (int i = 0; i < fieldList.size(); ) {
+            String fieldName = fieldList.get(i);
+            int index = fieldIndexMap.get(fieldName);
+            switch (fieldTypes.get(i)) {
+                case LONG:
+                    ArrayList<Long> lisLong = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new));
+                    stmt.setLong(++i, lisLong);
+                    break;
+                case DOUBLE:
+                    ArrayList<Double> lisDouble = records.stream().map(r -> r.getColumn(index).asDouble()).collect(Collectors.toCollection(ArrayList::new));
+                    stmt.setDouble(++i, lisDouble);
+                    break;
+                case STRING:
+                    ArrayList<String> lisString = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new));
+                    stmt.setNString(++i, lisString, 64);
+                    break;
+                case DATE:
+                    ArrayList<Long> lisTs = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new));
+                    stmt.setTimestamp(++i, lisTs);
+                    break;
+                case BOOL:
+                    ArrayList<Boolean> lisBool = records.stream().map(r -> r.getColumn(index).asBoolean()).collect(Collectors.toCollection(ArrayList::new));
+                    stmt.setBoolean(++i, lisBool);
+                    break;
+                case BYTES:
+                    ArrayList<String> lisBytes = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new));
+                    stmt.setString(++i, lisBytes, 64);
+                    break;
+                default:
+                    throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, fieldTypes.get(i).toString());
+            }
+        }
+        // execute
+        stmt.columnDataAddBatch();
+        stmt.columnDataExecuteBatch();
+        // update the success counter
+        sucCount += records.size();
+    }
+
+    /**
+     * Write out all buffered Records.
+     */
+    public void flush() throws SQLException {
+        if (fieldTypes == null) {
+            List<Record> records = new ArrayList<>();
+            for (List<Record> lis : buf.values()) {
+                records.addAll(lis);
+                if (records.size() > 100) {
+                    break;
+                }
+            }
+            if (records.size() > 0) {
+                initFieldTypesAndTargetTable(records);
+            } else {
+                return;
+            }
+        }
+        for (String tabName : buf.keySet()) {
+            if (buf.get(tabName).size() > 0) {
+                executeBatch(tabName);
+            }
+        }
+        stmt.columnDataCloseBatch();
+    }
+
+    /**
+     * @return number of records written successfully
+     */
+    public long getCount() {
+        return sucCount;
+    }
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnection.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnection.java
new file mode 100644
index 00000000..0aabe32a
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnection.java
@@ -0,0 +1,89 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import java.util.Properties;
+
+public class JniConnection {
+
+    private static final long JNI_NULL_POINTER = 0L;
+    private static final int JNI_SUCCESSFUL = 0;
+    public static final String PROPERTY_KEY_CONFIG_DIR = "cfgdir";
public static final String PROPERTY_KEY_LOCALE = "locale"; + public static final String PROPERTY_KEY_CHARSET = "charset"; + public static final String PROPERTY_KEY_TIME_ZONE = "timezone"; + + private long conn; + + static { + System.loadLibrary("taos"); + } + + public JniConnection(Properties props) throws Exception { + initImp(props.getProperty(PROPERTY_KEY_CONFIG_DIR, null)); + + String locale = props.getProperty(PROPERTY_KEY_LOCALE); + if (setOptions(0, locale) < 0) { + throw new Exception("Failed to set locale: " + locale + ". System default will be used."); + } + String charset = props.getProperty(PROPERTY_KEY_CHARSET); + if (setOptions(1, charset) < 0) { + throw new Exception("Failed to set charset: " + charset + ". System default will be used."); + } + String timezone = props.getProperty(PROPERTY_KEY_TIME_ZONE); + if (setOptions(2, timezone) < 0) { + throw new Exception("Failed to set timezone: " + timezone + ". System default will be used."); + } + } + + public void open(String host, int port, String dbname, String user, String password) throws Exception { + if (this.conn != JNI_NULL_POINTER) { + close(); + this.conn = JNI_NULL_POINTER; + } + + this.conn = connectImp(host, port, dbname, user, password); + if (this.conn == JNI_NULL_POINTER) { + String errMsg = getErrMsgImp(0); + throw new Exception(errMsg); + } + } + + public void insertOpentsdbJson(String json) throws Exception { + if (this.conn == JNI_NULL_POINTER) { + throw new Exception("JNI connection is NULL"); + } + + long result = insertOpentsdbJson(json, this.conn); + int errCode = getErrCodeImp(this.conn, result); + if (errCode != JNI_SUCCESSFUL) { + String errMsg = getErrMsgImp(result); + freeResultSetImp(this.conn, result); + throw new Exception(errMsg); + } + freeResultSetImp(this.conn, result); + } + + public void close() throws Exception { + int code = this.closeConnectionImp(this.conn); + if (code != 0) { + throw new Exception("JNI closeConnection failed"); + } + this.conn = JNI_NULL_POINTER; + } + + private static native void initImp(String configDir); + + private static native int setOptions(int optionIndex, String optionValue); + + private native long connectImp(String host, int port, String dbName, String user, String password); + + private native int getErrCodeImp(long connection, long pSql); + + private native String getErrMsgImp(long pSql); + + private native void freeResultSetImp(long connection, long pSql); + + private native int closeConnectionImp(long connection); + + private native long insertOpentsdbJson(String json, long pSql); + +} diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java new file mode 100644 index 00000000..090a7999 --- /dev/null +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java @@ -0,0 +1,14 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +public class Key { + public static final String HOST = "host"; + public static final String PORT = "port"; + public static final String DBNAME = "dbname"; + public static final String USER = "user"; + public static final String PASSWORD = "password"; + public static final String BATCH_SIZE = "batchSize"; + public static final String STABLE = "stable"; + public static final String TAG_COLUMN = "tagColumn"; + public static final String FIELD_COLUMN = "fieldColumn"; + public static final String TIMESTAMP_COLUMN = "timestampColumn"; +} diff --git 
a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Msg.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Msg.java new file mode 100644 index 00000000..89730d35 --- /dev/null +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Msg.java @@ -0,0 +1,20 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import java.util.Locale; +import java.util.ResourceBundle; + +/** + * i18n message util + */ +public class Msg { + private static ResourceBundle bundle; + + static { + bundle = ResourceBundle.getBundle("tdenginewritermsg", Locale.getDefault()); + } + + public static String get(String key) { + return bundle.getString(key); + } + +} diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java new file mode 100644 index 00000000..e1b8f5dd --- /dev/null +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java @@ -0,0 +1,99 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +public class OpentsdbDataHandler implements DataHandler { + private static final Logger LOG = LoggerFactory.getLogger(OpentsdbDataHandler.class); + private static final String DEFAULT_BATCH_SIZE = "1"; + + @Override + public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) { + // opentsdb json protocol use JNI and schemaless API to write + String host = properties.getProperty(Key.HOST); + int port = Integer.parseInt(properties.getProperty(Key.PORT)); + String dbname = properties.getProperty(Key.DBNAME); + String user = properties.getProperty(Key.USER); + String password = properties.getProperty(Key.PASSWORD); + + JniConnection conn = null; + long count = 0; + try { + conn = new JniConnection(properties); + conn.open(host, port, dbname, user, password); + LOG.info("TDengine connection established, host: " + host + ", port: " + port + ", dbname: " + dbname + ", user: " + user); + int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, DEFAULT_BATCH_SIZE)); + count = writeOpentsdb(lineReceiver, conn, batchSize); + } catch (Exception e) { + LOG.error(e.getMessage()); + e.printStackTrace(); + } finally { + try { + if (conn != null) + conn.close(); + } catch (Exception e) { + e.printStackTrace(); + } + LOG.info("TDengine connection closed"); + } + + return count; + } + + private long writeOpentsdb(RecordReceiver lineReceiver, JniConnection conn, int batchSize) { + long recordIndex = 1; + try { + Record record; + StringBuilder sb = new StringBuilder(); + while ((record = lineReceiver.getFromReader()) != null) { + if (batchSize == 1) { + String jsonData = recordToString(record); + LOG.debug(">>> " + jsonData); + conn.insertOpentsdbJson(jsonData); + } else if (recordIndex % batchSize == 1) { + sb.append("[").append(recordToString(record)).append(","); + } else if (recordIndex % batchSize == 0) { + sb.append(recordToString(record)).append("]"); + String jsonData = sb.toString(); + LOG.debug(">>> " + 
jsonData);
+                    conn.insertOpentsdbJson(jsonData);
+                } else if (recordIndex % batchSize == 1) {
+                    sb.append("[").append(recordToString(record)).append(",");
+                } else if (recordIndex % batchSize == 0) {
+                    sb.append(recordToString(record)).append("]");
+                    String jsonData = sb.toString();
+                    LOG.debug(">>> " + jsonData);
+                    conn.insertOpentsdbJson(jsonData);
+                    sb.delete(0, sb.length());
+                } else {
+                    sb.append(recordToString(record)).append(",");
+                }
+                recordIndex++;
+            }
+            if (sb.length() != 0 && sb.charAt(0) == '[') {
+                String jsonData = sb.deleteCharAt(sb.length() - 1).append("]").toString();
+                LOG.debug(">>> " + jsonData);
+                conn.insertOpentsdbJson(jsonData);
+            }
+        } catch (Exception e) {
+            LOG.error("TDengineWriter ERROR: " + e.getMessage());
+            throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e);
+        }
+        return recordIndex - 1;
+    }
+
+    private String recordToString(Record record) {
+        int recordLength = record.getColumnNumber();
+        if (0 == recordLength) {
+            return "";
+        }
+        Column column;
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < recordLength; i++) {
+            column = record.getColumn(i);
+            sb.append(column.asString()).append("\t");
+        }
+        sb.setLength(sb.length() - 1);
+        return sb.toString();
+    }
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java
new file mode 100644
index 00000000..d67a6585
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java
@@ -0,0 +1,271 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class SchemaManager {
+    private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class);
+
+    private String stable;  // target super table name
+    private Map<String, String> fixedTagValue = new HashMap<>();  // fixed tag values: tag name -> tag value
+    private Map<String, Integer> tagIndexMap = new HashMap<>();   // dynamic tag values: tag name -> column index
+    private Map<String, Integer> fieldIndexMap = new HashMap<>(); // field name -> column index
+    private String tsColName;    // timestamp column name
+    private int tsColIndex = -1; // timestamp column index
+    private List<String> fieldList = new ArrayList<>();
+    private List<String> tagList = new ArrayList<>();
+    private boolean canInferSchemaFromConfig = false;
+
+
+    public SchemaManager() {
+    }
+
+    public SchemaManager(Properties properties) {
+        getFromConfig(properties);
+    }
+
+    private String mapDataxType(Column.Type type) {
+        switch (type) {
+            case LONG:
+                return "BIGINT";
+            case DOUBLE:
+                return "DOUBLE";
+            case STRING:
+                return "NCHAR(64)";
+            case DATE:
+                return "TIMESTAMP";
+            case BOOL:
+                return "BOOL";
+            case BYTES:
+                return "BINARY(64)";
+            default:
+                throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, type.toString());
+        }
+    }
+
+    public void setStable(String stable) {
+        // "stable = stable" would be a self-assignment and leave the field untouched
+        this.stable = stable;
+    }
+
+    public String getStable() {
+        return stable;
+    }
+
+    private void getFromConfig(Properties properties) {
+        stable = properties.getProperty(Key.STABLE);
+        if (stable == null) {
+            LOG.error("Config error: no stable");
+            return;
+        }
+        for (Object key : properties.keySet()) {
+            String k = (String) key;
+            String v = properties.getProperty(k);
+
+            String[] ps = k.split("\\.");
+            if (ps.length == 1) {
+                continue;
+            }
+            if (k.startsWith(Key.TAG_COLUMN)) {
+                String tagName = ps[1];
+                try {
+                    // a numeric value means the tag is read from this column index
+                    Integer tagIndex = Integer.parseInt(v);
+                    this.tagIndexMap.put(tagName, tagIndex);
+                    tagList.add(tagName);
+                } catch (NumberFormatException e) {
+                    // otherwise the value itself is a fixed tag value
+                    fixedTagValue.put(tagName, v);
+                    tagList.add(tagName);
+                }
+            } else if (k.startsWith(Key.FIELD_COLUMN)) {
+                String fieldName = ps[1];
+                Integer fileIndex = Integer.parseInt(v);
+                fieldIndexMap.put(fieldName, fileIndex);
+            } else if (k.startsWith(Key.TIMESTAMP_COLUMN)) {
+                tsColName = ps[1];
+                tsColIndex = Integer.parseInt(v);
+            }
+        }
+        List<String> sortedFieldName = fieldIndexMap.entrySet().stream().sorted((x, y) -> x.getValue().compareTo(y.getValue())).map(e -> e.getKey()).collect(Collectors.toList());
+        // sorting keeps the column order of auto-created tables consistent with the input data
+        fieldList.addAll(sortedFieldName);
+        canInferSchemaFromConfig = tsColIndex > -1 && !(fixedTagValue.isEmpty() && tagIndexMap.isEmpty()) && !fieldIndexMap.isEmpty();
+        LOG.info("Config file parsed result: fixedTags=[{}], tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", fixedTagValue.keySet()), String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex);
+    }
+
+    public boolean shouldGuessSchema() {
+        return !canInferSchemaFromConfig;
+    }
+
+    public boolean shouldCreateTable() {
+        return canInferSchemaFromConfig;
+    }
+
+    public boolean configValid() {
+        boolean valid = (tagList.size() > 0 && fieldList.size() > 0 && tsColIndex > -1) || (tagList.size() == 0 && fieldList.size() == 0 && tsColIndex == -1);
+        if (!valid) {
+            LOG.error("Config error: tagColumn, fieldColumn and timestampColumn must be present together or absent together.");
+        }
+        return valid;
+    }
+
+    /**
+     * Fetch the table schema by running `describe dbname.stable`.
+     * The describe command returns four columns: Field, Type, Length, Note.
+     *
+     * @return true on success, false if the super table does not exist or another error occurred
+     */
+    public boolean getFromDB(Connection conn) {
+        try {
+            List<String> stables = getSTables(conn);
+            if (!stables.contains(stable)) {
+                LOG.error("super table {} not exist, fail to get schema from database.", stable);
+                return false;
+            }
+        } catch (SQLException e) {
+            LOG.error(e.getMessage());
+            e.printStackTrace();
+            return false;
+        }
+        try (Statement stmt = conn.createStatement()) {
+            ResultSet rs = stmt.executeQuery("describe " + stable);
+            int colIndex = 0;
+            while (rs.next()) {
+                String name = rs.getString(1);
+                String type = rs.getString(2);
+                String note = rs.getString(4);
+                if ("TIMESTAMP".equals(type)) {
+                    tsColName = name;
+                    tsColIndex = colIndex;
+                } else if ("TAG".equals(note)) {
+                    tagIndexMap.put(name, colIndex);
+                    tagList.add(name);
+                } else {
+                    fieldIndexMap.put(name, colIndex);
+                    fieldList.add(name);
+                }
+                colIndex++;
+            }
+            LOG.info("table info: tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex);
+            return true;
+        } catch (SQLException e) {
+            LOG.error(e.getMessage());
+            e.printStackTrace();
+            return false;
+        }
+    }
+
+    public static List<String> getSTables(Connection conn) throws SQLException {
+        List<String> stables = new ArrayList<>();
+        try (Statement stmt = conn.createStatement()) {
+            ResultSet rs = stmt.executeQuery("show stables");
+            while (rs.next()) {
+                String name = rs.getString(1);
+                stables.add(name);
+            }
+        }
+        return stables;
+    }
+
+    public void createSTable(Connection conn, List<Column.Type> fieldTypes) throws SQLException {
+        StringBuilder sb = new StringBuilder();
+        sb.append("CREATE STABLE IF NOT EXISTS ").append(stable).append("(");
+        sb.append(tsColName).append(" ").append("TIMESTAMP,");
+        for (int i = 0; i < fieldList.size(); ++i) {
+            String fieldName = fieldList.get(i);
+            Column.Type dxType = fieldTypes.get(i);
+            sb.append(fieldName).append(' ');
+            String tdType = mapDataxType(dxType);
+            sb.append(tdType).append(',');
+        }
+        sb.deleteCharAt(sb.length() - 1);
+        sb.append(") TAGS(");
+        for (String tagName : tagList) {
+            sb.append(tagName).append(" NCHAR(64),");
+        }
+        sb.deleteCharAt(sb.length() - 1);
+        sb.append(")");
+        String q = sb.toString();
+        LOG.info("run sql:" + q);
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(q);
+        }
+    }
+
+    public String[] getTagValuesFromRecord(Record record) {
+        String[] tagValues = new String[tagList.size()];
+        for (int i = 0; i < tagList.size(); ++i) {
+            if (fixedTagValue.containsKey(tagList.get(i))) {
+                tagValues[i] = fixedTagValue.get(tagList.get(i));
+            } else {
+                int tagIndex = tagIndexMap.get(tagList.get(i));
+                tagValues[i] = record.getColumn(tagIndex).asString();
+            }
+            if (tagValues[i] == null) {
+                return null;
+            }
+        }
+        return tagValues;
+    }
+
+    public boolean hasTimestamp(Record record) {
+        Column column = record.getColumn(tsColIndex);
+        return column.getType() == Column.Type.DATE && column.asDate() != null;
+    }
+
+    public Map<String, Integer> getFieldIndexMap() {
+        return fieldIndexMap;
+    }
+
+    public List<String> getFieldList() {
+        return fieldList;
+    }
+
+    public String getJoinedFieldNames() {
+        return tsColName + ", " + String.join(", ", fieldList);
+    }
+
+    public int getTsColIndex() {
+        return tsColIndex;
+    }
+
+    public String getTagValuesPlaceHolder() {
+        return tagList.stream().map(x -> "?").collect(Collectors.joining(","));
+    }
+
+    public String getFieldValuesPlaceHolder() {
+        return "?, " + fieldList.stream().map(x -> "?").collect(Collectors.joining(", "));
+    }
+    /**
+     * Compute the sub-table name:
+     * 1. join the tag values into the string tag_value1!tag_value2!tag_value3,
+     * 2. take the MD5 hex digest "md5_val" of that string,
+     * 3. use "t_md5val" as the table name, where "t" is a fixed prefix.
+     *
+     * @param tagValues tag values of the record
+     * @return sub-table name
+     */
+    public String computeTableName(String[] tagValues) {
+        String s = String.join("!", tagValues);
+        return "t_" + DigestUtils.md5Hex(s);
+    }
+
+    public int getDynamicTagCount() {
+        return tagIndexMap.size();
+    }
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java
new file mode 100644
index 00000000..79e5238c
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java
@@ -0,0 +1,91 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+public class TDengineWriter extends Writer {
+
+    private static final String PEER_PLUGIN_NAME = "peerPluginName";
+
+    static {
+        try {
+            Class.forName("com.taosdata.jdbc.TSDBDriver");
+        } catch (ClassNotFoundException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public static class Job extends Writer.Job {
+
+        private Configuration originalConfig;
+
+        @Override
+        public void init() {
+            this.originalConfig = super.getPluginJobConf();
+            this.originalConfig.set(PEER_PLUGIN_NAME, getPeerPluginName());
+        }
+
+        @Override
+        public void destroy() {
+
+        }
+
+        @Override
+        public List<Configuration> split(int mandatoryNumber) {
+            List<Configuration> writerSplitConfigs = new ArrayList<>();
+            for (int i = 0; i < mandatoryNumber; i++) {
+                writerSplitConfigs.add(this.originalConfig);
+            }
+            return writerSplitConfigs;
+        }
+    }
+
+    public static class Task extends Writer.Task {
+        // use Task.class so log lines are attributed to the task, not the job
+        private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+        private Configuration writerSliceConfig;
+
+        @Override
+        public void init() {
+            this.writerSliceConfig = getPluginJobConf();
+        }
+
+        @Override
+        public void destroy() {
+
+        }
+
+        @Override
+        public void startWrite(RecordReceiver lineReceiver) {
+            Set<String> keys = this.writerSliceConfig.getKeys();
+            Properties properties = new Properties();
+            for (String key : keys) {
+                String value = this.writerSliceConfig.getString(key);
+                properties.setProperty(key, value);
+            }
+            if (!keys.contains(Key.USER)) {
+                properties.setProperty(Key.USER, "root");
+            }
+            if (!keys.contains(Key.PASSWORD)) {
+                properties.setProperty(Key.PASSWORD, "taosdata");
+            }
+            LOG.debug("========================properties==========================\n" + properties);
+            String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
+            LOG.debug("start to handle record from: " + peerPluginName);
+            DataHandler handler = DataHandlerFactory.build(peerPluginName);
+            long records = handler.handle(lineReceiver, properties, getTaskPluginCollector());
+            LOG.debug("handle data finished, records: " + records);
+        }
+    }
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java
new file mode 100644
index 00000000..994f1e89
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java
@@ -0,0 +1,32 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.common.spi.ErrorCode;
+
+public enum TDengineWriterErrorCode implements ErrorCode {
+    // each error code must be distinct; the original draft reused "TDengineWriter-00"
+    RUNTIME_EXCEPTION("TDengineWriter-00", "runtime exception"),
+    TYPE_ERROR("TDengineWriter-01", "cannot map the DataX type to a TDengine type");
+
+    private final String code;
+    private final String description;
+
+    private TDengineWriterErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return this.code;
+    }
+
+    @Override
+    public String getDescription() {
+        return this.description;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Code:[%s], Description:[%s]. ", this.code,
+                this.description);
+    }
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/com_alibaba_datax_plugin_writer_tdenginewriter_JniConnection.h b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/com_alibaba_datax_plugin_writer_tdenginewriter_JniConnection.h
new file mode 100644
index 00000000..4bdf3639
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/com_alibaba_datax_plugin_writer_tdenginewriter_JniConnection.h
@@ -0,0 +1,105 @@
+/* DO NOT EDIT THIS FILE - it is machine generated */
+#include <jni.h>
+/* Header for class com_alibaba_datax_plugin_writer_JniConnection */
+
+#ifndef _Included_com_alibaba_datax_plugin_writer_JniConnection
+#define _Included_com_alibaba_datax_plugin_writer_JniConnection
+#ifdef __cplusplus
+extern "C" {
+#endif
+#undef com_alibaba_datax_plugin_writer_JniConnection_JNI_NULL_POINTER
+#define com_alibaba_datax_plugin_writer_JniConnection_JNI_NULL_POINTER 0LL
+#undef com_alibaba_datax_plugin_writer_JniConnection_JNI_SUCCESSFUL
+#define com_alibaba_datax_plugin_writer_JniConnection_JNI_SUCCESSFUL 0L
+/*
+ * Class:     com_alibaba_datax_plugin_writer_JniConnection
+ * Method:    initImp
+ * Signature: (Ljava/lang/String;)V
+ */
+JNIEXPORT void JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_initImp
+  (JNIEnv *, jclass, jstring);
+
+/*
+ * Class:     com_alibaba_datax_plugin_writer_JniConnection
+ * Method:    setOptions
+ * Signature: (ILjava/lang/String;)I
+ */
+JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_setOptions
+  (JNIEnv *, jclass, jint, jstring);
+
+/*
+ * Class:     com_alibaba_datax_plugin_writer_JniConnection
+ * Method:    getTsCharset
+ * Signature: ()Ljava/lang/String;
+ */
+JNIEXPORT jstring JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getTsCharset
+  (JNIEnv *, jclass);
+
+/*
+ * Class:     com_alibaba_datax_plugin_writer_JniConnection
+ * Method:    connectImp
+ * Signature: (Ljava/lang/String;ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;)J
+ */
+JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_connectImp
+  (JNIEnv *, jobject, jstring, jint, jstring, jstring, jstring);
+
+/*
+ * Class:     com_alibaba_datax_plugin_writer_JniConnection
+ * Method:    executeQueryImp
+ * Signature: ([BJ)J
+ */
+JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_executeQueryImp
+  (JNIEnv *, jobject, jbyteArray, jlong);
+
+/*
+ * Class:     com_alibaba_datax_plugin_writer_JniConnection
+ * Method:    getErrCodeImp
+ * Signature: (JJ)I
+ */
+JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrCodeImp
+  (JNIEnv *, jobject, jlong, jlong);
+
+/*
+ * Class:     com_alibaba_datax_plugin_writer_JniConnection
+ * Method:    getErrMsgImp
+ * Signature: (J)Ljava/lang/String;
+ */
+JNIEXPORT jstring JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrMsgImp
(JNIEnv *, jobject, jlong); + +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: getErrMsgByCode + * Signature: (J)Ljava/lang/String; + */ +JNIEXPORT jstring JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrMsgByCode + (JNIEnv *, jobject, jlong); + +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: getAffectedRowsImp + * Signature: (JJ)I + */ +JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getAffectedRowsImp + (JNIEnv *, jobject, jlong, jlong); + +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: closeConnectionImp + * Signature: (J)I + */ +JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_closeConnectionImp + (JNIEnv *, jobject, jlong); + +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: insertOpentsdbJson + * Signature: (Ljava/lang/String;J)J + */ +JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_insertOpentsdbJson + (JNIEnv *, jobject, jstring, jlong); + +#ifdef __cplusplus +} +#endif +#endif diff --git a/tdenginewriter/src/main/resources/plugin.json b/tdenginewriter/src/main/resources/plugin.json new file mode 100644 index 00000000..e54f65ff --- /dev/null +++ b/tdenginewriter/src/main/resources/plugin.json @@ -0,0 +1,9 @@ +{ + "name": "tdenginewriter", + "class": "com.alibaba.datax.plugin.writer.tdenginewriter.TDengineWriter", + "description": { + "useScene": "data migration to tdengine", + "mechanism": "use JNI or taos-jdbc to write data to tdengine." + }, + "developer": "zyyang-taosdata" +} \ No newline at end of file diff --git a/tdenginewriter/src/main/resources/plugin_job_template.json b/tdenginewriter/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..5482b26e --- /dev/null +++ b/tdenginewriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,10 @@ +{ + "name": "tdenginewriter", + "parameter": { + "host": "", + "port": 6030, + "db": "", + "user": "", + "password": "" + } +} \ No newline at end of file diff --git a/tdenginewriter/src/main/resources/tdenginewritermsg.properties b/tdenginewriter/src/main/resources/tdenginewritermsg.properties new file mode 100644 index 00000000..4aaa220b --- /dev/null +++ b/tdenginewriter/src/main/resources/tdenginewritermsg.properties @@ -0,0 +1,6 @@ +try_get_schema_fromdb=fail to get structure info of target table from configure file and will try to get it from database +batch_size_too_small='batchSize' is too small, please increase it and try again +column_number_error=number of columns is less than expected +tag_value_error=tag columns include 'null' value +ts_value_error=timestamp column type error or null +infer_column_type_error=fail to infer column type: sample count %d, column index %d \ No newline at end of file diff --git a/tdenginewriter/src/main/resources/tdenginewritermsg_en_US.properties b/tdenginewriter/src/main/resources/tdenginewritermsg_en_US.properties new file mode 100644 index 00000000..4aaa220b --- /dev/null +++ b/tdenginewriter/src/main/resources/tdenginewritermsg_en_US.properties @@ -0,0 +1,6 @@ +try_get_schema_fromdb=fail to get structure info of target table from configure file and will try to get it from database +batch_size_too_small='batchSize' is too small, please increase it and try again +column_number_error=number of columns is less than expected +tag_value_error=tag columns include 'null' value +ts_value_error=timestamp column type error or null +infer_column_type_error=fail to infer column type: 
sample count %d, column index %d \ No newline at end of file diff --git a/tdenginewriter/src/main/resources/tdenginewritermsg_zh_CN.properties b/tdenginewriter/src/main/resources/tdenginewritermsg_zh_CN.properties new file mode 100644 index 00000000..4b9552fd --- /dev/null +++ b/tdenginewriter/src/main/resources/tdenginewritermsg_zh_CN.properties @@ -0,0 +1,6 @@ +try_get_schema_fromdb=\u65e0\u6cd5\u4ece\u914d\u7f6e\u6587\u4ef6\u83b7\u53d6\u8868\u7ed3\u6784\u4fe1\u606f\uff0c\u5c1d\u8bd5\u4ece\u6570\u636e\u5e93\u83b7\u53d6 +batch_size_too_small=batchSize\u592a\u5c0f\uff0c\u4f1a\u589e\u52a0\u81ea\u52a8\u7c7b\u578b\u63a8\u65ad\u9519\u8bef\u7684\u6982\u7387\uff0c\u5efa\u8bae\u6539\u5927\u540e\u91cd\u8bd5 +column_number_error=\u5b9e\u9645\u5217\u6570\u5c0f\u4e8e\u671f\u671b\u5217\u6570 +tag_value_error=\u6807\u7b7e\u5217\u5305\u542bnull +ts_value_error=\u65f6\u95f4\u6233\u5217\u4e3anull\u6216\u7c7b\u578b\u9519\u8bef +infer_column_type_error=\u6839\u636e\u91c7\u6837\u7684%d\u6761\u6570\u636e\uff0c\u65e0\u6cd5\u63a8\u65ad\u7b2c%d\u5217\u7684\u6570\u636e\u7c7b\u578b diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnectionTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnectionTest.java new file mode 100644 index 00000000..09c3df26 --- /dev/null +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnectionTest.java @@ -0,0 +1,21 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import org.junit.Test; + +import java.util.Properties; + +public class JniConnectionTest { + + @Test + public void test() throws Exception { + JniConnection connection = new JniConnection(new Properties()); + + connection.open("192.168.56.105", 6030, "test", "root", "taosdata"); + + String json = "{\"metric\":\"weather_temperature\",\"timestamp\":1609430400000,\"value\":123,\"tags\":{\"location\":\"beijing\",\"id\":\"t123\"}}"; + connection.insertOpentsdbJson(json); + + connection.close(); + } + +} \ No newline at end of file diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/MessageTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/MessageTest.java new file mode 100644 index 00000000..b1b7ddd8 --- /dev/null +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/MessageTest.java @@ -0,0 +1,25 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import org.junit.Test; + +import java.util.Locale; +import java.util.ResourceBundle; + +import org.junit.Assert; + +public class MessageTest { + @Test + public void testChineseMessage() { + Locale local = new Locale("zh", "CN"); + ResourceBundle bundle = ResourceBundle.getBundle("tdenginewritermsg", local); + String msg = bundle.getString("try_get_schema_fromdb"); + Assert.assertEquals("无法从配置文件获取表结构信息,尝试从数据库获取", msg); + } + + @Test + public void testDefaultMessage() { + ResourceBundle bundle = ResourceBundle.getBundle("tdenginewritermsg", Locale.getDefault()); + String msg = bundle.getString("try_get_schema_fromdb"); + System.out.println(msg); + } +} diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java new file mode 100644 index 00000000..62bf7040 --- /dev/null +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java @@ -0,0 +1,31 @@ 
+package com.alibaba.datax.plugin.writer.tdenginewriter; + +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; + +public class TDengineWriterTest { + + + @Test + public void testGetSchema() throws ClassNotFoundException, SQLException { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", "wozai.fun", "6030", "test", "root", "taosdata"); + Connection conn = DriverManager.getConnection(jdbcUrl); + SchemaManager schemaManager = new SchemaManager(); + schemaManager.setStable("test1"); + schemaManager.getFromDB(conn); + } + + @Test + public void dropTestTable() throws ClassNotFoundException, SQLException { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", "wozai.fun", "6030", "test", "root", "taosdata"); + Connection conn = DriverManager.getConnection(jdbcUrl); + Statement stmt = conn.createStatement(); + stmt.execute("drop table market_snapshot"); + } +} From 4ba65407140f9d5e086c2a80708c667d7dc09532 Mon Sep 17 00:00:00 2001 From: dingbo Date: Mon, 29 Nov 2021 13:37:47 +0800 Subject: [PATCH 4/9] fix job template error --- .../main/resources/plugin_job_template.json | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/tdenginewriter/src/main/resources/plugin_job_template.json b/tdenginewriter/src/main/resources/plugin_job_template.json index 5482b26e..39c9c969 100644 --- a/tdenginewriter/src/main/resources/plugin_job_template.json +++ b/tdenginewriter/src/main/resources/plugin_job_template.json @@ -1,10 +1,24 @@ { "name": "tdenginewriter", "parameter": { - "host": "", + "host": "127.0.0.1", "port": 6030, - "db": "", - "user": "", - "password": "" + "dbname": "test", + "user": "root", + "password": "taosdata", + "batchSize": 1000, + "stable": "weather", + "tagColumn": { + "station": 0 + }, + "fieldColumn": { + "latitude": 1, + "longtitude": 2, + "tmax": 4, + "tmin": 5 + }, + "timestampColumn":{ + "date": 3 + } } } \ No newline at end of file From 400b34e8cd34542703ce18efd7b2a8de219f1a9a Mon Sep 17 00:00:00 2001 From: dingbo Date: Mon, 29 Nov 2021 14:42:01 +0800 Subject: [PATCH 5/9] add pom module tdengine writer --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index 3bd75a31..1a9da81b 100644 --- a/pom.xml +++ b/pom.xml @@ -73,6 +73,7 @@ mysqlwriter + tdenginewriter drdswriter odpswriter txtfilewriter From beb0d76136246c1d462cdba3214b309c7fb31282 Mon Sep 17 00:00:00 2001 From: dingbo Date: Thu, 2 Dec 2021 15:08:52 +0800 Subject: [PATCH 6/9] rename dbname to dbName. use english doc as default. --- tdenginewriter/doc/tdenginewriter-CN.md | 399 ++++++++++++++++++ tdenginewriter/doc/tdenginewriter.md | 244 ++++------- .../plugin/writer/tdenginewriter/Key.java | 2 +- 3 files changed, 489 insertions(+), 156 deletions(-) create mode 100644 tdenginewriter/doc/tdenginewriter-CN.md diff --git a/tdenginewriter/doc/tdenginewriter-CN.md b/tdenginewriter/doc/tdenginewriter-CN.md new file mode 100644 index 00000000..ffd3efc9 --- /dev/null +++ b/tdenginewriter/doc/tdenginewriter-CN.md @@ -0,0 +1,399 @@ +# DataX TDengineWriter + +简体中文| [English](./tdenginewriter.md) + +## 1 快速介绍 + +TDengineWriter插件实现了写入数据到TDengine数据库功能。可用于离线同步其它数据库的数据到TDengine。 + +## 2 实现原理 + +TDengineWriter 通过 DataX 框架获取 Reader生成的协议数据,根据reader的类型解析数据。目前有两种写入方式: + +1. 
对于OpenTSDBReader, TDengineWriter通过JNI方式调用TDengine客户端库文件(taos.lib或taos.dll)中的方法,使用[schemaless的方式](https://www.taosdata.com/cn/documentation/insert#schemaless)写入。 + +2. 对于其它数据源,会根据配置生成SQL语句, 通过[taos-jdbcdriver](https://www.taosdata.com/cn/documentation/connector/java)批量写入。 + +这样区分的原因是OpenTSDBReader将opentsdb的数据统一读取为json字符串,Writer端接收到的数据只有1列。而其它Reader插件一般会把数据放在不同列。 + +## 3 功能说明 +### 3.1 从OpenTSDB到TDengine +#### 3.1.1 配置样例 + +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "opentsdbreader", + "parameter": { + "endpoint": "http://192.168.1.180:4242", + "column": [ + "weather_temperature" + ], + "beginDateTime": "2021-01-01 00:00:00", + "endDateTime": "2021-01-01 01:00:00" + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "host": "192.168.1.180", + "port": 6030, + "dbName": "test", + "user": "root", + "password": "taosdata" + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} +``` + +#### 3.1.2 参数说明 + +| 参数 | 描述 | 是否必选 | 默认值 | +| --------- | -------------------- | -------- | -------- | +| host | TDengine实例的host | 是 | 无 | +| port | TDengine实例的port | 是 | 无 | +| user | TDengine实例的用户名 | 否 | root | +| password | TDengine实例的密码 | 否 | taosdata | +| dbName | 目的数据库的名称 | 是 | 无 | +| batchSize | 每次批量插入多少记录 | 否 | 1 | + + +#### 3.1.3 类型转换 + +目前,由于OpenTSDBReader将opentsdb的数据统一读取为json字符串,TDengineWriter 在做Opentsdb到TDengine的迁移时,按照以下类型进行处理: + +| OpenTSDB数据类型 | DataX 内部类型 | TDengine 数据类型 | +| ---------------- | -------------- | ----------------- | +| timestamp | Date | timestamp | +| Integer(value) | Double | double | +| Float(value) | Double | double | +| String(value) | String | binary | +| Integer(tag) | String | binary | +| Float(tag) | String | binary | +| String(tag) | String | binary | + +### 3.2 从MongoDB到TDengine + +#### 3.2.1 配置样例 +```json +{ + "job": { + "setting": { + "speed": { + "channel": 2 + } + }, + "content": [ + { + "reader": { + "name": "mongodbreader", + "parameter": { + "address": [ + "127.0.0.1:27017" + ], + "userName": "user", + "mechanism": "SCRAM-SHA-1", + "userPassword": "password", + "authDb": "admin", + "dbName": "test", + "collectionName": "stock", + "column": [ + { + "name": "stockID", + "type": "string" + }, + { + "name": "tradeTime", + "type": "date" + }, + { + "name": "lastPrice", + "type": "double" + }, + { + "name": "askPrice1", + "type": "double" + }, + { + "name": "bidPrice1", + "type": "double" + }, + { + "name": "volume", + "type": "int" + } + ] + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "host": "localhost", + "port": 6030, + "dbName": "test", + "user": "root", + "password": "taosdata", + "stable": "stock", + "tagColumn": { + "industry": "energy", + "stockID": 0 + }, + "fieldColumn": { + "lastPrice": 2, + "askPrice1": 3, + "bidPrice1": 4, + "volume": 5 + }, + "timestampColumn": { + "tradeTime": 1 + } + } + } + } + ] + } +} +``` + +**注:本配置的writer部分同样适用于关系型数据库** + + +#### 3.2.2 参数说明 +| 参数 | 描述 | 是否必选 | 默认值 | 备注 | +| --------------- | -------------------- | ---------------- | -------- | ------------------ | +| host | TDengine实例的host | 是 | 无 | +| port | TDengine实例的port | 是 | 无 | +| user | TDengine实例的用户名 | 否 | root | +| password | TDengine实例的密码 | 否 | taosdata | +| dbName | 目的数据库的名称 | 是 | 无 | +| batchSize | 每次批量插入多少记录 | 否 | 1000 | +| stable | 目标超级表的名称 | 是(OpenTSDB除外) | 无 | +| tagColumn | 标签列的列名和位置 | 否 | 无 | 位置索引均从0开始 | +| fieldColumn | 字段列的列名和位置 | 否 | 无 | | +| timestampColumn | 时间戳列的列名和位置 | 否 | 无 | 时间戳列只能有一个 | + +#### 3.2.3 自动建表规则 +##### 3.2.3.1 超级表创建规则 + +如果配置了tagColumn、 
fieldColumn和timestampColumn,将会在插入第一条数据前自动创建超级表。
+数据列的类型从第1条记录自动推断, 标签列默认类型为`NCHAR(64)`, 比如示例配置,可能生成以下建表语句:
+
+```sql
+CREATE STABLE IF NOT EXISTS market_snapshot (
+  tradetime TIMESTAMP,
+  lastprice DOUBLE,
+  askprice1 DOUBLE,
+  bidprice1 DOUBLE,
+  volume INT
+)
+TAGS(
+  industry NCHAR(64),
+  stockID NCHAR(64)
+);
+```
+
+##### 3.2.3.2 子表创建规则
+
+子表结构与超级表相同,子表表名生成规则:
+1. 将标签的值组合成如下的字符串: `tag_value1!tag_value2!tag_value3`。
+2. 计算该字符串的 MD5 散列值,记为 "md5val"。
+3. 以 "t_md5val" 作为子表名。其中的 "t_" 是固定的前缀。
+
+#### 3.2.4 用户提前建表
+
+如果你已经创建好目标超级表,那么tagColumn、fieldColumn和timestampColumn三个字段均可省略, 插件将通过执行`describe stableName`获取表结构信息。
+此时要求接收到的Record中Column的顺序和执行`describe stableName`返回的列顺序相同, 比如通过`describe stableName`返回以下内容:
+```
+ Field | Type | Length | Note |
+=================================================================================
+ ts | TIMESTAMP | 8 | |
+ current | DOUBLE | 8 | |
+ location | BINARY | 10 | TAG |
+```
+那么插件收到的数据第1列必须代表时间戳,第2列必须代表电流,第3列必须代表位置。
+
+#### 3.2.5 注意事项
+
+1. tagColumn、fieldColumn和timestampColumn三个字段用于描述目标表的结构信息,这三个配置字段必须同时存在或同时省略。
+2. 如果存在以上三个配置,且目标表也已经存在,则两者必须一致。**一致性**由用户自己保证,插件不做检查。不一致可能会导致插入失败或插入数据错乱。
+
+#### 3.2.6 类型转换
+
+| MongoDB 数据类型 | DataX 内部类型 | TDengine 数据类型 |
+| ---------------- | -------------- | ----------------- |
+| int, Long | Long | BIGINT |
+| double | Double | DOUBLE |
+| string, array | String | NCHAR(64) |
+| date | Date | TIMESTAMP |
+| boolean | Boolean | BOOL |
+| bytes | Bytes | BINARY(64) |
+
+### 3.3 从关系型数据库到TDengine
+writer部分的配置规则和上述MongoDB的示例是一样的,这里给出一个MySQL的示例。
+
+#### 3.3.1 MySQL中表结构
+```sql
+CREATE TABLE IF NOT EXISTS weather(
+    station varchar(100),
+    latitude DOUBLE,
+    longtitude DOUBLE,
+    `date` DATE,
+    TMAX int,
+    TMIN int
+)
+```
+
+#### 3.3.2 配置文件示例
+
+```json
+{
+  "job": {
+    "content": [
+      {
+        "reader": {
+          "name": "mysqlreader",
+          "parameter": {
+            "username": "root",
+            "password": "passw0rd",
+            "column": [
+              "*"
+            ],
+            "splitPk": "station",
+            "connection": [
+              {
+                "table": [
+                  "weather"
+                ],
+                "jdbcUrl": [
+                  "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8"
+                ]
+              }
+            ]
+          }
+        },
+        "writer": {
+          "name": "tdenginewriter",
+          "parameter": {
+            "host": "127.0.0.1",
+            "port": 6030,
+            "dbName": "test",
+            "user": "root",
+            "password": "taosdata",
+            "batchSize": 1000,
+            "stable": "weather",
+            "tagColumn": {
+              "station": 0
+            },
+            "fieldColumn": {
+              "latitude": 1,
+              "longtitude": 2,
+              "tmax": 4,
+              "tmin": 5
+            },
+            "timestampColumn":{
+              "date": 3
+            }
+          }
+        }
+      }
+    ],
+    "setting": {
+      "speed": {
+        "channel": 1
+      }
+    }
+  }
+}
+```
+
+
+## 4 性能报告
+
+### 4.1 环境准备
+
+#### 4.1.1 数据特征
+
+建表语句:
+
+单行记录类似于:
+
+#### 4.1.2 机器参数
+
+* 执行DataX的机器参数为:
+    1. cpu:
+    2. mem:
+    3. net: 千兆双网卡
+    4. disc: DataX 数据不落磁盘,不统计此项
+
+* TDengine数据库机器参数为:
+    1. cpu:
+    2. mem:
+    3. net: 千兆双网卡
+    4. disc:
+
+#### 4.1.3 DataX jvm 参数
+
+    -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
+
+### 4.2 测试报告
+
+#### 4.2.1 单表测试报告
+
+| 通道数 | DataX速度(Rec/s) | DataX流量(MB/s) | DataX机器网卡流出流量(MB/s) | DataX机器运行负载 | DB网卡进入流量(MB/s) | DB运行负载 | DB TPS |
+| ------ | ---------------- | --------------- | --------------------------- | ----------------- | -------------------- | ---------- | ------ |
+| 1 | | | | | | | |
+| 4 | | | | | | | |
+| 8 | | | | | | | |
+| 16 | | | | | | | |
+| 32 | | | | | | | |
+
+说明:
+
+1. 这里的单表,主键类型为 bigint(20),自增。
+2. batchSize 和 通道个数,对性能影响较大。
+3. 16通道,4096批量提交时,出现 full gc 2次。
+
+#### 4.2.4 性能测试小结
+
+
+## 5 约束限制
+
+1. 本插件自动创建超级表时NCHAR类型的长度固定为64,对于包含长度大于64的字符串的数据源,将不支持。
+2. 标签列不能包含null值,如果包含会被过滤掉。
+
+## FAQ
+
+### 如何选取要同步的数据的范围?
+
+数据范围的选取在Reader插件端配置,对于不同的Reader插件配置方法往往不同。比如对于mysqlreader, 可以用sql语句指定数据范围。对于opentsdbreader, 用beginDateTime和endDateTime两个配置项指定数据范围。
+
+### 如何一次导入多张源表?
+
+如果Reader插件支持一次读多张表,Writer插件就能一次导入多张表。如果Reader不支持一次读多张表,可以建多个job,分别导入。Writer插件只负责写数据。
+
+### 一张源表导入之后对应TDengine中多少张表?
+
+这是由tagColumn决定的,如果所有tag列的值都相同,那么目标表只有一个。源表有多少不同的tag组合,目标超级表就有多少子表。
+
+### 源表和目标表的字段顺序一致吗?
+
+TDengine要求每个表第一列是时间戳列,后边是普通字段,最后是标签列。如果源表不是这个顺序,插件在自动建表时会自动调整。
+
+### 插件如何确定各列的数据类型?
+
+根据收到的第一批数据自动推断各列的类型。
+
+### 为什么插入10年前的数据会抛异常`TDengine ERROR (2350): failed to execute batch bind` ?
+
+因为创建数据库的时候,默认保留10年的数据。可以手动指定要保留多长时间的数据,比如:`CREATE DATABASE power KEEP 36500;`。
\ No newline at end of file
diff --git a/tdenginewriter/doc/tdenginewriter.md b/tdenginewriter/doc/tdenginewriter.md
index 3e8a5f79..022bd212 100644
--- a/tdenginewriter/doc/tdenginewriter.md
+++ b/tdenginewriter/doc/tdenginewriter.md
@@ -1,24 +1,21 @@
 # DataX TDengineWriter
 
-简体中文| [English](./tdenginewriter-EN.md)
+[简体中文](./tdenginewriter-CN.md) | English
 
-## 1 快速介绍
+## 1 Quick Introduction
 
-TDengineWriter插件实现了写入数据到TDengine数据库功能。可用于离线同步其它数据库的数据到TDengine。
+TDengineWriter Plugin writes data to [TDengine](https://www.taosdata.com/en/). It can be used to synchronize data from other databases to TDengine offline.
 
-## 2 实现原理
+## 2 Implementation
 
-TDengineWriter 通过 DataX 框架获取 Reader生成的协议数据,根据reader的类型解析数据。目前有两种写入方式:
+TDengineWriter gets records generated on the reader side from the DataX framework. It has two writing strategies:
 
-1. 对于OpenTSDBReader, TDengineWriter通过JNI方式调用TDengine客户端库文件(taos.lib或taos.dll)中的方法,使用[schemaless的方式](https://www.taosdata.com/cn/documentation/insert#schemaless)写入。
+1. For data from OpenTSDBReader, which is in JSON format, we use JNI to call functions in `taos.lib` or `taos.dll`, leveraging the TDengine Server feature that supports writing JSON data directly, called [schemaless writing](https://www.taosdata.com/cn/documentation/insert#schemaless). (The feature was not included in taos-jdbcdriver until version 2.0.36.)
+2. For other data sources, we use [taos-jdbcdriver](https://www.taosdata.com/cn/documentation/connector/java) to write data. If the target table does not exist beforehand, it will be created automatically according to your configuration.
 
-2. 
对于其它数据源,会根据配置生成SQL语句, 通过[taos-jdbcdriver](https://www.taosdata.com/cn/documentation/connector/java)批量写入。
-
-这样区分的原因是OpenTSDBReader将opentsdb的数据统一读取为json字符串,Writer端接收到的数据只有1列。而其它Reader插件一般会把数据放在不同列。
-
-## 3 功能说明
-### 3.1 从OpenTSDB到TDengine
-#### 3.1.1 配置样例
+## 3 Features Introduction
+### 3.1 From OpenTSDB to TDengine
+#### 3.1.1 Sample Setting
 
 ```json
 {
   "job": {
     "content": [
       {
         "reader": {
           "name": "opentsdbreader",
           "parameter": {
             "endpoint": "http://192.168.1.180:4242",
             "column": [
               "weather_temperature"
             ],
             "beginDateTime": "2021-01-01 00:00:00",
             "endDateTime": "2021-01-01 01:00:00"
           }
         },
         "writer": {
           "name": "tdenginewriter",
           "parameter": {
             "host": "192.168.1.180",
             "port": 6030,
-            "dbname": "test",
+            "dbName": "test",
             "user": "root",
             "password": "taosdata"
           }
         }
       }
     ],
     "setting": {
       "speed": {
         "channel": 1
       }
     }
   }
 }
 ```
 
-#### 3.1.2 参数说明
+#### 3.1.2 Configuration
 
-| 参数 | 描述 | 是否必选 | 默认值 |
-| --------- | -------------------- | -------- | -------- |
-| host | TDengine实例的host | 是 | 无 |
-| port | TDengine实例的port | 是 | 无 |
-| user | TDengine实例的用户名 | 否 | root |
-| password | TDengine实例的密码 | 否 | taosdata |
-| dbname | 目的数据库的名称 | 是 | 无 |
-| batchSize | 每次批量插入多少记录 | 否 | 1 |
+| Parameter | Description | Required | Default |
+| --------- | ------------------------------ | -------- | -------- |
+| host | host of TDengine | Yes | |
+| port | port of TDengine | Yes | |
+| user | user name of TDengine | No | root |
+| password | password of TDengine | No | taosdata |
+| dbName | name of target database | Yes | |
+| batchSize | batch size of insert operation | No | 1 |
 
-#### 3.1.3 类型转换
+#### 3.1.3 Type Conversion
 
-目前,由于OpenTSDBReader将opentsdb的数据统一读取为json字符串,TDengineWriter 在做Opentsdb到TDengine的迁移时,按照以下类型进行处理:
+| OpenTSDB Type | DataX Type | TDengine Type |
+| ---------------- | ---------- | ------------- |
+| timestamp | Date | timestamp |
+| Integer(value) | Double | double |
+| Float(value) | Double | double |
+| String(value) | String | binary |
+| Integer(tag) | String | binary |
+| Float(tag) | String | binary |
+| String(tag) | String | binary |
 
-| OpenTSDB数据类型 | DataX 内部类型 | TDengine 数据类型 |
-| ---------------- | -------------- | ----------------- |
-| timestamp | Date | timestamp |
-| Integer(value) | Double | double |
-| Float(value) | Double | double |
-| String(value) | String | binary |
-| Integer(tag) | String | binary |
-| Float(tag) | String | binary |
-| String(tag) | String | binary |
+### 3.2 From MongoDB to TDengine
 
-### 3.2 从MongoDB到TDengine
-
-#### 3.2.1 配置样例
+#### 3.2.1 Sample Setting
 ```json
 {
   "job": {
     "setting": {
       "speed": {
         "channel": 2
       }
     },
     "content": [
       {
         "reader": {
           "name": "mongodbreader",
           "parameter": {
             "address": [
               "127.0.0.1:27017"
             ],
             "userName": "user",
             "mechanism": "SCRAM-SHA-1",
             "userPassword": "password",
             "authDb": "admin",
             "dbName": "test",
             "collectionName": "stock",
             "column": [
               {
                 "name": "stockID",
                 "type": "string"
               },
               {
                 "name": "tradeTime",
                 "type": "date"
               },
               {
                 "name": "lastPrice",
                 "type": "double"
               },
               {
                 "name": "askPrice1",
                 "type": "double"
               },
               {
                 "name": "bidPrice1",
                 "type": "double"
               },
               {
                 "name": "volume",
                 "type": "int"
               }
             ]
           }
         },
         "writer": {
           "name": "tdenginewriter",
           "parameter": {
             "host": "localhost",
             "port": 6030,
-            "dbname": "test",
+            "dbName": "test",
             "user": "root",
             "password": "taosdata",
             "stable": "stock",
             "tagColumn": {
               "industry": "energy",
               "stockID": 0
             },
             "fieldColumn": {
               "lastPrice": 2,
               "askPrice1": 3,
               "bidPrice1": 4,
               "volume": 5
             },
             "timestampColumn": {
               "tradeTime": 1
             }
           }
         }
       }
     ]
   }
 }
 ```
 
-**注:本配置的writer部分同样适用于关系型数据库**
+**Note**: the writer part of this setting also applies to data sources other than OpenTSDB.
 
-#### 3.2.2 参数说明
-| 参数 | 描述 | 是否必选 | 默认值 | 备注 |
-| --------------- | -------------------- | ---------------- | -------- | ------------------ |
-| host | TDengine实例的host | 是 | 无 |
-| port | TDengine实例的port | 是 | 无 |
-| user | TDengine实例的用户名 | 否 | root |
-| password | TDengine实例的密码 | 否 | taosdata |
-| dbname | 目的数据库的名称 | 是 | 无 |
-| batchSize | 每次批量插入多少记录 | 否 | 1000 |
-| stable | 目标超级表的名称 | 是(OpenTSDB除外) | 无 |
-| tagColumn | 标签列的列名和位置 | 否 | 无 | 位置索引均从0开始 |
-| fieldColumn | 字段列的列名和位置 | 否 | 无 | |
-| timestampColumn | 时间戳列的列名和位置 | 否 | 无 | 时间戳列只能有一个 |
+#### 3.2.2 Configuration
+
+| Parameter | Description | Required | Default | Remark |
+| --------------- | --------------------------------------------------------------- | 
------------------------ | -------- | ------------------- |
+| host | host of TDengine | Yes | |
+| port | port of TDengine | Yes | |
+| user | user name of TDengine | No | root |
+| password | password of TDengine | No | taosdata |
+| dbName | name of target database | Yes | |
+| batchSize | batch size of insert operation | No | 1000 |
+| stable | name of target super table | Yes (except for OpenTSDB) | |
+| tagColumn | name and position of tag columns in the record from reader | No | | index starts with 0 |
+| fieldColumn | name and position of data columns in the record from reader | No | | |
+| timestampColumn | name and position of timestamp column in the record from reader | No | | |
 
-#### 3.2.3 自动建表规则
-##### 3.2.3.1 超级表创建规则
+#### 3.2.3 Auto table creating
+##### 3.2.3.1 Rules
 
-如果配置了tagColumn、 fieldColumn和timestampColumn将会在插入第一条数据前,自动创建超级表。
-数据列的类型从第1条记录自动推断, 标签列默认类型为`NCHAR(64)`, 比如示例配置,可能生成以下建表语句:
+If all of `tagColumn`, `fieldColumn` and `timestampColumn` are offered in the writer configuration, the target super table will be created automatically before the first record is inserted.
+The type of tag columns will always be `NCHAR(64)`. For example, the sample setting above will produce the following SQL:
 
 ```sql
 CREATE STABLE IF NOT EXISTS market_snapshot (
   tradetime TIMESTAMP,
   lastprice DOUBLE,
   askprice1 DOUBLE,
   bidprice1 DOUBLE,
   volume INT
 )
 TAGS(
   industry NCHAR(64),
   stockID NCHAR(64)
 );
 ```
 
-##### 3.2.3.2 子表创建规则
+##### 3.2.3.2 Sub-table Creating Rules
 
-<<<<<<< HEAD
-子表结果与超表相同,子表表名生成规则:
-=======
-子表结构与超级表相同,子表表名生成规则:
->>>>>>> TD-11503/english-doc-for-writer
+The structure of sub-tables is the same as that of the super table. The names of sub-tables are generated by the rules below:
-1. 将标签的value 组合成为如下的字符串: `tag_value1!tag_value2!tag_value3`。
-2. 计算该字符串的 MD5 散列值 "md5_val"。
-3. "t_md5val"作为子表名。其中的 "t" 是固定的前缀。
+1. combine the tag values into a string like this: `tag_value1!tag_value2!tag_value3`.
+2. compute the MD5 hex digest of the above string, called `md5val`.
+3. use "t_md5val" as the sub-table name, in which "t_" is a fixed prefix.
 
-#### 3.2.4 用户提前建表
+#### 3.2.4 Use Pre-created Table
 
-如果你已经创建好目标超级表,那么tagColumn、 fieldColumn和timestampColumn三个字段均可省略, 插件将通过执行通过`describe stableName`获取表结构的信息。
-此时要求接收到的Record中Column的顺序和执行`describe stableName`返回的列顺序相同, 比如通过`describe stableName`返回以下内容:
+If you have created the super table beforehand, then tagColumn, fieldColumn and timestampColumn can all be omitted; the writer plugin will fetch the table schema by executing `describe stableName`.
+In that case the order of the columns in the records received by this plugin must match the order of the columns returned by `describe stableName`. For example, if `describe stableName` returns the following:
 ```
  Field | Type | Length | Note |
 =================================================================================
 ts | TIMESTAMP | 8 | |
 current | DOUBLE | 8 | |
 location | BINARY | 10 | TAG |
 ```
-那么插件收到的数据第1列必须代表时间戳,第2列必须代表电流,第3列必须代表位置。
+Then the first column received by this writer plugin must represent the timestamp, the second column must represent current with type double, and the third column must represent location with internal type string.
 
-#### 3.2.5 注意事项
+#### 3.2.5 Remarks
 
-1. tagColumn、 fieldColumn和timestampColumn三个字段用于描述目标表的结构信息,这三个配置字段必须同时存在或同时省略。
-2. 如果存在以上三个配置,且目标表也已经存在,则两者必须一致。**一致性**由用户自己保证,插件不做检查。不一致可能会导致插入失败或插入数据错乱。
-<<<<<<< HEAD
-3. 插件优先使用配置文件中指定的表结构。
-=======
->>>>>>> TD-11503/english-doc-for-writer
+1. The config keys tagColumn, fieldColumn and timestampColumn must all be present or all be omitted.
+2. If the above three config keys exist and the target table also exists, the two definitions must be consistent. Consistency is guaranteed by the user and is not checked by the plugin; a mismatch may cause inserts to fail or data to be written incorrectly.
 
-#### 3.2.6 类型转换
+#### 3.2.6 Type Conversion
 
-| MongoDB 数据类型 | DataX 内部类型 | TDengine 数据类型 |
+| MongoDB Type | DataX Type | TDengine Type |
 | ---------------- | -------------- | ----------------- |
 | int, Long | Long | BIGINT |
 | double | Double | DOUBLE |
 | string, array | String | NCHAR(64) |
 | date | Date | TIMESTAMP |
 | boolean | Boolean | BOOL |
-| bytes | Bytes | BINARY |
+| bytes | Bytes | BINARY(64) |
 
-### 3.3 从关系型数据库到TDengine
-writer部分的配置规则和上述MongoDB的示例是一样的,这里给出一个MySQL的示例。
+### 3.3 From Relational Database to TDengine
 
-#### 3.3.1 MySQL中表结构
+Take MySQL as an example; the writer configuration follows the same rules as the MongoDB sample above.
+
+#### 3.3.1 Table Structure in MySQL
 ```sql
 CREATE TABLE IF NOT EXISTS weather(
     station varchar(100),
     latitude DOUBLE,
     longtitude DOUBLE,
     `date` DATE,
     TMAX int,
     TMIN int
 )
 ```
 
-#### 3.3.2 配置文件示例
+#### 3.3.2 Sample Setting
 
 ```json
 {
   "job": {
     "content": [
       {
         "reader": {
           "name": "mysqlreader",
           "parameter": {
             "username": "root",
             "password": "passw0rd",
             "column": [
               "*"
             ],
             "splitPk": "station",
             "connection": [
               {
                 "table": [
                   "weather"
                 ],
                 "jdbcUrl": [
                   "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8"
                 ]
               }
             ]
           }
         },
         "writer": {
           "name": "tdenginewriter",
           "parameter": {
             "host": "127.0.0.1",
             "port": 6030,
-            "dbname": "test",
+            "dbName": "test",
             "user": "root",
             "password": "taosdata",
             "batchSize": 1000,
             "stable": "weather",
             "tagColumn": {
               "station": 0
             },
             "fieldColumn": {
               "latitude": 1,
               "longtitude": 2,
               "tmax": 4,
               "tmin": 5
             },
             "timestampColumn":{
               "date": 3
             }
           }
         }
       }
     ],
     "setting": {
       "speed": {
         "channel": 1
       }
     }
   }
 }
 ```
 
-## 4 性能报告
+## 4 Performance Test
 
-### 4.1 环境准备
-
-#### 4.1.1 数据特征
-
-建表语句:
-
-单行记录类似于:
-
-#### 4.1.2 机器参数
-
-* 执行DataX的机器参数为:
-    1. cpu:
-    2. mem:
-    3. net: 千兆双网卡
-    4. disc: DataX 数据不落磁盘,不统计此项
-
-* TDengine数据库机器参数为:
-    1. cpu:
-    2. mem:
-    3. net: 千兆双网卡
-    4. disc:
-
-#### 4.1.3 DataX jvm 参数
-
-    -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
-
-### 4.2 测试报告
-
-#### 4.2.1 单表测试报告
-
-| 通道数 | DataX速度(Rec/s) | DataX流量(MB/s) | DataX机器网卡流出流量(MB/s) | DataX机器运行负载 | DB网卡进入流量(MB/s) | DB运行负载 | DB TPS |
-| ------ | ---------------- | --------------- | --------------------------- | ----------------- | -------------------- | ---------- | ------ |
-| 1 | | | | | | | |
-| 4 | | | | | | | |
-| 8 | | | | | | | |
-| 16 | | | | | | | |
-| 32 | | | | | | | |
-
-说明:
-
-1. 这里的单表,主键类型为 bigint(20),自增。
-2. batchSize 和 通道个数,对性能影响较大。
-3. 16通道,4096批量提交时,出现 full gc 2次。
-
-#### 4.2.4 性能测试小结
-
-
-## 5 约束限制
+## 5 Restriction
 
-1. 本插件自动创建超级表时NCHAR类型的长度固定为64,对于包含长度大于64的字符串的数据源,将不支持。
-2. 标签列不能包含null值,如果包含会被过滤掉。
+1. The NCHAR type has a fixed length of 64 when the super table is created automatically, so data sources containing strings longer than 64 characters are not supported.
+2. Rows with null tag values will be dropped.
 
 ## FAQ
 
-### 如何选取要同步的数据的范围?
+### How to filter on the source table?
 
-数据范围的选取在Reader插件端配置,对于不同的Reader插件配置方法往往不同。比如对于mysqlreader, 可以用sql语句指定数据范围。对于opentsdbreader, 用beginDateTime和endDateTime两个配置项指定数据范围。
+It depends on the reader plugin, and the way differs between readers. For example, mysqlreader can restrict the range with a SQL statement, while opentsdbreader uses the beginDateTime and endDateTime settings.
 
-### 如何一次导入多张源表?
+### How to import multiple source tables at once?
 
-如果Reader插件支持一次读多张表,Writer插件就能一次导入多张表。如果Reader不支持多多张表,可以建多个job,分别导入。Writer插件只负责写数据。
+It depends on the reader plugin. If the reader plugin supports reading multiple tables at once, it works directly; otherwise create one job per table and import them separately. The writer plugin only writes data.
 
-### 一张源表导入之后对应TDengine中多少张表?
+### How many sub-tables will be produced?
 
-<<<<<<< HEAD
-这是由tagColumn决定的,如果所有tag列的值都相同,那么目标表只有一个。源表有多少不同的tag组合,目标超表就有多少子表。
+The number of sub-tables is determined by tagColumn: it equals the number of distinct combinations of tag values in the source table.
 
-### 源表和目标表的字段顺序一致吗?
+### Must the columns of the source table and the target table be in the same order?
 
-TDengine要求每个表第一列是时间戳列,后边是普通字段,最后是标签列。如果源表不是这个顺序,插件在自动建表是自动调整。
-=======
-这是由tagColumn决定的,如果所有tag列的值都相同,那么目标表只有一个。源表有多少不同的tag组合,目标超级表就有多少子表。
+No. TDengine requires the first column to be the timestamp column, followed by data columns and then tag columns. The writer plugin creates the super table in this column order, regardless of the source column order.
 
-### 源表和目标表的字段顺序一致吗?
+### How does the plugin infer the data type of incoming data?
 
-TDengine要求每个表第一列是时间戳列,后边是普通字段,最后是标签列。如果源表不是这个顺序,插件在自动建表时会自动调整。
->>>>>>> TD-11503/english-doc-for-writer
+By the first batch of records it received.
 
-### 插件如何确定各列的数据类型?
+### Why can't I insert data from 10 years ago? Doing so raises the error: `TDengine ERROR (2350): failed to execute batch bind`.
 
-根据收到的第一批数据自动推断各列的类型。
-
-### 为什么插入10年前的数据会抛异常`TDengine ERROR (2350): failed to execute batch bind` ?
-
-因为创建数据库的时候,默认保留10年的数据。可以手动指定要保留多长时间的数据,比如:`CREATE DATABASE power KEEP 36500;`。
\ No newline at end of file
+Because the database you created keeps only 10 years of data by default. You can create the database like this: `CREATE DATABASE power KEEP 36500;` to enlarge the retention period to 100 years.
\ No newline at end of file
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java
index 090a7999..a061e97f 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java
@@ -3,7 +3,7 @@ package com.alibaba.datax.plugin.writer.tdenginewriter;
 public class Key {
     public static final String HOST = "host";
     public static final String PORT = "port";
-    public static final String DBNAME = "dbname";
+    public static final String DBNAME = "dbName";
     public static final String USER = "user";
     public static final String PASSWORD = "password";
     public static final String BATCH_SIZE = "batchSize";
From 5d58600e44160dedf68f550330147951f219a1a7 Mon Sep 17 00:00:00 2001
From: dingbo 
Date: Fri, 3 Dec 2021 16:04:51 +0800
Subject: [PATCH 7/9] add tdenginewriter to package.xml

---
 package.xml | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/package.xml b/package.xml
index 882dd23b..d95feb40 100755
--- a/package.xml
+++ b/package.xml
@@ -189,6 +189,13 @@
             </includes>
             <outputDirectory>datax</outputDirectory>
         </fileSet>
+        <fileSet>
+            <directory>tdenginewriter/target/datax/</directory>
+            <includes>
+                <include>**/*.*</include>
+            </includes>
+            <outputDirectory>datax</outputDirectory>
+        </fileSet>
         <fileSet>
             <directory>drdswriter/target/datax/</directory>
From 1372a653cf40f5ca6d04c7ff050878af04bd9815 Mon Sep 17 00:00:00 2001
From: dingbo 
Date: Fri, 3 Dec 2021 16:34:19 +0800
Subject: [PATCH 8/9] delete duplicate en doc

---
 tdenginewriter/doc/tdenginewriter-EN.md | 349 ------------------------
 1 file changed, 349 deletions(-)
 delete mode 100644 tdenginewriter/doc/tdenginewriter-EN.md

diff --git a/tdenginewriter/doc/tdenginewriter-EN.md b/tdenginewriter/doc/tdenginewriter-EN.md
deleted file mode 100644
index eda88a9f..00000000
--- a/tdenginewriter/doc/tdenginewriter-EN.md
+++ /dev/null
@@ -1,349 +0,0 @@
-# DataX TDengineWriter
-
-[简体中文](./tdenginewriter.md) | English
-
-## 1 Quick Introduction
-
-TDengineWriter Plugin writes data to [TDengine](https://www.taosdata.com/en/). It can be used to offline synchronize data from other databases to TDengine.
-
-## 2 Implementation
-
-TDengineWriter get records from DataX Framework that are generated from reader side. It has two whiting strategies:
-
-1. For data from OpenTSDBReader which is in json format, to leverage the new feature of TDengine Server that support writing json data directly called [schemaless writing](https://www.taosdata.com/cn/documentation/insert#schemaless), we use JNI to call functions in `taos.lib` or `taos.dll`.(Since the feature was not included in taos-jdbcdrive until version 2.0.36).
-2. For other data sources, we use [taos-jdbcdriver](https://www.taosdata.com/cn/documentation/connector/java) to write data. If the target table is not exists beforehand, then it will be created automatically according to your configuration.
- -## 3 Features Introduction -### 3.1 From OpenTSDB to TDengine -#### 3.1.1 Sample Setting - -```json -{ - "job": { - "content": [ - { - "reader": { - "name": "opentsdbreader", - "parameter": { - "endpoint": "http://192.168.1.180:4242", - "column": [ - "weather_temperature" - ], - "beginDateTime": "2021-01-01 00:00:00", - "endDateTime": "2021-01-01 01:00:00" - } - }, - "writer": { - "name": "tdenginewriter", - "parameter": { - "host": "192.168.1.180", - "port": 6030, - "dbname": "test", - "user": "root", - "password": "taosdata" - } - } - } - ], - "setting": { - "speed": { - "channel": 1 - } - } - } -} -``` - -#### 3.1.2 Configuration - -| Parameter | Description | Required | Default | -| --------- | ------------------------------ | -------- | -------- | -| host | host of TDengine | Yes | | -| port | port of TDengine | Yes | | -| user | use name of TDengine | No | root | -| password | password of TDengine | No | taosdata | -| dbname | name of target database | No | | -| batchSize | batch size of insert operation | No | 1 | - - -#### 3.1.3 Type Convert - -| OpenTSDB Type | DataX Type | TDengine Type | -| ---------------- | ---------- | ------------- | -| timestamp | Date | timestamp | -| Integer(value) | Double | double | -| Float(value) | Double | double | -| String(value) | String | binary | -| Integer(tag) | String | binary | -| Float(tag) | String | binary | -| String(tag) | String | binary | - -### 3.2 From MongoDB to TDengine - -#### 3.2.1 Sample Setting -```json -{ - "job": { - "setting": { - "speed": { - "channel": 2 - } - }, - "content": [ - { - "reader": { - "name": "mongodbreader", - "parameter": { - "address": [ - "127.0.0.1:27017" - ], - "userName": "user", - "mechanism": "SCRAM-SHA-1", - "userPassword": "password", - "authDb": "admin", - "dbName": "test", - "collectionName": "stock", - "column": [ - { - "name": "stockID", - "type": "string" - }, - { - "name": "tradeTime", - "type": "date" - }, - { - "name": "lastPrice", - "type": "double" - }, - { - "name": "askPrice1", - "type": "double" - }, - { - "name": "bidPrice1", - "type": "double" - }, - { - "name": "volume", - "type": "int" - } - ] - } - }, - "writer": { - "name": "tdenginewriter", - "parameter": { - "host": "localhost", - "port": 6030, - "dbname": "test", - "user": "root", - "password": "taosdata", - "stable": "stock", - "tagColumn": { - "industry": "energy", - "stockID": 0 - }, - "fieldColumn": { - "lastPrice": 2, - "askPrice1": 3, - "bidPrice1": 4, - "volume": 5 - }, - "timestampColumn": { - "tradeTime": 1 - } - } - } - } - ] - } -} -``` - -**Note:the writer part of this setting can also apply to other data source except for OpenTSDB ** - - -#### 3.2.2 Configuration - -| Parameter | Description | Required | Default | Remark | -| --------------- | --------------------------------------------------------------- | ------------------------ | -------- | ------------------- | -| host | host ofTDengine | Yes | | -| port | port of TDengine | Yes | | -| user | user name of TDengine | No | root | -| password | password of TDengine | No | taosdata | -| dbname | name of target database | Yes | | -| batchSize | batch size of insert operation | No | 1000 | -| stable | name of target super table | Yes(except for OpenTSDB) | | -| tagColumn | name and position of tag columns in the record from reader | No | | index starts with 0 | -| fieldColumn | name and position of data columns in the record from reader | No | | | -| timestampColumn | name and position of timestamp column in the record from reader | No | | | - -#### 3.2.3 Auto 
table creating -##### 3.2.3.1 Rules - -If all of `tagColumn`, `fieldColumn` and `timestampColumn` are offered in writer configuration, then target super table will be created automatically. -The type of tag columns will always be `NCHAR(64)`. The sample setting above will produce following sql: - -```sql -CREATE STABLE IF NOT EXISTS market_snapshot ( - tadetime TIMESTAMP, - lastprice DOUBLE, - askprice1 DOUBLE, - bidprice1 DOUBLE, - volume INT -) -TAGS( - industry NCHAR(64), - stockID NCHAR(64) -); -``` - -##### 3.2.3.2 Sub-table Creating Rules - -The structure of sub-tables are the same with structure of super table. The names of sub-tables are generated by rules below: -1. combine value of tags like this:`tag_value1!tag_value2!tag_value3`. -2. compute md5 hash hex of above string, named `md5val` -3. use "t_md5val" as sub-table name, in which "t" is fixed prefix. - -#### 3.2.4 Use Pre-created Table - -If you have created super table firstly, then all of tagColumn, fieldColumn and timestampColumn can be omitted. The writer plugin will get table schema by executing `describe stableName`. -The order of columns of records received by this plugin must be the same as the order of columns returned by `describe stableName`. For example, if you have super table as below: -``` - Field | Type | Length | Note | -================================================================================= - ts | TIMESTAMP | 8 | | - current | DOUBLE | 8 | | - location | BINARY | 10 | TAG | -``` -Then the first columns received by this writer plugin must represent timestamp, the second column must represent current with type double, the third column must represent location with internal type string. - -#### 3.2.5 Remarks - -1. Config keys --tagColumn, fieldColumn and timestampColumn, must be presented or omitted at the same time. -2. If above three config keys exist and the target table also exists, then the order of columns defined by the config file and the existed table must be the same. - -#### 3.2.6 Type Convert - -| MongoDB Type | DataX Type | TDengine Type | -| ---------------- | -------------- | ----------------- | -| int, Long | Long | BIGINT | -| double | Double | DOUBLE | -| string, array | String | NCHAR(64) | -| date | Date | TIMESTAMP | -| boolean | Boolean | BOOL | -| bytes | Bytes | BINARY | - -### 3.3 From Relational Database to TDengine - -Take MySQl as example. - -#### 3.3.1 Table Structure in MySQL -```sql -CREATE TABLE IF NOT EXISTS weather( - station varchar(100), - latitude DOUBLE, - longtitude DOUBLE, - `date` DATE, - TMAX int, - TMIN int -) -``` - -#### 3.3.2 Sample Setting - -```json -{ - "job": { - "content": [ - { - "reader": { - "name": "mysqlreader", - "parameter": { - "username": "root", - "password": "passw0rd", - "column": [ - "*" - ], - "splitPk": "station", - "connection": [ - { - "table": [ - "weather" - ], - "jdbcUrl": [ - "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8" - ] - } - ] - } - }, - "writer": { - "name": "tdenginewriter", - "parameter": { - "host": "127.0.0.1", - "port": 6030, - "dbname": "test", - "user": "root", - "password": "taosdata", - "batchSize": 1000, - "stable": "weather", - "tagColumn": { - "station": 0 - }, - "fieldColumn": { - "latitude": 1, - "longtitude": 2, - "tmax": 4, - "tmin": 5 - }, - "timestampColumn":{ - "date": 3 - } - } - } - } - ], - "setting": { - "speed": { - "channel": 1 - } - } - } -} -``` - - -## 4 Performance Test - -## 5 Restriction - -1. NCHAR type has fixed length 64 when auto creating stable. -2. 
Rows have null tag values will be dropped. - -## FAQ - -### How to filter on source table? - -It depends on reader plugin. For different reader plugins, the way may be different. - -### How to import multiple source tables at once? - -It depends on reader plugin. If the reader plugin supports reading multiple tables at once, then there is no problem. - -### How many sub-tables will be produced? - -The number of sub-tables is determined by tagColumns, equals to the number of different combinations of tag values. - -### Do columns in source table and columns in target table must be in the same order? - -No. TDengine require the first column has timestamp type,which is followed by data columns, followed by tag columns. The writer plugin will create super table in this column order, regardless of origin column orders. - -### How dose the plugin infer the data type of incoming data? - -By the first batch of records it received. - -### Why can't I insert data of 10 years ago? Do this will get error: `TDengine ERROR (2350): failed to execute batch bind`. - -Because the database you created only keep 10 years data by default, you can create table like this: `CREATE DATABASE power KEEP 36500;`, in order to enlarge the time period to 100 years. \ No newline at end of file From 75c032c4f93c3662ac1870819e6836888a8ec846 Mon Sep 17 00:00:00 2001 From: dingbo Date: Fri, 3 Dec 2021 17:09:50 +0800 Subject: [PATCH 9/9] refine doc --- tdenginewriter/doc/tdenginewriter-CN.md | 14 ++++++++++---- tdenginewriter/doc/tdenginewriter.md | 13 ++++++++++--- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/tdenginewriter/doc/tdenginewriter-CN.md b/tdenginewriter/doc/tdenginewriter-CN.md index ffd3efc9..abd92de5 100644 --- a/tdenginewriter/doc/tdenginewriter-CN.md +++ b/tdenginewriter/doc/tdenginewriter-CN.md @@ -179,9 +179,11 @@ TDengineWriter 通过 DataX 框架获取 Reader生成的协议数据,根据rea | dbName | 目的数据库的名称 | 是 | 无 | | batchSize | 每次批量插入多少记录 | 否 | 1000 | | stable | 目标超级表的名称 | 是(OpenTSDB除外) | 无 | -| tagColumn | 标签列的列名和位置 | 否 | 无 | 位置索引均从0开始 | -| fieldColumn | 字段列的列名和位置 | 否 | 无 | | -| timestampColumn | 时间戳列的列名和位置 | 否 | 无 | 时间戳列只能有一个 | +| tagColumn | 格式:{tagName1: tagInd1, tagName2: tagInd2}, 标签列在写插件收到的Record中的位置和列名 | 否 | 无 | 位置索引均从0开始, tagInd如果为字符串, 表示固定标签值,不需要从源数据中获取 | +| fieldColumn | 格式:{fdName1: fdInd1, fdName2: fdInd2}, 字段列在写插件收到的Record中的位置和列名 | 否 | 无 | | +| timestampColumn | 格式:{tsColName: tsColIndex}, 时间戳列在写插件收到的Record中的位置和列名 | 否 | 无 | 时间戳列只能有一个 | + +示例配置中tagColumn有一个industry,它的值是一个固定的字符串“energy”, 作用是给导入的所有数据加一个值为"energy"的固定标签industry。这个应用场景可以是:在源库中,有多个设备采集的数据分表存储,设备名就是表名,可以用这个机制把设备名称转化为标签。 #### 3.2.3 自动建表规则 ##### 3.2.3.1 超级表创建规则 @@ -396,4 +398,8 @@ TDengine要求每个表第一列是时间戳列,后边是普通字段,最后 ### 为什么插入10年前的数据会抛异常`TDengine ERROR (2350): failed to execute batch bind` ? -因为创建数据库的时候,默认保留10年的数据。可以手动指定要保留多长时间的数据,比如:`CREATE DATABASE power KEEP 36500;`。 \ No newline at end of file +因为创建数据库的时候,默认保留10年的数据。可以手动指定要保留多长时间的数据,比如:`CREATE DATABASE power KEEP 36500;`。 + +### 如果编译的时候某些插件的依赖找不到怎么办? 
+
+如果这个插件不是必须的,可以注释掉根目录下的pom.xml中的对应插件。
\ No newline at end of file
diff --git a/tdenginewriter/doc/tdenginewriter.md b/tdenginewriter/doc/tdenginewriter.md
index 022bd212..fd190570 100644
--- a/tdenginewriter/doc/tdenginewriter.md
+++ b/tdenginewriter/doc/tdenginewriter.md
@@ -175,10 +175,12 @@ TDengineWriter gets records generated on the reader side from the DataX framewo
 | dbName | name of target database | Yes | |
 | batchSize | batch size of insert operation | No | 1000 |
 | stable | name of target super table | Yes (except for OpenTSDB) | |
-| tagColumn | name and position of tag columns in the record from reader | No | | index starts with 0 |
-| fieldColumn | name and position of data columns in the record from reader | No | | |
+| tagColumn | name and position of tag columns in the record from reader, format: {tagName1: tagInd1, tagName2: tagInd2} | No | | index starts with 0; a string value instead of an index denotes a fixed tag value |
+| fieldColumn | name and position of data columns in the record from reader, format: {fdName1: fdInd1, fdName2: fdInd2} | No | | |
 | timestampColumn | name and position of timestamp column in the record from reader | No | | |
 
+**Note**: The value of the tagColumn entry "industry" above is a fixed string; this is a useful feature of this plugin. Think of this scenario: the source database stores the data of many devices in separate tables of the same structure, with the device name as the table name. If you want to turn the device name into a tag in the target super table, this feature is what you need.
+
 #### 3.2.3 Auto table creating
 ##### 3.2.3.1 Rules
 
@@ -346,4 +348,9 @@ By the first batch of records it received.
 
 ### Why can't I insert data from 10 years ago? Doing so raises the error: `TDengine ERROR (2350): failed to execute batch bind`.
 
-Because the database you created keeps only 10 years of data by default. You can create the database like this: `CREATE DATABASE power KEEP 36500;` to enlarge the retention period to 100 years.
\ No newline at end of file
+Because the database you created keeps only 10 years of data by default. You can create the database like this: `CREATE DATABASE power KEEP 36500;` to enlarge the retention period to 100 years.
+
+
+### What should I do if some dependencies of a plugin can't be found?
+
+If this plugin is not necessary for you, just comment out the corresponding module in pom.xml under the project's root directory.
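
To make the sub-table naming rule concrete — both the Chinese and English docs describe it in prose (join the tag values with `!`, take the MD5 hex digest, prepend the fixed prefix `t_`), and `SchemaManager.computeTableName` in this patch series implements it — here is a minimal, self-contained sketch of that rule. It assumes only that commons-codec, which the plugin already uses for `DigestUtils.md5Hex`, is on the classpath; the class name `SubTableNameSketch` and the sample tag values are illustrative and not part of the patch.

```java
import org.apache.commons.codec.digest.DigestUtils;

public class SubTableNameSketch {

    // Same rule as SchemaManager.computeTableName: join the tag values with "!",
    // hash the joined string with MD5, and prepend the fixed prefix "t_".
    public static String computeTableName(String[] tagValues) {
        String joined = String.join("!", tagValues);
        return "t_" + DigestUtils.md5Hex(joined);
    }

    public static void main(String[] args) {
        // Records that share the same tag combination map to the same sub-table,
        // so the number of sub-tables equals the number of distinct tag combinations.
        String a = computeTableName(new String[]{"energy", "sh600001"});
        String b = computeTableName(new String[]{"energy", "sh600001"});
        String c = computeTableName(new String[]{"energy", "sh600002"});
        System.out.println(a.equals(b)); // true: identical tags, same sub-table
        System.out.println(a.equals(c)); // false: different tags, different sub-table
    }
}
```

Because the name is a pure function of the tag values, re-running a job is idempotent with respect to table naming: the same source rows always land in the same sub-tables.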