diff --git a/tdenginereader/doc/tdenginereader-CN.md b/tdenginereader/doc/tdenginereader-CN.md index fe0e4877..aa3751ef 100644 --- a/tdenginereader/doc/tdenginereader-CN.md +++ b/tdenginereader/doc/tdenginereader-CN.md @@ -6,12 +6,14 @@ TDengineReader 插件实现了 TDengine 读取数据的功能。 ## 2 实现原理 -TDengineReader 通过TDengine的JDBC driver查询获取数据。 +TDengineReader 通过 TDengine 的 JDBC driver 查询获取数据。 ## 3 功能说明 ### 3.1 配置样例 +* 配置一个从 TDengine 抽取数据作业: + ```json { "job": { @@ -27,7 +29,9 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。 "table": [ "meters" ], - "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/test?timestampFormat=TIMESTAMP" + "jdbcUrl": [ + "jdbc:TAOS-RS://192.168.56.105:6041/test?timestampFormat=TIMESTAMP" + ] } ], "column": [ @@ -36,6 +40,7 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。 "voltage", "phase" ], + "where": "ts>=0", "beginDateTime": "2017-07-14 10:40:00", "endDateTime": "2017-08-14 10:40:00" } @@ -58,35 +63,89 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。 } ``` +* 配置一个自定义 SQL 的数据抽取作业: + +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "tdenginereader", + "parameter": { + "user": "root", + "password": "taosdata", + "connection": [ + { + "querySql": [ + "select * from test.meters" + ], + "jdbcUrl": [ + "jdbc:TAOS-RS://192.168.56.105:6041/test?timestampFormat=TIMESTAMP" + ] + } + ] + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "encoding": "UTF-8", + "print": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} +``` + ### 3.2 参数说明 * **username** - * 描述:TDengine实例的用户名
+ * 描述:TDengine 实例的用户名
* 必选:是
* 默认值:无
* **password** - * 描述:TDengine实例的密码
+ * 描述:TDengine 实例的密码
* 必选:是
* 默认值:无
-* **table** - * 描述:所选取的需要同步的表。使用JSON的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一schema结构, - TDengineReader不予检查表是否同一逻辑表。注意,table必须包含在connection配置单元中。
- * 必选:是
- * 默认值:无
* **jdbcUrl** - * 描述:TDengine数据库的JDBC连接信息。注意,jdbcUrl必须包含在connection配置单元中。JdbcUrl具体请参看TDengine官方文档。 + * 描述:TDengine 数据库的JDBC连接信息。注意,jdbcUrl必须包含在connection配置单元中。JdbcUrl具体请参看TDengine官方文档。 * 必选:是
* 默认值:无
+* **querySql** + * 描述:在有些业务场景下,where 这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了 querySql 后, TDengineReader 就会忽略 table, column, + where, beginDateTime, endDateTime这些配置型,直接使用这个配置项的内容对数据进行筛选。例如需要 进行多表join后同步数据,使用 select a,b from table_a join + table_b on table_a.id = table_b.id
+ * 必选:否
+ * 默认值:无
+* **table** + * 描述:所选取的需要同步的表。使用 JSON 的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一 schema 结构, TDengineReader不予检查表是否同一逻辑表。注意,table必须包含在 + connection 配置单元中。
+ * 必选:是
+ * 默认值:无
+* **where** + * 描述:筛选条件中的 where 子句,TDengineReader 根据指定的column, table, where, begingDateTime, endDateTime 条件拼接 SQL,并根据这个 SQL + 进行数据抽取。
+ * 必选:否
+ * 默认值:无
* **beginDateTime** - * 描述:数据的开始时间,Job迁移从begineDateTime到endDateTime的数据,格式为yyyy-MM-dd HH:mm:ss,如果不填为全量同步
+ * 描述:数据的开始时间,Job 迁移从 begineDateTime 到 endDateTime 的数据,格式为 yyyy-MM-dd HH:mm:ss
* 必选:否
* 默认值:无
* **endDateTime** - * 描述:数据的结束时间,Job迁移从begineDateTime到endDateTime的数据,格式为yyyy-MM-dd HH:mm:ss,如果不填为全量同步
+ * 描述:数据的结束时间,Job 迁移从 begineDateTime 到 endDateTime 的数据,格式为 yyyy-MM-dd HH:mm:ss
* 必选:否
* 默认值:无
### 3.3 类型转换 + | TDengine 数据类型 | DataX 内部类型 | | --------------- | ------------- | | TINYINT | Long | diff --git a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java index 0cf214c1..35c82bed 100644 --- a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java +++ b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java @@ -15,6 +15,8 @@ import java.sql.*; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; public class TDengineReader extends Reader { @@ -41,26 +43,30 @@ public class TDengineReader extends Reader { "The parameter [" + Key.PASSWORD + "] is not set."); // check connection - List connection = this.originalConfig.getList(Key.CONNECTION); - if (connection == null || connection.isEmpty()) + List connectionList = this.originalConfig.getListConfiguration(Key.CONNECTION); + if (connectionList == null || connectionList.isEmpty()) throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.CONNECTION + "] is not set."); - for (int i = 0; i < connection.size(); i++) { - Configuration conn = Configuration.from(connection.get(i).toString()); - List table = conn.getList(Key.TABLE); - if (table == null || table.isEmpty()) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, - "The parameter [" + Key.TABLE + "] of connection[" + (i + 1) + "] is not set."); - String jdbcUrl = conn.getString(Key.JDBC_URL); - if (StringUtils.isBlank(jdbcUrl)) + for (int i = 0; i < connectionList.size(); i++) { + Configuration conn = connectionList.get(i); + // check jdbcUrl + List jdbcUrlList = conn.getList(Key.JDBC_URL); + if (jdbcUrlList == null || jdbcUrlList.isEmpty()) { throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.JDBC_URL + "] of connection[" + (i + 1) + "] is not set."); + } + // check table/querySql + List querySqlList = conn.getList(Key.QUERY_SQL); + if (querySqlList == null || querySqlList.isEmpty()) { + String querySql = conn.getString(Key.QUERY_SQL); + if (StringUtils.isBlank(querySql)) { + List table = conn.getList(Key.TABLE); + if (table == null || table.isEmpty()) + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.TABLE + "] of connection[" + (i + 1) + "] is not set."); + } + } } - // check column - List column = this.originalConfig.getList(Key.COLUMN); - if (column == null || column.isEmpty()) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, - "The parameter [" + Key.CONNECTION + "] is not set or is empty."); SimpleDateFormat format = new SimpleDateFormat(DATETIME_FORMAT); // check beginDateTime @@ -100,15 +106,17 @@ public class TDengineReader extends Reader { public List split(int adviceNumber) { List configurations = new ArrayList<>(); - List connectionList = this.originalConfig.getList(Key.CONNECTION); - for (Object conn : connectionList) { - Configuration clone = this.originalConfig.clone(); - Configuration conf = Configuration.from(conn.toString()); - String jdbcUrl = conf.getString(Key.JDBC_URL); - clone.set(Key.JDBC_URL, jdbcUrl); - clone.set(Key.TABLE, conf.getList(Key.TABLE)); - clone.remove(Key.CONNECTION); - configurations.add(clone); + List connectionList = this.originalConfig.getListConfiguration(Key.CONNECTION); + for (Configuration conn : connectionList) { + List jdbcUrlList = conn.getList(Key.JDBC_URL, String.class); + for (String jdbcUrl : jdbcUrlList) { + Configuration clone = this.originalConfig.clone(); + clone.set(Key.JDBC_URL, jdbcUrl); + clone.set(Key.TABLE, conn.getList(Key.TABLE)); + clone.set(Key.QUERY_SQL, conn.getList(Key.QUERY_SQL)); + clone.remove(Key.CONNECTION); + configurations.add(clone); + } } LOG.info("Configuration: {}", configurations); @@ -120,12 +128,15 @@ public class TDengineReader extends Reader { private static final Logger LOG = LoggerFactory.getLogger(Task.class); private Configuration readerSliceConfig; + private String mandatoryEncoding; private Connection conn; + private List tables; private List columns; - private String startTime; private String endTime; + private String where; + private List querySql; static { try { @@ -141,12 +152,10 @@ public class TDengineReader extends Reader { this.readerSliceConfig = super.getPluginJobConf(); LOG.info("getPluginJobConf: {}", readerSliceConfig); - String url = readerSliceConfig.getString(Key.JDBC_URL); - if (StringUtils.isBlank(url)) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, - "The parameter [" + Key.JDBC_URL + "] is not set."); String user = readerSliceConfig.getString(Key.USERNAME); String password = readerSliceConfig.getString(Key.PASSWORD); + + String url = readerSliceConfig.getString(Key.JDBC_URL); try { this.conn = DriverManager.getConnection(url, user, password); } catch (SQLException e) { @@ -158,6 +167,9 @@ public class TDengineReader extends Reader { this.columns = readerSliceConfig.getList(Key.COLUMN, String.class); this.startTime = readerSliceConfig.getString(Key.BEGIN_DATETIME); this.endTime = readerSliceConfig.getString(Key.END_DATETIME); + this.where = readerSliceConfig.getString(Key.WHERE, "_c0 > " + Long.MIN_VALUE); + this.querySql = readerSliceConfig.getList(Key.QUERY_SQL, String.class); + this.mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, "UTF-8"); } @Override @@ -167,26 +179,30 @@ public class TDengineReader extends Reader { @Override public void startRead(RecordSender recordSender) { + List sqlList = new ArrayList<>(); - try (Statement stmt = conn.createStatement()) { + if (querySql == null || querySql.isEmpty()) { for (String table : tables) { StringBuilder sb = new StringBuilder(); - sb.append("select ").append(StringUtils.join(columns, ",")) - .append(" from ").append(table).append(" "); - - if (StringUtils.isBlank(startTime)) { - sb.append("where _c0 >= ").append(Long.MIN_VALUE); - } else { - sb.append("where _c0 >= '").append(startTime).append("'"); + sb.append("select ").append(StringUtils.join(columns, ",")).append(" from ").append(table).append(" "); + sb.append("where ").append(where); + if (!StringUtils.isBlank(startTime)) { + sb.append(" and _c0 >= '").append(startTime).append("'"); } if (!StringUtils.isBlank(endTime)) { sb.append(" and _c0 < '").append(endTime).append("'"); } + String sql = sb.toString().trim(); + } + } else { + sqlList.addAll(querySql); + } - String sql = sb.toString(); + try (Statement stmt = conn.createStatement()) { + for (String sql : sqlList) { ResultSet rs = stmt.executeQuery(sql); while (rs.next()) { - Record record = buildRecord(recordSender, rs, "UTF-8"); + Record record = buildRecord(recordSender, rs, mandatoryEncoding); recordSender.sendToWriter(record); } } @@ -239,13 +255,14 @@ public class TDengineReader extends Reader { break; } } - } catch (SQLException | UnsupportedEncodingException e) { - throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错!", e); + } catch (SQLException e) { + throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "database query error!", e); + } catch (UnsupportedEncodingException e) { + throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "illegal mandatoryEncoding", e); } return record; } } - } diff --git a/tdenginereader/src/main/resources/plugin_job_template.json b/tdenginereader/src/main/resources/plugin_job_template.json index 2c7f4cb9..934fe96a 100644 --- a/tdenginereader/src/main/resources/plugin_job_template.json +++ b/tdenginereader/src/main/resources/plugin_job_template.json @@ -8,7 +8,9 @@ "table": [ "" ], - "jdbcUrl": "" + "jdbcUrl": [ + "" + ] } ], "column": [ @@ -16,6 +18,6 @@ ], "beginDateTime": "", "endDateTime": "", - "splitInterval": "" + "where": "" } } \ No newline at end of file diff --git a/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengine2DMTest.java b/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengine2DMTest.java index d9099e01..e1064717 100644 --- a/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengine2DMTest.java +++ b/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengine2DMTest.java @@ -1,7 +1,6 @@ package com.alibaba.datax.plugin.reader; import com.alibaba.datax.core.Engine; -import org.junit.Before; import org.junit.Ignore; import org.junit.Test; diff --git a/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengine2StreamTest.java b/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengine2StreamTest.java new file mode 100644 index 00000000..f628a648 --- /dev/null +++ b/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengine2StreamTest.java @@ -0,0 +1,66 @@ +package com.alibaba.datax.plugin.reader; + +import com.alibaba.datax.core.Engine; +import org.junit.Ignore; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Random; + +@Ignore +public class TDengine2StreamTest { + + private static final String host = "192.168.56.105"; + private static final Random random = new Random(System.currentTimeMillis()); + + @Test + public void case01() throws Throwable { + // given + prepare("ms"); + + // when + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2stream-1.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + + @Test + public void case02() throws Throwable { + // given + prepare("ms"); + + // when + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2stream-2.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + + + private void prepare(String precision) throws SQLException { + final String url = "jdbc:TAOS-RS://" + host + ":6041/"; + try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) { + Statement stmt = conn.createStatement(); + + stmt.execute("drop database if exists db1"); + stmt.execute("create database if not exists db1 precision '" + precision + "'"); + stmt.execute("create table db1.stb1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, " + + "f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, t3 smallint, " + + "t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(100))"); + + for (int i = 1; i <= 10; i++) { + stmt.execute("insert into db1.tb" + i + " using db1.stb1 tags(now, " + random.nextInt(10) + "," + + random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextInt(10) + "," + + random.nextFloat() + "," + random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123'," + + "'北京朝阳望京') values(now+" + i + "s, " + random.nextInt(10) + "," + random.nextInt(10) + "," + + +random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextFloat() + "," + + random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123','北京朝阳望京')"); + } + stmt.close(); + } + } + + +} diff --git a/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengineReaderTest.java b/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengineReaderTest.java index da980c5f..491ddbaf 100644 --- a/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengineReaderTest.java +++ b/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengineReaderTest.java @@ -16,8 +16,9 @@ public class TDengineReaderTest { Configuration configuration = Configuration.from("{" + "\"username\": \"root\"," + "\"password\": \"taosdata\"," + - "\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," + + "\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," + "\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," + + "\"where\":\"_c0 > 0\"," + "\"beginDateTime\": \"2021-01-01 00:00:00\"," + "\"endDateTime\": \"2021-01-01 12:00:00\"" + "}"); @@ -32,9 +33,10 @@ public class TDengineReaderTest { Assert.assertEquals("root", conf.getString(Key.USERNAME)); Assert.assertEquals("taosdata", conf.getString("password")); Assert.assertEquals("weather", conf.getString("connection[0].table[0]")); - Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl")); + Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl[0]")); Assert.assertEquals("2021-01-01 00:00:00", conf.getString("beginDateTime")); Assert.assertEquals("2021-01-01 12:00:00", conf.getString("endDateTime")); + Assert.assertEquals("_c0 > 0", conf.getString("where")); } @@ -45,8 +47,7 @@ public class TDengineReaderTest { Configuration configuration = Configuration.from("{" + "\"username\": \"root\"," + "\"password\": \"taosdata\"," + - "\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," + - "\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]" + + "\"connection\": [{\"querySql\":[\"select * from weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," + "}"); job.setPluginJobConf(configuration); @@ -58,8 +59,8 @@ public class TDengineReaderTest { Assert.assertEquals("root", conf.getString(Key.USERNAME)); Assert.assertEquals("taosdata", conf.getString("password")); - Assert.assertEquals("weather", conf.getString("connection[0].table[0]")); - Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl")); + Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl[0]")); + Assert.assertEquals("select * from weather", conf.getString("connection[0].querySql[0]")); } @Test @@ -69,8 +70,9 @@ public class TDengineReaderTest { Configuration configuration = Configuration.from("{" + "\"username\": \"root\"," + "\"password\": \"taosdata\"," + - "\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," + + "\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," + "\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," + + "\"where\":\"_c0 > 0\"," + "\"beginDateTime\": \"2021-01-01 00:00:00\"," + "\"endDateTime\": \"2021-01-01 12:00:00\"" + "}"); @@ -85,6 +87,7 @@ public class TDengineReaderTest { Configuration conf = configurationList.get(0); Assert.assertEquals("root", conf.getString("username")); Assert.assertEquals("taosdata", conf.getString("password")); + Assert.assertEquals("_c0 > 0", conf.getString("where")); Assert.assertEquals("weather", conf.getString("table[0]")); Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl")); @@ -97,7 +100,7 @@ public class TDengineReaderTest { Configuration configuration = Configuration.from("{" + "\"username\": \"root\"," + "\"password\": \"taosdata\"," + - "\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," + + "\"connection\": [{\"querySql\":[\"select * from weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," + "\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," + "}"); job.setPluginJobConf(configuration); @@ -111,8 +114,40 @@ public class TDengineReaderTest { Configuration conf = configurationList.get(0); Assert.assertEquals("root", conf.getString("username")); Assert.assertEquals("taosdata", conf.getString("password")); - Assert.assertEquals("weather", conf.getString("table[0]")); + Assert.assertEquals("select * from weather", conf.getString("querySql[0]")); Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl")); } + @Test + public void jobSplit_case03() { + // given + TDengineReader.Job job = new TDengineReader.Job(); + Configuration configuration = Configuration.from("{" + + "\"username\": \"root\"," + + "\"password\": \"taosdata\"," + + "\"connection\": [{\"querySql\":[\"select * from weather\",\"select * from test.meters\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\", \"jdbc:TAOS://master:6030/test\"]}]," + + "\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," + + "}"); + job.setPluginJobConf(configuration); + + // when + job.init(); + List configurationList = job.split(1); + + // assert + Assert.assertEquals(2, configurationList.size()); + Configuration conf = configurationList.get(0); + Assert.assertEquals("root", conf.getString("username")); + Assert.assertEquals("taosdata", conf.getString("password")); + Assert.assertEquals("select * from weather", conf.getString("querySql[0]")); + Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl")); + + Configuration conf1 = configurationList.get(1); + Assert.assertEquals("root", conf1.getString("username")); + Assert.assertEquals("taosdata", conf1.getString("password")); + Assert.assertEquals("select * from weather", conf1.getString("querySql[0]")); + Assert.assertEquals("select * from test.meters", conf1.getString("querySql[1]")); + Assert.assertEquals("jdbc:TAOS://master:6030/test", conf1.getString("jdbcUrl")); + } + } \ No newline at end of file diff --git a/tdenginereader/src/test/resources/t2dm.json b/tdenginereader/src/test/resources/t2dm.json index b2cf91e2..d87ade0c 100644 --- a/tdenginereader/src/test/resources/t2dm.json +++ b/tdenginereader/src/test/resources/t2dm.json @@ -15,7 +15,9 @@ "table": [ "stb1" ], - "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1" + "jdbcUrl": [ + "jdbc:TAOS-RS://192.168.56.105:6041/db1" + ] } ] } diff --git a/tdenginereader/src/test/resources/t2stream-1.json b/tdenginereader/src/test/resources/t2stream-1.json new file mode 100644 index 00000000..183ab7e2 --- /dev/null +++ b/tdenginereader/src/test/resources/t2stream-1.json @@ -0,0 +1,47 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "tdenginereader", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "ts", + "f1", + "f2", + "t1", + "t2" + ], + "connection": [ + { + "table": [ + "stb1" + ], + "jdbcUrl": [ + "jdbc:TAOS-RS://192.168.56.105:6041/db1" + ] + } + ], + "where": "t10 = '北京朝阳望京'", + "beginDateTime": "2022-03-07 12:00:00", + "endDateTime": "2022-03-07 19:00:00" + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "encoding": "UTF-8", + "print": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginereader/src/test/resources/t2stream-2.json b/tdenginereader/src/test/resources/t2stream-2.json new file mode 100644 index 00000000..15bfe9be --- /dev/null +++ b/tdenginereader/src/test/resources/t2stream-2.json @@ -0,0 +1,37 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "tdenginereader", + "parameter": { + "username": "root", + "password": "taosdata", + "connection": [ + { + "querySql": [ + "select * from stb1 where t10 = '北京朝阳望京' and _c0 >= '2022-03-07 12:00:00' and _c0 < '2022-03-07 19:00:00'" + ], + "jdbcUrl": [ + "jdbc:TAOS-RS://192.168.56.105:6041/db1" + ] + } + ] + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "encoding": "UTF-8", + "print": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java index b95ff584..1a9358db 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java @@ -12,4 +12,7 @@ public class Key { public static final String BEGIN_DATETIME = "beginDateTime"; public static final String END_DATETIME = "endDateTime"; + public static final String WHERE = "where"; + public static final String QUERY_SQL = "querySql"; + public static final String MANDATORY_ENCODING = "mandatoryEncoding"; } \ No newline at end of file