mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 07:21:48 +08:00
[TD-10786]<feature>: tdenginereader implementation
This commit is contained in:
parent
f7793f487a
commit
12483fe75a
@ -180,6 +180,13 @@
|
||||
</includes>
|
||||
<outputDirectory>datax</outputDirectory>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>tdenginereader/target/datax/</directory>
|
||||
<includes>
|
||||
<include>**/*.*</include>
|
||||
</includes>
|
||||
<outputDirectory>datax</outputDirectory>
|
||||
</fileSet>
|
||||
|
||||
<!-- writer -->
|
||||
<fileSet>
|
||||
|
1
pom.xml
1
pom.xml
@ -108,6 +108,7 @@
|
||||
<module>hbase20xsqlreader</module>
|
||||
<module>hbase20xsqlwriter</module>
|
||||
<module>kuduwriter</module>
|
||||
<module>tdenginereader</module>
|
||||
</modules>
|
||||
|
||||
<dependencyManagement>
|
||||
|
162
tdenginereader/doc/tdenginereader.md
Normal file
162
tdenginereader/doc/tdenginereader.md
Normal file
@ -0,0 +1,162 @@
|
||||
# DataX TDengineReader
|
||||
|
||||
## 1 快速介绍
|
||||
|
||||
TDengineReader 插件实现了 TDengine 读取数据的功能。
|
||||
|
||||
## 2 实现原理
|
||||
|
||||
TDengineReader 通过TDengine的JDBC driver查询获取数据。
|
||||
|
||||
## 3 功能说明
|
||||
|
||||
### 3.1 配置样例
|
||||
|
||||
```json
|
||||
{
|
||||
"job": {
|
||||
"content": [
|
||||
{
|
||||
"reader": {
|
||||
"name": "tdenginereader",
|
||||
"parameter": {
|
||||
"user": "root",
|
||||
"password": "taosdata",
|
||||
"connection": [
|
||||
{
|
||||
"table": [
|
||||
"meters"
|
||||
],
|
||||
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/test?timestampFormat=TIMESTAMP"
|
||||
}
|
||||
],
|
||||
"column": [
|
||||
"ts",
|
||||
"current",
|
||||
"voltage",
|
||||
"phase"
|
||||
],
|
||||
"beginDateTime": "2017-07-14 10:40:00",
|
||||
"endDateTime": "2017-08-14 10:40:00",
|
||||
"splitInterval": "1d"
|
||||
}
|
||||
},
|
||||
"writer": {
|
||||
"name": "streamwriter",
|
||||
"parameter": {
|
||||
"encoding": "UTF-8",
|
||||
"print": true
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"setting": {
|
||||
"speed": {
|
||||
"channel": 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 3.2 参数说明
|
||||
|
||||
* **user**
|
||||
* 描述:TDengine实例的用户名 <br />
|
||||
* 必选:是 <br />
|
||||
* 默认值:无 <br />
|
||||
* **password**
|
||||
* 描述:TDengine实例的密码 <br />
|
||||
* 必选:是 <br />
|
||||
* 默认值:无 <br />
|
||||
* **table**
|
||||
* 描述:所选取的需要同步的表。使用JSON的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一schema结构,
|
||||
TDengineReader不予检查表是否同一逻辑表。注意,table必须包含在connection配置单元中。<br />
|
||||
* 必选:是 <br />
|
||||
* 默认值:无 <br />
|
||||
* **jdbcUrl**
|
||||
* 描述:TDengine数据库的JDBC连接信息。注意,jdbcUrl必须包含在connection配置单元中。JdbcUrl具体请参看TDengine官方文档。
|
||||
* 必选:是 <br />
|
||||
* 默认值:无<br />
* **column**

  * 描述:所配置的表中需要同步的列名集合。使用JSON的数组描述字段信息。注意,column必须配置,且至少包含一列。<br />

  * 必选:是 <br />

  * 默认值:无 <br />
|
||||
* **beginDateTime**
|
||||
* 描述:数据的开始时间,Job迁移从beginDateTime到endDateTime的数据,格式为yyyy-MM-dd HH:mm:ss <br />
|
||||
* 必选:是 <br />
|
||||
* 默认值:无 <br />
|
||||
* **endDateTime**
|
||||
* 描述:数据的结束时间,Job迁移从beginDateTime到endDateTime的数据,格式为yyyy-MM-dd HH:mm:ss <br />
|
||||
* 必选:是 <br />
|
||||
* 默认值:无 <br />
|
||||
* **splitInterval**
|
||||
* 描述:按照splitInterval来划分task, 每splitInterval创建一个task。例如,20d代表按照每20天的数据划分为1个task。
|
||||
可以配置的时间单位为:d(天), h(小时), m(分钟), s(秒) <br />
|
||||
* 必选:是 <br />
|
||||
* 默认值:无 <br />
|
||||
|
||||
### 3.3 类型转换
|
||||
| TDengine 数据类型 | DataX 内部类型 |
|
||||
| --------------- | ------------- |
|
||||
| TINYINT | Long |
|
||||
| SMALLINT | Long |
|
||||
| INTEGER | Long |
|
||||
| BIGINT | Long |
|
||||
| FLOAT | Double |
|
||||
| DOUBLE | Double |
|
||||
| BOOLEAN | Bool |
|
||||
| TIMESTAMP | Date |
|
||||
| BINARY | Bytes |
|
||||
| NCHAR | String |
|
||||
|
||||
|
||||
## 4 性能报告
|
||||
|
||||
### 4.1 环境准备
|
||||
|
||||
#### 4.1.1 数据特征
|
||||
|
||||
建表语句:
|
||||
|
||||
单行记录类似于:
|
||||
|
||||
#### 4.1.2 机器参数
|
||||
|
||||
* 执行DataX的机器参数为:
|
||||
1. cpu:
|
||||
2. mem:
|
||||
3. net: 千兆双网卡
|
||||
4. disc: DataX 数据不落磁盘,不统计此项
|
||||
|
||||
* TDengine数据库机器参数为:
|
||||
1. cpu:
|
||||
2. mem:
|
||||
3. net: 千兆双网卡
|
||||
4. disc:
|
||||
|
||||
#### 4.1.3 DataX jvm 参数
|
||||
|
||||
-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
|
||||
|
||||
### 4.2 测试报告
|
||||
|
||||
#### 4.2.1 单表测试报告
|
||||
|
||||
| 通道数| DataX速度(Rec/s)|DataX流量(MB/s)| DataX机器网卡流出流量(MB/s)|DataX机器运行负载|DB网卡进入流量(MB/s)|DB运行负载|DB TPS|
|
||||
|--------| --------|--------|--------|--------|--------|--------|--------|
|
||||
|1| | | | | | | |
|
||||
|4| | | | | | | |
|
||||
|8| | | | | | | |
|
||||
|16| | | | | | | |
|
||||
|32| | | | | | | |
|
||||
|
||||
说明:
|
||||
|
||||
1. 这里的单表,主键类型为 bigint(20),自增。
|
||||
2. batchSize 和 通道个数,对性能影响较大。
|
||||
|
||||
#### 4.2.4 性能测试小结
|
||||
|
||||
1.
|
||||
2.
|
||||
|
||||
## 5 约束限制
|
||||
|
||||
## FAQ
|
101
tdenginereader/pom.xml
Normal file
101
tdenginereader/pom.xml
Normal file
@ -0,0 +1,101 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven module descriptor for the TDengine reader plugin. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>datax-all</artifactId>
        <groupId>com.alibaba.datax</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>tdenginereader</artifactId>

    <properties>
        <!-- NOTE(review): these compiler properties are overridden by the explicit
             maven-compiler-plugin configuration below (${jdk-version}); kept for
             IDE import defaults. -->
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>${datax-project-version}</version>
            <exclusions>
                <!-- the framework binds its own slf4j backend -->
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- TDengine JDBC driver (TAOS / TAOS-RS urls) -->
        <dependency>
            <groupId>com.taosdata.jdbc</groupId>
            <artifactId>taos-jdbcdriver</artifactId>
            <version>2.0.34</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit-version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>plugin-rdbms-util</artifactId>
            <!-- use the shared project version property, consistent with datax-common -->
            <version>${datax-project-version}</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${jdk-version}</source>
                    <target>${jdk-version}</target>
                    <encoding>${project-sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <!-- package the plugin into the datax layout (see src/main/assembly/package.xml) -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                    <finalName>datax</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>dwzip</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12.4</version>
                <configuration>
                    <!-- 包含哪些测试用例 -->
                    <includes>
                        <include>**/*Test.java</include>
                    </includes>
                    <!-- 不包含哪些测试用例 -->
                    <excludes>
                    </excludes>
                    <!-- NOTE(review): test failures do not fail the build; broken tests
                         are silently ignored — confirm this is intentional. -->
                    <testFailureIgnore>true</testFailureIgnore>
                </configuration>
            </plugin>

        </plugins>
    </build>

</project>
|
34
tdenginereader/src/main/assembly/package.xml
Executable file
34
tdenginereader/src/main/assembly/package.xml
Executable file
@ -0,0 +1,34 @@
|
||||
<!-- Assembly descriptor: lays the plugin out under plugin/reader/tdenginereader
     in the datax distribution directory. -->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <!-- plugin descriptor and job template -->
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/reader/tdenginereader</outputDirectory>
        </fileSet>
        <!-- the plugin jar itself.
             NOTE(review): the artifact version is hard-coded here; must be kept
             in sync with the module version in pom.xml when it changes. -->
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>tdenginereader-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/reader/tdenginereader</outputDirectory>
        </fileSet>
    </fileSets>

    <!-- runtime dependencies (taos-jdbcdriver etc.) go into libs/ -->
    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/reader/tdenginereader/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>
|
@ -0,0 +1,19 @@
|
||||
package com.alibaba.datax.plugin.reader;

/**
 * Configuration key constants recognized by {@code TDengineReader}.
 *
 * <p>These are the JSON keys read from the job's {@code parameter} block
 * (and from each entry of the {@code connection} array for {@link #TABLE}
 * and {@link #JDBC_URL}).
 */
public final class Key {

    public static final String JDBC_URL = "jdbcUrl";
    public static final String TABLE = "table";
    public static final String USER = "user";
    public static final String PASSWORD = "password";
    public static final String CONNECTION = "connection";
    public static final String BEGIN_DATETIME = "beginDateTime";
    public static final String END_DATETIME = "endDateTime";
    public static final String SPLIT_INTERVAL = "splitInterval";
    public static final String COLUMN = "column";
    public static final String MANDATORY_ENCODING = "mandatoryEncoding";

    // Constant holder — not instantiable.
    private Key() {
    }
}
|
@ -0,0 +1,291 @@
|
||||
package com.alibaba.datax.plugin.reader;

import com.alibaba.datax.common.constant.CommonConstant;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.sql.*;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * DataX reader plugin for TDengine.
 *
 * <p>The {@link Job} validates the configuration and splits the
 * [beginDateTime, endDateTime) range into per-interval, per-connection task
 * configurations; each {@link Task} then queries its slice through the
 * TDengine JDBC driver and forwards rows to the framework.
 */
public class TDengineReader extends Reader {
    private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss";

    public static class Job extends Reader.Job {

        private static final Logger LOG = LoggerFactory.getLogger(Job.class);
        private Configuration originalConfig;

        /**
         * Validates the job configuration and normalizes it in place:
         * beginDateTime/endDateTime are rewritten as epoch milliseconds and
         * splitInterval as an interval in milliseconds.
         *
         * @throws DataXException REQUIRED_VALUE when a mandatory key is
         *                        missing, ILLEGAL_VALUE when a value is malformed
         */
        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();

            // check user
            String user = this.originalConfig.getString(Key.USER);
            if (StringUtils.isBlank(user)) {
                throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.USER + "] is not set.");
            }

            // check password
            String password = this.originalConfig.getString(Key.PASSWORD);
            if (StringUtils.isBlank(password)) {
                throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PASSWORD + "] is not set.");
            }

            // SimpleDateFormat is not thread-safe, so it is created per call.
            SimpleDateFormat format = new SimpleDateFormat(DATETIME_FORMAT);

            // check beginDateTime
            String beginDatetime = this.originalConfig.getString(Key.BEGIN_DATETIME);
            if (StringUtils.isBlank(beginDatetime)) {
                throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.BEGIN_DATETIME + "] is not set.");
            }
            long start;
            try {
                start = format.parse(beginDatetime).getTime();
            } catch (ParseException e) {
                throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.BEGIN_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format.");
            }

            // check endDateTime
            String endDatetime = this.originalConfig.getString(Key.END_DATETIME);
            if (StringUtils.isBlank(endDatetime)) {
                throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.END_DATETIME + "] is not set.");
            }
            long end;
            try {
                end = format.parse(endDatetime).getTime();
            } catch (ParseException e) {
                throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.END_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format.");
            }
            if (start >= end) {
                throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.BEGIN_DATETIME + "] should be less than the parameter [" + Key.END_DATETIME + "].");
            }

            // check splitInterval, e.g. "1d", "12h"
            String splitInterval = this.originalConfig.getString(Key.SPLIT_INTERVAL);
            if (StringUtils.isBlank(splitInterval)) {
                throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.SPLIT_INTERVAL + "] is not set.");
            }
            long split;
            try {
                split = parseSplitInterval(splitInterval);
            } catch (Exception e) {
                throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.SPLIT_INTERVAL + "] should be like: \"123d|h|m|s\", error: " + e.getMessage());
            }

            // store the normalized numeric forms back for split()/Task.init()
            this.originalConfig.set(Key.BEGIN_DATETIME, start);
            this.originalConfig.set(Key.END_DATETIME, end);
            this.originalConfig.set(Key.SPLIT_INTERVAL, split);

            // check connection: a non-empty array of {table:[...], jdbcUrl:"..."}
            List<Object> connection = this.originalConfig.getList(Key.CONNECTION);
            if (connection == null || connection.isEmpty()) {
                throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.CONNECTION + "] is not set.");
            }
            for (int i = 0; i < connection.size(); i++) {
                Configuration conn = Configuration.from(connection.get(i).toString());
                List<Object> table = conn.getList(Key.TABLE);
                if (table == null || table.isEmpty()) {
                    throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.TABLE + "] of connection[" + (i + 1) + "] is not set.");
                }
                String jdbcUrl = conn.getString(Key.JDBC_URL);
                if (StringUtils.isBlank(jdbcUrl)) {
                    throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.JDBC_URL + "] of connection[" + (i + 1) + "] is not set.");
                }
            }

            // check column
            List<Object> column = this.originalConfig.getList(Key.COLUMN);
            if (column == null || column.isEmpty()) {
                // fixed: the message previously (and wrongly) referred to Key.CONNECTION
                throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.COLUMN + "] is not set or is empty.");
            }
        }

        @Override
        public void destroy() {
            // nothing to release at job level
        }

        /**
         * Splits the time range into one task per (interval, connection) pair.
         * {@code adviceNumber} is ignored: parallelism is fully determined by
         * splitInterval and the number of connections.
         *
         * @return one task configuration per time slice and connection, each
         *         carrying a flattened jdbcUrl/table plus its own
         *         [beginDateTime, endDateTime) sub-range in epoch millis
         */
        @Override
        public List<Configuration> split(int adviceNumber) {
            List<Configuration> configurations = new ArrayList<>();

            Long start = this.originalConfig.getLong(Key.BEGIN_DATETIME);
            Long end = this.originalConfig.getLong(Key.END_DATETIME);
            Long split = this.originalConfig.getLong(Key.SPLIT_INTERVAL);

            List<Object> conns = this.originalConfig.getList(Key.CONNECTION);

            for (Long ts = start; ts < end; ts += split) {
                for (int i = 0; i < conns.size(); i++) {
                    Configuration clone = this.originalConfig.clone();
                    clone.remove(Key.SPLIT_INTERVAL);

                    clone.set(Key.BEGIN_DATETIME, ts);
                    // the last slice is clamped to the overall end time
                    clone.set(Key.END_DATETIME, Math.min(ts + split, end));

                    // flatten the connection entry into top-level keys for the task
                    Configuration conf = Configuration.from(conns.get(i).toString());
                    String jdbcUrl = conf.getString(Key.JDBC_URL);
                    clone.set(Key.JDBC_URL, jdbcUrl);
                    clone.set(Key.TABLE, conf.getList(Key.TABLE));

                    // 抽取 jdbcUrl 中的 ip/port 进行资源使用的打标,以提供给 core 做有意义的 shuffle 操作
                    clone.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, DataBaseType.parseIpFromJdbcUrl(jdbcUrl));
                    clone.remove(Key.CONNECTION);

                    configurations.add(clone);
                    LOG.info("Configuration: {}", JSON.toJSONString(clone));
                }
            }
            return configurations;
        }
    }

    public static class Task extends Reader.Task {
        private static final Logger LOG = LoggerFactory.getLogger(Task.class);
        private Connection conn;

        // slice boundaries in epoch milliseconds (set by Job.split)
        private Long startTime;
        private Long endTime;
        private List<String> tables;
        private List<String> columns;
        // optional charset override applied when decoding NCHAR columns
        private String mandatoryEncoding;

        /**
         * Reads the task slice configuration and opens the JDBC connection.
         *
         * @throws DataXException REQUIRED_VALUE when jdbcUrl is missing,
         *                        CONNECTION_FAILED when the connection cannot be opened
         */
        @Override
        public void init() {
            Configuration readerSliceConfig = super.getPluginJobConf();
            LOG.info("getPluginJobConf: {}", JSON.toJSONString(readerSliceConfig));

            String url = readerSliceConfig.getString(Key.JDBC_URL);
            if (StringUtils.isBlank(url)) {
                throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
                        "The parameter [" + Key.JDBC_URL + "] is not set.");
            }

            tables = readerSliceConfig.getList(Key.TABLE, String.class);
            columns = readerSliceConfig.getList(Key.COLUMN, String.class);

            String user = readerSliceConfig.getString(Key.USER);
            String password = readerSliceConfig.getString(Key.PASSWORD);

            try {
                conn = DriverManager.getConnection(url, user, password);
            } catch (SQLException e) {
                throw DataXException.asDataXException(TDengineReaderErrorCode.CONNECTION_FAILED,
                        "The parameter [" + Key.JDBC_URL + "] : " + url + " failed to connect since: " + e.getMessage());
            }

            this.mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, "");

            startTime = readerSliceConfig.getLong(Key.BEGIN_DATETIME);
            endTime = readerSliceConfig.getLong(Key.END_DATETIME);
        }

        @Override
        public void destroy() {
            // defensive close in case startRead() was never reached;
            // Connection.close() is idempotent, so a second close is harmless
            closeConnectionQuietly();
        }

        /**
         * Queries each configured table for rows whose first column (the
         * TDengine timestamp pseudo-column {@code _c0}) falls inside
         * [startTime, endTime) and sends them to the writer.
         */
        @Override
        public void startRead(RecordSender recordSender) {
            try (Statement stmt = conn.createStatement()) {
                for (String table : tables) {
                    String sql = "select " + StringUtils.join(columns, ",") + " from " + table + " where _c0 >= " + startTime + " and _c0 < " + endTime;
                    ResultSet rs = stmt.executeQuery(sql);
                    ResultSetMetaData metaData = rs.getMetaData();
                    while (rs.next()) {
                        transportOneRecord(recordSender, rs, metaData, metaData.getColumnCount(), this.mandatoryEncoding);
                    }
                }
            } catch (SQLException e) {
                throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错!", e);
            } finally {
                closeConnectionQuietly();
            }
        }

        // Closes the JDBC connection, swallowing (but logging) close failures.
        private void closeConnectionQuietly() {
            try {
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException e) {
                LOG.warn("failed to close connection", e);
            }
        }

        // Builds one DataX record from the current ResultSet row and sends it.
        private Record transportOneRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnCount, String mandatoryEncoding) {
            Record record = buildRecord(recordSender, rs, metaData, columnCount, mandatoryEncoding);
            recordSender.sendToWriter(record);
            return record;
        }

        /**
         * Maps the current row's JDBC types to DataX column types:
         * integral types → LongColumn, FLOAT/DOUBLE → DoubleColumn,
         * BOOLEAN → BoolColumn, TIMESTAMP → DateColumn, BINARY → BytesColumn,
         * NCHAR → StringColumn (decoded with mandatoryEncoding when set).
         *
         * NOTE(review): JDBC types outside this switch are silently skipped,
         * which would misalign the record's columns — confirm the driver can
         * only ever report these types.
         */
        private Record buildRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnCount, String mandatoryEncoding) {
            Record record = recordSender.createRecord();

            try {
                for (int i = 1; i <= columnCount; i++) {
                    int columnType = metaData.getColumnType(i);
                    switch (columnType) {
                        case Types.SMALLINT:
                        case Types.TINYINT:
                        case Types.INTEGER:
                        case Types.BIGINT:
                            record.addColumn(new LongColumn(rs.getString(i)));
                            break;
                        case Types.FLOAT:
                        case Types.DOUBLE:
                            record.addColumn(new DoubleColumn(rs.getString(i)));
                            break;
                        case Types.BOOLEAN:
                            record.addColumn(new BoolColumn(rs.getBoolean(i)));
                            break;
                        case Types.TIMESTAMP:
                            record.addColumn(new DateColumn(rs.getTimestamp(i)));
                            break;
                        case Types.BINARY:
                            record.addColumn(new BytesColumn(rs.getBytes(i)));
                            break;
                        case Types.NCHAR:
                            String rawData;
                            if (StringUtils.isBlank(mandatoryEncoding)) {
                                rawData = rs.getString(i);
                            } else {
                                rawData = new String((rs.getBytes(i) == null ? new byte[0] : rs.getBytes(i)), mandatoryEncoding);
                            }
                            record.addColumn(new StringColumn(rawData));
                            break;
                    }
                }
            } catch (SQLException | UnsupportedEncodingException e) {
                throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错!", e);
            }
            return record;
        }
    }

    // interval unit multipliers, in milliseconds
    private static final long MILLIS_PER_SECOND = 1000;
    private static final long MILLIS_PER_MINUTE = 60 * MILLIS_PER_SECOND;
    private static final long MILLIS_PER_HOUR = 60 * MILLIS_PER_MINUTE;
    private static final long MILLIS_PER_DAY = 24 * MILLIS_PER_HOUR;

    // compiled once: splitInterval must be a positive integer plus a unit in [dhms]
    private static final Pattern SPLIT_INTERVAL_PATTERN = Pattern.compile("^(\\d+)([dhms])$");

    /**
     * Parses a splitInterval such as "20d" or "1h" into milliseconds.
     *
     * @throws Exception when the value does not match {@code \d+[dhms]} or is zero
     */
    private static Long parseSplitInterval(String splitInterval) throws Exception {
        Matcher matcher = SPLIT_INTERVAL_PATTERN.matcher(splitInterval);
        if (!matcher.matches()) {
            throw new Exception("invalid splitInterval: " + splitInterval);
        }
        long value = Long.parseLong(matcher.group(1));
        if (value == 0) {
            throw new Exception("invalid splitInterval: 0");
        }
        switch (matcher.group(2).charAt(0)) {
            case 'd':
                return value * MILLIS_PER_DAY;
            case 'h':
                return value * MILLIS_PER_HOUR;
            case 'm':
                return value * MILLIS_PER_MINUTE;
            case 's':
                return value * MILLIS_PER_SECOND;
            default:
                // unreachable: the pattern restricts the unit to [dhms];
                // previously a stray `default:` fell through into the 'h' case
                throw new Exception("invalid splitInterval: " + splitInterval);
        }
    }

}
|
@ -0,0 +1,33 @@
|
||||
package com.alibaba.datax.plugin.reader;
|
||||
|
||||
import com.alibaba.datax.common.spi.ErrorCode;
|
||||
|
||||
public enum TDengineReaderErrorCode implements ErrorCode {
|
||||
|
||||
REQUIRED_VALUE("TDengineReader-00", "缺失必要的值"),
|
||||
ILLEGAL_VALUE("TDengineReader-01", "值非法"),
|
||||
CONNECTION_FAILED("TDengineReader-02", "连接错误");
|
||||
|
||||
private final String code;
|
||||
private final String description;
|
||||
|
||||
TDengineReaderErrorCode(String code, String description) {
|
||||
this.code = code;
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getCode() {
|
||||
return this.code;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return this.description;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
|
||||
}
|
||||
}
|
9
tdenginereader/src/main/resources/plugin.json
Executable file
9
tdenginereader/src/main/resources/plugin.json
Executable file
@ -0,0 +1,9 @@
|
||||
{
|
||||
"name": "tdenginereader",
|
||||
"class": "com.alibaba.datax.plugin.reader.TDengineReader",
|
||||
"description": {
|
||||
"useScene": "data migration from tdengine",
|
||||
"mechanism": "use JDBC driver to read data from tdengine."
|
||||
},
|
||||
"developer": "zyyang-taosdata"
|
||||
}
|
14
tdenginereader/src/main/resources/plugin_job_template.json
Normal file
14
tdenginereader/src/main/resources/plugin_job_template.json
Normal file
@ -0,0 +1,14 @@
|
||||
{
    "name": "tdenginereader",
    "parameter": {
        "user": "root",
        "password": "taosdata",
        "connection": [
            {
                "table": [
                    "weather"
                ],
                "jdbcUrl": "jdbc:TAOS-RS://127.0.0.1:6041/test"
            }
        ],
        "column": [
            "ts"
        ],
        "beginDateTime": "2021-01-01 00:00:00",
        "endDateTime": "2021-01-02 00:00:00",
        "splitInterval": "1h"
    }
}
|
@ -0,0 +1,47 @@
|
||||
package com.alibaba.datax.plugin.reader;
|
||||
|
||||
import com.alibaba.datax.common.util.Configuration;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
public class TDengineReaderTest {
|
||||
|
||||
TDengineReader.Job job;
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
job = new TDengineReader.Job();
|
||||
Configuration configuration = Configuration.from("{" +
|
||||
"\"user\": \"root\"," +
|
||||
"\"password\": \"taosdata\"," +
|
||||
"\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," +
|
||||
"\"beginDateTime\": \"2021-01-01 00:00:00\"," +
|
||||
"\"endDateTime\": \"2021-01-01 10:00:00\"," +
|
||||
"\"splitInterval\": \"1h\"" +
|
||||
"}");
|
||||
job.setPluginJobConf(configuration);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void jobInit() {
|
||||
job.init();
|
||||
|
||||
// assert
|
||||
Configuration conf = job.getPluginJobConf();
|
||||
Assert.assertEquals("select * from weather", conf.getString("sql"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void jobSplit() {
|
||||
// when
|
||||
job.init();
|
||||
List<Configuration> configurationList = job.split(1);
|
||||
|
||||
// IntStream.range(0, configurationList.size()).forEach(i -> System.out.println(i + ": " + configurationList.get(i)));
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user