From 86553c490dded5709c047de33c1f0cda9004a18d Mon Sep 17 00:00:00 2001 From: zyyang Date: Thu, 16 Dec 2021 11:36:44 +0800 Subject: [PATCH] [TD-10786]: tdenginereader implementation --- package.xml | 7 + pom.xml | 1 + tdenginereader/doc/tdenginereader.md | 162 ++++++++++ tdenginereader/pom.xml | 101 ++++++ tdenginereader/src/main/assembly/package.xml | 34 ++ .../com/alibaba/datax/plugin/reader/Key.java | 19 ++ .../datax/plugin/reader/TDengineReader.java | 291 ++++++++++++++++++ .../reader/TDengineReaderErrorCode.java | 33 ++ tdenginereader/src/main/resources/plugin.json | 9 + .../main/resources/plugin_job_template.json | 21 ++ .../plugin/reader/TDengineReaderTest.java | 73 +++++ 11 files changed, 751 insertions(+) create mode 100644 tdenginereader/doc/tdenginereader.md create mode 100644 tdenginereader/pom.xml create mode 100755 tdenginereader/src/main/assembly/package.xml create mode 100644 tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/Key.java create mode 100644 tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java create mode 100644 tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReaderErrorCode.java create mode 100755 tdenginereader/src/main/resources/plugin.json create mode 100644 tdenginereader/src/main/resources/plugin_job_template.json create mode 100644 tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengineReaderTest.java diff --git a/package.xml b/package.xml index d95feb40..3f024f9d 100755 --- a/package.xml +++ b/package.xml @@ -180,6 +180,13 @@ datax + + tdenginereader/target/datax/ + + **/*.* + + datax + diff --git a/pom.xml b/pom.xml index 1a9da81b..72241137 100644 --- a/pom.xml +++ b/pom.xml @@ -108,6 +108,7 @@ hbase20xsqlreader hbase20xsqlwriter kuduwriter + tdenginereader diff --git a/tdenginereader/doc/tdenginereader.md b/tdenginereader/doc/tdenginereader.md new file mode 100644 index 00000000..3c683a64 --- /dev/null +++ b/tdenginereader/doc/tdenginereader.md @@ -0,0 +1,162 @@ +# DataX TDengineReader + +## 1 快速介绍 + +TDengineReader 插件实现了 TDengine 读取数据的功能。 + +## 2 实现原理 + +TDengineReader 通过TDengine的JDBC driver查询获取数据。 + +## 3 功能说明 + +### 3.1 配置样例 + +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "tdenginereader", + "parameter": { + "user": "root", + "password": "taosdata", + "connection": [ + { + "table": [ + "meters" + ], + "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/test?timestampFormat=TIMESTAMP" + } + ], + "column": [ + "ts", + "current", + "voltage", + "phase" + ], + "beginDateTime": "2017-07-14 10:40:00", + "endDateTime": "2017-08-14 10:40:00", + "splitInterval": "1d" + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "encoding": "UTF-8", + "print": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} +``` + +### 3.2 参数说明 + +* **username** + * 描述:TDengine实例的用户名
+ * 必选:是
+ * 默认值:无
+* **password** + * 描述:TDengine实例的密码
+ * 必选:是
+ * 默认值:无
+* **table** + * 描述:所选取的需要同步的表。使用JSON的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一schema结构, + TDengineReader不予检查表是否同一逻辑表。注意,table必须包含在connection配置单元中。
+ * 必选:是
+ * 默认值:无
+* **jdbcUrl** + * 描述:TDengine数据库的JDBC连接信息。注意,jdbcUrl必须包含在connection配置单元中。JdbcUrl具体请参看TDengine官方文档。 + * 必选:是
+ * 默认值:无
+* **beginDateTime** + * 描述:数据的开始时间,Job迁移从begineDateTime到endDateTime的数据,格式为yyyy-MM-dd HH:mm:ss
+ * 必选:是
+ * 默认值:无
+* **endDateTime** + * 描述:数据的结束时间,Job迁移从begineDateTime到endDateTime的数据,格式为yyyy-MM-dd HH:mm:ss
+ * 必选:是
+ * 默认值:无
+* **splitInterval** + * 描述:按照splitInterval来划分task, 每splitInterval创建一个task。例如,20d代表按照每20天的数据划分为1个task。 + 可以配置的时间单位为:d(天), h(小时), m(分钟), s(秒)
+ * 必选:是
+ * 默认值:无
+ +### 3.3 类型转换 +| TDengine 数据类型 | DataX 内部类型 | +| --------------- | ------------- | +| TINYINT | Long | +| SMALLINT | Long | +| INTEGER | Long | +| BIGINT | Long | +| FLOAT | Double | +| DOUBLE | Double | +| BOOLEAN | Bool | +| TIMESTAMP | Date | +| BINARY | Bytes | +| NCHAR | String | + + +## 4 性能报告 + +### 4.1 环境准备 + +#### 4.1.1 数据特征 + +建表语句: + +单行记录类似于: + +#### 4.1.2 机器参数 + +* 执行DataX的机器参数为: + 1. cpu: + 2. mem: + 3. net: 千兆双网卡 + 4. disc: DataX 数据不落磁盘,不统计此项 + +* TDengine数据库机器参数为: + 1. cpu: + 2. mem: + 3. net: 千兆双网卡 + 4. disc: + +#### 4.1.3 DataX jvm 参数 + + -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError + +### 4.2 测试报告 + +#### 4.2.1 单表测试报告 + +| 通道数| DataX速度(Rec/s)|DataX流量(MB/s)| DataX机器网卡流出流量(MB/s)|DataX机器运行负载|DB网卡进入流量(MB/s)|DB运行负载|DB TPS| +|--------| --------|--------|--------|--------|--------|--------|--------| +|1| | | | | | | | +|4| | | | | | | | +|8| | | | | | | | +|16| | | | | | | | +|32| | | | | | | | + +说明: + +1. 这里的单表,主键类型为 bigint(20),自增。 +2. batchSize 和 通道个数,对性能影响较大。 + +#### 4.2.4 性能测试小结 + +1. +2. + +## 5 约束限制 + +## FAQ \ No newline at end of file diff --git a/tdenginereader/pom.xml b/tdenginereader/pom.xml new file mode 100644 index 00000000..8c0f6645 --- /dev/null +++ b/tdenginereader/pom.xml @@ -0,0 +1,101 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + tdenginereader + + + 8 + 8 + + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + + com.taosdata.jdbc + taos-jdbcdriver + 2.0.34 + + + + junit + junit + ${junit-version} + test + + + com.alibaba.datax + plugin-rdbms-util + 0.0.1-SNAPSHOT + compile + + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.12.4 + + + + **/*Test.java + + + + + true + + + + + + + \ No newline at end of file diff --git a/tdenginereader/src/main/assembly/package.xml b/tdenginereader/src/main/assembly/package.xml new file mode 100755 index 00000000..b52f20fb --- /dev/null +++ b/tdenginereader/src/main/assembly/package.xml @@ -0,0 +1,34 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/reader/tdenginereader + + + target/ + + tdenginereader-0.0.1-SNAPSHOT.jar + + plugin/reader/tdenginereader + + + + + + false + plugin/reader/tdenginereader/libs + runtime + + + diff --git a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/Key.java b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/Key.java new file mode 100644 index 00000000..eddf98eb --- /dev/null +++ b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/Key.java @@ -0,0 +1,19 @@ +package com.alibaba.datax.plugin.reader; + +public class Key { + + public static final String JDBC_URL = "jdbcUrl"; +// public static final String HOST = "host"; +// public static final String PORT = "port"; +// public static final String DB = "db"; + public static final String TABLE = "table"; + public static final String USER = "user"; + public static final String PASSWORD = "password"; + public static final String CONNECTION = "connection"; +// public static final String SQL = "sql"; + public static final String BEGIN_DATETIME = "beginDateTime"; + public static final String END_DATETIME = "endDateTime"; + public static final String SPLIT_INTERVAL = "splitInterval"; + public static final String COLUMN = "column"; + public static final String MANDATORY_ENCODING = "mandatoryEncoding"; +} diff --git a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java new file mode 100644 index 00000000..dfdce7b4 --- /dev/null +++ b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java @@ -0,0 +1,291 @@ +package com.alibaba.datax.plugin.reader; + +import com.alibaba.datax.common.constant.CommonConstant; +import com.alibaba.datax.common.element.*; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.spi.Reader; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.fastjson.JSON; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.UnsupportedEncodingException; +import java.sql.*; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class TDengineReader extends Reader { + private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss"; + + public static class Job extends Reader.Job { + + private static final Logger LOG = LoggerFactory.getLogger(Job.class); + private Configuration originalConfig; + + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + // check user + String user = this.originalConfig.getString(Key.USER); + if (StringUtils.isBlank(user)) + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.USER + "] is not set."); + + // check password + String password = this.originalConfig.getString(Key.PASSWORD); + if (StringUtils.isBlank(password)) + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PASSWORD + "] is not set."); + + SimpleDateFormat format = new SimpleDateFormat(DATETIME_FORMAT); + // check beginDateTime + String beginDatetime = this.originalConfig.getString(Key.BEGIN_DATETIME); + if (StringUtils.isBlank(beginDatetime)) + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.BEGIN_DATETIME + "] is not set."); + Long start; + try { + start = format.parse(beginDatetime).getTime(); + } catch (ParseException e) { + throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.BEGIN_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format."); + } + + // check endDateTime + String endDatetime = this.originalConfig.getString(Key.END_DATETIME); + if (StringUtils.isBlank(endDatetime)) + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.END_DATETIME + "] is not set."); + Long end; + try { + end = format.parse(endDatetime).getTime(); + } catch (ParseException e) { + throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.END_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format."); + } + if (start >= end) + throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.BEGIN_DATETIME + "] should be less than the parameter [" + Key.END_DATETIME + "]."); + + // check splitInterval + String splitInterval = this.originalConfig.getString(Key.SPLIT_INTERVAL); + Long split; + if (StringUtils.isBlank(splitInterval)) + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.SPLIT_INTERVAL + "] is not set."); + try { + split = parseSplitInterval(splitInterval); + } catch (Exception e) { + throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.SPLIT_INTERVAL + "] should be like: \"123d|h|m|s\", error: " + e.getMessage()); + } + + this.originalConfig.set(Key.BEGIN_DATETIME, start); + this.originalConfig.set(Key.END_DATETIME, end); + this.originalConfig.set(Key.SPLIT_INTERVAL, split); + + // check connection + List connection = this.originalConfig.getList(Key.CONNECTION); + if (connection == null || connection.isEmpty()) + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.CONNECTION + "] is not set."); + for (int i = 0; i < connection.size(); i++) { + Configuration conn = Configuration.from(connection.get(i).toString()); + List table = conn.getList(Key.TABLE); + if (table == null || table.isEmpty()) + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.TABLE + "] of connection[" + (i + 1) + "] is not set."); + String jdbcUrl = conn.getString(Key.JDBC_URL); + if (StringUtils.isBlank(jdbcUrl)) + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.JDBC_URL + "] of connection[" + (i + 1) + "] is not set."); + } + + // check column + List column = this.originalConfig.getList(Key.COLUMN); + if (column == null || column.isEmpty()) + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.CONNECTION + "] is not set or is empty."); + } + + @Override + public void destroy() { + + } + + @Override + public List split(int adviceNumber) { + List configurations = new ArrayList<>(); + // do split + Long start = this.originalConfig.getLong(Key.BEGIN_DATETIME); + Long end = this.originalConfig.getLong(Key.END_DATETIME); + Long split = this.originalConfig.getLong(Key.SPLIT_INTERVAL); + + List conns = this.originalConfig.getList(Key.CONNECTION); + + for (Long ts = start; ts < end; ts += split) { + for (int i = 0; i < conns.size(); i++) { + Configuration clone = this.originalConfig.clone(); + clone.remove(Key.SPLIT_INTERVAL); + + clone.set(Key.BEGIN_DATETIME, ts); + clone.set(Key.END_DATETIME, Math.min(ts + split, end)); + + Configuration conf = Configuration.from(conns.get(i).toString()); + String jdbcUrl = conf.getString(Key.JDBC_URL); + clone.set(Key.JDBC_URL, jdbcUrl); + clone.set(Key.TABLE, conf.getList(Key.TABLE)); + + // 抽取 jdbcUrl 中的 ip/port 进行资源使用的打标,以提供给 core 做有意义的 shuffle 操作 + clone.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, DataBaseType.parseIpFromJdbcUrl(jdbcUrl)); + clone.remove(Key.CONNECTION); + + configurations.add(clone); + LOG.info("Configuration: {}", JSON.toJSONString(clone)); + } + } + return configurations; + } + } + + public static class Task extends Reader.Task { + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + private Connection conn; + + private Long startTime; + private Long endTime; + private List tables; + private List columns; + private String mandatoryEncoding; + + @Override + public void init() { + Configuration readerSliceConfig = super.getPluginJobConf(); + LOG.info("getPluginJobConf: {}", JSON.toJSONString(readerSliceConfig)); + + String url = readerSliceConfig.getString(Key.JDBC_URL); + if (StringUtils.isBlank(url)) + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.JDBC_URL + "] is not set."); + + tables = readerSliceConfig.getList(Key.TABLE, String.class); + columns = readerSliceConfig.getList(Key.COLUMN, String.class); + + String user = readerSliceConfig.getString(Key.USER); + String password = readerSliceConfig.getString(Key.PASSWORD); + + try { + conn = DriverManager.getConnection(url, user, password); + } catch (SQLException e) { + throw DataXException.asDataXException(TDengineReaderErrorCode.CONNECTION_FAILED, + "The parameter [" + Key.JDBC_URL + "] : " + url + " failed to connect since: " + e.getMessage()); + } + + this.mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, ""); + + startTime = readerSliceConfig.getLong(Key.BEGIN_DATETIME); + endTime = readerSliceConfig.getLong(Key.END_DATETIME); + } + + + @Override + public void destroy() { + + } + + @Override + public void startRead(RecordSender recordSender) { + try (Statement stmt = conn.createStatement()) { + for (int i = 0; i < tables.size(); i++) { + String sql = "select " + StringUtils.join(columns, ",") + " from " + tables.get(i) + " where _c0 >= " + startTime + " and _c0 < " + endTime; + ResultSet rs = stmt.executeQuery(sql); + ResultSetMetaData metaData = rs.getMetaData(); + while (rs.next()) { + transportOneRecord(recordSender, rs, metaData, metaData.getColumnCount(), this.mandatoryEncoding); + } + } + } catch (SQLException e) { + throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错!", e); + } finally { + try { + if (conn != null) + conn.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + } + + private Record transportOneRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnCount, String mandatoryEncoding) { + Record record = buildRecord(recordSender, rs, metaData, columnCount, mandatoryEncoding); + recordSender.sendToWriter(record); + return record; + } + + private Record buildRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnCount, String mandatoryEncoding) { + Record record = recordSender.createRecord(); + + try { + for (int i = 1; i <= columnCount; i++) { + int columnType = metaData.getColumnType(i); + switch (columnType) { + case Types.SMALLINT: + case Types.TINYINT: + case Types.INTEGER: + case Types.BIGINT: + record.addColumn(new LongColumn(rs.getString(i))); + break; + case Types.FLOAT: + case Types.DOUBLE: + record.addColumn(new DoubleColumn(rs.getString(i))); + break; + case Types.BOOLEAN: + record.addColumn(new BoolColumn(rs.getBoolean(i))); + break; + case Types.TIMESTAMP: + record.addColumn(new DateColumn(rs.getTimestamp(i))); + break; + case Types.BINARY: + record.addColumn(new BytesColumn(rs.getBytes(i))); + break; + case Types.NCHAR: + String rawData; + if (StringUtils.isBlank(mandatoryEncoding)) { + rawData = rs.getString(i); + } else { + rawData = new String((rs.getBytes(i) == null ? new byte[0] : rs.getBytes(i)), mandatoryEncoding); + } + record.addColumn(new StringColumn(rawData)); + break; + } + } + } catch (SQLException | UnsupportedEncodingException e) { + throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错!", e); + } + return record; + } + } + + private static final long second = 1000; + private static final long minute = 60 * second; + private static final long hour = 60 * minute; + private static final long day = 24 * hour; + + private static Long parseSplitInterval(String splitInterval) throws Exception { + Pattern compile = Pattern.compile("^(\\d+)([dhms])$"); + Matcher matcher = compile.matcher(splitInterval); + while (matcher.find()) { + Long value = Long.valueOf(matcher.group(1)); + if (value == 0) + throw new Exception("invalid splitInterval: 0"); + char unit = matcher.group(2).charAt(0); + switch (unit) { + case 'd': + return value * day; + default: + case 'h': + return value * hour; + case 'm': + return value * minute; + case 's': + return value * second; + } + } + throw new Exception("invalid splitInterval: " + splitInterval); + } + +} diff --git a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReaderErrorCode.java b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReaderErrorCode.java new file mode 100644 index 00000000..68bc11e7 --- /dev/null +++ b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReaderErrorCode.java @@ -0,0 +1,33 @@ +package com.alibaba.datax.plugin.reader; + +import com.alibaba.datax.common.spi.ErrorCode; + +public enum TDengineReaderErrorCode implements ErrorCode { + + REQUIRED_VALUE("TDengineReader-00", "缺失必要的值"), + ILLEGAL_VALUE("TDengineReader-01", "值非法"), + CONNECTION_FAILED("TDengineReader-02", "连接错误"); + + private final String code; + private final String description; + + TDengineReaderErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s]. ", this.code, this.description); + } +} diff --git a/tdenginereader/src/main/resources/plugin.json b/tdenginereader/src/main/resources/plugin.json new file mode 100755 index 00000000..7ccdbe63 --- /dev/null +++ b/tdenginereader/src/main/resources/plugin.json @@ -0,0 +1,9 @@ +{ + "name": "tdenginereader", + "class": "com.alibaba.datax.plugin.reader.TDengineReader", + "description": { + "useScene": "data migration from tdengine", + "mechanism": "use JDBC to read data from tdengine." + }, + "developer": "zyyang-taosdata" +} \ No newline at end of file diff --git a/tdenginereader/src/main/resources/plugin_job_template.json b/tdenginereader/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..2c7f4cb9 --- /dev/null +++ b/tdenginereader/src/main/resources/plugin_job_template.json @@ -0,0 +1,21 @@ +{ + "name": "tdenginereader", + "parameter": { + "user": "", + "password": "", + "connection": [ + { + "table": [ + "" + ], + "jdbcUrl": "" + } + ], + "column": [ + "" + ], + "beginDateTime": "", + "endDateTime": "", + "splitInterval": "" + } +} \ No newline at end of file diff --git a/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengineReaderTest.java b/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengineReaderTest.java new file mode 100644 index 00000000..9d83f743 --- /dev/null +++ b/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengineReaderTest.java @@ -0,0 +1,73 @@ +package com.alibaba.datax.plugin.reader; + +import com.alibaba.datax.common.util.Configuration; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.List; +import java.util.stream.IntStream; + +public class TDengineReaderTest { + + TDengineReader.Job job; + + @Before + public void before() { + job = new TDengineReader.Job(); + Configuration configuration = Configuration.from("{" + + "\"user\": \"root\"," + + "\"password\": \"taosdata\"," + + "\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," + + "\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," + + "\"beginDateTime\": \"2021-01-01 00:00:00\"," + + "\"endDateTime\": \"2021-01-01 12:00:00\"," + + "\"splitInterval\": \"1h\"" + + "}"); + job.setPluginJobConf(configuration); + } + + @Test + public void jobInit() throws ParseException { + // when + job.init(); + + // assert + Configuration conf = job.getPluginJobConf(); + + Assert.assertEquals("root", conf.getString("user")); + Assert.assertEquals("taosdata", conf.getString("password")); + Assert.assertEquals("weather", conf.getString("connection[0].table[0]")); + Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl")); + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + Long start = sdf.parse("2021-01-01 00:00:00").getTime(); + Assert.assertEquals(start, conf.getLong("beginDateTime")); + + Long end = sdf.parse("2021-01-01 12:00:00").getTime(); + Assert.assertEquals(end, conf.getLong("endDateTime")); + + Assert.assertEquals(new Long(3600 * 1000), conf.getLong("splitInterval")); + } + + @Test + public void jobSplit() { + // when + job.init(); + List configurationList = job.split(1); + + // assert + Assert.assertEquals(12, configurationList.size()); + for (int i = 0; i < configurationList.size(); i++) { + Configuration conf = configurationList.get(i); + Assert.assertEquals("root", conf.getString("user")); + Assert.assertEquals("taosdata", conf.getString("password")); + Assert.assertEquals("weather", conf.getString("table[0]")); + Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl")); + } + } + +} \ No newline at end of file