按照修改意见进行了修改

This commit is contained in:
李浩然 2024-07-19 15:08:29 +08:00
parent a58f58f3e6
commit 5001809dd9
15 changed files with 202 additions and 213 deletions

View File

@ -28,8 +28,8 @@ public class TestCreateData {
// session init // session init
session = session =
new Session.Builder() new Session.Builder()
.host("192.168.150.100") // .host("192.168.150.100")
// .host("172.20.31.6") .host("172.20.31.61")
.port(6667) .port(6667)
.username("root") .username("root")
.password("root") .password("root")

View File

@ -12,18 +12,16 @@
"parameter": { "parameter": {
"username": "root", "username": "root",
"password": "root", "password": "root",
"host": "192.168.150.100", "host": "172.20.31.61",
"port": 6667, "port": 6667,
"fetchSize": 10000, "fetchSize": 10000,
"version": "V_1_0", "version": "V_1_0",
"timeColumnPosition": 0, "timeColumnPosition": 0,
"finalSqls":[ "querySqls":[
], ],
"device": "root.cgn.device", "device": "root.cgn.device",
"measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
"beginDateTime": "2023-03-07 12:00:00", "where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00"
"endDateTime": "2024-03-07 19:00:00",
"where": ""
} }
}, },
"writer": { "writer": {
@ -32,7 +30,7 @@
"username": "root", "username": "root",
"password": "toy123", "password": "toy123",
"writeMode": "insert", "writeMode": "insert",
"#需要提前建表": "CREATE TABLE device (`time` BIGINT,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);", "#需要提前建表": "CREATE TABLE device (`time` DATETIME,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);",
"column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], "column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
"session": [ "session": [
"set session sql_mode='ANSI'" "set session sql_mode='ANSI'"
@ -45,7 +43,7 @@
"table": [ "table": [
"device" "device"
], ],
"#": "下面的URL需要把中括号去掉否则报错mysqlreader的bug未修改", "#": "下面的URL需要把中括号去掉否则报错mysqlwriter的bug未修改",
"jdbcUrl": "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" "jdbcUrl": "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
} }
] ]

View File

@ -21,14 +21,12 @@
"fetchSize": 10000, "fetchSize": 10000,
"version": "V_1_0", "version": "V_1_0",
"timeColumnPosition": 0, "timeColumnPosition": 0,
"finalSqls":[ "querySqls":[
], ],
"device": "root.cgn.device", "device": "root.cgn.device",
"measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
"beginDateTime": "2023-03-07 12:00:00", "where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00"
"endDateTime": "2024-03-07 19:00:00",
"where": ""
} }
}, },
"writer": { "writer": {

View File

@ -12,20 +12,18 @@
"parameter": { "parameter": {
"username": "root", "username": "root",
"password": "root", "password": "root",
"host": "192.168.150.100", "host": "172.20.31.61",
"port": 6667, "port": 6667,
"fetchSize": 10000, "fetchSize": 10000,
"version": "V_1_0", "version": "V_1_0",
"timeColumnPosition": 0, "timeColumnPosition": 0,
"finalSqls":[ "querySqls":[
"select * from root.cgn.device", "select * from root.cgn.device",
"select A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU from root.cgn.device" "select A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU from root.cgn.device"
], ],
"device": "root.cgn.device", "device": "root.cgn.device",
"measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
"beginDateTime": "2023-03-07 12:00:00", "where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00"
"endDateTime": "2024-03-07 19:00:00",
"where": ""
} }
}, },
"writer": { "writer": {

View File

@ -26,16 +26,17 @@
"parameter": { "parameter": {
"username": "root", "username": "root",
"password": "root", "password": "root",
"host": "192.168.150.100", "host": "172.20.31.61",
"port": 6667, "port": 6667,
"fetchSize": 10000, "fetchSize": 10000,
"version": "V_1_0", "version": "V_1_0",
"##": "Reader中时间列的位置默认0列",
"timeColumnPosition": 0, "timeColumnPosition": 0,
"insertBatchSize": 1000, "insertBatchSize": 1000,
"device": "root.cgn.device", "device": "root.cgn.device",
"measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
"deleteExistTimeseries": false "preSql": [
"delete timeseries root.cgn.device.**"
]
} }
} }
} }

View File

@ -33,14 +33,15 @@ IoTDBReader 通过 IoTDB 的 原生java session 查询获取数据。
"port": 6667, "port": 6667,
"fetchSize": 10000, "fetchSize": 10000,
"version": "V_1_0", "version": "V_1_0",
"##": "时间列插入DataX的Record中的位置默认第0列",
"timeColumnPosition": 0, "timeColumnPosition": 0,
"finalSqls":[ "##":"写了querySqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句",
"querySqls":[
], ],
"device": "root.cgn.device", "device": "root.cgn.device",
"measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", "##":"时间列不属于测点",
"beginDateTime": "2023-03-07 12:00:00", "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
"endDateTime": "2024-03-07 19:00:00", "where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00"
"where": ""
} }
}, },
"writer": { "writer": {
@ -49,7 +50,7 @@ IoTDBReader 通过 IoTDB 的 原生java session 查询获取数据。
"username": "root", "username": "root",
"password": "toy123", "password": "toy123",
"writeMode": "insert", "writeMode": "insert",
"#需要提前建表": "CREATE TABLE device (`time` BIGINT,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);", "#需要提前建表": "CREATE TABLE device (`time` DATETIME,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);",
"column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], "column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
"session": [ "session": [
"set session sql_mode='ANSI'" "set session sql_mode='ANSI'"
@ -96,14 +97,13 @@ IoTDBReader 通过 IoTDB 的 原生java session 查询获取数据。
"fetchSize": 10000, "fetchSize": 10000,
"version": "V_1_0", "version": "V_1_0",
"timeColumnPosition": 0, "timeColumnPosition": 0,
"finalSqls":[ "##":"写了querySqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句",
"querySqls":[
"select * from root.cgn.device", "select * from root.cgn.device",
"select A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU from root.cgn.device" "select A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU from root.cgn.device"
], ],
"device": "", "device": "",
"measurements": "", "measurements": "",
"beginDateTime": "",
"endDateTime": "",
"where": "" "where": ""
} }
}, },
@ -147,42 +147,34 @@ IoTDBReader 通过 IoTDB 的 原生java session 查询获取数据。
* 描述:时间列在Record中列的位置 * 描述:时间列在Record中列的位置
* 必选:否 * 必选:否
* 默认值:0 * 默认值:0
* finalSqls * querySqls
* 描述:直接写多行SQL,可以并行读取,此时下面的参数失效。 * 描述:直接写多行SQL,可以并行读取,此时下面的参数失效。
* 必选:否 * 必选:否
* 默认值: * 默认值:
* device * device
* 描述:IoTDB中的概念,可理解为mysql中的表。 * 描述:IoTDB中的概念,可理解为mysql中的表。
* 必选:finalSqls为空时必选 * 必选:querySqls为空时必选
* 默认值:无 * 默认值:无
* measurements * measurements
* 描述:IoTDB中的概念,可理解为mysql中的字段。 * 描述:IoTDB中的概念,可理解为mysql中的字段。
* 必选:finalSqls为空时必选 * 必选:querySqls为空时必选
* 默认值:无
* beginDateTime
* 描述SQL查询时的数据的开始时间
* 必选finalSqls为空时必选
* 默认值:无
* measurements
* 描述SQL查询时的数据的结束时间
* 必选:否
* 默认值:无 * 默认值:无
* where * where
* 描述:额外的条件 * 描述:查询条件
* 必选:否 * 必选:否
* 默认值:无 * 默认值:无
### 3.3 类型转换 ### 3.3 类型转换
| IoTDB 数据类型 | DataX 内部类型 | | IoTDB 数据类型 | DataX 内部类型 |
|-----------------|------------| |-----------------|--------|
| INT32 | Int | | INT32 | Int |
| INT64,TIMESTAMP | Long | | INT64,TIMESTAMP | Long |
| FLOAT | FLOAT | | FLOAT | FLOAT |
| DOUBLE | Double | | DOUBLE | Double |
| BOOLEAN | Bool | | BOOLEAN | Bool |
| DATE | Date | | DATE | Date |
| STRING,TEXT | String | | STRING,TEXT | String |
## 4 性能报告 ## 4 性能报告

View File

@ -2,7 +2,9 @@ package com.alibaba.datax.plugin.reader.iotdbreader;
import com.alibaba.datax.common.element.*; import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.AbstractTaskPlugin;
import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.spi.Reader; import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.Configuration;
import org.apache.iotdb.isession.SessionDataSet; import org.apache.iotdb.isession.SessionDataSet;
@ -13,6 +15,7 @@ import org.apache.iotdb.session.Session;
import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.Field; import org.apache.tsfile.read.common.Field;
import org.apache.tsfile.read.common.RowRecord; import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.read.common.block.column.NullColumn;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -69,42 +72,31 @@ public class IoTDBReader extends Reader {
public List<Configuration> split(int adviceNumber) { public List<Configuration> split(int adviceNumber) {
// 每个config对应一个task // 每个config对应一个task
List<Configuration> configs = new ArrayList<>(); List<Configuration> configs = new ArrayList<>();
List<String> queryList = this.jobConf.getList(Key.FINAL_SQLS, String.class); List<String> queryList = this.jobConf.getList(Key.QUERY_SQLS, String.class);
if (queryList == null || queryList.size() == 0){ if (queryList == null || queryList.size() == 0){
Configuration clone = this.jobConf.clone(); Configuration clone = this.jobConf.clone();
// TODO 同时读取多个设备有没有必要
String device = this.jobConf.getString(Key.DEVICE); String device = this.jobConf.getString(Key.DEVICE);
// 测点是一个逗号分隔的测点字符串或"*" List<String> measurements = this.jobConf.getList(Key.MEASUREMENTS, String.class);
String measurements = this.jobConf.getString(Key.MEASUREMENTS);
String beginDateTime = this.jobConf.getString(Key.BEGIN_DATETIME);
String endDateTime = this.jobConf.getString(Key.END_DATETIME);
String where = this.jobConf.getString(Key.WHERE); String where = this.jobConf.getString(Key.WHERE);
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("select ").append(measurements); sb.append("select ").append(String.join(",", measurements));
sb.append(" from ").append(device); sb.append(" from ").append(device);
sb.append(" where ");
if (beginDateTime != null && !beginDateTime.isEmpty()){
sb.append("time >= ").append(beginDateTime);
}
if (endDateTime != null && !endDateTime.isEmpty()){
sb.append(" and time <= ").append(endDateTime);
}
if (where != null && !where.isEmpty()){ if (where != null && !where.isEmpty()){
sb.append(" and ").append(where); sb.append(" where ").append(where);
} }
clone.set(Key.QUERY_SQL, sb.toString()); clone.set(Key.QUERY_SQL, sb.toString());
configs.add(clone); configs.add(clone);
//TODO DataX中是单线程实际上底层session中是多线程读取根据什么条件切分多线程 //DataX中一个查询是单线程实际上底层session中是多线程读取
}else{ }else{
// 直接读取最终SQL // 直接读取最终SQL
for (String query : queryList) { for (String query : queryList) {
Configuration clone = this.jobConf.clone(); Configuration clone = this.jobConf.clone();
clone.remove(Key.FINAL_SQLS); clone.remove(Key.QUERY_SQLS);
clone.set(Key.QUERY_SQL, query); clone.set(Key.QUERY_SQL, query);
configs.add(clone); configs.add(clone);
} }
} }
LOG.info("configs: {}", configs); // LOG.info("configs: {}", configs);
return configs; return configs;
} }
@ -144,6 +136,7 @@ public class IoTDBReader extends Reader {
* 最终的查询SQL交给session执行 * 最终的查询SQL交给session执行
*/ */
private String querySql; private String querySql;
private TaskPluginCollector taskPluginCollector;
@Override @Override
public void init() { public void init() {
@ -171,6 +164,7 @@ public class IoTDBReader extends Reader {
this.timeColumnPosition = (taskConf.getInt(Key.TIME_COLUMN_POSITION) == null) ? 0 : taskConf.getInt(Key.TIME_COLUMN_POSITION); this.timeColumnPosition = (taskConf.getInt(Key.TIME_COLUMN_POSITION) == null) ? 0 : taskConf.getInt(Key.TIME_COLUMN_POSITION);
this.querySql = taskConf.getString(Key.QUERY_SQL); this.querySql = taskConf.getString(Key.QUERY_SQL);
taskPluginCollector = super.getTaskPluginCollector();
} }
@Override @Override
@ -181,7 +175,7 @@ public class IoTDBReader extends Reader {
session.close(); session.close();
} }
} catch (IoTDBConnectionException e) { } catch (IoTDBConnectionException e) {
throw new RuntimeException(e); LOG.info(e.getMessage());
} }
} }
@ -199,58 +193,60 @@ public class IoTDBReader extends Reader {
// IoTDB中的行 // IoTDB中的行
RowRecord rowRecord = dataSet.next(); RowRecord rowRecord = dataSet.next();
List<Field> fields = rowRecord.getFields(); List<Field> fields = rowRecord.getFields();
// 除time列外的其他列遍历类型后转换 try {
for (int i = 0; i < fields.size(); i++) { // 除time列外的其他列遍历类型后转换
if (i == timeColumnPosition){ for (int i = 0; i < fields.size(); i++) {
// time列插入指定位置 if (i == timeColumnPosition){
long timestamp = rowRecord.getTimestamp(); // time列插入指定位置时间列不在fields中需要单独处理不能为null
record.addColumn(new LongColumn(timestamp)); long timestamp = rowRecord.getTimestamp();
} record.addColumn(new DateColumn(timestamp));
Field field = fields.get(i); }
TSDataType dataType = field.getDataType(); Field field = fields.get(i);
// null类型暂时转为字符串 TODO 有没有其他处理方式 TSDataType dataType = field.getDataType();
if (dataType == null) { if (dataType == null) {
record.addColumn(new StringColumn("null")); // 需要写插件支持处理null数据否则会空指向异常这里先当成脏数据
continue; // record.addColumn(null);
} // continue;
switch (dataType) { throw new RuntimeException("null datatype");
// TODO 把所有数据类型都测一遍 }
case BOOLEAN: switch (dataType) {
record.addColumn(new BoolColumn(field.getBoolV())); case BOOLEAN:
break; record.addColumn(new BoolColumn(field.getBoolV()));
case INT32: break;
record.addColumn(new LongColumn(field.getIntV())); case INT32:
break; record.addColumn(new LongColumn(field.getIntV()));
case INT64: break;
case TIMESTAMP: case INT64:
record.addColumn(new LongColumn(field.getLongV())); case TIMESTAMP:
break; record.addColumn(new LongColumn(field.getLongV()));
case FLOAT: break;
record.addColumn(new DoubleColumn(field.getFloatV())); case FLOAT:
break; record.addColumn(new DoubleColumn(field.getFloatV()));
case DOUBLE: break;
// TODO 为什么DataX推荐用String区别是什么 case DOUBLE:
record.addColumn(new DoubleColumn(field.getDoubleV())); record.addColumn(new DoubleColumn(field.getDoubleV()));
break; break;
case STRING: case STRING:
case TEXT: case TEXT:
record.addColumn(new StringColumn(field.getStringValue())); record.addColumn(new StringColumn(field.getStringValue()));
break; break;
case DATE: case DATE:
record.addColumn(new DateColumn(Date.valueOf(field.getDateV()))); record.addColumn(new DateColumn(Date.valueOf(field.getDateV())));
break; break;
default: default:
// TODO 其他类型怎么处理 throw new RuntimeException("Unsupported data type: " + dataType);
LOG.info("类型错误:"+ field.getDataType()); }
} }
// 发送
recordSender.sendToWriter(record);
}catch (RuntimeException e){
LOG.info(e.getMessage());
this.taskPluginCollector.collectDirtyRecord(record, e);
} }
// 发送
recordSender.sendToWriter(record);
} }
} catch (StatementExecutionException | IoTDBConnectionException e) { } catch (StatementExecutionException | IoTDBConnectionException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
} }
} }

View File

@ -7,12 +7,10 @@ public class Key {
public static final String PORT = "port"; public static final String PORT = "port";
public static final String FETCH_SIZE = "fetchSize"; public static final String FETCH_SIZE = "fetchSize";
public static final String VERSION = "version"; public static final String VERSION = "version";
public static final String FINAL_SQLS = "finalSqls"; public static final String QUERY_SQLS = "querySqls";
public static final String QUERY_SQL = "querySql"; public static final String QUERY_SQL = "querySql";
public static final String TIME_COLUMN_POSITION = "timeColumnPosition"; public static final String TIME_COLUMN_POSITION = "timeColumnPosition";
public static final String DEVICE = "device"; public static final String DEVICE = "device";
public static final String MEASUREMENTS = "measurements"; public static final String MEASUREMENTS = "measurements";
public static final String BEGIN_DATETIME = "beginDateTime";
public static final String END_DATETIME = "endDateTime";
public static final String WHERE = "where"; public static final String WHERE = "where";
} }

View File

@ -5,6 +5,6 @@
"useScene": "data migration to iotdb", "useScene": "data migration to iotdb",
"mechanism": "use iotdb-java-session to write data." "mechanism": "use iotdb-java-session to write data."
}, },
"developer": "lihaoran-Timecho" "developer": "timecho.com"
} }

View File

@ -9,8 +9,8 @@
"version": "V_1_0", "version": "V_1_0",
"##": "时间列插入DataX的Record中的位置默认第0列", "##": "时间列插入DataX的Record中的位置默认第0列",
"timeColumnPosition": 0, "timeColumnPosition": 0,
"##":"写了finalSqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句", "##":"写了querySqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句",
"finalSqls": [ "querySqls": [
"select * from root.cgn.device", "select * from root.cgn.device",
"select * from root.cgn.device", "select * from root.cgn.device",
"select * from root.cgn.device", "select * from root.cgn.device",
@ -18,10 +18,8 @@
"select * from root.cgn.device" "select * from root.cgn.device"
], ],
"device": "root.cgn.device", "device": "root.cgn.device",
"measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", "##":"时间列不属于测点",
"##": "开始时间必填否则SQL错误", "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
"beginDateTime": "2023-03-07 12:00:00", "where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00"
"endDateTime": "2024-03-07 19:00:00",
"where": ""
} }
} }

View File

@ -57,10 +57,11 @@ IoTDB中设备与列的概念见IoTDB官方文档。
"version": "V_1_0", "version": "V_1_0",
"##": "Reader中时间列的位置默认0列", "##": "Reader中时间列的位置默认0列",
"timeColumnPosition": 0, "timeColumnPosition": 0,
"insertBatchSize": 1000, "batchSize": 1000,
"device": "root.cgn.device", "device": "root.cgn.device",
"measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
"deleteExistTimeseries": false "###": "在插入之前预先执行的SQL默认为空",
"preSql": []
} }
} }
} }
@ -112,10 +113,10 @@ IoTDB中设备与列的概念见IoTDB官方文档。
* 描述:每batchSize条record为一个batch进行写入 * 描述:每batchSize条record为一个batch进行写入
* 必选:否 * 必选:否
* 默认值:1000 * 默认值:1000
* deleteExistTimeseries * preSql
* 描述:插入前是否删除该device下的所有数据 * 描述:插入前是否预先执行SQL
* 必选:否 * 必选:否
* 默认值:false * 默认值:
### 3.3 类型转换 ### 3.3 类型转换

View File

@ -15,6 +15,7 @@ import org.apache.tsfile.enums.TSDataType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -61,7 +62,7 @@ public class IoTDBWriter extends Writer {
Configuration clone = this.jobConf.clone(); Configuration clone = this.jobConf.clone();
configs.add(clone); configs.add(clone);
} }
LOG.info("configs: {}", configs); // LOG.info("configs: {}", configs);
return configs; return configs;
} }
@ -74,26 +75,26 @@ public class IoTDBWriter extends Writer {
public static class Task extends Writer.Task { public static class Task extends Writer.Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class); private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private Configuration taskConf; private Configuration taskConf;
/**
* IoTDB原生读写工具 // IoTDB原生读写工具
*/
private Session session; private Session session;
/** // 是否在插入前删除已有的时间序列""表示不执行
* 是否在插入前删除已有的时间序列默认false // private String deleteBeforeInsert;
*/
private boolean deleteExistTimeseries;
/** // 插入批次大小
* 插入批次大小
*/
private int insertBatchSize; private int insertBatchSize;
/** // IoTDB中的时间列插入的位置默认为0即第一列
* IoTDB中的时间列插入的位置默认为0即第一列
*/
private int timeColumnPosition; private int timeColumnPosition;
// 处理脏数据
private TaskPluginCollector taskPluginCollector;
// 预先执行的SQL语句
private List<String> preSqls;
@Override @Override
public void init() { public void init() {
// 获取与本Task相关的配置是Job的split方法返回的配置列表中的其中一个 // 获取与本Task相关的配置是Job的split方法返回的配置列表中的其中一个
@ -116,28 +117,27 @@ public class IoTDBWriter extends Writer {
} }
// 获取参数否则默认值 // 获取参数否则默认值
insertBatchSize = (taskConf.getInt(Key.INSERT_BATCH_SIZE) == null) ? 1000 : taskConf.getInt(Key.INSERT_BATCH_SIZE); insertBatchSize = (taskConf.getInt(Key.BATCH_SIZE) == null) ? 1000 : taskConf.getInt(Key.BATCH_SIZE);
timeColumnPosition = (taskConf.getInt(Key.TIME_COLUMN_POSITION) == null) ? 0 : taskConf.getInt(Key.TIME_COLUMN_POSITION); timeColumnPosition = (taskConf.getInt(Key.TIME_COLUMN_POSITION) == null) ? 0 : taskConf.getInt(Key.TIME_COLUMN_POSITION);
deleteExistTimeseries = (taskConf.getBool(Key.DELETE_EXIST_TIMESERIES) == null) ? false : taskConf.getBool(Key.DELETE_EXIST_TIMESERIES); preSqls = (taskConf.getList(Key.PRE_SQL, String.class) == null) ? new ArrayList<>() : taskConf.getList(Key.PRE_SQL, String.class);
taskPluginCollector = super.getTaskPluginCollector();
} }
@Override @Override
public void prepare() { public void prepare() {
// 是否先删除已有的时间序列 if (preSqls.size() != 0){
try { for (String sql : preSqls) {
if (deleteExistTimeseries){ try {
if (session.checkTimeseriesExists(taskConf.getString(Key.DEVICE) + ".**")) { session.executeNonQueryStatement(sql);
session.deleteTimeseries(taskConf.getString(Key.DEVICE) + ".**");
LOG.info("===========删除已有的时间序列完成=============="); } catch (IoTDBConnectionException | StatementExecutionException e) {
}else { throw new RuntimeException(e);
LOG.info("===========不存在已有时间序列==============");
} }
} }
} catch (IoTDBConnectionException | StatementExecutionException e) { LOG.info("=======Complated preSqls=======");
throw new RuntimeException(e);
} }
// 是否创建测点时间序列不需要IoTDB会自动创建时间序列 // IoTDB会自动创建时间序列无需提前创建
} }
@Override @Override
@ -147,7 +147,7 @@ public class IoTDBWriter extends Writer {
session.close(); session.close();
} }
} catch (IoTDBConnectionException e) { } catch (IoTDBConnectionException e) {
throw new RuntimeException(e); LOG.info(e.getMessage());
} }
} }
@ -170,68 +170,74 @@ public class IoTDBWriter extends Writer {
// 获取Record记录传输结束返回null // 获取Record记录传输结束返回null
int count; // 统计插入记录数 int count; // 统计插入记录数
for (count = 0; (record = lineReceiver.getFromReader()) != null; count++) { for (count = 0; (record = lineReceiver.getFromReader()) != null; count++) {
// LOG.info("record:" + record);
// 处理时间列 // 处理时间列
timestamps.add(record.getColumn(timeColumnPosition).asLong()); timestamps.add(record.getColumn(timeColumnPosition).asLong());
// 处理测点 // 处理测点
List<String> measurements = Arrays.asList(taskConf.getString(Key.MEASUREMENTS).split(",")); List<String> measurements = taskConf.getList(Key.MEASUREMENTS, String.class);
measurementsList.add(measurements); measurementsList.add(measurements);
// 处理类型和值 // 处理类型和值
List<TSDataType> types = new ArrayList<>(); List<TSDataType> types = new ArrayList<>();
List<Object> values = new ArrayList<>(); List<Object> values = new ArrayList<>();
// List<String> values = new ArrayList<>(); try{
for (int i = 0; i < record.getColumnNumber(); i++) { for (int i = 0; i < record.getColumnNumber(); i++) {
if (i == timeColumnPosition){ if (i == timeColumnPosition){
continue; // 跳过时间列 continue; // 跳过时间列
} }
Column col = record.getColumn(i); Column col = record.getColumn(i);
switch (col.getType()) { switch (col.getType()) {
case BOOL: case BOOL:
types.add(TSDataType.BOOLEAN); types.add(TSDataType.BOOLEAN);
values.add(col.asBoolean()); values.add(col.asBoolean());
break; break;
case INT: case INT:
types.add(TSDataType.INT32); types.add(TSDataType.INT32);
values.add((Integer) col.getRawData()); values.add((Integer) col.getRawData());
break; break;
case LONG: case LONG:
types.add(TSDataType.INT64); types.add(TSDataType.INT64);
values.add(col.asLong()); values.add(col.asLong());
break; break;
case DOUBLE: case DOUBLE:
types.add(TSDataType.DOUBLE); types.add(TSDataType.DOUBLE);
values.add(col.asDouble()); values.add(col.asDouble());
break; break;
case STRING: case NULL:
types.add(TSDataType.STRING); // IoTDB可以处理null
values.add(col.asString()); types.add(null);
break; values.add(null);
case DATE: break;
types.add(TSDataType.DATE); case STRING:
values.add(col.asDate()); types.add(TSDataType.STRING);
break; values.add(col.asString());
default: break;
throw new RuntimeException("unsupported type:" + col.getType()); case DATE:
types.add(TSDataType.DATE);
values.add(col.asDate());
break;
case BAD:
default:
throw new RuntimeException("unsupported type:" + col.getType());
}
} }
typesList.add(types);
valuesList.add(values);
}catch (RuntimeException e){
LOG.info(e.getMessage());
taskPluginCollector.collectDirtyRecord(record, e);
} }
typesList.add(types);
valuesList.add(values);
if (count != 0 && count % insertBatchSize == 0) { if (count != 0 && count % insertBatchSize == 0) {
session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList); session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList);
LOG.info("已插入"+count+"条数据");
measurementsList.clear();
valuesList.clear();
typesList.clear();
timestamps.clear(); timestamps.clear();
measurementsList.clear();
typesList.clear();
valuesList.clear(); valuesList.clear();
} }
} }
if (!timestamps.isEmpty()){ if (!timestamps.isEmpty()){
session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList); session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList);
LOG.info("已插入剩余数据:" + timestamps.size() + "");
} }
LOG.info("已插入所有数据:" + (count-1) + ""); LOG.info("========= task all data inserted, total record: " + (count-1));
}catch (IoTDBConnectionException | StatementExecutionException e) { }catch (IoTDBConnectionException | StatementExecutionException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }

View File

@ -9,6 +9,6 @@ public class Key {
public static final String TIME_COLUMN_POSITION = "timeColumnPosition"; public static final String TIME_COLUMN_POSITION = "timeColumnPosition";
public static final String DEVICE = "device"; public static final String DEVICE = "device";
public static final String MEASUREMENTS = "measurements"; public static final String MEASUREMENTS = "measurements";
public static final String DELETE_EXIST_TIMESERIES = "deleteExistTimeseries"; public static final String PRE_SQL = "preSql";
public static final String INSERT_BATCH_SIZE = "insertBatchSize"; public static final String BATCH_SIZE = "batchSize";
} }

View File

@ -5,5 +5,5 @@
"useScene": "data migration to iotdb", "useScene": "data migration to iotdb",
"mechanism": "use iotdb-java-session to write data." "mechanism": "use iotdb-java-session to write data."
}, },
"developer": "lihaoran-Timecho" "developer": "timecho.com"
} }

View File

@ -8,9 +8,12 @@
"version": "V_1_0", "version": "V_1_0",
"##": "注意是Reader插件读取到的数据中时间列的位置不是该插件默认0列", "##": "注意是Reader插件读取到的数据中时间列的位置不是该插件默认0列",
"timeColumnPosition": 0, "timeColumnPosition": 0,
"insertBatchSize": 1000, "batchSize": 1000,
"device": "root.cgn.device", "device": "root.cgn.device",
"measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU", "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
"deleteExistTimeseries": false "###": "在插入之前,执行删除操作,为空或不配置表示不执行",
"preSql": [
"delete timeseries root.cgn.device.**"
]
} }
} }