From 65f704b7772a1fadc2f308600e310c4762d554c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E6=B5=A9=E7=84=B6?= Date: Thu, 20 Jun 2024 10:07:47 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0DataX=E8=AF=BB=E5=86=99Demo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- iotdbreader/doc/iotdbreader-CN.md | 4 + iotdbreader/pom.xml | 102 ++++++++++ iotdbreader/src/main/assembly/package.xml | 34 ++++ .../reader/iotdbreader/IoTDBReader.java | 190 ++++++++++++++++++ .../iotdbreader/IoTDBReaderErrorCode.java | 28 +++ iotdbreader/src/main/resources/plugin.json | 10 + .../main/resources/plugin_job_template.json | 23 +++ iotdbwriter/doc/iotdbwriter-CN.md | 0 iotdbwriter/pom.xml | 102 ++++++++++ iotdbwriter/src/main/assembly/package.xml | 34 ++++ .../writer/iotdbwriter/IoTDBWriter.java | 165 +++++++++++++++ .../iotdbwriter/IoTDBWriterErrorCode.java | 29 +++ iotdbwriter/src/main/resources/plugin.json | 9 + .../main/resources/plugin_job_template.json | 20 ++ 14 files changed, 750 insertions(+) create mode 100644 iotdbreader/doc/iotdbreader-CN.md create mode 100644 iotdbreader/pom.xml create mode 100644 iotdbreader/src/main/assembly/package.xml create mode 100644 iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java create mode 100644 iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReaderErrorCode.java create mode 100644 iotdbreader/src/main/resources/plugin.json create mode 100644 iotdbreader/src/main/resources/plugin_job_template.json create mode 100644 iotdbwriter/doc/iotdbwriter-CN.md create mode 100644 iotdbwriter/pom.xml create mode 100644 iotdbwriter/src/main/assembly/package.xml create mode 100644 iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java create mode 100644 iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriterErrorCode.java create mode 100644 iotdbwriter/src/main/resources/plugin.json create mode 100644 iotdbwriter/src/main/resources/plugin_job_template.json diff --git a/iotdbreader/doc/iotdbreader-CN.md b/iotdbreader/doc/iotdbreader-CN.md new file mode 100644 index 00000000..d4b2d70f --- /dev/null +++ b/iotdbreader/doc/iotdbreader-CN.md @@ -0,0 +1,4 @@ + + +## 快速介绍 +iotdbreader用来读取iotdb中的数据,然后传输到其他数据库。 diff --git a/iotdbreader/pom.xml b/iotdbreader/pom.xml new file mode 100644 index 00000000..b531891e --- /dev/null +++ b/iotdbreader/pom.xml @@ -0,0 +1,102 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + iotdbreader + + + 8 + 8 + 1.3.3-SNAPSHOT + + + + + com.alibaba.datax + datax-core + 0.0.1-SNAPSHOT + test + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.iotdb + iotdb-session + ${iotdb.session.version} + + + org.apache.iotdb + node-commons + ${iotdb.session.version} + + + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.12.4 + + + + **/*Test.java + + + + + true + + + + + + + \ No newline at end of file diff --git a/iotdbreader/src/main/assembly/package.xml b/iotdbreader/src/main/assembly/package.xml new file mode 100644 index 00000000..f01364f1 --- /dev/null +++ b/iotdbreader/src/main/assembly/package.xml @@ -0,0 +1,34 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/iotdbreader + + + target/ + + iotdbreader-0.0.1-SNAPSHOT.jar + + plugin/writer/iotdbreader + + + + + + false + plugin/reader/iotdbreader/libs + runtime + + + diff --git a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java new file mode 100644 index 00000000..cd26cd46 --- /dev/null +++ b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java @@ -0,0 +1,190 @@ +package com.alibaba.datax.plugin.reader.iotdbreader; + +import com.alibaba.datax.common.element.*; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.spi.ErrorCode; +import com.alibaba.datax.common.spi.Reader; +import com.alibaba.datax.common.util.Configuration; +import org.apache.commons.lang3.StringUtils; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.isession.util.Version; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.Field; +import org.apache.tsfile.read.common.RowRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.UnsupportedEncodingException; +import java.sql.*; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class IoTDBReader extends Reader { + public static class Job extends Reader.Job { + private static final Logger LOG = LoggerFactory.getLogger(Job.class); + private Configuration originalConfig; + + /** + * Job对象初始化工作 + */ + @Override + public void init() { + // TODO 配置文件还没规划格式 + // 通过super.getPluginJobConf()获取与本插件相关的配置。 + // 读插件获得配置中reader部分,写插件获得writer部分。 + this.originalConfig = super.getPluginJobConf(); + // TODO 检查各种参数是否正确 + } + + /** + * Job对象自身的销毁工作。 + */ + @Override + public void destroy() { + + } + + /** + * 全局的后置工作,比如mysqlwriter同步完影子表后的rename操作。 + */ + @Override + public void post() { + + } + + /** + * 将Job拆分Task。 + * @param adviceNumber 框架建议的拆分数,一般是运行时所配置的并发度。 + * @return Task的配置列表。 + */ + @Override + public List split(int adviceNumber) { + // TODO 暂时拆分为adviceNumber个,不知道是怎么切割的。。。后序需要继续测试 + // TODO DEBUG看看是不是一个配置对应一个Task,一个Task启动配置文件中的连接,执行一个查询。 + // 本机增加100个配置文件,写入txt,生成100个txt文件,运行如下 + //任务启动时刻 : 2024-06-19 16:21:13 + // 任务结束时刻 : 2024-06-19 16:21:24 + // 任务总计耗时 : 10s + // 任务平均流量 : 42.93MB/s + // 记录写入速度 : 90010rec/s + // 读出记录总数 : 900100 + // 读写失败总数 : 0 + List configurations = new ArrayList<>(); + for (int i = 0; i < 100; i++){ + configurations.add(this.originalConfig); + } + return configurations; + } + } + + public static class Task extends Reader.Task { + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + + private Configuration readerSliceConfig; + private String mandatoryEncoding; + + private Session session; + + @Override + public void init() { + // session init + session = + new Session.Builder() + .host("192.168.150.100") + .port(6667) + .username("root") + .password("root") + .version(Version.V_0_13) + .build(); + // open session, close RPCCompression + try { + session.open(false); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } + + // set session fetchSize + session.setFetchSize(10000); + } + + @Override + public void destroy() { + try { + if (session != null){ + session.close(); + } + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } + } + + @Override + public void startRead(RecordSender recordSender) { + try { + // TODO 把流程调通后把SQL语句抽出去 + // SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1"); + SessionDataSet dataSet = session.executeQueryStatement("select * from root.cgn.device"); + // System.out.println(dataSet.getColumnNames()); + List columnTypes = dataSet.getColumnTypes(); + // System.out.println(columnTypes); + int columnNums = columnTypes.size(); + // dataSet.setFetchSize(1024); + while (dataSet.hasNext()) { + RowRecord rowRecord = dataSet.next(); + // 将iotdb中的行 转为datax中的record + Record record = recordSender.createRecord(); + // time列直接处理 + long timestamp = rowRecord.getTimestamp(); + record.addColumn(new LongColumn(timestamp)); + List fields = rowRecord.getFields(); + // 其他列遍历类型后转换 + for (Field field : fields) { + TSDataType dataType = field.getDataType(); + // null类型暂时转为字符串 + if (dataType == null) { + record.addColumn(new StringColumn("null")); + continue; + } + switch (dataType) { + // TODO 把所有数据类型都测一遍 + case BOOLEAN: + record.addColumn(new BoolColumn(field.getBoolV())); + break; + case INT32: + record.addColumn(new LongColumn(field.getIntV())); + break; + case INT64: + case TIMESTAMP: + record.addColumn(new LongColumn(field.getLongV())); + break; + case FLOAT: + record.addColumn(new DoubleColumn(field.getFloatV())); + break; + case DOUBLE: + record.addColumn(new DoubleColumn(field.getDoubleV())); + break; + case STRING: + case TEXT: + record.addColumn(new StringColumn(field.getStringValue())); + break; + case DATE: + record.addColumn(new DateColumn(Date.valueOf(field.getDateV()))); + break; + default: + System.out.println("类型错误"+field.getDataType()); + } + } + recordSender.sendToWriter(record); + } + } catch (StatementExecutionException | IoTDBConnectionException e) { + throw new RuntimeException(e); + } + } + } + +} diff --git a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReaderErrorCode.java b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReaderErrorCode.java new file mode 100644 index 00000000..e61aa53e --- /dev/null +++ b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReaderErrorCode.java @@ -0,0 +1,28 @@ +package com.alibaba.datax.plugin.reader.iotdbreader; + +import com.alibaba.datax.common.spi.ErrorCode; + +public class IoTDBReaderErrorCode implements ErrorCode { + private final String code; + private final String description; + + public IoTDBReaderErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s].", this.code, this.description); + } +} diff --git a/iotdbreader/src/main/resources/plugin.json b/iotdbreader/src/main/resources/plugin.json new file mode 100644 index 00000000..84dfa086 --- /dev/null +++ b/iotdbreader/src/main/resources/plugin.json @@ -0,0 +1,10 @@ +{ + "name": "iotdbreader", + "class": "com.alibaba.datax.plugin.reader.iotdbreader.IoTDBReader", + "description": { + "useScene": "data migration to iotdb", + "mechanism": "use iotdb-java-session to write data." + }, + "developer": "lihaoran-Timecho" +} + diff --git a/iotdbreader/src/main/resources/plugin_job_template.json b/iotdbreader/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..0722bf0e --- /dev/null +++ b/iotdbreader/src/main/resources/plugin_job_template.json @@ -0,0 +1,23 @@ +{ + "name": "iotdbreader", + "parameter": { + "user": "", + "password": "", + "connection": [ + { + "table": [ + "" + ], + "sessionUrl": [ + "" + ] + } + ], + "column": [ + "" + ], + "beginDateTime": "", + "endDateTime": "", + "where": "" + } +} \ No newline at end of file diff --git a/iotdbwriter/doc/iotdbwriter-CN.md b/iotdbwriter/doc/iotdbwriter-CN.md new file mode 100644 index 00000000..e69de29b diff --git a/iotdbwriter/pom.xml b/iotdbwriter/pom.xml new file mode 100644 index 00000000..31c747be --- /dev/null +++ b/iotdbwriter/pom.xml @@ -0,0 +1,102 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + iotdbwriter + + + 8 + 8 + 1.3.3-SNAPSHOT + + + + + com.alibaba.datax + datax-core + 0.0.1-SNAPSHOT + test + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.iotdb + iotdb-session + ${iotdb.session.version} + + + org.apache.iotdb + node-commons + ${iotdb.session.version} + + + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.12.4 + + + + **/*Test.java + + + + + true + + + + + + + \ No newline at end of file diff --git a/iotdbwriter/src/main/assembly/package.xml b/iotdbwriter/src/main/assembly/package.xml new file mode 100644 index 00000000..ca1ba9ad --- /dev/null +++ b/iotdbwriter/src/main/assembly/package.xml @@ -0,0 +1,34 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/iotdbwriter + + + target/ + + iotdbwriter-0.0.1-SNAPSHOT.jar + + plugin/writer/iotdbwriter + + + + + + false + plugin/writer/iotdbwriter/libs + runtime + + + diff --git a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java new file mode 100644 index 00000000..f4f424c2 --- /dev/null +++ b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java @@ -0,0 +1,165 @@ +package com.alibaba.datax.plugin.writer.iotdbwriter; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; +import org.apache.iotdb.isession.util.Version; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.tsfile.enums.TSDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +public class IoTDBWriter extends Writer { + + public static class Job extends Writer.Job { + private static final Logger LOG = LoggerFactory.getLogger(Job.class); + private Configuration originalConfig; + + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + // TODO 检查配置文件参数 + + } + + @Override + public void destroy() { + + } + + @Override + public List split(int mandatoryNumber) { + // TODO 根据什么拆分Task? + List configs = new ArrayList<>(); + for (int i = 0; i < mandatoryNumber; i++) { + configs.add(originalConfig); + } + return configs; + } + } + + public static class Task extends Writer.Task { + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + private Configuration writerSliceConfig; + private TaskPluginCollector taskPluginCollector; + + private Session session; + @Override + public void init() { + this.writerSliceConfig = getPluginJobConf(); + this.taskPluginCollector = super.getTaskPluginCollector(); + + // session init + session = + new Session.Builder() + .host("192.168.150.100") + .port(6667) + .username("root") + .password("root") + .version(Version.V_0_13) + .build(); + // open session, close RPCCompression + try { + session.open(false); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } + + // set session fetchSize + session.setFetchSize(10000); + + // // 先删除已有的时间序列 + // if (session.checkTimeseriesExists(device + ".**")) { + // session.deleteTimeseries(device + ".**"); + // System.out.println("删除已有的时间序列完成=============="); + // } + } + + @Override + public void destroy() { + try { + if (session != null){ + session.close(); + } + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } + } + + @Override + public void startWrite(RecordReceiver lineReceiver) { + // 暂时实现往一个设备中插入数据(也就是类似一个表) + // 插入1条的原因是这里的只读了一次。 + Record record = null; + for (int count = 1; (record = lineReceiver.getFromReader()) != null; count++) { + System.out.println(record); + int columnNums = record.getColumnNumber(); + // 先实现一条条插入 + String device = "root.test.device2"; + List measurements = new ArrayList<>(); // TODO 这个好像没传过来。 + List types = new ArrayList<>(); + // List values = new ArrayList<>(); + List values = new ArrayList<>(); + for (int i = 0; i < columnNums; i++) { + measurements.add("ss" + i); // 没传过来先用这个 + Column column = record.getColumn(i); + // values.add(column.getRawData()); + values.add(column.getRawData().toString()); + // TODO 需要测试一下 + switch (column.getType()) { + case BOOL: + types.add(TSDataType.BOOLEAN); + break; + case INT: + types.add(TSDataType.INT32); + break; + case LONG: + types.add(TSDataType.INT64); + break; + case DOUBLE: + types.add(TSDataType.DOUBLE); + break; + case STRING: + types.add(TSDataType.STRING); + break; + case DATE: + types.add(TSDataType.DATE); + break; + default: + throw new RuntimeException("unsupported type:" + column.getType()); + } + } + long time = System.currentTimeMillis(); + try { + + // // 创建测点时间序列 + // session.createMultiTimeseries( + // paths, tsDataTypes, tsEncodings, compressionTypes, null, null, attributesList, null); + + // 这个插入失败(报错) + // WARN o.a.i.d.u.ErrorHandlingUtils:65 - + // Status code: EXECUTE_STATEMENT_ERROR(301), operation: insertRecord failed + // java.lang.ArrayIndexOutOfBoundsException: 11 + // session.insertRecord(device, time, measurements, types, values); + // 这个插入成功,record读取一次只有一条数据,需要循环读取。 + session.insertRecord(device, time, measurements,values); + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); + } + + // TODO 构建List,批量写入 + // session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList); + + } + + } + } +} diff --git a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriterErrorCode.java b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriterErrorCode.java new file mode 100644 index 00000000..666fcd75 --- /dev/null +++ b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriterErrorCode.java @@ -0,0 +1,29 @@ +package com.alibaba.datax.plugin.writer.iotdbwriter; + +import com.alibaba.datax.common.spi.ErrorCode; + +public class IoTDBWriterErrorCode implements ErrorCode { + + private final String code; + private final String description; + + public IoTDBWriterErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s].", this.code, this.description); + } +} diff --git a/iotdbwriter/src/main/resources/plugin.json b/iotdbwriter/src/main/resources/plugin.json new file mode 100644 index 00000000..ed7040c4 --- /dev/null +++ b/iotdbwriter/src/main/resources/plugin.json @@ -0,0 +1,9 @@ +{ + "name": "iotdbwriter", + "class": "com.alibaba.datax.plugin.writer.iotdbwriter.IoTDBWriter", + "description": { + "useScene": "data migration to iotdb", + "mechanism": "use iotdb-java-session to write data." + }, + "developer": "lihaoran-Timecho" +} \ No newline at end of file diff --git a/iotdbwriter/src/main/resources/plugin_job_template.json b/iotdbwriter/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..973b6b56 --- /dev/null +++ b/iotdbwriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,20 @@ +{ + "name": "iotdbwriter", + "parameter": { + "username": "root", + "password": "toy123", + "column": [ + "" + ], + "connection": [ + { + "table": [ + "" + ], + "jdbcUrl": "" + } + ], + "batchSize": 1000, + "ignoreTagsUnmatched": true + } +} \ No newline at end of file