diff --git a/core/src/main/java/com/alibaba/datax/core/Engine.java b/core/src/main/java/com/alibaba/datax/core/Engine.java index f80d792f..be21512b 100755 --- a/core/src/main/java/com/alibaba/datax/core/Engine.java +++ b/core/src/main/java/com/alibaba/datax/core/Engine.java @@ -73,7 +73,7 @@ public class Engine { boolean traceEnable = allConf.getBool(CoreConstant.DATAX_CORE_CONTAINER_TRACE_ENABLE, true); boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true); - //standlone模式的datax shell任务不进行汇报 + //standalone模式的 datax shell任务不进行汇报 if(instanceId == -1){ perfReportEnable = false; } diff --git a/tdenginereader/doc/tdenginereader.md b/tdenginereader/doc/tdenginereader-CN.md similarity index 55% rename from tdenginereader/doc/tdenginereader.md rename to tdenginereader/doc/tdenginereader-CN.md index 3c683a64..aa3751ef 100644 --- a/tdenginereader/doc/tdenginereader.md +++ b/tdenginereader/doc/tdenginereader-CN.md @@ -6,12 +6,14 @@ TDengineReader 插件实现了 TDengine 读取数据的功能。 ## 2 实现原理 -TDengineReader 通过TDengine的JDBC driver查询获取数据。 +TDengineReader 通过 TDengine 的 JDBC driver 查询获取数据。 ## 3 功能说明 ### 3.1 配置样例 +* 配置一个从 TDengine 抽取数据作业: + ```json { "job": { @@ -27,7 +29,9 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。 "table": [ "meters" ], - "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/test?timestampFormat=TIMESTAMP" + "jdbcUrl": [ + "jdbc:TAOS-RS://192.168.56.105:6041/test?timestampFormat=TIMESTAMP" + ] } ], "column": [ @@ -36,9 +40,51 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。 "voltage", "phase" ], + "where": "ts>=0", "beginDateTime": "2017-07-14 10:40:00", - "endDateTime": "2017-08-14 10:40:00", - "splitInterval": "1d" + "endDateTime": "2017-08-14 10:40:00" + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "encoding": "UTF-8", + "print": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} +``` + +* 配置一个自定义 SQL 的数据抽取作业: + +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "tdenginereader", + "parameter": { + "user": "root", + "password": "taosdata", + "connection": [ + { + "querySql": [ + "select * from test.meters" + ], + "jdbcUrl": [ + "jdbc:TAOS-RS://192.168.56.105:6041/test?timestampFormat=TIMESTAMP" + ] + } + ] } }, "writer": { @@ -62,37 +108,44 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。 ### 3.2 参数说明 * **username** - * 描述:TDengine实例的用户名
+ * 描述:TDengine 实例的用户名
* 必选:是
* 默认值:无
* **password** - * 描述:TDengine实例的密码
+ * 描述:TDengine 实例的密码
* 必选:是
* 默认值:无
-* **table** - * 描述:所选取的需要同步的表。使用JSON的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一schema结构, - TDengineReader不予检查表是否同一逻辑表。注意,table必须包含在connection配置单元中。
- * 必选:是
- * 默认值:无
* **jdbcUrl** - * 描述:TDengine数据库的JDBC连接信息。注意,jdbcUrl必须包含在connection配置单元中。JdbcUrl具体请参看TDengine官方文档。 + * 描述:TDengine 数据库的JDBC连接信息。注意,jdbcUrl必须包含在connection配置单元中。JdbcUrl具体请参看TDengine官方文档。 * 必选:是
* 默认值:无
+* **querySql** + * 描述:在有些业务场景下,where 这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了 querySql 后, TDengineReader 就会忽略 table, column, + where, beginDateTime, endDateTime这些配置型,直接使用这个配置项的内容对数据进行筛选。例如需要 进行多表join后同步数据,使用 select a,b from table_a join + table_b on table_a.id = table_b.id
+ * 必选:否
+ * 默认值:无
+* **table** + * 描述:所选取的需要同步的表。使用 JSON 的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一 schema 结构, TDengineReader不予检查表是否同一逻辑表。注意,table必须包含在 + connection 配置单元中。
+ * 必选:是
+ * 默认值:无
+* **where** + * 描述:筛选条件中的 where 子句,TDengineReader 根据指定的column, table, where, begingDateTime, endDateTime 条件拼接 SQL,并根据这个 SQL + 进行数据抽取。
+ * 必选:否
+ * 默认值:无
* **beginDateTime** - * 描述:数据的开始时间,Job迁移从begineDateTime到endDateTime的数据,格式为yyyy-MM-dd HH:mm:ss
- * 必选:是
+ * 描述:数据的开始时间,Job 迁移从 begineDateTime 到 endDateTime 的数据,格式为 yyyy-MM-dd HH:mm:ss
+ * 必选:否
* 默认值:无
* **endDateTime** - * 描述:数据的结束时间,Job迁移从begineDateTime到endDateTime的数据,格式为yyyy-MM-dd HH:mm:ss
- * 必选:是
- * 默认值:无
-* **splitInterval** - * 描述:按照splitInterval来划分task, 每splitInterval创建一个task。例如,20d代表按照每20天的数据划分为1个task。 - 可以配置的时间单位为:d(天), h(小时), m(分钟), s(秒)
- * 必选:是
+ * 描述:数据的结束时间,Job 迁移从 begineDateTime 到 endDateTime 的数据,格式为 yyyy-MM-dd HH:mm:ss
+ * 必选:否
* 默认值:无
### 3.3 类型转换 + | TDengine 数据类型 | DataX 内部类型 | | --------------- | ------------- | | TINYINT | Long | @@ -106,7 +159,6 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。 | BINARY | Bytes | | NCHAR | String | - ## 4 性能报告 ### 4.1 环境准备 diff --git a/tdenginereader/pom.xml b/tdenginereader/pom.xml index ca444bce..319152f8 100644 --- a/tdenginereader/pom.xml +++ b/tdenginereader/pom.xml @@ -29,10 +29,29 @@ + + com.alibaba + fastjson + 1.2.78 + + + + com.alibaba.datax.tdenginewriter + tdenginewriter + 0.0.1-SNAPSHOT + compile + + com.taosdata.jdbc taos-jdbcdriver 2.0.37 + + + com.alibaba + fastjson + + @@ -47,6 +66,21 @@ 0.0.1-SNAPSHOT compile + + + com.alibaba.datax + datax-core + 0.0.1-SNAPSHOT + test + + + + + + + + + diff --git a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/Key.java b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/Key.java deleted file mode 100644 index 95b55386..00000000 --- a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/Key.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.alibaba.datax.plugin.reader; - -public class Key { - - public static final String JDBC_URL = "jdbcUrl"; -// public static final String HOST = "host"; -// public static final String PORT = "port"; -// public static final String DB = "db"; - public static final String TABLE = "table"; - public static final String USER = "username"; - public static final String PASSWORD = "password"; - public static final String CONNECTION = "connection"; -// public static final String SQL = "sql"; - public static final String BEGIN_DATETIME = "beginDateTime"; - public static final String END_DATETIME = "endDateTime"; - public static final String SPLIT_INTERVAL = "splitInterval"; - public static final String COLUMN = "column"; - public static final String MANDATORY_ENCODING = "mandatoryEncoding"; -} diff --git a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java index f00d879f..332ddf3a 100644 --- a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java +++ b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java @@ -1,13 +1,11 @@ package com.alibaba.datax.plugin.reader; -import com.alibaba.datax.common.constant.CommonConstant; import com.alibaba.datax.common.element.*; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.spi.Reader; import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.plugin.rdbms.util.DataBaseType; -import com.alibaba.fastjson.JSON; +import com.alibaba.datax.plugin.writer.tdenginewriter.Key; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,89 +15,86 @@ import java.sql.*; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; public class TDengineReader extends Reader { + private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss"; public static class Job extends Reader.Job { - private static final Logger LOG = LoggerFactory.getLogger(Job.class); private Configuration originalConfig; @Override public void init() { this.originalConfig = super.getPluginJobConf(); - // check user - String user = this.originalConfig.getString(Key.USER); - if (StringUtils.isBlank(user)) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.USER + "] is not set."); + // check username + String username = this.originalConfig.getString(Key.USERNAME); + if (StringUtils.isBlank(username)) + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.USERNAME + "] is not set."); // check password String password = this.originalConfig.getString(Key.PASSWORD); if (StringUtils.isBlank(password)) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PASSWORD + "] is not set."); + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.PASSWORD + "] is not set."); + + // check connection + List connectionList = this.originalConfig.getListConfiguration(Key.CONNECTION); + if (connectionList == null || connectionList.isEmpty()) + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.CONNECTION + "] is not set."); + for (int i = 0; i < connectionList.size(); i++) { + Configuration conn = connectionList.get(i); + // check jdbcUrl + List jdbcUrlList = conn.getList(Key.JDBC_URL); + if (jdbcUrlList == null || jdbcUrlList.isEmpty()) { + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.JDBC_URL + "] of connection[" + (i + 1) + "] is not set."); + } + // check table/querySql + List querySqlList = conn.getList(Key.QUERY_SQL); + if (querySqlList == null || querySqlList.isEmpty()) { + String querySql = conn.getString(Key.QUERY_SQL); + if (StringUtils.isBlank(querySql)) { + List table = conn.getList(Key.TABLE); + if (table == null || table.isEmpty()) + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.TABLE + "] of connection[" + (i + 1) + "] is not set."); + } + } + } SimpleDateFormat format = new SimpleDateFormat(DATETIME_FORMAT); // check beginDateTime String beginDatetime = this.originalConfig.getString(Key.BEGIN_DATETIME); - if (StringUtils.isBlank(beginDatetime)) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.BEGIN_DATETIME + "] is not set."); - Long start; - try { - start = format.parse(beginDatetime).getTime(); - } catch (ParseException e) { - throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.BEGIN_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format."); + long start = Long.MIN_VALUE; + if (!StringUtils.isBlank(beginDatetime)) { + try { + start = format.parse(beginDatetime).getTime(); + } catch (ParseException e) { + throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, + "The parameter [" + Key.BEGIN_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format."); + } } - // check endDateTime String endDatetime = this.originalConfig.getString(Key.END_DATETIME); - if (StringUtils.isBlank(endDatetime)) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.END_DATETIME + "] is not set."); - Long end; - try { - end = format.parse(endDatetime).getTime(); - } catch (ParseException e) { - throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.END_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format."); + long end = Long.MAX_VALUE; + if (!StringUtils.isBlank(endDatetime)) { + try { + end = format.parse(endDatetime).getTime(); + } catch (ParseException e) { + throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, + "The parameter [" + Key.END_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format."); + } } if (start >= end) - throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.BEGIN_DATETIME + "] should be less than the parameter [" + Key.END_DATETIME + "]."); + throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, + "The parameter [" + Key.BEGIN_DATETIME + "] should be less than the parameter [" + Key.END_DATETIME + "]."); - // check splitInterval - String splitInterval = this.originalConfig.getString(Key.SPLIT_INTERVAL); - Long split; - if (StringUtils.isBlank(splitInterval)) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.SPLIT_INTERVAL + "] is not set."); - try { - split = parseSplitInterval(splitInterval); - } catch (Exception e) { - throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.SPLIT_INTERVAL + "] should be like: \"123d|h|m|s\", error: " + e.getMessage()); - } - - this.originalConfig.set(Key.BEGIN_DATETIME, start); - this.originalConfig.set(Key.END_DATETIME, end); - this.originalConfig.set(Key.SPLIT_INTERVAL, split); - - // check connection - List connection = this.originalConfig.getList(Key.CONNECTION); - if (connection == null || connection.isEmpty()) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.CONNECTION + "] is not set."); - for (int i = 0; i < connection.size(); i++) { - Configuration conn = Configuration.from(connection.get(i).toString()); - List table = conn.getList(Key.TABLE); - if (table == null || table.isEmpty()) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.TABLE + "] of connection[" + (i + 1) + "] is not set."); - String jdbcUrl = conn.getString(Key.JDBC_URL); - if (StringUtils.isBlank(jdbcUrl)) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.JDBC_URL + "] of connection[" + (i + 1) + "] is not set."); - } - - // check column - List column = this.originalConfig.getList(Key.COLUMN); - if (column == null || column.isEmpty()) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.CONNECTION + "] is not set or is empty."); } @Override @@ -110,78 +105,73 @@ public class TDengineReader extends Reader { @Override public List split(int adviceNumber) { List configurations = new ArrayList<>(); - // do split - Long start = this.originalConfig.getLong(Key.BEGIN_DATETIME); - Long end = this.originalConfig.getLong(Key.END_DATETIME); - Long split = this.originalConfig.getLong(Key.SPLIT_INTERVAL); - List conns = this.originalConfig.getList(Key.CONNECTION); - - for (Long ts = start; ts < end; ts += split) { - for (int i = 0; i < conns.size(); i++) { + List connectionList = this.originalConfig.getListConfiguration(Key.CONNECTION); + for (Configuration conn : connectionList) { + List jdbcUrlList = conn.getList(Key.JDBC_URL, String.class); + for (String jdbcUrl : jdbcUrlList) { Configuration clone = this.originalConfig.clone(); - clone.remove(Key.SPLIT_INTERVAL); - - clone.set(Key.BEGIN_DATETIME, ts); - clone.set(Key.END_DATETIME, Math.min(ts + split, end)); - - Configuration conf = Configuration.from(conns.get(i).toString()); - String jdbcUrl = conf.getString(Key.JDBC_URL); clone.set(Key.JDBC_URL, jdbcUrl); - clone.set(Key.TABLE, conf.getList(Key.TABLE)); - - // 抽取 jdbcUrl 中的 ip/port 进行资源使用的打标,以提供给 core 做有意义的 shuffle 操作 - clone.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, DataBaseType.parseIpFromJdbcUrl(jdbcUrl)); + clone.set(Key.TABLE, conn.getList(Key.TABLE)); + clone.set(Key.QUERY_SQL, conn.getList(Key.QUERY_SQL)); clone.remove(Key.CONNECTION); - configurations.add(clone); - LOG.info("Configuration: {}", JSON.toJSONString(clone)); } } + + LOG.info("Configuration: {}", configurations); return configurations; } } public static class Task extends Reader.Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); + + private Configuration readerSliceConfig; + private String mandatoryEncoding; private Connection conn; - private Long startTime; - private Long endTime; private List tables; private List columns; - private String mandatoryEncoding; + private String startTime; + private String endTime; + private String where; + private List querySql; + + static { + try { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + Class.forName("com.taosdata.jdbc.rs.RestfulDriver"); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } + } @Override public void init() { - Configuration readerSliceConfig = super.getPluginJobConf(); - LOG.info("getPluginJobConf: {}", JSON.toJSONString(readerSliceConfig)); + this.readerSliceConfig = super.getPluginJobConf(); + LOG.info("getPluginJobConf: {}", readerSliceConfig); - String url = readerSliceConfig.getString(Key.JDBC_URL); - if (StringUtils.isBlank(url)) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, - "The parameter [" + Key.JDBC_URL + "] is not set."); - - tables = readerSliceConfig.getList(Key.TABLE, String.class); - columns = readerSliceConfig.getList(Key.COLUMN, String.class); - - String user = readerSliceConfig.getString(Key.USER); + String user = readerSliceConfig.getString(Key.USERNAME); String password = readerSliceConfig.getString(Key.PASSWORD); + String url = readerSliceConfig.getString(Key.JDBC_URL); try { - conn = DriverManager.getConnection(url, user, password); + this.conn = DriverManager.getConnection(url, user, password); } catch (SQLException e) { throw DataXException.asDataXException(TDengineReaderErrorCode.CONNECTION_FAILED, - "The parameter [" + Key.JDBC_URL + "] : " + url + " failed to connect since: " + e.getMessage()); + "The parameter [" + Key.JDBC_URL + "] : " + url + " failed to connect since: " + e.getMessage(), e); } - this.mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, ""); - - startTime = readerSliceConfig.getLong(Key.BEGIN_DATETIME); - endTime = readerSliceConfig.getLong(Key.END_DATETIME); + this.tables = readerSliceConfig.getList(Key.TABLE, String.class); + this.columns = readerSliceConfig.getList(Key.COLUMN, String.class); + this.startTime = readerSliceConfig.getString(Key.BEGIN_DATETIME); + this.endTime = readerSliceConfig.getString(Key.END_DATETIME); + this.where = readerSliceConfig.getString(Key.WHERE, "_c0 > " + Long.MIN_VALUE); + this.querySql = readerSliceConfig.getList(Key.QUERY_SQL, String.class); + this.mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, "UTF-8"); } - @Override public void destroy() { @@ -189,18 +179,36 @@ public class TDengineReader extends Reader { @Override public void startRead(RecordSender recordSender) { - try (Statement stmt = conn.createStatement()) { + List sqlList = new ArrayList<>(); + + if (querySql == null || querySql.isEmpty()) { for (String table : tables) { - String sql = "select " + StringUtils.join(columns, ",") + " from " + table - + " where _c0 >= " + startTime + " and _c0 < " + endTime; + StringBuilder sb = new StringBuilder(); + sb.append("select ").append(StringUtils.join(columns, ",")).append(" from ").append(table).append(" "); + sb.append("where ").append(where); + if (!StringUtils.isBlank(startTime)) { + sb.append(" and _c0 >= '").append(startTime).append("'"); + } + if (!StringUtils.isBlank(endTime)) { + sb.append(" and _c0 < '").append(endTime).append("'"); + } + String sql = sb.toString().trim(); + sqlList.add(sql); + } + } else { + sqlList.addAll(querySql); + } + + try (Statement stmt = conn.createStatement()) { + for (String sql : sqlList) { ResultSet rs = stmt.executeQuery(sql); - ResultSetMetaData metaData = rs.getMetaData(); while (rs.next()) { - transportOneRecord(recordSender, rs, metaData, metaData.getColumnCount(), this.mandatoryEncoding); + Record record = buildRecord(recordSender, rs, mandatoryEncoding); + recordSender.sendToWriter(record); } } } catch (SQLException e) { - throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错!", e); + throw DataXException.asDataXException(TDengineReaderErrorCode.RUNTIME_EXCEPTION, e.getMessage(), e); } finally { try { if (conn != null) @@ -211,17 +219,11 @@ public class TDengineReader extends Reader { } } - private Record transportOneRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnCount, String mandatoryEncoding) { - Record record = buildRecord(recordSender, rs, metaData, columnCount, mandatoryEncoding); - recordSender.sendToWriter(record); - return record; - } - - private Record buildRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnCount, String mandatoryEncoding) { + private Record buildRecord(RecordSender recordSender, ResultSet rs, String mandatoryEncoding) { Record record = recordSender.createRecord(); - try { - for (int i = 1; i <= columnCount; i++) { + ResultSetMetaData metaData = rs.getMetaData(); + for (int i = 1; i <= metaData.getColumnCount(); i++) { int columnType = metaData.getColumnType(i); switch (columnType) { case Types.SMALLINT: @@ -254,39 +256,14 @@ public class TDengineReader extends Reader { break; } } - } catch (SQLException | UnsupportedEncodingException e) { - throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错!", e); + } catch (SQLException e) { + throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "database query error!", e); + } catch (UnsupportedEncodingException e) { + throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "illegal mandatoryEncoding", e); } return record; } } - private static final long second = 1000; - private static final long minute = 60 * second; - private static final long hour = 60 * minute; - private static final long day = 24 * hour; - - private static Long parseSplitInterval(String splitInterval) throws Exception { - Pattern compile = Pattern.compile("^(\\d+)([dhms])$"); - Matcher matcher = compile.matcher(splitInterval); - while (matcher.find()) { - Long value = Long.valueOf(matcher.group(1)); - if (value == 0) - throw new Exception("invalid splitInterval: 0"); - char unit = matcher.group(2).charAt(0); - switch (unit) { - case 'd': - return value * day; - default: - case 'h': - return value * hour; - case 'm': - return value * minute; - case 's': - return value * second; - } - } - throw new Exception("invalid splitInterval: " + splitInterval); - } } diff --git a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReaderErrorCode.java b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReaderErrorCode.java index 68bc11e7..b784ab06 100644 --- a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReaderErrorCode.java +++ b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReaderErrorCode.java @@ -4,9 +4,10 @@ import com.alibaba.datax.common.spi.ErrorCode; public enum TDengineReaderErrorCode implements ErrorCode { - REQUIRED_VALUE("TDengineReader-00", "缺失必要的值"), - ILLEGAL_VALUE("TDengineReader-01", "值非法"), - CONNECTION_FAILED("TDengineReader-02", "连接错误"); + REQUIRED_VALUE("TDengineReader-00", "parameter value is missing"), + ILLEGAL_VALUE("TDengineReader-01", "invalid parameter value"), + CONNECTION_FAILED("TDengineReader-02", "connection error"), + RUNTIME_EXCEPTION("TDengineWriter-03", "runtime exception"); private final String code; private final String description; diff --git a/tdenginereader/src/main/resources/plugin_job_template.json b/tdenginereader/src/main/resources/plugin_job_template.json index 2c7f4cb9..934fe96a 100644 --- a/tdenginereader/src/main/resources/plugin_job_template.json +++ b/tdenginereader/src/main/resources/plugin_job_template.json @@ -8,7 +8,9 @@ "table": [ "" ], - "jdbcUrl": "" + "jdbcUrl": [ + "" + ] } ], "column": [ @@ -16,6 +18,6 @@ ], "beginDateTime": "", "endDateTime": "", - "splitInterval": "" + "where": "" } } \ No newline at end of file diff --git a/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengine2DMTest.java b/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengine2DMTest.java new file mode 100644 index 00000000..e1064717 --- /dev/null +++ b/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengine2DMTest.java @@ -0,0 +1,86 @@ +package com.alibaba.datax.plugin.reader; + +import com.alibaba.datax.core.Engine; +import org.junit.Ignore; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Random; + +@Ignore +public class TDengine2DMTest { + private static final String host1 = "192.168.56.105"; + private static final String host2 = "192.168.0.72"; + + private final Random random = new Random(System.currentTimeMillis()); + + @Test + public void t2dm_case01() throws Throwable { + // given + createSupTable("ms"); + + // when + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2dm.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + + @Test + public void t2dm_case02() throws Throwable { + // given + createSupTable("us"); + + // when + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2dm.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + + @Test + public void t2dm_case03() throws Throwable { + // given + createSupTable("ns"); + + // when + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2dm.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + + private void createSupTable(String precision) throws SQLException { + final String url = "jdbc:TAOS-RS://" + host1 + ":6041/"; + try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) { + Statement stmt = conn.createStatement(); + + stmt.execute("drop database if exists db1"); + stmt.execute("create database if not exists db1 precision '" + precision + "'"); + stmt.execute("create table db1.stb1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, " + + "f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, t3 smallint, " + + "t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(100))"); + + for (int i = 1; i <= 10; i++) { + stmt.execute("insert into db1.tb" + i + " using db1.stb1 tags(now, " + random.nextInt(10) + "," + + random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextInt(10) + "," + + random.nextFloat() + "," + random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123'," + + "'北京朝阳望京') values(now+" + i + "s, " + random.nextInt(10) + "," + random.nextInt(10) + "," + + +random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextFloat() + "," + + random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123','北京朝阳望京')"); + } + stmt.close(); + } + + final String url2 = "jdbc:dm://" + host2 + ":5236"; + try (Connection conn = DriverManager.getConnection(url2, "TESTUSER", "test123456")) { + conn.setAutoCommit(true); + Statement stmt = conn.createStatement(); + stmt.execute("drop table if exists stb2"); + stmt.execute("create table stb2(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, " + + "f6 double, f7 BIT, f8 VARCHAR(100), f9 VARCHAR2(200), t1 timestamp, t2 tinyint, t3 smallint, " + + "t4 int, t5 bigint, t6 float, t7 double, t8 BIT, t9 VARCHAR(100), t10 VARCHAR2(200))"); + } + } + +} diff --git a/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengine2StreamTest.java b/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengine2StreamTest.java new file mode 100644 index 00000000..f628a648 --- /dev/null +++ b/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengine2StreamTest.java @@ -0,0 +1,66 @@ +package com.alibaba.datax.plugin.reader; + +import com.alibaba.datax.core.Engine; +import org.junit.Ignore; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Random; + +@Ignore +public class TDengine2StreamTest { + + private static final String host = "192.168.56.105"; + private static final Random random = new Random(System.currentTimeMillis()); + + @Test + public void case01() throws Throwable { + // given + prepare("ms"); + + // when + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2stream-1.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + + @Test + public void case02() throws Throwable { + // given + prepare("ms"); + + // when + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2stream-2.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + + + private void prepare(String precision) throws SQLException { + final String url = "jdbc:TAOS-RS://" + host + ":6041/"; + try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) { + Statement stmt = conn.createStatement(); + + stmt.execute("drop database if exists db1"); + stmt.execute("create database if not exists db1 precision '" + precision + "'"); + stmt.execute("create table db1.stb1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, " + + "f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, t3 smallint, " + + "t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(100))"); + + for (int i = 1; i <= 10; i++) { + stmt.execute("insert into db1.tb" + i + " using db1.stb1 tags(now, " + random.nextInt(10) + "," + + random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextInt(10) + "," + + random.nextFloat() + "," + random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123'," + + "'北京朝阳望京') values(now+" + i + "s, " + random.nextInt(10) + "," + random.nextInt(10) + "," + + +random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextFloat() + "," + + random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123','北京朝阳望京')"); + } + stmt.close(); + } + } + + +} diff --git a/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengineReaderTest.java b/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengineReaderTest.java index d51d4229..491ddbaf 100644 --- a/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengineReaderTest.java +++ b/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengineReaderTest.java @@ -1,72 +1,153 @@ package com.alibaba.datax.plugin.reader; import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.writer.tdenginewriter.Key; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; -import java.text.ParseException; -import java.text.SimpleDateFormat; import java.util.List; public class TDengineReaderTest { - TDengineReader.Job job; - - @Before - public void before() { - job = new TDengineReader.Job(); + @Test + public void jobInit_case01() { + // given + TDengineReader.Job job = new TDengineReader.Job(); Configuration configuration = Configuration.from("{" + - "\"user\": \"root\"," + + "\"username\": \"root\"," + "\"password\": \"taosdata\"," + - "\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," + + "\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," + "\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," + + "\"where\":\"_c0 > 0\"," + "\"beginDateTime\": \"2021-01-01 00:00:00\"," + - "\"endDateTime\": \"2021-01-01 12:00:00\"," + - "\"splitInterval\": \"1h\"" + + "\"endDateTime\": \"2021-01-01 12:00:00\"" + "}"); job.setPluginJobConf(configuration); - } - @Test - public void jobInit() throws ParseException { // when job.init(); // assert Configuration conf = job.getPluginJobConf(); - Assert.assertEquals("root", conf.getString("user")); + Assert.assertEquals("root", conf.getString(Key.USERNAME)); Assert.assertEquals("taosdata", conf.getString("password")); Assert.assertEquals("weather", conf.getString("connection[0].table[0]")); - Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl")); + Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl[0]")); + Assert.assertEquals("2021-01-01 00:00:00", conf.getString("beginDateTime")); + Assert.assertEquals("2021-01-01 12:00:00", conf.getString("endDateTime")); + Assert.assertEquals("_c0 > 0", conf.getString("where")); + } - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - Long start = sdf.parse("2021-01-01 00:00:00").getTime(); - Assert.assertEquals(start, conf.getLong("beginDateTime")); + @Test + public void jobInit_case02() { + // given + TDengineReader.Job job = new TDengineReader.Job(); + Configuration configuration = Configuration.from("{" + + "\"username\": \"root\"," + + "\"password\": \"taosdata\"," + + "\"connection\": [{\"querySql\":[\"select * from weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," + + "}"); + job.setPluginJobConf(configuration); - Long end = sdf.parse("2021-01-01 12:00:00").getTime(); - Assert.assertEquals(end, conf.getLong("endDateTime")); + // when + job.init(); - Assert.assertEquals(new Long(3600 * 1000), conf.getLong("splitInterval")); + // assert + Configuration conf = job.getPluginJobConf(); + + Assert.assertEquals("root", conf.getString(Key.USERNAME)); + Assert.assertEquals("taosdata", conf.getString("password")); + Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl[0]")); + Assert.assertEquals("select * from weather", conf.getString("connection[0].querySql[0]")); } @Test - public void jobSplit() { + public void jobSplit_case01() { + // given + TDengineReader.Job job = new TDengineReader.Job(); + Configuration configuration = Configuration.from("{" + + "\"username\": \"root\"," + + "\"password\": \"taosdata\"," + + "\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," + + "\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," + + "\"where\":\"_c0 > 0\"," + + "\"beginDateTime\": \"2021-01-01 00:00:00\"," + + "\"endDateTime\": \"2021-01-01 12:00:00\"" + + "}"); + job.setPluginJobConf(configuration); + // when job.init(); List configurationList = job.split(1); // assert - Assert.assertEquals(12, configurationList.size()); - for (int i = 0; i < configurationList.size(); i++) { - Configuration conf = configurationList.get(i); - Assert.assertEquals("root", conf.getString("user")); - Assert.assertEquals("taosdata", conf.getString("password")); - Assert.assertEquals("weather", conf.getString("table[0]")); - Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl")); - } + Assert.assertEquals(1, configurationList.size()); + Configuration conf = configurationList.get(0); + Assert.assertEquals("root", conf.getString("username")); + Assert.assertEquals("taosdata", conf.getString("password")); + Assert.assertEquals("_c0 > 0", conf.getString("where")); + Assert.assertEquals("weather", conf.getString("table[0]")); + Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl")); + + } + + @Test + public void jobSplit_case02() { + // given + TDengineReader.Job job = new TDengineReader.Job(); + Configuration configuration = Configuration.from("{" + + "\"username\": \"root\"," + + "\"password\": \"taosdata\"," + + "\"connection\": [{\"querySql\":[\"select * from weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," + + "\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," + + "}"); + job.setPluginJobConf(configuration); + + // when + job.init(); + List configurationList = job.split(1); + + // assert + Assert.assertEquals(1, configurationList.size()); + Configuration conf = configurationList.get(0); + Assert.assertEquals("root", conf.getString("username")); + Assert.assertEquals("taosdata", conf.getString("password")); + Assert.assertEquals("select * from weather", conf.getString("querySql[0]")); + Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl")); + } + + @Test + public void jobSplit_case03() { + // given + TDengineReader.Job job = new TDengineReader.Job(); + Configuration configuration = Configuration.from("{" + + "\"username\": \"root\"," + + "\"password\": \"taosdata\"," + + "\"connection\": [{\"querySql\":[\"select * from weather\",\"select * from test.meters\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\", \"jdbc:TAOS://master:6030/test\"]}]," + + "\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," + + "}"); + job.setPluginJobConf(configuration); + + // when + job.init(); + List configurationList = job.split(1); + + // assert + Assert.assertEquals(2, configurationList.size()); + Configuration conf = configurationList.get(0); + Assert.assertEquals("root", conf.getString("username")); + Assert.assertEquals("taosdata", conf.getString("password")); + Assert.assertEquals("select * from weather", conf.getString("querySql[0]")); + Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl")); + + Configuration conf1 = configurationList.get(1); + Assert.assertEquals("root", conf1.getString("username")); + Assert.assertEquals("taosdata", conf1.getString("password")); + Assert.assertEquals("select * from weather", conf1.getString("querySql[0]")); + Assert.assertEquals("select * from test.meters", conf1.getString("querySql[1]")); + Assert.assertEquals("jdbc:TAOS://master:6030/test", conf1.getString("jdbcUrl")); } } \ No newline at end of file diff --git a/tdenginereader/src/test/resources/t2dm.json b/tdenginereader/src/test/resources/t2dm.json new file mode 100644 index 00000000..d87ade0c --- /dev/null +++ b/tdenginereader/src/test/resources/t2dm.json @@ -0,0 +1,52 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "tdenginereader", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "*" + ], + "connection": [ + { + "table": [ + "stb1" + ], + "jdbcUrl": [ + "jdbc:TAOS-RS://192.168.56.105:6041/db1" + ] + } + ] + } + }, + "writer": { + "name": "rdbmswriter", + "parameter": { + "connection": [ + { + "table": [ + "stb2" + ], + "jdbcUrl": "jdbc:dm://192.168.0.72:5236" + } + ], + "username": "TESTUSER", + "password": "test123456", + "table": "stb2", + "column": [ + "*" + ] + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginereader/src/test/resources/t2stream-1.json b/tdenginereader/src/test/resources/t2stream-1.json new file mode 100644 index 00000000..183ab7e2 --- /dev/null +++ b/tdenginereader/src/test/resources/t2stream-1.json @@ -0,0 +1,47 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "tdenginereader", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "ts", + "f1", + "f2", + "t1", + "t2" + ], + "connection": [ + { + "table": [ + "stb1" + ], + "jdbcUrl": [ + "jdbc:TAOS-RS://192.168.56.105:6041/db1" + ] + } + ], + "where": "t10 = '北京朝阳望京'", + "beginDateTime": "2022-03-07 12:00:00", + "endDateTime": "2022-03-07 19:00:00" + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "encoding": "UTF-8", + "print": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginereader/src/test/resources/t2stream-2.json b/tdenginereader/src/test/resources/t2stream-2.json new file mode 100644 index 00000000..15bfe9be --- /dev/null +++ b/tdenginereader/src/test/resources/t2stream-2.json @@ -0,0 +1,37 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "tdenginereader", + "parameter": { + "username": "root", + "password": "taosdata", + "connection": [ + { + "querySql": [ + "select * from stb1 where t10 = '北京朝阳望京' and _c0 >= '2022-03-07 12:00:00' and _c0 < '2022-03-07 19:00:00'" + ], + "jdbcUrl": [ + "jdbc:TAOS-RS://192.168.56.105:6041/db1" + ] + } + ] + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "encoding": "UTF-8", + "print": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginewriter/pom.xml b/tdenginewriter/pom.xml index 05fcd4d8..6a7ff251 100644 --- a/tdenginewriter/pom.xml +++ b/tdenginewriter/pom.xml @@ -68,6 +68,21 @@ 0.0.1-SNAPSHOT test + + mysql + mysql-connector-java + 5.1.49 + test + + + + + + + + + + diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java index 970be3a5..4ee91ce0 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java @@ -21,6 +21,7 @@ import java.util.stream.Collectors; public class DefaultDataHandler implements DataHandler { private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class); + private final TaskPluginCollector taskPluginCollector; private String username; private String password; private String jdbcUrl; @@ -47,7 +48,6 @@ public class DefaultDataHandler implements DataHandler { private Map> columnMetas; - static { try { Class.forName("com.taosdata.jdbc.TSDBDriver"); @@ -56,8 +56,8 @@ public class DefaultDataHandler implements DataHandler { e.printStackTrace(); } } - - public DefaultDataHandler(Configuration configuration) { + + public DefaultDataHandler(Configuration configuration, TaskPluginCollector taskPluginCollector) { this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME); this.password = configuration.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD); this.jdbcUrl = configuration.getString(Key.JDBC_URL); @@ -65,6 +65,7 @@ public class DefaultDataHandler implements DataHandler { this.tables = configuration.getList(Key.TABLE, String.class); this.columns = configuration.getList(Key.COLUMN, String.class); this.ignoreTagsUnmatched = configuration.getBool(Key.IGNORE_TAGS_UNMATCHED, Constants.DEFAULT_IGNORE_TAGS_UNMATCHED); + this.taskPluginCollector = taskPluginCollector; } @Override @@ -72,6 +73,7 @@ public class DefaultDataHandler implements DataHandler { int count = 0; int affectedRows = 0; + try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) { LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established."); // prepare table_name -> table_meta @@ -86,14 +88,24 @@ public class DefaultDataHandler implements DataHandler { if (i % batchSize != 0) { recordBatch.add(record); } else { - affectedRows = writeBatch(conn, recordBatch); + try { + affectedRows = writeBatch(conn, recordBatch); + } catch (SQLException e) { + LOG.warn("use one row insert. because:" + e.getMessage()); + affectedRows = writeEachRow(conn, recordBatch); + } recordBatch.clear(); } count++; } if (!recordBatch.isEmpty()) { - affectedRows = writeBatch(conn, recordBatch); + try { + affectedRows = writeBatch(conn, recordBatch); + } catch (SQLException e) { + LOG.warn("use one row insert. because:" + e.getMessage()); + affectedRows = writeEachRow(conn, recordBatch); + } recordBatch.clear(); } } catch (SQLException e) { @@ -107,6 +119,21 @@ public class DefaultDataHandler implements DataHandler { return affectedRows; } + private int writeEachRow(Connection conn, List recordBatch) { + int affectedRows = 0; + for (Record record : recordBatch) { + List recordList = new ArrayList<>(); + recordList.add(record); + try { + affectedRows += writeBatch(conn, recordList); + } catch (SQLException e) { + LOG.error(e.getMessage()); + this.taskPluginCollector.collectDirtyRecord(record, e); + } + } + return affectedRows; + } + /** * table: [ "stb1", "stb2", "tb1", "tb2", "t1" ] * stb1[ts,f1,f2] tags:[t1] @@ -118,7 +145,7 @@ public class DefaultDataHandler implements DataHandler { * 3. 对于tb,拼sql,例如:data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => insert into tb(ts, f1, f2) values(ts, f1, f2) * 4. 对于t,拼sql,例如:data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, f3, t1, t2] insert into t(ts, f1, f2, f3, t1, t2) values(ts, f1, f2, f3, t1, t2) */ - public int writeBatch(Connection conn, List recordBatch) { + public int writeBatch(Connection conn, List recordBatch) throws SQLException { int affectedRows = 0; for (String table : tables) { TableMeta tableMeta = tableMetas.get(table); @@ -146,7 +173,7 @@ public class DefaultDataHandler implements DataHandler { * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], ) * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], ) */ - private int writeBatchToSupTableBySQL(Connection conn, String table, List recordBatch) { + private int writeBatchToSupTableBySQL(Connection conn, String table, List recordBatch) throws SQLException { List columnMetas = this.columnMetas.get(table); StringBuilder sb = new StringBuilder("insert into"); @@ -177,13 +204,11 @@ public class DefaultDataHandler implements DataHandler { return executeUpdate(conn, sql); } - private int executeUpdate(Connection conn, String sql) throws DataXException { + private int executeUpdate(Connection conn, String sql) throws SQLException { int count; try (Statement stmt = conn.createStatement()) { LOG.debug(">>> " + sql); count = stmt.executeUpdate(sql); - } catch (SQLException e) { - throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage()); } return count; } @@ -227,7 +252,7 @@ public class DefaultDataHandler implements DataHandler { * table: ["stb1"], column: ["ts", "f1", "f2", "t1"] * data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => schemaless: stb1,t1=t1 f1=f1,f2=f2 ts */ - private int writeBatchToSupTableBySchemaless(Connection conn, String table, List recordBatch) { + private int writeBatchToSupTableBySchemaless(Connection conn, String table, List recordBatch) throws SQLException { int count = 0; TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision(); @@ -296,11 +321,8 @@ public class DefaultDataHandler implements DataHandler { default: timestampType = SchemalessTimestampType.NOT_CONFIGURED; } - try { - writer.write(lines, SchemalessProtocolType.LINE, timestampType); - } catch (SQLException e) { - throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage()); - } + + writer.write(lines, SchemalessProtocolType.LINE, timestampType); LOG.warn("schemalessWriter does not return affected rows!"); return count; @@ -370,7 +392,7 @@ public class DefaultDataHandler implements DataHandler { * else * insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)]) */ - private int writeBatchToSubTable(Connection conn, String table, List recordBatch) { + private int writeBatchToSubTable(Connection conn, String table, List recordBatch) throws SQLException { List columnMetas = this.columnMetas.get(table); StringBuilder sb = new StringBuilder(); @@ -440,7 +462,7 @@ public class DefaultDataHandler implements DataHandler { * table: ["weather"], column: ["ts, f1, f2, f3, t1, t2"] * sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...) */ - private int writeBatchToNormalTable(Connection conn, String table, List recordBatch) { + private int writeBatchToNormalTable(Connection conn, String table, List recordBatch) throws SQLException { List columnMetas = this.columnMetas.get(table); StringBuilder sb = new StringBuilder(); diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java index 1d7ee214..1a9358db 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java @@ -9,4 +9,10 @@ public class Key { public static final String JDBC_URL = "jdbcUrl"; public static final String COLUMN = "column"; public static final String IGNORE_TAGS_UNMATCHED = "ignoreTagsUnmatched"; + + public static final String BEGIN_DATETIME = "beginDateTime"; + public static final String END_DATETIME = "endDateTime"; + public static final String WHERE = "where"; + public static final String QUERY_SQL = "querySql"; + public static final String MANDATORY_ENCODING = "mandatoryEncoding"; } \ No newline at end of file diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java index 63d67a2a..c48b7942 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java @@ -17,7 +17,7 @@ public class SchemaManager { private final Connection conn; private TimestampPrecision precision; - SchemaManager(Connection conn) { + public SchemaManager(Connection conn) { this.conn = conn; } diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java index 7cc76a77..73982744 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java @@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.writer.tdenginewriter; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.spi.Writer; import com.alibaba.datax.common.util.Configuration; import org.apache.commons.lang3.StringUtils; @@ -25,7 +26,7 @@ public class TDengineWriter extends Writer { this.originalConfig = super.getPluginJobConf(); this.originalConfig.set(PEER_PLUGIN_NAME, getPeerPluginName()); - // check user + // check username String user = this.originalConfig.getString(Key.USERNAME); if (StringUtils.isBlank(user)) throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter [" @@ -81,10 +82,12 @@ public class TDengineWriter extends Writer { private static final Logger LOG = LoggerFactory.getLogger(Task.class); private Configuration writerSliceConfig; + private TaskPluginCollector taskPluginCollector; @Override public void init() { this.writerSliceConfig = getPluginJobConf(); + this.taskPluginCollector = super.getTaskPluginCollector(); } @Override @@ -101,7 +104,7 @@ public class TDengineWriter extends Writer { if (peerPluginName.equals("opentsdbreader")) handler = new OpentsdbDataHandler(this.writerSliceConfig); else - handler = new DefaultDataHandler(this.writerSliceConfig); + handler = new DefaultDataHandler(this.writerSliceConfig, this.taskPluginCollector); long records = handler.handle(lineReceiver, getTaskPluginCollector()); LOG.debug("handle data finished, records: " + records); diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Csv2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Csv2TDengineTest.java new file mode 100644 index 00000000..7352c3ca --- /dev/null +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Csv2TDengineTest.java @@ -0,0 +1,41 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import com.alibaba.datax.core.Engine; +import org.junit.Ignore; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; + +@Ignore +public class Csv2TDengineTest { + + private static final String host = "192.168.56.105"; + + @Test + public void case01() throws Throwable { + // given + prepareTable(); + + // when + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/csv2t.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + + public void prepareTable() throws SQLException { + final String url = "jdbc:TAOS-RS://" + host + ":6041"; + try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) { + Statement stmt = conn.createStatement(); + + stmt.execute("drop database if exists test"); + stmt.execute("create database if not exists test"); + stmt.execute("create table test.weather (ts timestamp, temperature bigint, humidity double, is_normal bool) " + + "tags(device_id binary(10),address nchar(10))"); + } + } + + +} diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DM2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DM2TDengineTest.java index 2ee02ff3..b6932f60 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DM2TDengineTest.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DM2TDengineTest.java @@ -101,7 +101,7 @@ public class DM2TDengineTest { } @Before - public void before() throws SQLException, ClassNotFoundException { + public void before() throws SQLException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); long ts = System.currentTimeMillis(); diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java index 75debe3c..e0acacb8 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java @@ -4,6 +4,8 @@ import com.alibaba.datax.common.element.DateColumn; import com.alibaba.datax.common.element.LongColumn; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.common.spi.Writer; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.core.transport.record.DefaultRecord; import org.junit.*; @@ -23,6 +25,8 @@ public class DefaultDataHandlerTest { private static final String host = "192.168.1.93"; private static Connection conn; + private final TaskPluginCollector taskPluginCollector = new TDengineWriter.Task().getTaskPluginCollector(); + @Test public void writeSupTableBySQL() throws SQLException { // given @@ -46,8 +50,9 @@ public class DefaultDataHandlerTest { return record; }).collect(Collectors.toList()); + // when - DefaultDataHandler handler = new DefaultDataHandler(configuration); + DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector); List tables = configuration.getList("table", String.class); SchemaManager schemaManager = new SchemaManager(conn); Map tableMetas = schemaManager.loadTableMeta(tables); @@ -85,7 +90,7 @@ public class DefaultDataHandlerTest { }).collect(Collectors.toList()); // when - DefaultDataHandler handler = new DefaultDataHandler(configuration); + DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector); List tables = configuration.getList("table", String.class); SchemaManager schemaManager = new SchemaManager(conn); Map tableMetas = schemaManager.loadTableMeta(tables); @@ -125,7 +130,7 @@ public class DefaultDataHandlerTest { }).collect(Collectors.toList()); // when - DefaultDataHandler handler = new DefaultDataHandler(configuration); + DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector); List tables = configuration.getList("table", String.class); SchemaManager schemaManager = new SchemaManager(connection); Map tableMetas = schemaManager.loadTableMeta(tables); @@ -164,7 +169,7 @@ public class DefaultDataHandlerTest { }).collect(Collectors.toList()); // when - DefaultDataHandler handler = new DefaultDataHandler(configuration); + DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector); List tables = configuration.getList("table", String.class); SchemaManager schemaManager = new SchemaManager(conn); Map tableMetas = schemaManager.loadTableMeta(tables); @@ -203,7 +208,7 @@ public class DefaultDataHandlerTest { }).collect(Collectors.toList()); // when - DefaultDataHandler handler = new DefaultDataHandler(configuration); + DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector); List tables = configuration.getList("table", String.class); SchemaManager schemaManager = new SchemaManager(conn); Map tableMetas = schemaManager.loadTableMeta(tables); @@ -242,7 +247,7 @@ public class DefaultDataHandlerTest { }).collect(Collectors.toList()); // when - DefaultDataHandler handler = new DefaultDataHandler(configuration); + DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector); List tables = configuration.getList("table", String.class); SchemaManager schemaManager = new SchemaManager(conn); Map tableMetas = schemaManager.loadTableMeta(tables); @@ -258,7 +263,7 @@ public class DefaultDataHandlerTest { } private void createSupAndSubTable() throws SQLException { - try(Statement stmt = conn.createStatement()){ + try (Statement stmt = conn.createStatement()) { stmt.execute("drop database if exists scm_test"); stmt.execute("create database if not exists scm_test"); stmt.execute("use scm_test"); @@ -273,7 +278,7 @@ public class DefaultDataHandlerTest { } private void createSupTable() throws SQLException { - try (Statement stmt = conn.createStatement()){ + try (Statement stmt = conn.createStatement()) { stmt.execute("drop database if exists scm_test"); stmt.execute("create database if not exists scm_test"); stmt.execute("use scm_test"); diff --git a/tdenginewriter/src/test/resources/csv2t.json b/tdenginewriter/src/test/resources/csv2t.json new file mode 100644 index 00000000..ef5c4d04 --- /dev/null +++ b/tdenginewriter/src/test/resources/csv2t.json @@ -0,0 +1,80 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "txtfilereader", + "parameter": { + "path": [ + "/Users/yangzy/IdeaProjects/DataX/tdenginewriter/src/test/resources/weather.csv" + ], + "encoding": "UTF-8", + "column": [ + { + "index": 0, + "type": "string" + }, + { + "index": 1, + "type": "date", + "format": "yyy-MM-dd HH:mm:ss.SSS" + }, + { + "index": 2, + "type": "long" + }, + { + "index": 3, + "type": "double" + }, + { + "index": 4, + "type": "long" + }, + { + "index": 5, + "type": "string" + }, + { + "index": 6, + "type": "String" + } + ], + "fieldDelimiter": "," + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "tbname", + "ts", + "temperature", + "humidity", + "is_normal", + "device_id", + "address" + ], + "connection": [ + { + "table": [ + "weather" + ], + "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/test" + } + ], + "batchSize": 100, + "ignoreTagsUnmatched": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/weather.csv b/tdenginewriter/src/test/resources/weather.csv new file mode 100644 index 00000000..21c4a1aa --- /dev/null +++ b/tdenginewriter/src/test/resources/weather.csv @@ -0,0 +1,10 @@ +tb1,2022-02-20 04:05:59.255,5,8.591868744,1,abcABC123,北京朝阳望京 +tb1,2022-02-20 04:58:47.068,3,1.489693641,1,abcABC123,北京朝阳望京 +tb1,2022-02-20 06:31:09.408,1,4.026500719,1,abcABC123,北京朝阳望京 +tb1,2022-02-20 08:08:00.336,1,9.606400360,1,abcABC123,北京朝阳望京 +tb1,2022-02-20 08:28:58.053,9,7.872178184,1,abcABC123123,北京朝阳望京 +tb1,2022-02-20 10:23:20.836,9,2.699478524,1,abcABC123,北京朝阳望京 +tb1,2022-02-20 11:09:59.739,7,7.906723716,1,abcABC123,北京朝阳望京 +tb1,2022-02-20 19:08:29.315,1,5.852338895,1,abcABC123,北京朝阳望京 +tb1,2022-02-20 22:10:06.243,10,5.535007901,1,abcABC123,北京朝阳望京 +tb1,2022-02-20 23:52:43.683,10,10.642013185,1,abcABC123,北京朝阳望京