实现DataX读写Demo

This commit is contained in:
李浩然 2024-06-20 10:07:47 +08:00
parent 29c3bb4c49
commit 65f704b777
14 changed files with 750 additions and 0 deletions

View File

@ -0,0 +1,4 @@
## 快速介绍
iotdbreader用来读取iotdb中的数据然后传输到其他数据库。

102
iotdbreader/pom.xml Normal file
View File

@ -0,0 +1,102 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>iotdbreader</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<iotdb.session.version>1.3.3-SNAPSHOT</iotdb.session.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
<version>${iotdb.session.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>node-commons</artifactId>
<version>${iotdb.session.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12.4</version>
<configuration>
<!-- 包含哪些测试用例 -->
<includes>
<include>**/*Test.java</include>
</includes>
<!-- 不包含哪些测试用例 -->
<excludes>
</excludes>
<testFailureIgnore>true</testFailureIgnore>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,34 @@
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/iotdbreader</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>iotdbreader-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/iotdbreader</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/reader/iotdbreader/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,190 @@
package com.alibaba.datax.plugin.reader.iotdbreader;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.ErrorCode;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.isession.util.Version;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.Field;
import org.apache.tsfile.read.common.RowRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.sql.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class IoTDBReader extends Reader {
public static class Job extends Reader.Job {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration originalConfig;
/**
* Job对象初始化工作
*/
@Override
public void init() {
// TODO 配置文件还没规划格式
// 通过super.getPluginJobConf()获取与本插件相关的配置
// 读插件获得配置中reader部分写插件获得writer部分
this.originalConfig = super.getPluginJobConf();
// TODO 检查各种参数是否正确
}
/**
* Job对象自身的销毁工作
*/
@Override
public void destroy() {
}
/**
* 全局的后置工作比如mysqlwriter同步完影子表后的rename操作
*/
@Override
public void post() {
}
/**
* 将Job拆分Task
* @param adviceNumber 框架建议的拆分数一般是运行时所配置的并发度
* @return Task的配置列表
*/
@Override
public List<Configuration> split(int adviceNumber) {
// TODO 暂时拆分为adviceNumber个不知道是怎么切割的后序需要继续测试
// TODO DEBUG看看是不是一个配置对应一个Task一个Task启动配置文件中的连接执行一个查询
// 本机增加100个配置文件写入txt生成100个txt文件运行如下
//任务启动时刻 : 2024-06-19 16:21:13
// 任务结束时刻 : 2024-06-19 16:21:24
// 任务总计耗时 : 10s
// 任务平均流量 : 42.93MB/s
// 记录写入速度 : 90010rec/s
// 读出记录总数 : 900100
// 读写失败总数 : 0
List<Configuration> configurations = new ArrayList<>();
for (int i = 0; i < 100; i++){
configurations.add(this.originalConfig);
}
return configurations;
}
}
public static class Task extends Reader.Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private Configuration readerSliceConfig;
private String mandatoryEncoding;
private Session session;
@Override
public void init() {
// session init
session =
new Session.Builder()
.host("192.168.150.100")
.port(6667)
.username("root")
.password("root")
.version(Version.V_0_13)
.build();
// open session, close RPCCompression
try {
session.open(false);
} catch (IoTDBConnectionException e) {
throw new RuntimeException(e);
}
// set session fetchSize
session.setFetchSize(10000);
}
@Override
public void destroy() {
try {
if (session != null){
session.close();
}
} catch (IoTDBConnectionException e) {
throw new RuntimeException(e);
}
}
@Override
public void startRead(RecordSender recordSender) {
try {
// TODO 把流程调通后把SQL语句抽出去
// SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1");
SessionDataSet dataSet = session.executeQueryStatement("select * from root.cgn.device");
// System.out.println(dataSet.getColumnNames());
List<String> columnTypes = dataSet.getColumnTypes();
// System.out.println(columnTypes);
int columnNums = columnTypes.size();
// dataSet.setFetchSize(1024);
while (dataSet.hasNext()) {
RowRecord rowRecord = dataSet.next();
// 将iotdb中的行 转为datax中的record
Record record = recordSender.createRecord();
// time列直接处理
long timestamp = rowRecord.getTimestamp();
record.addColumn(new LongColumn(timestamp));
List<Field> fields = rowRecord.getFields();
// 其他列遍历类型后转换
for (Field field : fields) {
TSDataType dataType = field.getDataType();
// null类型暂时转为字符串
if (dataType == null) {
record.addColumn(new StringColumn("null"));
continue;
}
switch (dataType) {
// TODO 把所有数据类型都测一遍
case BOOLEAN:
record.addColumn(new BoolColumn(field.getBoolV()));
break;
case INT32:
record.addColumn(new LongColumn(field.getIntV()));
break;
case INT64:
case TIMESTAMP:
record.addColumn(new LongColumn(field.getLongV()));
break;
case FLOAT:
record.addColumn(new DoubleColumn(field.getFloatV()));
break;
case DOUBLE:
record.addColumn(new DoubleColumn(field.getDoubleV()));
break;
case STRING:
case TEXT:
record.addColumn(new StringColumn(field.getStringValue()));
break;
case DATE:
record.addColumn(new DateColumn(Date.valueOf(field.getDateV())));
break;
default:
System.out.println("类型错误"+field.getDataType());
}
}
recordSender.sendToWriter(record);
}
} catch (StatementExecutionException | IoTDBConnectionException e) {
throw new RuntimeException(e);
}
}
}
}

View File

@ -0,0 +1,28 @@
package com.alibaba.datax.plugin.reader.iotdbreader;
import com.alibaba.datax.common.spi.ErrorCode;
public class IoTDBReaderErrorCode implements ErrorCode {
private final String code;
private final String description;
public IoTDBReaderErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s].", this.code, this.description);
}
}

View File

@ -0,0 +1,10 @@
{
"name": "iotdbreader",
"class": "com.alibaba.datax.plugin.reader.iotdbreader.IoTDBReader",
"description": {
"useScene": "data migration to iotdb",
"mechanism": "use iotdb-java-session to write data."
},
"developer": "lihaoran-Timecho"
}

View File

@ -0,0 +1,23 @@
{
"name": "iotdbreader",
"parameter": {
"user": "",
"password": "",
"connection": [
{
"table": [
""
],
"sessionUrl": [
""
]
}
],
"column": [
""
],
"beginDateTime": "",
"endDateTime": "",
"where": ""
}
}

View File

102
iotdbwriter/pom.xml Normal file
View File

@ -0,0 +1,102 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>iotdbwriter</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<iotdb.session.version>1.3.3-SNAPSHOT</iotdb.session.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
<version>${iotdb.session.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>node-commons</artifactId>
<version>${iotdb.session.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12.4</version>
<configuration>
<!-- 包含哪些测试用例 -->
<includes>
<include>**/*Test.java</include>
</includes>
<!-- 不包含哪些测试用例 -->
<excludes>
</excludes>
<testFailureIgnore>true</testFailureIgnore>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,34 @@
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/iotdbwriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>iotdbwriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/iotdbwriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/iotdbwriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,165 @@
package com.alibaba.datax.plugin.writer.iotdbwriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import org.apache.iotdb.isession.util.Version;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.tsfile.enums.TSDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
public class IoTDBWriter extends Writer {
public static class Job extends Writer.Job {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration originalConfig;
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
// TODO 检查配置文件参数
}
@Override
public void destroy() {
}
@Override
public List<Configuration> split(int mandatoryNumber) {
// TODO 根据什么拆分Task
List<Configuration> configs = new ArrayList<>();
for (int i = 0; i < mandatoryNumber; i++) {
configs.add(originalConfig);
}
return configs;
}
}
public static class Task extends Writer.Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private Configuration writerSliceConfig;
private TaskPluginCollector taskPluginCollector;
private Session session;
@Override
public void init() {
this.writerSliceConfig = getPluginJobConf();
this.taskPluginCollector = super.getTaskPluginCollector();
// session init
session =
new Session.Builder()
.host("192.168.150.100")
.port(6667)
.username("root")
.password("root")
.version(Version.V_0_13)
.build();
// open session, close RPCCompression
try {
session.open(false);
} catch (IoTDBConnectionException e) {
throw new RuntimeException(e);
}
// set session fetchSize
session.setFetchSize(10000);
// // 先删除已有的时间序列
// if (session.checkTimeseriesExists(device + ".**")) {
// session.deleteTimeseries(device + ".**");
// System.out.println("删除已有的时间序列完成==============");
// }
}
@Override
public void destroy() {
try {
if (session != null){
session.close();
}
} catch (IoTDBConnectionException e) {
throw new RuntimeException(e);
}
}
@Override
public void startWrite(RecordReceiver lineReceiver) {
// 暂时实现往一个设备中插入数据(也就是类似一个表)
// 插入1条的原因是这里的只读了一次
Record record = null;
for (int count = 1; (record = lineReceiver.getFromReader()) != null; count++) {
System.out.println(record);
int columnNums = record.getColumnNumber();
// 先实现一条条插入
String device = "root.test.device2";
List<String> measurements = new ArrayList<>(); // TODO 这个好像没传过来
List<TSDataType> types = new ArrayList<>();
// List<Object> values = new ArrayList<>();
List<String> values = new ArrayList<>();
for (int i = 0; i < columnNums; i++) {
measurements.add("ss" + i); // 没传过来先用这个
Column column = record.getColumn(i);
// values.add(column.getRawData());
values.add(column.getRawData().toString());
// TODO 需要测试一下
switch (column.getType()) {
case BOOL:
types.add(TSDataType.BOOLEAN);
break;
case INT:
types.add(TSDataType.INT32);
break;
case LONG:
types.add(TSDataType.INT64);
break;
case DOUBLE:
types.add(TSDataType.DOUBLE);
break;
case STRING:
types.add(TSDataType.STRING);
break;
case DATE:
types.add(TSDataType.DATE);
break;
default:
throw new RuntimeException("unsupported type:" + column.getType());
}
}
long time = System.currentTimeMillis();
try {
// // 创建测点时间序列
// session.createMultiTimeseries(
// paths, tsDataTypes, tsEncodings, compressionTypes, null, null, attributesList, null);
// 这个插入失败(报错)
// WARN o.a.i.d.u.ErrorHandlingUtils:65 -
// Status code: EXECUTE_STATEMENT_ERROR(301), operation: insertRecord failed
// java.lang.ArrayIndexOutOfBoundsException: 11
// session.insertRecord(device, time, measurements, types, values);
// 这个插入成功record读取一次只有一条数据需要循环读取
session.insertRecord(device, time, measurements,values);
} catch (IoTDBConnectionException | StatementExecutionException e) {
throw new RuntimeException(e);
}
// TODO 构建List批量写入
// session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList);
}
}
}
}

View File

@ -0,0 +1,29 @@
package com.alibaba.datax.plugin.writer.iotdbwriter;
import com.alibaba.datax.common.spi.ErrorCode;
public class IoTDBWriterErrorCode implements ErrorCode {
private final String code;
private final String description;
public IoTDBWriterErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s].", this.code, this.description);
}
}

View File

@ -0,0 +1,9 @@
{
"name": "iotdbwriter",
"class": "com.alibaba.datax.plugin.writer.iotdbwriter.IoTDBWriter",
"description": {
"useScene": "data migration to iotdb",
"mechanism": "use iotdb-java-session to write data."
},
"developer": "lihaoran-Timecho"
}

View File

@ -0,0 +1,20 @@
{
"name": "iotdbwriter",
"parameter": {
"username": "root",
"password": "toy123",
"column": [
""
],
"connection": [
{
"table": [
""
],
"jdbcUrl": ""
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}