diff --git a/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestCreateData.java b/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestCreateData.java index 48f9b7ce..c521d4d8 100644 --- a/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestCreateData.java +++ b/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestCreateData.java @@ -28,8 +28,8 @@ public class TestCreateData { // session init session = new Session.Builder() - .host("192.168.150.100") - // .host("172.20.31.6") + // .host("192.168.150.100") + .host("172.20.31.61") .port(6667) .username("root") .password("root") diff --git a/datax-example/datax-example-iotdb/src/test/resources/iotdb2mysql.json b/datax-example/datax-example-iotdb/src/test/resources/iotdb2mysql.json index b0c0e952..b77bfbee 100644 --- a/datax-example/datax-example-iotdb/src/test/resources/iotdb2mysql.json +++ b/datax-example/datax-example-iotdb/src/test/resources/iotdb2mysql.json @@ -12,18 +12,16 @@ "parameter": { "username": "root", "password": "root", - "host": "192.168.150.100", + "host": "172.20.31.61", "port": 6667, "fetchSize": 10000, "version": "V_1_0", "timeColumnPosition": 0, - "finalSqls":[ + "querySqls":[ ], "device": "root.cgn.device", - "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", - "beginDateTime": "2023-03-07 12:00:00", - "endDateTime": "2024-03-07 19:00:00", - "where": "" + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00" } }, "writer": { @@ -32,7 +30,7 @@ "username": "root", "password": "toy123", "writeMode": "insert", - "#需要提前建表": "CREATE TABLE device (`time` BIGINT,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);", + "#需要提前建表": "CREATE TABLE device (`time` DATETIME,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);", "column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], "session": [ "set session sql_mode='ANSI'" @@ -45,7 +43,7 @@ "table": [ "device" ], - "#": "下面的URL需要把中括号去掉,否则报错,mysqlreader的bug,未修改", + "#": "下面的URL需要把中括号去掉,否则报错,mysqlwriter的bug,未修改", "jdbcUrl": "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" } ] diff --git a/datax-example/datax-example-iotdb/src/test/resources/iotdb2stream.json b/datax-example/datax-example-iotdb/src/test/resources/iotdb2stream.json index cabc54eb..ce8f401a 100644 --- a/datax-example/datax-example-iotdb/src/test/resources/iotdb2stream.json +++ b/datax-example/datax-example-iotdb/src/test/resources/iotdb2stream.json @@ -21,14 +21,12 @@ "fetchSize": 10000, "version": "V_1_0", "timeColumnPosition": 0, - "finalSqls":[ + "querySqls":[ ], "device": "root.cgn.device", - "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", - "beginDateTime": "2023-03-07 12:00:00", - "endDateTime": "2024-03-07 19:00:00", - "where": "" + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00" } }, "writer": { diff --git a/datax-example/datax-example-iotdb/src/test/resources/iotdb2txt.json b/datax-example/datax-example-iotdb/src/test/resources/iotdb2txt.json index c962e56d..990411c5 100644 --- a/datax-example/datax-example-iotdb/src/test/resources/iotdb2txt.json +++ b/datax-example/datax-example-iotdb/src/test/resources/iotdb2txt.json @@ -12,20 +12,18 @@ "parameter": { "username": "root", "password": "root", - "host": "192.168.150.100", + "host": "172.20.31.61", "port": 6667, "fetchSize": 10000, "version": "V_1_0", "timeColumnPosition": 0, - "finalSqls":[ + "querySqls":[ "select * from root.cgn.device", "select A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU from root.cgn.device" ], "device": "root.cgn.device", - "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", - "beginDateTime": "2023-03-07 12:00:00", - "endDateTime": "2024-03-07 19:00:00", - "where": "" + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00" } }, "writer": { diff --git a/datax-example/datax-example-iotdb/src/test/resources/mysql2iotdb.json b/datax-example/datax-example-iotdb/src/test/resources/mysql2iotdb.json index 9f7f1d4c..d66d9c94 100644 --- a/datax-example/datax-example-iotdb/src/test/resources/mysql2iotdb.json +++ b/datax-example/datax-example-iotdb/src/test/resources/mysql2iotdb.json @@ -26,16 +26,17 @@ "parameter": { "username": "root", "password": "root", - "host": "192.168.150.100", + "host": "172.20.31.61", "port": 6667, "fetchSize": 10000, "version": "V_1_0", - "##": "Reader中时间列的位置,默认0列", "timeColumnPosition": 0, "insertBatchSize": 1000, "device": "root.cgn.device", - "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", - "deleteExistTimeseries": false + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "preSql": [ + "delete timeseries root.cgn.device.**" + ] } } } diff --git a/iotdbreader/doc/iotdbreader-CN.md b/iotdbreader/doc/iotdbreader-CN.md index 88278529..17f26b57 100644 --- a/iotdbreader/doc/iotdbreader-CN.md +++ b/iotdbreader/doc/iotdbreader-CN.md @@ -33,14 +33,15 @@ IoTDBReader 通过 IoTDB 的 原生java session 查询获取数据。 "port": 6667, "fetchSize": 10000, "version": "V_1_0", + "##": "时间列插入DataX的Record中的位置,默认第0列", "timeColumnPosition": 0, - "finalSqls":[ + "##":"写了querySqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句", + "querySqls":[ ], "device": "root.cgn.device", - "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", - "beginDateTime": "2023-03-07 12:00:00", - "endDateTime": "2024-03-07 19:00:00", - "where": "" + "##":"时间列不属于测点", + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00" } }, "writer": { @@ -49,7 +50,7 @@ IoTDBReader 通过 IoTDB 的 原生java session 查询获取数据。 "username": "root", "password": "toy123", "writeMode": "insert", - "#需要提前建表": "CREATE TABLE device (`time` BIGINT,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);", + "#需要提前建表": "CREATE TABLE device (`time` DATETIME,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);", "column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], "session": [ "set session sql_mode='ANSI'" @@ -96,14 +97,13 @@ IoTDBReader 通过 IoTDB 的 原生java session 查询获取数据。 "fetchSize": 10000, "version": "V_1_0", "timeColumnPosition": 0, - "finalSqls":[ + "##":"写了querySqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句", + "querySqls":[ "select * from root.cgn.device", "select A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU from root.cgn.device" ], "device": "", "measurements": "", - "beginDateTime": "", - "endDateTime": "", "where": "" } }, @@ -147,42 +147,34 @@ IoTDBReader 通过 IoTDB 的 原生java session 查询获取数据。 * 描述:时间列在Record中列的位置 * 必选:否 * 默认值:0 -* finalSqls +* querySqls * 描述:直接写多行SQL,可以并行读取,此时下面的参数失效。 * 必选:否 * 默认值: * device * 描述:IoTDB中的概念,可理解为mysql中的表。 - * 必选:finalSqls为空时必选 + * 必选:querySqls为空时必选 * 默认值:无 * measurements * 描述:IoTDB中的概念,可理解为mysql中的字段。 - * 必选:finalSqls为空时必选 - * 默认值:无 -* beginDateTime - * 描述:SQL查询时的数据的开始时间 - * 必选:finalSqls为空时必选 - * 默认值:无 -* measurements - * 描述:SQL查询时的数据的结束时间 - * 必选:否 + * 必选:querySqls为空时必选 * 默认值:无 * where - * 描述:额外的条件 + * 描述:查询条件 * 必选:否 * 默认值:无 ### 3.3 类型转换 | IoTDB 数据类型 | DataX 内部类型 | -|-----------------|------------| -| INT32 | Int | -| INT64,TIMESTAMP | Long | -| FLOAT | FLOAT | -| DOUBLE | Double | -| BOOLEAN | Bool | -| DATE | Date | -| STRING,TEXT | String | +|-----------------|--------| +| INT32 | Int | +| INT64,TIMESTAMP | Long | +| FLOAT | FLOAT | +| DOUBLE | Double | +| BOOLEAN | Bool | +| DATE | Date | +| STRING,TEXT | String | ## 4 性能报告 diff --git a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java index 1f446139..b285e906 100644 --- a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java +++ b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java @@ -2,7 +2,9 @@ package com.alibaba.datax.plugin.reader.iotdbreader; import com.alibaba.datax.common.element.*; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.AbstractTaskPlugin; import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.spi.Reader; import com.alibaba.datax.common.util.Configuration; import org.apache.iotdb.isession.SessionDataSet; @@ -13,6 +15,7 @@ import org.apache.iotdb.session.Session; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.Field; import org.apache.tsfile.read.common.RowRecord; +import org.apache.tsfile.read.common.block.column.NullColumn; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,42 +72,31 @@ public class IoTDBReader extends Reader { public List split(int adviceNumber) { // 每个config对应一个task List configs = new ArrayList<>(); - List queryList = this.jobConf.getList(Key.FINAL_SQLS, String.class); + List queryList = this.jobConf.getList(Key.QUERY_SQLS, String.class); if (queryList == null || queryList.size() == 0){ Configuration clone = this.jobConf.clone(); - // TODO 同时读取多个设备?有没有必要? String device = this.jobConf.getString(Key.DEVICE); - // 测点是一个逗号分隔的测点字符串或"*" - String measurements = this.jobConf.getString(Key.MEASUREMENTS); - String beginDateTime = this.jobConf.getString(Key.BEGIN_DATETIME); - String endDateTime = this.jobConf.getString(Key.END_DATETIME); + List measurements = this.jobConf.getList(Key.MEASUREMENTS, String.class); String where = this.jobConf.getString(Key.WHERE); StringBuilder sb = new StringBuilder(); - sb.append("select ").append(measurements); + sb.append("select ").append(String.join(",", measurements)); sb.append(" from ").append(device); - sb.append(" where "); - if (beginDateTime != null && !beginDateTime.isEmpty()){ - sb.append("time >= ").append(beginDateTime); - } - if (endDateTime != null && !endDateTime.isEmpty()){ - sb.append(" and time <= ").append(endDateTime); - } if (where != null && !where.isEmpty()){ - sb.append(" and ").append(where); + sb.append(" where ").append(where); } clone.set(Key.QUERY_SQL, sb.toString()); configs.add(clone); - //TODO DataX中是单线程,实际上底层session中是多线程读取。根据什么条件切分多线程? + //DataX中一个查询是单线程,实际上底层session中是多线程读取。 }else{ // 直接读取最终SQL for (String query : queryList) { Configuration clone = this.jobConf.clone(); - clone.remove(Key.FINAL_SQLS); + clone.remove(Key.QUERY_SQLS); clone.set(Key.QUERY_SQL, query); configs.add(clone); } } - LOG.info("configs: {}", configs); + // LOG.info("configs: {}", configs); return configs; } @@ -144,6 +136,7 @@ public class IoTDBReader extends Reader { * 最终的查询SQL,交给session执行。 */ private String querySql; + private TaskPluginCollector taskPluginCollector; @Override public void init() { @@ -171,6 +164,7 @@ public class IoTDBReader extends Reader { this.timeColumnPosition = (taskConf.getInt(Key.TIME_COLUMN_POSITION) == null) ? 0 : taskConf.getInt(Key.TIME_COLUMN_POSITION); this.querySql = taskConf.getString(Key.QUERY_SQL); + taskPluginCollector = super.getTaskPluginCollector(); } @Override @@ -181,7 +175,7 @@ public class IoTDBReader extends Reader { session.close(); } } catch (IoTDBConnectionException e) { - throw new RuntimeException(e); + LOG.info(e.getMessage()); } } @@ -199,58 +193,60 @@ public class IoTDBReader extends Reader { // IoTDB中的行 RowRecord rowRecord = dataSet.next(); List fields = rowRecord.getFields(); - // 除time列外的其他列遍历类型后转换 - for (int i = 0; i < fields.size(); i++) { - if (i == timeColumnPosition){ - // time列插入指定位置 - long timestamp = rowRecord.getTimestamp(); - record.addColumn(new LongColumn(timestamp)); - } - Field field = fields.get(i); - TSDataType dataType = field.getDataType(); - // null类型暂时转为字符串 TODO 有没有其他处理方式? - if (dataType == null) { - record.addColumn(new StringColumn("null")); - continue; - } - switch (dataType) { - // TODO 把所有数据类型都测一遍 - case BOOLEAN: - record.addColumn(new BoolColumn(field.getBoolV())); - break; - case INT32: - record.addColumn(new LongColumn(field.getIntV())); - break; - case INT64: - case TIMESTAMP: - record.addColumn(new LongColumn(field.getLongV())); - break; - case FLOAT: - record.addColumn(new DoubleColumn(field.getFloatV())); - break; - case DOUBLE: - // TODO 为什么DataX推荐用String?区别是什么? - record.addColumn(new DoubleColumn(field.getDoubleV())); - break; - case STRING: - case TEXT: - record.addColumn(new StringColumn(field.getStringValue())); - break; - case DATE: - record.addColumn(new DateColumn(Date.valueOf(field.getDateV()))); - break; - default: - // TODO 其他类型怎么处理? - LOG.info("类型错误:"+ field.getDataType()); + try { + // 除time列外的其他列遍历类型后转换 + for (int i = 0; i < fields.size(); i++) { + if (i == timeColumnPosition){ + // time列插入指定位置,时间列不在fields中,需要单独处理。不能为null + long timestamp = rowRecord.getTimestamp(); + record.addColumn(new DateColumn(timestamp)); + } + Field field = fields.get(i); + TSDataType dataType = field.getDataType(); + if (dataType == null) { + // 需要写插件支持处理null数据,否则会空指向异常,这里先当成脏数据 + // record.addColumn(null); + // continue; + throw new RuntimeException("null datatype"); + } + switch (dataType) { + case BOOLEAN: + record.addColumn(new BoolColumn(field.getBoolV())); + break; + case INT32: + record.addColumn(new LongColumn(field.getIntV())); + break; + case INT64: + case TIMESTAMP: + record.addColumn(new LongColumn(field.getLongV())); + break; + case FLOAT: + record.addColumn(new DoubleColumn(field.getFloatV())); + break; + case DOUBLE: + record.addColumn(new DoubleColumn(field.getDoubleV())); + break; + case STRING: + case TEXT: + record.addColumn(new StringColumn(field.getStringValue())); + break; + case DATE: + record.addColumn(new DateColumn(Date.valueOf(field.getDateV()))); + break; + default: + throw new RuntimeException("Unsupported data type: " + dataType); + } } + // 发送 + recordSender.sendToWriter(record); + }catch (RuntimeException e){ + LOG.info(e.getMessage()); + this.taskPluginCollector.collectDirtyRecord(record, e); } - // 发送 - recordSender.sendToWriter(record); } } catch (StatementExecutionException | IoTDBConnectionException e) { throw new RuntimeException(e); } } } - } diff --git a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/Key.java b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/Key.java index 56a5b637..0283be6a 100644 --- a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/Key.java +++ b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/Key.java @@ -7,12 +7,10 @@ public class Key { public static final String PORT = "port"; public static final String FETCH_SIZE = "fetchSize"; public static final String VERSION = "version"; - public static final String FINAL_SQLS = "finalSqls"; + public static final String QUERY_SQLS = "querySqls"; public static final String QUERY_SQL = "querySql"; public static final String TIME_COLUMN_POSITION = "timeColumnPosition"; public static final String DEVICE = "device"; public static final String MEASUREMENTS = "measurements"; - public static final String BEGIN_DATETIME = "beginDateTime"; - public static final String END_DATETIME = "endDateTime"; public static final String WHERE = "where"; } diff --git a/iotdbreader/src/main/resources/plugin.json b/iotdbreader/src/main/resources/plugin.json index 84dfa086..39d6b897 100644 --- a/iotdbreader/src/main/resources/plugin.json +++ b/iotdbreader/src/main/resources/plugin.json @@ -5,6 +5,6 @@ "useScene": "data migration to iotdb", "mechanism": "use iotdb-java-session to write data." }, - "developer": "lihaoran-Timecho" + "developer": "timecho.com" } diff --git a/iotdbreader/src/main/resources/plugin_job_template.json b/iotdbreader/src/main/resources/plugin_job_template.json index 6639a103..14287ed7 100644 --- a/iotdbreader/src/main/resources/plugin_job_template.json +++ b/iotdbreader/src/main/resources/plugin_job_template.json @@ -9,8 +9,8 @@ "version": "V_1_0", "##": "时间列插入DataX的Record中的位置,默认第0列", "timeColumnPosition": 0, - "##":"写了finalSqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句", - "finalSqls": [ + "##":"写了querySqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句", + "querySqls": [ "select * from root.cgn.device", "select * from root.cgn.device", "select * from root.cgn.device", @@ -18,10 +18,8 @@ "select * from root.cgn.device" ], "device": "root.cgn.device", - "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", - "##": "开始时间必填,否则SQL错误", - "beginDateTime": "2023-03-07 12:00:00", - "endDateTime": "2024-03-07 19:00:00", - "where": "" + "##":"时间列不属于测点", + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00" } } \ No newline at end of file diff --git a/iotdbwriter/doc/iotdbwriter-CN.md b/iotdbwriter/doc/iotdbwriter-CN.md index 7921b233..508e367a 100644 --- a/iotdbwriter/doc/iotdbwriter-CN.md +++ b/iotdbwriter/doc/iotdbwriter-CN.md @@ -57,10 +57,11 @@ IoTDB中设备与列的概念见IoTDB官方文档。 "version": "V_1_0", "##": "Reader中时间列的位置,默认0列", "timeColumnPosition": 0, - "insertBatchSize": 1000, + "batchSize": 1000, "device": "root.cgn.device", - "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", - "deleteExistTimeseries": false + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "###": "在插入之前,预先执行的SQL,默认为空", + "preSql": [] } } } @@ -112,10 +113,10 @@ IoTDB中设备与列的概念见IoTDB官方文档。 * 描述:每batchSize条record为一个batch进行写入 * 必选:否 * 默认值:1000 -* deleteExistTimeseries - * 描述:插入前是否删除该device下的所有数据 +* preSql + * 描述:插入前是否预先执行SQL * 必选:否 - * 默认值:false + * 默认值:无 ### 3.3 类型转换 diff --git a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java index 2e4de1ec..ddade9e3 100644 --- a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java +++ b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java @@ -15,6 +15,7 @@ import org.apache.tsfile.enums.TSDataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Date; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -61,7 +62,7 @@ public class IoTDBWriter extends Writer { Configuration clone = this.jobConf.clone(); configs.add(clone); } - LOG.info("configs: {}", configs); + // LOG.info("configs: {}", configs); return configs; } @@ -74,26 +75,26 @@ public class IoTDBWriter extends Writer { public static class Task extends Writer.Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); private Configuration taskConf; - /** - * IoTDB原生读写工具 - */ + + // IoTDB原生读写工具 private Session session; - /** - * 是否在插入前删除已有的时间序列?默认false - */ - private boolean deleteExistTimeseries; + // 是否在插入前删除已有的时间序列,为""表示不执行 + // private String deleteBeforeInsert; - /** - * 插入批次大小 - */ + // 插入批次大小 private int insertBatchSize; - /** - * IoTDB中的时间列插入的位置,默认为0,即第一列。 - */ + // IoTDB中的时间列插入的位置,默认为0,即第一列。 private int timeColumnPosition; + // 处理脏数据 + private TaskPluginCollector taskPluginCollector; + + // 预先执行的SQL语句 + private List preSqls; + + @Override public void init() { // 获取与本Task相关的配置,是Job的split方法返回的配置列表中的其中一个。 @@ -116,28 +117,27 @@ public class IoTDBWriter extends Writer { } // 获取参数,否则默认值 - insertBatchSize = (taskConf.getInt(Key.INSERT_BATCH_SIZE) == null) ? 1000 : taskConf.getInt(Key.INSERT_BATCH_SIZE); + insertBatchSize = (taskConf.getInt(Key.BATCH_SIZE) == null) ? 1000 : taskConf.getInt(Key.BATCH_SIZE); timeColumnPosition = (taskConf.getInt(Key.TIME_COLUMN_POSITION) == null) ? 0 : taskConf.getInt(Key.TIME_COLUMN_POSITION); - deleteExistTimeseries = (taskConf.getBool(Key.DELETE_EXIST_TIMESERIES) == null) ? false : taskConf.getBool(Key.DELETE_EXIST_TIMESERIES); + preSqls = (taskConf.getList(Key.PRE_SQL, String.class) == null) ? new ArrayList<>() : taskConf.getList(Key.PRE_SQL, String.class); + taskPluginCollector = super.getTaskPluginCollector(); } @Override public void prepare() { - // 是否先删除已有的时间序列 - try { - if (deleteExistTimeseries){ - if (session.checkTimeseriesExists(taskConf.getString(Key.DEVICE) + ".**")) { - session.deleteTimeseries(taskConf.getString(Key.DEVICE) + ".**"); - LOG.info("===========删除已有的时间序列完成=============="); - }else { - LOG.info("===========不存在已有时间序列=============="); + if (preSqls.size() != 0){ + for (String sql : preSqls) { + try { + session.executeNonQueryStatement(sql); + + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); } } - } catch (IoTDBConnectionException | StatementExecutionException e) { - throw new RuntimeException(e); + LOG.info("=======Complated preSqls======="); } - // 是否创建测点时间序列?不需要,IoTDB会自动创建时间序列。 + // IoTDB会自动创建时间序列,无需提前创建 } @Override @@ -147,7 +147,7 @@ public class IoTDBWriter extends Writer { session.close(); } } catch (IoTDBConnectionException e) { - throw new RuntimeException(e); + LOG.info(e.getMessage()); } } @@ -170,68 +170,74 @@ public class IoTDBWriter extends Writer { // 获取Record记录,传输结束返回null int count; // 统计插入记录数 for (count = 0; (record = lineReceiver.getFromReader()) != null; count++) { - // LOG.info("record:" + record); // 处理时间列 timestamps.add(record.getColumn(timeColumnPosition).asLong()); // 处理测点 - List measurements = Arrays.asList(taskConf.getString(Key.MEASUREMENTS).split(",")); + List measurements = taskConf.getList(Key.MEASUREMENTS, String.class); measurementsList.add(measurements); // 处理类型和值 List types = new ArrayList<>(); List values = new ArrayList<>(); - // List values = new ArrayList<>(); - for (int i = 0; i < record.getColumnNumber(); i++) { - if (i == timeColumnPosition){ - continue; // 跳过时间列 - } - Column col = record.getColumn(i); - switch (col.getType()) { - case BOOL: - types.add(TSDataType.BOOLEAN); - values.add(col.asBoolean()); - break; - case INT: - types.add(TSDataType.INT32); - values.add((Integer) col.getRawData()); - break; - case LONG: - types.add(TSDataType.INT64); - values.add(col.asLong()); - break; - case DOUBLE: - types.add(TSDataType.DOUBLE); - values.add(col.asDouble()); - break; - case STRING: - types.add(TSDataType.STRING); - values.add(col.asString()); - break; - case DATE: - types.add(TSDataType.DATE); - values.add(col.asDate()); - break; - default: - throw new RuntimeException("unsupported type:" + col.getType()); + try{ + for (int i = 0; i < record.getColumnNumber(); i++) { + if (i == timeColumnPosition){ + continue; // 跳过时间列 + } + Column col = record.getColumn(i); + switch (col.getType()) { + case BOOL: + types.add(TSDataType.BOOLEAN); + values.add(col.asBoolean()); + break; + case INT: + types.add(TSDataType.INT32); + values.add((Integer) col.getRawData()); + break; + case LONG: + types.add(TSDataType.INT64); + values.add(col.asLong()); + break; + case DOUBLE: + types.add(TSDataType.DOUBLE); + values.add(col.asDouble()); + break; + case NULL: + // IoTDB可以处理null + types.add(null); + values.add(null); + break; + case STRING: + types.add(TSDataType.STRING); + values.add(col.asString()); + break; + case DATE: + types.add(TSDataType.DATE); + values.add(col.asDate()); + break; + case BAD: + default: + throw new RuntimeException("unsupported type:" + col.getType()); + } } + typesList.add(types); + valuesList.add(values); + }catch (RuntimeException e){ + LOG.info(e.getMessage()); + taskPluginCollector.collectDirtyRecord(record, e); } - typesList.add(types); - valuesList.add(values); if (count != 0 && count % insertBatchSize == 0) { session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList); - LOG.info("已插入"+count+"条数据"); - measurementsList.clear(); - valuesList.clear(); - typesList.clear(); timestamps.clear(); + measurementsList.clear(); + typesList.clear(); valuesList.clear(); } } if (!timestamps.isEmpty()){ session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList); - LOG.info("已插入剩余数据:" + timestamps.size() + "条"); } - LOG.info("已插入所有数据:" + (count-1) + "条"); + LOG.info("========= task all data inserted, total record: " + (count-1)); }catch (IoTDBConnectionException | StatementExecutionException e) { throw new RuntimeException(e); } diff --git a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/Key.java b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/Key.java index f702d979..552690f7 100644 --- a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/Key.java +++ b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/Key.java @@ -9,6 +9,6 @@ public class Key { public static final String TIME_COLUMN_POSITION = "timeColumnPosition"; public static final String DEVICE = "device"; public static final String MEASUREMENTS = "measurements"; - public static final String DELETE_EXIST_TIMESERIES = "deleteExistTimeseries"; - public static final String INSERT_BATCH_SIZE = "insertBatchSize"; + public static final String PRE_SQL = "preSql"; + public static final String BATCH_SIZE = "batchSize"; } diff --git a/iotdbwriter/src/main/resources/plugin.json b/iotdbwriter/src/main/resources/plugin.json index ed7040c4..9d034718 100644 --- a/iotdbwriter/src/main/resources/plugin.json +++ b/iotdbwriter/src/main/resources/plugin.json @@ -5,5 +5,5 @@ "useScene": "data migration to iotdb", "mechanism": "use iotdb-java-session to write data." }, - "developer": "lihaoran-Timecho" + "developer": "timecho.com" } \ No newline at end of file diff --git a/iotdbwriter/src/main/resources/plugin_job_template.json b/iotdbwriter/src/main/resources/plugin_job_template.json index 739cead1..796cd135 100644 --- a/iotdbwriter/src/main/resources/plugin_job_template.json +++ b/iotdbwriter/src/main/resources/plugin_job_template.json @@ -8,9 +8,12 @@ "version": "V_1_0", "##": "注意是Reader插件读取到的数据中时间列的位置,不是该插件,默认0列", "timeColumnPosition": 0, - "insertBatchSize": 1000, + "batchSize": 1000, "device": "root.cgn.device", - "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", - "deleteExistTimeseries": false + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "###": "在插入之前,执行删除操作,为空或不配置表示不执行", + "preSql": [ + "delete timeseries root.cgn.device.**" + ] } } \ No newline at end of file