Add TSDB Reader

This commit is contained in:
asdf2014 2019-11-08 16:52:39 +08:00
parent 0af1933802
commit dd19fd4332
24 changed files with 2289 additions and 0 deletions

View File

@ -63,6 +63,7 @@
<module>rdbmsreader</module>
<module>hbase11xreader</module>
<module>hbase094xreader</module>
<module>tsdbreader</module>
<module>opentsdbreader</module>
<module>cassandrareader</module>

View File

@ -0,0 +1,587 @@
# TSDBReader 插件文档
___
## 1 快速介绍
TSDBReader 插件实现了从阿里云 TSDB 读取数据。阿里云时间序列数据库 ( **T**ime **S**eries **D**ata**b**ase , 简称 TSDB) 是一种集时序数据高效读写,压缩存储,实时计算能力为一体的数据库服务,可广泛应用于物联网和互联网领域,实现对设备及业务服务的实时监控,实时预测告警。详见 TSDB 的阿里云[官网](https://cn.aliyun.com/product/hitsdb)。
## 2 实现原理
在底层实现上，TSDBReader 通过 HTTP 请求连接到 阿里云 TSDB 实例,利用 `/api/query` 或者 `/api/mquery` 接口将数据点扫描出来(更多细节详见:[时序数据库 TSDB - HTTP API 概览](https://help.aliyun.com/document_detail/63557.html))。而整个同步的过程,是通过时间线和查询时间线范围进行切分。
## 3 功能说明
### 3.1 配置样例
* 配置一个从 阿里云 TSDB 数据库同步抽取数据到本地的作业,并以**时序数据**的格式输出:
时序数据样例:
```json
{"metric":"m","tags":{"app":"a19","cluster":"c5","group":"g10","ip":"i999","zone":"z1"},"timestamp":1546272263,"value":1}
```
```json
{
"job": {
"content": [
{
"reader": {
"name": "tsdbreader",
"parameter": {
"sinkDbType": "TSDB",
"endpoint": "http://localhost:8242",
"column": [
"m"
],
"splitIntervalMs": 60000,
"beginDateTime": "2019-01-01 00:00:00",
"endDateTime": "2019-01-01 01:00:00"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 3
}
}
}
}
```
* 配置一个从 阿里云 TSDB 数据库同步抽取数据到本地的作业,并以**关系型数据**的格式输出:
关系型数据样例:
```txt
m 1546272125 a1 c1 g2 i3021 z4 1.0
```
```json
{
"job": {
"content": [
{
"reader": {
"name": "tsdbreader",
"parameter": {
"sinkDbType": "RDB",
"endpoint": "http://localhost:8242",
"column": [
"__metric__",
"__ts__",
"app",
"cluster",
"group",
"ip",
"zone",
"__value__"
],
"metric": [
"m"
],
"splitIntervalMs": 60000,
"beginDateTime": "2019-01-01 00:00:00",
"endDateTime": "2019-01-01 01:00:00"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 3
}
}
}
}
```
* 配置一个从 阿里云 TSDB 数据库同步抽取**单值**数据到 ADB 的作业:
```json
{
"job": {
"content": [
{
"reader": {
"name": "tsdbreader",
"parameter": {
"sinkDbType": "RDB",
"endpoint": "http://localhost:8242",
"column": [
"__metric__",
"__ts__",
"app",
"cluster",
"group",
"ip",
"zone",
"__value__"
],
"metric": [
"m"
],
"splitIntervalMs": 60000,
"beginDateTime": "2019-01-01 00:00:00",
"endDateTime": "2019-01-01 01:00:00"
}
},
"writer": {
"name": "adswriter",
"parameter": {
"username": "******",
"password": "******",
"column": [
"`metric`",
"`ts`",
"`app`",
"`cluster`",
"`group`",
"`ip`",
"`zone`",
"`value`"
],
"url": "http://localhost:3306",
"schema": "datax_test",
"table": "datax_test",
"writeMode": "insert",
"opIndex": "0",
"batchSize": "2"
}
}
}
],
"setting": {
"speed": {
"channel": 3
}
}
}
}
```
* 配置一个从 阿里云 TSDB 数据库同步抽取**多值**数据到 ADB 的作业:
```json
{
"job": {
"content": [
{
"reader": {
"name": "tsdbreader",
"parameter": {
"sinkDbType": "RDB",
"endpoint": "http://localhost:8242",
"column": [
"__metric__",
"__ts__",
"app",
"cluster",
"group",
"ip",
"zone",
"load",
"memory",
"cpu"
],
"metric": [
"m_field"
],
"field": {
"m_field": [
"load",
"memory",
"cpu"
]
},
"splitIntervalMs": 60000,
"beginDateTime": "2019-01-01 00:00:00",
"endDateTime": "2019-01-01 01:00:00"
}
},
"writer": {
"name": "adswriter",
"parameter": {
"username": "******",
"password": "******",
"column": [
"`metric`",
"`ts`",
"`app`",
"`cluster`",
"`group`",
"`ip`",
"`zone`",
"`load`",
"`memory`",
"`cpu`"
],
"url": "http://localhost:3306",
"schema": "datax_test",
"table": "datax_test_multi_field",
"writeMode": "insert",
"opIndex": "0",
"batchSize": "2"
}
}
}
],
"setting": {
"speed": {
"channel": 3
}
}
}
}
```
* 配置一个从 阿里云 TSDB 数据库同步抽取**单值**数据到 ADB 的作业,并指定过滤部分时间线:
```json
{
"job": {
"content": [
{
"reader": {
"name": "tsdbreader",
"parameter": {
"sinkDbType": "RDB",
"endpoint": "http://localhost:8242",
"column": [
"__metric__",
"__ts__",
"app",
"cluster",
"group",
"ip",
"zone",
"__value__"
],
"metric": [
"m"
],
"tag": {
"m": {
"app": "a1",
"cluster": "c1"
}
},
"splitIntervalMs": 60000,
"beginDateTime": "2019-01-01 00:00:00",
"endDateTime": "2019-01-01 01:00:00"
}
},
"writer": {
"name": "adswriter",
"parameter": {
"username": "******",
"password": "******",
"column": [
"`metric`",
"`ts`",
"`app`",
"`cluster`",
"`group`",
"`ip`",
"`zone`",
"`value`"
],
"url": "http://localhost:3306",
"schema": "datax_test",
"table": "datax_test",
"writeMode": "insert",
"opIndex": "0",
"batchSize": "2"
}
}
}
],
"setting": {
"speed": {
"channel": 3
}
}
}
}
```
* 配置一个从 阿里云 TSDB 数据库同步抽取**多值**数据到 ADB 的作业,并指定过滤部分时间线:
```json
{
"job": {
"content": [
{
"reader": {
"name": "tsdbreader",
"parameter": {
"sinkDbType": "RDB",
"endpoint": "http://localhost:8242",
"column": [
"__metric__",
"__ts__",
"app",
"cluster",
"group",
"ip",
"zone",
"load",
"memory",
"cpu"
],
"metric": [
"m_field"
],
"field": {
"m_field": [
"load",
"memory",
"cpu"
]
},
"tag": {
"m_field": {
"ip": "i999"
}
},
"splitIntervalMs": 60000,
"beginDateTime": "2019-01-01 00:00:00",
"endDateTime": "2019-01-01 01:00:00"
}
},
"writer": {
"name": "adswriter",
"parameter": {
"username": "******",
"password": "******",
"column": [
"`metric`",
"`ts`",
"`app`",
"`cluster`",
"`group`",
"`ip`",
"`zone`",
"`load`",
"`memory`",
"`cpu`"
],
"url": "http://localhost:3306",
"schema": "datax_test",
"table": "datax_test_multi_field",
"writeMode": "insert",
"opIndex": "0",
"batchSize": "2"
}
}
}
],
"setting": {
"speed": {
"channel": 3
}
}
}
}
```
* 配置一个从 阿里云 TSDB 数据库同步抽取**单值**数据到另一个 阿里云 TSDB 数据库 的作业:
```json
{
"job": {
"content": [
{
"reader": {
"name": "tsdbreader",
"parameter": {
"sinkDbType": "TSDB",
"endpoint": "http://localhost:8242",
"column": [
"m"
],
"splitIntervalMs": 60000,
"beginDateTime": "2019-01-01 00:00:00",
"endDateTime": "2019-01-01 01:00:00"
}
},
"writer": {
"name": "tsdbwriter",
"parameter": {
"endpoint": "http://localhost:8240"
}
}
}
],
"setting": {
"speed": {
"channel": 3
}
}
}
}
```
* 配置一个从 阿里云 TSDB 数据库同步抽取**多值**数据到另一个 阿里云 TSDB 数据库 的作业:
```json
{
"job": {
"content": [
{
"reader": {
"name": "tsdbreader",
"parameter": {
"sinkDbType": "TSDB",
"endpoint": "http://localhost:8242",
"column": [
"m_field"
],
"field": {
"m_field": [
"load",
"memory",
"cpu"
]
},
"splitIntervalMs": 60000,
"beginDateTime": "2019-01-01 00:00:00",
"endDateTime": "2019-01-01 01:00:00"
}
},
"writer": {
"name": "tsdbwriter",
"parameter": {
"multiField": true,
"endpoint": "http://localhost:8240"
}
}
}
],
"setting": {
"speed": {
"channel": 3
}
}
}
}
```
### 3.2 参数说明
* **name**
* 描述:本插件的名称
* 必选:是
* 默认值：tsdbreader
* **parameter**
* **sinkDbType**
* 描述:目标数据库的类型
* 必选:否
* 默认值：TSDB
* 注意:目前支持 TSDB 和 RDB 两个取值。其中，TSDB 包括 阿里云 TSDB、OpenTSDB、InfluxDB、Prometheus 和 TimeScale。RDB 包括 ADB、MySQL、Oracle、PostgreSQL 和 DRDS 等。
* **endpoint**
* 描述:阿里云 TSDB 的 HTTP 连接地址
* 必选:是
* 格式：http://IP:Port
* 默认值:无
* **column**
* 描述：TSDB 场景下:数据迁移任务需要迁移的 Metric 列表；RDB 场景下:映射到关系型数据库中的表字段,且增加 `__metric__`、`__ts__` 和 `__value__` 三个字段,其中 `__metric__` 用于映射度量字段,`__ts__` 用于映射 timestamp 字段,而 `__value__` 仅适用于单值场景,用于映射度量值,多值场景下,直接指定 field 字段即可
* 必选:是
* 默认值:无
* **metric**
* 描述:仅适用于 RDB 场景下,表示数据迁移任务需要迁移的 Metric 列表
* 必选:否
* 默认值:无
* **field**
* 描述:仅适用于多值场景下,表示数据迁移任务需要迁移的 Field 列表
* 必选:否
* 默认值:无
* **tag**
* 描述:数据迁移任务需要迁移的 TagK 和 TagV，用于进一步过滤时间线
* 必选:否
* 默认值:无
* **splitIntervalMs**
* 描述:用于 DataX 内部切分 Task，每个 Task 只查询一小部分的时间段
* 必选:是
* 默认值:无
* 注意:单位是 ms 毫秒
* **beginDateTime**
* 描述:和 endDateTime 配合使用,用于指定哪个时间段内的数据点,需要被迁移
* 必选:是
* 格式:`yyyy-MM-dd HH:mm:ss`
* 默认值:无
* 注意:指定起止时间会自动忽略分钟和秒,转为整点时刻,例如 2019-4-18 的 [3:35, 4:55) 会被转为 [3:00, 4:00)
* **endDateTime**
* 描述:和 beginDateTime 配合使用,用于指定哪个时间段内的数据点,需要被迁移
* 必选:是
* 格式:`yyyy-MM-dd HH:mm:ss`
* 默认值:无
* 注意:指定起止时间会自动忽略分钟和秒,转为整点时刻,例如 2019-4-18 的 [3:35, 4:55) 会被转为 [3:00, 4:00)
### 3.3 类型转换
| DataX 内部类型 | TSDB 数据类型 |
| -------------- | ------------------------------------------------------------ |
| String | TSDB 数据点序列化字符串,包括 timestamp、metric、tags、fields 和 value |
## 4 约束限制
### 4.1 如果存在某一个 Metric 下在一个小时范围内的数据量过大,可能需要通过 `-j` 参数调整 JVM 内存大小
考虑到下游 Writer 如果写入速度不及 TSDB Reader 的查询速度,可能会存在积压的情况,因此需要适当地调整 JVM 参数。以"从 阿里云 TSDB 数据库同步抽取数据到本地的作业"为例,启动命令如下:
```bash
python datax/bin/datax.py tsdb2stream.json -j "-Xms4096m -Xmx4096m"
```
### 4.2 指定起止时间会自动被转为整点时刻
指定起止时间会自动被转为整点时刻,例如 2019-4-18 的 `[3:35, 4:55)` 会被转为 `[3:00, 4:00)`

146
tsdbreader/pom.xml Normal file
View File

@ -0,0 +1,146 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>tsdbreader</artifactId>
<name>tsdbreader</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- common -->
<commons-lang3.version>3.3.2</commons-lang3.version>
<!-- http -->
<httpclient.version>4.4</httpclient.version>
<commons-io.version>2.4</commons-io.version>
<!-- json -->
<fastjson.version>1.2.28</fastjson.version>
<!-- test -->
<junit4.version>4.12</junit4.version>
<!-- time -->
<joda-time.version>2.9.9</joda-time.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>fastjson</artifactId>
<groupId>com.alibaba</groupId>
</exclusion>
<exclusion>
<artifactId>commons-math3</artifactId>
<groupId>org.apache.commons</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<!-- common -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<!-- http -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>fluent-hc</artifactId>
<version>${httpclient.version}</version>
</dependency>
<!-- json -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- time -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${joda-time.version}</version>
</dependency>
<!-- test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit4.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,35 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/reader/tsdbreader</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>tsdbreader-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/reader/tsdbreader</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/reader/tsdbreader/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,29 @@
package com.alibaba.datax.plugin.reader.tsdbreader;
import java.util.HashSet;
import java.util.Set;
/**
 * Copyright @ 2019 alibaba.com
 * All right reserved.
 * Function: constants shared by the TSDB reader plugin.
 *
 * @author Benedict Jin
 * @since 2019-10-21
 */
public final class Constant {

    /** Pattern of the date-time strings accepted in the job configuration. */
    static final String DEFAULT_DATA_FORMAT = "yyyy-MM-dd HH:mm:ss";

    /** Reserved column name that stands for the metric. */
    public static final String METRIC_SPECIFY_KEY = "__metric__";

    /** Reserved column name that stands for the timestamp. */
    public static final String TS_SPECIFY_KEY = "__ts__";

    /** Reserved column name that stands for the value (single-value scenario). */
    public static final String VALUE_SPECIFY_KEY = "__value__";

    /** Reserved column names that every RDB-style column list has to contain. */
    static final Set<String> MUST_CONTAINED_SPECIFY_KEYS;

    static {
        final Set<String> required = new HashSet<>();
        required.add(METRIC_SPECIFY_KEY);
        required.add(TS_SPECIFY_KEY);
        // __value__ is deliberately left out: it may be omitted in the multi-field scenario.
        MUST_CONTAINED_SPECIFY_KEYS = required;
    }
}

View File

@ -0,0 +1,36 @@
package com.alibaba.datax.plugin.reader.tsdbreader;
import java.util.HashSet;
import java.util.Set;
/**
 * Copyright @ 2019 alibaba.com
 * All right reserved.
 * Function: names of the configuration keys read by the TSDB reader, plus their defaults.
 *
 * @author Benedict Jin
 * @since 2019-10-21
 */
public class Key {

    // Sink type: TSDB for OpenTSDB / InfluxDB / TimeScale / Prometheus etc.,
    //            RDB  for MySQL / ADB etc.
    static final String SINK_DB_TYPE = "sinkDbType";
    static final String TYPE_DEFAULT_VALUE = "TSDB";

    // Where to read from and what to read.
    static final String ENDPOINT = "endpoint";
    static final String COLUMN = "column";
    static final String METRIC = "metric";
    static final String FIELD = "field";
    static final String TAG = "tag";

    // Time range and the slice length used when splitting it into tasks.
    static final String BEGIN_DATE_TIME = "beginDateTime";
    static final String END_DATE_TIME = "endDateTime";
    static final String INTERVAL_DATE_TIME = "splitIntervalMs";
    // NOTE(review): the key is named splitIntervalMs, so this default presumably means
    // 60 milliseconds - confirm that such a small slice is really intended.
    static final Integer INTERVAL_DATE_TIME_DEFAULT_VALUE = 60;

    /** Accepted values of {@link #SINK_DB_TYPE}. */
    static final Set<String> TYPE_SET;

    static {
        final Set<String> supported = new HashSet<>();
        supported.add("TSDB");
        supported.add("RDB");
        TYPE_SET = supported;
    }
}

View File

@ -0,0 +1,320 @@
package com.alibaba.datax.plugin.reader.tsdbreader;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.reader.tsdbreader.conn.TSDBConnection;
import com.alibaba.datax.plugin.reader.tsdbreader.util.TimeUtils;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* FunctionTSDB Reader
*
* @author Benedict Jin
* @since 2019-10-21
*/
@SuppressWarnings("unused")
public class TSDBReader extends Reader {
/**
 * Job-side half of the reader: validates the job configuration in {@link #init()} and
 * cuts it into one task configuration per metric and per time slice in {@link #split(int)}.
 */
public static class Job extends Reader.Job {

    private static final Logger LOG = LoggerFactory.getLogger(Job.class);

    /** The reader's job configuration, captured in {@link #init()}. */
    private Configuration originalConfig;

    /**
     * Loads the job configuration and validates every parameter the reader relies on:
     * sink type, endpoint, columns (plus the reserved columns and the metric list when the
     * sink is not TSDB), the split interval and the begin/end date-times.
     *
     * @throws DataXException with {@code REQUIRED_VALUE} when a mandatory parameter is missing,
     *                        or {@code ILLEGAL_VALUE} when a parameter has an unacceptable value
     */
    @Override
    public void init() {
        this.originalConfig = super.getPluginJobConf();

        String type = originalConfig.getString(Key.SINK_DB_TYPE, Key.TYPE_DEFAULT_VALUE);
        if (StringUtils.isBlank(type)) {
            throw DataXException.asDataXException(
                    TSDBReaderErrorCode.REQUIRED_VALUE,
                    "The parameter [" + Key.SINK_DB_TYPE + "] is not set.");
        }
        if (!Key.TYPE_SET.contains(type)) {
            throw DataXException.asDataXException(
                    TSDBReaderErrorCode.ILLEGAL_VALUE,
                    "The parameter [" + Key.SINK_DB_TYPE + "] should be one of [" +
                            JSON.toJSONString(Key.TYPE_SET) + "].");
        }

        String address = originalConfig.getString(Key.ENDPOINT);
        if (StringUtils.isBlank(address)) {
            throw DataXException.asDataXException(
                    TSDBReaderErrorCode.REQUIRED_VALUE,
                    "The parameter [" + Key.ENDPOINT + "] is not set.");
        }

        // tagK / field could be empty, but the column list is mandatory for both sink types.
        List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
        if (columns == null || columns.isEmpty()) {
            throw DataXException.asDataXException(
                    TSDBReaderErrorCode.REQUIRED_VALUE,
                    "The parameter [" + Key.COLUMN + "] is not set.");
        }
        if (!"TSDB".equals(type)) {
            // RDB sink: the reserved columns must be present and the metrics are listed separately.
            for (String specifyKey : Constant.MUST_CONTAINED_SPECIFY_KEYS) {
                if (!columns.contains(specifyKey)) {
                    throw DataXException.asDataXException(
                            TSDBReaderErrorCode.ILLEGAL_VALUE,
                            "The parameter [" + Key.COLUMN + "] should contain "
                                    + JSON.toJSONString(Constant.MUST_CONTAINED_SPECIFY_KEYS) + ".");
                }
            }
            final List<String> metrics = originalConfig.getList(Key.METRIC, String.class);
            if (metrics == null || metrics.isEmpty()) {
                throw DataXException.asDataXException(
                        TSDBReaderErrorCode.REQUIRED_VALUE,
                        "The parameter [" + Key.METRIC + "] is not set.");
            }
        }

        Integer splitIntervalMs = originalConfig.getInt(Key.INTERVAL_DATE_TIME,
                Key.INTERVAL_DATE_TIME_DEFAULT_VALUE);
        if (splitIntervalMs <= 0) {
            throw DataXException.asDataXException(
                    TSDBReaderErrorCode.ILLEGAL_VALUE,
                    "The parameter [" + Key.INTERVAL_DATE_TIME + "] should be great than zero.");
        }

        SimpleDateFormat format = new SimpleDateFormat(Constant.DEFAULT_DATA_FORMAT);
        long startDate = parseRequiredDateTime(format, Key.BEGIN_DATE_TIME);
        long endDate = parseRequiredDateTime(format, Key.END_DATE_TIME);
        if (startDate >= endDate) {
            throw DataXException.asDataXException(TSDBReaderErrorCode.ILLEGAL_VALUE,
                    "The parameter [" + Key.BEGIN_DATE_TIME +
                            "] should be less than the parameter [" + Key.END_DATE_TIME + "].");
        }
    }

    /**
     * Reads a mandatory date-time parameter and parses it with the given format.
     *
     * @param format parser built from {@code Constant.DEFAULT_DATA_FORMAT}
     * @param key    configuration key of the date-time parameter
     * @return the parsed instant in epoch milliseconds
     * @throws DataXException {@code REQUIRED_VALUE} when the parameter is absent or blank,
     *                        {@code ILLEGAL_VALUE} when it does not match the format
     */
    private long parseRequiredDateTime(SimpleDateFormat format, String key) {
        String text = originalConfig.getString(key);
        if (text == null || text.trim().length() == 0) {
            throw DataXException.asDataXException(
                    TSDBReaderErrorCode.REQUIRED_VALUE,
                    "The parameter [" + key + "] is not set.");
        }
        try {
            return format.parse(text).getTime();
        } catch (ParseException e) {
            // Keep the parse failure as the cause so the offending input shows up in the trace.
            throw DataXException.asDataXException(TSDBReaderErrorCode.ILLEGAL_VALUE,
                    "The parameter [" + key +
                            "] needs to conform to the [" + Constant.DEFAULT_DATA_FORMAT + "] format.", e);
        }
    }

    @Override
    public void prepare() {
    }

    /**
     * Splits the job into task configurations: one per metric and per time slice of
     * {@code splitIntervalMs} milliseconds between the (hour-aligned) begin and end times.
     * Each clone carries its slice as epoch milliseconds in {@code beginDateTime} and
     * {@code endDateTime}; the end is stored as "slice end - 1" so slices form [start, end).
     *
     * <p>The time cursor restarts at the range start for every metric. A shared cursor would
     * already sit at the range end after the first metric and leave all later metrics
     * without any task. The last slice is also capped at the range end so it cannot
     * read past the configured end time when the interval does not divide the range evenly.
     *
     * @param adviceNumber suggested number of tasks; not used by this reader
     * @return the task configurations, possibly empty
     * @throws DataXException {@code ILLEGAL_VALUE} when a date-time cannot be parsed
     */
    @Override
    public List<Configuration> split(int adviceNumber) {
        List<Configuration> configurations = new ArrayList<>();

        // For a TSDB sink the column list IS the metric list; for RDB they are separate.
        String type = originalConfig.getString(Key.SINK_DB_TYPE, Key.TYPE_DEFAULT_VALUE);
        final boolean sinkIsTSDB = "TSDB".equals(type);
        final List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
        final List<String> metrics = sinkIsTSDB
                ? columns
                : originalConfig.getList(Key.METRIC, String.class);

        // get time interval
        Integer splitIntervalMs = originalConfig.getInt(Key.INTERVAL_DATE_TIME,
                Key.INTERVAL_DATE_TIME_DEFAULT_VALUE);

        // get time range
        SimpleDateFormat format = new SimpleDateFormat(Constant.DEFAULT_DATA_FORMAT);
        long startTime;
        try {
            startTime = format.parse(originalConfig.getString(Key.BEGIN_DATE_TIME)).getTime();
        } catch (ParseException e) {
            throw DataXException.asDataXException(
                    TSDBReaderErrorCode.ILLEGAL_VALUE, "解析[" + Key.BEGIN_DATE_TIME + "]失败.", e);
        }
        long endTime;
        try {
            endTime = format.parse(originalConfig.getString(Key.END_DATE_TIME)).getTime();
        } catch (ParseException e) {
            throw DataXException.asDataXException(
                    TSDBReaderErrorCode.ILLEGAL_VALUE, "解析[" + Key.END_DATE_TIME + "]失败.", e);
        }
        if (TimeUtils.isSecond(startTime)) {
            startTime *= 1000;
        }
        if (TimeUtils.isSecond(endTime)) {
            endTime *= 1000;
        }
        final DateTime rangeStart = new DateTime(TimeUtils.getTimeInHour(startTime));
        final DateTime rangeEnd = new DateTime(TimeUtils.getTimeInHour(endTime));

        // split by metric
        for (String metric : metrics) {
            // split by time; every metric walks the whole range from the beginning
            DateTime sliceStart = rangeStart;
            while (sliceStart.isBefore(rangeEnd)) {
                DateTime sliceEnd = sliceStart.plusMillis(splitIntervalMs);
                if (sliceEnd.isAfter(rangeEnd)) {
                    sliceEnd = rangeEnd;
                }

                Configuration clone = this.originalConfig.clone();
                if (sinkIsTSDB) {
                    clone.set(Key.COLUMN, Collections.singletonList(metric));
                } else {
                    clone.set(Key.COLUMN, columns);
                    clone.set(Key.METRIC, Collections.singletonList(metric));
                }
                clone.set(Key.BEGIN_DATE_TIME, sliceStart.getMillis());
                // Make sure the time interval is [start, end).
                clone.set(Key.END_DATE_TIME, sliceEnd.getMillis() - 1);
                configurations.add(clone);

                LOG.info("Configuration: {}", JSON.toJSONString(clone));
                sliceStart = sliceEnd;
            }
        }
        return configurations;
    }

    @Override
    public void post() {
    }

    @Override
    public void destroy() {
    }
}
/**
 * Task-side half of the reader: reads the slice configuration produced by the job's split
 * and pushes the data points of each configured metric to the record sender.
 */
public static class Task extends Reader.Task {

    private static final Logger LOG = LoggerFactory.getLogger(Task.class);

    /** Sink type, "TSDB" or "RDB"; decides which send method is used. */
    private String type;
    /** Metric names when the sink is TSDB (taken from the column list). */
    private List<String> columns4TSDB = null;
    /** Output column names when the sink is RDB. */
    private List<String> columns4RDB = null;
    /** Metric names when the sink is RDB. */
    private List<String> metrics = null;
    /** Optional per-metric field lists, keyed by metric name. */
    private Map<String, Object> fields;
    /** Optional per-metric tag filters, keyed by metric name. */
    private Map<String, Object> tags;
    private TSDBConnection conn;
    /** Slice boundaries as stored in the slice configuration. */
    private Long startTime;
    private Long endTime;

    /**
     * Loads the slice configuration into the task's fields and creates the connection.
     */
    @Override
    public void init() {
        Configuration readerSliceConfig = super.getPluginJobConf();

        LOG.info("getPluginJobConf: {}", JSON.toJSONString(readerSliceConfig));

        // sinkDbType is optional. Without the default, an omitted value would leave type null,
        // select the RDB branch below with no metric list, and fail with a
        // NullPointerException in startRead.
        this.type = readerSliceConfig.getString(Key.SINK_DB_TYPE, Key.TYPE_DEFAULT_VALUE);
        if ("TSDB".equals(type)) {
            columns4TSDB = readerSliceConfig.getList(Key.COLUMN, String.class);
        } else {
            columns4RDB = readerSliceConfig.getList(Key.COLUMN, String.class);
            metrics = readerSliceConfig.getList(Key.METRIC, String.class);
        }
        this.fields = readerSliceConfig.getMap(Key.FIELD);
        this.tags = readerSliceConfig.getMap(Key.TAG);

        String address = readerSliceConfig.getString(Key.ENDPOINT);

        conn = new TSDBConnection(address);

        this.startTime = readerSliceConfig.getLong(Key.BEGIN_DATE_TIME);
        this.endTime = readerSliceConfig.getLong(Key.END_DATE_TIME);
    }

    @Override
    public void prepare() {
    }

    /**
     * Reads every configured metric for this slice and forwards the result to the sender.
     * A metric that has an entry in the field map is read through the multi-field overloads,
     * any other metric through the single-field overloads.
     *
     * @param recordSender destination of the records
     * @throws DataXException {@code ILLEGAL_VALUE} wrapping any failure raised while reading
     *                        or sending
     */
    @Override
    @SuppressWarnings("unchecked")
    public void startRead(RecordSender recordSender) {
        try {
            final boolean sinkIsTSDB = "TSDB".equals(type);
            final List<String> targets = sinkIsTSDB ? columns4TSDB : metrics;
            for (String metric : targets) {
                // The tag and field maps are keyed by metric name; the casts rely on the configured shape.
                final Map<String, String> tags = this.tags == null ?
                        null : (Map<String, String>) this.tags.get(metric);
                final boolean multiField = fields != null && fields.containsKey(metric);
                if (sinkIsTSDB) {
                    if (multiField) {
                        conn.sendDPs(metric, (List<String>) fields.get(metric),
                                tags, this.startTime, this.endTime, recordSender);
                    } else {
                        conn.sendDPs(metric, tags, this.startTime, this.endTime, recordSender);
                    }
                } else {
                    if (multiField) {
                        conn.sendRecords(metric, (List<String>) fields.get(metric),
                                tags, startTime, endTime, columns4RDB, recordSender);
                    } else {
                        conn.sendRecords(metric, tags, startTime, endTime, columns4RDB, recordSender);
                    }
                }
            }
        } catch (Exception e) {
            throw DataXException.asDataXException(
                    TSDBReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错！", e);
        }
    }

    @Override
    public void post() {
    }

    @Override
    public void destroy() {
    }
}
}

View File

@ -0,0 +1,40 @@
package com.alibaba.datax.plugin.reader.tsdbreader;
import com.alibaba.datax.common.spi.ErrorCode;
/**
 * Copyright @ 2019 alibaba.com
 * All right reserved.
 * Function: error codes raised by the TSDB reader.
 *
 * @author Benedict Jin
 * @since 2019-10-21
 */
public enum TSDBReaderErrorCode implements ErrorCode {

    /** A mandatory value is missing. */
    REQUIRED_VALUE("TSDBReader-00", "缺失必要的值"),
    /** A value is present but not acceptable. */
    ILLEGAL_VALUE("TSDBReader-01", "值非法");

    private final String code;
    private final String description;

    TSDBReaderErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }

    /** @return the identifier of this error, e.g. {@code TSDBReader-00} */
    @Override
    public String getCode() {
        return code;
    }

    /** @return the human-readable description of this error */
    @Override
    public String getDescription() {
        return description;
    }

    /** @return code and description rendered as {@code Code:[..], Description:[..]. } */
    @Override
    public String toString() {
        return "Code:[" + code + "], Description:[" + description + "]. ";
    }
}

View File

@ -0,0 +1,88 @@
package com.alibaba.datax.plugin.reader.tsdbreader.conn;
import com.alibaba.datax.common.plugin.RecordSender;
import java.util.List;
import java.util.Map;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function: Connection for TSDB-like databases
*
* @author Benedict Jin
* @since 2019-10-21
*/
public interface Connection4TSDB {
/**
* Get the address of Database.
*
* @return the database address (presumably the HTTP endpoint, e.g. http://IP:Port - confirm with the implementation)
*/
String address();
/**
* Get the version of Database.
*
* @return version
*/
String version();
/**
* Get these configurations.
*
* @return configs
*/
String config();
/**
* Get the list of supported version.
*
* @return version list
*/
String[] getSupportVersionPrefix();
/**
* Send data points for TSDB with single field.
*
* @param metric name of the metric to read
* @param tags tag filter for the metric, may be null
* @param start begin of the time range (TSDBReader.Task passes the slice begin from its configuration)
* @param end end of the time range (TSDBReader.Task passes the slice end from its configuration)
* @param recordSender destination of the data points
* @throws Exception if reading or sending fails
*/
void sendDPs(String metric, Map<String, String> tags, Long start, Long end, RecordSender recordSender) throws Exception;
/**
* Send data points for TSDB with multi fields.
*
* @param metric name of the metric to read
* @param fields names of the fields to read for the metric
* @param tags tag filter for the metric, may be null
* @param start begin of the time range
* @param end end of the time range
* @param recordSender destination of the data points
* @throws Exception if reading or sending fails
*/
void sendDPs(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, RecordSender recordSender) throws Exception;
/**
* Send data points for RDB with single field.
*
* @param metric name of the metric to read
* @param tags tag filter for the metric, may be null
* @param start begin of the time range
* @param end end of the time range
* @param columns4RDB output columns configured for the relational sink
* @param recordSender destination of the records
* @throws Exception if reading or sending fails
*/
void sendRecords(String metric, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender) throws Exception;
/**
* Send data points for RDB with multi fields.
*
* @param metric name of the metric to read
* @param fields names of the fields to read for the metric
* @param tags tag filter for the metric, may be null
* @param start begin of the time range
* @param end end of the time range
* @param columns4RDB output columns configured for the relational sink
* @param recordSender destination of the records
* @throws Exception if reading or sending fails
*/
void sendRecords(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender) throws Exception;
/**
* Put data point.
*
* @param dp data point
* @return whether the data point is written successfully
*/
boolean put(DataPoint4TSDB dp);
/**
* Put data points.
*
* @param dps data points
* @return whether the data points are written successfully
*/
boolean put(List<DataPoint4TSDB> dps);
/**
* Whether current version is supported.
*
* @return true: supported; false: not yet!
*/
boolean isSupported();
}

View File

@ -0,0 +1,68 @@
package com.alibaba.datax.plugin.reader.tsdbreader.conn;
import com.alibaba.fastjson.JSON;
import java.util.Map;
/**
 * Copyright @ 2019 alibaba.com
 * All right reserved.
 * Function: a data point of a multi-field metric (timestamp, metric, tags and named fields).
 *
 * @author Benedict Jin
 * @since 2019-10-21
 */
public class DataPoint4MultiFieldsTSDB {

    private String metric;
    private long timestamp;
    private Map<String, Object> tags;
    private Map<String, Object> fields;

    /** Creates an empty data point to be filled through the setters. */
    public DataPoint4MultiFieldsTSDB() {
    }

    /**
     * Creates a fully populated data point.
     *
     * @param timestamp time of the data point
     * @param metric    metric name
     * @param tags      tag key/value pairs
     * @param fields    field name/value pairs
     */
    public DataPoint4MultiFieldsTSDB(long timestamp, String metric, Map<String, Object> tags, Map<String, Object> fields) {
        this.metric = metric;
        this.timestamp = timestamp;
        this.fields = fields;
        this.tags = tags;
    }

    /** @return time of the data point */
    public long getTimestamp() {
        return this.timestamp;
    }

    /** @param timestamp time of the data point */
    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    /** @return metric name */
    public String getMetric() {
        return this.metric;
    }

    /** @param metric metric name */
    public void setMetric(String metric) {
        this.metric = metric;
    }

    /** @return tag key/value pairs */
    public Map<String, Object> getTags() {
        return this.tags;
    }

    /** @param tags tag key/value pairs */
    public void setTags(Map<String, Object> tags) {
        this.tags = tags;
    }

    /** @return field name/value pairs */
    public Map<String, Object> getFields() {
        return this.fields;
    }

    /** @param fields field name/value pairs */
    public void setFields(Map<String, Object> fields) {
        this.fields = fields;
    }

    /** @return this data point serialized as a JSON string */
    @Override
    public String toString() {
        final String json = JSON.toJSONString(this);
        return json;
    }
}

View File

@ -0,0 +1,68 @@
package com.alibaba.datax.plugin.reader.tsdbreader.conn;
import com.alibaba.fastjson.JSON;
import java.util.Map;
/**
 * Copyright @ 2019 alibaba.com
 * All right reserved.
 * Function: a single-value data point (timestamp, metric, tags and one value).
 *
 * @author Benedict Jin
 * @since 2019-10-21
 */
public class DataPoint4TSDB {

    private String metric;
    private long timestamp;
    private Map<String, Object> tags;
    private Object value;

    /** Creates an empty data point to be filled through the setters. */
    public DataPoint4TSDB() {
    }

    /**
     * Creates a fully populated data point.
     *
     * @param timestamp time of the data point
     * @param metric    metric name
     * @param tags      tag key/value pairs
     * @param value     value of the data point
     */
    public DataPoint4TSDB(long timestamp, String metric, Map<String, Object> tags, Object value) {
        this.metric = metric;
        this.timestamp = timestamp;
        this.value = value;
        this.tags = tags;
    }

    /** @return time of the data point */
    public long getTimestamp() {
        return this.timestamp;
    }

    /** @param timestamp time of the data point */
    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    /** @return metric name */
    public String getMetric() {
        return this.metric;
    }

    /** @param metric metric name */
    public void setMetric(String metric) {
        this.metric = metric;
    }

    /** @return tag key/value pairs */
    public Map<String, Object> getTags() {
        return this.tags;
    }

    /** @param tags tag key/value pairs */
    public void setTags(Map<String, Object> tags) {
        this.tags = tags;
    }

    /** @return value of the data point */
    public Object getValue() {
        return this.value;
    }

    /** @param value value of the data point */
    public void setValue(Object value) {
        this.value = value;
    }

    /** @return this data point serialized as a JSON string */
    @Override
    public String toString() {
        final String json = JSON.toJSONString(this);
        return json;
    }
}

View File

@ -0,0 +1,64 @@
package com.alibaba.datax.plugin.reader.tsdbreader.conn;
import java.util.List;
import java.util.Map;
/**
 * Copyright @ 2019 alibaba.com
 * All right reserved.
 * Function: plain holder for one entry of a multi-field query result.
 * It relies on the implicit public no-argument constructor.
 *
 * @author Benedict Jin
 * @since 2019-10-22
 */
public class MultiFieldQueryResult {

    private String metric;
    private Map<String, Object> tags;
    private List<String> aggregatedTags;
    // NOTE(review): columns presumably names the positions of each row in values - confirm
    // against the query response format.
    private List<String> columns;
    private List<List<Object>> values;

    /** @return metric name */
    public String getMetric() {
        return this.metric;
    }

    /** @param metric metric name */
    public void setMetric(String metric) {
        this.metric = metric;
    }

    /** @return tag key/value pairs */
    public Map<String, Object> getTags() {
        return this.tags;
    }

    /** @param tags tag key/value pairs */
    public void setTags(Map<String, Object> tags) {
        this.tags = tags;
    }

    /** @return aggregated tag names */
    public List<String> getAggregatedTags() {
        return this.aggregatedTags;
    }

    /** @param aggregatedTags aggregated tag names */
    public void setAggregatedTags(List<String> aggregatedTags) {
        this.aggregatedTags = aggregatedTags;
    }

    /** @return column names */
    public List<String> getColumns() {
        return this.columns;
    }

    /** @param columns column names */
    public void setColumns(List<String> columns) {
        this.columns = columns;
    }

    /** @return value rows */
    public List<List<Object>> getValues() {
        return this.values;
    }

    /** @param values value rows */
    public void setValues(List<List<Object>> values) {
        this.values = values;
    }
}

View File

@ -0,0 +1,64 @@
package com.alibaba.datax.plugin.reader.tsdbreader.conn;
import java.util.List;
import java.util.Map;
/**
 * Copyright @ 2019 alibaba.com
 * All right reserved.
 * Function: plain holder for one entry of a single-value query result.
 * It relies on the implicit public no-argument constructor.
 *
 * @author Benedict Jin
 * @since 2019-09-19
 */
public class QueryResult {

    private String metricName;
    private Map<String, Object> tags;
    private List<String> groupByTags;
    private List<String> aggregatedTags;
    // NOTE(review): dps presumably maps timestamp to value - confirm against the query response format.
    private Map<String, Object> dps;

    /** @return metric name */
    public String getMetricName() {
        return this.metricName;
    }

    /** @param metricName metric name */
    public void setMetricName(String metricName) {
        this.metricName = metricName;
    }

    /** @return tag key/value pairs */
    public Map<String, Object> getTags() {
        return this.tags;
    }

    /** @param tags tag key/value pairs */
    public void setTags(Map<String, Object> tags) {
        this.tags = tags;
    }

    /** @return group-by tag names */
    public List<String> getGroupByTags() {
        return this.groupByTags;
    }

    /** @param groupByTags group-by tag names */
    public void setGroupByTags(List<String> groupByTags) {
        this.groupByTags = groupByTags;
    }

    /** @return aggregated tag names */
    public List<String> getAggregatedTags() {
        return this.aggregatedTags;
    }

    /** @param aggregatedTags aggregated tag names */
    public void setAggregatedTags(List<String> aggregatedTags) {
        this.aggregatedTags = aggregatedTags;
    }

    /** @return the data points of this result */
    public Map<String, Object> getDps() {
        return this.dps;
    }

    /** @param dps the data points of this result */
    public void setDps(Map<String, Object> dps) {
        this.dps = dps;
    }
}

View File

@ -0,0 +1,94 @@
package com.alibaba.datax.plugin.reader.tsdbreader.conn;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.plugin.reader.tsdbreader.util.TSDBUtils;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
import java.util.Map;
/**
 * Copyright @ 2019 alibaba.com
 * All right reserved.
 * Function: TSDB Connection
 * <p>
 * {@link Connection4TSDB} implementation bound to one TSDB HTTP address. Metadata calls are
 * delegated to {@link TSDBUtils}; reads are delegated to {@code TSDBDump}. Writing is not
 * implemented: both {@code put} overloads always return {@code false}.
 *
 * @author Benedict Jin
 * @since 2019-10-21
 */
public class TSDBConnection implements Connection4TSDB {

    private String address;

    /**
     * @param address base address of the TSDB instance; it is used as a prefix for the API paths
     */
    public TSDBConnection(String address) {
        this.address = address;
    }

    /** @return the address this connection was created with */
    @Override
    public String address() {
        return address;
    }

    /** @return the raw response of {@link TSDBUtils#version(String)} for this address */
    @Override
    public String version() {
        return TSDBUtils.version(address);
    }

    /** @return the raw response of {@link TSDBUtils#config(String)} for this address */
    @Override
    public String config() {
        return TSDBUtils.config(address);
    }

    /** @return a fresh array with the version prefixes accepted by {@link #isSupported()} */
    @Override
    public String[] getSupportVersionPrefix() {
        return new String[]{"2.4", "2.5"};
    }

    /** Reads a single-value metric and sends every data point as one string column. */
    @Override
    public void sendDPs(String metric, Map<String, String> tags, Long start, Long end, RecordSender recordSender) throws Exception {
        TSDBDump.dump4TSDB(this, metric, tags, start, end, recordSender);
    }

    /** Reads a multi-field metric and sends every data point as one string column. */
    @Override
    public void sendDPs(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, RecordSender recordSender) throws Exception {
        TSDBDump.dump4TSDB(this, metric, fields, tags, start, end, recordSender);
    }

    /** Reads a single-value metric and sends one record per data point, one column per name in {@code columns4RDB}. */
    @Override
    public void sendRecords(String metric, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender) throws Exception {
        TSDBDump.dump4RDB(this, metric, tags, start, end, columns4RDB, recordSender);
    }

    /** Reads a multi-field metric and sends one record per data point, one column per name in {@code columns4RDB}. */
    @Override
    public void sendRecords(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender) throws Exception {
        TSDBDump.dump4RDB(this, metric, fields, tags, start, end, columns4RDB, recordSender);
    }

    /** Not implemented by this connection. @return always {@code false} */
    @Override
    public boolean put(DataPoint4TSDB dp) {
        return false;
    }

    /** Not implemented by this connection. @return always {@code false} */
    @Override
    public boolean put(List<DataPoint4TSDB> dps) {
        return false;
    }

    /**
     * Checks the {@code version} property of the version response against the supported prefixes.
     *
     * @return {@code true} when the reported version starts with one of
     * {@link #getSupportVersionPrefix()}; {@code false} when it does not or when the
     * {@code version} property is blank
     * @throws RuntimeException when the version response itself is blank
     */
    @Override
    public boolean isSupported() {
        final String rawVersion = version();
        if (StringUtils.isBlank(rawVersion)) {
            throw new RuntimeException("Cannot get the version!");
        }
        final String reported = JSON.parseObject(rawVersion).getString("version");
        boolean supported = false;
        if (!StringUtils.isBlank(reported)) {
            final String[] prefixes = getSupportVersionPrefix();
            // Stop at the first matching prefix.
            for (int i = 0; i < prefixes.length && !supported; i++) {
                supported = reported.startsWith(prefixes[i]);
            }
        }
        return supported;
    }
}

View File

@ -0,0 +1,318 @@
package com.alibaba.datax.plugin.reader.tsdbreader.conn;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.plugin.reader.tsdbreader.Constant;
import com.alibaba.datax.plugin.reader.tsdbreader.util.HttpUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
 * Copyright @ 2019 alibaba.com
 * All right reserved.
 * Function: TSDB Dump
 * <p>
 * Queries data points from a TSDB instance over HTTP ({@code /api/query} for single-value
 * metrics, {@code /api/mquery} for multi-field metrics) and forwards them to a
 * {@link RecordSender}. Two output shapes are supported:
 * <ul>
 * <li>"TSDB": one record per data point holding a single string column, the data point's
 * {@code toString()} form;</li>
 * <li>"RDB": one record per data point with one column per requested column name.</li>
 * </ul>
 *
 * @author Benedict Jin
 * @since 2019-10-21
 */
final class TSDBDump {

    private static final Logger LOG = LoggerFactory.getLogger(TSDBDump.class);

    private static final String QUERY = "/api/query";
    private static final String QUERY_MULTI_FIELD = "/api/mquery";

    static {
        // Clears the UseBigDecimal bit in fastjson's process-wide default parser features.
        // NOTE(review): presumably so decimal values are not parsed as BigDecimal, which
        // getColumn does not accept; this affects every fastjson user in the JVM.
        JSON.DEFAULT_PARSER_FEATURE &= ~Feature.UseBigDecimal.getMask();
    }

    private TSDBDump() {
    }

    /**
     * Queries a single-value metric and sends every data point as one string column.
     *
     * @param conn   connection providing the TSDB address
     * @param metric metric name to query
     * @param tags   optional tag filters (tag key to tag value); may be null or empty
     * @param start  query start, passed to the server unchanged
     * @param end    query end, passed to the server unchanged
     * @param sender destination of the produced records
     * @throws Exception when the HTTP call or the response parsing fails
     */
    static void dump4TSDB(TSDBConnection conn, String metric, Map<String, String> tags,
                          Long start, Long end, RecordSender sender) throws Exception {
        LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end);
        String res = queryRange4SingleField(conn, metric, tags, start, end);
        sendTSDBDps(sender, getDps4TSDB(metric, res));
    }

    /**
     * Queries a multi-field metric and sends every data point as one string column.
     *
     * @param conn   connection providing the TSDB address
     * @param metric metric name to query
     * @param fields field names to fetch
     * @param tags   optional tag filters (tag key to tag value); may be null or empty
     * @param start  query start, passed to the server unchanged
     * @param end    query end, passed to the server unchanged
     * @param sender destination of the produced records
     * @throws Exception when the HTTP call or the response parsing fails
     */
    static void dump4TSDB(TSDBConnection conn, String metric, List<String> fields, Map<String, String> tags,
                          Long start, Long end, RecordSender sender) throws Exception {
        LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end);
        String res = queryRange4MultiFields(conn, metric, fields, tags, start, end);
        sendTSDBDps(sender, getDps4TSDB(metric, fields, res));
    }

    /**
     * Queries a single-value metric and sends one record per data point, with one column per
     * entry of {@code columns4RDB} (metric, timestamp, value or a tag value).
     *
     * @param conn        connection providing the TSDB address
     * @param metric      metric name to query
     * @param tags        optional tag filters (tag key to tag value); may be null or empty
     * @param start       query start, passed to the server unchanged
     * @param end         query end, passed to the server unchanged
     * @param columns4RDB ordered column names to emit
     * @param sender      destination of the produced records
     * @throws Exception when the HTTP call fails or a value has an unsupported type
     */
    static void dump4RDB(TSDBConnection conn, String metric, Map<String, String> tags,
                         Long start, Long end, List<String> columns4RDB, RecordSender sender) throws Exception {
        LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end);
        String res = queryRange4SingleField(conn, metric, tags, start, end);
        for (DataPoint4TSDB dp : getDps4RDB(metric, res)) {
            sender.sendToWriter(toRecord(sender, dp, columns4RDB, true));
        }
    }

    /**
     * Queries a multi-field metric and sends one record per data point, with one column per
     * entry of {@code columns4RDB} (metric, timestamp, a tag value or a field value).
     *
     * @param conn        connection providing the TSDB address
     * @param metric      metric name to query
     * @param fields      field names to fetch
     * @param tags        optional tag filters (tag key to tag value); may be null or empty
     * @param start       query start, passed to the server unchanged
     * @param end         query end, passed to the server unchanged
     * @param columns4RDB ordered column names to emit
     * @param sender      destination of the produced records
     * @throws Exception when the HTTP call fails or a value has an unsupported type
     */
    static void dump4RDB(TSDBConnection conn, String metric, List<String> fields,
                         Map<String, String> tags, Long start, Long end,
                         List<String> columns4RDB, RecordSender sender) throws Exception {
        LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end);
        String res = queryRange4MultiFields(conn, metric, fields, tags, start, end);
        for (DataPoint4TSDB dp : getDps4RDB(metric, fields, res)) {
            sender.sendToWriter(toRecord(sender, dp, columns4RDB, false));
        }
    }

    /**
     * Builds one RDB-style record for a data point.
     *
     * @param hasSingleValue true for single-value metrics, where the value column name maps to
     *                       {@code dp.getValue()}; false for multi-field metrics, where every
     *                       non-metric/non-timestamp column is looked up in the tag map
     */
    private static Record toRecord(RecordSender sender, DataPoint4TSDB dp, List<String> columns4RDB,
                                   boolean hasSingleValue) throws Exception {
        final Record record = sender.createRecord();
        final Map<String, Object> tagKV = dp.getTags();
        for (String column : columns4RDB) {
            if (Constant.METRIC_SPECIFY_KEY.equals(column)) {
                record.addColumn(new StringColumn(dp.getMetric()));
            } else if (Constant.TS_SPECIFY_KEY.equals(column)) {
                record.addColumn(new LongColumn(dp.getTimestamp()));
            } else if (hasSingleValue && Constant.VALUE_SPECIFY_KEY.equals(column)) {
                record.addColumn(getColumn(dp.getValue()));
            } else {
                // A series without any tags yields a null map; treat it like a missing key.
                final Object tagvOrField = tagKV == null ? null : tagKV.get(column);
                if (tagvOrField == null) {
                    // NOTE(review): a missing tag/field is skipped, so this record ends up with
                    // fewer columns than columns4RDB and later columns shift left. Kept as-is
                    // because writers may rely on it - confirm whether a null column is wanted.
                    continue;
                }
                record.addColumn(getColumn(tagvOrField));
            }
        }
        return record;
    }

    /**
     * Wraps a parsed JSON value in the matching DataX column type.
     *
     * @throws Exception when the value is null or of a type that has no column mapping
     */
    private static Column getColumn(Object value) throws Exception {
        if (value instanceof Double) {
            return new DoubleColumn((Double) value);
        }
        if (value instanceof Long) {
            return new LongColumn((Long) value);
        }
        if (value instanceof String) {
            return new StringColumn((String) value);
        }
        // The JSON parser hands back Integer for integral values that fit in an int, so the
        // narrower integral types must be accepted as well.
        if (value instanceof Integer || value instanceof Short || value instanceof Byte) {
            return new LongColumn(Long.valueOf(((Number) value).longValue()));
        }
        if (value instanceof Float) {
            return new DoubleColumn(Double.valueOf(((Float) value).doubleValue()));
        }
        // Report null explicitly instead of failing with a NullPointerException on getClass().
        final String typeName = value == null ? "null" : value.getClass().getSimpleName();
        throw new Exception(String.format("value 不支持类型: [%s]", typeName));
    }

    /**
     * Posts a single-value range query and returns the raw response body.
     */
    private static String queryRange4SingleField(TSDBConnection conn, String metric, Map<String, String> tags,
                                                 Long start, Long end) throws Exception {
        // The metric is serialised by the JSON library so quotes/backslashes are escaped.
        String body = "{\n" +
                " \"start\": " + start + ",\n" +
                " \"end\": " + end + ",\n" +
                " \"queries\": [\n" +
                " {\n" +
                " \"aggregator\": \"none\",\n" +
                " \"metric\": " + JSON.toJSONString(metric) + "\n" +
                getFilterByTags(tags) +
                " }\n" +
                " ]\n" +
                "}";
        return HttpUtils.post(conn.address() + QUERY, body);
    }

    /**
     * Posts a multi-field range query and returns the raw response body.
     */
    private static String queryRange4MultiFields(TSDBConnection conn, String metric, List<String> fields,
                                                 Map<String, String> tags, Long start, Long end) throws Exception {
        // fields
        StringBuilder fieldBuilder = new StringBuilder();
        fieldBuilder.append("\"fields\":[");
        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) {
                fieldBuilder.append(",");
            }
            fieldBuilder.append("{\"field\": ").append(JSON.toJSONString(fields.get(i)))
                    .append(",\"aggregator\": \"none\"}");
        }
        fieldBuilder.append("]");
        String body = "{\n" +
                " \"start\": " + start + ",\n" +
                " \"end\": " + end + ",\n" +
                " \"queries\": [\n" +
                " {\n" +
                " \"aggregator\": \"none\",\n" +
                " \"metric\": " + JSON.toJSONString(metric) + ",\n" +
                fieldBuilder.toString() +
                getFilterByTags(tags) +
                " }\n" +
                " ]\n" +
                "}";
        return HttpUtils.post(conn.address() + QUERY_MULTI_FIELD, body);
    }

    /**
     * Renders the tag filters as a {@code ,"filters":[...]} JSON fragment with one
     * {@code literal_or} filter per tag.
     *
     * @return the fragment (starting with a comma), or an empty string when there are no tags
     */
    private static String getFilterByTags(Map<String, String> tags) {
        if (tags == null || tags.isEmpty()) {
            return "";
        }
        StringBuilder tagBuilder = new StringBuilder();
        tagBuilder.append(",\"filters\":[");
        boolean first = true;
        for (Map.Entry<String, String> entry : tags.entrySet()) {
            if (!first) {
                tagBuilder.append(",");
            }
            first = false;
            tagBuilder.append("{\"type\":\"literal_or\",\"tagk\":").append(JSON.toJSONString(entry.getKey()))
                    .append(",\"filter\":").append(JSON.toJSONString(entry.getValue()))
                    .append(",\"groupBy\":false}");
        }
        tagBuilder.append("]");
        return tagBuilder.toString();
    }

    /**
     * Parses a single-value query response into the string form of each data point.
     *
     * @return the data points in response order; empty (never null) when there are none
     */
    private static List<String> getDps4TSDB(String metric, String dps) {
        List<String> dpsArr = new LinkedList<>();
        for (DataPoint4TSDB dp : getDps4RDB(metric, dps)) {
            dpsArr.add(dp.toString());
        }
        return dpsArr;
    }

    /**
     * Parses a multi-field query response into the string form of each data point.
     *
     * @return the data points in response order; empty (never null) when there are none
     */
    private static List<String> getDps4TSDB(String metric, List<String> fields, String dps) {
        List<String> dpsArr = new LinkedList<>();
        // parseArray yields null for a null body (the HTTP helper may return null).
        final List<MultiFieldQueryResult> jsonArray = JSON.parseArray(dps, MultiFieldQueryResult.class);
        if (jsonArray == null) {
            return dpsArr;
        }
        for (MultiFieldQueryResult queryResult : jsonArray) {
            if (queryResult == null || queryResult.getValues() == null) {
                continue;
            }
            final Map<String, Object> tags = queryResult.getTags();
            for (List<Object> value : queryResult.getValues()) {
                // Row layout: index 0 is the timestamp, index i + 1 is the value of fields[i].
                // NOTE(review): this assumes the response keeps the requested field order -
                // confirm against the response's own column list.
                final String ts = value.get(0).toString();
                Map<String, Object> fieldsAndValues = new HashMap<>();
                for (int i = 0; i < fields.size(); i++) {
                    fieldsAndValues.put(fields.get(i), value.get(i + 1));
                }
                final DataPoint4MultiFieldsTSDB dp = new DataPoint4MultiFieldsTSDB();
                dp.setMetric(metric);
                dp.setTimestamp(Long.parseLong(ts));
                dp.setTags(tags);
                dp.setFields(fieldsAndValues);
                dpsArr.add(dp.toString());
            }
        }
        return dpsArr;
    }

    /**
     * Parses a single-value query response into data point objects.
     *
     * @return the data points in response order; empty (never null) when there are none
     */
    private static List<DataPoint4TSDB> getDps4RDB(String metric, String dps) {
        List<DataPoint4TSDB> dpsArr = new LinkedList<>();
        // parseArray yields null for a null body (the HTTP helper may return null).
        final List<QueryResult> jsonArray = JSON.parseArray(dps, QueryResult.class);
        if (jsonArray == null) {
            return dpsArr;
        }
        for (QueryResult queryResult : jsonArray) {
            if (queryResult == null || queryResult.getDps() == null) {
                continue;
            }
            final Map<String, Object> tags = queryResult.getTags();
            for (Map.Entry<String, Object> entry : queryResult.getDps().entrySet()) {
                final DataPoint4TSDB dp = new DataPoint4TSDB();
                dp.setMetric(metric);
                dp.setTags(tags);
                // The map key is the timestamp rendered as a string.
                dp.setTimestamp(Long.parseLong(entry.getKey()));
                dp.setValue(entry.getValue());
                dpsArr.add(dp);
            }
        }
        return dpsArr;
    }

    /**
     * Parses a multi-field query response into data point objects whose tag map also carries
     * the field values (field name to value), so both can be looked up by column name.
     *
     * @return the data points in response order; empty (never null) when there are none
     */
    private static List<DataPoint4TSDB> getDps4RDB(String metric, List<String> fields, String dps) {
        List<DataPoint4TSDB> dpsArr = new LinkedList<>();
        // parseArray yields null for a null body (the HTTP helper may return null).
        final List<MultiFieldQueryResult> jsonArray = JSON.parseArray(dps, MultiFieldQueryResult.class);
        if (jsonArray == null) {
            return dpsArr;
        }
        for (MultiFieldQueryResult queryResult : jsonArray) {
            if (queryResult == null || queryResult.getValues() == null) {
                continue;
            }
            final Map<String, Object> tags = queryResult.getTags();
            for (List<Object> value : queryResult.getValues()) {
                // Row layout: index 0 is the timestamp, index i + 1 is the value of fields[i].
                final String ts = value.get(0).toString();
                // Copy per data point: the field values differ from row to row.
                Map<String, Object> tagsTmp = tags == null
                        ? new HashMap<String, Object>()
                        : new HashMap<>(tags);
                for (int i = 0; i < fields.size(); i++) {
                    tagsTmp.put(fields.get(i), value.get(i + 1));
                }
                final DataPoint4TSDB dp = new DataPoint4TSDB();
                dp.setMetric(metric);
                dp.setTimestamp(Long.parseLong(ts));
                dp.setTags(tagsTmp);
                dpsArr.add(dp);
            }
        }
        return dpsArr;
    }

    /**
     * Sends each string as its own single-column record.
     */
    private static void sendTSDBDps(RecordSender sender, List<String> dps) {
        for (String dp : dps) {
            StringColumn tsdbColumn = new StringColumn(dp);
            Record record = sender.createRecord();
            record.addColumn(tsdbColumn);
            sender.sendToWriter(record);
        }
    }
}

View File

@ -0,0 +1,67 @@
package com.alibaba.datax.plugin.reader.tsdbreader.util;
import com.alibaba.fastjson.JSON;
import org.apache.http.client.fluent.Content;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.ContentType;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * Copyright @ 2019 alibaba.com
 * All right reserved.
 * Function: HttpUtils
 * <p>
 * Thin static helpers around the HttpClient fluent API for GET and JSON POST requests.
 * Response bodies are decoded as UTF-8.
 *
 * @author Benedict Jin
 * @since 2019-10-21
 */
public final class HttpUtils {

    public final static int CONNECT_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60);
    public final static int SOCKET_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60);

    private HttpUtils() {
    }

    /**
     * Issues a GET with the default timeouts.
     *
     * @param url target URL
     * @return the response body as a UTF-8 string, or null when no content object is returned
     * @throws Exception when the request fails
     */
    public static String get(String url) throws Exception {
        final Request request = Request.Get(url)
                .connectTimeout(CONNECT_TIMEOUT_DEFAULT_IN_MILL)
                .socketTimeout(SOCKET_TIMEOUT_DEFAULT_IN_MILL);
        return toUtf8(request.execute().returnContent());
    }

    /**
     * Serialises {@code params} to JSON and posts it with the default timeouts.
     */
    public static String post(String url, Map<String, Object> params) throws Exception {
        final String json = JSON.toJSONString(params);
        return post(url, json, CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
    }

    /**
     * Posts an already serialised JSON body with the default timeouts.
     */
    public static String post(String url, String params) throws Exception {
        return post(url, params, CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
    }

    /**
     * Serialises {@code params} to JSON and posts it with the given timeouts.
     */
    public static String post(String url, Map<String, Object> params,
                              int connectTimeoutInMill, int socketTimeoutInMill) throws Exception {
        final String json = JSON.toJSONString(params);
        return post(url, json, connectTimeoutInMill, socketTimeoutInMill);
    }

    /**
     * Posts a JSON body.
     *
     * @param url                  target URL
     * @param params               request body, sent as {@code application/json}
     * @param connectTimeoutInMill connect timeout in milliseconds
     * @param socketTimeoutInMill  socket timeout in milliseconds
     * @return the response body as a UTF-8 string, or null when no content object is returned
     * @throws Exception when the request fails
     */
    public static String post(String url, String params,
                              int connectTimeoutInMill, int socketTimeoutInMill) throws Exception {
        final Request request = Request.Post(url)
                .connectTimeout(connectTimeoutInMill)
                .socketTimeout(socketTimeoutInMill)
                .addHeader("Content-Type", "application/json")
                .bodyString(params, ContentType.APPLICATION_JSON);
        return toUtf8(request.execute().returnContent());
    }

    /** Decodes a response body as UTF-8, passing a null content through as null. */
    private static String toUtf8(Content content) {
        return content == null ? null : content.asString(StandardCharsets.UTF_8);
    }
}

View File

@ -0,0 +1,68 @@
package com.alibaba.datax.plugin.reader.tsdbreader.util;
import com.alibaba.datax.plugin.reader.tsdbreader.conn.DataPoint4TSDB;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
 * Copyright @ 2019 alibaba.com
 * All right reserved.
 * Function: TSDB Utils
 * <p>
 * Static helpers for the TSDB HTTP endpoints {@code /api/version}, {@code /api/config} and
 * {@code /api/put}. Checked exceptions from the HTTP layer are rethrown as
 * {@link RuntimeException} with the original exception as cause.
 *
 * @author Benedict Jin
 * @since 2019-10-21
 */
public final class TSDBUtils {

    private static final Logger LOGGER = LoggerFactory.getLogger(TSDBUtils.class);

    private TSDBUtils() {
    }

    /**
     * @param address base address of the TSDB instance
     * @return the raw response body of {@code <address>/api/version}
     * @throws RuntimeException when the request fails
     */
    public static String version(String address) {
        return get(String.format("%s/api/version", address));
    }

    /**
     * @param address base address of the TSDB instance
     * @return the raw response body of {@code <address>/api/config}
     * @throws RuntimeException when the request fails
     */
    public static String config(String address) {
        return get(String.format("%s/api/config", address));
    }

    /**
     * Writes a batch of data points.
     *
     * @return always {@code true}; failures surface as {@link RuntimeException}
     */
    public static boolean put(String address, List<DataPoint4TSDB> dps) {
        return put(address, JSON.toJSON(dps));
    }

    /**
     * Writes a single data point.
     *
     * @return always {@code true}; failures surface as {@link RuntimeException}
     */
    public static boolean put(String address, DataPoint4TSDB dp) {
        return put(address, JSON.toJSON(dp));
    }

    /** Issues the GET and converts any checked failure into a RuntimeException. */
    private static String get(String url) {
        try {
            return HttpUtils.get(url);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Posts the string form of {@code o} to {@code <address>/api/put}. */
    private static boolean put(String address, Object o) {
        String url = String.format("%s/api/put", address);
        try {
            String rsp = HttpUtils.post(url, o.toString());
            // A successful put is expected to come back without a body. This used to be an
            // `assert rsp == null`, which does nothing with assertions disabled and, with
            // them enabled, raises an AssertionError that the catch below does not handle.
            // A non-empty body is therefore only logged.
            if (rsp != null && !rsp.isEmpty()) {
                LOGGER.debug("Address: {}, unexpected response body: {}", url, rsp);
            }
        } catch (Exception e) {
            LOGGER.error("Address: {}, DataPoints: {}", url, o);
            throw new RuntimeException(e);
        }
        return true;
    }
}

View File

@ -0,0 +1,38 @@
package com.alibaba.datax.plugin.reader.tsdbreader.util;
import java.util.concurrent.TimeUnit;
/**
 * Copyright @ 2019 alibaba.com
 * All right reserved.
 * Function: TimeUtils
 * <p>
 * Small helpers for epoch timestamps.
 *
 * @author Benedict Jin
 * @since 2019-10-21
 */
public final class TimeUtils {

    /** Upper 32 bits of a long; any bit set here means the value does not fit in 32 bits. */
    private static final long SECOND_MASK = 0xFFFFFFFF00000000L;
    private static final long HOUR_IN_MILL = TimeUnit.HOURS.toMillis(1);

    private TimeUtils() {
    }

    /**
     * Whether the timestamp is in seconds (as opposed to milliseconds).
     * <p>
     * The check is purely about magnitude: a value is treated as seconds when none of its
     * upper 32 bits is set, i.e. when it lies in {@code [0, 2^32)}. Negative values are
     * therefore never reported as seconds.
     *
     * @param ts timestamp
     * @return true when {@code ts} fits in the lower 32 bits
     */
    public static boolean isSecond(long ts) {
        return (ts & SECOND_MASK) == 0;
    }

    /**
     * Truncates a millisecond timestamp down to the start of its hour.
     * <p>
     * The truncation is done on the raw epoch value, so the result is aligned to whole hours
     * since the epoch. Negative timestamps are rounded down as well (e.g. {@code -1} becomes
     * {@code -3600000}) rather than toward zero.
     *
     * @param ms time in millisecond
     * @return the largest multiple of one hour (in milliseconds) that is not greater than {@code ms}
     */
    public static long getTimeInHour(long ms) {
        long remainder = ms % HOUR_IN_MILL;
        // Java's % keeps the sign of the dividend; shift a negative remainder into
        // [0, HOUR_IN_MILL) so the subtraction always rounds down.
        if (remainder < 0) {
            remainder += HOUR_IN_MILL;
        }
        return ms - remainder;
    }
}

View File

@ -0,0 +1,10 @@
{
"name": "tsdbreader",
"class": "com.alibaba.datax.plugin.reader.tsdbreader.TSDBReader",
"description": {
"useScene": "从 TSDB 中摄取数据点",
"mechanism": "通过 /api/query 或 /api/mquery 接口查询出符合条件的数据点",
"warn": "指定起止时间会自动忽略分钟和秒,转为整点时刻,例如 2019-4-18 的 [3:35, 4:55) 会被转为 [3:00, 4:00)"
},
"developer": "Benedict Jin"
}

View File

@ -0,0 +1,29 @@
{
"name": "tsdbreader",
"parameter": {
"sinkDbType": "RDB",
"endpoint": "http://localhost:8242",
"column": [
"__metric__",
"__ts__",
"app",
"cluster",
"group",
"ip",
"zone",
"__value__"
],
"metric": [
"m"
],
"tag": {
"m": {
"app": "a1",
"cluster": "c1"
}
},
"splitIntervalMs": 60000,
"beginDateTime": "2019-01-01 00:00:00",
"endDateTime": "2019-01-01 01:00:00"
}
}

View File

@ -0,0 +1,30 @@
package com.alibaba.datax.plugin.reader.tsdbreader.conn;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
/**
 * Copyright @ 2019 alibaba.com
 * All right reserved.
 * Function: TSDBConnection Test
 * <p>
 * Exercises {@link TSDBConnection} against the address in {@code TSDB_ADDRESS}; the class
 * is marked {@code @Ignore}, so these tests do not run by default.
 *
 * @author Benedict Jin
 * @since 2019-10-21
 */
@Ignore
public class TSDBConnectionTest {

    private static final String TSDB_ADDRESS = "http://localhost:8242";

    /** The version endpoint must return something. */
    @Test
    public void testVersion() {
        final TSDBConnection conn = new TSDBConnection(TSDB_ADDRESS);
        final String reported = conn.version();
        Assert.assertNotNull(reported);
    }

    /** The instance must report a supported version. */
    @Test
    public void testIsSupported() {
        final TSDBConnection conn = new TSDBConnection(TSDB_ADDRESS);
        final boolean supported = conn.isSupported();
        Assert.assertTrue(supported);
    }
}

View File

@ -0,0 +1,17 @@
package com.alibaba.datax.plugin.reader.tsdbreader.util;
/**
 * Copyright @ 2019 alibaba.com
 * All right reserved.
 * Function: Const
 * <p>
 * Package-private holder for constants shared by the tests in this package.
 *
 * @author Benedict Jin
 * @since 2019-10-21
 */
final class Const {

    /** Address of the TSDB instance the tests talk to. */
    static final String TSDB_ADDRESS = "http://localhost:8242";

    // Constants only; not meant to be instantiated.
    private Const() {
    }
}

View File

@ -0,0 +1,39 @@
package com.alibaba.datax.plugin.reader.tsdbreader.util;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
/**
 * Copyright @ 2019 alibaba.com
 * All right reserved.
 * Function: HttpUtils Test
 * <p>
 * Network-dependent checks for {@link HttpUtils}; the class is marked {@code @Ignore}, so
 * these tests do not run by default.
 *
 * @author Benedict Jin
 * @since 2019-10-21
 */
@Ignore
public class HttpUtilsTest {

    /** Posts a one-entry map as JSON and expects a non-null response body. */
    @Test
    public void testSimpleCase() throws Exception {
        final Map<String, Object> payload = new HashMap<>();
        payload.put("foo", "bar");
        final String response = HttpUtils.post("https://httpbin.org/post", payload);
        System.out.println(response);
        Assert.assertNotNull(response);
    }

    /** Fetches the version endpoint of the test TSDB instance and expects a non-null body. */
    @Test
    public void testGet() throws Exception {
        final String versionUrl = Const.TSDB_ADDRESS + "/api/version";
        final String response = HttpUtils.get(versionUrl);
        System.out.println(response);
        Assert.assertNotNull(response);
    }
}

View File

@ -0,0 +1,33 @@
package com.alibaba.datax.plugin.reader.tsdbreader.util;
import org.junit.Assert;
import org.junit.Test;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Copyright @ 2019 alibaba.com
 * All right reserved.
 * Function: TimeUtils Test
 *
 * @author Benedict Jin
 * @since 2019-10-21
 */
public class TimeUtilsTest {

    /** A current millisecond timestamp is not "seconds"; the same instant divided by 1000 is. */
    @Test
    public void testIsSecond() {
        Assert.assertFalse(TimeUtils.isSecond(System.currentTimeMillis()));
        Assert.assertTrue(TimeUtils.isSecond(System.currentTimeMillis() / 1000));
    }

    /** Truncating to the hour drops minutes and seconds. */
    @Test
    public void testGetTimeInHour() throws ParseException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // getTimeInHour truncates the raw epoch value, i.e. to UTC hour boundaries. Pin the
        // formatter to UTC so the expectation also holds on machines whose default time zone
        // has a non-whole-hour offset.
        sdf.setTimeZone(java.util.TimeZone.getTimeZone("UTC"));
        Date date = sdf.parse("2019-04-18 15:32:33");
        long timeInHour = TimeUtils.getTimeInHour(date.getTime());
        Assert.assertEquals("2019-04-18 15:00:00", sdf.format(new Date(timeInHour)));
    }
}