mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-03 06:21:01 +08:00
[TS-1280]<feature>: add parameter where and querySql
This commit is contained in:
parent
bcbe242a29
commit
8d29a621be
@ -12,6 +12,8 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。
|
|||||||
|
|
||||||
### 3.1 配置样例
|
### 3.1 配置样例
|
||||||
|
|
||||||
|
* 配置一个从 TDengine 抽取数据作业:
|
||||||
|
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"job": {
|
"job": {
|
||||||
@ -27,7 +29,9 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。
|
|||||||
"table": [
|
"table": [
|
||||||
"meters"
|
"meters"
|
||||||
],
|
],
|
||||||
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/test?timestampFormat=TIMESTAMP"
|
"jdbcUrl": [
|
||||||
|
"jdbc:TAOS-RS://192.168.56.105:6041/test?timestampFormat=TIMESTAMP"
|
||||||
|
]
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"column": [
|
"column": [
|
||||||
@ -36,6 +40,7 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。
|
|||||||
"voltage",
|
"voltage",
|
||||||
"phase"
|
"phase"
|
||||||
],
|
],
|
||||||
|
"where": "ts>=0",
|
||||||
"beginDateTime": "2017-07-14 10:40:00",
|
"beginDateTime": "2017-07-14 10:40:00",
|
||||||
"endDateTime": "2017-08-14 10:40:00"
|
"endDateTime": "2017-08-14 10:40:00"
|
||||||
}
|
}
|
||||||
@ -58,6 +63,48 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
* 配置一个自定义 SQL 的数据抽取作业:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"job": {
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"reader": {
|
||||||
|
"name": "tdenginereader",
|
||||||
|
"parameter": {
|
||||||
|
"user": "root",
|
||||||
|
"password": "taosdata",
|
||||||
|
"connection": [
|
||||||
|
{
|
||||||
|
"querySql": [
|
||||||
|
"select * from test.meters"
|
||||||
|
],
|
||||||
|
"jdbcUrl": [
|
||||||
|
"jdbc:TAOS-RS://192.168.56.105:6041/test?timestampFormat=TIMESTAMP"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"writer": {
|
||||||
|
"name": "streamwriter",
|
||||||
|
"parameter": {
|
||||||
|
"encoding": "UTF-8",
|
||||||
|
"print": true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"setting": {
|
||||||
|
"speed": {
|
||||||
|
"channel": 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
### 3.2 参数说明
|
### 3.2 参数说明
|
||||||
|
|
||||||
* **username**
|
* **username**
|
||||||
@ -68,25 +115,37 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。
|
|||||||
* 描述:TDengine 实例的密码 <br />
|
* 描述:TDengine 实例的密码 <br />
|
||||||
* 必选:是 <br />
|
* 必选:是 <br />
|
||||||
* 默认值:无 <br />
|
* 默认值:无 <br />
|
||||||
* **table**
|
|
||||||
* 描述:所选取的需要同步的表。使用JSON的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一schema结构,
|
|
||||||
TDengineReader不予检查表是否同一逻辑表。注意,table必须包含在connection配置单元中。<br />
|
|
||||||
* 必选:是 <br />
|
|
||||||
* 默认值:无 <br />
|
|
||||||
* **jdbcUrl**
|
* **jdbcUrl**
|
||||||
* 描述:TDengine 数据库的JDBC连接信息。注意,jdbcUrl必须包含在connection配置单元中。JdbcUrl具体请参看TDengine官方文档。
|
* 描述:TDengine 数据库的JDBC连接信息。注意,jdbcUrl必须包含在connection配置单元中。JdbcUrl具体请参看TDengine官方文档。
|
||||||
* 必选:是 <br />
|
* 必选:是 <br />
|
||||||
* 默认值:无<br />
|
* 默认值:无<br />
|
||||||
|
* **querySql**
|
||||||
|
* 描述:在有些业务场景下,where 这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了 querySql 后, TDengineReader 就会忽略 table, column,
|
||||||
|
where, beginDateTime, endDateTime这些配置型,直接使用这个配置项的内容对数据进行筛选。例如需要 进行多表join后同步数据,使用 select a,b from table_a join
|
||||||
|
table_b on table_a.id = table_b.id<br />
|
||||||
|
* 必选:否 <br />
|
||||||
|
* 默认值:无 <br />
|
||||||
|
* **table**
|
||||||
|
* 描述:所选取的需要同步的表。使用 JSON 的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一 schema 结构, TDengineReader不予检查表是否同一逻辑表。注意,table必须包含在
|
||||||
|
connection 配置单元中。<br />
|
||||||
|
* 必选:是 <br />
|
||||||
|
* 默认值:无 <br />
|
||||||
|
* **where**
|
||||||
|
* 描述:筛选条件中的 where 子句,TDengineReader 根据指定的column, table, where, begingDateTime, endDateTime 条件拼接 SQL,并根据这个 SQL
|
||||||
|
进行数据抽取。 <br />
|
||||||
|
* 必选:否 <br />
|
||||||
|
* 默认值:无 <br />
|
||||||
* **beginDateTime**
|
* **beginDateTime**
|
||||||
* 描述:数据的开始时间,Job迁移从begineDateTime到endDateTime的数据,格式为yyyy-MM-dd HH:mm:ss,如果不填为全量同步 <br />
|
* 描述:数据的开始时间,Job 迁移从 begineDateTime 到 endDateTime 的数据,格式为 yyyy-MM-dd HH:mm:ss <br />
|
||||||
* 必选:否 <br />
|
* 必选:否 <br />
|
||||||
* 默认值:无 <br />
|
* 默认值:无 <br />
|
||||||
* **endDateTime**
|
* **endDateTime**
|
||||||
* 描述:数据的结束时间,Job迁移从begineDateTime到endDateTime的数据,格式为yyyy-MM-dd HH:mm:ss,如果不填为全量同步 <br />
|
* 描述:数据的结束时间,Job 迁移从 begineDateTime 到 endDateTime 的数据,格式为 yyyy-MM-dd HH:mm:ss <br />
|
||||||
* 必选:否 <br />
|
* 必选:否 <br />
|
||||||
* 默认值:无 <br />
|
* 默认值:无 <br />
|
||||||
|
|
||||||
### 3.3 类型转换
|
### 3.3 类型转换
|
||||||
|
|
||||||
| TDengine 数据类型 | DataX 内部类型 |
|
| TDengine 数据类型 | DataX 内部类型 |
|
||||||
| --------------- | ------------- |
|
| --------------- | ------------- |
|
||||||
| TINYINT | Long |
|
| TINYINT | Long |
|
||||||
|
@ -15,6 +15,8 @@ import java.sql.*;
|
|||||||
import java.text.ParseException;
|
import java.text.ParseException;
|
||||||
import java.text.SimpleDateFormat;
|
import java.text.SimpleDateFormat;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public class TDengineReader extends Reader {
|
public class TDengineReader extends Reader {
|
||||||
@ -41,26 +43,30 @@ public class TDengineReader extends Reader {
|
|||||||
"The parameter [" + Key.PASSWORD + "] is not set.");
|
"The parameter [" + Key.PASSWORD + "] is not set.");
|
||||||
|
|
||||||
// check connection
|
// check connection
|
||||||
List<Object> connection = this.originalConfig.getList(Key.CONNECTION);
|
List<Configuration> connectionList = this.originalConfig.getListConfiguration(Key.CONNECTION);
|
||||||
if (connection == null || connection.isEmpty())
|
if (connectionList == null || connectionList.isEmpty())
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
|
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
|
||||||
"The parameter [" + Key.CONNECTION + "] is not set.");
|
"The parameter [" + Key.CONNECTION + "] is not set.");
|
||||||
for (int i = 0; i < connection.size(); i++) {
|
for (int i = 0; i < connectionList.size(); i++) {
|
||||||
Configuration conn = Configuration.from(connection.get(i).toString());
|
Configuration conn = connectionList.get(i);
|
||||||
|
// check jdbcUrl
|
||||||
|
List<Object> jdbcUrlList = conn.getList(Key.JDBC_URL);
|
||||||
|
if (jdbcUrlList == null || jdbcUrlList.isEmpty()) {
|
||||||
|
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
|
||||||
|
"The parameter [" + Key.JDBC_URL + "] of connection[" + (i + 1) + "] is not set.");
|
||||||
|
}
|
||||||
|
// check table/querySql
|
||||||
|
List<Object> querySqlList = conn.getList(Key.QUERY_SQL);
|
||||||
|
if (querySqlList == null || querySqlList.isEmpty()) {
|
||||||
|
String querySql = conn.getString(Key.QUERY_SQL);
|
||||||
|
if (StringUtils.isBlank(querySql)) {
|
||||||
List<Object> table = conn.getList(Key.TABLE);
|
List<Object> table = conn.getList(Key.TABLE);
|
||||||
if (table == null || table.isEmpty())
|
if (table == null || table.isEmpty())
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
|
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
|
||||||
"The parameter [" + Key.TABLE + "] of connection[" + (i + 1) + "] is not set.");
|
"The parameter [" + Key.TABLE + "] of connection[" + (i + 1) + "] is not set.");
|
||||||
String jdbcUrl = conn.getString(Key.JDBC_URL);
|
|
||||||
if (StringUtils.isBlank(jdbcUrl))
|
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
|
|
||||||
"The parameter [" + Key.JDBC_URL + "] of connection[" + (i + 1) + "] is not set.");
|
|
||||||
}
|
}
|
||||||
// check column
|
}
|
||||||
List<Object> column = this.originalConfig.getList(Key.COLUMN);
|
}
|
||||||
if (column == null || column.isEmpty())
|
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
|
|
||||||
"The parameter [" + Key.CONNECTION + "] is not set or is empty.");
|
|
||||||
|
|
||||||
SimpleDateFormat format = new SimpleDateFormat(DATETIME_FORMAT);
|
SimpleDateFormat format = new SimpleDateFormat(DATETIME_FORMAT);
|
||||||
// check beginDateTime
|
// check beginDateTime
|
||||||
@ -100,16 +106,18 @@ public class TDengineReader extends Reader {
|
|||||||
public List<Configuration> split(int adviceNumber) {
|
public List<Configuration> split(int adviceNumber) {
|
||||||
List<Configuration> configurations = new ArrayList<>();
|
List<Configuration> configurations = new ArrayList<>();
|
||||||
|
|
||||||
List<Object> connectionList = this.originalConfig.getList(Key.CONNECTION);
|
List<Configuration> connectionList = this.originalConfig.getListConfiguration(Key.CONNECTION);
|
||||||
for (Object conn : connectionList) {
|
for (Configuration conn : connectionList) {
|
||||||
|
List<String> jdbcUrlList = conn.getList(Key.JDBC_URL, String.class);
|
||||||
|
for (String jdbcUrl : jdbcUrlList) {
|
||||||
Configuration clone = this.originalConfig.clone();
|
Configuration clone = this.originalConfig.clone();
|
||||||
Configuration conf = Configuration.from(conn.toString());
|
|
||||||
String jdbcUrl = conf.getString(Key.JDBC_URL);
|
|
||||||
clone.set(Key.JDBC_URL, jdbcUrl);
|
clone.set(Key.JDBC_URL, jdbcUrl);
|
||||||
clone.set(Key.TABLE, conf.getList(Key.TABLE));
|
clone.set(Key.TABLE, conn.getList(Key.TABLE));
|
||||||
|
clone.set(Key.QUERY_SQL, conn.getList(Key.QUERY_SQL));
|
||||||
clone.remove(Key.CONNECTION);
|
clone.remove(Key.CONNECTION);
|
||||||
configurations.add(clone);
|
configurations.add(clone);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
LOG.info("Configuration: {}", configurations);
|
LOG.info("Configuration: {}", configurations);
|
||||||
return configurations;
|
return configurations;
|
||||||
@ -120,12 +128,15 @@ public class TDengineReader extends Reader {
|
|||||||
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
|
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
|
||||||
|
|
||||||
private Configuration readerSliceConfig;
|
private Configuration readerSliceConfig;
|
||||||
|
private String mandatoryEncoding;
|
||||||
private Connection conn;
|
private Connection conn;
|
||||||
|
|
||||||
private List<String> tables;
|
private List<String> tables;
|
||||||
private List<String> columns;
|
private List<String> columns;
|
||||||
|
|
||||||
private String startTime;
|
private String startTime;
|
||||||
private String endTime;
|
private String endTime;
|
||||||
|
private String where;
|
||||||
|
private List<String> querySql;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
try {
|
try {
|
||||||
@ -141,12 +152,10 @@ public class TDengineReader extends Reader {
|
|||||||
this.readerSliceConfig = super.getPluginJobConf();
|
this.readerSliceConfig = super.getPluginJobConf();
|
||||||
LOG.info("getPluginJobConf: {}", readerSliceConfig);
|
LOG.info("getPluginJobConf: {}", readerSliceConfig);
|
||||||
|
|
||||||
String url = readerSliceConfig.getString(Key.JDBC_URL);
|
|
||||||
if (StringUtils.isBlank(url))
|
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
|
|
||||||
"The parameter [" + Key.JDBC_URL + "] is not set.");
|
|
||||||
String user = readerSliceConfig.getString(Key.USERNAME);
|
String user = readerSliceConfig.getString(Key.USERNAME);
|
||||||
String password = readerSliceConfig.getString(Key.PASSWORD);
|
String password = readerSliceConfig.getString(Key.PASSWORD);
|
||||||
|
|
||||||
|
String url = readerSliceConfig.getString(Key.JDBC_URL);
|
||||||
try {
|
try {
|
||||||
this.conn = DriverManager.getConnection(url, user, password);
|
this.conn = DriverManager.getConnection(url, user, password);
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
@ -158,6 +167,9 @@ public class TDengineReader extends Reader {
|
|||||||
this.columns = readerSliceConfig.getList(Key.COLUMN, String.class);
|
this.columns = readerSliceConfig.getList(Key.COLUMN, String.class);
|
||||||
this.startTime = readerSliceConfig.getString(Key.BEGIN_DATETIME);
|
this.startTime = readerSliceConfig.getString(Key.BEGIN_DATETIME);
|
||||||
this.endTime = readerSliceConfig.getString(Key.END_DATETIME);
|
this.endTime = readerSliceConfig.getString(Key.END_DATETIME);
|
||||||
|
this.where = readerSliceConfig.getString(Key.WHERE, "_c0 > " + Long.MIN_VALUE);
|
||||||
|
this.querySql = readerSliceConfig.getList(Key.QUERY_SQL, String.class);
|
||||||
|
this.mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, "UTF-8");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -167,26 +179,30 @@ public class TDengineReader extends Reader {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void startRead(RecordSender recordSender) {
|
public void startRead(RecordSender recordSender) {
|
||||||
|
List<String> sqlList = new ArrayList<>();
|
||||||
|
|
||||||
try (Statement stmt = conn.createStatement()) {
|
if (querySql == null || querySql.isEmpty()) {
|
||||||
for (String table : tables) {
|
for (String table : tables) {
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
sb.append("select ").append(StringUtils.join(columns, ","))
|
sb.append("select ").append(StringUtils.join(columns, ",")).append(" from ").append(table).append(" ");
|
||||||
.append(" from ").append(table).append(" ");
|
sb.append("where ").append(where);
|
||||||
|
if (!StringUtils.isBlank(startTime)) {
|
||||||
if (StringUtils.isBlank(startTime)) {
|
sb.append(" and _c0 >= '").append(startTime).append("'");
|
||||||
sb.append("where _c0 >= ").append(Long.MIN_VALUE);
|
|
||||||
} else {
|
|
||||||
sb.append("where _c0 >= '").append(startTime).append("'");
|
|
||||||
}
|
}
|
||||||
if (!StringUtils.isBlank(endTime)) {
|
if (!StringUtils.isBlank(endTime)) {
|
||||||
sb.append(" and _c0 < '").append(endTime).append("'");
|
sb.append(" and _c0 < '").append(endTime).append("'");
|
||||||
}
|
}
|
||||||
|
String sql = sb.toString().trim();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
sqlList.addAll(querySql);
|
||||||
|
}
|
||||||
|
|
||||||
String sql = sb.toString();
|
try (Statement stmt = conn.createStatement()) {
|
||||||
|
for (String sql : sqlList) {
|
||||||
ResultSet rs = stmt.executeQuery(sql);
|
ResultSet rs = stmt.executeQuery(sql);
|
||||||
while (rs.next()) {
|
while (rs.next()) {
|
||||||
Record record = buildRecord(recordSender, rs, "UTF-8");
|
Record record = buildRecord(recordSender, rs, mandatoryEncoding);
|
||||||
recordSender.sendToWriter(record);
|
recordSender.sendToWriter(record);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -239,13 +255,14 @@ public class TDengineReader extends Reader {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (SQLException | UnsupportedEncodingException e) {
|
} catch (SQLException e) {
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错!", e);
|
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "database query error!", e);
|
||||||
|
} catch (UnsupportedEncodingException e) {
|
||||||
|
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "illegal mandatoryEncoding", e);
|
||||||
}
|
}
|
||||||
return record;
|
return record;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -8,7 +8,9 @@
|
|||||||
"table": [
|
"table": [
|
||||||
""
|
""
|
||||||
],
|
],
|
||||||
"jdbcUrl": ""
|
"jdbcUrl": [
|
||||||
|
""
|
||||||
|
]
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"column": [
|
"column": [
|
||||||
@ -16,6 +18,6 @@
|
|||||||
],
|
],
|
||||||
"beginDateTime": "",
|
"beginDateTime": "",
|
||||||
"endDateTime": "",
|
"endDateTime": "",
|
||||||
"splitInterval": ""
|
"where": ""
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -1,7 +1,6 @@
|
|||||||
package com.alibaba.datax.plugin.reader;
|
package com.alibaba.datax.plugin.reader;
|
||||||
|
|
||||||
import com.alibaba.datax.core.Engine;
|
import com.alibaba.datax.core.Engine;
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Ignore;
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -0,0 +1,66 @@
|
|||||||
|
package com.alibaba.datax.plugin.reader;
|
||||||
|
|
||||||
|
import com.alibaba.datax.core.Engine;
|
||||||
|
import org.junit.Ignore;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.DriverManager;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.sql.Statement;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
@Ignore
|
||||||
|
public class TDengine2StreamTest {
|
||||||
|
|
||||||
|
private static final String host = "192.168.56.105";
|
||||||
|
private static final Random random = new Random(System.currentTimeMillis());
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void case01() throws Throwable {
|
||||||
|
// given
|
||||||
|
prepare("ms");
|
||||||
|
|
||||||
|
// when
|
||||||
|
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2stream-1.json"};
|
||||||
|
System.setProperty("datax.home", "../target/datax/datax");
|
||||||
|
Engine.entry(params);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void case02() throws Throwable {
|
||||||
|
// given
|
||||||
|
prepare("ms");
|
||||||
|
|
||||||
|
// when
|
||||||
|
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2stream-2.json"};
|
||||||
|
System.setProperty("datax.home", "../target/datax/datax");
|
||||||
|
Engine.entry(params);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void prepare(String precision) throws SQLException {
|
||||||
|
final String url = "jdbc:TAOS-RS://" + host + ":6041/";
|
||||||
|
try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) {
|
||||||
|
Statement stmt = conn.createStatement();
|
||||||
|
|
||||||
|
stmt.execute("drop database if exists db1");
|
||||||
|
stmt.execute("create database if not exists db1 precision '" + precision + "'");
|
||||||
|
stmt.execute("create table db1.stb1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, " +
|
||||||
|
"f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, t3 smallint, " +
|
||||||
|
"t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(100))");
|
||||||
|
|
||||||
|
for (int i = 1; i <= 10; i++) {
|
||||||
|
stmt.execute("insert into db1.tb" + i + " using db1.stb1 tags(now, " + random.nextInt(10) + "," +
|
||||||
|
random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextInt(10) + "," +
|
||||||
|
random.nextFloat() + "," + random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123'," +
|
||||||
|
"'北京朝阳望京') values(now+" + i + "s, " + random.nextInt(10) + "," + random.nextInt(10) + "," +
|
||||||
|
+random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextFloat() + "," +
|
||||||
|
random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123','北京朝阳望京')");
|
||||||
|
}
|
||||||
|
stmt.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
@ -16,8 +16,9 @@ public class TDengineReaderTest {
|
|||||||
Configuration configuration = Configuration.from("{" +
|
Configuration configuration = Configuration.from("{" +
|
||||||
"\"username\": \"root\"," +
|
"\"username\": \"root\"," +
|
||||||
"\"password\": \"taosdata\"," +
|
"\"password\": \"taosdata\"," +
|
||||||
"\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," +
|
"\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," +
|
||||||
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," +
|
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," +
|
||||||
|
"\"where\":\"_c0 > 0\"," +
|
||||||
"\"beginDateTime\": \"2021-01-01 00:00:00\"," +
|
"\"beginDateTime\": \"2021-01-01 00:00:00\"," +
|
||||||
"\"endDateTime\": \"2021-01-01 12:00:00\"" +
|
"\"endDateTime\": \"2021-01-01 12:00:00\"" +
|
||||||
"}");
|
"}");
|
||||||
@ -32,9 +33,10 @@ public class TDengineReaderTest {
|
|||||||
Assert.assertEquals("root", conf.getString(Key.USERNAME));
|
Assert.assertEquals("root", conf.getString(Key.USERNAME));
|
||||||
Assert.assertEquals("taosdata", conf.getString("password"));
|
Assert.assertEquals("taosdata", conf.getString("password"));
|
||||||
Assert.assertEquals("weather", conf.getString("connection[0].table[0]"));
|
Assert.assertEquals("weather", conf.getString("connection[0].table[0]"));
|
||||||
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl"));
|
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl[0]"));
|
||||||
Assert.assertEquals("2021-01-01 00:00:00", conf.getString("beginDateTime"));
|
Assert.assertEquals("2021-01-01 00:00:00", conf.getString("beginDateTime"));
|
||||||
Assert.assertEquals("2021-01-01 12:00:00", conf.getString("endDateTime"));
|
Assert.assertEquals("2021-01-01 12:00:00", conf.getString("endDateTime"));
|
||||||
|
Assert.assertEquals("_c0 > 0", conf.getString("where"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -45,8 +47,7 @@ public class TDengineReaderTest {
|
|||||||
Configuration configuration = Configuration.from("{" +
|
Configuration configuration = Configuration.from("{" +
|
||||||
"\"username\": \"root\"," +
|
"\"username\": \"root\"," +
|
||||||
"\"password\": \"taosdata\"," +
|
"\"password\": \"taosdata\"," +
|
||||||
"\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," +
|
"\"connection\": [{\"querySql\":[\"select * from weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," +
|
||||||
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]" +
|
|
||||||
"}");
|
"}");
|
||||||
job.setPluginJobConf(configuration);
|
job.setPluginJobConf(configuration);
|
||||||
|
|
||||||
@ -58,8 +59,8 @@ public class TDengineReaderTest {
|
|||||||
|
|
||||||
Assert.assertEquals("root", conf.getString(Key.USERNAME));
|
Assert.assertEquals("root", conf.getString(Key.USERNAME));
|
||||||
Assert.assertEquals("taosdata", conf.getString("password"));
|
Assert.assertEquals("taosdata", conf.getString("password"));
|
||||||
Assert.assertEquals("weather", conf.getString("connection[0].table[0]"));
|
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl[0]"));
|
||||||
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl"));
|
Assert.assertEquals("select * from weather", conf.getString("connection[0].querySql[0]"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -69,8 +70,9 @@ public class TDengineReaderTest {
|
|||||||
Configuration configuration = Configuration.from("{" +
|
Configuration configuration = Configuration.from("{" +
|
||||||
"\"username\": \"root\"," +
|
"\"username\": \"root\"," +
|
||||||
"\"password\": \"taosdata\"," +
|
"\"password\": \"taosdata\"," +
|
||||||
"\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," +
|
"\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," +
|
||||||
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," +
|
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," +
|
||||||
|
"\"where\":\"_c0 > 0\"," +
|
||||||
"\"beginDateTime\": \"2021-01-01 00:00:00\"," +
|
"\"beginDateTime\": \"2021-01-01 00:00:00\"," +
|
||||||
"\"endDateTime\": \"2021-01-01 12:00:00\"" +
|
"\"endDateTime\": \"2021-01-01 12:00:00\"" +
|
||||||
"}");
|
"}");
|
||||||
@ -85,6 +87,7 @@ public class TDengineReaderTest {
|
|||||||
Configuration conf = configurationList.get(0);
|
Configuration conf = configurationList.get(0);
|
||||||
Assert.assertEquals("root", conf.getString("username"));
|
Assert.assertEquals("root", conf.getString("username"));
|
||||||
Assert.assertEquals("taosdata", conf.getString("password"));
|
Assert.assertEquals("taosdata", conf.getString("password"));
|
||||||
|
Assert.assertEquals("_c0 > 0", conf.getString("where"));
|
||||||
Assert.assertEquals("weather", conf.getString("table[0]"));
|
Assert.assertEquals("weather", conf.getString("table[0]"));
|
||||||
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl"));
|
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl"));
|
||||||
|
|
||||||
@ -97,7 +100,7 @@ public class TDengineReaderTest {
|
|||||||
Configuration configuration = Configuration.from("{" +
|
Configuration configuration = Configuration.from("{" +
|
||||||
"\"username\": \"root\"," +
|
"\"username\": \"root\"," +
|
||||||
"\"password\": \"taosdata\"," +
|
"\"password\": \"taosdata\"," +
|
||||||
"\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," +
|
"\"connection\": [{\"querySql\":[\"select * from weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," +
|
||||||
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," +
|
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," +
|
||||||
"}");
|
"}");
|
||||||
job.setPluginJobConf(configuration);
|
job.setPluginJobConf(configuration);
|
||||||
@ -111,8 +114,40 @@ public class TDengineReaderTest {
|
|||||||
Configuration conf = configurationList.get(0);
|
Configuration conf = configurationList.get(0);
|
||||||
Assert.assertEquals("root", conf.getString("username"));
|
Assert.assertEquals("root", conf.getString("username"));
|
||||||
Assert.assertEquals("taosdata", conf.getString("password"));
|
Assert.assertEquals("taosdata", conf.getString("password"));
|
||||||
Assert.assertEquals("weather", conf.getString("table[0]"));
|
Assert.assertEquals("select * from weather", conf.getString("querySql[0]"));
|
||||||
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl"));
|
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void jobSplit_case03() {
|
||||||
|
// given
|
||||||
|
TDengineReader.Job job = new TDengineReader.Job();
|
||||||
|
Configuration configuration = Configuration.from("{" +
|
||||||
|
"\"username\": \"root\"," +
|
||||||
|
"\"password\": \"taosdata\"," +
|
||||||
|
"\"connection\": [{\"querySql\":[\"select * from weather\",\"select * from test.meters\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\", \"jdbc:TAOS://master:6030/test\"]}]," +
|
||||||
|
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," +
|
||||||
|
"}");
|
||||||
|
job.setPluginJobConf(configuration);
|
||||||
|
|
||||||
|
// when
|
||||||
|
job.init();
|
||||||
|
List<Configuration> configurationList = job.split(1);
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.assertEquals(2, configurationList.size());
|
||||||
|
Configuration conf = configurationList.get(0);
|
||||||
|
Assert.assertEquals("root", conf.getString("username"));
|
||||||
|
Assert.assertEquals("taosdata", conf.getString("password"));
|
||||||
|
Assert.assertEquals("select * from weather", conf.getString("querySql[0]"));
|
||||||
|
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl"));
|
||||||
|
|
||||||
|
Configuration conf1 = configurationList.get(1);
|
||||||
|
Assert.assertEquals("root", conf1.getString("username"));
|
||||||
|
Assert.assertEquals("taosdata", conf1.getString("password"));
|
||||||
|
Assert.assertEquals("select * from weather", conf1.getString("querySql[0]"));
|
||||||
|
Assert.assertEquals("select * from test.meters", conf1.getString("querySql[1]"));
|
||||||
|
Assert.assertEquals("jdbc:TAOS://master:6030/test", conf1.getString("jdbcUrl"));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
@ -15,7 +15,9 @@
|
|||||||
"table": [
|
"table": [
|
||||||
"stb1"
|
"stb1"
|
||||||
],
|
],
|
||||||
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1"
|
"jdbcUrl": [
|
||||||
|
"jdbc:TAOS-RS://192.168.56.105:6041/db1"
|
||||||
|
]
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
47
tdenginereader/src/test/resources/t2stream-1.json
Normal file
47
tdenginereader/src/test/resources/t2stream-1.json
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
{
|
||||||
|
"job": {
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"reader": {
|
||||||
|
"name": "tdenginereader",
|
||||||
|
"parameter": {
|
||||||
|
"username": "root",
|
||||||
|
"password": "taosdata",
|
||||||
|
"column": [
|
||||||
|
"ts",
|
||||||
|
"f1",
|
||||||
|
"f2",
|
||||||
|
"t1",
|
||||||
|
"t2"
|
||||||
|
],
|
||||||
|
"connection": [
|
||||||
|
{
|
||||||
|
"table": [
|
||||||
|
"stb1"
|
||||||
|
],
|
||||||
|
"jdbcUrl": [
|
||||||
|
"jdbc:TAOS-RS://192.168.56.105:6041/db1"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"where": "t10 = '北京朝阳望京'",
|
||||||
|
"beginDateTime": "2022-03-07 12:00:00",
|
||||||
|
"endDateTime": "2022-03-07 19:00:00"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"writer": {
|
||||||
|
"name": "streamwriter",
|
||||||
|
"parameter": {
|
||||||
|
"encoding": "UTF-8",
|
||||||
|
"print": true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"setting": {
|
||||||
|
"speed": {
|
||||||
|
"channel": 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
37
tdenginereader/src/test/resources/t2stream-2.json
Normal file
37
tdenginereader/src/test/resources/t2stream-2.json
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
{
|
||||||
|
"job": {
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"reader": {
|
||||||
|
"name": "tdenginereader",
|
||||||
|
"parameter": {
|
||||||
|
"username": "root",
|
||||||
|
"password": "taosdata",
|
||||||
|
"connection": [
|
||||||
|
{
|
||||||
|
"querySql": [
|
||||||
|
"select * from stb1 where t10 = '北京朝阳望京' and _c0 >= '2022-03-07 12:00:00' and _c0 < '2022-03-07 19:00:00'"
|
||||||
|
],
|
||||||
|
"jdbcUrl": [
|
||||||
|
"jdbc:TAOS-RS://192.168.56.105:6041/db1"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"writer": {
|
||||||
|
"name": "streamwriter",
|
||||||
|
"parameter": {
|
||||||
|
"encoding": "UTF-8",
|
||||||
|
"print": true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"setting": {
|
||||||
|
"speed": {
|
||||||
|
"channel": 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -12,4 +12,7 @@ public class Key {
|
|||||||
|
|
||||||
public static final String BEGIN_DATETIME = "beginDateTime";
|
public static final String BEGIN_DATETIME = "beginDateTime";
|
||||||
public static final String END_DATETIME = "endDateTime";
|
public static final String END_DATETIME = "endDateTime";
|
||||||
|
public static final String WHERE = "where";
|
||||||
|
public static final String QUERY_SQL = "querySql";
|
||||||
|
public static final String MANDATORY_ENCODING = "mandatoryEncoding";
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user