From dd19fd4332289c858042de30f86467fc14574d38 Mon Sep 17 00:00:00 2001 From: asdf2014 Date: Fri, 8 Nov 2019 16:52:39 +0800 Subject: [PATCH] Add TSDB Reader --- pom.xml | 1 + tsdbreader/doc/tsdbreader.md | 587 ++++++++++++++++++ tsdbreader/pom.xml | 146 +++++ tsdbreader/src/main/assembly/package.xml | 35 ++ .../plugin/reader/tsdbreader/Constant.java | 29 + .../datax/plugin/reader/tsdbreader/Key.java | 36 ++ .../plugin/reader/tsdbreader/TSDBReader.java | 320 ++++++++++ .../tsdbreader/TSDBReaderErrorCode.java | 40 ++ .../tsdbreader/conn/Connection4TSDB.java | 88 +++ .../conn/DataPoint4MultiFieldsTSDB.java | 68 ++ .../tsdbreader/conn/DataPoint4TSDB.java | 68 ++ .../conn/MultiFieldQueryResult.java | 64 ++ .../reader/tsdbreader/conn/QueryResult.java | 64 ++ .../tsdbreader/conn/TSDBConnection.java | 94 +++ .../reader/tsdbreader/conn/TSDBDump.java | 318 ++++++++++ .../reader/tsdbreader/util/HttpUtils.java | 67 ++ .../reader/tsdbreader/util/TSDBUtils.java | 68 ++ .../reader/tsdbreader/util/TimeUtils.java | 38 ++ tsdbreader/src/main/resources/plugin.json | 10 + .../main/resources/plugin_job_template.json | 29 + .../tsdbreader/conn/TSDBConnectionTest.java | 30 + .../plugin/reader/tsdbreader/util/Const.java | 17 + .../reader/tsdbreader/util/HttpUtilsTest.java | 39 ++ .../reader/tsdbreader/util/TimeUtilsTest.java | 33 + 24 files changed, 2289 insertions(+) create mode 100644 tsdbreader/doc/tsdbreader.md create mode 100644 tsdbreader/pom.xml create mode 100755 tsdbreader/src/main/assembly/package.xml create mode 100644 tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Constant.java create mode 100644 tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Key.java create mode 100755 tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReader.java create mode 100755 tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReaderErrorCode.java create mode 100644 tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/Connection4TSDB.java create mode 100644 tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/DataPoint4MultiFieldsTSDB.java create mode 100644 tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/DataPoint4TSDB.java create mode 100644 tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/MultiFieldQueryResult.java create mode 100644 tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/QueryResult.java create mode 100644 tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnection.java create mode 100644 tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBDump.java create mode 100644 tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtils.java create mode 100644 tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TSDBUtils.java create mode 100644 tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TimeUtils.java create mode 100755 tsdbreader/src/main/resources/plugin.json create mode 100644 tsdbreader/src/main/resources/plugin_job_template.json create mode 100644 tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnectionTest.java create mode 100644 tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/util/Const.java create mode 100644 tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtilsTest.java create mode 100644 tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TimeUtilsTest.java diff --git a/pom.xml b/pom.xml index ece9491e..9463f2bf 100755 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,7 @@ rdbmsreader hbase11xreader hbase094xreader + tsdbreader opentsdbreader cassandrareader diff --git a/tsdbreader/doc/tsdbreader.md b/tsdbreader/doc/tsdbreader.md new file mode 100644 index 00000000..557dcc51 --- /dev/null +++ b/tsdbreader/doc/tsdbreader.md @@ -0,0 +1,587 @@ + +# TSDBReader 插件文档 + +___ + + +## 1 快速介绍 + +TSDBReader 插件实现了从阿里云 TSDB 读取数据。阿里云时间序列数据库 ( **T**ime **S**eries **D**ata**b**ase , 简称 TSDB) 是一种集时序数据高效读写,压缩存储,实时计算能力为一体的数据库服务,可广泛应用于物联网和互联网领域,实现对设备及业务服务的实时监控,实时预测告警。详见 TSDB 的阿里云[官网](https://cn.aliyun.com/product/hitsdb)。 + + + +## 2 实现原理 + +在底层实现上,TSDBReader 通过 HTTP 请求链接到 阿里云 TSDB 实例,利用 `/api/query` 或者 `/api/mquery` 接口将数据点扫描出来(更多细节详见:[时序数据库 TSDB - HTTP API 概览](https://help.aliyun.com/document_detail/63557.html))。而整个同步的过程,是通过时间线和查询时间线范围进行切分。 + + + +## 3 功能说明 + +### 3.1 配置样例 + +* 配置一个从 阿里云 TSDB 数据库同步抽取数据到本地的作业,并以**时序数据**的格式输出: + +时序数据样例: +```json +{"metric":"m","tags":{"app":"a19","cluster":"c5","group":"g10","ip":"i999","zone":"z1"},"timestamp":1546272263,"value":1} +``` + +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "tsdbreader", + "parameter": { + "sinkDbType": "TSDB", + "endpoint": "http://localhost:8242", + "column": [ + "m" + ], + "splitIntervalMs": 60000, + "beginDateTime": "2019-01-01 00:00:00", + "endDateTime": "2019-01-01 01:00:00" + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "encoding": "UTF-8", + "print": true + } + } + } + ], + "setting": { + "speed": { + "channel": 3 + } + } + } +} +``` + +* 配置一个从 阿里云 TSDB 数据库同步抽取数据到本地的作业,并以**关系型数据**的格式输出: + +关系型数据样例: +```txt +m 1546272125 a1 c1 g2 i3021 z4 1.0 +``` + +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "tsdbreader", + "parameter": { + "sinkDbType": "RDB", + "endpoint": "http://localhost:8242", + "column": [ + "__metric__", + "__ts__", + "app", + "cluster", + "group", + "ip", + "zone", + "__value__" + ], + "metric": [ + "m" + ], + "splitIntervalMs": 60000, + "beginDateTime": "2019-01-01 00:00:00", + "endDateTime": "2019-01-01 01:00:00" + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "encoding": "UTF-8", + "print": true + } + } + } + ], + "setting": { + "speed": { + "channel": 3 + } + } + } +} +``` + +* 配置一个从 阿里云 TSDB 数据库同步抽取**单值**数据到 ADB 的作业: + +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "tsdbreader", + "parameter": { + "sinkDbType": "RDB", + "endpoint": "http://localhost:8242", + "column": [ + "__metric__", + "__ts__", + "app", + "cluster", + "group", + "ip", + "zone", + "__value__" + ], + "metric": [ + "m" + ], + "splitIntervalMs": 60000, + "beginDateTime": "2019-01-01 00:00:00", + "endDateTime": "2019-01-01 01:00:00" + } + }, + "writer": { + "name": "adswriter", + "parameter": { + "username": "******", + "password": "******", + "column": [ + "`metric`", + "`ts`", + "`app`", + "`cluster`", + "`group`", + "`ip`", + "`zone`", + "`value`" + ], + "url": "http://localhost:3306", + "schema": "datax_test", + "table": "datax_test", + "writeMode": "insert", + "opIndex": "0", + "batchSize": "2" + } + } + } + ], + "setting": { + "speed": { + "channel": 3 + } + } + } +} +``` + +* 配置一个从 阿里云 TSDB 数据库同步抽取**多值**数据到 ADB 的作业: + +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "tsdbreader", + "parameter": { + "sinkDbType": "RDB", + "endpoint": "http://localhost:8242", + "column": [ + "__metric__", + "__ts__", + "app", + "cluster", + "group", + "ip", + "zone", + "load", + "memory", + "cpu" + ], + "metric": [ + "m_field" + ], + "field": { + "m_field": [ + "load", + "memory", + "cpu" + ] + }, + "splitIntervalMs": 60000, + "beginDateTime": "2019-01-01 00:00:00", + "endDateTime": "2019-01-01 01:00:00" + } + }, + "writer": { + "name": "adswriter", + "parameter": { + "username": "******", + "password": "******", + "column": [ + "`metric`", + "`ts`", + "`app`", + "`cluster`", + "`group`", + "`ip`", + "`zone`", + "`load`", + "`memory`", + "`cpu`" + ], + "url": "http://localhost:3306", + "schema": "datax_test", + "table": "datax_test_multi_field", + "writeMode": "insert", + "opIndex": "0", + "batchSize": "2" + } + } + } + ], + "setting": { + "speed": { + "channel": 3 + } + } + } +} +``` + +* 配置一个从 阿里云 TSDB 数据库同步抽取**单值**数据到 ADB 的作业,并指定过滤部分时间线: + +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "tsdbreader", + "parameter": { + "sinkDbType": "RDB", + "endpoint": "http://localhost:8242", + "column": [ + "__metric__", + "__ts__", + "app", + "cluster", + "group", + "ip", + "zone", + "__value__" + ], + "metric": [ + "m" + ], + "tag": { + "m": { + "app": "a1", + "cluster": "c1" + } + }, + "splitIntervalMs": 60000, + "beginDateTime": "2019-01-01 00:00:00", + "endDateTime": "2019-01-01 01:00:00" + } + }, + "writer": { + "name": "adswriter", + "parameter": { + "username": "******", + "password": "******", + "column": [ + "`metric`", + "`ts`", + "`app`", + "`cluster`", + "`group`", + "`ip`", + "`zone`", + "`value`" + ], + "url": "http://localhost:3306", + "schema": "datax_test", + "table": "datax_test", + "writeMode": "insert", + "opIndex": "0", + "batchSize": "2" + } + } + } + ], + "setting": { + "speed": { + "channel": 3 + } + } + } +} +``` + +* 配置一个从 阿里云 TSDB 数据库同步抽取**多值**数据到 ADB 的作业,并指定过滤部分时间线: + +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "tsdbreader", + "parameter": { + "sinkDbType": "RDB", + "endpoint": "http://localhost:8242", + "column": [ + "__metric__", + "__ts__", + "app", + "cluster", + "group", + "ip", + "zone", + "load", + "memory", + "cpu" + ], + "metric": [ + "m_field" + ], + "field": { + "m_field": [ + "load", + "memory", + "cpu" + ] + }, + "tag": { + "m_field": { + "ip": "i999" + } + }, + "splitIntervalMs": 60000, + "beginDateTime": "2019-01-01 00:00:00", + "endDateTime": "2019-01-01 01:00:00" + } + }, + "writer": { + "name": "adswriter", + "parameter": { + "username": "******", + "password": "******", + "column": [ + "`metric`", + "`ts`", + "`app`", + "`cluster`", + "`group`", + "`ip`", + "`zone`", + "`load`", + "`memory`", + "`cpu`" + ], + "url": "http://localhost:3306", + "schema": "datax_test", + "table": "datax_test_multi_field", + "writeMode": "insert", + "opIndex": "0", + "batchSize": "2" + } + } + } + ], + "setting": { + "speed": { + "channel": 3 + } + } + } +} +``` + +* 配置一个从 阿里云 TSDB 数据库同步抽取**单值**数据到另一个 阿里云 TSDB 数据库 的作业: + +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "tsdbreader", + "parameter": { + "sinkDbType": "TSDB", + "endpoint": "http://localhost:8242", + "column": [ + "m" + ], + "splitIntervalMs": 60000, + "beginDateTime": "2019-01-01 00:00:00", + "endDateTime": "2019-01-01 01:00:00" + } + }, + "writer": { + "name": "tsdbwriter", + "parameter": { + "endpoint": "http://localhost:8240" + } + } + } + ], + "setting": { + "speed": { + "channel": 3 + } + } + } +} +``` + +* 配置一个从 阿里云 TSDB 数据库同步抽取**多值**数据到另一个 阿里云 TSDB 数据库 的作业: + +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "tsdbreader", + "parameter": { + "sinkDbType": "TSDB", + "endpoint": "http://localhost:8242", + "column": [ + "m_field" + ], + "field": { + "m_field": [ + "load", + "memory", + "cpu" + ] + }, + "splitIntervalMs": 60000, + "beginDateTime": "2019-01-01 00:00:00", + "endDateTime": "2019-01-01 01:00:00" + } + }, + "writer": { + "name": "tsdbwriter", + "parameter": { + "multiField": true, + "endpoint": "http://localhost:8240" + } + } + } + ], + "setting": { + "speed": { + "channel": 3 + } + } + } +} +``` + + + + + + +### 3.2 参数说明 + +* **name** + * 描述:本插件的名称 + * 必选:是 + * 默认值:tsdbreader + +* **parameter** + * **sinkDbType** + * 描述:目标数据库的类型 + * 必选:否 + * 默认值:TSDB + * 注意:目前支持 TSDB 和 RDB 两个取值。其中,TSDB 包括 阿里云 TSDB、OpenTSDB、InfluxDB、Prometheus 和 TimeScale。RDB 包括 ADB、MySQL、Oracle、PostgreSQL 和 DRDS 等。 + + * **endpoint** + * 描述:阿里云 TSDB 的 HTTP 连接地址 + * 必选:是 + * 格式:http://IP:Port + * 默认值:无 + + * **column** + * 描述:TSDB 场景下:数据迁移任务需要迁移的 Metric 列表;RDB 场景下:映射到关系型数据库中的表字段,且增加 `__metric__`、`__ts__` 和 `__value__` 三个字段,其中 `__metric__` 用于映射度量字段,`__ts__` 用于映射 timestamp 字段,而 `__value__` 仅适用于单值场景,用于映射度量值,多值场景下,直接指定 field 字段即可 + * 必选:是 + * 默认值:无 + + * **metric** + * 描述:仅适用于 RDB 场景下,表示数据迁移任务需要迁移的 Metric 列表 + * 必选:否 + * 默认值:无 + + * **field** + * 描述:仅适用于多值场景下,表示数据迁移任务需要迁移的 Field 列表 + * 必选:否 + * 默认值:无 + + * **tag** + * 描述:数据迁移任务需要迁移的 TagK 和 TagV,用于进一步过滤时间线 + * 必选:否 + * 默认值:无 + + * **splitIntervalMs** + * 描述:用于 DataX 内部切分 Task,每个 Task 只查询一小部分的时间段 + * 必选:是 + * 默认值:无 + * 注意:单位是 ms 毫秒 + + +* **beginDateTime** + * 描述:和 endDateTime 配合使用,用于指定哪个时间段内的数据点,需要被迁移 + * 必选:是 + * 格式:`yyyy-MM-dd HH:mm:ss` + * 默认值:无 + * 注意:指定起止时间会自动忽略分钟和秒,转为整点时刻,例如 2019-4-18 的 [3:35, 4:55) 会被转为 [3:00, 4:00) + +* **endDateTime** + * 描述:和 beginDateTime 配合使用,用于指定哪个时间段内的数据点,需要被迁移 + * 必选:是 + * 格式:`yyyy-MM-dd HH:mm:ss` + * 默认值:无 + * 注意:指定起止时间会自动忽略分钟和秒,转为整点时刻,例如 2019-4-18 的 [3:35, 4:55) 会被转为 [3:00, 4:00) + + + + +### 3.3 类型转换 + +| DataX 内部类型 | TSDB 数据类型 | +| -------------- | ------------------------------------------------------------ | +| String | TSDB 数据点序列化字符串,包括 timestamp、metric、tags、fields 和 value | + + + + + + +## 4 约束限制 + +### 4.2 如果存在某一个 Metric 下在一个小时范围内的数据量过大,可能需要通过 `-j` 参数调整 JVM 内存大小 + +考虑到下游 Writer 如果写入速度不及 TSDB Reader 的查询数据,可能会存在积压的情况,因此需要适当地调整 JVM 参数。以"从 阿里云 TSDB 数据库同步抽取数据到本地的作业"为例,启动命令如下: + +```bash + python datax/bin/datax.py tsdb2stream.json -j "-Xms4096m -Xmx4096m" +``` + + + +### 4.3 指定起止时间会自动被转为整点时刻 + +指定起止时间会自动被转为整点时刻,例如 2019-4-18 的 `[3:35, 3:55)` 会被转为 `[3:00, 4:00)` + + + diff --git a/tsdbreader/pom.xml b/tsdbreader/pom.xml new file mode 100644 index 00000000..b9a45985 --- /dev/null +++ b/tsdbreader/pom.xml @@ -0,0 +1,146 @@ + + + 4.0.0 + + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + + tsdbreader + tsdbreader + jar + + + UTF-8 + + + 3.3.2 + + + 4.4 + 2.4 + + + 1.2.28 + + + 4.12 + + + 2.9.9 + + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + fastjson + com.alibaba + + + commons-math3 + org.apache.commons + + + + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + + + + + org.apache.commons + commons-lang3 + ${commons-lang3.version} + + + + + org.apache.httpcomponents + httpclient + ${httpclient.version} + + + commons-io + commons-io + ${commons-io.version} + + + org.apache.httpcomponents + fluent-hc + ${httpclient.version} + + + + + com.alibaba + fastjson + ${fastjson.version} + + + + + joda-time + joda-time + ${joda-time.version} + + + + + junit + junit + ${junit4.version} + test + + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + diff --git a/tsdbreader/src/main/assembly/package.xml b/tsdbreader/src/main/assembly/package.xml new file mode 100755 index 00000000..b76f2aba --- /dev/null +++ b/tsdbreader/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/reader/tsdbreader + + + target/ + + tsdbreader-0.0.1-SNAPSHOT.jar + + plugin/reader/tsdbreader + + + + + + false + plugin/reader/tsdbreader/libs + runtime + + + diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Constant.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Constant.java new file mode 100644 index 00000000..e42dedc0 --- /dev/null +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Constant.java @@ -0,0 +1,29 @@ +package com.alibaba.datax.plugin.reader.tsdbreader; + +import java.util.HashSet; +import java.util.Set; + +/** + * Copyright @ 2019 alibaba.com + * All right reserved. + * Function:Constant + * + * @author Benedict Jin + * @since 2019-10-21 + */ +public final class Constant { + + static final String DEFAULT_DATA_FORMAT = "yyyy-MM-dd HH:mm:ss"; + + public static final String METRIC_SPECIFY_KEY = "__metric__"; + public static final String TS_SPECIFY_KEY = "__ts__"; + public static final String VALUE_SPECIFY_KEY = "__value__"; + + static final Set MUST_CONTAINED_SPECIFY_KEYS = new HashSet<>(); + + static { + MUST_CONTAINED_SPECIFY_KEYS.add(METRIC_SPECIFY_KEY); + MUST_CONTAINED_SPECIFY_KEYS.add(TS_SPECIFY_KEY); + // __value__ 在多值场景下,可以不指定 + } +} diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Key.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Key.java new file mode 100644 index 00000000..14ee7e41 --- /dev/null +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Key.java @@ -0,0 +1,36 @@ +package com.alibaba.datax.plugin.reader.tsdbreader; + +import java.util.HashSet; +import java.util.Set; + +/** + * Copyright @ 2019 alibaba.com + * All right reserved. + * Function:Key + * + * @author Benedict Jin + * @since 2019-10-21 + */ +public class Key { + + // TSDB for OpenTSDB / InfluxDB / TimeScale / Prometheus etc. + // RDB for MySQL / ADB etc. + static final String SINK_DB_TYPE = "sinkDbType"; + static final String ENDPOINT = "endpoint"; + static final String COLUMN = "column"; + static final String METRIC = "metric"; + static final String FIELD = "field"; + static final String TAG = "tag"; + static final String INTERVAL_DATE_TIME = "splitIntervalMs"; + static final String BEGIN_DATE_TIME = "beginDateTime"; + static final String END_DATE_TIME = "endDateTime"; + + static final Integer INTERVAL_DATE_TIME_DEFAULT_VALUE = 60; + static final String TYPE_DEFAULT_VALUE = "TSDB"; + static final Set TYPE_SET = new HashSet<>(); + + static { + TYPE_SET.add("TSDB"); + TYPE_SET.add("RDB"); + } +} diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReader.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReader.java new file mode 100755 index 00000000..04b931c7 --- /dev/null +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReader.java @@ -0,0 +1,320 @@ +package com.alibaba.datax.plugin.reader.tsdbreader; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.spi.Reader; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.reader.tsdbreader.conn.TSDBConnection; +import com.alibaba.datax.plugin.reader.tsdbreader.util.TimeUtils; +import com.alibaba.fastjson.JSON; +import org.apache.commons.lang3.StringUtils; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * Copyright @ 2019 alibaba.com + * All right reserved. + * Function:TSDB Reader + * + * @author Benedict Jin + * @since 2019-10-21 + */ +@SuppressWarnings("unused") +public class TSDBReader extends Reader { + + public static class Job extends Reader.Job { + + private static final Logger LOG = LoggerFactory.getLogger(Job.class); + + private Configuration originalConfig; + + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + + String type = originalConfig.getString(Key.SINK_DB_TYPE, Key.TYPE_DEFAULT_VALUE); + if (StringUtils.isBlank(type)) { + throw DataXException.asDataXException( + TSDBReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.SINK_DB_TYPE + "] is not set."); + } + if (!Key.TYPE_SET.contains(type)) { + throw DataXException.asDataXException( + TSDBReaderErrorCode.ILLEGAL_VALUE, + "The parameter [" + Key.SINK_DB_TYPE + "] should be one of [" + + JSON.toJSONString(Key.TYPE_SET) + "]."); + } + + String address = originalConfig.getString(Key.ENDPOINT); + if (StringUtils.isBlank(address)) { + throw DataXException.asDataXException( + TSDBReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.ENDPOINT + "] is not set."); + } + + // tagK / field could be empty + if ("TSDB".equals(type)) { + List columns = originalConfig.getList(Key.COLUMN, String.class); + if (columns == null || columns.isEmpty()) { + throw DataXException.asDataXException( + TSDBReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.COLUMN + "] is not set."); + } + } else { + List columns = originalConfig.getList(Key.COLUMN, String.class); + if (columns == null || columns.isEmpty()) { + throw DataXException.asDataXException( + TSDBReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.COLUMN + "] is not set."); + } + for (String specifyKey : Constant.MUST_CONTAINED_SPECIFY_KEYS) { + if (!columns.contains(specifyKey)) { + throw DataXException.asDataXException( + TSDBReaderErrorCode.ILLEGAL_VALUE, + "The parameter [" + Key.COLUMN + "] should contain " + + JSON.toJSONString(Constant.MUST_CONTAINED_SPECIFY_KEYS) + "."); + } + } + final List metrics = originalConfig.getList(Key.METRIC, String.class); + if (metrics == null || metrics.isEmpty()) { + throw DataXException.asDataXException( + TSDBReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.METRIC + "] is not set."); + } + } + + Integer splitIntervalMs = originalConfig.getInt(Key.INTERVAL_DATE_TIME, + Key.INTERVAL_DATE_TIME_DEFAULT_VALUE); + if (splitIntervalMs <= 0) { + throw DataXException.asDataXException( + TSDBReaderErrorCode.ILLEGAL_VALUE, + "The parameter [" + Key.INTERVAL_DATE_TIME + "] should be great than zero."); + } + + SimpleDateFormat format = new SimpleDateFormat(Constant.DEFAULT_DATA_FORMAT); + String startTime = originalConfig.getString(Key.BEGIN_DATE_TIME); + Long startDate; + if (startTime == null || startTime.trim().length() == 0) { + throw DataXException.asDataXException( + TSDBReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.BEGIN_DATE_TIME + "] is not set."); + } else { + try { + startDate = format.parse(startTime).getTime(); + } catch (ParseException e) { + throw DataXException.asDataXException(TSDBReaderErrorCode.ILLEGAL_VALUE, + "The parameter [" + Key.BEGIN_DATE_TIME + + "] needs to conform to the [" + Constant.DEFAULT_DATA_FORMAT + "] format."); + } + } + String endTime = originalConfig.getString(Key.END_DATE_TIME); + Long endDate; + if (endTime == null || endTime.trim().length() == 0) { + throw DataXException.asDataXException( + TSDBReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.END_DATE_TIME + "] is not set."); + } else { + try { + endDate = format.parse(endTime).getTime(); + } catch (ParseException e) { + throw DataXException.asDataXException(TSDBReaderErrorCode.ILLEGAL_VALUE, + "The parameter [" + Key.END_DATE_TIME + + "] needs to conform to the [" + Constant.DEFAULT_DATA_FORMAT + "] format."); + } + } + if (startDate >= endDate) { + throw DataXException.asDataXException(TSDBReaderErrorCode.ILLEGAL_VALUE, + "The parameter [" + Key.BEGIN_DATE_TIME + + "] should be less than the parameter [" + Key.END_DATE_TIME + "]."); + } + } + + @Override + public void prepare() { + } + + @Override + public List split(int adviceNumber) { + List configurations = new ArrayList<>(); + + // get metrics + String type = originalConfig.getString(Key.SINK_DB_TYPE, Key.TYPE_DEFAULT_VALUE); + List columns4TSDB = null; + List columns4RDB = null; + List metrics = null; + if ("TSDB".equals(type)) { + columns4TSDB = originalConfig.getList(Key.COLUMN, String.class); + } else { + columns4RDB = originalConfig.getList(Key.COLUMN, String.class); + metrics = originalConfig.getList(Key.METRIC, String.class); + } + + // get time interval + Integer splitIntervalMs = originalConfig.getInt(Key.INTERVAL_DATE_TIME, + Key.INTERVAL_DATE_TIME_DEFAULT_VALUE); + + // get time range + SimpleDateFormat format = new SimpleDateFormat(Constant.DEFAULT_DATA_FORMAT); + long startTime; + try { + startTime = format.parse(originalConfig.getString(Key.BEGIN_DATE_TIME)).getTime(); + } catch (ParseException e) { + throw DataXException.asDataXException( + TSDBReaderErrorCode.ILLEGAL_VALUE, "解析[" + Key.BEGIN_DATE_TIME + "]失败.", e); + } + long endTime; + try { + endTime = format.parse(originalConfig.getString(Key.END_DATE_TIME)).getTime(); + } catch (ParseException e) { + throw DataXException.asDataXException( + TSDBReaderErrorCode.ILLEGAL_VALUE, "解析[" + Key.END_DATE_TIME + "]失败.", e); + } + if (TimeUtils.isSecond(startTime)) { + startTime *= 1000; + } + if (TimeUtils.isSecond(endTime)) { + endTime *= 1000; + } + DateTime startDateTime = new DateTime(TimeUtils.getTimeInHour(startTime)); + DateTime endDateTime = new DateTime(TimeUtils.getTimeInHour(endTime)); + + if ("TSDB".equals(type)) { + // split by metric + for (String column : columns4TSDB) { + // split by time in hour + while (startDateTime.isBefore(endDateTime)) { + Configuration clone = this.originalConfig.clone(); + clone.set(Key.COLUMN, Collections.singletonList(column)); + + clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis()); + startDateTime = startDateTime.plusMillis(splitIntervalMs); + // Make sure the time interval is [start, end). + clone.set(Key.END_DATE_TIME, startDateTime.getMillis() - 1); + configurations.add(clone); + + LOG.info("Configuration: {}", JSON.toJSONString(clone)); + } + } + } else { + // split by metric + for (String metric : metrics) { + // split by time in hour + while (startDateTime.isBefore(endDateTime)) { + Configuration clone = this.originalConfig.clone(); + clone.set(Key.COLUMN, columns4RDB); + clone.set(Key.METRIC, Collections.singletonList(metric)); + + clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis()); + startDateTime = startDateTime.plusMillis(splitIntervalMs); + // Make sure the time interval is [start, end). + clone.set(Key.END_DATE_TIME, startDateTime.getMillis() - 1); + configurations.add(clone); + + LOG.info("Configuration: {}", JSON.toJSONString(clone)); + } + } + } + return configurations; + } + + @Override + public void post() { + } + + @Override + public void destroy() { + } + } + + public static class Task extends Reader.Task { + + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + + private String type; + private List columns4TSDB = null; + private List columns4RDB = null; + private List metrics = null; + private Map fields; + private Map tags; + private TSDBConnection conn; + private Long startTime; + private Long endTime; + + @Override + public void init() { + Configuration readerSliceConfig = super.getPluginJobConf(); + + LOG.info("getPluginJobConf: {}", JSON.toJSONString(readerSliceConfig)); + + this.type = readerSliceConfig.getString(Key.SINK_DB_TYPE); + if ("TSDB".equals(type)) { + columns4TSDB = readerSliceConfig.getList(Key.COLUMN, String.class); + } else { + columns4RDB = readerSliceConfig.getList(Key.COLUMN, String.class); + metrics = readerSliceConfig.getList(Key.METRIC, String.class); + } + this.fields = readerSliceConfig.getMap(Key.FIELD); + this.tags = readerSliceConfig.getMap(Key.TAG); + + String address = readerSliceConfig.getString(Key.ENDPOINT); + + conn = new TSDBConnection(address); + + this.startTime = readerSliceConfig.getLong(Key.BEGIN_DATE_TIME); + this.endTime = readerSliceConfig.getLong(Key.END_DATE_TIME); + } + + @Override + public void prepare() { + } + + @Override + @SuppressWarnings("unchecked") + public void startRead(RecordSender recordSender) { + try { + if ("TSDB".equals(type)) { + for (String metric : columns4TSDB) { + final Map tags = this.tags == null ? + null : (Map) this.tags.get(metric); + if (fields == null || !fields.containsKey(metric)) { + conn.sendDPs(metric, tags, this.startTime, this.endTime, recordSender); + } else { + conn.sendDPs(metric, (List) fields.get(metric), + tags, this.startTime, this.endTime, recordSender); + } + } + } else { + for (String metric : metrics) { + final Map tags = this.tags == null ? + null : (Map) this.tags.get(metric); + if (fields == null || !fields.containsKey(metric)) { + conn.sendRecords(metric, tags, startTime, endTime, columns4RDB, recordSender); + } else { + conn.sendRecords(metric, (List) fields.get(metric), + tags, startTime, endTime, columns4RDB, recordSender); + } + } + } + } catch (Exception e) { + throw DataXException.asDataXException( + TSDBReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错!", e); + } + } + + @Override + public void post() { + } + + @Override + public void destroy() { + } + } +} diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReaderErrorCode.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReaderErrorCode.java new file mode 100755 index 00000000..4ebdcca5 --- /dev/null +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReaderErrorCode.java @@ -0,0 +1,40 @@ +package com.alibaba.datax.plugin.reader.tsdbreader; + +import com.alibaba.datax.common.spi.ErrorCode; + +/** + * Copyright @ 2019 alibaba.com + * All right reserved. + * Function:TSDB Reader Error Code + * + * @author Benedict Jin + * @since 2019-10-21 + */ +public enum TSDBReaderErrorCode implements ErrorCode { + + REQUIRED_VALUE("TSDBReader-00", "缺失必要的值"), + ILLEGAL_VALUE("TSDBReader-01", "值非法"); + + private final String code; + private final String description; + + TSDBReaderErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s]. ", this.code, this.description); + } +} diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/Connection4TSDB.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/Connection4TSDB.java new file mode 100644 index 00000000..500894bb --- /dev/null +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/Connection4TSDB.java @@ -0,0 +1,88 @@ +package com.alibaba.datax.plugin.reader.tsdbreader.conn; + +import com.alibaba.datax.common.plugin.RecordSender; + +import java.util.List; +import java.util.Map; + +/** + * Copyright @ 2019 alibaba.com + * All right reserved. + * Function:Connection for TSDB-like databases + * + * @author Benedict Jin + * @since 2019-10-21 + */ +public interface Connection4TSDB { + + /** + * Get the address of Database. + * + * @return host+ip + */ + String address(); + + /** + * Get the version of Database. + * + * @return version + */ + String version(); + + /** + * Get these configurations. + * + * @return configs + */ + String config(); + + /** + * Get the list of supported version. + * + * @return version list + */ + String[] getSupportVersionPrefix(); + + /** + * Send data points for TSDB with single field. + */ + void sendDPs(String metric, Map tags, Long start, Long end, RecordSender recordSender) throws Exception; + + /** + * Send data points for TSDB with multi fields. + */ + void sendDPs(String metric, List fields, Map tags, Long start, Long end, RecordSender recordSender) throws Exception; + + /** + * Send data points for RDB with single field. + */ + void sendRecords(String metric, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender) throws Exception; + + /** + * Send data points for RDB with multi fields. + */ + void sendRecords(String metric, List fields, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender) throws Exception; + + /** + * Put data point. + * + * @param dp data point + * @return whether the data point is written successfully + */ + boolean put(DataPoint4TSDB dp); + + /** + * Put data points. + * + * @param dps data points + * @return whether the data point is written successfully + */ + boolean put(List dps); + + /** + * Whether current version is supported. + * + * @return true: supported; false: not yet! + */ + boolean isSupported(); +} diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/DataPoint4MultiFieldsTSDB.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/DataPoint4MultiFieldsTSDB.java new file mode 100644 index 00000000..5b380c73 --- /dev/null +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/DataPoint4MultiFieldsTSDB.java @@ -0,0 +1,68 @@ +package com.alibaba.datax.plugin.reader.tsdbreader.conn; + +import com.alibaba.fastjson.JSON; + +import java.util.Map; + +/** + * Copyright @ 2019 alibaba.com + * All right reserved. + * Function:DataPoint for TSDB with Multi Fields + * + * @author Benedict Jin + * @since 2019-10-21 + */ +public class DataPoint4MultiFieldsTSDB { + + private long timestamp; + private String metric; + private Map tags; + private Map fields; + + public DataPoint4MultiFieldsTSDB() { + } + + public DataPoint4MultiFieldsTSDB(long timestamp, String metric, Map tags, Map fields) { + this.timestamp = timestamp; + this.metric = metric; + this.tags = tags; + this.fields = fields; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public String getMetric() { + return metric; + } + + public void setMetric(String metric) { + this.metric = metric; + } + + public Map getTags() { + return tags; + } + + public void setTags(Map tags) { + this.tags = tags; + } + + public Map getFields() { + return fields; + } + + public void setFields(Map fields) { + this.fields = fields; + } + + @Override + public String toString() { + return JSON.toJSONString(this); + } +} diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/DataPoint4TSDB.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/DataPoint4TSDB.java new file mode 100644 index 00000000..5c5c1349 --- /dev/null +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/DataPoint4TSDB.java @@ -0,0 +1,68 @@ +package com.alibaba.datax.plugin.reader.tsdbreader.conn; + +import com.alibaba.fastjson.JSON; + +import java.util.Map; + +/** + * Copyright @ 2019 alibaba.com + * All right reserved. + * Function:DataPoint for TSDB + * + * @author Benedict Jin + * @since 2019-10-21 + */ +public class DataPoint4TSDB { + + private long timestamp; + private String metric; + private Map tags; + private Object value; + + public DataPoint4TSDB() { + } + + public DataPoint4TSDB(long timestamp, String metric, Map tags, Object value) { + this.timestamp = timestamp; + this.metric = metric; + this.tags = tags; + this.value = value; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public String getMetric() { + return metric; + } + + public void setMetric(String metric) { + this.metric = metric; + } + + public Map getTags() { + return tags; + } + + public void setTags(Map tags) { + this.tags = tags; + } + + public Object getValue() { + return value; + } + + public void setValue(Object value) { + this.value = value; + } + + @Override + public String toString() { + return JSON.toJSONString(this); + } +} diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/MultiFieldQueryResult.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/MultiFieldQueryResult.java new file mode 100644 index 00000000..32c2a76e --- /dev/null +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/MultiFieldQueryResult.java @@ -0,0 +1,64 @@ +package com.alibaba.datax.plugin.reader.tsdbreader.conn; + +import java.util.List; +import java.util.Map; + +/** + * Copyright @ 2019 alibaba.com + * All right reserved. + * Function:Multi Field Query Result + * + * @author Benedict Jin + * @since 2019-10-22 + */ +public class MultiFieldQueryResult { + + private String metric; + private Map tags; + private List aggregatedTags; + private List columns; + private List> values; + + public MultiFieldQueryResult() { + } + + public String getMetric() { + return metric; + } + + public void setMetric(String metric) { + this.metric = metric; + } + + public Map getTags() { + return tags; + } + + public void setTags(Map tags) { + this.tags = tags; + } + + public List getAggregatedTags() { + return aggregatedTags; + } + + public void setAggregatedTags(List aggregatedTags) { + this.aggregatedTags = aggregatedTags; + } + + public List getColumns() { + return columns; + } + + public void setColumns(List columns) { + this.columns = columns; + } + + public List> getValues() { + return values; + } + + public void setValues(List> values) { + this.values = values; + } +} diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/QueryResult.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/QueryResult.java new file mode 100644 index 00000000..8f3e2b59 --- /dev/null +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/QueryResult.java @@ -0,0 +1,64 @@ +package com.alibaba.datax.plugin.reader.tsdbreader.conn; + +import java.util.List; +import java.util.Map; + +/** + * Copyright @ 2019 alibaba.com + * All right reserved. + * Function:Query Result + * + * @author Benedict Jin + * @since 2019-09-19 + */ +public class QueryResult { + + private String metricName; + private Map tags; + private List groupByTags; + private List aggregatedTags; + private Map dps; + + public QueryResult() { + } + + public String getMetricName() { + return metricName; + } + + public void setMetricName(String metricName) { + this.metricName = metricName; + } + + public Map getTags() { + return tags; + } + + public void setTags(Map tags) { + this.tags = tags; + } + + public List getGroupByTags() { + return groupByTags; + } + + public void setGroupByTags(List groupByTags) { + this.groupByTags = groupByTags; + } + + public List getAggregatedTags() { + return aggregatedTags; + } + + public void setAggregatedTags(List aggregatedTags) { + this.aggregatedTags = aggregatedTags; + } + + public Map getDps() { + return dps; + } + + public void setDps(Map dps) { + this.dps = dps; + } +} diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnection.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnection.java new file mode 100644 index 00000000..5426ab49 --- /dev/null +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnection.java @@ -0,0 +1,94 @@ +package com.alibaba.datax.plugin.reader.tsdbreader.conn; + +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.plugin.reader.tsdbreader.util.TSDBUtils; +import com.alibaba.fastjson.JSON; +import org.apache.commons.lang3.StringUtils; + +import java.util.List; +import java.util.Map; + +/** + * Copyright @ 2019 alibaba.com + * All right reserved. + * Function:TSDB Connection + * + * @author Benedict Jin + * @since 2019-10-21 + */ +public class TSDBConnection implements Connection4TSDB { + + private String address; + + public TSDBConnection(String address) { + this.address = address; + } + + @Override + public String address() { + return address; + } + + @Override + public String version() { + return TSDBUtils.version(address); + } + + @Override + public String config() { + return TSDBUtils.config(address); + } + + @Override + public String[] getSupportVersionPrefix() { + return new String[]{"2.4", "2.5"}; + } + + @Override + public void sendDPs(String metric, Map tags, Long start, Long end, RecordSender recordSender) throws Exception { + TSDBDump.dump4TSDB(this, metric, tags, start, end, recordSender); + } + + @Override + public void sendDPs(String metric, List fields, Map tags, Long start, Long end, RecordSender recordSender) throws Exception { + TSDBDump.dump4TSDB(this, metric, fields, tags, start, end, recordSender); + } + + @Override + public void sendRecords(String metric, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender) throws Exception { + TSDBDump.dump4RDB(this, metric, tags, start, end, columns4RDB, recordSender); + } + + @Override + public void sendRecords(String metric, List fields, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender) throws Exception { + TSDBDump.dump4RDB(this, metric, fields, tags, start, end, columns4RDB, recordSender); + } + + @Override + public boolean put(DataPoint4TSDB dp) { + return false; + } + + @Override + public boolean put(List dps) { + return false; + } + + @Override + public boolean isSupported() { + String versionJson = version(); + if (StringUtils.isBlank(versionJson)) { + throw new RuntimeException("Cannot get the version!"); + } + String version = JSON.parseObject(versionJson).getString("version"); + if (StringUtils.isBlank(version)) { + return false; + } + for (String prefix : getSupportVersionPrefix()) { + if (version.startsWith(prefix)) { + return true; + } + } + return false; + } +} diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBDump.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBDump.java new file mode 100644 index 00000000..8bae3a70 --- /dev/null +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBDump.java @@ -0,0 +1,318 @@ +package com.alibaba.datax.plugin.reader.tsdbreader.conn; + +import com.alibaba.datax.common.element.*; +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.plugin.reader.tsdbreader.Constant; +import com.alibaba.datax.plugin.reader.tsdbreader.util.HttpUtils; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.parser.Feature; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Copyright @ 2019 alibaba.com + * All right reserved. + * Function:TSDB Dump + * + * @author Benedict Jin + * @since 2019-10-21 + */ +final class TSDBDump { + + private static final Logger LOG = LoggerFactory.getLogger(TSDBDump.class); + + private static final String QUERY = "/api/query"; + private static final String QUERY_MULTI_FIELD = "/api/mquery"; + + static { + JSON.DEFAULT_PARSER_FEATURE &= ~Feature.UseBigDecimal.getMask(); + } + + private TSDBDump() { + } + + static void dump4TSDB(TSDBConnection conn, String metric, Map tags, + Long start, Long end, RecordSender sender) throws Exception { + LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end); + + String res = queryRange4SingleField(conn, metric, tags, start, end); + List dps = getDps4TSDB(metric, res); + if (dps == null || dps.isEmpty()) { + return; + } + sendTSDBDps(sender, dps); + } + + static void dump4TSDB(TSDBConnection conn, String metric, List fields, Map tags, + Long start, Long end, RecordSender sender) throws Exception { + LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end); + + String res = queryRange4MultiFields(conn, metric, fields, tags, start, end); + List dps = getDps4TSDB(metric, fields, res); + if (dps == null || dps.isEmpty()) { + return; + } + sendTSDBDps(sender, dps); + } + + static void dump4RDB(TSDBConnection conn, String metric, Map tags, + Long start, Long end, List columns4RDB, RecordSender sender) throws Exception { + LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end); + + String res = queryRange4SingleField(conn, metric, tags, start, end); + List dps = getDps4RDB(metric, res); + if (dps == null || dps.isEmpty()) { + return; + } + for (DataPoint4TSDB dp : dps) { + final Record record = sender.createRecord(); + final Map tagKV = dp.getTags(); + + for (String column : columns4RDB) { + if (Constant.METRIC_SPECIFY_KEY.equals(column)) { + record.addColumn(new StringColumn(dp.getMetric())); + } else if (Constant.TS_SPECIFY_KEY.equals(column)) { + record.addColumn(new LongColumn(dp.getTimestamp())); + } else if (Constant.VALUE_SPECIFY_KEY.equals(column)) { + record.addColumn(getColumn(dp.getValue())); + } else { + final Object tagk = tagKV.get(column); + if (tagk == null) { + continue; + } + record.addColumn(getColumn(tagk)); + } + } + sender.sendToWriter(record); + } + } + + static void dump4RDB(TSDBConnection conn, String metric, List fields, + Map tags, Long start, Long end, + List columns4RDB, RecordSender sender) throws Exception { + LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end); + + String res = queryRange4MultiFields(conn, metric, fields, tags, start, end); + List dps = getDps4RDB(metric, fields, res); + if (dps == null || dps.isEmpty()) { + return; + } + for (DataPoint4TSDB dp : dps) { + final Record record = sender.createRecord(); + final Map tagKV = dp.getTags(); + + for (String column : columns4RDB) { + if (Constant.METRIC_SPECIFY_KEY.equals(column)) { + record.addColumn(new StringColumn(dp.getMetric())); + } else if (Constant.TS_SPECIFY_KEY.equals(column)) { + record.addColumn(new LongColumn(dp.getTimestamp())); + } else { + final Object tagvOrField = tagKV.get(column); + if (tagvOrField == null) { + continue; + } + record.addColumn(getColumn(tagvOrField)); + } + } + sender.sendToWriter(record); + } + } + + private static Column getColumn(Object value) throws Exception { + Column valueColumn; + if (value instanceof Double) { + valueColumn = new DoubleColumn((Double) value); + } else if (value instanceof Long) { + valueColumn = new LongColumn((Long) value); + } else if (value instanceof String) { + valueColumn = new StringColumn((String) value); + } else { + throw new Exception(String.format("value 不支持类型: [%s]", value.getClass().getSimpleName())); + } + return valueColumn; + } + + private static String queryRange4SingleField(TSDBConnection conn, String metric, Map tags, + Long start, Long end) throws Exception { + String tagKV = getFilterByTags(tags); + String body = "{\n" + + " \"start\": " + start + ",\n" + + " \"end\": " + end + ",\n" + + " \"queries\": [\n" + + " {\n" + + " \"aggregator\": \"none\",\n" + + " \"metric\": \"" + metric + "\"\n" + + (tagKV == null ? "" : tagKV) + + " }\n" + + " ]\n" + + "}"; + return HttpUtils.post(conn.address() + QUERY, body); + } + + private static String queryRange4MultiFields(TSDBConnection conn, String metric, List fields, + Map tags, Long start, Long end) throws Exception { + // fields + StringBuilder fieldBuilder = new StringBuilder(); + fieldBuilder.append("\"fields\":["); + for (int i = 0; i < fields.size(); i++) { + fieldBuilder.append("{\"field\": \"").append(fields.get(i)).append("\",\"aggregator\": \"none\"}"); + if (i != fields.size() - 1) { + fieldBuilder.append(","); + } + } + fieldBuilder.append("]"); + // tagkv + String tagKV = getFilterByTags(tags); + String body = "{\n" + + " \"start\": " + start + ",\n" + + " \"end\": " + end + ",\n" + + " \"queries\": [\n" + + " {\n" + + " \"aggregator\": \"none\",\n" + + " \"metric\": \"" + metric + "\",\n" + + fieldBuilder.toString() + + (tagKV == null ? "" : tagKV) + + " }\n" + + " ]\n" + + "}"; + return HttpUtils.post(conn.address() + QUERY_MULTI_FIELD, body); + } + + private static String getFilterByTags(Map tags) { + if (tags != null && !tags.isEmpty()) { + // tagKV = ",\"tags:\":" + JSON.toJSONString(tags); + StringBuilder tagBuilder = new StringBuilder(); + tagBuilder.append(",\"filters\":["); + int count = 1; + final int size = tags.size(); + for (Map.Entry entry : tags.entrySet()) { + final String tagK = entry.getKey(); + final String tagV = entry.getValue(); + tagBuilder.append("{\"type\":\"literal_or\",\"tagk\":\"").append(tagK) + .append("\",\"filter\":\"").append(tagV).append("\",\"groupBy\":false}"); + if (count != size) { + tagBuilder.append(","); + } + count++; + } + tagBuilder.append("]"); + return tagBuilder.toString(); + } + return null; + } + + private static List getDps4TSDB(String metric, String dps) { + final List jsonArray = JSON.parseArray(dps, QueryResult.class); + if (jsonArray.size() == 0) { + return null; + } + List dpsArr = new LinkedList<>(); + for (QueryResult queryResult : jsonArray) { + final Map tags = queryResult.getTags(); + final Map points = queryResult.getDps(); + for (Map.Entry entry : points.entrySet()) { + final String ts = entry.getKey(); + final Object value = entry.getValue(); + + DataPoint4TSDB dp = new DataPoint4TSDB(); + dp.setMetric(metric); + dp.setTags(tags); + dp.setTimestamp(Long.parseLong(ts)); + dp.setValue(value); + dpsArr.add(dp.toString()); + } + } + return dpsArr; + } + + private static List getDps4TSDB(String metric, List fields, String dps) { + final List jsonArray = JSON.parseArray(dps, MultiFieldQueryResult.class); + if (jsonArray.size() == 0) { + return null; + } + List dpsArr = new LinkedList<>(); + for (MultiFieldQueryResult queryResult : jsonArray) { + final Map tags = queryResult.getTags(); + final List> values = queryResult.getValues(); + for (List value : values) { + final String ts = value.get(0).toString(); + Map fieldsAndValues = new HashMap<>(); + for (int i = 0; i < fields.size(); i++) { + fieldsAndValues.put(fields.get(i), value.get(i + 1)); + } + + final DataPoint4MultiFieldsTSDB dp = new DataPoint4MultiFieldsTSDB(); + dp.setMetric(metric); + dp.setTimestamp(Long.parseLong(ts)); + dp.setTags(tags); + dp.setFields(fieldsAndValues); + dpsArr.add(dp.toString()); + } + } + return dpsArr; + } + + private static List getDps4RDB(String metric, String dps) { + final List jsonArray = JSON.parseArray(dps, QueryResult.class); + if (jsonArray.size() == 0) { + return null; + } + List dpsArr = new LinkedList<>(); + for (QueryResult queryResult : jsonArray) { + final Map tags = queryResult.getTags(); + final Map points = queryResult.getDps(); + for (Map.Entry entry : points.entrySet()) { + final String ts = entry.getKey(); + final Object value = entry.getValue(); + + final DataPoint4TSDB dp = new DataPoint4TSDB(); + dp.setMetric(metric); + dp.setTags(tags); + dp.setTimestamp(Long.parseLong(ts)); + dp.setValue(value); + dpsArr.add(dp); + } + } + return dpsArr; + } + + private static List getDps4RDB(String metric, List fields, String dps) { + final List jsonArray = JSON.parseArray(dps, MultiFieldQueryResult.class); + if (jsonArray.size() == 0) { + return null; + } + List dpsArr = new LinkedList<>(); + for (MultiFieldQueryResult queryResult : jsonArray) { + final Map tags = queryResult.getTags(); + final List> values = queryResult.getValues(); + for (List value : values) { + final String ts = value.get(0).toString(); + Map tagsTmp = new HashMap<>(tags); + for (int i = 0; i < fields.size(); i++) { + tagsTmp.put(fields.get(i), value.get(i + 1)); + } + + final DataPoint4TSDB dp = new DataPoint4TSDB(); + dp.setMetric(metric); + dp.setTimestamp(Long.parseLong(ts)); + dp.setTags(tagsTmp); + dpsArr.add(dp); + } + } + return dpsArr; + } + + private static void sendTSDBDps(RecordSender sender, List dps) { + for (String dp : dps) { + StringColumn tsdbColumn = new StringColumn(dp); + Record record = sender.createRecord(); + record.addColumn(tsdbColumn); + sender.sendToWriter(record); + } + } +} diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtils.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtils.java new file mode 100644 index 00000000..3e0be854 --- /dev/null +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtils.java @@ -0,0 +1,67 @@ +package com.alibaba.datax.plugin.reader.tsdbreader.util; + +import com.alibaba.fastjson.JSON; +import org.apache.http.client.fluent.Content; +import org.apache.http.client.fluent.Request; +import org.apache.http.entity.ContentType; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Copyright @ 2019 alibaba.com + * All right reserved. + * Function:HttpUtils + * + * @author Benedict Jin + * @since 2019-10-21 + */ +public final class HttpUtils { + + public final static int CONNECT_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60); + public final static int SOCKET_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60); + + private HttpUtils() { + } + + public static String get(String url) throws Exception { + Content content = Request.Get(url) + .connectTimeout(CONNECT_TIMEOUT_DEFAULT_IN_MILL) + .socketTimeout(SOCKET_TIMEOUT_DEFAULT_IN_MILL) + .execute() + .returnContent(); + if (content == null) { + return null; + } + return content.asString(StandardCharsets.UTF_8); + } + + public static String post(String url, Map params) throws Exception { + return post(url, JSON.toJSONString(params), CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL); + } + + public static String post(String url, String params) throws Exception { + return post(url, params, CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL); + } + + public static String post(String url, Map params, + int connectTimeoutInMill, int socketTimeoutInMill) throws Exception { + return post(url, JSON.toJSONString(params), connectTimeoutInMill, socketTimeoutInMill); + } + + public static String post(String url, String params, + int connectTimeoutInMill, int socketTimeoutInMill) throws Exception { + Content content = Request.Post(url) + .connectTimeout(connectTimeoutInMill) + .socketTimeout(socketTimeoutInMill) + .addHeader("Content-Type", "application/json") + .bodyString(params, ContentType.APPLICATION_JSON) + .execute() + .returnContent(); + if (content == null) { + return null; + } + return content.asString(StandardCharsets.UTF_8); + } +} diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TSDBUtils.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TSDBUtils.java new file mode 100644 index 00000000..bb7b4b87 --- /dev/null +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TSDBUtils.java @@ -0,0 +1,68 @@ +package com.alibaba.datax.plugin.reader.tsdbreader.util; + +import com.alibaba.datax.plugin.reader.tsdbreader.conn.DataPoint4TSDB; +import com.alibaba.fastjson.JSON; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Copyright @ 2019 alibaba.com + * All right reserved. + * Function:TSDB Utils + * + * @author Benedict Jin + * @since 2019-10-21 + */ +public final class TSDBUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(TSDBUtils.class); + + private TSDBUtils() { + } + + public static String version(String address) { + String url = String.format("%s/api/version", address); + String rsp; + try { + rsp = HttpUtils.get(url); + } catch (Exception e) { + throw new RuntimeException(e); + } + return rsp; + } + + public static String config(String address) { + String url = String.format("%s/api/config", address); + String rsp; + try { + rsp = HttpUtils.get(url); + } catch (Exception e) { + throw new RuntimeException(e); + } + return rsp; + } + + public static boolean put(String address, List dps) { + return put(address, JSON.toJSON(dps)); + } + + public static boolean put(String address, DataPoint4TSDB dp) { + return put(address, JSON.toJSON(dp)); + } + + private static boolean put(String address, Object o) { + String url = String.format("%s/api/put", address); + String rsp; + try { + rsp = HttpUtils.post(url, o.toString()); + // If successful, the returned content should be null. + assert rsp == null; + } catch (Exception e) { + LOGGER.error("Address: {}, DataPoints: {}", url, o); + throw new RuntimeException(e); + } + return true; + } +} diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TimeUtils.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TimeUtils.java new file mode 100644 index 00000000..c1243164 --- /dev/null +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TimeUtils.java @@ -0,0 +1,38 @@ +package com.alibaba.datax.plugin.reader.tsdbreader.util; + +import java.util.concurrent.TimeUnit; + +/** + * Copyright @ 2019 alibaba.com + * All right reserved. + * Function:TimeUtils + * + * @author Benedict Jin + * @since 2019-10-21 + */ +public final class TimeUtils { + + private TimeUtils() { + } + + private static final long SECOND_MASK = 0xFFFFFFFF00000000L; + private static final long HOUR_IN_MILL = TimeUnit.HOURS.toMillis(1); + + /** + * Weather the timestamp is second. + * + * @param ts timestamp + */ + public static boolean isSecond(long ts) { + return (ts & SECOND_MASK) == 0; + } + + /** + * Get the hour. + * + * @param ms time in millisecond + */ + public static long getTimeInHour(long ms) { + return ms - ms % HOUR_IN_MILL; + } +} diff --git a/tsdbreader/src/main/resources/plugin.json b/tsdbreader/src/main/resources/plugin.json new file mode 100755 index 00000000..f2dbb1f0 --- /dev/null +++ b/tsdbreader/src/main/resources/plugin.json @@ -0,0 +1,10 @@ +{ + "name": "tsdbreader", + "class": "com.alibaba.datax.plugin.reader.tsdbreader.TSDBReader", + "description": { + "useScene": "从 TSDB 中摄取数据点", + "mechanism": "通过 /api/query 接口查询出符合条件的数据点", + "warn": "指定起止时间会自动忽略分钟和秒,转为整点时刻,例如 2019-4-18 的 [3:35, 4:55) 会被转为 [3:00, 4:00)" + }, + "developer": "Benedict Jin" +} diff --git a/tsdbreader/src/main/resources/plugin_job_template.json b/tsdbreader/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..8570abb9 --- /dev/null +++ b/tsdbreader/src/main/resources/plugin_job_template.json @@ -0,0 +1,29 @@ +{ + "name": "tsdbreader", + "parameter": { + "sinkDbType": "RDB", + "endpoint": "http://localhost:8242", + "column": [ + "__metric__", + "__ts__", + "app", + "cluster", + "group", + "ip", + "zone", + "__value__" + ], + "metric": [ + "m" + ], + "tag": { + "m": { + "app": "a1", + "cluster": "c1" + } + }, + "splitIntervalMs": 60000, + "beginDateTime": "2019-01-01 00:00:00", + "endDateTime": "2019-01-01 01:00:00" + } +} diff --git a/tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnectionTest.java b/tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnectionTest.java new file mode 100644 index 00000000..e4544088 --- /dev/null +++ b/tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnectionTest.java @@ -0,0 +1,30 @@ +package com.alibaba.datax.plugin.reader.tsdbreader.conn; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Copyright @ 2019 alibaba.com + * All right reserved. + * Function:TSDB Connection4TSDB Test + * + * @author Benedict Jin + * @since 2019-10-21 + */ +@Ignore +public class TSDBConnectionTest { + + private static final String TSDB_ADDRESS = "http://localhost:8242"; + + @Test + public void testVersion() { + String version = new TSDBConnection(TSDB_ADDRESS).version(); + Assert.assertNotNull(version); + } + + @Test + public void testIsSupported() { + Assert.assertTrue(new TSDBConnection(TSDB_ADDRESS).isSupported()); + } +} diff --git a/tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/util/Const.java b/tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/util/Const.java new file mode 100644 index 00000000..43e04b5c --- /dev/null +++ b/tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/util/Const.java @@ -0,0 +1,17 @@ +package com.alibaba.datax.plugin.reader.tsdbreader.util; + +/** + * Copyright @ 2019 alibaba.com + * All right reserved. + * Function:Const + * + * @author Benedict Jin + * @since 2019-10-21 + */ +final class Const { + + private Const() { + } + + static final String TSDB_ADDRESS = "http://localhost:8242"; +} diff --git a/tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtilsTest.java b/tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtilsTest.java new file mode 100644 index 00000000..12a2660a --- /dev/null +++ b/tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtilsTest.java @@ -0,0 +1,39 @@ +package com.alibaba.datax.plugin.reader.tsdbreader.util; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * Copyright @ 2019 alibaba.com + * All right reserved. + * Function:HttpUtils Test + * + * @author Benedict Jin + * @since 2019-10-21 + */ +@Ignore +public class HttpUtilsTest { + + @Test + public void testSimpleCase() throws Exception { + String url = "https://httpbin.org/post"; + Map params = new HashMap<>(); + params.put("foo", "bar"); + + String rsp = HttpUtils.post(url, params); + System.out.println(rsp); + Assert.assertNotNull(rsp); + } + + @Test + public void testGet() throws Exception { + String url = String.format("%s/api/version", Const.TSDB_ADDRESS); + String rsp = HttpUtils.get(url); + System.out.println(rsp); + Assert.assertNotNull(rsp); + } +} diff --git a/tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TimeUtilsTest.java b/tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TimeUtilsTest.java new file mode 100644 index 00000000..cef8efae --- /dev/null +++ b/tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TimeUtilsTest.java @@ -0,0 +1,33 @@ +package com.alibaba.datax.plugin.reader.tsdbreader.util; + +import org.junit.Assert; +import org.junit.Test; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * Copyright @ 2019 alibaba.com + * All right reserved. + * Function:com.alibaba.datax.common.util + * + * @author Benedict Jin + * @since 2019-10-21 + */ +public class TimeUtilsTest { + + @Test + public void testIsSecond() { + Assert.assertFalse(TimeUtils.isSecond(System.currentTimeMillis())); + Assert.assertTrue(TimeUtils.isSecond(System.currentTimeMillis() / 1000)); + } + + @Test + public void testGetTimeInHour() throws ParseException { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Date date = sdf.parse("2019-04-18 15:32:33"); + long timeInHour = TimeUtils.getTimeInHour(date.getTime()); + Assert.assertEquals("2019-04-18 15:00:00", sdf.format(timeInHour)); + } +}