diff --git a/pom.xml b/pom.xml
index ece9491e..9463f2bf 100755
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,7 @@
rdbmsreader
hbase11xreader
hbase094xreader
+ tsdbreader
opentsdbreader
cassandrareader
diff --git a/tsdbreader/doc/tsdbreader.md b/tsdbreader/doc/tsdbreader.md
new file mode 100644
index 00000000..557dcc51
--- /dev/null
+++ b/tsdbreader/doc/tsdbreader.md
@@ -0,0 +1,587 @@
+
+# TSDBReader Plugin Documentation
+
+___
+
+
+## 1 Quick Introduction
+
+The TSDBReader plugin reads data from Alibaba Cloud TSDB. Alibaba Cloud Time Series Database (**T**ime **S**eries **D**ata**b**ase, TSDB for short) is a database service that combines efficient reading and writing of time series data, compressed storage and real-time computing, and is widely used in IoT and Internet scenarios for real-time monitoring of devices and business services as well as real-time prediction and alerting. See the Alibaba Cloud TSDB [product page](https://cn.aliyun.com/product/hitsdb) for details.
+
+
+
+## 2 Implementation Principle
+
+Under the hood, TSDBReader connects to the Alibaba Cloud TSDB instance over HTTP and scans the data points out through the `/api/query` or `/api/mquery` endpoint (for more details see [TSDB HTTP API Overview](https://help.aliyun.com/document_detail/63557.html)). The whole synchronization job is split up by time line (metric) and by sub-ranges of the query time range.
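+
+The splitting idea can be sketched as follows (a minimal illustration only; the class and method names here are hypothetical, while the actual task splitting is done in the plugin's `Job#split`, which additionally aligns the range to whole hours and produces one set of sub-ranges per metric / time line):
+
+```java
+// Minimal sketch: cutting the half-open range [startMs, endMs) into sub-ranges of
+// splitIntervalMs milliseconds, one DataX task per sub-range (hypothetical helper).
+import java.util.ArrayList;
+import java.util.List;
+
+public class SplitSketch {
+
+    public static List<long[]> split(long startMs, long endMs, long splitIntervalMs) {
+        List<long[]> ranges = new ArrayList<>();
+        for (long cur = startMs; cur < endMs; cur += splitIntervalMs) {
+            // Each sub-range is stored as [begin, end], where end = begin + interval - 1,
+            // so that consecutive sub-ranges never overlap.
+            ranges.add(new long[]{cur, cur + splitIntervalMs - 1});
+        }
+        return ranges;
+    }
+}
+```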
+
+
+
+## 3 Function Description
+
+### 3.1 Sample Configurations
+
+* Configure a job that extracts data from Alibaba Cloud TSDB to the local machine and outputs it in the **time series** format:
+
+Sample time series data:
+```json
+{"metric":"m","tags":{"app":"a19","cluster":"c5","group":"g10","ip":"i999","zone":"z1"},"timestamp":1546272263,"value":1}
+```
+
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "tsdbreader",
+ "parameter": {
+ "sinkDbType": "TSDB",
+ "endpoint": "http://localhost:8242",
+ "column": [
+ "m"
+ ],
+ "splitIntervalMs": 60000,
+ "beginDateTime": "2019-01-01 00:00:00",
+ "endDateTime": "2019-01-01 01:00:00"
+ }
+ },
+ "writer": {
+ "name": "streamwriter",
+ "parameter": {
+ "encoding": "UTF-8",
+ "print": true
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 3
+ }
+ }
+ }
+}
+```
+
+* Configure a job that extracts data from Alibaba Cloud TSDB to the local machine and outputs it in the **relational** format:
+
+Sample relational data:
+```txt
+m 1546272125 a1 c1 g2 i3021 z4 1.0
+```
+
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "tsdbreader",
+ "parameter": {
+ "sinkDbType": "RDB",
+ "endpoint": "http://localhost:8242",
+ "column": [
+ "__metric__",
+ "__ts__",
+ "app",
+ "cluster",
+ "group",
+ "ip",
+ "zone",
+ "__value__"
+ ],
+ "metric": [
+ "m"
+ ],
+ "splitIntervalMs": 60000,
+ "beginDateTime": "2019-01-01 00:00:00",
+ "endDateTime": "2019-01-01 01:00:00"
+ }
+ },
+ "writer": {
+ "name": "streamwriter",
+ "parameter": {
+ "encoding": "UTF-8",
+ "print": true
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 3
+ }
+ }
+ }
+}
+```
+
+* Configure a job that extracts **single-value** data from Alibaba Cloud TSDB to ADB:
+
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "tsdbreader",
+ "parameter": {
+ "sinkDbType": "RDB",
+ "endpoint": "http://localhost:8242",
+ "column": [
+ "__metric__",
+ "__ts__",
+ "app",
+ "cluster",
+ "group",
+ "ip",
+ "zone",
+ "__value__"
+ ],
+ "metric": [
+ "m"
+ ],
+ "splitIntervalMs": 60000,
+ "beginDateTime": "2019-01-01 00:00:00",
+ "endDateTime": "2019-01-01 01:00:00"
+ }
+ },
+ "writer": {
+ "name": "adswriter",
+ "parameter": {
+ "username": "******",
+ "password": "******",
+ "column": [
+ "`metric`",
+ "`ts`",
+ "`app`",
+ "`cluster`",
+ "`group`",
+ "`ip`",
+ "`zone`",
+ "`value`"
+ ],
+ "url": "http://localhost:3306",
+ "schema": "datax_test",
+ "table": "datax_test",
+ "writeMode": "insert",
+ "opIndex": "0",
+ "batchSize": "2"
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 3
+ }
+ }
+ }
+}
+```
+
+* Configure a job that extracts **multi-value** data from Alibaba Cloud TSDB to ADB:
+
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "tsdbreader",
+ "parameter": {
+ "sinkDbType": "RDB",
+ "endpoint": "http://localhost:8242",
+ "column": [
+ "__metric__",
+ "__ts__",
+ "app",
+ "cluster",
+ "group",
+ "ip",
+ "zone",
+ "load",
+ "memory",
+ "cpu"
+ ],
+ "metric": [
+ "m_field"
+ ],
+ "field": {
+ "m_field": [
+ "load",
+ "memory",
+ "cpu"
+ ]
+ },
+ "splitIntervalMs": 60000,
+ "beginDateTime": "2019-01-01 00:00:00",
+ "endDateTime": "2019-01-01 01:00:00"
+ }
+ },
+ "writer": {
+ "name": "adswriter",
+ "parameter": {
+ "username": "******",
+ "password": "******",
+ "column": [
+ "`metric`",
+ "`ts`",
+ "`app`",
+ "`cluster`",
+ "`group`",
+ "`ip`",
+ "`zone`",
+ "`load`",
+ "`memory`",
+ "`cpu`"
+ ],
+ "url": "http://localhost:3306",
+ "schema": "datax_test",
+ "table": "datax_test_multi_field",
+ "writeMode": "insert",
+ "opIndex": "0",
+ "batchSize": "2"
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 3
+ }
+ }
+ }
+}
+```
+
+* Configure a job that extracts **single-value** data from Alibaba Cloud TSDB to ADB, filtering to a subset of time lines:
+
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "tsdbreader",
+ "parameter": {
+ "sinkDbType": "RDB",
+ "endpoint": "http://localhost:8242",
+ "column": [
+ "__metric__",
+ "__ts__",
+ "app",
+ "cluster",
+ "group",
+ "ip",
+ "zone",
+ "__value__"
+ ],
+ "metric": [
+ "m"
+ ],
+ "tag": {
+ "m": {
+ "app": "a1",
+ "cluster": "c1"
+ }
+ },
+ "splitIntervalMs": 60000,
+ "beginDateTime": "2019-01-01 00:00:00",
+ "endDateTime": "2019-01-01 01:00:00"
+ }
+ },
+ "writer": {
+ "name": "adswriter",
+ "parameter": {
+ "username": "******",
+ "password": "******",
+ "column": [
+ "`metric`",
+ "`ts`",
+ "`app`",
+ "`cluster`",
+ "`group`",
+ "`ip`",
+ "`zone`",
+ "`value`"
+ ],
+ "url": "http://localhost:3306",
+ "schema": "datax_test",
+ "table": "datax_test",
+ "writeMode": "insert",
+ "opIndex": "0",
+ "batchSize": "2"
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 3
+ }
+ }
+ }
+}
+```
+
+* Configure a job that extracts **multi-value** data from Alibaba Cloud TSDB to ADB, filtering to a subset of time lines:
+
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "tsdbreader",
+ "parameter": {
+ "sinkDbType": "RDB",
+ "endpoint": "http://localhost:8242",
+ "column": [
+ "__metric__",
+ "__ts__",
+ "app",
+ "cluster",
+ "group",
+ "ip",
+ "zone",
+ "load",
+ "memory",
+ "cpu"
+ ],
+ "metric": [
+ "m_field"
+ ],
+ "field": {
+ "m_field": [
+ "load",
+ "memory",
+ "cpu"
+ ]
+ },
+ "tag": {
+ "m_field": {
+ "ip": "i999"
+ }
+ },
+ "splitIntervalMs": 60000,
+ "beginDateTime": "2019-01-01 00:00:00",
+ "endDateTime": "2019-01-01 01:00:00"
+ }
+ },
+ "writer": {
+ "name": "adswriter",
+ "parameter": {
+ "username": "******",
+ "password": "******",
+ "column": [
+ "`metric`",
+ "`ts`",
+ "`app`",
+ "`cluster`",
+ "`group`",
+ "`ip`",
+ "`zone`",
+ "`load`",
+ "`memory`",
+ "`cpu`"
+ ],
+ "url": "http://localhost:3306",
+ "schema": "datax_test",
+ "table": "datax_test_multi_field",
+ "writeMode": "insert",
+ "opIndex": "0",
+ "batchSize": "2"
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 3
+ }
+ }
+ }
+}
+```
+
+* Configure a job that extracts **single-value** data from one Alibaba Cloud TSDB instance to another Alibaba Cloud TSDB instance:
+
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "tsdbreader",
+ "parameter": {
+ "sinkDbType": "TSDB",
+ "endpoint": "http://localhost:8242",
+ "column": [
+ "m"
+ ],
+ "splitIntervalMs": 60000,
+ "beginDateTime": "2019-01-01 00:00:00",
+ "endDateTime": "2019-01-01 01:00:00"
+ }
+ },
+ "writer": {
+ "name": "tsdbwriter",
+ "parameter": {
+ "endpoint": "http://localhost:8240"
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 3
+ }
+ }
+ }
+}
+```
+
+* Configure a job that extracts **multi-value** data from one Alibaba Cloud TSDB instance to another Alibaba Cloud TSDB instance:
+
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "tsdbreader",
+ "parameter": {
+ "sinkDbType": "TSDB",
+ "endpoint": "http://localhost:8242",
+ "column": [
+ "m_field"
+ ],
+ "field": {
+ "m_field": [
+ "load",
+ "memory",
+ "cpu"
+ ]
+ },
+ "splitIntervalMs": 60000,
+ "beginDateTime": "2019-01-01 00:00:00",
+ "endDateTime": "2019-01-01 01:00:00"
+ }
+ },
+ "writer": {
+ "name": "tsdbwriter",
+ "parameter": {
+ "multiField": true,
+ "endpoint": "http://localhost:8240"
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 3
+ }
+ }
+ }
+}
+```
+
+
+
+
+
+
+### 3.2 Parameter Description
+
+* **name**
+    * Description: the name of this plugin
+    * Required: yes
+    * Default: tsdbreader
+
+* **parameter**
+    * **sinkDbType**
+        * Description: the type of the target (sink) database
+        * Required: no
+        * Default: TSDB
+        * Note: only the two values TSDB and RDB are currently supported. TSDB covers Alibaba Cloud TSDB, OpenTSDB, InfluxDB, Prometheus and TimeScale; RDB covers ADB, MySQL, Oracle, PostgreSQL, DRDS and so on.
+
+    * **endpoint**
+        * Description: the HTTP endpoint of the Alibaba Cloud TSDB instance
+        * Required: yes
+        * Format: http://IP:Port
+        * Default: none
+
+    * **column**
+        * Description: in the TSDB case, the list of metrics the migration job should read; in the RDB case, the columns mapped to the target relational table, extended with the three special fields `__metric__`, `__ts__` and `__value__`, where `__metric__` maps to the metric name, `__ts__` maps to the timestamp, and `__value__` (single-value scenario only) maps to the metric value. In the multi-value scenario, list the field names directly instead (see the sketch after this parameter list).
+        * Required: yes
+        * Default: none
+
+    * **metric**
+        * Description: only applies to the RDB case; the list of metrics the migration job should read (required when sinkDbType is RDB)
+        * Required: no
+        * Default: none
+
+    * **field**
+        * Description: only applies to the multi-value scenario; the list of fields the migration job should read
+        * Required: no
+        * Default: none
+
+    * **tag**
+        * Description: the TagK/TagV pairs to migrate, used to further filter the time lines
+        * Required: no
+        * Default: none
+
+    * **splitIntervalMs**
+        * Description: used internally by DataX to split the job into tasks, so that each task only queries a small slice of the time range
+        * Required: yes
+        * Default: none
+        * Note: the unit is milliseconds (ms)
+
+    * **beginDateTime**
+        * Description: used together with endDateTime to specify the time range whose data points should be migrated
+        * Required: yes
+        * Format: `yyyy-MM-dd HH:mm:ss`
+        * Default: none
+        * Note: the minutes and seconds of the begin and end times are ignored and the times are converted to whole hours; for example, on 2019-4-18 the range [3:35, 4:55) becomes [3:00, 4:00)
+
+    * **endDateTime**
+        * Description: used together with beginDateTime to specify the time range whose data points should be migrated
+        * Required: yes
+        * Format: `yyyy-MM-dd HH:mm:ss`
+        * Default: none
+        * Note: the minutes and seconds of the begin and end times are ignored and the times are converted to whole hours; for example, on 2019-4-18 the range [3:35, 4:55) becomes [3:00, 4:00)
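+
+For the RDB case, the way a single-value data point is flattened into one row following the `column` order can be sketched like this (an illustration only; `RowMappingSketch`/`toRow` are hypothetical names, the real mapping lives in the plugin's dump code):
+
+```java
+// Hypothetical sketch of the RDB column mapping: the special keys map to metric,
+// timestamp and value, and every other column name is looked up as a tag key.
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class RowMappingSketch {
+
+    public static List<Object> toRow(String metric, long ts, Object value,
+                                     Map<String, String> tags, List<String> columns) {
+        List<Object> row = new ArrayList<>();
+        for (String column : columns) {
+            if ("__metric__".equals(column)) {
+                row.add(metric);           // __metric__ -> metric name
+            } else if ("__ts__".equals(column)) {
+                row.add(ts);               // __ts__ -> timestamp
+            } else if ("__value__".equals(column)) {
+                row.add(value);            // __value__ -> the single value
+            } else {
+                row.add(tags.get(column)); // any other column -> tag value with the same name
+            }
+        }
+        return row;
+    }
+}
+```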
+
+
+
+
+### 3.3 Type Conversion
+
+| DataX Internal Type | TSDB Data Type |
+| ------------------- | ------------------------------------------------------------------------------------------------ |
+| String              | the serialized string of a TSDB data point, including timestamp, metric, tags, fields and value |
+
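+
+In other words, in TSDB mode each record carries a single String column holding the JSON-serialized data point, for example (a small sketch reusing the plugin's own `DataPoint4TSDB`):
+
+```java
+// Sketch: the String handed to the writer is the fastjson serialization of a data
+// point, which is what DataPoint4TSDB#toString() returns.
+import com.alibaba.datax.plugin.reader.tsdbreader.conn.DataPoint4TSDB;
+
+import java.util.Collections;
+
+public class SerializedPointSketch {
+
+    public static void main(String[] args) {
+        DataPoint4TSDB dp = new DataPoint4TSDB(
+                1546272263L, "m", Collections.singletonMap("app", "a19"), 1);
+        // Prints something like:
+        // {"metric":"m","tags":{"app":"a19"},"timestamp":1546272263,"value":1}
+        System.out.println(dp);
+    }
+}
+```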
+
+
+
+
+
+## 4 Constraints and Limitations
+
+### 4.1 If a single metric holds too much data within a one-hour range, you may need to enlarge the JVM memory via the `-j` option
+
+If the downstream writer cannot write as fast as TSDB Reader queries, data may pile up in memory, so the JVM parameters should be adjusted accordingly. Taking the "extract data from Alibaba Cloud TSDB to the local machine" job as an example, the start command is:
+
+```bash
+python datax/bin/datax.py tsdb2stream.json -j "-Xms4096m -Xmx4096m"
+```
+
+
+
+### 4.2 The specified begin and end times are automatically converted to whole hours
+
+The specified begin and end times are automatically converted to whole hours; for example, on 2019-4-18 the range `[3:35, 3:55)` becomes `[3:00, 4:00)`
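+
+The truncation itself is just a floor to the hour on the millisecond timestamp (a sketch of the idea, assuming millisecond UTC timestamps; the plugin performs this through its TimeUtils helper):
+
+```java
+// Sketch: flooring a millisecond timestamp to the whole hour, which is the
+// rounding behaviour described above.
+public class HourTruncationSketch {
+
+    private static final long HOUR_IN_MS = 60 * 60 * 1000L;
+
+    public static long truncateToHour(long timestampMs) {
+        return timestampMs - timestampMs % HOUR_IN_MS;
+    }
+
+    public static void main(String[] args) {
+        // 2019-04-18 03:35:00 UTC (1555558500000) -> 2019-04-18 03:00:00 UTC (1555556400000)
+        System.out.println(truncateToHour(1555558500000L));
+    }
+}
+```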
+
+
+
diff --git a/tsdbreader/pom.xml b/tsdbreader/pom.xml
new file mode 100644
index 00000000..b9a45985
--- /dev/null
+++ b/tsdbreader/pom.xml
@@ -0,0 +1,146 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>com.alibaba.datax</groupId>
+        <artifactId>datax-all</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>tsdbreader</artifactId>
+    <name>tsdbreader</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <commons-lang3.version>3.3.2</commons-lang3.version>
+        <httpclient.version>4.4</httpclient.version>
+        <commons-io.version>2.4</commons-io.version>
+        <fastjson.version>1.2.28</fastjson.version>
+        <junit4.version>4.12</junit4.version>
+        <joda-time.version>2.9.9</joda-time.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-common</artifactId>
+            <version>${datax-project-version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>fastjson</artifactId>
+                    <groupId>com.alibaba</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>commons-math3</artifactId>
+                    <groupId>org.apache.commons</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons-lang3.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpclient.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>${commons-io.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>fluent-hc</artifactId>
+            <version>${httpclient.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>${fastjson.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+            <version>${joda-time.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit4.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${jdk-version}</source>
+                    <target>${jdk-version}</target>
+                    <encoding>${project-sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                    <finalName>datax</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>dwzip</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/tsdbreader/src/main/assembly/package.xml b/tsdbreader/src/main/assembly/package.xml
new file mode 100755
index 00000000..b76f2aba
--- /dev/null
+++ b/tsdbreader/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+<assembly
+        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+    <id></id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>plugin.json</include>
+                <include>plugin_job_template.json</include>
+            </includes>
+            <outputDirectory>plugin/reader/tsdbreader</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>tsdbreader-0.0.1-SNAPSHOT.jar</include>
+            </includes>
+            <outputDirectory>plugin/reader/tsdbreader</outputDirectory>
+        </fileSet>
+    </fileSets>
+
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/reader/tsdbreader/libs</outputDirectory>
+            <scope>runtime</scope>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Constant.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Constant.java
new file mode 100644
index 00000000..e42dedc0
--- /dev/null
+++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Constant.java
@@ -0,0 +1,29 @@
+package com.alibaba.datax.plugin.reader.tsdbreader;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:Constant
+ *
+ * @author Benedict Jin
+ * @since 2019-10-21
+ */
+public final class Constant {
+
+ static final String DEFAULT_DATA_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+ public static final String METRIC_SPECIFY_KEY = "__metric__";
+ public static final String TS_SPECIFY_KEY = "__ts__";
+ public static final String VALUE_SPECIFY_KEY = "__value__";
+
+ static final Set<String> MUST_CONTAINED_SPECIFY_KEYS = new HashSet<>();
+
+ static {
+ MUST_CONTAINED_SPECIFY_KEYS.add(METRIC_SPECIFY_KEY);
+ MUST_CONTAINED_SPECIFY_KEYS.add(TS_SPECIFY_KEY);
+ // __value__ may be omitted in the multi-value scenario
+ }
+}
diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Key.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Key.java
new file mode 100644
index 00000000..14ee7e41
--- /dev/null
+++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Key.java
@@ -0,0 +1,36 @@
+package com.alibaba.datax.plugin.reader.tsdbreader;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:Key
+ *
+ * @author Benedict Jin
+ * @since 2019-10-21
+ */
+public class Key {
+
+ // TSDB for OpenTSDB / InfluxDB / TimeScale / Prometheus etc.
+ // RDB for MySQL / ADB etc.
+ static final String SINK_DB_TYPE = "sinkDbType";
+ static final String ENDPOINT = "endpoint";
+ static final String COLUMN = "column";
+ static final String METRIC = "metric";
+ static final String FIELD = "field";
+ static final String TAG = "tag";
+ static final String INTERVAL_DATE_TIME = "splitIntervalMs";
+ static final String BEGIN_DATE_TIME = "beginDateTime";
+ static final String END_DATE_TIME = "endDateTime";
+
+ static final Integer INTERVAL_DATE_TIME_DEFAULT_VALUE = 60;
+ static final String TYPE_DEFAULT_VALUE = "TSDB";
+ static final Set<String> TYPE_SET = new HashSet<>();
+
+ static {
+ TYPE_SET.add("TSDB");
+ TYPE_SET.add("RDB");
+ }
+}
diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReader.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReader.java
new file mode 100755
index 00000000..04b931c7
--- /dev/null
+++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReader.java
@@ -0,0 +1,320 @@
+package com.alibaba.datax.plugin.reader.tsdbreader;
+
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.datax.common.spi.Reader;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.reader.tsdbreader.conn.TSDBConnection;
+import com.alibaba.datax.plugin.reader.tsdbreader.util.TimeUtils;
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:TSDB Reader
+ *
+ * @author Benedict Jin
+ * @since 2019-10-21
+ */
+@SuppressWarnings("unused")
+public class TSDBReader extends Reader {
+
+ public static class Job extends Reader.Job {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Job.class);
+
+ private Configuration originalConfig;
+
+ @Override
+ public void init() {
+ this.originalConfig = super.getPluginJobConf();
+
+ String type = originalConfig.getString(Key.SINK_DB_TYPE, Key.TYPE_DEFAULT_VALUE);
+ if (StringUtils.isBlank(type)) {
+ throw DataXException.asDataXException(
+ TSDBReaderErrorCode.REQUIRED_VALUE,
+ "The parameter [" + Key.SINK_DB_TYPE + "] is not set.");
+ }
+ if (!Key.TYPE_SET.contains(type)) {
+ throw DataXException.asDataXException(
+ TSDBReaderErrorCode.ILLEGAL_VALUE,
+ "The parameter [" + Key.SINK_DB_TYPE + "] should be one of [" +
+ JSON.toJSONString(Key.TYPE_SET) + "].");
+ }
+
+ String address = originalConfig.getString(Key.ENDPOINT);
+ if (StringUtils.isBlank(address)) {
+ throw DataXException.asDataXException(
+ TSDBReaderErrorCode.REQUIRED_VALUE,
+ "The parameter [" + Key.ENDPOINT + "] is not set.");
+ }
+
+ // tagK / field could be empty
+ if ("TSDB".equals(type)) {
+ List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
+ if (columns == null || columns.isEmpty()) {
+ throw DataXException.asDataXException(
+ TSDBReaderErrorCode.REQUIRED_VALUE,
+ "The parameter [" + Key.COLUMN + "] is not set.");
+ }
+ } else {
+ List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
+ if (columns == null || columns.isEmpty()) {
+ throw DataXException.asDataXException(
+ TSDBReaderErrorCode.REQUIRED_VALUE,
+ "The parameter [" + Key.COLUMN + "] is not set.");
+ }
+ for (String specifyKey : Constant.MUST_CONTAINED_SPECIFY_KEYS) {
+ if (!columns.contains(specifyKey)) {
+ throw DataXException.asDataXException(
+ TSDBReaderErrorCode.ILLEGAL_VALUE,
+ "The parameter [" + Key.COLUMN + "] should contain "
+ + JSON.toJSONString(Constant.MUST_CONTAINED_SPECIFY_KEYS) + ".");
+ }
+ }
+ final List<String> metrics = originalConfig.getList(Key.METRIC, String.class);
+ if (metrics == null || metrics.isEmpty()) {
+ throw DataXException.asDataXException(
+ TSDBReaderErrorCode.REQUIRED_VALUE,
+ "The parameter [" + Key.METRIC + "] is not set.");
+ }
+ }
+
+ Integer splitIntervalMs = originalConfig.getInt(Key.INTERVAL_DATE_TIME,
+ Key.INTERVAL_DATE_TIME_DEFAULT_VALUE);
+ if (splitIntervalMs <= 0) {
+ throw DataXException.asDataXException(
+ TSDBReaderErrorCode.ILLEGAL_VALUE,
+ "The parameter [" + Key.INTERVAL_DATE_TIME + "] should be great than zero.");
+ }
+
+ SimpleDateFormat format = new SimpleDateFormat(Constant.DEFAULT_DATA_FORMAT);
+ String startTime = originalConfig.getString(Key.BEGIN_DATE_TIME);
+ Long startDate;
+ if (startTime == null || startTime.trim().length() == 0) {
+ throw DataXException.asDataXException(
+ TSDBReaderErrorCode.REQUIRED_VALUE,
+ "The parameter [" + Key.BEGIN_DATE_TIME + "] is not set.");
+ } else {
+ try {
+ startDate = format.parse(startTime).getTime();
+ } catch (ParseException e) {
+ throw DataXException.asDataXException(TSDBReaderErrorCode.ILLEGAL_VALUE,
+ "The parameter [" + Key.BEGIN_DATE_TIME +
+ "] needs to conform to the [" + Constant.DEFAULT_DATA_FORMAT + "] format.");
+ }
+ }
+ String endTime = originalConfig.getString(Key.END_DATE_TIME);
+ Long endDate;
+ if (endTime == null || endTime.trim().length() == 0) {
+ throw DataXException.asDataXException(
+ TSDBReaderErrorCode.REQUIRED_VALUE,
+ "The parameter [" + Key.END_DATE_TIME + "] is not set.");
+ } else {
+ try {
+ endDate = format.parse(endTime).getTime();
+ } catch (ParseException e) {
+ throw DataXException.asDataXException(TSDBReaderErrorCode.ILLEGAL_VALUE,
+ "The parameter [" + Key.END_DATE_TIME +
+ "] needs to conform to the [" + Constant.DEFAULT_DATA_FORMAT + "] format.");
+ }
+ }
+ if (startDate >= endDate) {
+ throw DataXException.asDataXException(TSDBReaderErrorCode.ILLEGAL_VALUE,
+ "The parameter [" + Key.BEGIN_DATE_TIME +
+ "] should be less than the parameter [" + Key.END_DATE_TIME + "].");
+ }
+ }
+
+ @Override
+ public void prepare() {
+ }
+
+ @Override
+ public List<Configuration> split(int adviceNumber) {
+ List<Configuration> configurations = new ArrayList<>();
+
+ // get metrics
+ String type = originalConfig.getString(Key.SINK_DB_TYPE, Key.TYPE_DEFAULT_VALUE);
+ List<String> columns4TSDB = null;
+ List<String> columns4RDB = null;
+ List<String> metrics = null;
+ if ("TSDB".equals(type)) {
+ columns4TSDB = originalConfig.getList(Key.COLUMN, String.class);
+ } else {
+ columns4RDB = originalConfig.getList(Key.COLUMN, String.class);
+ metrics = originalConfig.getList(Key.METRIC, String.class);
+ }
+
+ // get time interval
+ Integer splitIntervalMs = originalConfig.getInt(Key.INTERVAL_DATE_TIME,
+ Key.INTERVAL_DATE_TIME_DEFAULT_VALUE);
+
+ // get time range
+ SimpleDateFormat format = new SimpleDateFormat(Constant.DEFAULT_DATA_FORMAT);
+ long startTime;
+ try {
+ startTime = format.parse(originalConfig.getString(Key.BEGIN_DATE_TIME)).getTime();
+ } catch (ParseException e) {
+ throw DataXException.asDataXException(
+ TSDBReaderErrorCode.ILLEGAL_VALUE, "解析[" + Key.BEGIN_DATE_TIME + "]失败.", e);
+ }
+ long endTime;
+ try {
+ endTime = format.parse(originalConfig.getString(Key.END_DATE_TIME)).getTime();
+ } catch (ParseException e) {
+ throw DataXException.asDataXException(
+ TSDBReaderErrorCode.ILLEGAL_VALUE, "解析[" + Key.END_DATE_TIME + "]失败.", e);
+ }
+ if (TimeUtils.isSecond(startTime)) {
+ startTime *= 1000;
+ }
+ if (TimeUtils.isSecond(endTime)) {
+ endTime *= 1000;
+ }
+ DateTime startDateTime = new DateTime(TimeUtils.getTimeInHour(startTime));
+ DateTime endDateTime = new DateTime(TimeUtils.getTimeInHour(endTime));
+
+ if ("TSDB".equals(type)) {
+ // split by metric
+ for (String column : columns4TSDB) {
+ // split by time in hour
+ while (startDateTime.isBefore(endDateTime)) {
+ Configuration clone = this.originalConfig.clone();
+ clone.set(Key.COLUMN, Collections.singletonList(column));
+
+ clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis());
+ startDateTime = startDateTime.plusMillis(splitIntervalMs);
+ // Make sure the time interval is [start, end).
+ clone.set(Key.END_DATE_TIME, startDateTime.getMillis() - 1);
+ configurations.add(clone);
+
+ LOG.info("Configuration: {}", JSON.toJSONString(clone));
+ }
+ }
+ } else {
+ // split by metric
+ for (String metric : metrics) {
+ // split by time in hour
+ while (startDateTime.isBefore(endDateTime)) {
+ Configuration clone = this.originalConfig.clone();
+ clone.set(Key.COLUMN, columns4RDB);
+ clone.set(Key.METRIC, Collections.singletonList(metric));
+
+ clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis());
+ startDateTime = startDateTime.plusMillis(splitIntervalMs);
+ // Make sure the time interval is [start, end).
+ clone.set(Key.END_DATE_TIME, startDateTime.getMillis() - 1);
+ configurations.add(clone);
+
+ LOG.info("Configuration: {}", JSON.toJSONString(clone));
+ }
+ }
+ }
+ return configurations;
+ }
+
+ @Override
+ public void post() {
+ }
+
+ @Override
+ public void destroy() {
+ }
+ }
+
+ public static class Task extends Reader.Task {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+ private String type;
+ private List<String> columns4TSDB = null;
+ private List<String> columns4RDB = null;
+ private List<String> metrics = null;
+ private Map<String, Object> fields;
+ private Map<String, Object> tags;
+ private TSDBConnection conn;
+ private Long startTime;
+ private Long endTime;
+
+ @Override
+ public void init() {
+ Configuration readerSliceConfig = super.getPluginJobConf();
+
+ LOG.info("getPluginJobConf: {}", JSON.toJSONString(readerSliceConfig));
+
+ this.type = readerSliceConfig.getString(Key.SINK_DB_TYPE);
+ if ("TSDB".equals(type)) {
+ columns4TSDB = readerSliceConfig.getList(Key.COLUMN, String.class);
+ } else {
+ columns4RDB = readerSliceConfig.getList(Key.COLUMN, String.class);
+ metrics = readerSliceConfig.getList(Key.METRIC, String.class);
+ }
+ this.fields = readerSliceConfig.getMap(Key.FIELD);
+ this.tags = readerSliceConfig.getMap(Key.TAG);
+
+ String address = readerSliceConfig.getString(Key.ENDPOINT);
+
+ conn = new TSDBConnection(address);
+
+ this.startTime = readerSliceConfig.getLong(Key.BEGIN_DATE_TIME);
+ this.endTime = readerSliceConfig.getLong(Key.END_DATE_TIME);
+ }
+
+ @Override
+ public void prepare() {
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void startRead(RecordSender recordSender) {
+ try {
+ if ("TSDB".equals(type)) {
+ for (String metric : columns4TSDB) {
+ final Map<String, String> tags = this.tags == null ?
+ null : (Map<String, String>) this.tags.get(metric);
+ if (fields == null || !fields.containsKey(metric)) {
+ conn.sendDPs(metric, tags, this.startTime, this.endTime, recordSender);
+ } else {
+ conn.sendDPs(metric, (List<String>) fields.get(metric),
+ tags, this.startTime, this.endTime, recordSender);
+ }
+ }
+ } else {
+ for (String metric : metrics) {
+ final Map<String, String> tags = this.tags == null ?
+ null : (Map<String, String>) this.tags.get(metric);
+ if (fields == null || !fields.containsKey(metric)) {
+ conn.sendRecords(metric, tags, startTime, endTime, columns4RDB, recordSender);
+ } else {
+ conn.sendRecords(metric, (List<String>) fields.get(metric),
+ tags, startTime, endTime, columns4RDB, recordSender);
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw DataXException.asDataXException(
+ TSDBReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错!", e);
+ }
+ }
+
+ @Override
+ public void post() {
+ }
+
+ @Override
+ public void destroy() {
+ }
+ }
+}
diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReaderErrorCode.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReaderErrorCode.java
new file mode 100755
index 00000000..4ebdcca5
--- /dev/null
+++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReaderErrorCode.java
@@ -0,0 +1,40 @@
+package com.alibaba.datax.plugin.reader.tsdbreader;
+
+import com.alibaba.datax.common.spi.ErrorCode;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:TSDB Reader Error Code
+ *
+ * @author Benedict Jin
+ * @since 2019-10-21
+ */
+public enum TSDBReaderErrorCode implements ErrorCode {
+
+ REQUIRED_VALUE("TSDBReader-00", "缺失必要的值"),
+ ILLEGAL_VALUE("TSDBReader-01", "值非法");
+
+ private final String code;
+ private final String description;
+
+ TSDBReaderErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return this.code;
+ }
+
+ @Override
+ public String getDescription() {
+ return this.description;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
+ }
+}
diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/Connection4TSDB.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/Connection4TSDB.java
new file mode 100644
index 00000000..500894bb
--- /dev/null
+++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/Connection4TSDB.java
@@ -0,0 +1,88 @@
+package com.alibaba.datax.plugin.reader.tsdbreader.conn;
+
+import com.alibaba.datax.common.plugin.RecordSender;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:Connection for TSDB-like databases
+ *
+ * @author Benedict Jin
+ * @since 2019-10-21
+ */
+public interface Connection4TSDB {
+
+ /**
+ * Get the address of Database.
+ *
+ * @return the endpoint address (host + port)
+ */
+ String address();
+
+ /**
+ * Get the version of Database.
+ *
+ * @return version
+ */
+ String version();
+
+ /**
+ * Get these configurations.
+ *
+ * @return configs
+ */
+ String config();
+
+ /**
+ * Get the list of supported version.
+ *
+ * @return version list
+ */
+ String[] getSupportVersionPrefix();
+
+ /**
+ * Send data points for TSDB with single field.
+ */
+ void sendDPs(String metric, Map<String, String> tags, Long start, Long end, RecordSender recordSender) throws Exception;
+
+ /**
+ * Send data points for TSDB with multi fields.
+ */
+ void sendDPs(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, RecordSender recordSender) throws Exception;
+
+ /**
+ * Send data points for RDB with single field.
+ */
+ void sendRecords(String metric, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender) throws Exception;
+
+ /**
+ * Send data points for RDB with multi fields.
+ */
+ void sendRecords(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender) throws Exception;
+
+ /**
+ * Put data point.
+ *
+ * @param dp data point
+ * @return whether the data point is written successfully
+ */
+ boolean put(DataPoint4TSDB dp);
+
+ /**
+ * Put data points.
+ *
+ * @param dps data points
+ * @return whether the data point is written successfully
+ */
+ boolean put(List<DataPoint4TSDB> dps);
+
+ /**
+ * Whether current version is supported.
+ *
+ * @return true: supported; false: not yet!
+ */
+ boolean isSupported();
+}
diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/DataPoint4MultiFieldsTSDB.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/DataPoint4MultiFieldsTSDB.java
new file mode 100644
index 00000000..5b380c73
--- /dev/null
+++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/DataPoint4MultiFieldsTSDB.java
@@ -0,0 +1,68 @@
+package com.alibaba.datax.plugin.reader.tsdbreader.conn;
+
+import com.alibaba.fastjson.JSON;
+
+import java.util.Map;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:DataPoint for TSDB with Multi Fields
+ *
+ * @author Benedict Jin
+ * @since 2019-10-21
+ */
+public class DataPoint4MultiFieldsTSDB {
+
+ private long timestamp;
+ private String metric;
+ private Map<String, String> tags;
+ private Map<String, Object> fields;
+
+ public DataPoint4MultiFieldsTSDB() {
+ }
+
+ public DataPoint4MultiFieldsTSDB(long timestamp, String metric, Map<String, String> tags, Map<String, Object> fields) {
+ this.timestamp = timestamp;
+ this.metric = metric;
+ this.tags = tags;
+ this.fields = fields;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public String getMetric() {
+ return metric;
+ }
+
+ public void setMetric(String metric) {
+ this.metric = metric;
+ }
+
+ public Map<String, String> getTags() {
+ return tags;
+ }
+
+ public void setTags(Map<String, String> tags) {
+ this.tags = tags;
+ }
+
+ public Map<String, Object> getFields() {
+ return fields;
+ }
+
+ public void setFields(Map<String, Object> fields) {
+ this.fields = fields;
+ }
+
+ @Override
+ public String toString() {
+ return JSON.toJSONString(this);
+ }
+}
diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/DataPoint4TSDB.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/DataPoint4TSDB.java
new file mode 100644
index 00000000..5c5c1349
--- /dev/null
+++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/DataPoint4TSDB.java
@@ -0,0 +1,68 @@
+package com.alibaba.datax.plugin.reader.tsdbreader.conn;
+
+import com.alibaba.fastjson.JSON;
+
+import java.util.Map;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:DataPoint for TSDB
+ *
+ * @author Benedict Jin
+ * @since 2019-10-21
+ */
+public class DataPoint4TSDB {
+
+ private long timestamp;
+ private String metric;
+ private Map<String, String> tags;
+ private Object value;
+
+ public DataPoint4TSDB() {
+ }
+
+ public DataPoint4TSDB(long timestamp, String metric, Map<String, String> tags, Object value) {
+ this.timestamp = timestamp;
+ this.metric = metric;
+ this.tags = tags;
+ this.value = value;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public String getMetric() {
+ return metric;
+ }
+
+ public void setMetric(String metric) {
+ this.metric = metric;
+ }
+
+ public Map<String, String> getTags() {
+ return tags;
+ }
+
+ public void setTags(Map<String, String> tags) {
+ this.tags = tags;
+ }
+
+ public Object getValue() {
+ return value;
+ }
+
+ public void setValue(Object value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return JSON.toJSONString(this);
+ }
+}
diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/MultiFieldQueryResult.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/MultiFieldQueryResult.java
new file mode 100644
index 00000000..32c2a76e
--- /dev/null
+++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/MultiFieldQueryResult.java
@@ -0,0 +1,64 @@
+package com.alibaba.datax.plugin.reader.tsdbreader.conn;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:Multi Field Query Result
+ *
+ * @author Benedict Jin
+ * @since 2019-10-22
+ */
+public class MultiFieldQueryResult {
+
+ private String metric;
+ private Map<String, String> tags;
+ private List<String> aggregatedTags;
+ private List<String> columns;
+ private List<List<Object>> values;
+
+ public MultiFieldQueryResult() {
+ }
+
+ public String getMetric() {
+ return metric;
+ }
+
+ public void setMetric(String metric) {
+ this.metric = metric;
+ }
+
+ public Map<String, String> getTags() {
+ return tags;
+ }
+
+ public void setTags(Map<String, String> tags) {
+ this.tags = tags;
+ }
+
+ public List<String> getAggregatedTags() {
+ return aggregatedTags;
+ }
+
+ public void setAggregatedTags(List<String> aggregatedTags) {
+ this.aggregatedTags = aggregatedTags;
+ }
+
+ public List<String> getColumns() {
+ return columns;
+ }
+
+ public void setColumns(List<String> columns) {
+ this.columns = columns;
+ }
+
+ public List<List<Object>> getValues() {
+ return values;
+ }
+
+ public void setValues(List<List<Object>> values) {
+ this.values = values;
+ }
+}
diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/QueryResult.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/QueryResult.java
new file mode 100644
index 00000000..8f3e2b59
--- /dev/null
+++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/QueryResult.java
@@ -0,0 +1,64 @@
+package com.alibaba.datax.plugin.reader.tsdbreader.conn;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:Query Result
+ *
+ * @author Benedict Jin
+ * @since 2019-09-19
+ */
+public class QueryResult {
+
+ private String metricName;
+ private Map<String, String> tags;
+ private List<String> groupByTags;
+ private List<String> aggregatedTags;
+ private Map<String, Object> dps;
+
+ public QueryResult() {
+ }
+
+ public String getMetricName() {
+ return metricName;
+ }
+
+ public void setMetricName(String metricName) {
+ this.metricName = metricName;
+ }
+
+ public Map<String, String> getTags() {
+ return tags;
+ }
+
+ public void setTags(Map<String, String> tags) {
+ this.tags = tags;
+ }
+
+ public List<String> getGroupByTags() {
+ return groupByTags;
+ }
+
+ public void setGroupByTags(List<String> groupByTags) {
+ this.groupByTags = groupByTags;
+ }
+
+ public List<String> getAggregatedTags() {
+ return aggregatedTags;
+ }
+
+ public void setAggregatedTags(List<String> aggregatedTags) {
+ this.aggregatedTags = aggregatedTags;
+ }
+
+ public Map<String, Object> getDps() {
+ return dps;
+ }
+
+ public void setDps(Map<String, Object> dps) {
+ this.dps = dps;
+ }
+}
diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnection.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnection.java
new file mode 100644
index 00000000..5426ab49
--- /dev/null
+++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnection.java
@@ -0,0 +1,94 @@
+package com.alibaba.datax.plugin.reader.tsdbreader.conn;
+
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.datax.plugin.reader.tsdbreader.util.TSDBUtils;
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:TSDB Connection
+ *
+ * @author Benedict Jin
+ * @since 2019-10-21
+ */
+public class TSDBConnection implements Connection4TSDB {
+
+ private String address;
+
+ public TSDBConnection(String address) {
+ this.address = address;
+ }
+
+ @Override
+ public String address() {
+ return address;
+ }
+
+ @Override
+ public String version() {
+ return TSDBUtils.version(address);
+ }
+
+ @Override
+ public String config() {
+ return TSDBUtils.config(address);
+ }
+
+ @Override
+ public String[] getSupportVersionPrefix() {
+ return new String[]{"2.4", "2.5"};
+ }
+
+ @Override
+ public void sendDPs(String metric, Map<String, String> tags, Long start, Long end, RecordSender recordSender) throws Exception {
+ TSDBDump.dump4TSDB(this, metric, tags, start, end, recordSender);
+ }
+
+ @Override
+ public void sendDPs(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, RecordSender recordSender) throws Exception {
+ TSDBDump.dump4TSDB(this, metric, fields, tags, start, end, recordSender);
+ }
+
+ @Override
+ public void sendRecords(String metric, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender) throws Exception {
+ TSDBDump.dump4RDB(this, metric, tags, start, end, columns4RDB, recordSender);
+ }
+
+ @Override
+ public void sendRecords(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender) throws Exception {
+ TSDBDump.dump4RDB(this, metric, fields, tags, start, end, columns4RDB, recordSender);
+ }
+
+ @Override
+ public boolean put(DataPoint4TSDB dp) {
+ return false;
+ }
+
+ @Override
+ public boolean put(List<DataPoint4TSDB> dps) {
+ return false;
+ }
+
+ @Override
+ public boolean isSupported() {
+ String versionJson = version();
+ if (StringUtils.isBlank(versionJson)) {
+ throw new RuntimeException("Cannot get the version!");
+ }
+ String version = JSON.parseObject(versionJson).getString("version");
+ if (StringUtils.isBlank(version)) {
+ return false;
+ }
+ for (String prefix : getSupportVersionPrefix()) {
+ if (version.startsWith(prefix)) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBDump.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBDump.java
new file mode 100644
index 00000000..8bae3a70
--- /dev/null
+++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBDump.java
@@ -0,0 +1,318 @@
+package com.alibaba.datax.plugin.reader.tsdbreader.conn;
+
+import com.alibaba.datax.common.element.*;
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.datax.plugin.reader.tsdbreader.Constant;
+import com.alibaba.datax.plugin.reader.tsdbreader.util.HttpUtils;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.parser.Feature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:TSDB Dump
+ *
+ * @author Benedict Jin
+ * @since 2019-10-21
+ */
+final class TSDBDump {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TSDBDump.class);
+
+ private static final String QUERY = "/api/query";
+ private static final String QUERY_MULTI_FIELD = "/api/mquery";
+
+ static {
+ JSON.DEFAULT_PARSER_FEATURE &= ~Feature.UseBigDecimal.getMask();
+ }
+
+ private TSDBDump() {
+ }
+
+ static void dump4TSDB(TSDBConnection conn, String metric, Map<String, String> tags,
+ Long start, Long end, RecordSender sender) throws Exception {
+ LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end);
+
+ String res = queryRange4SingleField(conn, metric, tags, start, end);
+ List<String> dps = getDps4TSDB(metric, res);
+ if (dps == null || dps.isEmpty()) {
+ return;
+ }
+ sendTSDBDps(sender, dps);
+ }
+
+ static void dump4TSDB(TSDBConnection conn, String metric, List<String> fields, Map<String, String> tags,
+ Long start, Long end, RecordSender sender) throws Exception {
+ LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end);
+
+ String res = queryRange4MultiFields(conn, metric, fields, tags, start, end);
+ List<String> dps = getDps4TSDB(metric, fields, res);
+ if (dps == null || dps.isEmpty()) {
+ return;
+ }
+ sendTSDBDps(sender, dps);
+ }
+
+ static void dump4RDB(TSDBConnection conn, String metric, Map<String, String> tags,
+ Long start, Long end, List<String> columns4RDB, RecordSender sender) throws Exception {
+ LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end);
+
+ String res = queryRange4SingleField(conn, metric, tags, start, end);
+ List<DataPoint4TSDB> dps = getDps4RDB(metric, res);
+ if (dps == null || dps.isEmpty()) {
+ return;
+ }
+ for (DataPoint4TSDB dp : dps) {
+ final Record record = sender.createRecord();
+ final Map<String, String> tagKV = dp.getTags();
+
+ for (String column : columns4RDB) {
+ if (Constant.METRIC_SPECIFY_KEY.equals(column)) {
+ record.addColumn(new StringColumn(dp.getMetric()));
+ } else if (Constant.TS_SPECIFY_KEY.equals(column)) {
+ record.addColumn(new LongColumn(dp.getTimestamp()));
+ } else if (Constant.VALUE_SPECIFY_KEY.equals(column)) {
+ record.addColumn(getColumn(dp.getValue()));
+ } else {
+ final Object tagk = tagKV.get(column);
+ if (tagk == null) {
+ continue;
+ }
+ record.addColumn(getColumn(tagk));
+ }
+ }
+ sender.sendToWriter(record);
+ }
+ }
+
+ static void dump4RDB(TSDBConnection conn, String metric, List<String> fields,
+ Map<String, String> tags, Long start, Long end,
+ List<String> columns4RDB, RecordSender sender) throws Exception {
+ LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end);
+
+ String res = queryRange4MultiFields(conn, metric, fields, tags, start, end);
+ List<DataPoint4TSDB> dps = getDps4RDB(metric, fields, res);
+ if (dps == null || dps.isEmpty()) {
+ return;
+ }
+ for (DataPoint4TSDB dp : dps) {
+ final Record record = sender.createRecord();
+ final Map<String, String> tagKV = dp.getTags();
+
+ for (String column : columns4RDB) {
+ if (Constant.METRIC_SPECIFY_KEY.equals(column)) {
+ record.addColumn(new StringColumn(dp.getMetric()));
+ } else if (Constant.TS_SPECIFY_KEY.equals(column)) {
+ record.addColumn(new LongColumn(dp.getTimestamp()));
+ } else {
+ final Object tagvOrField = tagKV.get(column);
+ if (tagvOrField == null) {
+ continue;
+ }
+ record.addColumn(getColumn(tagvOrField));
+ }
+ }
+ sender.sendToWriter(record);
+ }
+ }
+
+ private static Column getColumn(Object value) throws Exception {
+ Column valueColumn;
+ if (value instanceof Double) {
+ valueColumn = new DoubleColumn((Double) value);
+ } else if (value instanceof Long) {
+ valueColumn = new LongColumn((Long) value);
+ } else if (value instanceof String) {
+ valueColumn = new StringColumn((String) value);
+ } else {
+ throw new Exception(String.format("value 不支持类型: [%s]", value.getClass().getSimpleName()));
+ }
+ return valueColumn;
+ }
+
+ private static String queryRange4SingleField(TSDBConnection conn, String metric, Map<String, String> tags,
+ Long start, Long end) throws Exception {
+ String tagKV = getFilterByTags(tags);
+ String body = "{\n" +
+ " \"start\": " + start + ",\n" +
+ " \"end\": " + end + ",\n" +
+ " \"queries\": [\n" +
+ " {\n" +
+ " \"aggregator\": \"none\",\n" +
+ " \"metric\": \"" + metric + "\"\n" +
+ (tagKV == null ? "" : tagKV) +
+ " }\n" +
+ " ]\n" +
+ "}";
+ return HttpUtils.post(conn.address() + QUERY, body);
+ }
+
+ private static String queryRange4MultiFields(TSDBConnection conn, String metric, List<String> fields,
+ Map<String, String> tags, Long start, Long end) throws Exception {
+ // fields
+ StringBuilder fieldBuilder = new StringBuilder();
+ fieldBuilder.append("\"fields\":[");
+ for (int i = 0; i < fields.size(); i++) {
+ fieldBuilder.append("{\"field\": \"").append(fields.get(i)).append("\",\"aggregator\": \"none\"}");
+ if (i != fields.size() - 1) {
+ fieldBuilder.append(",");
+ }
+ }
+ fieldBuilder.append("]");
+ // tagkv
+ String tagKV = getFilterByTags(tags);
+ String body = "{\n" +
+ " \"start\": " + start + ",\n" +
+ " \"end\": " + end + ",\n" +
+ " \"queries\": [\n" +
+ " {\n" +
+ " \"aggregator\": \"none\",\n" +
+ " \"metric\": \"" + metric + "\",\n" +
+ fieldBuilder.toString() +
+ (tagKV == null ? "" : tagKV) +
+ " }\n" +
+ " ]\n" +
+ "}";
+ return HttpUtils.post(conn.address() + QUERY_MULTI_FIELD, body);
+ }
+
+ private static String getFilterByTags(Map<String, String> tags) {
+ if (tags != null && !tags.isEmpty()) {
+ // tagKV = ",\"tags:\":" + JSON.toJSONString(tags);
+ StringBuilder tagBuilder = new StringBuilder();
+ tagBuilder.append(",\"filters\":[");
+ int count = 1;
+ final int size = tags.size();
+ for (Map.Entry<String, String> entry : tags.entrySet()) {
+ final String tagK = entry.getKey();
+ final String tagV = entry.getValue();
+ tagBuilder.append("{\"type\":\"literal_or\",\"tagk\":\"").append(tagK)
+ .append("\",\"filter\":\"").append(tagV).append("\",\"groupBy\":false}");
+ if (count != size) {
+ tagBuilder.append(",");
+ }
+ count++;
+ }
+ tagBuilder.append("]");
+ return tagBuilder.toString();
+ }
+ return null;
+ }
+
+ private static List<String> getDps4TSDB(String metric, String dps) {
+ final List<QueryResult> jsonArray = JSON.parseArray(dps, QueryResult.class);
+ if (jsonArray.size() == 0) {
+ return null;
+ }
+ List dpsArr = new LinkedList<>();
+ for (QueryResult queryResult : jsonArray) {
+ final Map tags = queryResult.getTags();
+ final Map points = queryResult.getDps();
+ for (Map.Entry entry : points.entrySet()) {
+ final String ts = entry.getKey();
+ final Object value = entry.getValue();
+
+ DataPoint4TSDB dp = new DataPoint4TSDB();
+ dp.setMetric(metric);
+ dp.setTags(tags);
+ dp.setTimestamp(Long.parseLong(ts));
+ dp.setValue(value);
+ dpsArr.add(dp.toString());
+ }
+ }
+ return dpsArr;
+ }
+
+ private static List<String> getDps4TSDB(String metric, List<String> fields, String dps) {
+ final List<MultiFieldQueryResult> jsonArray = JSON.parseArray(dps, MultiFieldQueryResult.class);
+ if (jsonArray.size() == 0) {
+ return null;
+ }
+ List dpsArr = new LinkedList<>();
+ for (MultiFieldQueryResult queryResult : jsonArray) {
+ final Map tags = queryResult.getTags();
+ final List> values = queryResult.getValues();
+ for (List