diff --git a/.gitignore b/.gitignore
index fbfffba8..925cf0ab 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,8 +1,157 @@
-/target/
-.classpath
-.project
-.settings
+# Created by .ignore support plugin (hsz.mobi)
.DS_Store
-/logs/
-.idea/
+.AppleDouble
+.LSOverride
+Icon
+._*
+.DocumentRevisions-V100
+.fseventsd
+.Spotlight-V100
+.TemporaryItems
+.Trashes
+.VolumeIcon.icns
+.com.apple.timemachine.donotpresent
+.AppleDB
+.AppleDesktop
+Network Trash Folder
+Temporary Items
+.apdisk
+*.class
+*.log
+*.ctxt
+.mtj.tmp/
+*.jar
+*.war
+*.nar
+*.ear
+*.zip
+*.tar.gz
+*.rar
+hs_err_pid*
+.idea/**/workspace.xml
+.idea/**/tasks.xml
+.idea/**/dictionaries
+.idea/**/shelf
+.idea/**/dataSources/
+.idea/**/dataSources.ids
+.idea/**/dataSources.local.xml
+.idea/**/sqlDataSources.xml
+.idea/**/dynamic.xml
+.idea/**/uiDesigner.xml
+.idea/**/dbnavigator.xml
+.idea/**/gradle.xml
+.idea/**/libraries
+cmake-build-debug/
+cmake-build-release/
+.idea/**/mongoSettings.xml
+*.iws
+out/
+.idea_modules/
+atlassian-ide-plugin.xml
+.idea/replstate.xml
+com_crashlytics_export_strings.xml
+crashlytics.properties
+crashlytics-build.properties
+fabric.properties
+.idea/httpRequests
+target/
+pom.xml.tag
+pom.xml.releaseBackup
+pom.xml.versionsBackup
+pom.xml.next
+release.properties
+dependency-reduced-pom.xml
+buildNumber.properties
+.mvn/timing.properties
+!/.mvn/wrapper/maven-wrapper.jar
+.idea
*.iml
+out
+gen### Python template
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+*.manifest
+*.spec
+pip-log.txt
+pip-delete-this-directory.txt
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+.hypothesis/
+.pytest_cache/
+*.mo
+*.pot
+*.log
+local_settings.py
+db.sqlite3
+instance/
+.webassets-cache
+.scrapy
+docs/_build/
+target/
+.ipynb_checkpoints
+.python-version
+celerybeat-schedule
+*.sage.py
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+.spyderproject
+.spyproject
+.ropeproject
+/site
+.mypy_cache/
+.metadata
+bin/
+tmp/
+*.tmp
+*.bak
+*.swp
+*~.nib
+local.properties
+.settings/
+.loadpath
+.recommenders
+.externalToolBuilders/
+*.launch
+*.pydevproject
+.cproject
+.autotools
+.factorypath
+.buildpath
+.target
+.tern-project
+.texlipse
+.springBeans
+.recommenders/
+.cache-main
+.scala_dependencies
+.worksheet
diff --git a/opentsdbreader/doc/opentsdbreader.md b/opentsdbreader/doc/opentsdbreader.md
new file mode 100644
index 00000000..d9fe742b
--- /dev/null
+++ b/opentsdbreader/doc/opentsdbreader.md
@@ -0,0 +1,209 @@
+
+# OpenTSDBReader 插件文档
+
+___
+
+
+## 1 快速介绍
+
+OpenTSDBReader 插件实现了从 OpenTSDB 读取数据。OpenTSDB 是主要由 Yahoo 维护的、可扩展的、分布式时序数据库,与阿里巴巴自研 TSDB 的关系与区别详见阿里云官网:《[相比 OpenTSDB 优势](https://help.aliyun.com/document_detail/113368.html)》
+
+
+
+## 2 实现原理
+
+在底层实现上,OpenTSDBReader 通过 HTTP 请求链接到 OpenTSDB 实例,利用 `/api/config` 接口获取到其底层存储 HBase 的连接信息,再利用 AsyncHBase 框架连接 HBase,通过 Scan 的方式将数据点扫描出来。整个同步的过程通过 metric 和时间段进行切分,即某个 metric 在某一个小时内的数据迁移,组合成一个迁移 Task。
+
+
+
+## 3 功能说明
+
+### 3.1 配置样例
+
+* 配置一个从 OpenTSDB 数据库同步抽取数据到本地的作业:
+
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "opentsdbreader",
+ "parameter": {
+ "endpoint": "http://localhost:4242",
+ "column": [
+ "m"
+ ],
+ "beginDateTime": "2019-01-01 00:00:00",
+ "endDateTime": "2019-01-01 03:00:00"
+ }
+ },
+ "writer": {
+ "name": "streamwriter",
+ "parameter": {
+ "encoding": "UTF-8",
+ "print": true
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
+```
+
+
+
+### 3.2 参数说明
+
+* **name**
+ * 描述:本插件的名称
+ * 必选:是
+ * 默认值:opentsdbreader
+
+* **parameter**
+ * **endpoint**
+ * 描述:OpenTSDB 的 HTTP 连接地址
+ * 必选:是
+ * 格式:http://IP:Port
+* 默认值:无
+
+ * **column**
+ * 描述:数据迁移任务需要迁移的 Metric 列表
+ * 必选:是
+ * 默认值:无
+
+* **beginDateTime**
+ * 描述:和 endDateTime 配合使用,用于指定哪个时间段内的数据点,需要被迁移
+ * 必选:是
+ * 格式:`yyyy-MM-dd HH:mm:ss`
+ * 默认值:无
+ * 注意:指定起止时间会自动忽略分钟和秒,转为整点时刻,例如 2019-4-18 的 [3:35, 4:55) 会被转为 [3:00, 4:00)
+
+* **endDateTime**
+ * 描述:和 beginDateTime 配合使用,用于指定哪个时间段内的数据点,需要被迁移
+ * 必选:是
+ * 格式:`yyyy-MM-dd HH:mm:ss`
+ * 默认值:无
+ * 注意:指定起止时间会自动忽略分钟和秒,转为整点时刻,例如 2019-4-18 的 [3:35, 4:55) 会被转为 [3:00, 4:00)
+
+
+
+
+### 3.3 类型转换
+
+| DataX 内部类型 | TSDB 数据类型 |
+| -------------- | ------------------------------------------------------------ |
+| String | TSDB 数据点序列化字符串,包括 timestamp、metric、tags 和 value |
+
+
+
+
+
+## 4 性能报告
+
+### 4.1 环境准备
+
+#### 4.1.1 数据特征
+从 Metric、时间线、Value 和 采集周期 四个方面来描述:
+
+##### metric
+
+固定指定一个 metric 为 `m`。
+
+##### tagkv
+
+前四个 tagkv 全排列,形成 `10 * 20 * 100 * 100 = 2000000` 条时间线,最后 IP 对应 2000000 条时间线从 1 开始自增。
+
+| **tag_k** | **tag_v** |
+| --------- | ------------- |
+| zone | z1~z10 |
+| cluster | c1~c20 |
+| group | g1~100 |
+| app | a1~a100 |
+| ip | ip1~ip2000000 |
+
+##### value
+
+度量值为 [1, 100] 区间内的随机值
+
+##### interval
+
+采集周期为 10 秒,持续摄入 3 小时,总数据量为 `3 * 60 * 60 / 10 * 2000000 = 2,160,000,000` 个数据点。
+
+
+
+#### 4.1.2 机器参数
+
+OpenTSDB Reader 机型: 64C256G
+
+HBase 机型: 8C16G * 5
+
+
+#### 4.1.3 DataX jvm 参数
+
+"-Xms4096m -Xmx4096m"
+
+
+
+
+### 4.2 测试报告
+
+
+| 通道数| DataX 速度 (Rec/s) |DataX 流量 (MB/s)|
+|--------| --------|--------|
+|1| 215428 | 25.65 |
+|2| 424994 | 50.60 |
+|3| 603132 | 71.81 |
+
+
+
+
+
+
+## 5 约束限制
+
+### 5.1 需要确保与 OpenTSDB 底层存储的网络是连通的
+
+具体缘由详见 6.1
+
+
+
+### 5.2 如果存在某一个 Metric 下在一个小时范围内的数据量过大,可能需要通过 `-j` 参数调整 JVM 内存大小
+
+考虑到下游 Writer 如果写入速度不及 OpenTSDB reader 的查询数据,可能会存在积压的情况,因此需要适当地调整 JVM 参数。以"从 OpenTSDB 数据库同步抽取数据到本地的作业"为例,启动命令如下:
+
+```bash
+ python datax/bin/datax.py opentsdb2stream.json -j "-Xms4096m -Xmx4096m"
+```
+
+
+
+### 5.3 指定起止时间会自动被转为整点时刻
+
+指定起止时间会自动被转为整点时刻,例如 2019-4-18 的 `[3:35, 3:55)` 会被转为 `[3:00, 4:00)`
+
+
+
+### 5.4 目前只支持兼容 OpenTSDB 2.3.x
+
+其他版本暂不保证兼容
+
+
+
+
+
+## 6 FAQ
+
+***
+
+**Q:为什么需要连接 OpenTSDB 的底层存储,为什么不直接使用 `/api/query` 查询获取数据点?**
+
+A:因为通过 OpenTSDB 的 HTTP 接口(`/api/query`)来读取数据的话,经内部压测发现,在大数据量的情况下,会导致 OpenTSDB 的异步框架会报 CallBack 过多的问题;所以,采用了直连底层 HBase 存储,通过 Scan 的方式来扫描数据点,来避免这个问题。另外,还考虑到,可以通过指定 metric 和时间范围,可以顺序地 Scan HBase 表,提高查询效率。
+
+
+
diff --git a/opentsdbreader/pom.xml b/opentsdbreader/pom.xml
new file mode 100644
index 00000000..bcbf8414
--- /dev/null
+++ b/opentsdbreader/pom.xml
@@ -0,0 +1,156 @@
+
+
+ 4.0.0
+
+
+ com.alibaba.datax
+ datax-all
+ 0.0.1-SNAPSHOT
+
+
+ opentsdbreader
+ opentsdbreader
+ jar
+
+
+ UTF-8
+
+
+ 3.3.2
+
+
+ 4.4
+ 2.4
+
+
+ 1.2.28
+
+
+ 2.3.2
+
+
+ 4.12
+
+
+ 2.9.9
+
+
+
+
+ com.alibaba.datax
+ datax-common
+ ${datax-project-version}
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+ fastjson
+ com.alibaba
+
+
+ commons-math3
+ org.apache.commons
+
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+ ch.qos.logback
+ logback-classic
+
+
+
+
+ org.apache.commons
+ commons-lang3
+ ${commons-lang3.version}
+
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ ${httpclient.version}
+
+
+ commons-io
+ commons-io
+ ${commons-io.version}
+
+
+ org.apache.httpcomponents
+ fluent-hc
+ ${httpclient.version}
+
+
+
+
+ com.alibaba
+ fastjson
+ ${fastjson.version}
+
+
+
+
+ net.opentsdb
+ opentsdb
+ ${opentsdb.version}
+
+
+
+
+ joda-time
+ joda-time
+ ${joda-time.version}
+
+
+
+
+ junit
+ junit
+ ${junit4.version}
+ test
+
+
+
+
+
+
+
+ maven-compiler-plugin
+
+ 1.6
+ 1.6
+ ${project-sourceEncoding}
+
+
+
+
+
+ maven-assembly-plugin
+
+
+ src/main/assembly/package.xml
+
+ datax
+
+
+
+ dwzip
+ package
+
+ single
+
+
+
+
+
+
+
diff --git a/opentsdbreader/src/main/assembly/package.xml b/opentsdbreader/src/main/assembly/package.xml
new file mode 100755
index 00000000..f4ac3b4b
--- /dev/null
+++ b/opentsdbreader/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+
+
+
+ dir
+
+ false
+
+
+ src/main/resources
+
+ plugin.json
+ plugin_job_template.json
+
+ plugin/reader/opentsdbreader
+
+
+ target/
+
+ opentsdbreader-0.0.1-SNAPSHOT.jar
+
+ plugin/reader/opentsdbreader
+
+
+
+
+
+ false
+ plugin/reader/opentsdbreader/libs
+ runtime
+
+
+
diff --git a/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/CliQuery.java b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/CliQuery.java
new file mode 100644
index 00000000..fe8dce2b
--- /dev/null
+++ b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/CliQuery.java
@@ -0,0 +1,104 @@
+package com.alibaba.datax.plugin.reader.conn;
+
+import net.opentsdb.core.*;
+import net.opentsdb.utils.DateTime;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:CliQuery
+ *
+ * @author Benedict Jin
+ * @since 2019-04-17
+ */
+final class CliQuery {
+
+ /**
+ * Parses the query from the command lines.
+ *
+ * @param args The command line arguments.
+ * @param tsdb The TSDB to use.
+ * @param queries The list in which {@link Query}s will be appended.
+ */
+ static void parseCommandLineQuery(final String[] args,
+ final TSDB tsdb,
+ final ArrayList queries) {
+ long start_ts = DateTime.parseDateTimeString(args[0], null);
+ if (start_ts >= 0) {
+ start_ts /= 1000;
+ }
+ long end_ts = -1;
+ if (args.length > 3) {
+ // see if we can detect an end time
+ try {
+ if (args[1].charAt(0) != '+' && (args[1].indexOf(':') >= 0
+ || args[1].indexOf('/') >= 0 || args[1].indexOf('-') >= 0
+ || Long.parseLong(args[1]) > 0)) {
+ end_ts = DateTime.parseDateTimeString(args[1], null);
+ }
+ } catch (NumberFormatException ignore) {
+ // ignore it as it means the third parameter is likely the aggregator
+ }
+ }
+ // temp fixup to seconds from ms until the rest of TSDB supports ms
+ // Note you can't append this to the DateTime.parseDateTimeString() call as
+ // it clobbers -1 results
+ if (end_ts >= 0) {
+ end_ts /= 1000;
+ }
+
+ int i = end_ts < 0 ? 1 : 2;
+ while (i < args.length && args[i].charAt(0) == '+') {
+ i++;
+ }
+
+ while (i < args.length) {
+ final Aggregator agg = Aggregators.get(args[i++]);
+ final boolean rate = "rate".equals(args[i]);
+ RateOptions rate_options = new RateOptions(false, Long.MAX_VALUE,
+ RateOptions.DEFAULT_RESET_VALUE);
+ if (rate) {
+ i++;
+
+ long counterMax = Long.MAX_VALUE;
+ long resetValue = RateOptions.DEFAULT_RESET_VALUE;
+ if (args[i].startsWith("counter")) {
+ String[] parts = Tags.splitString(args[i], ',');
+ if (parts.length >= 2 && parts[1].length() > 0) {
+ counterMax = Long.parseLong(parts[1]);
+ }
+ if (parts.length >= 3 && parts[2].length() > 0) {
+ resetValue = Long.parseLong(parts[2]);
+ }
+ rate_options = new RateOptions(true, counterMax, resetValue);
+ i++;
+ }
+ }
+ final boolean downsample = "downsample".equals(args[i]);
+ if (downsample) {
+ i++;
+ }
+ final long interval = downsample ? Long.parseLong(args[i++]) : 0;
+ final Aggregator sampler = downsample ? Aggregators.get(args[i++]) : null;
+ final String metric = args[i++];
+ final HashMap tags = new HashMap();
+ while (i < args.length && args[i].indexOf(' ', 1) < 0
+ && args[i].indexOf('=', 1) > 0) {
+ Tags.parse(tags, args[i++]);
+ }
+ final Query query = tsdb.newQuery();
+ query.setStartTime(start_ts);
+ if (end_ts > 0) {
+ query.setEndTime(end_ts);
+ }
+ query.setTimeSeries(metric, tags, agg, rate, rate_options);
+ if (downsample) {
+ query.downsample(interval, sampler);
+ }
+ queries.add(query);
+ }
+ }
+}
diff --git a/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/Connection4TSDB.java b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/Connection4TSDB.java
new file mode 100644
index 00000000..97a841cf
--- /dev/null
+++ b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/Connection4TSDB.java
@@ -0,0 +1,77 @@
+package com.alibaba.datax.plugin.reader.conn;
+
+import com.alibaba.datax.common.plugin.RecordSender;
+
+import java.util.List;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:Connection for TSDB-like databases
+ *
+ * @author Benedict Jin
+ * @since 2019-03-29
+ */
+public interface Connection4TSDB {
+
+ /**
+ * Get the address of Database.
+ *
+ * @return host+ip
+ */
+ String address();
+
+ /**
+ * Get the version of Database.
+ *
+ * @return version
+ */
+ String version();
+
+ /**
+ * Get these configurations.
+ *
+ * @return configs
+ */
+ String config();
+
+ /**
+ * Get the list of supported version.
+ *
+ * @return version list
+ */
+ String[] getSupportVersionPrefix();
+
+ /**
+ * Send data points by metric & start time & end time.
+ *
+ * @param metric metric
+ * @param start startTime
+ * @param end endTime
+ * @param recordSender sender
+ */
+ void sendDPs(String metric, Long start, Long end, RecordSender recordSender) throws Exception;
+
+ /**
+ * Put data point.
+ *
+ * @param dp data point
+ * @return whether the data point is written successfully
+ */
+ boolean put(DataPoint4TSDB dp);
+
+ /**
+ * Put data points.
+ *
+ * @param dps data points
+ * @return whether the data point is written successfully
+ */
+ boolean put(List dps);
+
+ /**
+ * Whether current version is supported.
+ *
+ * @return true: supported; false: not yet!
+ */
+ boolean isSupported();
+}
diff --git a/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/DataPoint4TSDB.java b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/DataPoint4TSDB.java
new file mode 100644
index 00000000..1f690245
--- /dev/null
+++ b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/DataPoint4TSDB.java
@@ -0,0 +1,68 @@
+package com.alibaba.datax.plugin.reader.conn;
+
+import com.alibaba.fastjson.JSON;
+
+import java.util.Map;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:DataPoint for TSDB
+ *
+ * @author Benedict Jin
+ * @since 2019-04-10
+ */
+public class DataPoint4TSDB {
+
+ private long timestamp;
+ private String metric;
+ private Map tags;
+ private Object value;
+
+ public DataPoint4TSDB() {
+ }
+
+ public DataPoint4TSDB(long timestamp, String metric, Map tags, Object value) {
+ this.timestamp = timestamp;
+ this.metric = metric;
+ this.tags = tags;
+ this.value = value;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public String getMetric() {
+ return metric;
+ }
+
+ public void setMetric(String metric) {
+ this.metric = metric;
+ }
+
+ public Map getTags() {
+ return tags;
+ }
+
+ public void setTags(Map tags) {
+ this.tags = tags;
+ }
+
+ public Object getValue() {
+ return value;
+ }
+
+ public void setValue(Object value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return JSON.toJSONString(this);
+ }
+}
diff --git a/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/DumpSeries.java b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/DumpSeries.java
new file mode 100644
index 00000000..56ab0bc2
--- /dev/null
+++ b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/DumpSeries.java
@@ -0,0 +1,96 @@
+package com.alibaba.datax.plugin.reader.conn;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.element.StringColumn;
+import com.alibaba.datax.common.plugin.RecordSender;
+import net.opentsdb.core.*;
+import net.opentsdb.core.Internal.Cell;
+import org.hbase.async.KeyValue;
+import org.hbase.async.Scanner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:Tool to dump the data straight from HBase
+ *
+ * @author Benedict Jin
+ * @since 2019-04-17
+ */
+final class DumpSeries {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DumpSeries.class);
+
+ /**
+ * Dump all data points with special metric and time range, then send them all by {@link RecordSender}.
+ */
+ static void doDump(TSDB tsdb, String[] args, RecordSender sender) throws Exception {
+ final ArrayList queries = new ArrayList();
+ CliQuery.parseCommandLineQuery(args, tsdb, queries);
+
+ List dps = new LinkedList();
+ for (final Query query : queries) {
+ final List scanners = Internal.getScanners(query);
+ for (Scanner scanner : scanners) {
+ ArrayList> rows;
+ while ((rows = scanner.nextRows().join()) != null) {
+ for (final ArrayList row : rows) {
+ final byte[] key = row.get(0).key();
+ final long baseTime = Internal.baseTime(tsdb, key);
+ final String metric = Internal.metricName(tsdb, key);
+ for (final KeyValue kv : row) {
+ formatKeyValue(dps, tsdb, kv, baseTime, metric);
+ for (DataPoint4TSDB dp : dps) {
+ StringColumn tsdbColumn = new StringColumn(dp.toString());
+ Record record = sender.createRecord();
+ record.addColumn(tsdbColumn);
+ sender.sendToWriter(record);
+ }
+ dps.clear();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Parse KeyValue into data points.
+ */
+ private static void formatKeyValue(final List dps, final TSDB tsdb,
+ final KeyValue kv, final long baseTime, final String metric) {
+ Map tagKVs = Internal.getTags(tsdb, kv.key());
+
+ final byte[] qualifier = kv.qualifier();
+ final int q_len = qualifier.length;
+
+ if (!AppendDataPoints.isAppendDataPoints(qualifier) && q_len % 2 != 0) {
+ // custom data object, not a data point
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not a data point");
+ }
+ } else if (q_len == 2 || q_len == 4 && Internal.inMilliseconds(qualifier)) {
+ // regular data point
+ final Cell cell = Internal.parseSingleValue(kv);
+ if (cell == null) {
+ throw new IllegalDataException("Unable to parse row: " + kv);
+ }
+ dps.add(new DataPoint4TSDB(cell.absoluteTimestamp(baseTime), metric, tagKVs, cell.parseValue()));
+ } else {
+ final Collection cells;
+ if (q_len == 3) {
+ // append data points
+ cells = new AppendDataPoints().parseKeyValue(tsdb, kv);
+ } else {
+ // compacted column
+ cells = Internal.extractDataPoints(kv);
+ }
+ for (Cell cell : cells) {
+ dps.add(new DataPoint4TSDB(cell.absoluteTimestamp(baseTime), metric, tagKVs, cell.parseValue()));
+ }
+ }
+ }
+}
diff --git a/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/OpenTSDBConnection.java b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/OpenTSDBConnection.java
new file mode 100644
index 00000000..9e7f12c9
--- /dev/null
+++ b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/OpenTSDBConnection.java
@@ -0,0 +1,78 @@
+package com.alibaba.datax.plugin.reader.conn;
+
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.datax.plugin.reader.util.TSDBUtils;
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:OpenTSDB Connection
+ *
+ * @author Benedict Jin
+ * @since 2019-03-29
+ */
+public class OpenTSDBConnection implements Connection4TSDB {
+
+ private String address;
+
+ public OpenTSDBConnection(String address) {
+ this.address = address;
+ }
+
+ @Override
+ public String address() {
+ return address;
+ }
+
+ @Override
+ public String version() {
+ return TSDBUtils.version(address);
+ }
+
+ @Override
+ public String config() {
+ return TSDBUtils.config(address);
+ }
+
+ @Override
+ public String[] getSupportVersionPrefix() {
+ return new String[]{"2.3"};
+ }
+
+ @Override
+ public void sendDPs(String metric, Long start, Long end, RecordSender recordSender) throws Exception {
+ OpenTSDBDump.dump(this, metric, start, end, recordSender);
+ }
+
+ @Override
+ public boolean put(DataPoint4TSDB dp) {
+ return false;
+ }
+
+ @Override
+ public boolean put(List dps) {
+ return false;
+ }
+
+ @Override
+ public boolean isSupported() {
+ String versionJson = version();
+ if (StringUtils.isBlank(versionJson)) {
+ throw new RuntimeException("Cannot get the version!");
+ }
+ String version = JSON.parseObject(versionJson).getString("version");
+ if (StringUtils.isBlank(version)) {
+ return false;
+ }
+ for (String prefix : getSupportVersionPrefix()) {
+ if (version.startsWith(prefix)) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/OpenTSDBDump.java b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/OpenTSDBDump.java
new file mode 100644
index 00000000..5ed0a314
--- /dev/null
+++ b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/OpenTSDBDump.java
@@ -0,0 +1,48 @@
+package com.alibaba.datax.plugin.reader.conn;
+
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.fastjson.JSON;
+import net.opentsdb.core.TSDB;
+import net.opentsdb.utils.Config;
+
+import java.util.Map;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:OpenTSDB Dump
+ *
+ * @author Benedict Jin
+ * @since 2019-04-15
+ */
+final class OpenTSDBDump {
+
+ private static TSDB TSDB_INSTANCE;
+
+ private OpenTSDBDump() {
+ }
+
+ static void dump(OpenTSDBConnection conn, String metric, Long start, Long end, RecordSender sender) throws Exception {
+ DumpSeries.doDump(getTSDB(conn), new String[]{start + "", end + "", "none", metric}, sender);
+ }
+
+ private static TSDB getTSDB(OpenTSDBConnection conn) {
+ if (TSDB_INSTANCE == null) {
+ synchronized (TSDB.class) {
+ if (TSDB_INSTANCE == null) {
+ try {
+ Config config = new Config(false);
+ Map configurations = JSON.parseObject(conn.config(), Map.class);
+ for (Object key : configurations.keySet()) {
+ config.overrideConfig(key.toString(), configurations.get(key.toString()).toString());
+ }
+ TSDB_INSTANCE = new TSDB(config);
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot init OpenTSDB connection!");
+ }
+ }
+ }
+ }
+ return TSDB_INSTANCE;
+ }
+}
diff --git a/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/opentsdbreader/Constant.java b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/opentsdbreader/Constant.java
new file mode 100644
index 00000000..6017d4e5
--- /dev/null
+++ b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/opentsdbreader/Constant.java
@@ -0,0 +1,14 @@
+package com.alibaba.datax.plugin.reader.opentsdbreader;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:Key
+ *
+ * @author Benedict Jin
+ * @since 2019-04-18
+ */
+public final class Constant {
+
+ static final String DEFAULT_DATA_FORMAT = "yyyy-MM-dd HH:mm:ss";
+}
diff --git a/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/opentsdbreader/Key.java b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/opentsdbreader/Key.java
new file mode 100644
index 00000000..5b8c4adc
--- /dev/null
+++ b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/opentsdbreader/Key.java
@@ -0,0 +1,17 @@
+package com.alibaba.datax.plugin.reader.opentsdbreader;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:Key
+ *
+ * @author Benedict Jin
+ * @since 2019-04-18
+ */
+public class Key {
+
+ static final String ENDPOINT = "endpoint";
+ static final String COLUMN = "column";
+ static final String BEGIN_DATE_TIME = "beginDateTime";
+ static final String END_DATE_TIME = "endDateTime";
+}
diff --git a/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/opentsdbreader/OpenTSDBReader.java b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/opentsdbreader/OpenTSDBReader.java
new file mode 100755
index 00000000..d57456d1
--- /dev/null
+++ b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/opentsdbreader/OpenTSDBReader.java
@@ -0,0 +1,207 @@
+package com.alibaba.datax.plugin.reader.opentsdbreader;
+
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.datax.common.spi.Reader;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.reader.conn.OpenTSDBConnection;
+import com.alibaba.datax.plugin.reader.util.TimeUtils;
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:Key
+ *
+ * @author Benedict Jin
+ * @since 2019-04-18
+ */
+@SuppressWarnings("unused")
+public class OpenTSDBReader extends Reader {
+
+ public static class Job extends Reader.Job {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Job.class);
+
+ private Configuration originalConfig;
+
+ @Override
+ public void init() {
+ this.originalConfig = super.getPluginJobConf();
+
+ String address = originalConfig.getString(Key.ENDPOINT);
+ if (StringUtils.isBlank(address)) {
+ throw DataXException.asDataXException(
+ OpenTSDBReaderErrorCode.REQUIRED_VALUE,
+ "The parameter [" + Key.ENDPOINT + "] is not set.");
+ }
+
+ List columns = originalConfig.getList(Key.COLUMN, String.class);
+ if (columns == null || columns.isEmpty()) {
+ throw DataXException.asDataXException(
+ OpenTSDBReaderErrorCode.REQUIRED_VALUE,
+ "The parameter [" + Key.COLUMN + "] is not set.");
+ }
+
+ SimpleDateFormat format = new SimpleDateFormat(Constant.DEFAULT_DATA_FORMAT);
+ String startTime = originalConfig.getString(Key.BEGIN_DATE_TIME);
+ Long startDate;
+ if (startTime == null || startTime.trim().length() == 0) {
+ throw DataXException.asDataXException(
+ OpenTSDBReaderErrorCode.REQUIRED_VALUE,
+ "The parameter [" + Key.BEGIN_DATE_TIME + "] is not set.");
+ } else {
+ try {
+ startDate = format.parse(startTime).getTime();
+ } catch (ParseException e) {
+ throw DataXException.asDataXException(OpenTSDBReaderErrorCode.ILLEGAL_VALUE,
+ "The parameter [" + Key.BEGIN_DATE_TIME +
+ "] needs to conform to the [" + Constant.DEFAULT_DATA_FORMAT + "] format.");
+ }
+ }
+ String endTime = originalConfig.getString(Key.END_DATE_TIME);
+ Long endDate;
+ if (endTime == null || endTime.trim().length() == 0) {
+ throw DataXException.asDataXException(
+ OpenTSDBReaderErrorCode.REQUIRED_VALUE,
+ "The parameter [" + Key.END_DATE_TIME + "] is not set.");
+ } else {
+ try {
+ endDate = format.parse(endTime).getTime();
+ } catch (ParseException e) {
+ throw DataXException.asDataXException(OpenTSDBReaderErrorCode.ILLEGAL_VALUE,
+ "The parameter [" + Key.END_DATE_TIME +
+ "] needs to conform to the [" + Constant.DEFAULT_DATA_FORMAT + "] format.");
+ }
+ }
+ if (startDate >= endDate) {
+ throw DataXException.asDataXException(OpenTSDBReaderErrorCode.ILLEGAL_VALUE,
+ "The parameter [" + Key.BEGIN_DATE_TIME +
+ "] should be less than the parameter [" + Key.END_DATE_TIME + "].");
+ }
+ }
+
+ @Override
+ public void prepare() {
+ }
+
+ @Override
+ public List split(int adviceNumber) {
+ List configurations = new ArrayList();
+
+ // get metrics
+ List columns = originalConfig.getList(Key.COLUMN, String.class);
+
+ // get time range
+ SimpleDateFormat format = new SimpleDateFormat(Constant.DEFAULT_DATA_FORMAT);
+ long startTime;
+ try {
+ startTime = format.parse(originalConfig.getString(Key.BEGIN_DATE_TIME)).getTime();
+ } catch (ParseException e) {
+ throw DataXException.asDataXException(
+ OpenTSDBReaderErrorCode.ILLEGAL_VALUE, "解析[" + Key.BEGIN_DATE_TIME + "]失败.", e);
+ }
+ long endTime;
+ try {
+ endTime = format.parse(originalConfig.getString(Key.END_DATE_TIME)).getTime();
+ } catch (ParseException e) {
+ throw DataXException.asDataXException(
+ OpenTSDBReaderErrorCode.ILLEGAL_VALUE, "解析[" + Key.END_DATE_TIME + "]失败.", e);
+ }
+ if (TimeUtils.isSecond(startTime)) {
+ startTime *= 1000;
+ }
+ if (TimeUtils.isSecond(endTime)) {
+ endTime *= 1000;
+ }
+ DateTime startDateTime = new DateTime(TimeUtils.getTimeInHour(startTime));
+ DateTime endDateTime = new DateTime(TimeUtils.getTimeInHour(endTime));
+
+ // split by metric
+ for (String column : columns) {
+ // split by time in hour
+ while (startDateTime.isBefore(endDateTime)) {
+ Configuration clone = this.originalConfig.clone();
+ clone.set(Key.COLUMN, Collections.singletonList(column));
+
+ clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis());
+ startDateTime = startDateTime.plusHours(1);
+ // Make sure the time interval is [start, end).
+ // Because net.opentsdb.core.Query.setEndTime means less than or equal to the end time.
+ clone.set(Key.END_DATE_TIME, startDateTime.getMillis() - 1);
+ configurations.add(clone);
+
+ LOG.info("Configuration: {}", JSON.toJSONString(clone));
+ }
+ }
+ return configurations;
+ }
+
+ @Override
+ public void post() {
+ }
+
+ @Override
+ public void destroy() {
+ }
+ }
+
+ public static class Task extends Reader.Task {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+ private List columns;
+ private OpenTSDBConnection conn;
+ private Long startTime;
+ private Long endTime;
+
+ @Override
+ public void init() {
+ Configuration readerSliceConfig = super.getPluginJobConf();
+
+ LOG.info("getPluginJobConf: {}", JSON.toJSONString(readerSliceConfig));
+
+ this.columns = readerSliceConfig.getList(Key.COLUMN, String.class);
+ String address = readerSliceConfig.getString(Key.ENDPOINT);
+
+ conn = new OpenTSDBConnection(address);
+
+ this.startTime = readerSliceConfig.getLong(Key.BEGIN_DATE_TIME);
+ this.endTime = readerSliceConfig.getLong(Key.END_DATE_TIME);
+ }
+
+ @Override
+ public void prepare() {
+ }
+
+ @Override
+ public void startRead(RecordSender recordSender) {
+ try {
+ for (String column : columns) {
+ conn.sendDPs(column, this.startTime, this.endTime, recordSender);
+ }
+ } catch (Exception e) {
+ throw DataXException.asDataXException(
+ OpenTSDBReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错!", e);
+ }
+ }
+
+ @Override
+ public void post() {
+ }
+
+ @Override
+ public void destroy() {
+ }
+ }
+}
diff --git a/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/opentsdbreader/OpenTSDBReaderErrorCode.java b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/opentsdbreader/OpenTSDBReaderErrorCode.java
new file mode 100755
index 00000000..0d9de4c4
--- /dev/null
+++ b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/opentsdbreader/OpenTSDBReaderErrorCode.java
@@ -0,0 +1,40 @@
+package com.alibaba.datax.plugin.reader.opentsdbreader;
+
+import com.alibaba.datax.common.spi.ErrorCode;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:OpenTSDB Reader Error Code
+ *
+ * @author Benedict Jin
+ * @since 2019-04-18
+ */
+public enum OpenTSDBReaderErrorCode implements ErrorCode {
+
+ REQUIRED_VALUE("OpenTSDBReader-00", "缺失必要的值"),
+ ILLEGAL_VALUE("OpenTSDBReader-01", "值非法");
+
+ private final String code;
+ private final String description;
+
+ OpenTSDBReaderErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return this.code;
+ }
+
+ @Override
+ public String getDescription() {
+ return this.description;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
+ }
+}
diff --git a/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/util/HttpUtils.java b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/util/HttpUtils.java
new file mode 100644
index 00000000..cdf5c9c1
--- /dev/null
+++ b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/util/HttpUtils.java
@@ -0,0 +1,68 @@
+package com.alibaba.datax.plugin.reader.util;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.http.client.fluent.Content;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:HttpUtils
+ *
+ * @author Benedict Jin
+ * @since 2019-03-29
+ */
+public final class HttpUtils {
+
+ public final static Charset UTF_8 = Charset.forName("UTF-8");
+ public final static int CONNECT_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60);
+ public final static int SOCKET_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60);
+
+ private HttpUtils() {
+ }
+
+ public static String get(String url) throws Exception {
+ Content content = Request.Get(url)
+ .connectTimeout(CONNECT_TIMEOUT_DEFAULT_IN_MILL)
+ .socketTimeout(SOCKET_TIMEOUT_DEFAULT_IN_MILL)
+ .execute()
+ .returnContent();
+ if (content == null) {
+ return null;
+ }
+ return content.asString(UTF_8);
+ }
+
+ public static String post(String url, Map params) throws Exception {
+ return post(url, JSON.toJSONString(params), CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
+ }
+
+ public static String post(String url, String params) throws Exception {
+ return post(url, params, CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
+ }
+
+ public static String post(String url, Map params,
+ int connectTimeoutInMill, int socketTimeoutInMill) throws Exception {
+ return post(url, JSON.toJSONString(params), connectTimeoutInMill, socketTimeoutInMill);
+ }
+
+ public static String post(String url, String params,
+ int connectTimeoutInMill, int socketTimeoutInMill) throws Exception {
+ Content content = Request.Post(url)
+ .connectTimeout(connectTimeoutInMill)
+ .socketTimeout(socketTimeoutInMill)
+ .addHeader("Content-Type", "application/json")
+ .bodyString(params, ContentType.APPLICATION_JSON)
+ .execute()
+ .returnContent();
+ if (content == null) {
+ return null;
+ }
+ return content.asString(UTF_8);
+ }
+}
diff --git a/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/util/TSDBUtils.java b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/util/TSDBUtils.java
new file mode 100644
index 00000000..72c7fd62
--- /dev/null
+++ b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/util/TSDBUtils.java
@@ -0,0 +1,68 @@
+package com.alibaba.datax.plugin.reader.util;
+
+import com.alibaba.datax.plugin.reader.conn.DataPoint4TSDB;
+import com.alibaba.fastjson.JSON;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:TSDB Utils
+ *
+ * @author Benedict Jin
+ * @since 2019-03-29
+ */
+public final class TSDBUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TSDBUtils.class);
+
+ private TSDBUtils() {
+ }
+
+ public static String version(String address) {
+ String url = String.format("%s/api/version", address);
+ String rsp;
+ try {
+ rsp = HttpUtils.get(url);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return rsp;
+ }
+
+ public static String config(String address) {
+ String url = String.format("%s/api/config", address);
+ String rsp;
+ try {
+ rsp = HttpUtils.get(url);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return rsp;
+ }
+
+ public static boolean put(String address, List dps) {
+ return put(address, JSON.toJSON(dps));
+ }
+
+ public static boolean put(String address, DataPoint4TSDB dp) {
+ return put(address, JSON.toJSON(dp));
+ }
+
+ private static boolean put(String address, Object o) {
+ String url = String.format("%s/api/put", address);
+ String rsp;
+ try {
+ rsp = HttpUtils.post(url, o.toString());
+ // If successful, the returned content should be null.
+ assert rsp == null;
+ } catch (Exception e) {
+ LOG.error("Address: {}, DataPoints: {}", url, o);
+ throw new RuntimeException(e);
+ }
+ return true;
+ }
+}
diff --git a/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/util/TimeUtils.java b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/util/TimeUtils.java
new file mode 100644
index 00000000..9bc11b36
--- /dev/null
+++ b/opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/util/TimeUtils.java
@@ -0,0 +1,38 @@
+package com.alibaba.datax.plugin.reader.util;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:TimeUtils
+ *
+ * @author Benedict Jin
+ * @since 2019-04-22
+ */
+public final class TimeUtils {
+
+ private TimeUtils() {
+ }
+
+ private static final long SECOND_MASK = 0xFFFFFFFF00000000L;
+ private static final long HOUR_IN_MILL = TimeUnit.HOURS.toMillis(1);
+
+ /**
+ * Weather the timestamp is second.
+ *
+ * @param ts timestamp
+ */
+ public static boolean isSecond(long ts) {
+ return (ts & SECOND_MASK) == 0;
+ }
+
+ /**
+ * Get the hour.
+ *
+ * @param ms time in millisecond
+ */
+ public static long getTimeInHour(long ms) {
+ return ms - ms % HOUR_IN_MILL;
+ }
+}
diff --git a/opentsdbreader/src/main/resources/plugin.json b/opentsdbreader/src/main/resources/plugin.json
new file mode 100755
index 00000000..692a9853
--- /dev/null
+++ b/opentsdbreader/src/main/resources/plugin.json
@@ -0,0 +1,10 @@
+{
+ "name": "opentsdbreader",
+ "class": "com.alibaba.datax.plugin.reader.opentsdbreader.OpenTSDBReader",
+ "description": {
+ "useScene": "从 OpenTSDB 中摄取数据点",
+ "mechanism": "根据时间和 metric 直连底层 HBase 存储,从而 Scan 出符合条件的数据点",
+ "warn": "指定起止时间会自动忽略分钟和秒,转为整点时刻,例如 2019-4-18 的 [3:35, 4:55) 会被转为 [3:00, 4:00)"
+ },
+ "developer": "Benedict Jin"
+}
diff --git a/opentsdbreader/src/main/resources/plugin_job_template.json b/opentsdbreader/src/main/resources/plugin_job_template.json
new file mode 100644
index 00000000..c1f29f3d
--- /dev/null
+++ b/opentsdbreader/src/main/resources/plugin_job_template.json
@@ -0,0 +1,11 @@
+{
+ "name": "opentsdbreader",
+ "parameter": {
+ "endpoint": "http://localhost:8242",
+ "column": [
+ "m"
+ ],
+ "startTime": "2019-01-01 00:00:00",
+ "endTime": "2019-01-01 01:00:00"
+ }
+}
diff --git a/opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/conn/OpenTSDBConnectionTest.java b/opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/conn/OpenTSDBConnectionTest.java
new file mode 100644
index 00000000..91429b4a
--- /dev/null
+++ b/opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/conn/OpenTSDBConnectionTest.java
@@ -0,0 +1,30 @@
+package com.alibaba.datax.plugin.reader.conn;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:OpenTSDB Connection4TSDB Test
+ *
+ * @author Benedict Jin
+ * @since 2019-03-29
+ */
+@Ignore
+public class OpenTSDBConnectionTest {
+
+ private static final String OPENTSDB_ADDRESS = "http://localhost:8242";
+
+ @Test
+ public void testVersion() {
+ String version = new OpenTSDBConnection(OPENTSDB_ADDRESS).version();
+ Assert.assertNotNull(version);
+ }
+
+ @Test
+ public void testIsSupported() {
+ Assert.assertTrue(new OpenTSDBConnection(OPENTSDB_ADDRESS).isSupported());
+ }
+}
diff --git a/opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/util/Const.java b/opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/util/Const.java
new file mode 100644
index 00000000..df9e0eda
--- /dev/null
+++ b/opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/util/Const.java
@@ -0,0 +1,18 @@
+package com.alibaba.datax.plugin.reader.util;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:Const
+ *
+ * @author Benedict Jin
+ * @since 2019-03-29
+ */
+final class Const {
+
+ private Const() {
+ }
+
+ static final String OPENTSDB_ADDRESS = "http://localhost:8242";
+ static final String TSDB_ADDRESS = "http://localhost:8240";
+}
diff --git a/opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/util/HttpUtilsTest.java b/opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/util/HttpUtilsTest.java
new file mode 100644
index 00000000..ca77597f
--- /dev/null
+++ b/opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/util/HttpUtilsTest.java
@@ -0,0 +1,39 @@
+package com.alibaba.datax.plugin.reader.util;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:HttpUtils Test
+ *
+ * @author Benedict Jin
+ * @since 2019-03-29
+ */
+@Ignore
+public class HttpUtilsTest {
+
+ @Test
+ public void testSimpleCase() throws Exception {
+ String url = "https://httpbin.org/post";
+ Map params = new HashMap();
+ params.put("foo", "bar");
+
+ String rsp = HttpUtils.post(url, params);
+ System.out.println(rsp);
+ Assert.assertNotNull(rsp);
+ }
+
+ @Test
+ public void testGet() throws Exception {
+ String url = String.format("%s/api/version", Const.OPENTSDB_ADDRESS);
+ String rsp = HttpUtils.get(url);
+ System.out.println(rsp);
+ Assert.assertNotNull(rsp);
+ }
+}
diff --git a/opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/util/TSDBTest.java b/opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/util/TSDBTest.java
new file mode 100644
index 00000000..8cd091bf
--- /dev/null
+++ b/opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/util/TSDBTest.java
@@ -0,0 +1,28 @@
+package com.alibaba.datax.plugin.reader.util;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:TSDB Test
+ *
+ * @author Benedict Jin
+ * @since 2019-04-11
+ */
+@Ignore
+public class TSDBTest {
+
+ @Test
+ public void testVersion() {
+ String version = TSDBUtils.version(Const.TSDB_ADDRESS);
+ Assert.assertNotNull(version);
+ System.out.println(version);
+
+ version = TSDBUtils.version(Const.OPENTSDB_ADDRESS);
+ Assert.assertNotNull(version);
+ System.out.println(version);
+ }
+}
diff --git a/opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/util/TimeUtilsTest.java b/opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/util/TimeUtilsTest.java
new file mode 100644
index 00000000..61d29088
--- /dev/null
+++ b/opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/util/TimeUtilsTest.java
@@ -0,0 +1,33 @@
+package com.alibaba.datax.plugin.reader.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:com.alibaba.datax.common.util
+ *
+ * @author Benedict Jin
+ * @since 2019-04-22
+ */
+public class TimeUtilsTest {
+
+ @Test
+ public void testIsSecond() {
+ Assert.assertFalse(TimeUtils.isSecond(System.currentTimeMillis()));
+ Assert.assertTrue(TimeUtils.isSecond(System.currentTimeMillis() / 1000));
+ }
+
+ @Test
+ public void testGetTimeInHour() throws ParseException {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ Date date = sdf.parse("2019-04-18 15:32:33");
+ long timeInHour = TimeUtils.getTimeInHour(date.getTime());
+ Assert.assertEquals("2019-04-18 15:00:00", sdf.format(timeInHour));
+ }
+}
diff --git a/package.xml b/package.xml
index 47e277b8..1ff6391d 100755
--- a/package.xml
+++ b/package.xml
@@ -159,6 +159,13 @@
datax
+
+ opentsdbreader/target/datax/
+
+ **/*.*
+
+ datax
+
@@ -322,5 +329,12 @@
datax
+
+ tsdbwriter/target/datax/
+
+ **/*.*
+
+ datax
+
diff --git a/pom.xml b/pom.xml
index f4d197ba..6e68418e 100755
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,7 @@
rdbmsreader
hbase11xreader
hbase094xreader
+ opentsdbreader
mysqlwriter
@@ -85,6 +86,7 @@
hbase11xsqlwriter
hbase11xsqlreader
elasticsearchwriter
+ tsdbwriter
plugin-rdbms-util
diff --git a/tsdbwriter/doc/tsdbhttpwriter.md b/tsdbwriter/doc/tsdbhttpwriter.md
new file mode 100644
index 00000000..c723a360
--- /dev/null
+++ b/tsdbwriter/doc/tsdbhttpwriter.md
@@ -0,0 +1,187 @@
+
+# TSDBWriter 插件文档
+
+___
+
+
+## 1 快速介绍
+
+TSDBWriter 插件实现了将数据点写入到阿里巴巴自研 TSDB 数据库中(后续简称 TSDB)。
+
+
+时间序列数据库(Time Series Database , 简称 TSDB)是一种高性能,低成本,稳定可靠的在线时序数据库服务;提供高效读写,高压缩比存储、时序数据插值及聚合计算,广泛应用于物联网(IoT)设备监控系统 ,企业能源管理系统(EMS),生产安全监控系统,电力检测系统等行业场景。 TSDB 提供百万级时序数据秒级写入,高压缩比低成本存储、预降采样、插值、多维聚合计算,查询结果可视化功能;解决由于设备采集点数量巨大,数据采集频率高,造成的存储成本高,写入和查询分析效率低的问题。更多关于 TSDB 的介绍,详见[阿里云 TSDB 官网](https://help.aliyun.com/product/54825.html)。
+
+
+
+## 2 实现原理
+
+通过 HTTP 连接 TSDB 实例,并通过 `/api/put` 接口将数据点写入。关于写入接口详见 TSDB 的[接口说明文档](https://help.aliyun.com/document_detail/59939.html)。
+
+
+
+## 3 功能说明
+
+### 3.1 配置样例
+
+* 配置一个从 OpenTSDB 数据库同步抽取数据到 TSDB:
+
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "opentsdbreader",
+ "parameter": {
+ "endpoint": "http://localhost:4242",
+ "column": [
+ "m"
+ ],
+ "startTime": "2019-01-01 00:00:00",
+ "endTime": "2019-01-01 03:00:00"
+ }
+ },
+ "writer": {
+ "name": "tsdbhttpwriter",
+ "parameter": {
+ "endpoint": "http://localhost:8242"
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
+```
+
+
+
+### 3.2 参数说明
+
+* **name**
+ * 描述:本插件的名称
+ * 必选:是
+ * 默认值:tsdbhttpwriter
+
+* **parameter**
+ * **endpoint**
+ * 描述:TSDB 的 HTTP 连接地址
+ * 必选:是
+ * 格式:http://IP:Port
+ * 默认值:无
+
+* **batchSize**
+ * 描述:每次批量数据的条数
+ * 必选:否
+ * 格式:int,需要保证大于 0
+ * 默认值:100
+
+* **maxRetryTime**
+ * 描述:失败后重试的次数
+ * 必选:否
+ * 格式:int,需要保证大于 1
+ * 默认值:3
+
+* **ignoreWriteError**
+ * 描述:如果设置为 true,则忽略写入错误,继续写入;否则,多次重试后仍写入失败的话,则会终止写入任务
+ * 必选:否
+ * 格式:bool
+ * 默认值:false
+
+
+
+
+
+
+### 3.3 类型转换
+
+
+| DataX 内部类型 | TSDB 数据类型 |
+| -------------- | ------------------------------------------------------------ |
+| String | TSDB 数据点序列化字符串,包括 timestamp、metric、tags 和 value |
+
+
+
+
+
+
+## 4 性能报告
+
+### 4.1 环境准备
+
+#### 4.1.1 数据特征
+
+从 Metric、时间线、Value 和 采集周期 四个方面来描述:
+
+##### metric
+
+固定指定一个 metric 为 `m`。
+
+##### tagkv
+
+前四个 tagkv 全排列,形成 `10 * 20 * 100 * 100 = 2000000` 条时间线,最后 IP 对应 2000000 条时间线从 1 开始自增。
+
+| **tag_k** | **tag_v** |
+| --------- | ------------- |
+| zone | z1~z10 |
+| cluster | c1~c20 |
+| group | g1~100 |
+| app | a1~a100 |
+| ip | ip1~ip2000000 |
+
+##### value
+
+度量值为 [1, 100] 区间内的随机值
+
+##### interval
+
+采集周期为 10 秒,持续摄入 3 小时,总数据量为 `3 * 60 * 60 / 10 * 2000000 = 2,160,000,000` 个数据点。
+
+
+
+#### 4.1.2 机器参数
+
+TSDB Writer 机型: 64C256G
+
+HBase 机型: 8C16G * 5
+
+#### 4.1.3 DataX jvm 参数
+
+"-Xms4096m -Xmx4096m"
+
+
+
+
+### 4.2 测试报告
+
+
+| 通道数 | DataX 速度 (Rec/s) | DataX 流量 (MB/s) |
+| ------ | ------------------ | ----------------- |
+| 1 | 129753 | 15.45 |
+| 2 | 284953 | 33.70 |
+| 3 | 385868 | 45.71 |
+
+
+
+
+
+## 5 约束限制
+
+### 5.1 目前只支持兼容 TSDB 2.4.x 及以上版本
+
+其他版本暂不保证兼容
+
+
+
+
+
+## 6 FAQ
+
+
+
+
+
diff --git a/tsdbwriter/pom.xml b/tsdbwriter/pom.xml
new file mode 100644
index 00000000..d74776af
--- /dev/null
+++ b/tsdbwriter/pom.xml
@@ -0,0 +1,136 @@
+
+
+ 4.0.0
+
+
+ com.alibaba.datax
+ datax-all
+ 0.0.1-SNAPSHOT
+
+
+ tsdbwriter
+ tsdbwriter
+ jar
+
+
+ UTF-8
+
+
+ 3.3.2
+
+
+ 4.4
+ 2.4
+
+
+ 1.2.28
+
+
+ 4.12
+
+
+
+
+ com.alibaba.datax
+ datax-common
+ ${datax-project-version}
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+ fastjson
+ com.alibaba
+
+
+ commons-math3
+ org.apache.commons
+
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+ ch.qos.logback
+ logback-classic
+
+
+
+
+ org.apache.commons
+ commons-lang3
+ ${commons-lang3.version}
+
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ ${httpclient.version}
+
+
+ commons-io
+ commons-io
+ ${commons-io.version}
+
+
+ org.apache.httpcomponents
+ fluent-hc
+ ${httpclient.version}
+
+
+
+
+ com.alibaba
+ fastjson
+ ${fastjson.version}
+
+
+
+
+ junit
+ junit
+ ${junit4.version}
+ test
+
+
+
+
+
+
+
+ maven-compiler-plugin
+
+ 1.6
+ 1.6
+ ${project-sourceEncoding}
+
+
+
+
+
+ maven-assembly-plugin
+
+
+ src/main/assembly/package.xml
+
+ datax
+
+
+
+ dwzip
+ package
+
+ single
+
+
+
+
+
+
+
diff --git a/tsdbwriter/src/main/assembly/package.xml b/tsdbwriter/src/main/assembly/package.xml
new file mode 100755
index 00000000..ff474770
--- /dev/null
+++ b/tsdbwriter/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+
+
+
+ dir
+
+ false
+
+
+ src/main/resources
+
+ plugin.json
+ plugin_job_template.json
+
+ plugin/writer/tsdbwriter
+
+
+ target/
+
+ tsdbwriter-0.0.1-SNAPSHOT.jar
+
+ plugin/writer/tsdbwriter
+
+
+
+
+
+ false
+ plugin/writer/tsdbwriter/libs
+ runtime
+
+
+
diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/Connection4TSDB.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/Connection4TSDB.java
new file mode 100644
index 00000000..8119348d
--- /dev/null
+++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/Connection4TSDB.java
@@ -0,0 +1,85 @@
+package com.alibaba.datax.plugin.writer.conn;
+
+import com.alibaba.datax.common.plugin.RecordSender;
+
+import java.util.List;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:Connection for TSDB-like databases
+ *
+ * @author Benedict Jin
+ * @since 2019-03-29
+ */
+public interface Connection4TSDB {
+
+ /**
+ * Get the address of Database.
+ *
+ * @return host+ip
+ */
+ String address();
+
+ /**
+ * Get the version of Database.
+ *
+ * @return version
+ */
+ String version();
+
+ /**
+ * Get these configurations.
+ *
+ * @return configs
+ */
+ String config();
+
+ /**
+ * Get the list of supported version.
+ *
+ * @return version list
+ */
+ String[] getSupportVersionPrefix();
+
+ /**
+ * Send data points by metric & start time & end time.
+ *
+ * @param metric metric
+ * @param start startTime
+ * @param end endTime
+ * @param recordSender sender
+ */
+ void sendDPs(String metric, Long start, Long end, RecordSender recordSender) throws Exception;
+
+ /**
+ * Put data point.
+ *
+ * @param dp data point
+ * @return whether the data point is written successfully
+ */
+ boolean put(DataPoint4TSDB dp);
+
+ /**
+ * Put data points.
+ *
+ * @param dps data points
+ * @return whether the data point is written successfully
+ */
+ boolean put(List dps);
+
+ /**
+ * Put data points.
+ *
+ * @param dps data points
+ * @return whether the data point is written successfully
+ */
+ boolean put(String dps);
+
+ /**
+ * Whether current version is supported.
+ *
+ * @return true: supported; false: not yet!
+ */
+ boolean isSupported();
+}
diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/DataPoint4TSDB.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/DataPoint4TSDB.java
new file mode 100644
index 00000000..fee012df
--- /dev/null
+++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/DataPoint4TSDB.java
@@ -0,0 +1,68 @@
+package com.alibaba.datax.plugin.writer.conn;
+
+import com.alibaba.fastjson.JSON;
+
+import java.util.Map;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:DataPoint for TSDB
+ *
+ * @author Benedict Jin
+ * @since 2019-04-10
+ */
+public class DataPoint4TSDB {
+
+ private long timestamp;
+ private String metric;
+ private Map tags;
+ private Object value;
+
+ public DataPoint4TSDB() {
+ }
+
+ public DataPoint4TSDB(long timestamp, String metric, Map tags, Object value) {
+ this.timestamp = timestamp;
+ this.metric = metric;
+ this.tags = tags;
+ this.value = value;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public String getMetric() {
+ return metric;
+ }
+
+ public void setMetric(String metric) {
+ this.metric = metric;
+ }
+
+ public Map getTags() {
+ return tags;
+ }
+
+ public void setTags(Map tags) {
+ this.tags = tags;
+ }
+
+ public Object getValue() {
+ return value;
+ }
+
+ public void setValue(Object value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return JSON.toJSONString(this);
+ }
+}
diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/TSDBConnection.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/TSDBConnection.java
new file mode 100644
index 00000000..e4ebad7d
--- /dev/null
+++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/TSDBConnection.java
@@ -0,0 +1,86 @@
+package com.alibaba.datax.plugin.writer.conn;
+
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.datax.plugin.writer.util.TSDBUtils;
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:TSDB Connection
+ *
+ * @author Benedict Jin
+ * @since 2019-03-29
+ */
+public class TSDBConnection implements Connection4TSDB {
+
+ private String address;
+
+ public TSDBConnection(String address) {
+ if (StringUtils.isBlank(address)) {
+ throw new RuntimeException("TSDBConnection init failed because address is blank!");
+ }
+ this.address = address;
+ }
+
+ @Override
+ public String address() {
+ return address;
+ }
+
+ @Override
+ public String version() {
+ return TSDBUtils.version(address);
+ }
+
+ @Override
+ public String config() {
+ return TSDBUtils.config(address);
+ }
+
+ @Override
+ public String[] getSupportVersionPrefix() {
+ return new String[]{"2.4.1", "2.4.2"};
+ }
+
+ @Override
+ public void sendDPs(String metric, Long start, Long end, RecordSender recordSender) {
+ throw new RuntimeException("Not support yet!");
+ }
+
+ @Override
+ public boolean put(DataPoint4TSDB dp) {
+ return TSDBUtils.put(address, dp);
+ }
+
+ @Override
+ public boolean put(List dps) {
+ return TSDBUtils.put(address, dps);
+ }
+
+ @Override
+ public boolean put(String dps) {
+ return TSDBUtils.put(address, dps);
+ }
+
+ @Override
+ public boolean isSupported() {
+ String versionJson = version();
+ if (StringUtils.isBlank(versionJson)) {
+ throw new RuntimeException("Cannot get the version!");
+ }
+ String version = JSON.parseObject(versionJson).getString("version");
+ if (StringUtils.isBlank(version)) {
+ return false;
+ }
+ for (String prefix : getSupportVersionPrefix()) {
+ if (version.startsWith(prefix)) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/Constant.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/Constant.java
new file mode 100644
index 00000000..abac14a4
--- /dev/null
+++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/Constant.java
@@ -0,0 +1,16 @@
+package com.alibaba.datax.plugin.writer.tsdbwriter;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:Key
+ *
+ * @author Benedict Jin
+ * @since 2019-04-18
+ */
+public final class Constant {
+
+ static final int DEFAULT_BATCH_SIZE = 100;
+ static final int DEFAULT_TRY_SIZE = 3;
+ static final boolean DEFAULT_IGNORE_WRITE_ERROR = false;
+}
diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/Key.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/Key.java
new file mode 100755
index 00000000..2cc3f671
--- /dev/null
+++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/Key.java
@@ -0,0 +1,17 @@
+package com.alibaba.datax.plugin.writer.tsdbwriter;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:Key
+ *
+ * @author Benedict Jin
+ * @since 2019-04-18
+ */
+public class Key {
+
+ static final String ENDPOINT = "endpoint";
+ static final String BATCH_SIZE = "batchSize";
+ static final String MAX_RETRY_TIME = "maxRetryTime";
+ static final String IGNORE_WRITE_ERROR = "ignoreWriteError";
+}
diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriter.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriter.java
new file mode 100755
index 00000000..e410b2ba
--- /dev/null
+++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriter.java
@@ -0,0 +1,171 @@
+package com.alibaba.datax.plugin.writer.tsdbwriter;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.common.util.RetryUtil;
+import com.alibaba.datax.plugin.writer.conn.TSDBConnection;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:TSDB Http Writer
+ *
+ * @author Benedict Jin
+ * @since 2019-04-18
+ */
+@SuppressWarnings("unused")
+public class TSDBWriter extends Writer {
+
+ public static class Job extends Writer.Job {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Job.class);
+
+ private Configuration originalConfig;
+
+ @Override
+ public void init() {
+ this.originalConfig = super.getPluginJobConf();
+
+ String address = this.originalConfig.getString(Key.ENDPOINT);
+ if (StringUtils.isBlank(address)) {
+ throw DataXException.asDataXException(TSDBWriterErrorCode.REQUIRED_VALUE,
+ "The parameter [" + Key.ENDPOINT + "] is not set.");
+ }
+
+ Integer batchSize = this.originalConfig.getInt(Key.BATCH_SIZE);
+ if (batchSize == null || batchSize < 1) {
+ originalConfig.set(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_SIZE);
+ LOG.info("The parameter [" + Key.BATCH_SIZE +
+ "] will be default value: " + Constant.DEFAULT_BATCH_SIZE);
+ }
+
+ Integer retrySize = this.originalConfig.getInt(Key.MAX_RETRY_TIME);
+ if (retrySize == null || retrySize < 0) {
+ originalConfig.set(Key.MAX_RETRY_TIME, Constant.DEFAULT_TRY_SIZE);
+ LOG.info("The parameter [" + Key.MAX_RETRY_TIME +
+ "] will be default value: " + Constant.DEFAULT_TRY_SIZE);
+ }
+
+ Boolean ignoreWriteError = this.originalConfig.getBool(Key.IGNORE_WRITE_ERROR);
+ if (ignoreWriteError == null) {
+ originalConfig.set(Key.IGNORE_WRITE_ERROR, Constant.DEFAULT_IGNORE_WRITE_ERROR);
+ LOG.info("The parameter [" + Key.IGNORE_WRITE_ERROR +
+ "] will be default value: " + Constant.DEFAULT_IGNORE_WRITE_ERROR);
+ }
+ }
+
+ @Override
+ public void prepare() {
+ }
+
+ @Override
+ public List split(int mandatoryNumber) {
+ ArrayList configurations = new ArrayList(mandatoryNumber);
+ for (int i = 0; i < mandatoryNumber; i++) {
+ configurations.add(this.originalConfig.clone());
+ }
+ return configurations;
+ }
+
+ @Override
+ public void post() {
+ }
+
+ @Override
+ public void destroy() {
+ }
+ }
+
+ public static class Task extends Writer.Task {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+ private TSDBConnection conn;
+ private int batchSize;
+ private int retrySize;
+ private boolean ignoreWriteError;
+
+ @Override
+ public void init() {
+ Configuration writerSliceConfig = getPluginJobConf();
+ String address = writerSliceConfig.getString(Key.ENDPOINT);
+ this.conn = new TSDBConnection(address);
+ this.batchSize = writerSliceConfig.getInt(Key.BATCH_SIZE);
+ this.retrySize = writerSliceConfig.getInt(Key.MAX_RETRY_TIME);
+ this.ignoreWriteError = writerSliceConfig.getBool(Key.IGNORE_WRITE_ERROR);
+ }
+
+ @Override
+ public void prepare() {
+ }
+
+ @Override
+ public void startWrite(RecordReceiver recordReceiver) {
+ try {
+ Record lastRecord = null;
+ Record record;
+ int count = 0;
+ StringBuilder dps = new StringBuilder();
+ while ((record = recordReceiver.getFromReader()) != null) {
+ final int recordLength = record.getColumnNumber();
+ for (int i = 0; i < recordLength; i++) {
+ dps.append(record.getColumn(i).asString());
+ dps.append(",");
+ count++;
+ if (count == batchSize) {
+ count = 0;
+ batchPut(record, "[" + dps.substring(0, dps.length() - 1) + "]");
+ dps = new StringBuilder();
+ }
+ }
+ lastRecord = record;
+ }
+ if (StringUtils.isNotBlank(dps.toString())) {
+ batchPut(lastRecord, "[" + dps.substring(0, dps.length() - 1) + "]");
+ }
+ } catch (Exception e) {
+ throw DataXException.asDataXException(TSDBWriterErrorCode.RUNTIME_EXCEPTION, e);
+ }
+ }
+
+ private void batchPut(final Record record, final String dps) {
+ try {
+ RetryUtil.executeWithRetry(new Callable() {
+ @Override
+ public Integer call() {
+ if (!conn.put(dps)) {
+ getTaskPluginCollector().collectDirtyRecord(record, "Put data points failed!");
+ throw DataXException.asDataXException(TSDBWriterErrorCode.RUNTIME_EXCEPTION,
+ "Put data points failed!");
+ }
+ return 0;
+ }
+ }, retrySize, 60000L, true);
+ } catch (Exception e) {
+ if (ignoreWriteError) {
+ LOG.warn("Ignore write exceptions and continue writing.");
+ } else {
+ throw DataXException.asDataXException(TSDBWriterErrorCode.RETRY_WRITER_EXCEPTION, e);
+ }
+ }
+ }
+
+ @Override
+ public void post() {
+ }
+
+ @Override
+ public void destroy() {
+ }
+ }
+}
diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriterErrorCode.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriterErrorCode.java
new file mode 100755
index 00000000..f907fb67
--- /dev/null
+++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriterErrorCode.java
@@ -0,0 +1,41 @@
+package com.alibaba.datax.plugin.writer.tsdbwriter;
+
+import com.alibaba.datax.common.spi.ErrorCode;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:TSDB Http Writer Error Code
+ *
+ * @author Benedict Jin
+ * @since 2019-04-18
+ */
+public enum TSDBWriterErrorCode implements ErrorCode {
+
+ REQUIRED_VALUE("TSDBWriter-00", "Missing the necessary value"),
+ RUNTIME_EXCEPTION("TSDBWriter-01", "Runtime exception"),
+ RETRY_WRITER_EXCEPTION("TSDBWriter-02", "After repeated attempts, the write still fails");
+
+ private final String code;
+ private final String description;
+
+ TSDBWriterErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return this.code;
+ }
+
+ @Override
+ public String getDescription() {
+ return this.description;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
+ }
+}
diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/HttpUtils.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/HttpUtils.java
new file mode 100644
index 00000000..b81512f7
--- /dev/null
+++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/HttpUtils.java
@@ -0,0 +1,68 @@
+package com.alibaba.datax.plugin.writer.util;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.http.client.fluent.Content;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:HttpUtils
+ *
+ * @author Benedict Jin
+ * @since 2019-03-29
+ */
+public final class HttpUtils {
+
+ public final static Charset UTF_8 = Charset.forName("UTF-8");
+ public final static int CONNECT_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60);
+ public final static int SOCKET_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60);
+
+ private HttpUtils() {
+ }
+
+ public static String get(String url) throws Exception {
+ Content content = Request.Get(url)
+ .connectTimeout(CONNECT_TIMEOUT_DEFAULT_IN_MILL)
+ .socketTimeout(SOCKET_TIMEOUT_DEFAULT_IN_MILL)
+ .execute()
+ .returnContent();
+ if (content == null) {
+ return null;
+ }
+ return content.asString(UTF_8);
+ }
+
+ public static String post(String url, Map params) throws Exception {
+ return post(url, JSON.toJSONString(params), CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
+ }
+
+ public static String post(String url, String params) throws Exception {
+ return post(url, params, CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
+ }
+
+ public static String post(String url, Map params,
+ int connectTimeoutInMill, int socketTimeoutInMill) throws Exception {
+ return post(url, JSON.toJSONString(params), connectTimeoutInMill, socketTimeoutInMill);
+ }
+
+ public static String post(String url, String params,
+ int connectTimeoutInMill, int socketTimeoutInMill) throws Exception {
+ Content content = Request.Post(url)
+ .connectTimeout(connectTimeoutInMill)
+ .socketTimeout(socketTimeoutInMill)
+ .addHeader("Content-Type", "application/json")
+ .bodyString(params, ContentType.APPLICATION_JSON)
+ .execute()
+ .returnContent();
+ if (content == null) {
+ return null;
+ }
+ return content.asString(UTF_8);
+ }
+}
diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/TSDBUtils.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/TSDBUtils.java
new file mode 100644
index 00000000..ed01d877
--- /dev/null
+++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/TSDBUtils.java
@@ -0,0 +1,72 @@
+package com.alibaba.datax.plugin.writer.util;
+
+import com.alibaba.datax.plugin.writer.conn.DataPoint4TSDB;
+import com.alibaba.fastjson.JSON;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:TSDB Utils
+ *
+ * @author Benedict Jin
+ * @since 2019-03-29
+ */
+public final class TSDBUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TSDBUtils.class);
+
+ private TSDBUtils() {
+ }
+
+ public static String version(String address) {
+ String url = String.format("%s/api/version", address);
+ String rsp;
+ try {
+ rsp = HttpUtils.get(url);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return rsp;
+ }
+
+ public static String config(String address) {
+ String url = String.format("%s/api/config", address);
+ String rsp;
+ try {
+ rsp = HttpUtils.get(url);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return rsp;
+ }
+
+ public static boolean put(String address, List dps) {
+ return put(address, JSON.toJSON(dps));
+ }
+
+ public static boolean put(String address, DataPoint4TSDB dp) {
+ return put(address, JSON.toJSON(dp));
+ }
+
+ private static boolean put(String address, Object o) {
+ return put(address, o.toString());
+ }
+
+ public static boolean put(String address, String s) {
+ String url = String.format("%s/api/put", address);
+ String rsp;
+ try {
+ rsp = HttpUtils.post(url, s);
+ // If successful, the returned content should be null.
+ assert rsp == null;
+ } catch (Exception e) {
+ LOG.error("Address: {}, DataPoints: {}", url, s);
+ throw new RuntimeException(e);
+ }
+ return true;
+ }
+}
diff --git a/tsdbwriter/src/main/resources/plugin.json b/tsdbwriter/src/main/resources/plugin.json
new file mode 100755
index 00000000..78c8273f
--- /dev/null
+++ b/tsdbwriter/src/main/resources/plugin.json
@@ -0,0 +1,10 @@
+{
+ "name": "tsdbwriter",
+ "class": "com.alibaba.datax.plugin.writer.tsdbwriter.TSDBWriter",
+ "description": {
+ "useScene": "往 TSDB 中摄入数据点",
+ "mechanism": "调用 TSDB 的 /api/put 接口,实现数据点的写入",
+ "warn": ""
+ },
+ "developer": "Benedict Jin"
+}
diff --git a/tsdbwriter/src/main/resources/plugin_job_template.json b/tsdbwriter/src/main/resources/plugin_job_template.json
new file mode 100644
index 00000000..5d9b43db
--- /dev/null
+++ b/tsdbwriter/src/main/resources/plugin_job_template.json
@@ -0,0 +1,6 @@
+{
+ "name": "tsdbwriter",
+ "parameter": {
+ "endpoint": "http://localhost:8242"
+ }
+}
diff --git a/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/conn/TSDBConnectionTest.java b/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/conn/TSDBConnectionTest.java
new file mode 100644
index 00000000..455f4ce6
--- /dev/null
+++ b/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/conn/TSDBConnectionTest.java
@@ -0,0 +1,30 @@
+package com.alibaba.datax.plugin.writer.conn;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:TSDBConnection Test
+ *
+ * @author Benedict Jin
+ * @since 2019-03-29
+ */
+@Ignore
+public class TSDBConnectionTest {
+
+ private static final String TSDB_ADDRESS = "http://localhost:8240";
+
+ @Test
+ public void testVersion() {
+ String version = new TSDBConnection(TSDB_ADDRESS).version();
+ Assert.assertNotNull(version);
+ }
+
+ @Test
+ public void testIsSupported() {
+ Assert.assertTrue(new TSDBConnection(TSDB_ADDRESS).isSupported());
+ }
+}
diff --git a/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/Const.java b/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/Const.java
new file mode 100644
index 00000000..34b074d6
--- /dev/null
+++ b/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/Const.java
@@ -0,0 +1,18 @@
+package com.alibaba.datax.plugin.writer.util;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:Const
+ *
+ * @author Benedict Jin
+ * @since 2019-03-29
+ */
+final class Const {
+
+ private Const() {
+ }
+
+ static final String OPENTSDB_ADDRESS = "http://localhost:8242";
+ static final String TSDB_ADDRESS = "http://localhost:8240";
+}
diff --git a/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/HttpUtilsTest.java b/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/HttpUtilsTest.java
new file mode 100644
index 00000000..69f26b80
--- /dev/null
+++ b/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/HttpUtilsTest.java
@@ -0,0 +1,39 @@
+package com.alibaba.datax.plugin.writer.util;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:HttpUtils Test
+ *
+ * @author Benedict Jin
+ * @since 2019-03-29
+ */
+@Ignore
+public class HttpUtilsTest {
+
+ @Test
+ public void testSimpleCase() throws Exception {
+ String url = "https://httpbin.org/post";
+ Map params = new HashMap();
+ params.put("foo", "bar");
+
+ String rsp = HttpUtils.post(url, params);
+ System.out.println(rsp);
+ Assert.assertNotNull(rsp);
+ }
+
+ @Test
+ public void testGet() throws Exception {
+ String url = String.format("%s/api/version", Const.OPENTSDB_ADDRESS);
+ String rsp = HttpUtils.get(url);
+ System.out.println(rsp);
+ Assert.assertNotNull(rsp);
+ }
+}
diff --git a/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/TSDBTest.java b/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/TSDBTest.java
new file mode 100644
index 00000000..7d22bb72
--- /dev/null
+++ b/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/TSDBTest.java
@@ -0,0 +1,28 @@
+package com.alibaba.datax.plugin.writer.util;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Copyright @ 2019 alibaba.com
+ * All right reserved.
+ * Function:TSDB Test
+ *
+ * @author Benedict Jin
+ * @since 2019-04-11
+ */
+@Ignore
+public class TSDBTest {
+
+ @Test
+ public void testVersion() {
+ String version = TSDBUtils.version(Const.TSDB_ADDRESS);
+ Assert.assertNotNull(version);
+ System.out.println(version);
+
+ version = TSDBUtils.version(Const.OPENTSDB_ADDRESS);
+ Assert.assertNotNull(version);
+ System.out.println(version);
+ }
+}
|