IoTDBReader基本实现

This commit is contained in:
李浩然 2024-06-24 11:42:21 +08:00
parent 65f704b777
commit 50068e2b8e
4 changed files with 167 additions and 74 deletions

View File

@ -3,10 +3,8 @@ package com.alibaba.datax.plugin.reader.iotdbreader;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.ErrorCode;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.isession.util.Version;
import org.apache.iotdb.rpc.IoTDBConnectionException;
@ -18,27 +16,96 @@ import org.apache.tsfile.read.common.RowRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.sql.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class IoTDBReader extends Reader {
public static class Job extends Reader.Job {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration originalConfig;
private Configuration jobConf;
/**
* Job对象初始化工作
*/
@Override
public void init() {
// TODO 配置文件还没规划格式
// 通过super.getPluginJobConf()获取与本插件相关的配置
// 读插件获得配置中reader部分写插件获得writer部分
this.originalConfig = super.getPluginJobConf();
// TODO 检查各种参数是否正确
this.jobConf = super.getPluginJobConf();
// 检查各种参数是否正确
String username = this.jobConf.getString(Key.USERNAME);
if (username == null || username.isEmpty()) {
throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.USERNAME + "] is not set.");
}
String password = this.jobConf.getString(Key.PASSWORD);
if (password == null || password.isEmpty()) {
throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PASSWORD + "] is not set.");
}
String host = this.jobConf.getString(Key.HOST);
if (host == null || host.isEmpty()) {
throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.HOST + "] is not set.");
}
String port = this.jobConf.getString(Key.PORT);
if (port == null || port.isEmpty()) {
throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PORT + "] is not set.");
}
String fetchSize = this.jobConf.getString(Key.FETCH_SIZE);
if (fetchSize == null || fetchSize.isEmpty()) {
throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.FETCH_SIZE + "] is not set.");
}
// 还有一部分参数没检查没必要了
}
@Override
public void prepare() {
}
/**
* 将Job拆分Task
* @param adviceNumber 框架建议的拆分数一般是运行时所配置的并发度
* @return Task的配置列表一个配置文件对应一个task
*/
@Override
public List<Configuration> split(int adviceNumber) {
// 每个config对应一个task
List<Configuration> configs = new ArrayList<>();
List<String> queryList = this.jobConf.getList(Key.FINAL_SQLS, String.class);
if (queryList == null || queryList.size() == 0){
Configuration clone = this.jobConf.clone();
// TODO 同时读取多个设备有没有必要
String device = this.jobConf.getString(Key.DEVICE);
// 测点是一个逗号分隔的字符串或"*"
String measurements = this.jobConf.getString(Key.MEASUREMENTS);
String beginDateTime = this.jobConf.getString(Key.BEGIN_DATETIME);
String endDateTime = this.jobConf.getString(Key.END_DATETIME);
String where = this.jobConf.getString(Key.WHERE);
StringBuilder sb = new StringBuilder();
sb.append("select ").append(measurements);
sb.append(" from ").append(device);
sb.append(" where ");
if (beginDateTime != null && !beginDateTime.isEmpty()){
sb.append("time >= ").append(beginDateTime);
}
if (endDateTime != null && !endDateTime.isEmpty()){
sb.append(" and time <= ").append(endDateTime);
}
if (where != null && !where.isEmpty()){
sb.append(" and ").append(where);
}
clone.set(Key.QUERY_SQL, sb.toString());
configs.add(clone);
//TODO DataX中是单线程实际上底层session中是多线程读取根据什么条件切分多线程
}else{
// 直接读取最终SQL
for (String query : queryList) {
Configuration clone = this.jobConf.clone();
clone.remove(Key.FINAL_SQLS);
clone.set(Key.QUERY_SQL, query);
configs.add(clone);
}
}
LOG.info("configs: {}", configs);
return configs;
}
/**
@ -50,36 +117,14 @@ public class IoTDBReader extends Reader {
}
/**
* 全局的后置工作比如mysqlwriter同步完影子表后的rename操作
* 全局的后置工作
*/
@Override
public void post() {
}
/**
* 将Job拆分Task
* @param adviceNumber 框架建议的拆分数一般是运行时所配置的并发度
* @return Task的配置列表
*/
@Override
public List<Configuration> split(int adviceNumber) {
// TODO 暂时拆分为adviceNumber个不知道是怎么切割的后序需要继续测试
// TODO DEBUG看看是不是一个配置对应一个Task一个Task启动配置文件中的连接执行一个查询
// 本机增加100个配置文件写入txt生成100个txt文件运行如下
//任务启动时刻 : 2024-06-19 16:21:13
// 任务结束时刻 : 2024-06-19 16:21:24
// 任务总计耗时 : 10s
// 任务平均流量 : 42.93MB/s
// 记录写入速度 : 90010rec/s
// 读出记录总数 : 900100
// 读写失败总数 : 0
List<Configuration> configurations = new ArrayList<>();
for (int i = 0; i < 100; i++){
configurations.add(this.originalConfig);
}
return configurations;
}
}
public static class Task extends Reader.Task {
@ -88,18 +133,32 @@ public class IoTDBReader extends Reader {
private Configuration readerSliceConfig;
private String mandatoryEncoding;
/**
* IoTDB原生读写工具
*/
private Session session;
/**
* IoTDB中的时间列插入的位置默认为0即第一列
*/
private int timeColumnPosition;
/**
* 最终的查询SQL交给session执行
*/
private String querySql;
@Override
public void init() {
// 获取与本Task相关的配置是Job的split方法返回的配置列表中的其中一个
Configuration taskConf = super.getPluginJobConf();
// session init
session =
new Session.Builder()
.host("192.168.150.100")
.port(6667)
.username("root")
.password("root")
.version(Version.V_0_13)
.host(taskConf.getString(Key.HOST))
.port(taskConf.getInt(Key.PORT))
.username(taskConf.getString(Key.USERNAME))
.password(taskConf.getString(Key.PASSWORD))
.version(Version.valueOf(taskConf.getString(Key.VERSION)))
.build();
// open session, close RPCCompression
try {
@ -109,11 +168,15 @@ public class IoTDBReader extends Reader {
}
// set session fetchSize
session.setFetchSize(10000);
session.setFetchSize(taskConf.getInt(Key.FETCH_SIZE));
this.timeColumnPosition = (taskConf.getInt(Key.TIME_COLUMN_POSITION) == null) ? 0 : taskConf.getInt(Key.TIME_COLUMN_POSITION);
this.querySql = taskConf.getString(Key.QUERY_SQL);
}
@Override
public void destroy() {
// Task自身的销毁工作
try {
if (session != null){
session.close();
@ -123,29 +186,30 @@ public class IoTDBReader extends Reader {
}
}
/**
* 从数据源读数据写入到RecordSender中
* @param recordSender 把数据写入连接Reader和Writer的缓存队列
*/
@Override
public void startRead(RecordSender recordSender) {
try {
// TODO 把流程调通后把SQL语句抽出去
// SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1");
SessionDataSet dataSet = session.executeQueryStatement("select * from root.cgn.device");
// System.out.println(dataSet.getColumnNames());
List<String> columnTypes = dataSet.getColumnTypes();
// System.out.println(columnTypes);
int columnNums = columnTypes.size();
// dataSet.setFetchSize(1024);
SessionDataSet dataSet = session.executeQueryStatement(this.querySql);
while (dataSet.hasNext()) {
RowRecord rowRecord = dataSet.next();
// 将iotdb中的行 转为datax中的record
// DataX中的行record
Record record = recordSender.createRecord();
// time列直接处理
long timestamp = rowRecord.getTimestamp();
record.addColumn(new LongColumn(timestamp));
// IoTDB中的行
RowRecord rowRecord = dataSet.next();
List<Field> fields = rowRecord.getFields();
// 其他列遍历类型后转换
for (Field field : fields) {
// 除time列外的其他列遍历类型后转换
for (int i = 0; i < fields.size(); i++) {
if (i == timeColumnPosition){
// time列插入指定位置
long timestamp = rowRecord.getTimestamp();
record.addColumn(new LongColumn(timestamp));
}
Field field = fields.get(i);
TSDataType dataType = field.getDataType();
// null类型暂时转为字符串
// null类型暂时转为字符串 TODO 有没有其他处理方式
if (dataType == null) {
record.addColumn(new StringColumn("null"));
continue;
@ -166,6 +230,7 @@ public class IoTDBReader extends Reader {
record.addColumn(new DoubleColumn(field.getFloatV()));
break;
case DOUBLE:
// TODO 为什么DataX推荐用String区别是什么
record.addColumn(new DoubleColumn(field.getDoubleV()));
break;
case STRING:
@ -176,9 +241,11 @@ public class IoTDBReader extends Reader {
record.addColumn(new DateColumn(Date.valueOf(field.getDateV())));
break;
default:
System.out.println("类型错误"+field.getDataType());
// TODO 其他类型怎么处理
LOG.info("类型错误:"+ field.getDataType());
}
}
// 发送
recordSender.sendToWriter(record);
}
} catch (StatementExecutionException | IoTDBConnectionException e) {

View File

@ -2,11 +2,17 @@ package com.alibaba.datax.plugin.reader.iotdbreader;
import com.alibaba.datax.common.spi.ErrorCode;
public class IoTDBReaderErrorCode implements ErrorCode {
public enum IoTDBReaderErrorCode implements ErrorCode {
REQUIRED_VALUE("IoTDBReader-00", "parameter value is missing"),
ILLEGAL_VALUE("IoTDBReader-01", "invalid parameter value"),
CONNECTION_FAILED("IoTDBReader-02", "connection error"),
RUNTIME_EXCEPTION("IoTDBWriter-03", "runtime exception");
private final String code;
private final String description;
public IoTDBReaderErrorCode(String code, String description) {
IoTDBReaderErrorCode(String code, String description) {
this.code = code;
this.description = description;
}

View File

@ -0,0 +1,18 @@
package com.alibaba.datax.plugin.reader.iotdbreader;
public class Key {
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String HOST = "host";
public static final String PORT = "port";
public static final String FETCH_SIZE = "fetchSize";
public static final String VERSION = "version";
public static final String FINAL_SQLS = "finalSqls";
public static final String QUERY_SQL = "querySql";
public static final String TIME_COLUMN_POSITION = "timeColumnPosition";
public static final String DEVICE = "device";
public static final String MEASUREMENTS = "measurements";
public static final String BEGIN_DATETIME = "beginDateTime";
public static final String END_DATETIME = "endDateTime";
public static final String WHERE = "where";
}

View File

@ -1,23 +1,25 @@
{
"name": "iotdbreader",
"parameter": {
"user": "",
"password": "",
"connection": [
{
"table": [
""
],
"sessionUrl": [
""
]
}
"username": "root",
"password": "root",
"host": "192.168.150.100",
"port": 6667,
"fetchSize": 10000,
"version": "V_1_0",
"timeColumnPosition": 0,
"finalSqls": [
"写了这个默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句",
"select * from root.cgn.device",
"select * from root.cgn.device",
"select * from root.cgn.device",
"select * from root.cgn.device",
"select * from root.cgn.device"
],
"column": [
""
],
"beginDateTime": "",
"endDateTime": "",
"device": "root.cgn.device",
"measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU",
"beginDateTime": "2023-03-07 12:00:00",
"endDateTime": "2024-03-07 19:00:00",
"where": ""
}
}