From 65f704b7772a1fadc2f308600e310c4762d554c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E6=B5=A9=E7=84=B6?= Date: Thu, 20 Jun 2024 10:07:47 +0800 Subject: [PATCH 1/7] =?UTF-8?q?=E5=AE=9E=E7=8E=B0DataX=E8=AF=BB=E5=86=99De?= =?UTF-8?q?mo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- iotdbreader/doc/iotdbreader-CN.md | 4 + iotdbreader/pom.xml | 102 ++++++++++ iotdbreader/src/main/assembly/package.xml | 34 ++++ .../reader/iotdbreader/IoTDBReader.java | 190 ++++++++++++++++++ .../iotdbreader/IoTDBReaderErrorCode.java | 28 +++ iotdbreader/src/main/resources/plugin.json | 10 + .../main/resources/plugin_job_template.json | 23 +++ iotdbwriter/doc/iotdbwriter-CN.md | 0 iotdbwriter/pom.xml | 102 ++++++++++ iotdbwriter/src/main/assembly/package.xml | 34 ++++ .../writer/iotdbwriter/IoTDBWriter.java | 165 +++++++++++++++ .../iotdbwriter/IoTDBWriterErrorCode.java | 29 +++ iotdbwriter/src/main/resources/plugin.json | 9 + .../main/resources/plugin_job_template.json | 20 ++ 14 files changed, 750 insertions(+) create mode 100644 iotdbreader/doc/iotdbreader-CN.md create mode 100644 iotdbreader/pom.xml create mode 100644 iotdbreader/src/main/assembly/package.xml create mode 100644 iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java create mode 100644 iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReaderErrorCode.java create mode 100644 iotdbreader/src/main/resources/plugin.json create mode 100644 iotdbreader/src/main/resources/plugin_job_template.json create mode 100644 iotdbwriter/doc/iotdbwriter-CN.md create mode 100644 iotdbwriter/pom.xml create mode 100644 iotdbwriter/src/main/assembly/package.xml create mode 100644 iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java create mode 100644 iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriterErrorCode.java create mode 100644 iotdbwriter/src/main/resources/plugin.json create mode 100644 iotdbwriter/src/main/resources/plugin_job_template.json diff --git a/iotdbreader/doc/iotdbreader-CN.md b/iotdbreader/doc/iotdbreader-CN.md new file mode 100644 index 00000000..d4b2d70f --- /dev/null +++ b/iotdbreader/doc/iotdbreader-CN.md @@ -0,0 +1,4 @@ + + +## 快速介绍 +iotdbreader用来读取iotdb中的数据,然后传输到其他数据库。 diff --git a/iotdbreader/pom.xml b/iotdbreader/pom.xml new file mode 100644 index 00000000..b531891e --- /dev/null +++ b/iotdbreader/pom.xml @@ -0,0 +1,102 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + iotdbreader + + + 8 + 8 + 1.3.3-SNAPSHOT + + + + + com.alibaba.datax + datax-core + 0.0.1-SNAPSHOT + test + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.iotdb + iotdb-session + ${iotdb.session.version} + + + org.apache.iotdb + node-commons + ${iotdb.session.version} + + + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.12.4 + + + + **/*Test.java + + + + + true + + + + + + + \ No newline at end of file diff --git a/iotdbreader/src/main/assembly/package.xml b/iotdbreader/src/main/assembly/package.xml new file mode 100644 index 00000000..f01364f1 --- /dev/null +++ b/iotdbreader/src/main/assembly/package.xml @@ -0,0 +1,34 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/iotdbreader + + + target/ + + iotdbreader-0.0.1-SNAPSHOT.jar + + plugin/writer/iotdbreader + + + + + + false + plugin/reader/iotdbreader/libs + runtime + + + diff --git a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java new file mode 100644 index 00000000..cd26cd46 --- /dev/null +++ b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java @@ -0,0 +1,190 @@ +package com.alibaba.datax.plugin.reader.iotdbreader; + +import com.alibaba.datax.common.element.*; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.spi.ErrorCode; +import com.alibaba.datax.common.spi.Reader; +import com.alibaba.datax.common.util.Configuration; +import org.apache.commons.lang3.StringUtils; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.isession.util.Version; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.Field; +import org.apache.tsfile.read.common.RowRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.UnsupportedEncodingException; +import java.sql.*; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class IoTDBReader extends Reader { + public static class Job extends Reader.Job { + private static final Logger LOG = LoggerFactory.getLogger(Job.class); + private Configuration originalConfig; + + /** + * Job对象初始化工作 + */ + @Override + public void init() { + // TODO 配置文件还没规划格式 + // 通过super.getPluginJobConf()获取与本插件相关的配置。 + // 读插件获得配置中reader部分,写插件获得writer部分。 + this.originalConfig = super.getPluginJobConf(); + // TODO 检查各种参数是否正确 + } + + /** + * Job对象自身的销毁工作。 + */ + @Override + public void destroy() { + + } + + /** + * 全局的后置工作,比如mysqlwriter同步完影子表后的rename操作。 + */ + @Override + public void post() { + + } + + /** + * 将Job拆分Task。 + * @param adviceNumber 框架建议的拆分数,一般是运行时所配置的并发度。 + * @return Task的配置列表。 + */ + @Override + public List split(int adviceNumber) { + // TODO 暂时拆分为adviceNumber个,不知道是怎么切割的。。。后序需要继续测试 + // TODO DEBUG看看是不是一个配置对应一个Task,一个Task启动配置文件中的连接,执行一个查询。 + // 本机增加100个配置文件,写入txt,生成100个txt文件,运行如下 + //任务启动时刻 : 2024-06-19 16:21:13 + // 任务结束时刻 : 2024-06-19 16:21:24 + // 任务总计耗时 : 10s + // 任务平均流量 : 42.93MB/s + // 记录写入速度 : 90010rec/s + // 读出记录总数 : 900100 + // 读写失败总数 : 0 + List configurations = new ArrayList<>(); + for (int i = 0; i < 100; i++){ + configurations.add(this.originalConfig); + } + return configurations; + } + } + + public static class Task extends Reader.Task { + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + + private Configuration readerSliceConfig; + private String mandatoryEncoding; + + private Session session; + + @Override + public void init() { + // session init + session = + new Session.Builder() + .host("192.168.150.100") + .port(6667) + .username("root") + .password("root") + .version(Version.V_0_13) + .build(); + // open session, close RPCCompression + try { + session.open(false); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } + + // set session fetchSize + session.setFetchSize(10000); + } + + @Override + public void destroy() { + try { + if (session != null){ + session.close(); + } + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } + } + + @Override + public void startRead(RecordSender recordSender) { + try { + // TODO 把流程调通后把SQL语句抽出去 + // SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1"); + SessionDataSet dataSet = session.executeQueryStatement("select * from root.cgn.device"); + // System.out.println(dataSet.getColumnNames()); + List columnTypes = dataSet.getColumnTypes(); + // System.out.println(columnTypes); + int columnNums = columnTypes.size(); + // dataSet.setFetchSize(1024); + while (dataSet.hasNext()) { + RowRecord rowRecord = dataSet.next(); + // 将iotdb中的行 转为datax中的record + Record record = recordSender.createRecord(); + // time列直接处理 + long timestamp = rowRecord.getTimestamp(); + record.addColumn(new LongColumn(timestamp)); + List fields = rowRecord.getFields(); + // 其他列遍历类型后转换 + for (Field field : fields) { + TSDataType dataType = field.getDataType(); + // null类型暂时转为字符串 + if (dataType == null) { + record.addColumn(new StringColumn("null")); + continue; + } + switch (dataType) { + // TODO 把所有数据类型都测一遍 + case BOOLEAN: + record.addColumn(new BoolColumn(field.getBoolV())); + break; + case INT32: + record.addColumn(new LongColumn(field.getIntV())); + break; + case INT64: + case TIMESTAMP: + record.addColumn(new LongColumn(field.getLongV())); + break; + case FLOAT: + record.addColumn(new DoubleColumn(field.getFloatV())); + break; + case DOUBLE: + record.addColumn(new DoubleColumn(field.getDoubleV())); + break; + case STRING: + case TEXT: + record.addColumn(new StringColumn(field.getStringValue())); + break; + case DATE: + record.addColumn(new DateColumn(Date.valueOf(field.getDateV()))); + break; + default: + System.out.println("类型错误"+field.getDataType()); + } + } + recordSender.sendToWriter(record); + } + } catch (StatementExecutionException | IoTDBConnectionException e) { + throw new RuntimeException(e); + } + } + } + +} diff --git a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReaderErrorCode.java b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReaderErrorCode.java new file mode 100644 index 00000000..e61aa53e --- /dev/null +++ b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReaderErrorCode.java @@ -0,0 +1,28 @@ +package com.alibaba.datax.plugin.reader.iotdbreader; + +import com.alibaba.datax.common.spi.ErrorCode; + +public class IoTDBReaderErrorCode implements ErrorCode { + private final String code; + private final String description; + + public IoTDBReaderErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s].", this.code, this.description); + } +} diff --git a/iotdbreader/src/main/resources/plugin.json b/iotdbreader/src/main/resources/plugin.json new file mode 100644 index 00000000..84dfa086 --- /dev/null +++ b/iotdbreader/src/main/resources/plugin.json @@ -0,0 +1,10 @@ +{ + "name": "iotdbreader", + "class": "com.alibaba.datax.plugin.reader.iotdbreader.IoTDBReader", + "description": { + "useScene": "data migration to iotdb", + "mechanism": "use iotdb-java-session to write data." + }, + "developer": "lihaoran-Timecho" +} + diff --git a/iotdbreader/src/main/resources/plugin_job_template.json b/iotdbreader/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..0722bf0e --- /dev/null +++ b/iotdbreader/src/main/resources/plugin_job_template.json @@ -0,0 +1,23 @@ +{ + "name": "iotdbreader", + "parameter": { + "user": "", + "password": "", + "connection": [ + { + "table": [ + "" + ], + "sessionUrl": [ + "" + ] + } + ], + "column": [ + "" + ], + "beginDateTime": "", + "endDateTime": "", + "where": "" + } +} \ No newline at end of file diff --git a/iotdbwriter/doc/iotdbwriter-CN.md b/iotdbwriter/doc/iotdbwriter-CN.md new file mode 100644 index 00000000..e69de29b diff --git a/iotdbwriter/pom.xml b/iotdbwriter/pom.xml new file mode 100644 index 00000000..31c747be --- /dev/null +++ b/iotdbwriter/pom.xml @@ -0,0 +1,102 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + iotdbwriter + + + 8 + 8 + 1.3.3-SNAPSHOT + + + + + com.alibaba.datax + datax-core + 0.0.1-SNAPSHOT + test + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.iotdb + iotdb-session + ${iotdb.session.version} + + + org.apache.iotdb + node-commons + ${iotdb.session.version} + + + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.12.4 + + + + **/*Test.java + + + + + true + + + + + + + \ No newline at end of file diff --git a/iotdbwriter/src/main/assembly/package.xml b/iotdbwriter/src/main/assembly/package.xml new file mode 100644 index 00000000..ca1ba9ad --- /dev/null +++ b/iotdbwriter/src/main/assembly/package.xml @@ -0,0 +1,34 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/iotdbwriter + + + target/ + + iotdbwriter-0.0.1-SNAPSHOT.jar + + plugin/writer/iotdbwriter + + + + + + false + plugin/writer/iotdbwriter/libs + runtime + + + diff --git a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java new file mode 100644 index 00000000..f4f424c2 --- /dev/null +++ b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java @@ -0,0 +1,165 @@ +package com.alibaba.datax.plugin.writer.iotdbwriter; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; +import org.apache.iotdb.isession.util.Version; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.tsfile.enums.TSDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +public class IoTDBWriter extends Writer { + + public static class Job extends Writer.Job { + private static final Logger LOG = LoggerFactory.getLogger(Job.class); + private Configuration originalConfig; + + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + // TODO 检查配置文件参数 + + } + + @Override + public void destroy() { + + } + + @Override + public List split(int mandatoryNumber) { + // TODO 根据什么拆分Task? + List configs = new ArrayList<>(); + for (int i = 0; i < mandatoryNumber; i++) { + configs.add(originalConfig); + } + return configs; + } + } + + public static class Task extends Writer.Task { + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + private Configuration writerSliceConfig; + private TaskPluginCollector taskPluginCollector; + + private Session session; + @Override + public void init() { + this.writerSliceConfig = getPluginJobConf(); + this.taskPluginCollector = super.getTaskPluginCollector(); + + // session init + session = + new Session.Builder() + .host("192.168.150.100") + .port(6667) + .username("root") + .password("root") + .version(Version.V_0_13) + .build(); + // open session, close RPCCompression + try { + session.open(false); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } + + // set session fetchSize + session.setFetchSize(10000); + + // // 先删除已有的时间序列 + // if (session.checkTimeseriesExists(device + ".**")) { + // session.deleteTimeseries(device + ".**"); + // System.out.println("删除已有的时间序列完成=============="); + // } + } + + @Override + public void destroy() { + try { + if (session != null){ + session.close(); + } + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } + } + + @Override + public void startWrite(RecordReceiver lineReceiver) { + // 暂时实现往一个设备中插入数据(也就是类似一个表) + // 插入1条的原因是这里的只读了一次。 + Record record = null; + for (int count = 1; (record = lineReceiver.getFromReader()) != null; count++) { + System.out.println(record); + int columnNums = record.getColumnNumber(); + // 先实现一条条插入 + String device = "root.test.device2"; + List measurements = new ArrayList<>(); // TODO 这个好像没传过来。 + List types = new ArrayList<>(); + // List values = new ArrayList<>(); + List values = new ArrayList<>(); + for (int i = 0; i < columnNums; i++) { + measurements.add("ss" + i); // 没传过来先用这个 + Column column = record.getColumn(i); + // values.add(column.getRawData()); + values.add(column.getRawData().toString()); + // TODO 需要测试一下 + switch (column.getType()) { + case BOOL: + types.add(TSDataType.BOOLEAN); + break; + case INT: + types.add(TSDataType.INT32); + break; + case LONG: + types.add(TSDataType.INT64); + break; + case DOUBLE: + types.add(TSDataType.DOUBLE); + break; + case STRING: + types.add(TSDataType.STRING); + break; + case DATE: + types.add(TSDataType.DATE); + break; + default: + throw new RuntimeException("unsupported type:" + column.getType()); + } + } + long time = System.currentTimeMillis(); + try { + + // // 创建测点时间序列 + // session.createMultiTimeseries( + // paths, tsDataTypes, tsEncodings, compressionTypes, null, null, attributesList, null); + + // 这个插入失败(报错) + // WARN o.a.i.d.u.ErrorHandlingUtils:65 - + // Status code: EXECUTE_STATEMENT_ERROR(301), operation: insertRecord failed + // java.lang.ArrayIndexOutOfBoundsException: 11 + // session.insertRecord(device, time, measurements, types, values); + // 这个插入成功,record读取一次只有一条数据,需要循环读取。 + session.insertRecord(device, time, measurements,values); + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); + } + + // TODO 构建List,批量写入 + // session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList); + + } + + } + } +} diff --git a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriterErrorCode.java b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriterErrorCode.java new file mode 100644 index 00000000..666fcd75 --- /dev/null +++ b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriterErrorCode.java @@ -0,0 +1,29 @@ +package com.alibaba.datax.plugin.writer.iotdbwriter; + +import com.alibaba.datax.common.spi.ErrorCode; + +public class IoTDBWriterErrorCode implements ErrorCode { + + private final String code; + private final String description; + + public IoTDBWriterErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s].", this.code, this.description); + } +} diff --git a/iotdbwriter/src/main/resources/plugin.json b/iotdbwriter/src/main/resources/plugin.json new file mode 100644 index 00000000..ed7040c4 --- /dev/null +++ b/iotdbwriter/src/main/resources/plugin.json @@ -0,0 +1,9 @@ +{ + "name": "iotdbwriter", + "class": "com.alibaba.datax.plugin.writer.iotdbwriter.IoTDBWriter", + "description": { + "useScene": "data migration to iotdb", + "mechanism": "use iotdb-java-session to write data." + }, + "developer": "lihaoran-Timecho" +} \ No newline at end of file diff --git a/iotdbwriter/src/main/resources/plugin_job_template.json b/iotdbwriter/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..973b6b56 --- /dev/null +++ b/iotdbwriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,20 @@ +{ + "name": "iotdbwriter", + "parameter": { + "username": "root", + "password": "toy123", + "column": [ + "" + ], + "connection": [ + { + "table": [ + "" + ], + "jdbcUrl": "" + } + ], + "batchSize": 1000, + "ignoreTagsUnmatched": true + } +} \ No newline at end of file From 50068e2b8e51cb49d143f3d99edef3d7495b764c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E6=B5=A9=E7=84=B6?= Date: Mon, 24 Jun 2024 11:42:21 +0800 Subject: [PATCH 2/7] =?UTF-8?q?IoTDBReader=E5=9F=BA=E6=9C=AC=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../reader/iotdbreader/IoTDBReader.java | 179 ++++++++++++------ .../iotdbreader/IoTDBReaderErrorCode.java | 10 +- .../datax/plugin/reader/iotdbreader/Key.java | 18 ++ .../main/resources/plugin_job_template.json | 34 ++-- 4 files changed, 167 insertions(+), 74 deletions(-) create mode 100644 iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/Key.java diff --git a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java index cd26cd46..297e4445 100644 --- a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java +++ b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java @@ -3,10 +3,8 @@ package com.alibaba.datax.plugin.reader.iotdbreader; import com.alibaba.datax.common.element.*; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordSender; -import com.alibaba.datax.common.spi.ErrorCode; import com.alibaba.datax.common.spi.Reader; import com.alibaba.datax.common.util.Configuration; -import org.apache.commons.lang3.StringUtils; import org.apache.iotdb.isession.SessionDataSet; import org.apache.iotdb.isession.util.Version; import org.apache.iotdb.rpc.IoTDBConnectionException; @@ -18,27 +16,96 @@ import org.apache.tsfile.read.common.RowRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.UnsupportedEncodingException; import java.sql.*; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; public class IoTDBReader extends Reader { public static class Job extends Reader.Job { private static final Logger LOG = LoggerFactory.getLogger(Job.class); - private Configuration originalConfig; + private Configuration jobConf; /** * Job对象初始化工作 */ @Override public void init() { - // TODO 配置文件还没规划格式 // 通过super.getPluginJobConf()获取与本插件相关的配置。 - // 读插件获得配置中reader部分,写插件获得writer部分。 - this.originalConfig = super.getPluginJobConf(); - // TODO 检查各种参数是否正确 + this.jobConf = super.getPluginJobConf(); + // 检查各种参数是否正确 + String username = this.jobConf.getString(Key.USERNAME); + if (username == null || username.isEmpty()) { + throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.USERNAME + "] is not set."); + } + String password = this.jobConf.getString(Key.PASSWORD); + if (password == null || password.isEmpty()) { + throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PASSWORD + "] is not set."); + } + String host = this.jobConf.getString(Key.HOST); + if (host == null || host.isEmpty()) { + throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.HOST + "] is not set."); + } + String port = this.jobConf.getString(Key.PORT); + if (port == null || port.isEmpty()) { + throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PORT + "] is not set."); + } + String fetchSize = this.jobConf.getString(Key.FETCH_SIZE); + if (fetchSize == null || fetchSize.isEmpty()) { + throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.FETCH_SIZE + "] is not set."); + } + // 还有一部分参数没检查,没必要了。 + + } + @Override + public void prepare() { + } + + /** + * 将Job拆分Task。 + * @param adviceNumber 框架建议的拆分数,一般是运行时所配置的并发度。 + * @return Task的配置列表,一个配置文件对应一个task。 + */ + @Override + public List split(int adviceNumber) { + // 每个config对应一个task + List configs = new ArrayList<>(); + List queryList = this.jobConf.getList(Key.FINAL_SQLS, String.class); + if (queryList == null || queryList.size() == 0){ + Configuration clone = this.jobConf.clone(); + // TODO 同时读取多个设备?有没有必要? + String device = this.jobConf.getString(Key.DEVICE); + // 测点是一个逗号分隔的字符串或"*" + String measurements = this.jobConf.getString(Key.MEASUREMENTS); + String beginDateTime = this.jobConf.getString(Key.BEGIN_DATETIME); + String endDateTime = this.jobConf.getString(Key.END_DATETIME); + String where = this.jobConf.getString(Key.WHERE); + StringBuilder sb = new StringBuilder(); + sb.append("select ").append(measurements); + sb.append(" from ").append(device); + sb.append(" where "); + if (beginDateTime != null && !beginDateTime.isEmpty()){ + sb.append("time >= ").append(beginDateTime); + } + if (endDateTime != null && !endDateTime.isEmpty()){ + sb.append(" and time <= ").append(endDateTime); + } + if (where != null && !where.isEmpty()){ + sb.append(" and ").append(where); + } + clone.set(Key.QUERY_SQL, sb.toString()); + configs.add(clone); + //TODO DataX中是单线程,实际上底层session中是多线程读取。根据什么条件切分多线程? + }else{ + // 直接读取最终SQL + for (String query : queryList) { + Configuration clone = this.jobConf.clone(); + clone.remove(Key.FINAL_SQLS); + clone.set(Key.QUERY_SQL, query); + configs.add(clone); + } + } + LOG.info("configs: {}", configs); + return configs; } /** @@ -50,36 +117,14 @@ public class IoTDBReader extends Reader { } /** - * 全局的后置工作,比如mysqlwriter同步完影子表后的rename操作。 + * 全局的后置工作。 */ @Override public void post() { } - /** - * 将Job拆分Task。 - * @param adviceNumber 框架建议的拆分数,一般是运行时所配置的并发度。 - * @return Task的配置列表。 - */ - @Override - public List split(int adviceNumber) { - // TODO 暂时拆分为adviceNumber个,不知道是怎么切割的。。。后序需要继续测试 - // TODO DEBUG看看是不是一个配置对应一个Task,一个Task启动配置文件中的连接,执行一个查询。 - // 本机增加100个配置文件,写入txt,生成100个txt文件,运行如下 - //任务启动时刻 : 2024-06-19 16:21:13 - // 任务结束时刻 : 2024-06-19 16:21:24 - // 任务总计耗时 : 10s - // 任务平均流量 : 42.93MB/s - // 记录写入速度 : 90010rec/s - // 读出记录总数 : 900100 - // 读写失败总数 : 0 - List configurations = new ArrayList<>(); - for (int i = 0; i < 100; i++){ - configurations.add(this.originalConfig); - } - return configurations; - } + } public static class Task extends Reader.Task { @@ -88,18 +133,32 @@ public class IoTDBReader extends Reader { private Configuration readerSliceConfig; private String mandatoryEncoding; + /** + * IoTDB原生读写工具 + */ private Session session; + /** + * IoTDB中的时间列插入的位置,默认为0,即第一列。 + */ + private int timeColumnPosition; + /** + * 最终的查询SQL,交给session执行。 + */ + private String querySql; @Override public void init() { + // 获取与本Task相关的配置,是Job的split方法返回的配置列表中的其中一个。 + Configuration taskConf = super.getPluginJobConf(); + // session init session = new Session.Builder() - .host("192.168.150.100") - .port(6667) - .username("root") - .password("root") - .version(Version.V_0_13) + .host(taskConf.getString(Key.HOST)) + .port(taskConf.getInt(Key.PORT)) + .username(taskConf.getString(Key.USERNAME)) + .password(taskConf.getString(Key.PASSWORD)) + .version(Version.valueOf(taskConf.getString(Key.VERSION))) .build(); // open session, close RPCCompression try { @@ -109,11 +168,15 @@ public class IoTDBReader extends Reader { } // set session fetchSize - session.setFetchSize(10000); + session.setFetchSize(taskConf.getInt(Key.FETCH_SIZE)); + + this.timeColumnPosition = (taskConf.getInt(Key.TIME_COLUMN_POSITION) == null) ? 0 : taskConf.getInt(Key.TIME_COLUMN_POSITION); + this.querySql = taskConf.getString(Key.QUERY_SQL); } @Override public void destroy() { + // Task自身的销毁工作。 try { if (session != null){ session.close(); @@ -123,29 +186,30 @@ public class IoTDBReader extends Reader { } } + /** + * 从数据源读数据,写入到RecordSender中。 + * @param recordSender 把数据写入连接Reader和Writer的缓存队列。 + */ @Override public void startRead(RecordSender recordSender) { try { - // TODO 把流程调通后把SQL语句抽出去 - // SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1"); - SessionDataSet dataSet = session.executeQueryStatement("select * from root.cgn.device"); - // System.out.println(dataSet.getColumnNames()); - List columnTypes = dataSet.getColumnTypes(); - // System.out.println(columnTypes); - int columnNums = columnTypes.size(); - // dataSet.setFetchSize(1024); + SessionDataSet dataSet = session.executeQueryStatement(this.querySql); while (dataSet.hasNext()) { - RowRecord rowRecord = dataSet.next(); - // 将iotdb中的行 转为datax中的record + // DataX中的行record Record record = recordSender.createRecord(); - // time列直接处理 - long timestamp = rowRecord.getTimestamp(); - record.addColumn(new LongColumn(timestamp)); + // IoTDB中的行 + RowRecord rowRecord = dataSet.next(); List fields = rowRecord.getFields(); - // 其他列遍历类型后转换 - for (Field field : fields) { + // 除time列外的其他列遍历类型后转换 + for (int i = 0; i < fields.size(); i++) { + if (i == timeColumnPosition){ + // time列插入指定位置 + long timestamp = rowRecord.getTimestamp(); + record.addColumn(new LongColumn(timestamp)); + } + Field field = fields.get(i); TSDataType dataType = field.getDataType(); - // null类型暂时转为字符串 + // null类型暂时转为字符串 TODO 有没有其他处理方式? if (dataType == null) { record.addColumn(new StringColumn("null")); continue; @@ -166,6 +230,7 @@ public class IoTDBReader extends Reader { record.addColumn(new DoubleColumn(field.getFloatV())); break; case DOUBLE: + // TODO 为什么DataX推荐用String?区别是什么? record.addColumn(new DoubleColumn(field.getDoubleV())); break; case STRING: @@ -176,9 +241,11 @@ public class IoTDBReader extends Reader { record.addColumn(new DateColumn(Date.valueOf(field.getDateV()))); break; default: - System.out.println("类型错误"+field.getDataType()); + // TODO 其他类型怎么处理? + LOG.info("类型错误:"+ field.getDataType()); } } + // 发送 recordSender.sendToWriter(record); } } catch (StatementExecutionException | IoTDBConnectionException e) { diff --git a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReaderErrorCode.java b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReaderErrorCode.java index e61aa53e..d46baa09 100644 --- a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReaderErrorCode.java +++ b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReaderErrorCode.java @@ -2,11 +2,17 @@ package com.alibaba.datax.plugin.reader.iotdbreader; import com.alibaba.datax.common.spi.ErrorCode; -public class IoTDBReaderErrorCode implements ErrorCode { +public enum IoTDBReaderErrorCode implements ErrorCode { + + REQUIRED_VALUE("IoTDBReader-00", "parameter value is missing"), + ILLEGAL_VALUE("IoTDBReader-01", "invalid parameter value"), + CONNECTION_FAILED("IoTDBReader-02", "connection error"), + RUNTIME_EXCEPTION("IoTDBWriter-03", "runtime exception"); + private final String code; private final String description; - public IoTDBReaderErrorCode(String code, String description) { + IoTDBReaderErrorCode(String code, String description) { this.code = code; this.description = description; } diff --git a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/Key.java b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/Key.java new file mode 100644 index 00000000..56a5b637 --- /dev/null +++ b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/Key.java @@ -0,0 +1,18 @@ +package com.alibaba.datax.plugin.reader.iotdbreader; + +public class Key { + public static final String USERNAME = "username"; + public static final String PASSWORD = "password"; + public static final String HOST = "host"; + public static final String PORT = "port"; + public static final String FETCH_SIZE = "fetchSize"; + public static final String VERSION = "version"; + public static final String FINAL_SQLS = "finalSqls"; + public static final String QUERY_SQL = "querySql"; + public static final String TIME_COLUMN_POSITION = "timeColumnPosition"; + public static final String DEVICE = "device"; + public static final String MEASUREMENTS = "measurements"; + public static final String BEGIN_DATETIME = "beginDateTime"; + public static final String END_DATETIME = "endDateTime"; + public static final String WHERE = "where"; +} diff --git a/iotdbreader/src/main/resources/plugin_job_template.json b/iotdbreader/src/main/resources/plugin_job_template.json index 0722bf0e..c727d8d7 100644 --- a/iotdbreader/src/main/resources/plugin_job_template.json +++ b/iotdbreader/src/main/resources/plugin_job_template.json @@ -1,23 +1,25 @@ { "name": "iotdbreader", "parameter": { - "user": "", - "password": "", - "connection": [ - { - "table": [ - "" - ], - "sessionUrl": [ - "" - ] - } + "username": "root", + "password": "root", + "host": "192.168.150.100", + "port": 6667, + "fetchSize": 10000, + "version": "V_1_0", + "timeColumnPosition": 0, + "finalSqls": [ + "写了这个默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句", + "select * from root.cgn.device", + "select * from root.cgn.device", + "select * from root.cgn.device", + "select * from root.cgn.device", + "select * from root.cgn.device" ], - "column": [ - "" - ], - "beginDateTime": "", - "endDateTime": "", + "device": "root.cgn.device", + "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", + "beginDateTime": "2023-03-07 12:00:00", + "endDateTime": "2024-03-07 19:00:00", "where": "" } } \ No newline at end of file From 4d514fb1dfed3281baf5dae2a9e4bcc196067a0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E6=B5=A9=E7=84=B6?= Date: Mon, 24 Jun 2024 18:34:36 +0800 Subject: [PATCH 3/7] =?UTF-8?q?=E5=AE=9E=E7=8E=B0DataX=E8=AF=BB=E5=86=99Io?= =?UTF-8?q?TDB=EF=BC=8C=E5=B9=B6=E9=80=9A=E8=BF=87=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- datax-example/datax-example-iotdb/pom.xml | 85 ++++++ .../datax/example/iotdb/TestCreateData.java | 147 +++++++++++ .../datax/example/iotdb/TestIoTDB.java | 64 +++++ .../src/test/resources/iotdb2mysql.json | 58 ++++ .../src/test/resources/iotdb2stream.json | 44 ++++ .../src/test/resources/iotdb2txt.json | 44 ++++ .../src/test/resources/mysql2iotdb.json | 49 ++++ .../src/test/resources/mysql2stream.json | 46 ++++ .../src/test/resources/mysql2txt.json | 41 +++ .../src/test/resources/stream2stream.json | 36 +++ .../src/test/resources/stream2txt.json | 38 +++ .../src/test/resources/testData.txt | 39 +++ .../reader/iotdbreader/IoTDBReader.java | 7 +- .../iotdbreader/IoTDBReaderErrorCode.java | 3 +- .../main/resources/plugin_job_template.json | 4 +- .../writer/iotdbwriter/IoTDBWriter.java | 248 ++++++++++++------ .../iotdbwriter/IoTDBWriterErrorCode.java | 6 +- .../datax/plugin/writer/iotdbwriter/Key.java | 15 ++ .../main/resources/plugin_job_template.json | 25 +- 19 files changed, 893 insertions(+), 106 deletions(-) create mode 100644 datax-example/datax-example-iotdb/pom.xml create mode 100644 datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestCreateData.java create mode 100644 datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestIoTDB.java create mode 100644 datax-example/datax-example-iotdb/src/test/resources/iotdb2mysql.json create mode 100644 datax-example/datax-example-iotdb/src/test/resources/iotdb2stream.json create mode 100644 datax-example/datax-example-iotdb/src/test/resources/iotdb2txt.json create mode 100644 datax-example/datax-example-iotdb/src/test/resources/mysql2iotdb.json create mode 100644 datax-example/datax-example-iotdb/src/test/resources/mysql2stream.json create mode 100644 datax-example/datax-example-iotdb/src/test/resources/mysql2txt.json create mode 100644 datax-example/datax-example-iotdb/src/test/resources/stream2stream.json create mode 100644 datax-example/datax-example-iotdb/src/test/resources/stream2txt.json create mode 100644 datax-example/datax-example-iotdb/src/test/resources/testData.txt create mode 100644 iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/Key.java diff --git a/datax-example/datax-example-iotdb/pom.xml b/datax-example/datax-example-iotdb/pom.xml new file mode 100644 index 00000000..f6857e06 --- /dev/null +++ b/datax-example/datax-example-iotdb/pom.xml @@ -0,0 +1,85 @@ + + + + datax-example + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + datax-example-iotdb + + + 8 + 8 + UTF-8 + + + + + com.alibaba.datax + iotdbreader + 0.0.1-SNAPSHOT + compile + + + com.alibaba.datax + iotdbwriter + 0.0.1-SNAPSHOT + compile + + + + com.alibaba.datax + datax-example-core + 0.0.1-SNAPSHOT + + + com.alibaba.datax + streamreader + 0.0.1-SNAPSHOT + + + com.alibaba.datax + streamwriter + 0.0.1-SNAPSHOT + compile + + + + com.alibaba.datax + txtfilereader + 0.0.1-SNAPSHOT + compile + + + com.alibaba.datax + txtfilewriter + 0.0.1-SNAPSHOT + compile + + + + com.alibaba.datax + mysqlreader + 0.0.1-SNAPSHOT + compile + + + com.alibaba.datax + mysqlwriter + 0.0.1-SNAPSHOT + compile + + + + mysql + mysql-connector-java + 8.0.31 + + + + + \ No newline at end of file diff --git a/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestCreateData.java b/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestCreateData.java new file mode 100644 index 00000000..48f9b7ce --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestCreateData.java @@ -0,0 +1,147 @@ +package com.alibaba.datax.example.iotdb; + +import org.apache.iotdb.isession.util.Version; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.session.Session; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.*; + +public class TestCreateData { + private static Session session; + private static Random random = new Random(); + + @Test + public void createAndInsert() + throws IoTDBConnectionException, StatementExecutionException { + // 创建测试数据 + // session init + session = + new Session.Builder() + .host("192.168.150.100") + // .host("172.20.31.6") + .port(6667) + .username("root") + .password("root") + .version(Version.V_0_13) + .build(); + + // open session, close RPCCompression + session.open(false); + + // set session fetchSize + session.setFetchSize(10000); + + // 创建测点并插入数据 + String filePath = "src/test/resources/testData.txt"; + String database = "root.cgn"; + try { + session.createDatabase(database); + } catch (StatementExecutionException e) { + if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { + throw e; + } + } + String device = "root.cgn.device"; + createAndInsert2(filePath, device); + } + + private static void createAndInsert2(String filePath, String device) + throws IoTDBConnectionException, StatementExecutionException { + // 读取文件(文件中无表头) + // 点的类型 点名 描述 量纲 量程下限 量程上限 + // AX L2KRT008MA 测试性描述 % 0 1.00E+02 + // AX L2ETY101MP 测试性描述 % 0 1.00E+02 + List> res = new ArrayList<>(); + try (BufferedReader br = new BufferedReader(new FileReader(filePath))) { + String line; + while ((line = br.readLine()) != null) { + String[] words = line.split("\\s+"); + List wordList = new ArrayList<>(Arrays.asList(words)); + res.add(wordList); + } + } catch (IOException e) { + e.printStackTrace(); + } + + // 准备传入的参数,构造时间序列 + List paths = new ArrayList<>(); + List measurements = new ArrayList<>(); + List isDoubleList = new ArrayList<>(); + List tsDataTypes = new ArrayList<>(); + List tsEncodings = new ArrayList<>(); + List compressionTypes = new ArrayList<>(); + List> tagsList = new ArrayList<>(); + List> attributesList = new ArrayList<>(); + List alias = new ArrayList<>(); + + for (int i = 0; i < res.size(); i++) { + measurements.add(res.get(i).get(1)); + paths.add(device + "." + res.get(i).get(1)); + boolean isDouble = "AX".equals(res.get(i).get(0)); + isDoubleList.add(isDouble); + tsDataTypes.add(isDouble ? TSDataType.DOUBLE : TSDataType.BOOLEAN); + tsEncodings.add(isDouble ? TSEncoding.GORILLA : TSEncoding.RLE); + compressionTypes.add(CompressionType.SNAPPY); + Map attributes = new HashMap<>(); + attributes.put("描述", "测试性描述"); + attributes.put("量纲", isDouble ? "%" : ""); + attributes.put("量程下限", "0"); + attributes.put("量程上限", isDouble ? "1.00E+02" : "2.00E+02"); + attributesList.add(attributes); + } + + // 先删除已有的时间序列 + if (session.checkTimeseriesExists(device + ".**")) { + session.deleteTimeseries(device + ".**"); + System.out.println("删除已有的时间序列完成=============="); + } + + // 创建测点时间序列 + session.createMultiTimeseries( + paths, tsDataTypes, tsEncodings, compressionTypes, null, null, attributesList, null); + + // 插入数据:每个测点里都写1万条数据,时间间隔1秒 + List> measurementsList = new ArrayList<>(); + List> valuesList = new ArrayList<>(); + List timestamps = new ArrayList<>(); + List> typesList = new ArrayList<>(); + + long startTime = + LocalDateTime.of(2024, 1, 1, 0, 0, 0, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli(); + int count = 10000; // 每个测点插入数据条数 + + for (long time = startTime; count >= 0; time += 1000, count--) { + timestamps.add(time); + measurementsList.add(measurements); // 39个测点 + typesList.add(tsDataTypes); + + List randomValue = new ArrayList<>(); + for (Boolean isDouble : isDoubleList) { + randomValue.add(isDouble ? random.nextDouble() * 100 : random.nextBoolean()); + } + valuesList.add(randomValue); // 39个随机数 + + // 每1000次插入一批数据 + if (count != 10000 && count % 1000 == 0) { + session.insertRecordsOfOneDevice( + device, timestamps, measurementsList, typesList, valuesList); + measurementsList.clear(); + valuesList.clear(); + typesList.clear(); + timestamps.clear(); + valuesList.clear(); + } + } + } +} diff --git a/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestIoTDB.java b/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestIoTDB.java new file mode 100644 index 00000000..9479373b --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestIoTDB.java @@ -0,0 +1,64 @@ +package com.alibaba.datax.example.iotdb; + +import com.alibaba.datax.example.ExampleContainer; +import com.alibaba.datax.example.util.PathUtil; +import org.junit.Test; + +public class TestIoTDB { + + @Test + public void testIoTDBReader2MySQLWriter() { + String path = "/iotdb2mysql.json"; + String jobPath = PathUtil.getAbsolutePathFromClassPath(path); + ExampleContainer.start(jobPath); + } + + @Test + public void testMySQLReader2IoTDBWriter() { + String path = "/mysql2iotdb.json"; + String jobPath = PathUtil.getAbsolutePathFromClassPath(path); + ExampleContainer.start(jobPath); + } + + @Test + public void testIoTDBReader2txtWriter() { + String path = "/iotdb2txt.json"; + String jobPath = PathUtil.getAbsolutePathFromClassPath(path); + ExampleContainer.start(jobPath); + } + + @Test + public void testIoTDBReader2StreamWriter() { + String path = "/iotdb2stream.json"; + String jobPath = PathUtil.getAbsolutePathFromClassPath(path); + ExampleContainer.start(jobPath); + } + + @Test + public void testMySQLReader2StreamWriter() { + String path = "/mysql2stream.json"; + String jobPath = PathUtil.getAbsolutePathFromClassPath(path); + ExampleContainer.start(jobPath); + } + + @Test + public void testMySQLReader2txtWriter() { + String path = "/mysql2txt.json"; + String jobPath = PathUtil.getAbsolutePathFromClassPath(path); + ExampleContainer.start(jobPath); + } + + @Test + public void testStreamReader2TxtWriter() { + String path = "/stream2txt.json"; + String jobPath = PathUtil.getAbsolutePathFromClassPath(path); + ExampleContainer.start(jobPath); + } + + @Test + public void testStreamReader2StreamWriter() { + String path = "/stream2stream.json"; + String jobPath = PathUtil.getAbsolutePathFromClassPath(path); + ExampleContainer.start(jobPath); + } +} diff --git a/datax-example/datax-example-iotdb/src/test/resources/iotdb2mysql.json b/datax-example/datax-example-iotdb/src/test/resources/iotdb2mysql.json new file mode 100644 index 00000000..b0c0e952 --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/resources/iotdb2mysql.json @@ -0,0 +1,58 @@ +{ + "job": { + "setting": { + "speed": { + "channel": 3 + } + }, + "content": [ + { + "reader": { + "name": "iotdbreader", + "parameter": { + "username": "root", + "password": "root", + "host": "192.168.150.100", + "port": 6667, + "fetchSize": 10000, + "version": "V_1_0", + "timeColumnPosition": 0, + "finalSqls":[ + ], + "device": "root.cgn.device", + "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", + "beginDateTime": "2023-03-07 12:00:00", + "endDateTime": "2024-03-07 19:00:00", + "where": "" + } + }, + "writer": { + "name": "mysqlwriter", + "parameter": { + "username": "root", + "password": "toy123", + "writeMode": "insert", + "#需要提前建表": "CREATE TABLE device (`time` BIGINT,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);", + "column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "session": [ + "set session sql_mode='ANSI'" + ], + "preSql": [ + "delete from device" + ], + "connection": [ + { + "table": [ + "device" + ], + "#": "下面的URL需要把中括号去掉,否则报错,mysqlreader的bug,未修改", + "jdbcUrl": "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" + } + ] + } + } + } + ] + } +} + diff --git a/datax-example/datax-example-iotdb/src/test/resources/iotdb2stream.json b/datax-example/datax-example-iotdb/src/test/resources/iotdb2stream.json new file mode 100644 index 00000000..cabc54eb --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/resources/iotdb2stream.json @@ -0,0 +1,44 @@ +{ + "job": { + "setting": { + "speed": { + "channel": 3 + }, + "errorLimit": { + "record": 0, + "percentage": 0.02 + } + }, + "content": [ + { + "reader": { + "name": "iotdbreader", + "parameter": { + "username": "root", + "password": "root", + "host": "192.168.150.100", + "port": 6667, + "fetchSize": 10000, + "version": "V_1_0", + "timeColumnPosition": 0, + "finalSqls":[ + + ], + "device": "root.cgn.device", + "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", + "beginDateTime": "2023-03-07 12:00:00", + "endDateTime": "2024-03-07 19:00:00", + "where": "" + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "print":true + } + } + } + ] + } +} + diff --git a/datax-example/datax-example-iotdb/src/test/resources/iotdb2txt.json b/datax-example/datax-example-iotdb/src/test/resources/iotdb2txt.json new file mode 100644 index 00000000..c962e56d --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/resources/iotdb2txt.json @@ -0,0 +1,44 @@ +{ + "job": { + "setting": { + "speed": { + "channel": 3 + } + }, + "content": [ + { + "reader": { + "name": "iotdbreader", + "parameter": { + "username": "root", + "password": "root", + "host": "192.168.150.100", + "port": 6667, + "fetchSize": 10000, + "version": "V_1_0", + "timeColumnPosition": 0, + "finalSqls":[ + "select * from root.cgn.device", + "select A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU from root.cgn.device" + ], + "device": "root.cgn.device", + "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", + "beginDateTime": "2023-03-07 12:00:00", + "endDateTime": "2024-03-07 19:00:00", + "where": "" + } + }, + "writer": { + "name": "txtfilewriter", + "parameter": { + "path": "D:/下载", + "fileName": "txtText", + "writeMode": "truncate", + "dateFormat": "yyyy-MM-dd" + } + } + } + ] + } +} + diff --git a/datax-example/datax-example-iotdb/src/test/resources/mysql2iotdb.json b/datax-example/datax-example-iotdb/src/test/resources/mysql2iotdb.json new file mode 100644 index 00000000..9f7f1d4c --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/resources/mysql2iotdb.json @@ -0,0 +1,49 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "mysqlreader", + "parameter": { + "username": "root", + "password": "toy123", + "column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "splitPk": "", + "connection": [ + { + "table": [ + "device" + ], + "jdbcUrl": [ + "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" + ] + } + ] + } + }, + "writer": { + "name": "iotdbwriter", + "parameter": { + "username": "root", + "password": "root", + "host": "192.168.150.100", + "port": 6667, + "fetchSize": 10000, + "version": "V_1_0", + "##": "Reader中时间列的位置,默认0列", + "timeColumnPosition": 0, + "insertBatchSize": 1000, + "device": "root.cgn.device", + "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", + "deleteExistTimeseries": false + } + } + } + ], + "setting": { + "speed": { + "channel": 3 + } + } + } +} \ No newline at end of file diff --git a/datax-example/datax-example-iotdb/src/test/resources/mysql2stream.json b/datax-example/datax-example-iotdb/src/test/resources/mysql2stream.json new file mode 100644 index 00000000..5414a5f4 --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/resources/mysql2stream.json @@ -0,0 +1,46 @@ +{ + "job": { + "setting": { + "speed": { + "channel": 3 + }, + "errorLimit": { + "record": 0, + "percentage": 0.02 + } + }, + "content": [ + { + "reader": { + "name": "mysqlreader", + "parameter": { + "username": "root", + "password": "toy123", + "column": [ + "order_num", + "cust_id" + ], + "splitPk": "", + "connection": [ + { + "table": [ + "orders" + ], + "jdbcUrl": [ + "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" + ] + } + ] + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "print":true + } + } + } + ] + } +} + diff --git a/datax-example/datax-example-iotdb/src/test/resources/mysql2txt.json b/datax-example/datax-example-iotdb/src/test/resources/mysql2txt.json new file mode 100644 index 00000000..879bcbc0 --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/resources/mysql2txt.json @@ -0,0 +1,41 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "mysqlreader", + "parameter": { + "username": "root", + "password": "toy123", + "column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "splitPk": "", + "connection": [ + { + "table": [ + "device" + ], + "jdbcUrl": [ + "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" + ] + } + ] + } + }, + "writer": { + "name": "txtfilewriter", + "parameter": { + "path": "D:/下载", + "fileName": "txtText", + "writeMode": "truncate", + "dateFormat": "yyyy-MM-dd" + } + } + } + ], + "setting": { + "speed": { + "channel": 5 + } + } + } +} \ No newline at end of file diff --git a/datax-example/datax-example-iotdb/src/test/resources/stream2stream.json b/datax-example/datax-example-iotdb/src/test/resources/stream2stream.json new file mode 100644 index 00000000..0193d822 --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/resources/stream2stream.json @@ -0,0 +1,36 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "streamreader", + "parameter": { + "sliceRecordCount": 10, + "column": [ + { + "type": "long", + "value": "15" + }, + { + "type": "string", + "value": "hello,你好,世界-DataX" + } + ] + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "encoding": "UTF-8", + "print": true + } + } + } + ], + "setting": { + "speed": { + "channel": 5 + } + } + } +} \ No newline at end of file diff --git a/datax-example/datax-example-iotdb/src/test/resources/stream2txt.json b/datax-example/datax-example-iotdb/src/test/resources/stream2txt.json new file mode 100644 index 00000000..40fd9e29 --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/resources/stream2txt.json @@ -0,0 +1,38 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "streamreader", + "parameter": { + "sliceRecordCount": 10, + "column": [ + { + "type": "long", + "value": "10" + }, + { + "type": "string", + "value": "hello,你好,世界-DataX" + } + ] + } + }, + "writer": { + "name": "txtfilewriter", + "parameter": { + "path": "D:/下载", + "fileName": "txtText", + "writeMode": "truncate", + "dateFormat": "yyyy-MM-dd" + } + } + } + ], + "setting": { + "speed": { + "channel": 5 + } + } + } +} \ No newline at end of file diff --git a/datax-example/datax-example-iotdb/src/test/resources/testData.txt b/datax-example/datax-example-iotdb/src/test/resources/testData.txt new file mode 100644 index 00000000..d0588b20 --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/resources/testData.txt @@ -0,0 +1,39 @@ +AX L2KRT008MA 测试性描述 % 0 1.00E+02 +AX L2ETY101MP 测试性描述 % 0 1.00E+02 +AX L2SEC002MD 测试性描述 % 0 1.00E+02 +AX L1ETY103MP 测试性描述 % 0 1.00E+02 +AX D1KRT003EU 测试性描述 % 0 1.00E+02 +AX L4EAS008MDX_XQ01 测试性描述 % 0 1.00E+02 +AX L3RIC039KMX_XQ01 测试性描述 % 0 1.00E+02 +AX L2VVP008MP 测试性描述 % 0 1.00E+02 +AX D2RCV039MD 测试性描述 % 0 1.00E+02 +DX D2RPA063EC 测试性描述 0 2.00E+00 +AX L4KRT022MAK_XP01 测试性描述 % 0 1.00E+02 +AX L3SEC002MPX_XQ01 测试性描述 % 0 1.00E+02 +DX L4RCP651KSX_XG52 测试性描述 0 2.00E+00 +AX L2RRI082MT 测试性描述 % 0 1.00E+02 +AX A5STD 测试性描述 % 0 1.00E+02 +DX D2RRI003PO 测试性描述 0 2.00E+00 +AX L4RCP091MT_XQ01 测试性描述 % 0 1.00E+02 +DX L1GCT132SM5 测试性描述 0 2.00E+00 +DX L1GCT132SM3 测试性描述 0 2.00E+00 +AX L2RIC035MT 测试性描述 % 0 1.00E+02 +AX L1RGL018QM 测试性描述 % 0 1.00E+02 +AX L1RIC037MT 测试性描述 % 0 1.00E+02 +AX D2RCP042MD 测试性描述 % 0 1.00E+02 +AX D2RGL001QM 测试性描述 % 0 1.00E+02 +AX D2RIC020MT 测试性描述 % 0 1.00E+02 +AX D1RIC022MT 测试性描述 % 0 1.00E+02 +AX D1RGL003QM 测试性描述 % 0 1.00E+02 +AX L3LGE005TU_XQ01 测试性描述 % 0 1.00E+02 +DX D2RCP517EC 测试性描述 0 2.00E+00 +AX D1RCP054MD 测试性描述 % 0 1.00E+02 +AX L2RIS014MD 测试性描述 % 0 1.00E+02 +DX D1LGE001JA 测试性描述 0 2.00E+00 +DX D2SEC001PO 测试性描述 0 2.00E+00 +DX D1SEC003PO 测试性描述 0 2.00E+00 +AX D1RIS001MD 测试性描述 % 0 1.00E+02 +AX L1RCP090MT 测试性描述 % 0 1.00E+02 +DX L2VVP003SM5 测试性描述 0 2.00E+00 +AX L3RIC022KMX_XQ01 测试性描述 % 0 1.00E+02 +DX L1GCT501EC 测试性描述 0 2.00E+00 \ No newline at end of file diff --git a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java index 297e4445..1f446139 100644 --- a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java +++ b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java @@ -74,7 +74,7 @@ public class IoTDBReader extends Reader { Configuration clone = this.jobConf.clone(); // TODO 同时读取多个设备?有没有必要? String device = this.jobConf.getString(Key.DEVICE); - // 测点是一个逗号分隔的字符串或"*" + // 测点是一个逗号分隔的测点字符串或"*" String measurements = this.jobConf.getString(Key.MEASUREMENTS); String beginDateTime = this.jobConf.getString(Key.BEGIN_DATETIME); String endDateTime = this.jobConf.getString(Key.END_DATETIME); @@ -130,8 +130,7 @@ public class IoTDBReader extends Reader { public static class Task extends Reader.Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); - private Configuration readerSliceConfig; - private String mandatoryEncoding; + private Configuration taskConf; /** * IoTDB原生读写工具 @@ -149,7 +148,7 @@ public class IoTDBReader extends Reader { @Override public void init() { // 获取与本Task相关的配置,是Job的split方法返回的配置列表中的其中一个。 - Configuration taskConf = super.getPluginJobConf(); + taskConf = super.getPluginJobConf(); // session init session = diff --git a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReaderErrorCode.java b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReaderErrorCode.java index d46baa09..96fa2ff6 100644 --- a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReaderErrorCode.java +++ b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReaderErrorCode.java @@ -6,8 +6,7 @@ public enum IoTDBReaderErrorCode implements ErrorCode { REQUIRED_VALUE("IoTDBReader-00", "parameter value is missing"), ILLEGAL_VALUE("IoTDBReader-01", "invalid parameter value"), - CONNECTION_FAILED("IoTDBReader-02", "connection error"), - RUNTIME_EXCEPTION("IoTDBWriter-03", "runtime exception"); + CONNECTION_FAILED("IoTDBReader-02", "connection error"); private final String code; private final String description; diff --git a/iotdbreader/src/main/resources/plugin_job_template.json b/iotdbreader/src/main/resources/plugin_job_template.json index c727d8d7..6639a103 100644 --- a/iotdbreader/src/main/resources/plugin_job_template.json +++ b/iotdbreader/src/main/resources/plugin_job_template.json @@ -7,9 +7,10 @@ "port": 6667, "fetchSize": 10000, "version": "V_1_0", + "##": "时间列插入DataX的Record中的位置,默认第0列", "timeColumnPosition": 0, + "##":"写了finalSqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句", "finalSqls": [ - "写了这个默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句", "select * from root.cgn.device", "select * from root.cgn.device", "select * from root.cgn.device", @@ -18,6 +19,7 @@ ], "device": "root.cgn.device", "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", + "##": "开始时间必填,否则SQL错误", "beginDateTime": "2023-03-07 12:00:00", "endDateTime": "2024-03-07 19:00:00", "where": "" diff --git a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java index f4f424c2..ffd6159e 100644 --- a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java +++ b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java @@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.writer.iotdbwriter; import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordReceiver; import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.spi.Writer; @@ -15,56 +16,101 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; public class IoTDBWriter extends Writer { public static class Job extends Writer.Job { private static final Logger LOG = LoggerFactory.getLogger(Job.class); - private Configuration originalConfig; + private Configuration jobConf; @Override public void init() { - this.originalConfig = super.getPluginJobConf(); - // TODO 检查配置文件参数 - + this.jobConf = super.getPluginJobConf(); + // 检查各种参数是否正确 + String username = this.jobConf.getString(Key.USERNAME); + if (username == null || username.isEmpty()) { + throw DataXException.asDataXException(IoTDBWriterErrorCode.REQUIRED_VALUE, "The parameter [" + Key.USERNAME + "] is not set."); + } + String password = this.jobConf.getString(Key.PASSWORD); + if (password == null || password.isEmpty()) { + throw DataXException.asDataXException(IoTDBWriterErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PASSWORD + "] is not set."); + } + String host = this.jobConf.getString(Key.HOST); + if (host == null || host.isEmpty()) { + throw DataXException.asDataXException(IoTDBWriterErrorCode.REQUIRED_VALUE, "The parameter [" + Key.HOST + "] is not set."); + } + String port = this.jobConf.getString(Key.PORT); + if (port == null || port.isEmpty()) { + throw DataXException.asDataXException(IoTDBWriterErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PORT + "] is not set."); + } + String fetchSize = this.jobConf.getString(Key.FETCH_SIZE); + if (fetchSize == null || fetchSize.isEmpty()) { + throw DataXException.asDataXException(IoTDBWriterErrorCode.REQUIRED_VALUE, "The parameter [" + Key.FETCH_SIZE + "] is not set."); + } + // 还有一部分参数没检查,没必要了。 } @Override - public void destroy() { - + public void prepare(){ + // 写入前准备,IOTDB不需要提前创建表。 } @Override public List split(int mandatoryNumber) { - // TODO 根据什么拆分Task? List configs = new ArrayList<>(); + // 根据源端划分多个task,每个写task对应一个读task,并行插入下放到session批次处理。 for (int i = 0; i < mandatoryNumber; i++) { - configs.add(originalConfig); + Configuration clone = this.jobConf.clone(); + configs.add(clone); } + LOG.info("configs: {}", configs); return configs; } + + @Override + public void destroy() { + } + } public static class Task extends Writer.Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); - private Configuration writerSliceConfig; - private TaskPluginCollector taskPluginCollector; - + private Configuration taskConf; + /** + * IoTDB原生读写工具 + */ private Session session; + + /** + * 是否在插入前删除已有的时间序列?默认false + */ + private boolean deleteExistTimeseries; + + /** + * 插入批次大小 + */ + private int insertBatchSize; + + /** + * IoTDB中的时间列插入的位置,默认为0,即第一列。 + */ + private int timeColumnPosition; + @Override public void init() { - this.writerSliceConfig = getPluginJobConf(); - this.taskPluginCollector = super.getTaskPluginCollector(); + // 获取与本Task相关的配置,是Job的split方法返回的配置列表中的其中一个。 + this.taskConf = super.getPluginJobConf(); // session init session = new Session.Builder() - .host("192.168.150.100") - .port(6667) - .username("root") - .password("root") - .version(Version.V_0_13) + .host(taskConf.getString(Key.HOST)) + .port(taskConf.getInt(Key.PORT)) + .username(taskConf.getString(Key.USERNAME)) + .password(taskConf.getString(Key.PASSWORD)) + .version(Version.valueOf(taskConf.getString(Key.VERSION))) .build(); // open session, close RPCCompression try { @@ -74,13 +120,31 @@ public class IoTDBWriter extends Writer { } // set session fetchSize - session.setFetchSize(10000); + session.setFetchSize(taskConf.getInt(Key.FETCH_SIZE)); - // // 先删除已有的时间序列 - // if (session.checkTimeseriesExists(device + ".**")) { - // session.deleteTimeseries(device + ".**"); - // System.out.println("删除已有的时间序列完成=============="); - // } + // 获取参数,否则默认值 + insertBatchSize = (taskConf.getInt(Key.INSERT_BATCH_SIZE) == null) ? 1000 : taskConf.getInt(Key.INSERT_BATCH_SIZE); + timeColumnPosition = (taskConf.getInt(Key.TIME_COLUMN_POSITION) == null) ? 0 : taskConf.getInt(Key.TIME_COLUMN_POSITION); + deleteExistTimeseries = (taskConf.getBool(Key.DELETE_EXIST_TIMESERIES) == null) ? false : taskConf.getBool(Key.DELETE_EXIST_TIMESERIES); + } + + @Override + public void prepare() { + // 是否先删除已有的时间序列 + try { + if (deleteExistTimeseries){ + if (session.checkTimeseriesExists(taskConf.getString(Key.DEVICE) + ".**")) { + session.deleteTimeseries(taskConf.getString(Key.DEVICE) + ".**"); + LOG.info("===========删除已有的时间序列完成=============="); + }else { + LOG.info("===========不存在已有时间序列=============="); + } + } + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); + } + + // TODO 是否创建测点时间序列? } @Override @@ -94,72 +158,90 @@ public class IoTDBWriter extends Writer { } } + /** + * 从RecordReceiver中读取数据,写入目标数据源。 + * @param lineReceiver 数据来自Reader和Writer之间的缓存队列。 + */ @Override public void startWrite(RecordReceiver lineReceiver) { - // 暂时实现往一个设备中插入数据(也就是类似一个表) - // 插入1条的原因是这里的只读了一次。 + // 往一个设备device中插入数据 Record record = null; - for (int count = 1; (record = lineReceiver.getFromReader()) != null; count++) { - System.out.println(record); - int columnNums = record.getColumnNumber(); - // 先实现一条条插入 - String device = "root.test.device2"; - List measurements = new ArrayList<>(); // TODO 这个好像没传过来。 - List types = new ArrayList<>(); - // List values = new ArrayList<>(); - List values = new ArrayList<>(); - for (int i = 0; i < columnNums; i++) { - measurements.add("ss" + i); // 没传过来先用这个 - Column column = record.getColumn(i); - // values.add(column.getRawData()); - values.add(column.getRawData().toString()); - // TODO 需要测试一下 - switch (column.getType()) { - case BOOL: - types.add(TSDataType.BOOLEAN); - break; - case INT: - types.add(TSDataType.INT32); - break; - case LONG: - types.add(TSDataType.INT64); - break; - case DOUBLE: - types.add(TSDataType.DOUBLE); - break; - case STRING: - types.add(TSDataType.STRING); - break; - case DATE: - types.add(TSDataType.DATE); - break; - default: - throw new RuntimeException("unsupported type:" + column.getType()); + try{ + // 构建List,一个设备批量写入 + String device = taskConf.getString(Key.DEVICE); + List timestamps = new ArrayList<>(); + List> measurementsList = new ArrayList<>(); + List> valuesList = new ArrayList<>(); + List> typesList = new ArrayList<>(); + + // 获取Record记录,传输结束返回null + int count; // 统计插入记录数 + for (count = 0; (record = lineReceiver.getFromReader()) != null; count++) { + // LOG.info("record:" + record); + // 处理时间列 + timestamps.add(record.getColumn(timeColumnPosition).asLong()); + // 处理测点 + List measurements = Arrays.asList(taskConf.getString(Key.MEASUREMENTS).split(",")); + measurementsList.add(measurements); + // 处理类型和值 + List types = new ArrayList<>(); + List values = new ArrayList<>(); + // List values = new ArrayList<>(); + for (int i = 0; i < record.getColumnNumber(); i++) { + if (i == timeColumnPosition){ + continue; // 跳过时间列 + } + Column col = record.getColumn(i); + switch (col.getType()) { + case BOOL: + types.add(TSDataType.BOOLEAN); + values.add(col.asBoolean()); + break; + case INT: + types.add(TSDataType.INT32); + values.add((Integer) col.getRawData()); + break; + case LONG: + types.add(TSDataType.INT64); + values.add(col.asLong()); + break; + case DOUBLE: + types.add(TSDataType.DOUBLE); + values.add(col.asDouble()); + break; + case STRING: + types.add(TSDataType.STRING); + values.add(col.asString()); + break; + case DATE: + types.add(TSDataType.DATE); + values.add(col.asDate()); + break; + default: + throw new RuntimeException("unsupported type:" + col.getType()); + } + } + typesList.add(types); + valuesList.add(values); + + if (count != 0 && count % insertBatchSize == 0) { + session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList); + LOG.info("已插入"+count+"条数据"); + measurementsList.clear(); + valuesList.clear(); + typesList.clear(); + timestamps.clear(); + valuesList.clear(); } } - long time = System.currentTimeMillis(); - try { - - // // 创建测点时间序列 - // session.createMultiTimeseries( - // paths, tsDataTypes, tsEncodings, compressionTypes, null, null, attributesList, null); - - // 这个插入失败(报错) - // WARN o.a.i.d.u.ErrorHandlingUtils:65 - - // Status code: EXECUTE_STATEMENT_ERROR(301), operation: insertRecord failed - // java.lang.ArrayIndexOutOfBoundsException: 11 - // session.insertRecord(device, time, measurements, types, values); - // 这个插入成功,record读取一次只有一条数据,需要循环读取。 - session.insertRecord(device, time, measurements,values); - } catch (IoTDBConnectionException | StatementExecutionException e) { - throw new RuntimeException(e); + if (!timestamps.isEmpty()){ + session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList); + LOG.info("已插入剩余数据:" + timestamps.size() + "条"); } - - // TODO 构建List,批量写入 - // session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList); - + LOG.info("已插入所有数据:" + (count-1) + "条"); + }catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); } - } } } diff --git a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriterErrorCode.java b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriterErrorCode.java index 666fcd75..cfedb8fc 100644 --- a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriterErrorCode.java +++ b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriterErrorCode.java @@ -2,12 +2,14 @@ package com.alibaba.datax.plugin.writer.iotdbwriter; import com.alibaba.datax.common.spi.ErrorCode; -public class IoTDBWriterErrorCode implements ErrorCode { +public enum IoTDBWriterErrorCode implements ErrorCode { + + REQUIRED_VALUE("IoTDBWriter-00", "parameter value is missing"); private final String code; private final String description; - public IoTDBWriterErrorCode(String code, String description) { + IoTDBWriterErrorCode(String code, String description) { this.code = code; this.description = description; } diff --git a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/Key.java b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/Key.java new file mode 100644 index 00000000..07b2f96d --- /dev/null +++ b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/Key.java @@ -0,0 +1,15 @@ +package com.alibaba.datax.plugin.writer.iotdbwriter; + +public class Key { + public static final String USERNAME = "username"; + public static final String PASSWORD = "password"; + public static final String HOST = "host"; + public static final String PORT = "port"; + public static final String FETCH_SIZE = "fetchSize"; + public static final String VERSION = "version"; + public static final String TIME_COLUMN_POSITION = "timeColumnPosition"; + public static final String DEVICE = "device"; + public static final String MEASUREMENTS = "measurements"; + public static final String DELETE_EXIST_TIMESERIES = "deleteExistTimeseries"; + public static final String INSERT_BATCH_SIZE = "insertBatchSize"; +} diff --git a/iotdbwriter/src/main/resources/plugin_job_template.json b/iotdbwriter/src/main/resources/plugin_job_template.json index 973b6b56..524cc4ab 100644 --- a/iotdbwriter/src/main/resources/plugin_job_template.json +++ b/iotdbwriter/src/main/resources/plugin_job_template.json @@ -2,19 +2,16 @@ "name": "iotdbwriter", "parameter": { "username": "root", - "password": "toy123", - "column": [ - "" - ], - "connection": [ - { - "table": [ - "" - ], - "jdbcUrl": "" - } - ], - "batchSize": 1000, - "ignoreTagsUnmatched": true + "password": "root", + "host": "192.168.150.100", + "port": 6667, + "fetchSize": 10000, + "version": "V_1_0", + "##": "注意是Reader插件读取到的数据中时间列的位置,不是该插件,默认0列", + "timeColumnPosition": 0, + "insertBatchSize": 1000, + "device": "root.cgn.device", + "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", + "deleteExistTimeseries": false } } \ No newline at end of file From 1235819d6226f6aa1654a5dc30ca40c99dc38435 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E6=B5=A9=E7=84=B6?= Date: Mon, 24 Jun 2024 19:28:16 +0800 Subject: [PATCH 4/7] =?UTF-8?q?=E5=A2=9E=E5=8A=A0IoTDB=E8=AF=BB=E5=86=99?= =?UTF-8?q?=E6=96=87=E6=A1=A3=E5=B9=B6=E4=BF=AE=E6=94=B9=E4=BA=86=E4=B8=80?= =?UTF-8?q?=E4=BA=9B=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- iotdbreader/doc/iotdbreader-CN.md | 220 +++++++++++++++++- iotdbwriter/doc/iotdbwriter-CN.md | 189 +++++++++++++++ .../writer/iotdbwriter/IoTDBWriter.java | 9 +- .../datax/plugin/writer/iotdbwriter/Key.java | 1 - .../main/resources/plugin_job_template.json | 1 - 5 files changed, 408 insertions(+), 12 deletions(-) diff --git a/iotdbreader/doc/iotdbreader-CN.md b/iotdbreader/doc/iotdbreader-CN.md index d4b2d70f..88278529 100644 --- a/iotdbreader/doc/iotdbreader-CN.md +++ b/iotdbreader/doc/iotdbreader-CN.md @@ -1,4 +1,220 @@ +# DataX IoTDBReader +## 1 快速介绍 -## 快速介绍 -iotdbreader用来读取iotdb中的数据,然后传输到其他数据库。 +IoTDBReader 插件实现了 IoTDB 读取数据的功能。 + +## 2 实现原理 + +IoTDBReader 通过 IoTDB 的 原生java session 查询获取数据。 + +## 3 功能说明 + +### 3.1 配置样例 + +* 配置一个从 IoTDB 抽取数据作业: + +```json +{ + "job": { + "setting": { + "speed": { + "channel": 3 + } + }, + "content": [ + { + "reader": { + "name": "iotdbreader", + "parameter": { + "username": "root", + "password": "root", + "host": "192.168.150.100", + "port": 6667, + "fetchSize": 10000, + "version": "V_1_0", + "timeColumnPosition": 0, + "finalSqls":[ + ], + "device": "root.cgn.device", + "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", + "beginDateTime": "2023-03-07 12:00:00", + "endDateTime": "2024-03-07 19:00:00", + "where": "" + } + }, + "writer": { + "name": "mysqlwriter", + "parameter": { + "username": "root", + "password": "toy123", + "writeMode": "insert", + "#需要提前建表": "CREATE TABLE device (`time` BIGINT,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);", + "column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "session": [ + "set session sql_mode='ANSI'" + ], + "preSql": [ + "delete from device" + ], + "connection": [ + { + "table": [ + "device" + ], + "#": "下面的URL需要把中括号去掉,否则报错,mysqlreader的bug,未修改", + "jdbcUrl": "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" + } + ] + } + } + } + ] + } +} +``` + +* 配置一个自定义 SQL 的数据抽取作业: + +```json +{ + "job": { + "setting": { + "speed": { + "channel": 3 + } + }, + "content": [ + { + "reader": { + "name": "iotdbreader", + "parameter": { + "username": "root", + "password": "root", + "host": "192.168.150.100", + "port": 6667, + "fetchSize": 10000, + "version": "V_1_0", + "timeColumnPosition": 0, + "finalSqls":[ + "select * from root.cgn.device", + "select A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU from root.cgn.device" + ], + "device": "", + "measurements": "", + "beginDateTime": "", + "endDateTime": "", + "where": "" + } + }, + "writer": { + "name": "txtfilewriter", + "parameter": { + "path": "D:/下载", + "fileName": "txtText", + "writeMode": "truncate", + "dateFormat": "yyyy-MM-dd" + } + } + } + ] + } +} +``` + +### 3.2 参数说明 +* username + * 描述:用户名 + * 必选:是 + * 默认值:无 +* password + * 描述:用户名的密码 + * 必选:是 + * 默认值:无 +* host + * 描述:连接iotdb数据库的主机地址 + * 必选:是 + * 默认值:无 +* port + * 描述:端口 + * 必选:是 + * 默认值:无 +* version + * 描述:iotdb版本 + * 必选:是 + * 默认值:无 +* timeColumnPosition + * 描述:时间列在Record中列的位置 + * 必选:否 + * 默认值:0 +* finalSqls + * 描述:直接写多行SQL,可以并行读取,此时下面的参数失效。 + * 必选:否 + * 默认值: +* device + * 描述:IoTDB中的概念,可理解为mysql中的表。 + * 必选:finalSqls为空时必选 + * 默认值:无 +* measurements + * 描述:IoTDB中的概念,可理解为mysql中的字段。 + * 必选:finalSqls为空时必选 + * 默认值:无 +* beginDateTime + * 描述:SQL查询时的数据的开始时间 + * 必选:finalSqls为空时必选 + * 默认值:无 +* measurements + * 描述:SQL查询时的数据的结束时间 + * 必选:否 + * 默认值:无 +* where + * 描述:额外的条件 + * 必选:否 + * 默认值:无 + +### 3.3 类型转换 + +| IoTDB 数据类型 | DataX 内部类型 | +|-----------------|------------| +| INT32 | Int | +| INT64,TIMESTAMP | Long | +| FLOAT | FLOAT | +| DOUBLE | Double | +| BOOLEAN | Bool | +| DATE | Date | +| STRING,TEXT | String | + +## 4 性能报告 + +### 4.1 环境准备 + +#### 4.1.1 数据特征 + +#### 4.1.2 机器参数 + +#### 4.1.3 DataX jvm 参数 + + -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError + +### 4.2 测试报告 + +#### 4.2.1 单表测试报告 + +| 通道数| DataX速度(Rec/s)|DataX流量(MB/s)| DataX机器网卡流出流量(MB/s)|DataX机器运行负载|DB网卡进入流量(MB/s)|DB运行负载|DB TPS| +|--------| --------|--------|--------|--------|--------|--------|--------| +|1| | | | | | | | +|4| | | | | | | | +|8| | | | | | | | +|16| | | | | | | | +|32| | | | | | | | + +说明: + +#### 4.2.4 性能测试小结 + +1. +2. + +## 5 约束限制 + +## FAQ \ No newline at end of file diff --git a/iotdbwriter/doc/iotdbwriter-CN.md b/iotdbwriter/doc/iotdbwriter-CN.md index e69de29b..7921b233 100644 --- a/iotdbwriter/doc/iotdbwriter-CN.md +++ b/iotdbwriter/doc/iotdbwriter-CN.md @@ -0,0 +1,189 @@ +# DataX IoTDBWriter + +## 1 快速介绍 +IoTDBWriter插件实现了写入数据到IoTDB数据库目标表(设备)的功能。 + +底层实现上,IoTDBWriter通过iotdb.session连接IoTDB,按照IoTDB的SQL语法, +执行session.insertRecordsOfOneDevice语句,将数据写入IoTDB。 + +IoTDBWriter可以作为数据迁移工具供DBA将其它数据库的数据导入到IoTDB。 + +## 2 实现原理 + +IoTDBWriter 通过 DataX 框架获取 Reader 生成的协议数据Record,通过Session连接IoTDB,执行insert语句,将数据写入IoTDB。 + +IoTDB中设备与列的概念见IoTDB官方文档。 + + + +## 3 功能说明 +### 3.1 配置样例 + +配置一个MySQL数据写入IoTDB的作业 + +使用下面的Job配置,将数据写入IoTDB: + +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "mysqlreader", + "parameter": { + "username": "root", + "password": "toy123", + "column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "splitPk": "", + "connection": [ + { + "table": [ + "device" + ], + "jdbcUrl": [ + "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" + ] + } + ] + } + }, + "writer": { + "name": "iotdbwriter", + "parameter": { + "username": "root", + "password": "root", + "host": "192.168.150.100", + "port": 6667, + "version": "V_1_0", + "##": "Reader中时间列的位置,默认0列", + "timeColumnPosition": 0, + "insertBatchSize": 1000, + "device": "root.cgn.device", + "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", + "deleteExistTimeseries": false + } + } + } + ], + "setting": { + "speed": { + "channel": 3 + } + } + } +} +``` + +### 3.2 参数说明 + +* username + * 描述:用户名 + * 必选:是 + * 默认值:无 +* password + * 描述:用户名的密码 + * 必选:是 + * 默认值:无 +* host + * 描述:连接iotdb数据库的主机地址 + * 必选:是 + * 默认值:无 +* port + * 描述:端口 + * 必选:是 + * 默认值:无 +* version + * 描述:iotdb版本 + * 必选:是 + * 默认值:无 +* timeColumnPosition + * 描述:时间列在Record中列的位置 + * 必选:否 + * 默认值:0 +* device + * 描述:iotdb中的概念,对应mysql中的表名 + * 必选:是 + * 默认值:无 +* measurements + * 描述:iotdb中的概念,对应mysql中的字段集合,顺序应该与record中column的顺序相同 + * 必选:是 + * 默认值:无 +* batchSize + * 描述:每batchSize条record为一个batch进行写入 + * 必选:否 + * 默认值:1000 +* deleteExistTimeseries + * 描述:插入前是否删除该device下的所有数据 + * 必选:否 + * 默认值:false + +### 3.3 类型转换 + +datax中的数据类型,映射到IoTDB的数据类型 + +| DataX 内部类型 | IoTDB 数据类型 | +| -------------- |------------------| +| INT | INT32 | +| LONG | TIMESTAMP, INT64 | +| DOUBLE | DOUBLE | +| STRING | STRING | +| BOOL | BOOL | +| DATE | TIMESTAMP,DATE | +| BYTES | BINARY | + + + +### 3.4 各数据源到IoTDB的参考示例 +见datax-example/datax-example-iotdb + + +## 4 性能报告 + +### 4.1 环境准备 + +#### 4.1.1 数据特征 + +建表语句: + +单行记录类似于: + +#### 4.1.2 机器参数 + +* 执行DataX的机器参数为: + 1. cpu: + 2. mem: + 3. net: 千兆双网卡 + 4. disc: DataX 数据不落磁盘,不统计此项 + +* IoTDB数据库机器参数为: + 1. cpu: + 2. mem: + 3. net: 千兆双网卡 + 4. disc: + +#### 4.1.3 DataX jvm 参数 + + -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError + +### 4.2 测试报告 + +#### 4.2.1 单表测试报告 + +| 通道数 | DataX速度(Rec/s) | DataX流量(MB/s) | DataX机器网卡流出流量(MB/s) | DataX机器运行负载 | DB网卡进入流量(MB/s) | DB运行负载 | DB TPS | +| ------ | ---------------- | --------------- | --------------------------- | ----------------- | -------------------- | ---------- | ------ | +| 1 | | | | | | | | +| 4 | | | | | | | | +| 8 | | | | | | | | +| 16 | | | | | | | | +| 32 | | | | | | | | + + + +#### 4.2.4 性能测试小结 + + + + +## 5 约束限制 + + diff --git a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java index ffd6159e..2e4de1ec 100644 --- a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java +++ b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java @@ -45,10 +45,6 @@ public class IoTDBWriter extends Writer { if (port == null || port.isEmpty()) { throw DataXException.asDataXException(IoTDBWriterErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PORT + "] is not set."); } - String fetchSize = this.jobConf.getString(Key.FETCH_SIZE); - if (fetchSize == null || fetchSize.isEmpty()) { - throw DataXException.asDataXException(IoTDBWriterErrorCode.REQUIRED_VALUE, "The parameter [" + Key.FETCH_SIZE + "] is not set."); - } // 还有一部分参数没检查,没必要了。 } @@ -119,9 +115,6 @@ public class IoTDBWriter extends Writer { throw new RuntimeException(e); } - // set session fetchSize - session.setFetchSize(taskConf.getInt(Key.FETCH_SIZE)); - // 获取参数,否则默认值 insertBatchSize = (taskConf.getInt(Key.INSERT_BATCH_SIZE) == null) ? 1000 : taskConf.getInt(Key.INSERT_BATCH_SIZE); timeColumnPosition = (taskConf.getInt(Key.TIME_COLUMN_POSITION) == null) ? 0 : taskConf.getInt(Key.TIME_COLUMN_POSITION); @@ -144,7 +137,7 @@ public class IoTDBWriter extends Writer { throw new RuntimeException(e); } - // TODO 是否创建测点时间序列? + // 是否创建测点时间序列?不需要,IoTDB会自动创建时间序列。 } @Override diff --git a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/Key.java b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/Key.java index 07b2f96d..f702d979 100644 --- a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/Key.java +++ b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/Key.java @@ -5,7 +5,6 @@ public class Key { public static final String PASSWORD = "password"; public static final String HOST = "host"; public static final String PORT = "port"; - public static final String FETCH_SIZE = "fetchSize"; public static final String VERSION = "version"; public static final String TIME_COLUMN_POSITION = "timeColumnPosition"; public static final String DEVICE = "device"; diff --git a/iotdbwriter/src/main/resources/plugin_job_template.json b/iotdbwriter/src/main/resources/plugin_job_template.json index 524cc4ab..739cead1 100644 --- a/iotdbwriter/src/main/resources/plugin_job_template.json +++ b/iotdbwriter/src/main/resources/plugin_job_template.json @@ -5,7 +5,6 @@ "password": "root", "host": "192.168.150.100", "port": 6667, - "fetchSize": 10000, "version": "V_1_0", "##": "注意是Reader插件读取到的数据中时间列的位置,不是该插件,默认0列", "timeColumnPosition": 0, From a58f58f3e6841ef4a62ac2f33f02136db561860a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E6=B5=A9=E7=84=B6?= Date: Mon, 24 Jun 2024 19:32:38 +0800 Subject: [PATCH 5/7] =?UTF-8?q?=E6=A0=B8=E5=BF=83=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E5=92=8Cpom=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- datax-example/pom.xml | 5 +- mysqlwriter/pom.xml | 3 +- .../rdbms/reader/CommonRdbmsReader.java | 5 + pom.xml | 146 +++++++++--------- 4 files changed, 83 insertions(+), 76 deletions(-) diff --git a/datax-example/pom.xml b/datax-example/pom.xml index 9c4c9200..c428aed4 100644 --- a/datax-example/pom.xml +++ b/datax-example/pom.xml @@ -13,8 +13,9 @@ pom datax-example-core - datax-example-streamreader - datax-example-neo4j + + + datax-example-iotdb diff --git a/mysqlwriter/pom.xml b/mysqlwriter/pom.xml index 1c3891f5..58892850 100755 --- a/mysqlwriter/pom.xml +++ b/mysqlwriter/pom.xml @@ -40,7 +40,8 @@ mysql mysql-connector-java - ${mysql.driver.version} + 8.0.31 + diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java index f3180402..29f02e46 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java @@ -269,6 +269,11 @@ public class CommonRdbmsReader { case Types.SMALLINT: case Types.TINYINT: + // 将mysql中的tinyint(1)转为boolean类型 + if (metaData.getPrecision(i) <= 3){ + record.addColumn(new BoolColumn(rs.getBoolean(i))); + break; + } case Types.INTEGER: case Types.BIGINT: record.addColumn(new LongColumn(rs.getString(i))); diff --git a/pom.xml b/pom.xml index eeb4bfaf..5f02b891 100644 --- a/pom.xml +++ b/pom.xml @@ -48,92 +48,92 @@ mysqlreader - drdsreader - sqlserverreader - postgresqlreader - kingbaseesreader - oraclereader - cassandrareader - oceanbasev10reader - rdbmsreader - - odpsreader - otsreader - otsstreamreader - hbase11xreader - hbase094xreader - hbase11xsqlreader - hbase20xsqlreader - - ossreader - hdfsreader - ftpreader txtfilereader streamreader - clickhousereader - - mongodbreader + iotdbreader + + + + + + + + + + + + + + + + + + + + tdenginereader - gdbreader - tsdbreader - opentsdbreader - loghubreader - datahubreader - starrocksreader - sybasereader - dorisreader + + + + + + + + + mysqlwriter - starrockswriter - drdswriter - databendwriter - oraclewriter - sqlserverwriter - postgresqlwriter - kingbaseeswriter - adswriter - oceanbasev10writer - adbpgwriter - hologresjdbcwriter - rdbmswriter - - - odpswriter - osswriter - otswriter - hbase11xwriter - hbase094xwriter - hbase11xsqlwriter - hbase20xsqlwriter - kuduwriter - ftpwriter - hdfswriter txtfilewriter streamwriter - - elasticsearchwriter - mongodbwriter + iotdbwriter + + + + + + + + + + + + + + + + + + + + + + + + tdenginewriter - ocswriter - tsdbwriter - gdbwriter - oscarwriter - loghubwriter - datahubwriter - cassandrawriter - clickhousewriter - doriswriter - selectdbwriter - adbmysqlwriter - sybasewriter - neo4jwriter + + + + + + + + + + + + + + plugin-rdbms-util plugin-unstructured-storage-util - gaussdbreader - gaussdbwriter + + datax-example + + From 5001809dd91bb2ca858c43b0f9fdfd87a6ac6568 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E6=B5=A9=E7=84=B6?= Date: Fri, 19 Jul 2024 15:08:29 +0800 Subject: [PATCH 6/7] =?UTF-8?q?=E6=8C=89=E7=85=A7=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E6=84=8F=E8=A7=81=E8=BF=9B=E8=A1=8C=E4=BA=86=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../datax/example/iotdb/TestCreateData.java | 4 +- .../src/test/resources/iotdb2mysql.json | 14 +- .../src/test/resources/iotdb2stream.json | 8 +- .../src/test/resources/iotdb2txt.json | 10 +- .../src/test/resources/mysql2iotdb.json | 9 +- iotdbreader/doc/iotdbreader-CN.md | 50 +++--- .../reader/iotdbreader/IoTDBReader.java | 126 ++++++++------- .../datax/plugin/reader/iotdbreader/Key.java | 4 +- iotdbreader/src/main/resources/plugin.json | 2 +- .../main/resources/plugin_job_template.json | 12 +- iotdbwriter/doc/iotdbwriter-CN.md | 13 +- .../writer/iotdbwriter/IoTDBWriter.java | 148 +++++++++--------- .../datax/plugin/writer/iotdbwriter/Key.java | 4 +- iotdbwriter/src/main/resources/plugin.json | 2 +- .../main/resources/plugin_job_template.json | 9 +- 15 files changed, 202 insertions(+), 213 deletions(-) diff --git a/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestCreateData.java b/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestCreateData.java index 48f9b7ce..c521d4d8 100644 --- a/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestCreateData.java +++ b/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestCreateData.java @@ -28,8 +28,8 @@ public class TestCreateData { // session init session = new Session.Builder() - .host("192.168.150.100") - // .host("172.20.31.6") + // .host("192.168.150.100") + .host("172.20.31.61") .port(6667) .username("root") .password("root") diff --git a/datax-example/datax-example-iotdb/src/test/resources/iotdb2mysql.json b/datax-example/datax-example-iotdb/src/test/resources/iotdb2mysql.json index b0c0e952..b77bfbee 100644 --- a/datax-example/datax-example-iotdb/src/test/resources/iotdb2mysql.json +++ b/datax-example/datax-example-iotdb/src/test/resources/iotdb2mysql.json @@ -12,18 +12,16 @@ "parameter": { "username": "root", "password": "root", - "host": "192.168.150.100", + "host": "172.20.31.61", "port": 6667, "fetchSize": 10000, "version": "V_1_0", "timeColumnPosition": 0, - "finalSqls":[ + "querySqls":[ ], "device": "root.cgn.device", - "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", - "beginDateTime": "2023-03-07 12:00:00", - "endDateTime": "2024-03-07 19:00:00", - "where": "" + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00" } }, "writer": { @@ -32,7 +30,7 @@ "username": "root", "password": "toy123", "writeMode": "insert", - "#需要提前建表": "CREATE TABLE device (`time` BIGINT,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);", + "#需要提前建表": "CREATE TABLE device (`time` DATETIME,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);", "column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], "session": [ "set session sql_mode='ANSI'" @@ -45,7 +43,7 @@ "table": [ "device" ], - "#": "下面的URL需要把中括号去掉,否则报错,mysqlreader的bug,未修改", + "#": "下面的URL需要把中括号去掉,否则报错,mysqlwriter的bug,未修改", "jdbcUrl": "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" } ] diff --git a/datax-example/datax-example-iotdb/src/test/resources/iotdb2stream.json b/datax-example/datax-example-iotdb/src/test/resources/iotdb2stream.json index cabc54eb..ce8f401a 100644 --- a/datax-example/datax-example-iotdb/src/test/resources/iotdb2stream.json +++ b/datax-example/datax-example-iotdb/src/test/resources/iotdb2stream.json @@ -21,14 +21,12 @@ "fetchSize": 10000, "version": "V_1_0", "timeColumnPosition": 0, - "finalSqls":[ + "querySqls":[ ], "device": "root.cgn.device", - "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", - "beginDateTime": "2023-03-07 12:00:00", - "endDateTime": "2024-03-07 19:00:00", - "where": "" + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00" } }, "writer": { diff --git a/datax-example/datax-example-iotdb/src/test/resources/iotdb2txt.json b/datax-example/datax-example-iotdb/src/test/resources/iotdb2txt.json index c962e56d..990411c5 100644 --- a/datax-example/datax-example-iotdb/src/test/resources/iotdb2txt.json +++ b/datax-example/datax-example-iotdb/src/test/resources/iotdb2txt.json @@ -12,20 +12,18 @@ "parameter": { "username": "root", "password": "root", - "host": "192.168.150.100", + "host": "172.20.31.61", "port": 6667, "fetchSize": 10000, "version": "V_1_0", "timeColumnPosition": 0, - "finalSqls":[ + "querySqls":[ "select * from root.cgn.device", "select A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU from root.cgn.device" ], "device": "root.cgn.device", - "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", - "beginDateTime": "2023-03-07 12:00:00", - "endDateTime": "2024-03-07 19:00:00", - "where": "" + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00" } }, "writer": { diff --git a/datax-example/datax-example-iotdb/src/test/resources/mysql2iotdb.json b/datax-example/datax-example-iotdb/src/test/resources/mysql2iotdb.json index 9f7f1d4c..d66d9c94 100644 --- a/datax-example/datax-example-iotdb/src/test/resources/mysql2iotdb.json +++ b/datax-example/datax-example-iotdb/src/test/resources/mysql2iotdb.json @@ -26,16 +26,17 @@ "parameter": { "username": "root", "password": "root", - "host": "192.168.150.100", + "host": "172.20.31.61", "port": 6667, "fetchSize": 10000, "version": "V_1_0", - "##": "Reader中时间列的位置,默认0列", "timeColumnPosition": 0, "insertBatchSize": 1000, "device": "root.cgn.device", - "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", - "deleteExistTimeseries": false + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "preSql": [ + "delete timeseries root.cgn.device.**" + ] } } } diff --git a/iotdbreader/doc/iotdbreader-CN.md b/iotdbreader/doc/iotdbreader-CN.md index 88278529..17f26b57 100644 --- a/iotdbreader/doc/iotdbreader-CN.md +++ b/iotdbreader/doc/iotdbreader-CN.md @@ -33,14 +33,15 @@ IoTDBReader 通过 IoTDB 的 原生java session 查询获取数据。 "port": 6667, "fetchSize": 10000, "version": "V_1_0", + "##": "时间列插入DataX的Record中的位置,默认第0列", "timeColumnPosition": 0, - "finalSqls":[ + "##":"写了querySqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句", + "querySqls":[ ], "device": "root.cgn.device", - "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", - "beginDateTime": "2023-03-07 12:00:00", - "endDateTime": "2024-03-07 19:00:00", - "where": "" + "##":"时间列不属于测点", + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00" } }, "writer": { @@ -49,7 +50,7 @@ IoTDBReader 通过 IoTDB 的 原生java session 查询获取数据。 "username": "root", "password": "toy123", "writeMode": "insert", - "#需要提前建表": "CREATE TABLE device (`time` BIGINT,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);", + "#需要提前建表": "CREATE TABLE device (`time` DATETIME,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);", "column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], "session": [ "set session sql_mode='ANSI'" @@ -96,14 +97,13 @@ IoTDBReader 通过 IoTDB 的 原生java session 查询获取数据。 "fetchSize": 10000, "version": "V_1_0", "timeColumnPosition": 0, - "finalSqls":[ + "##":"写了querySqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句", + "querySqls":[ "select * from root.cgn.device", "select A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU from root.cgn.device" ], "device": "", "measurements": "", - "beginDateTime": "", - "endDateTime": "", "where": "" } }, @@ -147,42 +147,34 @@ IoTDBReader 通过 IoTDB 的 原生java session 查询获取数据。 * 描述:时间列在Record中列的位置 * 必选:否 * 默认值:0 -* finalSqls +* querySqls * 描述:直接写多行SQL,可以并行读取,此时下面的参数失效。 * 必选:否 * 默认值: * device * 描述:IoTDB中的概念,可理解为mysql中的表。 - * 必选:finalSqls为空时必选 + * 必选:querySqls为空时必选 * 默认值:无 * measurements * 描述:IoTDB中的概念,可理解为mysql中的字段。 - * 必选:finalSqls为空时必选 - * 默认值:无 -* beginDateTime - * 描述:SQL查询时的数据的开始时间 - * 必选:finalSqls为空时必选 - * 默认值:无 -* measurements - * 描述:SQL查询时的数据的结束时间 - * 必选:否 + * 必选:querySqls为空时必选 * 默认值:无 * where - * 描述:额外的条件 + * 描述:查询条件 * 必选:否 * 默认值:无 ### 3.3 类型转换 | IoTDB 数据类型 | DataX 内部类型 | -|-----------------|------------| -| INT32 | Int | -| INT64,TIMESTAMP | Long | -| FLOAT | FLOAT | -| DOUBLE | Double | -| BOOLEAN | Bool | -| DATE | Date | -| STRING,TEXT | String | +|-----------------|--------| +| INT32 | Int | +| INT64,TIMESTAMP | Long | +| FLOAT | FLOAT | +| DOUBLE | Double | +| BOOLEAN | Bool | +| DATE | Date | +| STRING,TEXT | String | ## 4 性能报告 diff --git a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java index 1f446139..b285e906 100644 --- a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java +++ b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java @@ -2,7 +2,9 @@ package com.alibaba.datax.plugin.reader.iotdbreader; import com.alibaba.datax.common.element.*; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.AbstractTaskPlugin; import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.spi.Reader; import com.alibaba.datax.common.util.Configuration; import org.apache.iotdb.isession.SessionDataSet; @@ -13,6 +15,7 @@ import org.apache.iotdb.session.Session; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.Field; import org.apache.tsfile.read.common.RowRecord; +import org.apache.tsfile.read.common.block.column.NullColumn; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,42 +72,31 @@ public class IoTDBReader extends Reader { public List split(int adviceNumber) { // 每个config对应一个task List configs = new ArrayList<>(); - List queryList = this.jobConf.getList(Key.FINAL_SQLS, String.class); + List queryList = this.jobConf.getList(Key.QUERY_SQLS, String.class); if (queryList == null || queryList.size() == 0){ Configuration clone = this.jobConf.clone(); - // TODO 同时读取多个设备?有没有必要? String device = this.jobConf.getString(Key.DEVICE); - // 测点是一个逗号分隔的测点字符串或"*" - String measurements = this.jobConf.getString(Key.MEASUREMENTS); - String beginDateTime = this.jobConf.getString(Key.BEGIN_DATETIME); - String endDateTime = this.jobConf.getString(Key.END_DATETIME); + List measurements = this.jobConf.getList(Key.MEASUREMENTS, String.class); String where = this.jobConf.getString(Key.WHERE); StringBuilder sb = new StringBuilder(); - sb.append("select ").append(measurements); + sb.append("select ").append(String.join(",", measurements)); sb.append(" from ").append(device); - sb.append(" where "); - if (beginDateTime != null && !beginDateTime.isEmpty()){ - sb.append("time >= ").append(beginDateTime); - } - if (endDateTime != null && !endDateTime.isEmpty()){ - sb.append(" and time <= ").append(endDateTime); - } if (where != null && !where.isEmpty()){ - sb.append(" and ").append(where); + sb.append(" where ").append(where); } clone.set(Key.QUERY_SQL, sb.toString()); configs.add(clone); - //TODO DataX中是单线程,实际上底层session中是多线程读取。根据什么条件切分多线程? + //DataX中一个查询是单线程,实际上底层session中是多线程读取。 }else{ // 直接读取最终SQL for (String query : queryList) { Configuration clone = this.jobConf.clone(); - clone.remove(Key.FINAL_SQLS); + clone.remove(Key.QUERY_SQLS); clone.set(Key.QUERY_SQL, query); configs.add(clone); } } - LOG.info("configs: {}", configs); + // LOG.info("configs: {}", configs); return configs; } @@ -144,6 +136,7 @@ public class IoTDBReader extends Reader { * 最终的查询SQL,交给session执行。 */ private String querySql; + private TaskPluginCollector taskPluginCollector; @Override public void init() { @@ -171,6 +164,7 @@ public class IoTDBReader extends Reader { this.timeColumnPosition = (taskConf.getInt(Key.TIME_COLUMN_POSITION) == null) ? 0 : taskConf.getInt(Key.TIME_COLUMN_POSITION); this.querySql = taskConf.getString(Key.QUERY_SQL); + taskPluginCollector = super.getTaskPluginCollector(); } @Override @@ -181,7 +175,7 @@ public class IoTDBReader extends Reader { session.close(); } } catch (IoTDBConnectionException e) { - throw new RuntimeException(e); + LOG.info(e.getMessage()); } } @@ -199,58 +193,60 @@ public class IoTDBReader extends Reader { // IoTDB中的行 RowRecord rowRecord = dataSet.next(); List fields = rowRecord.getFields(); - // 除time列外的其他列遍历类型后转换 - for (int i = 0; i < fields.size(); i++) { - if (i == timeColumnPosition){ - // time列插入指定位置 - long timestamp = rowRecord.getTimestamp(); - record.addColumn(new LongColumn(timestamp)); - } - Field field = fields.get(i); - TSDataType dataType = field.getDataType(); - // null类型暂时转为字符串 TODO 有没有其他处理方式? - if (dataType == null) { - record.addColumn(new StringColumn("null")); - continue; - } - switch (dataType) { - // TODO 把所有数据类型都测一遍 - case BOOLEAN: - record.addColumn(new BoolColumn(field.getBoolV())); - break; - case INT32: - record.addColumn(new LongColumn(field.getIntV())); - break; - case INT64: - case TIMESTAMP: - record.addColumn(new LongColumn(field.getLongV())); - break; - case FLOAT: - record.addColumn(new DoubleColumn(field.getFloatV())); - break; - case DOUBLE: - // TODO 为什么DataX推荐用String?区别是什么? - record.addColumn(new DoubleColumn(field.getDoubleV())); - break; - case STRING: - case TEXT: - record.addColumn(new StringColumn(field.getStringValue())); - break; - case DATE: - record.addColumn(new DateColumn(Date.valueOf(field.getDateV()))); - break; - default: - // TODO 其他类型怎么处理? - LOG.info("类型错误:"+ field.getDataType()); + try { + // 除time列外的其他列遍历类型后转换 + for (int i = 0; i < fields.size(); i++) { + if (i == timeColumnPosition){ + // time列插入指定位置,时间列不在fields中,需要单独处理。不能为null + long timestamp = rowRecord.getTimestamp(); + record.addColumn(new DateColumn(timestamp)); + } + Field field = fields.get(i); + TSDataType dataType = field.getDataType(); + if (dataType == null) { + // 需要写插件支持处理null数据,否则会空指向异常,这里先当成脏数据 + // record.addColumn(null); + // continue; + throw new RuntimeException("null datatype"); + } + switch (dataType) { + case BOOLEAN: + record.addColumn(new BoolColumn(field.getBoolV())); + break; + case INT32: + record.addColumn(new LongColumn(field.getIntV())); + break; + case INT64: + case TIMESTAMP: + record.addColumn(new LongColumn(field.getLongV())); + break; + case FLOAT: + record.addColumn(new DoubleColumn(field.getFloatV())); + break; + case DOUBLE: + record.addColumn(new DoubleColumn(field.getDoubleV())); + break; + case STRING: + case TEXT: + record.addColumn(new StringColumn(field.getStringValue())); + break; + case DATE: + record.addColumn(new DateColumn(Date.valueOf(field.getDateV()))); + break; + default: + throw new RuntimeException("Unsupported data type: " + dataType); + } } + // 发送 + recordSender.sendToWriter(record); + }catch (RuntimeException e){ + LOG.info(e.getMessage()); + this.taskPluginCollector.collectDirtyRecord(record, e); } - // 发送 - recordSender.sendToWriter(record); } } catch (StatementExecutionException | IoTDBConnectionException e) { throw new RuntimeException(e); } } } - } diff --git a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/Key.java b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/Key.java index 56a5b637..0283be6a 100644 --- a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/Key.java +++ b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/Key.java @@ -7,12 +7,10 @@ public class Key { public static final String PORT = "port"; public static final String FETCH_SIZE = "fetchSize"; public static final String VERSION = "version"; - public static final String FINAL_SQLS = "finalSqls"; + public static final String QUERY_SQLS = "querySqls"; public static final String QUERY_SQL = "querySql"; public static final String TIME_COLUMN_POSITION = "timeColumnPosition"; public static final String DEVICE = "device"; public static final String MEASUREMENTS = "measurements"; - public static final String BEGIN_DATETIME = "beginDateTime"; - public static final String END_DATETIME = "endDateTime"; public static final String WHERE = "where"; } diff --git a/iotdbreader/src/main/resources/plugin.json b/iotdbreader/src/main/resources/plugin.json index 84dfa086..39d6b897 100644 --- a/iotdbreader/src/main/resources/plugin.json +++ b/iotdbreader/src/main/resources/plugin.json @@ -5,6 +5,6 @@ "useScene": "data migration to iotdb", "mechanism": "use iotdb-java-session to write data." }, - "developer": "lihaoran-Timecho" + "developer": "timecho.com" } diff --git a/iotdbreader/src/main/resources/plugin_job_template.json b/iotdbreader/src/main/resources/plugin_job_template.json index 6639a103..14287ed7 100644 --- a/iotdbreader/src/main/resources/plugin_job_template.json +++ b/iotdbreader/src/main/resources/plugin_job_template.json @@ -9,8 +9,8 @@ "version": "V_1_0", "##": "时间列插入DataX的Record中的位置,默认第0列", "timeColumnPosition": 0, - "##":"写了finalSqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句", - "finalSqls": [ + "##":"写了querySqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句", + "querySqls": [ "select * from root.cgn.device", "select * from root.cgn.device", "select * from root.cgn.device", @@ -18,10 +18,8 @@ "select * from root.cgn.device" ], "device": "root.cgn.device", - "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", - "##": "开始时间必填,否则SQL错误", - "beginDateTime": "2023-03-07 12:00:00", - "endDateTime": "2024-03-07 19:00:00", - "where": "" + "##":"时间列不属于测点", + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00" } } \ No newline at end of file diff --git a/iotdbwriter/doc/iotdbwriter-CN.md b/iotdbwriter/doc/iotdbwriter-CN.md index 7921b233..508e367a 100644 --- a/iotdbwriter/doc/iotdbwriter-CN.md +++ b/iotdbwriter/doc/iotdbwriter-CN.md @@ -57,10 +57,11 @@ IoTDB中设备与列的概念见IoTDB官方文档。 "version": "V_1_0", "##": "Reader中时间列的位置,默认0列", "timeColumnPosition": 0, - "insertBatchSize": 1000, + "batchSize": 1000, "device": "root.cgn.device", - "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", - "deleteExistTimeseries": false + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "###": "在插入之前,预先执行的SQL,默认为空", + "preSql": [] } } } @@ -112,10 +113,10 @@ IoTDB中设备与列的概念见IoTDB官方文档。 * 描述:每batchSize条record为一个batch进行写入 * 必选:否 * 默认值:1000 -* deleteExistTimeseries - * 描述:插入前是否删除该device下的所有数据 +* preSql + * 描述:插入前是否预先执行SQL * 必选:否 - * 默认值:false + * 默认值:无 ### 3.3 类型转换 diff --git a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java index 2e4de1ec..ddade9e3 100644 --- a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java +++ b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java @@ -15,6 +15,7 @@ import org.apache.tsfile.enums.TSDataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Date; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -61,7 +62,7 @@ public class IoTDBWriter extends Writer { Configuration clone = this.jobConf.clone(); configs.add(clone); } - LOG.info("configs: {}", configs); + // LOG.info("configs: {}", configs); return configs; } @@ -74,26 +75,26 @@ public class IoTDBWriter extends Writer { public static class Task extends Writer.Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); private Configuration taskConf; - /** - * IoTDB原生读写工具 - */ + + // IoTDB原生读写工具 private Session session; - /** - * 是否在插入前删除已有的时间序列?默认false - */ - private boolean deleteExistTimeseries; + // 是否在插入前删除已有的时间序列,为""表示不执行 + // private String deleteBeforeInsert; - /** - * 插入批次大小 - */ + // 插入批次大小 private int insertBatchSize; - /** - * IoTDB中的时间列插入的位置,默认为0,即第一列。 - */ + // IoTDB中的时间列插入的位置,默认为0,即第一列。 private int timeColumnPosition; + // 处理脏数据 + private TaskPluginCollector taskPluginCollector; + + // 预先执行的SQL语句 + private List preSqls; + + @Override public void init() { // 获取与本Task相关的配置,是Job的split方法返回的配置列表中的其中一个。 @@ -116,28 +117,27 @@ public class IoTDBWriter extends Writer { } // 获取参数,否则默认值 - insertBatchSize = (taskConf.getInt(Key.INSERT_BATCH_SIZE) == null) ? 1000 : taskConf.getInt(Key.INSERT_BATCH_SIZE); + insertBatchSize = (taskConf.getInt(Key.BATCH_SIZE) == null) ? 1000 : taskConf.getInt(Key.BATCH_SIZE); timeColumnPosition = (taskConf.getInt(Key.TIME_COLUMN_POSITION) == null) ? 0 : taskConf.getInt(Key.TIME_COLUMN_POSITION); - deleteExistTimeseries = (taskConf.getBool(Key.DELETE_EXIST_TIMESERIES) == null) ? false : taskConf.getBool(Key.DELETE_EXIST_TIMESERIES); + preSqls = (taskConf.getList(Key.PRE_SQL, String.class) == null) ? new ArrayList<>() : taskConf.getList(Key.PRE_SQL, String.class); + taskPluginCollector = super.getTaskPluginCollector(); } @Override public void prepare() { - // 是否先删除已有的时间序列 - try { - if (deleteExistTimeseries){ - if (session.checkTimeseriesExists(taskConf.getString(Key.DEVICE) + ".**")) { - session.deleteTimeseries(taskConf.getString(Key.DEVICE) + ".**"); - LOG.info("===========删除已有的时间序列完成=============="); - }else { - LOG.info("===========不存在已有时间序列=============="); + if (preSqls.size() != 0){ + for (String sql : preSqls) { + try { + session.executeNonQueryStatement(sql); + + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); } } - } catch (IoTDBConnectionException | StatementExecutionException e) { - throw new RuntimeException(e); + LOG.info("=======Complated preSqls======="); } - // 是否创建测点时间序列?不需要,IoTDB会自动创建时间序列。 + // IoTDB会自动创建时间序列,无需提前创建 } @Override @@ -147,7 +147,7 @@ public class IoTDBWriter extends Writer { session.close(); } } catch (IoTDBConnectionException e) { - throw new RuntimeException(e); + LOG.info(e.getMessage()); } } @@ -170,68 +170,74 @@ public class IoTDBWriter extends Writer { // 获取Record记录,传输结束返回null int count; // 统计插入记录数 for (count = 0; (record = lineReceiver.getFromReader()) != null; count++) { - // LOG.info("record:" + record); // 处理时间列 timestamps.add(record.getColumn(timeColumnPosition).asLong()); // 处理测点 - List measurements = Arrays.asList(taskConf.getString(Key.MEASUREMENTS).split(",")); + List measurements = taskConf.getList(Key.MEASUREMENTS, String.class); measurementsList.add(measurements); // 处理类型和值 List types = new ArrayList<>(); List values = new ArrayList<>(); - // List values = new ArrayList<>(); - for (int i = 0; i < record.getColumnNumber(); i++) { - if (i == timeColumnPosition){ - continue; // 跳过时间列 - } - Column col = record.getColumn(i); - switch (col.getType()) { - case BOOL: - types.add(TSDataType.BOOLEAN); - values.add(col.asBoolean()); - break; - case INT: - types.add(TSDataType.INT32); - values.add((Integer) col.getRawData()); - break; - case LONG: - types.add(TSDataType.INT64); - values.add(col.asLong()); - break; - case DOUBLE: - types.add(TSDataType.DOUBLE); - values.add(col.asDouble()); - break; - case STRING: - types.add(TSDataType.STRING); - values.add(col.asString()); - break; - case DATE: - types.add(TSDataType.DATE); - values.add(col.asDate()); - break; - default: - throw new RuntimeException("unsupported type:" + col.getType()); + try{ + for (int i = 0; i < record.getColumnNumber(); i++) { + if (i == timeColumnPosition){ + continue; // 跳过时间列 + } + Column col = record.getColumn(i); + switch (col.getType()) { + case BOOL: + types.add(TSDataType.BOOLEAN); + values.add(col.asBoolean()); + break; + case INT: + types.add(TSDataType.INT32); + values.add((Integer) col.getRawData()); + break; + case LONG: + types.add(TSDataType.INT64); + values.add(col.asLong()); + break; + case DOUBLE: + types.add(TSDataType.DOUBLE); + values.add(col.asDouble()); + break; + case NULL: + // IoTDB可以处理null + types.add(null); + values.add(null); + break; + case STRING: + types.add(TSDataType.STRING); + values.add(col.asString()); + break; + case DATE: + types.add(TSDataType.DATE); + values.add(col.asDate()); + break; + case BAD: + default: + throw new RuntimeException("unsupported type:" + col.getType()); + } } + typesList.add(types); + valuesList.add(values); + }catch (RuntimeException e){ + LOG.info(e.getMessage()); + taskPluginCollector.collectDirtyRecord(record, e); } - typesList.add(types); - valuesList.add(values); if (count != 0 && count % insertBatchSize == 0) { session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList); - LOG.info("已插入"+count+"条数据"); - measurementsList.clear(); - valuesList.clear(); - typesList.clear(); timestamps.clear(); + measurementsList.clear(); + typesList.clear(); valuesList.clear(); } } if (!timestamps.isEmpty()){ session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList); - LOG.info("已插入剩余数据:" + timestamps.size() + "条"); } - LOG.info("已插入所有数据:" + (count-1) + "条"); + LOG.info("========= task all data inserted, total record: " + (count-1)); }catch (IoTDBConnectionException | StatementExecutionException e) { throw new RuntimeException(e); } diff --git a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/Key.java b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/Key.java index f702d979..552690f7 100644 --- a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/Key.java +++ b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/Key.java @@ -9,6 +9,6 @@ public class Key { public static final String TIME_COLUMN_POSITION = "timeColumnPosition"; public static final String DEVICE = "device"; public static final String MEASUREMENTS = "measurements"; - public static final String DELETE_EXIST_TIMESERIES = "deleteExistTimeseries"; - public static final String INSERT_BATCH_SIZE = "insertBatchSize"; + public static final String PRE_SQL = "preSql"; + public static final String BATCH_SIZE = "batchSize"; } diff --git a/iotdbwriter/src/main/resources/plugin.json b/iotdbwriter/src/main/resources/plugin.json index ed7040c4..9d034718 100644 --- a/iotdbwriter/src/main/resources/plugin.json +++ b/iotdbwriter/src/main/resources/plugin.json @@ -5,5 +5,5 @@ "useScene": "data migration to iotdb", "mechanism": "use iotdb-java-session to write data." }, - "developer": "lihaoran-Timecho" + "developer": "timecho.com" } \ No newline at end of file diff --git a/iotdbwriter/src/main/resources/plugin_job_template.json b/iotdbwriter/src/main/resources/plugin_job_template.json index 739cead1..796cd135 100644 --- a/iotdbwriter/src/main/resources/plugin_job_template.json +++ b/iotdbwriter/src/main/resources/plugin_job_template.json @@ -8,9 +8,12 @@ "version": "V_1_0", "##": "注意是Reader插件读取到的数据中时间列的位置,不是该插件,默认0列", "timeColumnPosition": 0, - "insertBatchSize": 1000, + "batchSize": 1000, "device": "root.cgn.device", - "measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", - "deleteExistTimeseries": false + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "###": "在插入之前,执行删除操作,为空或不配置表示不执行", + "preSql": [ + "delete timeseries root.cgn.device.**" + ] } } \ No newline at end of file From 153b2fcd657268aa1afe8a24697b6bf826d9cd11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E6=B5=A9=E7=84=B6?= Date: Fri, 19 Jul 2024 16:21:20 +0800 Subject: [PATCH 7/7] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BA=86=E4=B8=80?= =?UTF-8?q?=E4=BA=9Bpom=E6=96=87=E4=BB=B6=EF=BC=8C=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E4=BA=86iotdb=E4=BE=9D=E8=B5=96=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- datax-example/pom.xml | 4 +- iotdbwriter/doc/iotdbwriter-CN.md | 10 ++ mysqlwriter/pom.xml | 3 +- pom.xml | 148 +++++++++++++++--------------- 4 files changed, 88 insertions(+), 77 deletions(-) diff --git a/datax-example/pom.xml b/datax-example/pom.xml index c428aed4..ac50d774 100644 --- a/datax-example/pom.xml +++ b/datax-example/pom.xml @@ -13,8 +13,8 @@ pom datax-example-core - - + datax-example-streamreader + datax-example-neo4j datax-example-iotdb diff --git a/iotdbwriter/doc/iotdbwriter-CN.md b/iotdbwriter/doc/iotdbwriter-CN.md index 508e367a..dc350cac 100644 --- a/iotdbwriter/doc/iotdbwriter-CN.md +++ b/iotdbwriter/doc/iotdbwriter-CN.md @@ -75,6 +75,16 @@ IoTDB中设备与列的概念见IoTDB官方文档。 } ``` +注意:mysqlreader插件,在src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java 中270行左右,修改了代码,将mysql中的tinyint(1)转为boolean类型 +case Types.SMALLINT: +case Types.TINYINT: + // 将mysql中的tinyint(1)转为boolean类型 + if (metaData.getPrecision(i) <= 3){ + record.addColumn(new BoolColumn(rs.getBoolean(i))); + break; + } + + ### 3.2 参数说明 * username diff --git a/mysqlwriter/pom.xml b/mysqlwriter/pom.xml index 58892850..1c3891f5 100755 --- a/mysqlwriter/pom.xml +++ b/mysqlwriter/pom.xml @@ -40,8 +40,7 @@ mysql mysql-connector-java - 8.0.31 - + ${mysql.driver.version} diff --git a/pom.xml b/pom.xml index 5f02b891..c6124f00 100644 --- a/pom.xml +++ b/pom.xml @@ -48,92 +48,94 @@ mysqlreader + drdsreader + sqlserverreader + postgresqlreader + kingbaseesreader + oraclereader + cassandrareader + oceanbasev10reader + rdbmsreader + + odpsreader + otsreader + otsstreamreader + hbase11xreader + hbase094xreader + hbase11xsqlreader + hbase20xsqlreader + + ossreader + hdfsreader + ftpreader txtfilereader streamreader - iotdbreader - - - - - - - - - - - - - - - - - - - - - tdenginereader - - - - - - - - + clickhousereader + mongodbreader + iotdbreader + tdenginereader + gdbreader + tsdbreader + opentsdbreader + loghubreader + datahubreader + starrocksreader + sybasereader + dorisreader mysqlwriter + starrockswriter + drdswriter + databendwriter + oraclewriter + sqlserverwriter + postgresqlwriter + kingbaseeswriter + adswriter + oceanbasev10writer + adbpgwriter + hologresjdbcwriter + rdbmswriter + + + odpswriter + osswriter + otswriter + hbase11xwriter + hbase094xwriter + hbase11xsqlwriter + hbase20xsqlwriter + kuduwriter + ftpwriter + hdfswriter txtfilewriter streamwriter - iotdbwriter - - - - - - - - - - - - - - - - - - - - - - - - - tdenginewriter - - - - - - - - - - - - - + elasticsearchwriter + mongodbwriter + tdenginewriter + iotdbwriter + ocswriter + tsdbwriter + gdbwriter + oscarwriter + loghubwriter + datahubwriter + cassandrawriter + clickhousewriter + doriswriter + selectdbwriter + adbmysqlwriter + sybasewriter + neo4jwriter plugin-rdbms-util plugin-unstructured-storage-util - - + gaussdbreader + gaussdbwriter datax-example - -