merge master

This commit is contained in:
zyyang 2022-03-14 10:17:29 +08:00
commit 50c40095f3
23 changed files with 853 additions and 255 deletions

View File

@ -73,7 +73,7 @@ public class Engine {
boolean traceEnable = allConf.getBool(CoreConstant.DATAX_CORE_CONTAINER_TRACE_ENABLE, true); boolean traceEnable = allConf.getBool(CoreConstant.DATAX_CORE_CONTAINER_TRACE_ENABLE, true);
boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true); boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true);
//standlone模式的datax shell任务不进行汇报 //standalone模式的 datax shell任务不进行汇报
if(instanceId == -1){ if(instanceId == -1){
perfReportEnable = false; perfReportEnable = false;
} }

View File

@ -12,6 +12,8 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。
### 3.1 配置样例 ### 3.1 配置样例
* 配置一个从 TDengine 抽取数据作业:
```json ```json
{ {
"job": { "job": {
@ -27,7 +29,9 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。
"table": [ "table": [
"meters" "meters"
], ],
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/test?timestampFormat=TIMESTAMP" "jdbcUrl": [
"jdbc:TAOS-RS://192.168.56.105:6041/test?timestampFormat=TIMESTAMP"
]
} }
], ],
"column": [ "column": [
@ -36,9 +40,51 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。
"voltage", "voltage",
"phase" "phase"
], ],
"where": "ts>=0",
"beginDateTime": "2017-07-14 10:40:00", "beginDateTime": "2017-07-14 10:40:00",
"endDateTime": "2017-08-14 10:40:00", "endDateTime": "2017-08-14 10:40:00"
"splitInterval": "1d" }
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
```
* 配置一个自定义 SQL 的数据抽取作业:
```json
{
"job": {
"content": [
{
"reader": {
"name": "tdenginereader",
"parameter": {
"user": "root",
"password": "taosdata",
"connection": [
{
"querySql": [
"select * from test.meters"
],
"jdbcUrl": [
"jdbc:TAOS-RS://192.168.56.105:6041/test?timestampFormat=TIMESTAMP"
]
}
]
} }
}, },
"writer": { "writer": {
@ -69,30 +115,37 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。
* 描述TDengine 实例的密码 <br /> * 描述TDengine 实例的密码 <br />
* 必选:是 <br /> * 必选:是 <br />
* 默认值:无 <br /> * 默认值:无 <br />
* **table**
* 描述所选取的需要同步的表。使用JSON的数组描述因此支持多张表同时抽取。当配置为多张表时用户自己需保证多张表是同一schema结构
TDengineReader不予检查表是否同一逻辑表。注意table必须包含在connection配置单元中。<br />
* 必选:是 <br />
* 默认值:无 <br />
* **jdbcUrl** * **jdbcUrl**
* 描述TDengine 数据库的JDBC连接信息。注意jdbcUrl必须包含在connection配置单元中。JdbcUrl具体请参看TDengine官方文档。 * 描述TDengine 数据库的JDBC连接信息。注意jdbcUrl必须包含在connection配置单元中。JdbcUrl具体请参看TDengine官方文档。
* 必选:是 <br /> * 必选:是 <br />
* 默认值:无<br /> * 默认值:无<br />
* **querySql**
* 描述在有些业务场景下where 这一配置项不足以描述所筛选的条件用户可以通过该配置型来自定义筛选SQL。当用户配置了 querySql 后, TDengineReader 就会忽略 table, column,
where, beginDateTime, endDateTime这些配置型直接使用这个配置项的内容对数据进行筛选。例如需要 进行多表join后同步数据使用 select a,b from table_a join
table_b on table_a.id = table_b.id<br />
* 必选:否 <br />
* 默认值:无 <br />
* **table**
* 描述:所选取的需要同步的表。使用 JSON 的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一 schema 结构, TDengineReader不予检查表是否同一逻辑表。注意table必须包含在
connection 配置单元中。<br />
* 必选:是 <br />
* 默认值:无 <br />
* **where**
* 描述:筛选条件中的 where 子句TDengineReader 根据指定的column, table, where, begingDateTime, endDateTime 条件拼接 SQL并根据这个 SQL
进行数据抽取。 <br />
* 必选:否 <br />
* 默认值:无 <br />
* **beginDateTime** * **beginDateTime**
* 描述数据的开始时间Job 迁移从 begineDateTime 到 endDateTime 的数据,格式为 yyyy-MM-dd HH:mm:ss <br /> * 描述数据的开始时间Job 迁移从 begineDateTime 到 endDateTime 的数据,格式为 yyyy-MM-dd HH:mm:ss <br />
* 必选:是 <br /> * 必选: <br />
* 默认值:无 <br /> * 默认值:无 <br />
* **endDateTime** * **endDateTime**
* 描述数据的结束时间Job 迁移从 begineDateTime 到 endDateTime 的数据,格式为 yyyy-MM-dd HH:mm:ss <br /> * 描述数据的结束时间Job 迁移从 begineDateTime 到 endDateTime 的数据,格式为 yyyy-MM-dd HH:mm:ss <br />
* 必选:是 <br /> * 必选:否 <br />
* 默认值:无 <br />
* **splitInterval**
* 描述按照splitInterval来划分task, 每splitInterval创建一个task。例如20d代表按照每20天的数据划分为1个task。
可以配置的时间单位为d, h小时, m分钟, s <br />
* 必选:是 <br />
* 默认值:无 <br /> * 默认值:无 <br />
### 3.3 类型转换 ### 3.3 类型转换
| TDengine 数据类型 | DataX 内部类型 | | TDengine 数据类型 | DataX 内部类型 |
| --------------- | ------------- | | --------------- | ------------- |
| TINYINT | Long | | TINYINT | Long |
@ -106,7 +159,6 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。
| BINARY | Bytes | | BINARY | Bytes |
| NCHAR | String | | NCHAR | String |
## 4 性能报告 ## 4 性能报告
### 4.1 环境准备 ### 4.1 环境准备

View File

@ -29,10 +29,29 @@
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax.tdenginewriter</groupId>
<artifactId>tdenginewriter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency> <dependency>
<groupId>com.taosdata.jdbc</groupId> <groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId> <artifactId>taos-jdbcdriver</artifactId>
<version>2.0.37</version> <version>2.0.37</version>
<exclusions>
<exclusion>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
@ -47,6 +66,21 @@
<version>0.0.1-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<!-- 添加 dm8 jdbc jar 包依赖-->
<!-- <dependency>-->
<!-- <groupId>com.dameng</groupId>-->
<!-- <artifactId>dm-jdbc</artifactId>-->
<!-- <version>1.8</version>-->
<!-- <scope>system</scope>-->
<!-- <systemPath>${project.basedir}/src/test/resources/DmJdbcDriver18.jar</systemPath>-->
<!-- </dependency>-->
</dependencies> </dependencies>
<build> <build>

View File

@ -1,19 +0,0 @@
package com.alibaba.datax.plugin.reader;
public class Key {
public static final String JDBC_URL = "jdbcUrl";
// public static final String HOST = "host";
// public static final String PORT = "port";
// public static final String DB = "db";
public static final String TABLE = "table";
public static final String USER = "username";
public static final String PASSWORD = "password";
public static final String CONNECTION = "connection";
// public static final String SQL = "sql";
public static final String BEGIN_DATETIME = "beginDateTime";
public static final String END_DATETIME = "endDateTime";
public static final String SPLIT_INTERVAL = "splitInterval";
public static final String COLUMN = "column";
public static final String MANDATORY_ENCODING = "mandatoryEncoding";
}

View File

@ -1,13 +1,11 @@
package com.alibaba.datax.plugin.reader; package com.alibaba.datax.plugin.reader;
import com.alibaba.datax.common.constant.CommonConstant;
import com.alibaba.datax.common.element.*; import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader; import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType; import com.alibaba.datax.plugin.writer.tdenginewriter.Key;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -17,89 +15,86 @@ import java.sql.*;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class TDengineReader extends Reader { public class TDengineReader extends Reader {
private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss"; private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
public static class Job extends Reader.Job { public static class Job extends Reader.Job {
private static final Logger LOG = LoggerFactory.getLogger(Job.class); private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration originalConfig; private Configuration originalConfig;
@Override @Override
public void init() { public void init() {
this.originalConfig = super.getPluginJobConf(); this.originalConfig = super.getPluginJobConf();
// check user // check username
String user = this.originalConfig.getString(Key.USER); String username = this.originalConfig.getString(Key.USERNAME);
if (StringUtils.isBlank(user)) if (StringUtils.isBlank(username))
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.USER + "] is not set."); throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.USERNAME + "] is not set.");
// check password // check password
String password = this.originalConfig.getString(Key.PASSWORD); String password = this.originalConfig.getString(Key.PASSWORD);
if (StringUtils.isBlank(password)) if (StringUtils.isBlank(password))
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PASSWORD + "] is not set."); throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.PASSWORD + "] is not set.");
// check connection
List<Configuration> connectionList = this.originalConfig.getListConfiguration(Key.CONNECTION);
if (connectionList == null || connectionList.isEmpty())
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.CONNECTION + "] is not set.");
for (int i = 0; i < connectionList.size(); i++) {
Configuration conn = connectionList.get(i);
// check jdbcUrl
List<Object> jdbcUrlList = conn.getList(Key.JDBC_URL);
if (jdbcUrlList == null || jdbcUrlList.isEmpty()) {
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.JDBC_URL + "] of connection[" + (i + 1) + "] is not set.");
}
// check table/querySql
List<Object> querySqlList = conn.getList(Key.QUERY_SQL);
if (querySqlList == null || querySqlList.isEmpty()) {
String querySql = conn.getString(Key.QUERY_SQL);
if (StringUtils.isBlank(querySql)) {
List<Object> table = conn.getList(Key.TABLE);
if (table == null || table.isEmpty())
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.TABLE + "] of connection[" + (i + 1) + "] is not set.");
}
}
}
SimpleDateFormat format = new SimpleDateFormat(DATETIME_FORMAT); SimpleDateFormat format = new SimpleDateFormat(DATETIME_FORMAT);
// check beginDateTime // check beginDateTime
String beginDatetime = this.originalConfig.getString(Key.BEGIN_DATETIME); String beginDatetime = this.originalConfig.getString(Key.BEGIN_DATETIME);
if (StringUtils.isBlank(beginDatetime)) long start = Long.MIN_VALUE;
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.BEGIN_DATETIME + "] is not set."); if (!StringUtils.isBlank(beginDatetime)) {
Long start;
try { try {
start = format.parse(beginDatetime).getTime(); start = format.parse(beginDatetime).getTime();
} catch (ParseException e) { } catch (ParseException e) {
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.BEGIN_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format."); throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE,
"The parameter [" + Key.BEGIN_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format.");
}
} }
// check endDateTime // check endDateTime
String endDatetime = this.originalConfig.getString(Key.END_DATETIME); String endDatetime = this.originalConfig.getString(Key.END_DATETIME);
if (StringUtils.isBlank(endDatetime)) long end = Long.MAX_VALUE;
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.END_DATETIME + "] is not set."); if (!StringUtils.isBlank(endDatetime)) {
Long end;
try { try {
end = format.parse(endDatetime).getTime(); end = format.parse(endDatetime).getTime();
} catch (ParseException e) { } catch (ParseException e) {
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.END_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format."); throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE,
"The parameter [" + Key.END_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format.");
}
} }
if (start >= end) if (start >= end)
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.BEGIN_DATETIME + "] should be less than the parameter [" + Key.END_DATETIME + "]."); throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE,
"The parameter [" + Key.BEGIN_DATETIME + "] should be less than the parameter [" + Key.END_DATETIME + "].");
// check splitInterval
String splitInterval = this.originalConfig.getString(Key.SPLIT_INTERVAL);
Long split;
if (StringUtils.isBlank(splitInterval))
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.SPLIT_INTERVAL + "] is not set.");
try {
split = parseSplitInterval(splitInterval);
} catch (Exception e) {
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.SPLIT_INTERVAL + "] should be like: \"123d|h|m|s\", error: " + e.getMessage());
}
this.originalConfig.set(Key.BEGIN_DATETIME, start);
this.originalConfig.set(Key.END_DATETIME, end);
this.originalConfig.set(Key.SPLIT_INTERVAL, split);
// check connection
List<Object> connection = this.originalConfig.getList(Key.CONNECTION);
if (connection == null || connection.isEmpty())
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.CONNECTION + "] is not set.");
for (int i = 0; i < connection.size(); i++) {
Configuration conn = Configuration.from(connection.get(i).toString());
List<Object> table = conn.getList(Key.TABLE);
if (table == null || table.isEmpty())
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.TABLE + "] of connection[" + (i + 1) + "] is not set.");
String jdbcUrl = conn.getString(Key.JDBC_URL);
if (StringUtils.isBlank(jdbcUrl))
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.JDBC_URL + "] of connection[" + (i + 1) + "] is not set.");
}
// check column
List<Object> column = this.originalConfig.getList(Key.COLUMN);
if (column == null || column.isEmpty())
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.CONNECTION + "] is not set or is empty.");
} }
@Override @Override
@ -110,78 +105,73 @@ public class TDengineReader extends Reader {
@Override @Override
public List<Configuration> split(int adviceNumber) { public List<Configuration> split(int adviceNumber) {
List<Configuration> configurations = new ArrayList<>(); List<Configuration> configurations = new ArrayList<>();
// do split
Long start = this.originalConfig.getLong(Key.BEGIN_DATETIME);
Long end = this.originalConfig.getLong(Key.END_DATETIME);
Long split = this.originalConfig.getLong(Key.SPLIT_INTERVAL);
List<Object> conns = this.originalConfig.getList(Key.CONNECTION); List<Configuration> connectionList = this.originalConfig.getListConfiguration(Key.CONNECTION);
for (Configuration conn : connectionList) {
for (Long ts = start; ts < end; ts += split) { List<String> jdbcUrlList = conn.getList(Key.JDBC_URL, String.class);
for (int i = 0; i < conns.size(); i++) { for (String jdbcUrl : jdbcUrlList) {
Configuration clone = this.originalConfig.clone(); Configuration clone = this.originalConfig.clone();
clone.remove(Key.SPLIT_INTERVAL);
clone.set(Key.BEGIN_DATETIME, ts);
clone.set(Key.END_DATETIME, Math.min(ts + split, end));
Configuration conf = Configuration.from(conns.get(i).toString());
String jdbcUrl = conf.getString(Key.JDBC_URL);
clone.set(Key.JDBC_URL, jdbcUrl); clone.set(Key.JDBC_URL, jdbcUrl);
clone.set(Key.TABLE, conf.getList(Key.TABLE)); clone.set(Key.TABLE, conn.getList(Key.TABLE));
clone.set(Key.QUERY_SQL, conn.getList(Key.QUERY_SQL));
// 抽取 jdbcUrl 中的 ip/port 进行资源使用的打标以提供给 core 做有意义的 shuffle 操作
clone.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, DataBaseType.parseIpFromJdbcUrl(jdbcUrl));
clone.remove(Key.CONNECTION); clone.remove(Key.CONNECTION);
configurations.add(clone); configurations.add(clone);
LOG.info("Configuration: {}", JSON.toJSONString(clone));
} }
} }
LOG.info("Configuration: {}", configurations);
return configurations; return configurations;
} }
} }
public static class Task extends Reader.Task { public static class Task extends Reader.Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class); private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private Configuration readerSliceConfig;
private String mandatoryEncoding;
private Connection conn; private Connection conn;
private Long startTime;
private Long endTime;
private List<String> tables; private List<String> tables;
private List<String> columns; private List<String> columns;
private String mandatoryEncoding; private String startTime;
private String endTime;
private String where;
private List<String> querySql;
static {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
@Override @Override
public void init() { public void init() {
Configuration readerSliceConfig = super.getPluginJobConf(); this.readerSliceConfig = super.getPluginJobConf();
LOG.info("getPluginJobConf: {}", JSON.toJSONString(readerSliceConfig)); LOG.info("getPluginJobConf: {}", readerSliceConfig);
String url = readerSliceConfig.getString(Key.JDBC_URL); String user = readerSliceConfig.getString(Key.USERNAME);
if (StringUtils.isBlank(url))
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.JDBC_URL + "] is not set.");
tables = readerSliceConfig.getList(Key.TABLE, String.class);
columns = readerSliceConfig.getList(Key.COLUMN, String.class);
String user = readerSliceConfig.getString(Key.USER);
String password = readerSliceConfig.getString(Key.PASSWORD); String password = readerSliceConfig.getString(Key.PASSWORD);
String url = readerSliceConfig.getString(Key.JDBC_URL);
try { try {
conn = DriverManager.getConnection(url, user, password); this.conn = DriverManager.getConnection(url, user, password);
} catch (SQLException e) { } catch (SQLException e) {
throw DataXException.asDataXException(TDengineReaderErrorCode.CONNECTION_FAILED, throw DataXException.asDataXException(TDengineReaderErrorCode.CONNECTION_FAILED,
"The parameter [" + Key.JDBC_URL + "] : " + url + " failed to connect since: " + e.getMessage()); "The parameter [" + Key.JDBC_URL + "] : " + url + " failed to connect since: " + e.getMessage(), e);
} }
this.mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, ""); this.tables = readerSliceConfig.getList(Key.TABLE, String.class);
this.columns = readerSliceConfig.getList(Key.COLUMN, String.class);
startTime = readerSliceConfig.getLong(Key.BEGIN_DATETIME); this.startTime = readerSliceConfig.getString(Key.BEGIN_DATETIME);
endTime = readerSliceConfig.getLong(Key.END_DATETIME); this.endTime = readerSliceConfig.getString(Key.END_DATETIME);
this.where = readerSliceConfig.getString(Key.WHERE, "_c0 > " + Long.MIN_VALUE);
this.querySql = readerSliceConfig.getList(Key.QUERY_SQL, String.class);
this.mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, "UTF-8");
} }
@Override @Override
public void destroy() { public void destroy() {
@ -189,18 +179,36 @@ public class TDengineReader extends Reader {
@Override @Override
public void startRead(RecordSender recordSender) { public void startRead(RecordSender recordSender) {
try (Statement stmt = conn.createStatement()) { List<String> sqlList = new ArrayList<>();
if (querySql == null || querySql.isEmpty()) {
for (String table : tables) { for (String table : tables) {
String sql = "select " + StringUtils.join(columns, ",") + " from " + table StringBuilder sb = new StringBuilder();
+ " where _c0 >= " + startTime + " and _c0 < " + endTime; sb.append("select ").append(StringUtils.join(columns, ",")).append(" from ").append(table).append(" ");
sb.append("where ").append(where);
if (!StringUtils.isBlank(startTime)) {
sb.append(" and _c0 >= '").append(startTime).append("'");
}
if (!StringUtils.isBlank(endTime)) {
sb.append(" and _c0 < '").append(endTime).append("'");
}
String sql = sb.toString().trim();
sqlList.add(sql);
}
} else {
sqlList.addAll(querySql);
}
try (Statement stmt = conn.createStatement()) {
for (String sql : sqlList) {
ResultSet rs = stmt.executeQuery(sql); ResultSet rs = stmt.executeQuery(sql);
ResultSetMetaData metaData = rs.getMetaData();
while (rs.next()) { while (rs.next()) {
transportOneRecord(recordSender, rs, metaData, metaData.getColumnCount(), this.mandatoryEncoding); Record record = buildRecord(recordSender, rs, mandatoryEncoding);
recordSender.sendToWriter(record);
} }
} }
} catch (SQLException e) { } catch (SQLException e) {
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错!", e); throw DataXException.asDataXException(TDengineReaderErrorCode.RUNTIME_EXCEPTION, e.getMessage(), e);
} finally { } finally {
try { try {
if (conn != null) if (conn != null)
@ -211,17 +219,11 @@ public class TDengineReader extends Reader {
} }
} }
private Record transportOneRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnCount, String mandatoryEncoding) { private Record buildRecord(RecordSender recordSender, ResultSet rs, String mandatoryEncoding) {
Record record = buildRecord(recordSender, rs, metaData, columnCount, mandatoryEncoding);
recordSender.sendToWriter(record);
return record;
}
private Record buildRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnCount, String mandatoryEncoding) {
Record record = recordSender.createRecord(); Record record = recordSender.createRecord();
try { try {
for (int i = 1; i <= columnCount; i++) { ResultSetMetaData metaData = rs.getMetaData();
for (int i = 1; i <= metaData.getColumnCount(); i++) {
int columnType = metaData.getColumnType(i); int columnType = metaData.getColumnType(i);
switch (columnType) { switch (columnType) {
case Types.SMALLINT: case Types.SMALLINT:
@ -254,39 +256,14 @@ public class TDengineReader extends Reader {
break; break;
} }
} }
} catch (SQLException | UnsupportedEncodingException e) { } catch (SQLException e) {
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错!", e); throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "database query error", e);
} catch (UnsupportedEncodingException e) {
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "illegal mandatoryEncoding", e);
} }
return record; return record;
} }
} }
private static final long second = 1000;
private static final long minute = 60 * second;
private static final long hour = 60 * minute;
private static final long day = 24 * hour;
private static Long parseSplitInterval(String splitInterval) throws Exception {
Pattern compile = Pattern.compile("^(\\d+)([dhms])$");
Matcher matcher = compile.matcher(splitInterval);
while (matcher.find()) {
Long value = Long.valueOf(matcher.group(1));
if (value == 0)
throw new Exception("invalid splitInterval: 0");
char unit = matcher.group(2).charAt(0);
switch (unit) {
case 'd':
return value * day;
default:
case 'h':
return value * hour;
case 'm':
return value * minute;
case 's':
return value * second;
}
}
throw new Exception("invalid splitInterval: " + splitInterval);
}
} }

View File

@ -4,9 +4,10 @@ import com.alibaba.datax.common.spi.ErrorCode;
public enum TDengineReaderErrorCode implements ErrorCode { public enum TDengineReaderErrorCode implements ErrorCode {
REQUIRED_VALUE("TDengineReader-00", "缺失必要的值"), REQUIRED_VALUE("TDengineReader-00", "parameter value is missing"),
ILLEGAL_VALUE("TDengineReader-01", "值非法"), ILLEGAL_VALUE("TDengineReader-01", "invalid parameter value"),
CONNECTION_FAILED("TDengineReader-02", "连接错误"); CONNECTION_FAILED("TDengineReader-02", "connection error"),
RUNTIME_EXCEPTION("TDengineWriter-03", "runtime exception");
private final String code; private final String code;
private final String description; private final String description;

View File

@ -8,7 +8,9 @@
"table": [ "table": [
"" ""
], ],
"jdbcUrl": "" "jdbcUrl": [
""
]
} }
], ],
"column": [ "column": [
@ -16,6 +18,6 @@
], ],
"beginDateTime": "", "beginDateTime": "",
"endDateTime": "", "endDateTime": "",
"splitInterval": "" "where": ""
} }
} }

View File

@ -0,0 +1,86 @@
package com.alibaba.datax.plugin.reader;
import com.alibaba.datax.core.Engine;
import org.junit.Ignore;
import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Random;
@Ignore
public class TDengine2DMTest {
private static final String host1 = "192.168.56.105";
private static final String host2 = "192.168.0.72";
private final Random random = new Random(System.currentTimeMillis());
@Test
public void t2dm_case01() throws Throwable {
// given
createSupTable("ms");
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2dm.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Test
public void t2dm_case02() throws Throwable {
// given
createSupTable("us");
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2dm.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Test
public void t2dm_case03() throws Throwable {
// given
createSupTable("ns");
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2dm.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
private void createSupTable(String precision) throws SQLException {
final String url = "jdbc:TAOS-RS://" + host1 + ":6041/";
try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db1");
stmt.execute("create database if not exists db1 precision '" + precision + "'");
stmt.execute("create table db1.stb1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, " +
"f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, t3 smallint, " +
"t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(100))");
for (int i = 1; i <= 10; i++) {
stmt.execute("insert into db1.tb" + i + " using db1.stb1 tags(now, " + random.nextInt(10) + "," +
random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextInt(10) + "," +
random.nextFloat() + "," + random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123'," +
"'北京朝阳望京') values(now+" + i + "s, " + random.nextInt(10) + "," + random.nextInt(10) + "," +
+random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextFloat() + "," +
random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123','北京朝阳望京')");
}
stmt.close();
}
final String url2 = "jdbc:dm://" + host2 + ":5236";
try (Connection conn = DriverManager.getConnection(url2, "TESTUSER", "test123456")) {
conn.setAutoCommit(true);
Statement stmt = conn.createStatement();
stmt.execute("drop table if exists stb2");
stmt.execute("create table stb2(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, " +
"f6 double, f7 BIT, f8 VARCHAR(100), f9 VARCHAR2(200), t1 timestamp, t2 tinyint, t3 smallint, " +
"t4 int, t5 bigint, t6 float, t7 double, t8 BIT, t9 VARCHAR(100), t10 VARCHAR2(200))");
}
}
}

View File

@ -0,0 +1,66 @@
package com.alibaba.datax.plugin.reader;
import com.alibaba.datax.core.Engine;
import org.junit.Ignore;
import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Random;
@Ignore
public class TDengine2StreamTest {
private static final String host = "192.168.56.105";
private static final Random random = new Random(System.currentTimeMillis());
@Test
public void case01() throws Throwable {
// given
prepare("ms");
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2stream-1.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Test
public void case02() throws Throwable {
// given
prepare("ms");
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2stream-2.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
private void prepare(String precision) throws SQLException {
final String url = "jdbc:TAOS-RS://" + host + ":6041/";
try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db1");
stmt.execute("create database if not exists db1 precision '" + precision + "'");
stmt.execute("create table db1.stb1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, " +
"f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, t3 smallint, " +
"t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(100))");
for (int i = 1; i <= 10; i++) {
stmt.execute("insert into db1.tb" + i + " using db1.stb1 tags(now, " + random.nextInt(10) + "," +
random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextInt(10) + "," +
random.nextFloat() + "," + random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123'," +
"'北京朝阳望京') values(now+" + i + "s, " + random.nextInt(10) + "," + random.nextInt(10) + "," +
+random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextFloat() + "," +
random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123','北京朝阳望京')");
}
stmt.close();
}
}
}

View File

@ -1,72 +1,153 @@
package com.alibaba.datax.plugin.reader; package com.alibaba.datax.plugin.reader;
import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.tdenginewriter.Key;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.List; import java.util.List;
public class TDengineReaderTest { public class TDengineReaderTest {
TDengineReader.Job job; @Test
public void jobInit_case01() {
@Before // given
public void before() { TDengineReader.Job job = new TDengineReader.Job();
job = new TDengineReader.Job();
Configuration configuration = Configuration.from("{" + Configuration configuration = Configuration.from("{" +
"\"user\": \"root\"," + "\"username\": \"root\"," +
"\"password\": \"taosdata\"," + "\"password\": \"taosdata\"," +
"\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," + "\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," +
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," + "\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," +
"\"where\":\"_c0 > 0\"," +
"\"beginDateTime\": \"2021-01-01 00:00:00\"," + "\"beginDateTime\": \"2021-01-01 00:00:00\"," +
"\"endDateTime\": \"2021-01-01 12:00:00\"," + "\"endDateTime\": \"2021-01-01 12:00:00\"" +
"\"splitInterval\": \"1h\"" +
"}"); "}");
job.setPluginJobConf(configuration); job.setPluginJobConf(configuration);
}
@Test
public void jobInit() throws ParseException {
// when // when
job.init(); job.init();
// assert // assert
Configuration conf = job.getPluginJobConf(); Configuration conf = job.getPluginJobConf();
Assert.assertEquals("root", conf.getString("user")); Assert.assertEquals("root", conf.getString(Key.USERNAME));
Assert.assertEquals("taosdata", conf.getString("password")); Assert.assertEquals("taosdata", conf.getString("password"));
Assert.assertEquals("weather", conf.getString("connection[0].table[0]")); Assert.assertEquals("weather", conf.getString("connection[0].table[0]"));
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl")); Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl[0]"));
Assert.assertEquals("2021-01-01 00:00:00", conf.getString("beginDateTime"));
Assert.assertEquals("2021-01-01 12:00:00", conf.getString("endDateTime"));
Assert.assertEquals("_c0 > 0", conf.getString("where"));
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Long start = sdf.parse("2021-01-01 00:00:00").getTime(); @Test
Assert.assertEquals(start, conf.getLong("beginDateTime")); public void jobInit_case02() {
// given
TDengineReader.Job job = new TDengineReader.Job();
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"connection\": [{\"querySql\":[\"select * from weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," +
"}");
job.setPluginJobConf(configuration);
Long end = sdf.parse("2021-01-01 12:00:00").getTime(); // when
Assert.assertEquals(end, conf.getLong("endDateTime")); job.init();
Assert.assertEquals(new Long(3600 * 1000), conf.getLong("splitInterval")); // assert
Configuration conf = job.getPluginJobConf();
Assert.assertEquals("root", conf.getString(Key.USERNAME));
Assert.assertEquals("taosdata", conf.getString("password"));
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl[0]"));
Assert.assertEquals("select * from weather", conf.getString("connection[0].querySql[0]"));
} }
@Test @Test
public void jobSplit() { public void jobSplit_case01() {
// given
TDengineReader.Job job = new TDengineReader.Job();
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," +
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," +
"\"where\":\"_c0 > 0\"," +
"\"beginDateTime\": \"2021-01-01 00:00:00\"," +
"\"endDateTime\": \"2021-01-01 12:00:00\"" +
"}");
job.setPluginJobConf(configuration);
// when // when
job.init(); job.init();
List<Configuration> configurationList = job.split(1); List<Configuration> configurationList = job.split(1);
// assert // assert
Assert.assertEquals(12, configurationList.size()); Assert.assertEquals(1, configurationList.size());
for (int i = 0; i < configurationList.size(); i++) { Configuration conf = configurationList.get(0);
Configuration conf = configurationList.get(i); Assert.assertEquals("root", conf.getString("username"));
Assert.assertEquals("root", conf.getString("user"));
Assert.assertEquals("taosdata", conf.getString("password")); Assert.assertEquals("taosdata", conf.getString("password"));
Assert.assertEquals("_c0 > 0", conf.getString("where"));
Assert.assertEquals("weather", conf.getString("table[0]")); Assert.assertEquals("weather", conf.getString("table[0]"));
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl")); Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl"));
} }
@Test
public void jobSplit_case02() {
// given
TDengineReader.Job job = new TDengineReader.Job();
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"connection\": [{\"querySql\":[\"select * from weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," +
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," +
"}");
job.setPluginJobConf(configuration);
// when
job.init();
List<Configuration> configurationList = job.split(1);
// assert
Assert.assertEquals(1, configurationList.size());
Configuration conf = configurationList.get(0);
Assert.assertEquals("root", conf.getString("username"));
Assert.assertEquals("taosdata", conf.getString("password"));
Assert.assertEquals("select * from weather", conf.getString("querySql[0]"));
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl"));
}
@Test
public void jobSplit_case03() {
// given
TDengineReader.Job job = new TDengineReader.Job();
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"connection\": [{\"querySql\":[\"select * from weather\",\"select * from test.meters\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\", \"jdbc:TAOS://master:6030/test\"]}]," +
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," +
"}");
job.setPluginJobConf(configuration);
// when
job.init();
List<Configuration> configurationList = job.split(1);
// assert
Assert.assertEquals(2, configurationList.size());
Configuration conf = configurationList.get(0);
Assert.assertEquals("root", conf.getString("username"));
Assert.assertEquals("taosdata", conf.getString("password"));
Assert.assertEquals("select * from weather", conf.getString("querySql[0]"));
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl"));
Configuration conf1 = configurationList.get(1);
Assert.assertEquals("root", conf1.getString("username"));
Assert.assertEquals("taosdata", conf1.getString("password"));
Assert.assertEquals("select * from weather", conf1.getString("querySql[0]"));
Assert.assertEquals("select * from test.meters", conf1.getString("querySql[1]"));
Assert.assertEquals("jdbc:TAOS://master:6030/test", conf1.getString("jdbcUrl"));
} }
} }

View File

@ -0,0 +1,52 @@
{
"job": {
"content": [
{
"reader": {
"name": "tdenginereader",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"*"
],
"connection": [
{
"table": [
"stb1"
],
"jdbcUrl": [
"jdbc:TAOS-RS://192.168.56.105:6041/db1"
]
}
]
}
},
"writer": {
"name": "rdbmswriter",
"parameter": {
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:dm://192.168.0.72:5236"
}
],
"username": "TESTUSER",
"password": "test123456",
"table": "stb2",
"column": [
"*"
]
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,47 @@
{
"job": {
"content": [
{
"reader": {
"name": "tdenginereader",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"ts",
"f1",
"f2",
"t1",
"t2"
],
"connection": [
{
"table": [
"stb1"
],
"jdbcUrl": [
"jdbc:TAOS-RS://192.168.56.105:6041/db1"
]
}
],
"where": "t10 = '北京朝阳望京'",
"beginDateTime": "2022-03-07 12:00:00",
"endDateTime": "2022-03-07 19:00:00"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,37 @@
{
"job": {
"content": [
{
"reader": {
"name": "tdenginereader",
"parameter": {
"username": "root",
"password": "taosdata",
"connection": [
{
"querySql": [
"select * from stb1 where t10 = '北京朝阳望京' and _c0 >= '2022-03-07 12:00:00' and _c0 < '2022-03-07 19:00:00'"
],
"jdbcUrl": [
"jdbc:TAOS-RS://192.168.56.105:6041/db1"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -68,6 +68,21 @@
<version>0.0.1-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
<scope>test</scope>
</dependency>
<!-- 添加 dm8 jdbc jar 包依赖-->
<!-- <dependency>-->
<!-- <groupId>com.dameng</groupId>-->
<!-- <artifactId>dm-jdbc</artifactId>-->
<!-- <version>1.8</version>-->
<!-- <scope>system</scope>-->
<!-- <systemPath>${project.basedir}/src/test/resources/DmJdbcDriver18.jar-->
<!-- </systemPath>-->
<!-- </dependency>-->
</dependencies> </dependencies>

View File

@ -21,6 +21,7 @@ import java.util.stream.Collectors;
public class DefaultDataHandler implements DataHandler { public class DefaultDataHandler implements DataHandler {
private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class); private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
private final TaskPluginCollector taskPluginCollector;
private String username; private String username;
private String password; private String password;
private String jdbcUrl; private String jdbcUrl;
@ -47,7 +48,6 @@ public class DefaultDataHandler implements DataHandler {
private Map<String, List<ColumnMeta>> columnMetas; private Map<String, List<ColumnMeta>> columnMetas;
static { static {
try { try {
Class.forName("com.taosdata.jdbc.TSDBDriver"); Class.forName("com.taosdata.jdbc.TSDBDriver");
@ -57,7 +57,7 @@ public class DefaultDataHandler implements DataHandler {
} }
} }
public DefaultDataHandler(Configuration configuration) { public DefaultDataHandler(Configuration configuration, TaskPluginCollector taskPluginCollector) {
this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME); this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME);
this.password = configuration.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD); this.password = configuration.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD);
this.jdbcUrl = configuration.getString(Key.JDBC_URL); this.jdbcUrl = configuration.getString(Key.JDBC_URL);
@ -65,6 +65,7 @@ public class DefaultDataHandler implements DataHandler {
this.tables = configuration.getList(Key.TABLE, String.class); this.tables = configuration.getList(Key.TABLE, String.class);
this.columns = configuration.getList(Key.COLUMN, String.class); this.columns = configuration.getList(Key.COLUMN, String.class);
this.ignoreTagsUnmatched = configuration.getBool(Key.IGNORE_TAGS_UNMATCHED, Constants.DEFAULT_IGNORE_TAGS_UNMATCHED); this.ignoreTagsUnmatched = configuration.getBool(Key.IGNORE_TAGS_UNMATCHED, Constants.DEFAULT_IGNORE_TAGS_UNMATCHED);
this.taskPluginCollector = taskPluginCollector;
} }
@Override @Override
@ -72,6 +73,7 @@ public class DefaultDataHandler implements DataHandler {
int count = 0; int count = 0;
int affectedRows = 0; int affectedRows = 0;
try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) { try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established."); LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established.");
// prepare table_name -> table_meta // prepare table_name -> table_meta
@ -86,14 +88,24 @@ public class DefaultDataHandler implements DataHandler {
if (i % batchSize != 0) { if (i % batchSize != 0) {
recordBatch.add(record); recordBatch.add(record);
} else { } else {
try {
affectedRows = writeBatch(conn, recordBatch); affectedRows = writeBatch(conn, recordBatch);
} catch (SQLException e) {
LOG.warn("use one row insert. because:" + e.getMessage());
affectedRows = writeEachRow(conn, recordBatch);
}
recordBatch.clear(); recordBatch.clear();
} }
count++; count++;
} }
if (!recordBatch.isEmpty()) { if (!recordBatch.isEmpty()) {
try {
affectedRows = writeBatch(conn, recordBatch); affectedRows = writeBatch(conn, recordBatch);
} catch (SQLException e) {
LOG.warn("use one row insert. because:" + e.getMessage());
affectedRows = writeEachRow(conn, recordBatch);
}
recordBatch.clear(); recordBatch.clear();
} }
} catch (SQLException e) { } catch (SQLException e) {
@ -107,6 +119,21 @@ public class DefaultDataHandler implements DataHandler {
return affectedRows; return affectedRows;
} }
private int writeEachRow(Connection conn, List<Record> recordBatch) {
int affectedRows = 0;
for (Record record : recordBatch) {
List<Record> recordList = new ArrayList<>();
recordList.add(record);
try {
affectedRows += writeBatch(conn, recordList);
} catch (SQLException e) {
LOG.error(e.getMessage());
this.taskPluginCollector.collectDirtyRecord(record, e);
}
}
return affectedRows;
}
/** /**
* table: [ "stb1", "stb2", "tb1", "tb2", "t1" ] * table: [ "stb1", "stb2", "tb1", "tb2", "t1" ]
* stb1[ts,f1,f2] tags:[t1] * stb1[ts,f1,f2] tags:[t1]
@ -118,7 +145,7 @@ public class DefaultDataHandler implements DataHandler {
* 3. 对于tb拼sql例如data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => insert into tb(ts, f1, f2) values(ts, f1, f2) * 3. 对于tb拼sql例如data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => insert into tb(ts, f1, f2) values(ts, f1, f2)
* 4. 对于t拼sql例如data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, f3, t1, t2] insert into t(ts, f1, f2, f3, t1, t2) values(ts, f1, f2, f3, t1, t2) * 4. 对于t拼sql例如data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, f3, t1, t2] insert into t(ts, f1, f2, f3, t1, t2) values(ts, f1, f2, f3, t1, t2)
*/ */
public int writeBatch(Connection conn, List<Record> recordBatch) { public int writeBatch(Connection conn, List<Record> recordBatch) throws SQLException {
int affectedRows = 0; int affectedRows = 0;
for (String table : tables) { for (String table : tables) {
TableMeta tableMeta = tableMetas.get(table); TableMeta tableMeta = tableMetas.get(table);
@ -146,7 +173,7 @@ public class DefaultDataHandler implements DataHandler {
* record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], ) * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
* record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], ) * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
*/ */
private int writeBatchToSupTableBySQL(Connection conn, String table, List<Record> recordBatch) { private int writeBatchToSupTableBySQL(Connection conn, String table, List<Record> recordBatch) throws SQLException {
List<ColumnMeta> columnMetas = this.columnMetas.get(table); List<ColumnMeta> columnMetas = this.columnMetas.get(table);
StringBuilder sb = new StringBuilder("insert into"); StringBuilder sb = new StringBuilder("insert into");
@ -177,13 +204,11 @@ public class DefaultDataHandler implements DataHandler {
return executeUpdate(conn, sql); return executeUpdate(conn, sql);
} }
private int executeUpdate(Connection conn, String sql) throws DataXException { private int executeUpdate(Connection conn, String sql) throws SQLException {
int count; int count;
try (Statement stmt = conn.createStatement()) { try (Statement stmt = conn.createStatement()) {
LOG.debug(">>> " + sql); LOG.debug(">>> " + sql);
count = stmt.executeUpdate(sql); count = stmt.executeUpdate(sql);
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
} }
return count; return count;
} }
@ -227,7 +252,7 @@ public class DefaultDataHandler implements DataHandler {
* table: ["stb1"], column: ["ts", "f1", "f2", "t1"] * table: ["stb1"], column: ["ts", "f1", "f2", "t1"]
* data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => schemaless: stb1,t1=t1 f1=f1,f2=f2 ts * data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => schemaless: stb1,t1=t1 f1=f1,f2=f2 ts
*/ */
private int writeBatchToSupTableBySchemaless(Connection conn, String table, List<Record> recordBatch) { private int writeBatchToSupTableBySchemaless(Connection conn, String table, List<Record> recordBatch) throws SQLException {
int count = 0; int count = 0;
TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision(); TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
@ -296,11 +321,8 @@ public class DefaultDataHandler implements DataHandler {
default: default:
timestampType = SchemalessTimestampType.NOT_CONFIGURED; timestampType = SchemalessTimestampType.NOT_CONFIGURED;
} }
try {
writer.write(lines, SchemalessProtocolType.LINE, timestampType); writer.write(lines, SchemalessProtocolType.LINE, timestampType);
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
LOG.warn("schemalessWriter does not return affected rows!"); LOG.warn("schemalessWriter does not return affected rows!");
return count; return count;
@ -370,7 +392,7 @@ public class DefaultDataHandler implements DataHandler {
* else * else
* insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)]) * insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)])
*/ */
private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) { private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
List<ColumnMeta> columnMetas = this.columnMetas.get(table); List<ColumnMeta> columnMetas = this.columnMetas.get(table);
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
@ -440,7 +462,7 @@ public class DefaultDataHandler implements DataHandler {
* table: ["weather"], column: ["ts, f1, f2, f3, t1, t2"] * table: ["weather"], column: ["ts, f1, f2, f3, t1, t2"]
* sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...) * sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...)
*/ */
private int writeBatchToNormalTable(Connection conn, String table, List<Record> recordBatch) { private int writeBatchToNormalTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
List<ColumnMeta> columnMetas = this.columnMetas.get(table); List<ColumnMeta> columnMetas = this.columnMetas.get(table);
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();

View File

@ -9,4 +9,10 @@ public class Key {
public static final String JDBC_URL = "jdbcUrl"; public static final String JDBC_URL = "jdbcUrl";
public static final String COLUMN = "column"; public static final String COLUMN = "column";
public static final String IGNORE_TAGS_UNMATCHED = "ignoreTagsUnmatched"; public static final String IGNORE_TAGS_UNMATCHED = "ignoreTagsUnmatched";
public static final String BEGIN_DATETIME = "beginDateTime";
public static final String END_DATETIME = "endDateTime";
public static final String WHERE = "where";
public static final String QUERY_SQL = "querySql";
public static final String MANDATORY_ENCODING = "mandatoryEncoding";
} }

View File

@ -17,7 +17,7 @@ public class SchemaManager {
private final Connection conn; private final Connection conn;
private TimestampPrecision precision; private TimestampPrecision precision;
SchemaManager(Connection conn) { public SchemaManager(Connection conn) {
this.conn = conn; this.conn = conn;
} }

View File

@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver; import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.spi.Writer; import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.Configuration;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -25,7 +26,7 @@ public class TDengineWriter extends Writer {
this.originalConfig = super.getPluginJobConf(); this.originalConfig = super.getPluginJobConf();
this.originalConfig.set(PEER_PLUGIN_NAME, getPeerPluginName()); this.originalConfig.set(PEER_PLUGIN_NAME, getPeerPluginName());
// check user // check username
String user = this.originalConfig.getString(Key.USERNAME); String user = this.originalConfig.getString(Key.USERNAME);
if (StringUtils.isBlank(user)) if (StringUtils.isBlank(user))
throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter [" throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter ["
@ -81,10 +82,12 @@ public class TDengineWriter extends Writer {
private static final Logger LOG = LoggerFactory.getLogger(Task.class); private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private Configuration writerSliceConfig; private Configuration writerSliceConfig;
private TaskPluginCollector taskPluginCollector;
@Override @Override
public void init() { public void init() {
this.writerSliceConfig = getPluginJobConf(); this.writerSliceConfig = getPluginJobConf();
this.taskPluginCollector = super.getTaskPluginCollector();
} }
@Override @Override
@ -101,7 +104,7 @@ public class TDengineWriter extends Writer {
if (peerPluginName.equals("opentsdbreader")) if (peerPluginName.equals("opentsdbreader"))
handler = new OpentsdbDataHandler(this.writerSliceConfig); handler = new OpentsdbDataHandler(this.writerSliceConfig);
else else
handler = new DefaultDataHandler(this.writerSliceConfig); handler = new DefaultDataHandler(this.writerSliceConfig, this.taskPluginCollector);
long records = handler.handle(lineReceiver, getTaskPluginCollector()); long records = handler.handle(lineReceiver, getTaskPluginCollector());
LOG.debug("handle data finished, records: " + records); LOG.debug("handle data finished, records: " + records);

View File

@ -0,0 +1,41 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.core.Engine;
import org.junit.Ignore;
import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
@Ignore
public class Csv2TDengineTest {
private static final String host = "192.168.56.105";
@Test
public void case01() throws Throwable {
// given
prepareTable();
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/csv2t.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
public void prepareTable() throws SQLException {
final String url = "jdbc:TAOS-RS://" + host + ":6041";
try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists test");
stmt.execute("create database if not exists test");
stmt.execute("create table test.weather (ts timestamp, temperature bigint, humidity double, is_normal bool) " +
"tags(device_id binary(10),address nchar(10))");
}
}
}

View File

@ -101,7 +101,7 @@ public class DM2TDengineTest {
} }
@Before @Before
public void before() throws SQLException, ClassNotFoundException { public void before() throws SQLException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();

View File

@ -4,6 +4,8 @@ import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.LongColumn; import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn; import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.record.DefaultRecord; import com.alibaba.datax.core.transport.record.DefaultRecord;
import org.junit.*; import org.junit.*;
@ -23,6 +25,8 @@ public class DefaultDataHandlerTest {
private static final String host = "192.168.1.93"; private static final String host = "192.168.1.93";
private static Connection conn; private static Connection conn;
private final TaskPluginCollector taskPluginCollector = new TDengineWriter.Task().getTaskPluginCollector();
@Test @Test
public void writeSupTableBySQL() throws SQLException { public void writeSupTableBySQL() throws SQLException {
// given // given
@ -46,8 +50,9 @@ public class DefaultDataHandlerTest {
return record; return record;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
// when // when
DefaultDataHandler handler = new DefaultDataHandler(configuration); DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector);
List<String> tables = configuration.getList("table", String.class); List<String> tables = configuration.getList("table", String.class);
SchemaManager schemaManager = new SchemaManager(conn); SchemaManager schemaManager = new SchemaManager(conn);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables); Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
@ -85,7 +90,7 @@ public class DefaultDataHandlerTest {
}).collect(Collectors.toList()); }).collect(Collectors.toList());
// when // when
DefaultDataHandler handler = new DefaultDataHandler(configuration); DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector);
List<String> tables = configuration.getList("table", String.class); List<String> tables = configuration.getList("table", String.class);
SchemaManager schemaManager = new SchemaManager(conn); SchemaManager schemaManager = new SchemaManager(conn);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables); Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
@ -125,7 +130,7 @@ public class DefaultDataHandlerTest {
}).collect(Collectors.toList()); }).collect(Collectors.toList());
// when // when
DefaultDataHandler handler = new DefaultDataHandler(configuration); DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector);
List<String> tables = configuration.getList("table", String.class); List<String> tables = configuration.getList("table", String.class);
SchemaManager schemaManager = new SchemaManager(connection); SchemaManager schemaManager = new SchemaManager(connection);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables); Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
@ -164,7 +169,7 @@ public class DefaultDataHandlerTest {
}).collect(Collectors.toList()); }).collect(Collectors.toList());
// when // when
DefaultDataHandler handler = new DefaultDataHandler(configuration); DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector);
List<String> tables = configuration.getList("table", String.class); List<String> tables = configuration.getList("table", String.class);
SchemaManager schemaManager = new SchemaManager(conn); SchemaManager schemaManager = new SchemaManager(conn);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables); Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
@ -203,7 +208,7 @@ public class DefaultDataHandlerTest {
}).collect(Collectors.toList()); }).collect(Collectors.toList());
// when // when
DefaultDataHandler handler = new DefaultDataHandler(configuration); DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector);
List<String> tables = configuration.getList("table", String.class); List<String> tables = configuration.getList("table", String.class);
SchemaManager schemaManager = new SchemaManager(conn); SchemaManager schemaManager = new SchemaManager(conn);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables); Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
@ -242,7 +247,7 @@ public class DefaultDataHandlerTest {
}).collect(Collectors.toList()); }).collect(Collectors.toList());
// when // when
DefaultDataHandler handler = new DefaultDataHandler(configuration); DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector);
List<String> tables = configuration.getList("table", String.class); List<String> tables = configuration.getList("table", String.class);
SchemaManager schemaManager = new SchemaManager(conn); SchemaManager schemaManager = new SchemaManager(conn);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables); Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);

View File

@ -0,0 +1,80 @@
{
"job": {
"content": [
{
"reader": {
"name": "txtfilereader",
"parameter": {
"path": [
"/Users/yangzy/IdeaProjects/DataX/tdenginewriter/src/test/resources/weather.csv"
],
"encoding": "UTF-8",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "date",
"format": "yyy-MM-dd HH:mm:ss.SSS"
},
{
"index": 2,
"type": "long"
},
{
"index": 3,
"type": "double"
},
{
"index": 4,
"type": "long"
},
{
"index": 5,
"type": "string"
},
{
"index": 6,
"type": "String"
}
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"tbname",
"ts",
"temperature",
"humidity",
"is_normal",
"device_id",
"address"
],
"connection": [
{
"table": [
"weather"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/test"
}
],
"batchSize": 100,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,10 @@
tb1,2022-02-20 04:05:59.255,5,8.591868744,1,abcABC123,北京朝阳望京
tb1,2022-02-20 04:58:47.068,3,1.489693641,1,abcABC123,北京朝阳望京
tb1,2022-02-20 06:31:09.408,1,4.026500719,1,abcABC123,北京朝阳望京
tb1,2022-02-20 08:08:00.336,1,9.606400360,1,abcABC123,北京朝阳望京
tb1,2022-02-20 08:28:58.053,9,7.872178184,1,abcABC123123,北京朝阳望京
tb1,2022-02-20 10:23:20.836,9,2.699478524,1,abcABC123,北京朝阳望京
tb1,2022-02-20 11:09:59.739,7,7.906723716,1,abcABC123,北京朝阳望京
tb1,2022-02-20 19:08:29.315,1,5.852338895,1,abcABC123,北京朝阳望京
tb1,2022-02-20 22:10:06.243,10,5.535007901,1,abcABC123,北京朝阳望京
tb1,2022-02-20 23:52:43.683,10,10.642013185,1,abcABC123,北京朝阳望京
1 tb1 2022-02-20 04:05:59.255 5 8.591868744 1 abcABC123 北京朝阳望京
2 tb1 2022-02-20 04:58:47.068 3 1.489693641 1 abcABC123 北京朝阳望京
3 tb1 2022-02-20 06:31:09.408 1 4.026500719 1 abcABC123 北京朝阳望京
4 tb1 2022-02-20 08:08:00.336 1 9.606400360 1 abcABC123 北京朝阳望京
5 tb1 2022-02-20 08:28:58.053 9 7.872178184 1 abcABC123123 北京朝阳望京
6 tb1 2022-02-20 10:23:20.836 9 2.699478524 1 abcABC123 北京朝阳望京
7 tb1 2022-02-20 11:09:59.739 7 7.906723716 1 abcABC123 北京朝阳望京
8 tb1 2022-02-20 19:08:29.315 1 5.852338895 1 abcABC123 北京朝阳望京
9 tb1 2022-02-20 22:10:06.243 10 5.535007901 1 abcABC123 北京朝阳望京
10 tb1 2022-02-20 23:52:43.683 10 10.642013185 1 abcABC123 北京朝阳望京