mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 20:21:16 +08:00
实现DataX读写IoTDB,并通过测试。
This commit is contained in:
parent
50068e2b8e
commit
4d514fb1df
85
datax-example/datax-example-iotdb/pom.xml
Normal file
85
datax-example/datax-example-iotdb/pom.xml
Normal file
@ -0,0 +1,85 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>datax-example</artifactId>
|
||||
<groupId>com.alibaba.datax</groupId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>datax-example-iotdb</artifactId>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>8</maven.compiler.source>
|
||||
<maven.compiler.target>8</maven.compiler.target>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.datax</groupId>
|
||||
<artifactId>iotdbreader</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.datax</groupId>
|
||||
<artifactId>iotdbwriter</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.alibaba.datax</groupId>
|
||||
<artifactId>datax-example-core</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.datax</groupId>
|
||||
<artifactId>streamreader</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.datax</groupId>
|
||||
<artifactId>streamwriter</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.alibaba.datax</groupId>
|
||||
<artifactId>txtfilereader</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.datax</groupId>
|
||||
<artifactId>txtfilewriter</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.alibaba.datax</groupId>
|
||||
<artifactId>mysqlreader</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.datax</groupId>
|
||||
<artifactId>mysqlwriter</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>mysql</groupId>
|
||||
<artifactId>mysql-connector-java</artifactId>
|
||||
<version>8.0.31</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
@ -0,0 +1,147 @@
|
||||
package com.alibaba.datax.example.iotdb;
|
||||
|
||||
import org.apache.iotdb.isession.util.Version;
|
||||
import org.apache.iotdb.rpc.IoTDBConnectionException;
|
||||
import org.apache.iotdb.rpc.StatementExecutionException;
|
||||
import org.apache.iotdb.rpc.TSStatusCode;
|
||||
import org.apache.iotdb.session.Session;
|
||||
import org.apache.tsfile.enums.TSDataType;
|
||||
import org.apache.tsfile.file.metadata.enums.CompressionType;
|
||||
import org.apache.tsfile.file.metadata.enums.TSEncoding;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.*;
|
||||
|
||||
public class TestCreateData {
|
||||
private static Session session;
|
||||
private static Random random = new Random();
|
||||
|
||||
@Test
|
||||
public void createAndInsert()
|
||||
throws IoTDBConnectionException, StatementExecutionException {
|
||||
// 创建测试数据
|
||||
// session init
|
||||
session =
|
||||
new Session.Builder()
|
||||
.host("192.168.150.100")
|
||||
// .host("172.20.31.6")
|
||||
.port(6667)
|
||||
.username("root")
|
||||
.password("root")
|
||||
.version(Version.V_0_13)
|
||||
.build();
|
||||
|
||||
// open session, close RPCCompression
|
||||
session.open(false);
|
||||
|
||||
// set session fetchSize
|
||||
session.setFetchSize(10000);
|
||||
|
||||
// 创建测点并插入数据
|
||||
String filePath = "src/test/resources/testData.txt";
|
||||
String database = "root.cgn";
|
||||
try {
|
||||
session.createDatabase(database);
|
||||
} catch (StatementExecutionException e) {
|
||||
if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
String device = "root.cgn.device";
|
||||
createAndInsert2(filePath, device);
|
||||
}
|
||||
|
||||
private static void createAndInsert2(String filePath, String device)
|
||||
throws IoTDBConnectionException, StatementExecutionException {
|
||||
// 读取文件(文件中无表头)
|
||||
// 点的类型 点名 描述 量纲 量程下限 量程上限
|
||||
// AX L2KRT008MA 测试性描述 % 0 1.00E+02
|
||||
// AX L2ETY101MP 测试性描述 % 0 1.00E+02
|
||||
List<List<String>> res = new ArrayList<>();
|
||||
try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
|
||||
String line;
|
||||
while ((line = br.readLine()) != null) {
|
||||
String[] words = line.split("\\s+");
|
||||
List<String> wordList = new ArrayList<>(Arrays.asList(words));
|
||||
res.add(wordList);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
// 准备传入的参数,构造时间序列
|
||||
List<String> paths = new ArrayList<>();
|
||||
List<String> measurements = new ArrayList<>();
|
||||
List<Boolean> isDoubleList = new ArrayList<>();
|
||||
List<TSDataType> tsDataTypes = new ArrayList<>();
|
||||
List<TSEncoding> tsEncodings = new ArrayList<>();
|
||||
List<CompressionType> compressionTypes = new ArrayList<>();
|
||||
List<Map<String, String>> tagsList = new ArrayList<>();
|
||||
List<Map<String, String>> attributesList = new ArrayList<>();
|
||||
List<String> alias = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < res.size(); i++) {
|
||||
measurements.add(res.get(i).get(1));
|
||||
paths.add(device + "." + res.get(i).get(1));
|
||||
boolean isDouble = "AX".equals(res.get(i).get(0));
|
||||
isDoubleList.add(isDouble);
|
||||
tsDataTypes.add(isDouble ? TSDataType.DOUBLE : TSDataType.BOOLEAN);
|
||||
tsEncodings.add(isDouble ? TSEncoding.GORILLA : TSEncoding.RLE);
|
||||
compressionTypes.add(CompressionType.SNAPPY);
|
||||
Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put("描述", "测试性描述");
|
||||
attributes.put("量纲", isDouble ? "%" : "");
|
||||
attributes.put("量程下限", "0");
|
||||
attributes.put("量程上限", isDouble ? "1.00E+02" : "2.00E+02");
|
||||
attributesList.add(attributes);
|
||||
}
|
||||
|
||||
// 先删除已有的时间序列
|
||||
if (session.checkTimeseriesExists(device + ".**")) {
|
||||
session.deleteTimeseries(device + ".**");
|
||||
System.out.println("删除已有的时间序列完成==============");
|
||||
}
|
||||
|
||||
// 创建测点时间序列
|
||||
session.createMultiTimeseries(
|
||||
paths, tsDataTypes, tsEncodings, compressionTypes, null, null, attributesList, null);
|
||||
|
||||
// 插入数据:每个测点里都写1万条数据,时间间隔1秒
|
||||
List<List<String>> measurementsList = new ArrayList<>();
|
||||
List<List<Object>> valuesList = new ArrayList<>();
|
||||
List<Long> timestamps = new ArrayList<>();
|
||||
List<List<TSDataType>> typesList = new ArrayList<>();
|
||||
|
||||
long startTime =
|
||||
LocalDateTime.of(2024, 1, 1, 0, 0, 0, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
|
||||
int count = 10000; // 每个测点插入数据条数
|
||||
|
||||
for (long time = startTime; count >= 0; time += 1000, count--) {
|
||||
timestamps.add(time);
|
||||
measurementsList.add(measurements); // 39个测点
|
||||
typesList.add(tsDataTypes);
|
||||
|
||||
List<Object> randomValue = new ArrayList<>();
|
||||
for (Boolean isDouble : isDoubleList) {
|
||||
randomValue.add(isDouble ? random.nextDouble() * 100 : random.nextBoolean());
|
||||
}
|
||||
valuesList.add(randomValue); // 39个随机数
|
||||
|
||||
// 每1000次插入一批数据
|
||||
if (count != 10000 && count % 1000 == 0) {
|
||||
session.insertRecordsOfOneDevice(
|
||||
device, timestamps, measurementsList, typesList, valuesList);
|
||||
measurementsList.clear();
|
||||
valuesList.clear();
|
||||
typesList.clear();
|
||||
timestamps.clear();
|
||||
valuesList.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,64 @@
|
||||
package com.alibaba.datax.example.iotdb;
|
||||
|
||||
import com.alibaba.datax.example.ExampleContainer;
|
||||
import com.alibaba.datax.example.util.PathUtil;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestIoTDB {
|
||||
|
||||
@Test
|
||||
public void testIoTDBReader2MySQLWriter() {
|
||||
String path = "/iotdb2mysql.json";
|
||||
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
|
||||
ExampleContainer.start(jobPath);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMySQLReader2IoTDBWriter() {
|
||||
String path = "/mysql2iotdb.json";
|
||||
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
|
||||
ExampleContainer.start(jobPath);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIoTDBReader2txtWriter() {
|
||||
String path = "/iotdb2txt.json";
|
||||
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
|
||||
ExampleContainer.start(jobPath);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIoTDBReader2StreamWriter() {
|
||||
String path = "/iotdb2stream.json";
|
||||
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
|
||||
ExampleContainer.start(jobPath);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMySQLReader2StreamWriter() {
|
||||
String path = "/mysql2stream.json";
|
||||
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
|
||||
ExampleContainer.start(jobPath);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMySQLReader2txtWriter() {
|
||||
String path = "/mysql2txt.json";
|
||||
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
|
||||
ExampleContainer.start(jobPath);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStreamReader2TxtWriter() {
|
||||
String path = "/stream2txt.json";
|
||||
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
|
||||
ExampleContainer.start(jobPath);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStreamReader2StreamWriter() {
|
||||
String path = "/stream2stream.json";
|
||||
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
|
||||
ExampleContainer.start(jobPath);
|
||||
}
|
||||
}
|
@ -0,0 +1,58 @@
|
||||
{
|
||||
"job": {
|
||||
"setting": {
|
||||
"speed": {
|
||||
"channel": 3
|
||||
}
|
||||
},
|
||||
"content": [
|
||||
{
|
||||
"reader": {
|
||||
"name": "iotdbreader",
|
||||
"parameter": {
|
||||
"username": "root",
|
||||
"password": "root",
|
||||
"host": "192.168.150.100",
|
||||
"port": 6667,
|
||||
"fetchSize": 10000,
|
||||
"version": "V_1_0",
|
||||
"timeColumnPosition": 0,
|
||||
"finalSqls":[
|
||||
],
|
||||
"device": "root.cgn.device",
|
||||
"measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU",
|
||||
"beginDateTime": "2023-03-07 12:00:00",
|
||||
"endDateTime": "2024-03-07 19:00:00",
|
||||
"where": ""
|
||||
}
|
||||
},
|
||||
"writer": {
|
||||
"name": "mysqlwriter",
|
||||
"parameter": {
|
||||
"username": "root",
|
||||
"password": "toy123",
|
||||
"writeMode": "insert",
|
||||
"#需要提前建表": "CREATE TABLE device (`time` BIGINT,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);",
|
||||
"column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
|
||||
"session": [
|
||||
"set session sql_mode='ANSI'"
|
||||
],
|
||||
"preSql": [
|
||||
"delete from device"
|
||||
],
|
||||
"connection": [
|
||||
{
|
||||
"table": [
|
||||
"device"
|
||||
],
|
||||
"#": "下面的URL需要把中括号去掉,否则报错,mysqlreader的bug,未修改",
|
||||
"jdbcUrl": "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,44 @@
|
||||
{
|
||||
"job": {
|
||||
"setting": {
|
||||
"speed": {
|
||||
"channel": 3
|
||||
},
|
||||
"errorLimit": {
|
||||
"record": 0,
|
||||
"percentage": 0.02
|
||||
}
|
||||
},
|
||||
"content": [
|
||||
{
|
||||
"reader": {
|
||||
"name": "iotdbreader",
|
||||
"parameter": {
|
||||
"username": "root",
|
||||
"password": "root",
|
||||
"host": "192.168.150.100",
|
||||
"port": 6667,
|
||||
"fetchSize": 10000,
|
||||
"version": "V_1_0",
|
||||
"timeColumnPosition": 0,
|
||||
"finalSqls":[
|
||||
|
||||
],
|
||||
"device": "root.cgn.device",
|
||||
"measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU",
|
||||
"beginDateTime": "2023-03-07 12:00:00",
|
||||
"endDateTime": "2024-03-07 19:00:00",
|
||||
"where": ""
|
||||
}
|
||||
},
|
||||
"writer": {
|
||||
"name": "streamwriter",
|
||||
"parameter": {
|
||||
"print":true
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,44 @@
|
||||
{
|
||||
"job": {
|
||||
"setting": {
|
||||
"speed": {
|
||||
"channel": 3
|
||||
}
|
||||
},
|
||||
"content": [
|
||||
{
|
||||
"reader": {
|
||||
"name": "iotdbreader",
|
||||
"parameter": {
|
||||
"username": "root",
|
||||
"password": "root",
|
||||
"host": "192.168.150.100",
|
||||
"port": 6667,
|
||||
"fetchSize": 10000,
|
||||
"version": "V_1_0",
|
||||
"timeColumnPosition": 0,
|
||||
"finalSqls":[
|
||||
"select * from root.cgn.device",
|
||||
"select A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU from root.cgn.device"
|
||||
],
|
||||
"device": "root.cgn.device",
|
||||
"measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU",
|
||||
"beginDateTime": "2023-03-07 12:00:00",
|
||||
"endDateTime": "2024-03-07 19:00:00",
|
||||
"where": ""
|
||||
}
|
||||
},
|
||||
"writer": {
|
||||
"name": "txtfilewriter",
|
||||
"parameter": {
|
||||
"path": "D:/下载",
|
||||
"fileName": "txtText",
|
||||
"writeMode": "truncate",
|
||||
"dateFormat": "yyyy-MM-dd"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,49 @@
|
||||
{
|
||||
"job": {
|
||||
"content": [
|
||||
{
|
||||
"reader": {
|
||||
"name": "mysqlreader",
|
||||
"parameter": {
|
||||
"username": "root",
|
||||
"password": "toy123",
|
||||
"column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
|
||||
"splitPk": "",
|
||||
"connection": [
|
||||
{
|
||||
"table": [
|
||||
"device"
|
||||
],
|
||||
"jdbcUrl": [
|
||||
"jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"writer": {
|
||||
"name": "iotdbwriter",
|
||||
"parameter": {
|
||||
"username": "root",
|
||||
"password": "root",
|
||||
"host": "192.168.150.100",
|
||||
"port": 6667,
|
||||
"fetchSize": 10000,
|
||||
"version": "V_1_0",
|
||||
"##": "Reader中时间列的位置,默认0列",
|
||||
"timeColumnPosition": 0,
|
||||
"insertBatchSize": 1000,
|
||||
"device": "root.cgn.device",
|
||||
"measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU",
|
||||
"deleteExistTimeseries": false
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"setting": {
|
||||
"speed": {
|
||||
"channel": 3
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,46 @@
|
||||
{
|
||||
"job": {
|
||||
"setting": {
|
||||
"speed": {
|
||||
"channel": 3
|
||||
},
|
||||
"errorLimit": {
|
||||
"record": 0,
|
||||
"percentage": 0.02
|
||||
}
|
||||
},
|
||||
"content": [
|
||||
{
|
||||
"reader": {
|
||||
"name": "mysqlreader",
|
||||
"parameter": {
|
||||
"username": "root",
|
||||
"password": "toy123",
|
||||
"column": [
|
||||
"order_num",
|
||||
"cust_id"
|
||||
],
|
||||
"splitPk": "",
|
||||
"connection": [
|
||||
{
|
||||
"table": [
|
||||
"orders"
|
||||
],
|
||||
"jdbcUrl": [
|
||||
"jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"writer": {
|
||||
"name": "streamwriter",
|
||||
"parameter": {
|
||||
"print":true
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,41 @@
|
||||
{
|
||||
"job": {
|
||||
"content": [
|
||||
{
|
||||
"reader": {
|
||||
"name": "mysqlreader",
|
||||
"parameter": {
|
||||
"username": "root",
|
||||
"password": "toy123",
|
||||
"column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
|
||||
"splitPk": "",
|
||||
"connection": [
|
||||
{
|
||||
"table": [
|
||||
"device"
|
||||
],
|
||||
"jdbcUrl": [
|
||||
"jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"writer": {
|
||||
"name": "txtfilewriter",
|
||||
"parameter": {
|
||||
"path": "D:/下载",
|
||||
"fileName": "txtText",
|
||||
"writeMode": "truncate",
|
||||
"dateFormat": "yyyy-MM-dd"
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"setting": {
|
||||
"speed": {
|
||||
"channel": 5
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
{
|
||||
"job": {
|
||||
"content": [
|
||||
{
|
||||
"reader": {
|
||||
"name": "streamreader",
|
||||
"parameter": {
|
||||
"sliceRecordCount": 10,
|
||||
"column": [
|
||||
{
|
||||
"type": "long",
|
||||
"value": "15"
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"value": "hello,你好,世界-DataX"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"writer": {
|
||||
"name": "streamwriter",
|
||||
"parameter": {
|
||||
"encoding": "UTF-8",
|
||||
"print": true
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"setting": {
|
||||
"speed": {
|
||||
"channel": 5
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,38 @@
|
||||
{
|
||||
"job": {
|
||||
"content": [
|
||||
{
|
||||
"reader": {
|
||||
"name": "streamreader",
|
||||
"parameter": {
|
||||
"sliceRecordCount": 10,
|
||||
"column": [
|
||||
{
|
||||
"type": "long",
|
||||
"value": "10"
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"value": "hello,你好,世界-DataX"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"writer": {
|
||||
"name": "txtfilewriter",
|
||||
"parameter": {
|
||||
"path": "D:/下载",
|
||||
"fileName": "txtText",
|
||||
"writeMode": "truncate",
|
||||
"dateFormat": "yyyy-MM-dd"
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"setting": {
|
||||
"speed": {
|
||||
"channel": 5
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,39 @@
|
||||
AX L2KRT008MA 测试性描述 % 0 1.00E+02
|
||||
AX L2ETY101MP 测试性描述 % 0 1.00E+02
|
||||
AX L2SEC002MD 测试性描述 % 0 1.00E+02
|
||||
AX L1ETY103MP 测试性描述 % 0 1.00E+02
|
||||
AX D1KRT003EU 测试性描述 % 0 1.00E+02
|
||||
AX L4EAS008MDX_XQ01 测试性描述 % 0 1.00E+02
|
||||
AX L3RIC039KMX_XQ01 测试性描述 % 0 1.00E+02
|
||||
AX L2VVP008MP 测试性描述 % 0 1.00E+02
|
||||
AX D2RCV039MD 测试性描述 % 0 1.00E+02
|
||||
DX D2RPA063EC 测试性描述 0 2.00E+00
|
||||
AX L4KRT022MAK_XP01 测试性描述 % 0 1.00E+02
|
||||
AX L3SEC002MPX_XQ01 测试性描述 % 0 1.00E+02
|
||||
DX L4RCP651KSX_XG52 测试性描述 0 2.00E+00
|
||||
AX L2RRI082MT 测试性描述 % 0 1.00E+02
|
||||
AX A5STD 测试性描述 % 0 1.00E+02
|
||||
DX D2RRI003PO 测试性描述 0 2.00E+00
|
||||
AX L4RCP091MT_XQ01 测试性描述 % 0 1.00E+02
|
||||
DX L1GCT132SM5 测试性描述 0 2.00E+00
|
||||
DX L1GCT132SM3 测试性描述 0 2.00E+00
|
||||
AX L2RIC035MT 测试性描述 % 0 1.00E+02
|
||||
AX L1RGL018QM 测试性描述 % 0 1.00E+02
|
||||
AX L1RIC037MT 测试性描述 % 0 1.00E+02
|
||||
AX D2RCP042MD 测试性描述 % 0 1.00E+02
|
||||
AX D2RGL001QM 测试性描述 % 0 1.00E+02
|
||||
AX D2RIC020MT 测试性描述 % 0 1.00E+02
|
||||
AX D1RIC022MT 测试性描述 % 0 1.00E+02
|
||||
AX D1RGL003QM 测试性描述 % 0 1.00E+02
|
||||
AX L3LGE005TU_XQ01 测试性描述 % 0 1.00E+02
|
||||
DX D2RCP517EC 测试性描述 0 2.00E+00
|
||||
AX D1RCP054MD 测试性描述 % 0 1.00E+02
|
||||
AX L2RIS014MD 测试性描述 % 0 1.00E+02
|
||||
DX D1LGE001JA 测试性描述 0 2.00E+00
|
||||
DX D2SEC001PO 测试性描述 0 2.00E+00
|
||||
DX D1SEC003PO 测试性描述 0 2.00E+00
|
||||
AX D1RIS001MD 测试性描述 % 0 1.00E+02
|
||||
AX L1RCP090MT 测试性描述 % 0 1.00E+02
|
||||
DX L2VVP003SM5 测试性描述 0 2.00E+00
|
||||
AX L3RIC022KMX_XQ01 测试性描述 % 0 1.00E+02
|
||||
DX L1GCT501EC 测试性描述 0 2.00E+00
|
@ -74,7 +74,7 @@ public class IoTDBReader extends Reader {
|
||||
Configuration clone = this.jobConf.clone();
|
||||
// TODO 同时读取多个设备?有没有必要?
|
||||
String device = this.jobConf.getString(Key.DEVICE);
|
||||
// 测点是一个逗号分隔的字符串或"*"
|
||||
// 测点是一个逗号分隔的测点字符串或"*"
|
||||
String measurements = this.jobConf.getString(Key.MEASUREMENTS);
|
||||
String beginDateTime = this.jobConf.getString(Key.BEGIN_DATETIME);
|
||||
String endDateTime = this.jobConf.getString(Key.END_DATETIME);
|
||||
@ -130,8 +130,7 @@ public class IoTDBReader extends Reader {
|
||||
public static class Task extends Reader.Task {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
|
||||
|
||||
private Configuration readerSliceConfig;
|
||||
private String mandatoryEncoding;
|
||||
private Configuration taskConf;
|
||||
|
||||
/**
|
||||
* IoTDB原生读写工具
|
||||
@ -149,7 +148,7 @@ public class IoTDBReader extends Reader {
|
||||
@Override
|
||||
public void init() {
|
||||
// 获取与本Task相关的配置,是Job的split方法返回的配置列表中的其中一个。
|
||||
Configuration taskConf = super.getPluginJobConf();
|
||||
taskConf = super.getPluginJobConf();
|
||||
|
||||
// session init
|
||||
session =
|
||||
|
@ -6,8 +6,7 @@ public enum IoTDBReaderErrorCode implements ErrorCode {
|
||||
|
||||
REQUIRED_VALUE("IoTDBReader-00", "parameter value is missing"),
|
||||
ILLEGAL_VALUE("IoTDBReader-01", "invalid parameter value"),
|
||||
CONNECTION_FAILED("IoTDBReader-02", "connection error"),
|
||||
RUNTIME_EXCEPTION("IoTDBWriter-03", "runtime exception");
|
||||
CONNECTION_FAILED("IoTDBReader-02", "connection error");
|
||||
|
||||
private final String code;
|
||||
private final String description;
|
||||
|
@ -7,9 +7,10 @@
|
||||
"port": 6667,
|
||||
"fetchSize": 10000,
|
||||
"version": "V_1_0",
|
||||
"##": "时间列插入DataX的Record中的位置,默认第0列",
|
||||
"timeColumnPosition": 0,
|
||||
"##":"写了finalSqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句",
|
||||
"finalSqls": [
|
||||
"写了这个默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句",
|
||||
"select * from root.cgn.device",
|
||||
"select * from root.cgn.device",
|
||||
"select * from root.cgn.device",
|
||||
@ -18,6 +19,7 @@
|
||||
],
|
||||
"device": "root.cgn.device",
|
||||
"measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU",
|
||||
"##": "开始时间必填,否则SQL错误",
|
||||
"beginDateTime": "2023-03-07 12:00:00",
|
||||
"endDateTime": "2024-03-07 19:00:00",
|
||||
"where": ""
|
||||
|
@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.writer.iotdbwriter;
|
||||
|
||||
import com.alibaba.datax.common.element.Column;
|
||||
import com.alibaba.datax.common.element.Record;
|
||||
import com.alibaba.datax.common.exception.DataXException;
|
||||
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||
import com.alibaba.datax.common.plugin.TaskPluginCollector;
|
||||
import com.alibaba.datax.common.spi.Writer;
|
||||
@ -15,56 +16,101 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
public class IoTDBWriter extends Writer {
|
||||
|
||||
public static class Job extends Writer.Job {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
|
||||
private Configuration originalConfig;
|
||||
private Configuration jobConf;
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
this.originalConfig = super.getPluginJobConf();
|
||||
// TODO 检查配置文件参数
|
||||
|
||||
this.jobConf = super.getPluginJobConf();
|
||||
// 检查各种参数是否正确
|
||||
String username = this.jobConf.getString(Key.USERNAME);
|
||||
if (username == null || username.isEmpty()) {
|
||||
throw DataXException.asDataXException(IoTDBWriterErrorCode.REQUIRED_VALUE, "The parameter [" + Key.USERNAME + "] is not set.");
|
||||
}
|
||||
String password = this.jobConf.getString(Key.PASSWORD);
|
||||
if (password == null || password.isEmpty()) {
|
||||
throw DataXException.asDataXException(IoTDBWriterErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PASSWORD + "] is not set.");
|
||||
}
|
||||
String host = this.jobConf.getString(Key.HOST);
|
||||
if (host == null || host.isEmpty()) {
|
||||
throw DataXException.asDataXException(IoTDBWriterErrorCode.REQUIRED_VALUE, "The parameter [" + Key.HOST + "] is not set.");
|
||||
}
|
||||
String port = this.jobConf.getString(Key.PORT);
|
||||
if (port == null || port.isEmpty()) {
|
||||
throw DataXException.asDataXException(IoTDBWriterErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PORT + "] is not set.");
|
||||
}
|
||||
String fetchSize = this.jobConf.getString(Key.FETCH_SIZE);
|
||||
if (fetchSize == null || fetchSize.isEmpty()) {
|
||||
throw DataXException.asDataXException(IoTDBWriterErrorCode.REQUIRED_VALUE, "The parameter [" + Key.FETCH_SIZE + "] is not set.");
|
||||
}
|
||||
// 还有一部分参数没检查,没必要了。
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
|
||||
public void prepare(){
|
||||
// 写入前准备,IOTDB不需要提前创建表。
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Configuration> split(int mandatoryNumber) {
|
||||
// TODO 根据什么拆分Task?
|
||||
List<Configuration> configs = new ArrayList<>();
|
||||
// 根据源端划分多个task,每个写task对应一个读task,并行插入下放到session批次处理。
|
||||
for (int i = 0; i < mandatoryNumber; i++) {
|
||||
configs.add(originalConfig);
|
||||
Configuration clone = this.jobConf.clone();
|
||||
configs.add(clone);
|
||||
}
|
||||
LOG.info("configs: {}", configs);
|
||||
return configs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Task extends Writer.Task {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
|
||||
private Configuration writerSliceConfig;
|
||||
private TaskPluginCollector taskPluginCollector;
|
||||
|
||||
private Configuration taskConf;
|
||||
/**
|
||||
* IoTDB原生读写工具
|
||||
*/
|
||||
private Session session;
|
||||
|
||||
/**
|
||||
* 是否在插入前删除已有的时间序列?默认false
|
||||
*/
|
||||
private boolean deleteExistTimeseries;
|
||||
|
||||
/**
|
||||
* 插入批次大小
|
||||
*/
|
||||
private int insertBatchSize;
|
||||
|
||||
/**
|
||||
* IoTDB中的时间列插入的位置,默认为0,即第一列。
|
||||
*/
|
||||
private int timeColumnPosition;
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
this.writerSliceConfig = getPluginJobConf();
|
||||
this.taskPluginCollector = super.getTaskPluginCollector();
|
||||
// 获取与本Task相关的配置,是Job的split方法返回的配置列表中的其中一个。
|
||||
this.taskConf = super.getPluginJobConf();
|
||||
|
||||
// session init
|
||||
session =
|
||||
new Session.Builder()
|
||||
.host("192.168.150.100")
|
||||
.port(6667)
|
||||
.username("root")
|
||||
.password("root")
|
||||
.version(Version.V_0_13)
|
||||
.host(taskConf.getString(Key.HOST))
|
||||
.port(taskConf.getInt(Key.PORT))
|
||||
.username(taskConf.getString(Key.USERNAME))
|
||||
.password(taskConf.getString(Key.PASSWORD))
|
||||
.version(Version.valueOf(taskConf.getString(Key.VERSION)))
|
||||
.build();
|
||||
// open session, close RPCCompression
|
||||
try {
|
||||
@ -74,13 +120,31 @@ public class IoTDBWriter extends Writer {
|
||||
}
|
||||
|
||||
// set session fetchSize
|
||||
session.setFetchSize(10000);
|
||||
session.setFetchSize(taskConf.getInt(Key.FETCH_SIZE));
|
||||
|
||||
// // 先删除已有的时间序列
|
||||
// if (session.checkTimeseriesExists(device + ".**")) {
|
||||
// session.deleteTimeseries(device + ".**");
|
||||
// System.out.println("删除已有的时间序列完成==============");
|
||||
// }
|
||||
// 获取参数,否则默认值
|
||||
insertBatchSize = (taskConf.getInt(Key.INSERT_BATCH_SIZE) == null) ? 1000 : taskConf.getInt(Key.INSERT_BATCH_SIZE);
|
||||
timeColumnPosition = (taskConf.getInt(Key.TIME_COLUMN_POSITION) == null) ? 0 : taskConf.getInt(Key.TIME_COLUMN_POSITION);
|
||||
deleteExistTimeseries = (taskConf.getBool(Key.DELETE_EXIST_TIMESERIES) == null) ? false : taskConf.getBool(Key.DELETE_EXIST_TIMESERIES);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prepare() {
|
||||
// 是否先删除已有的时间序列
|
||||
try {
|
||||
if (deleteExistTimeseries){
|
||||
if (session.checkTimeseriesExists(taskConf.getString(Key.DEVICE) + ".**")) {
|
||||
session.deleteTimeseries(taskConf.getString(Key.DEVICE) + ".**");
|
||||
LOG.info("===========删除已有的时间序列完成==============");
|
||||
}else {
|
||||
LOG.info("===========不存在已有时间序列==============");
|
||||
}
|
||||
}
|
||||
} catch (IoTDBConnectionException | StatementExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
// TODO 是否创建测点时间序列?
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -94,72 +158,90 @@ public class IoTDBWriter extends Writer {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从RecordReceiver中读取数据,写入目标数据源。
|
||||
* @param lineReceiver 数据来自Reader和Writer之间的缓存队列。
|
||||
*/
|
||||
@Override
|
||||
public void startWrite(RecordReceiver lineReceiver) {
|
||||
// 暂时实现往一个设备中插入数据(也就是类似一个表)
|
||||
// 插入1条的原因是这里的只读了一次。
|
||||
// 往一个设备device中插入数据
|
||||
Record record = null;
|
||||
for (int count = 1; (record = lineReceiver.getFromReader()) != null; count++) {
|
||||
System.out.println(record);
|
||||
int columnNums = record.getColumnNumber();
|
||||
// 先实现一条条插入
|
||||
String device = "root.test.device2";
|
||||
List<String> measurements = new ArrayList<>(); // TODO 这个好像没传过来。
|
||||
List<TSDataType> types = new ArrayList<>();
|
||||
// List<Object> values = new ArrayList<>();
|
||||
List<String> values = new ArrayList<>();
|
||||
for (int i = 0; i < columnNums; i++) {
|
||||
measurements.add("ss" + i); // 没传过来先用这个
|
||||
Column column = record.getColumn(i);
|
||||
// values.add(column.getRawData());
|
||||
values.add(column.getRawData().toString());
|
||||
// TODO 需要测试一下
|
||||
switch (column.getType()) {
|
||||
case BOOL:
|
||||
types.add(TSDataType.BOOLEAN);
|
||||
break;
|
||||
case INT:
|
||||
types.add(TSDataType.INT32);
|
||||
break;
|
||||
case LONG:
|
||||
types.add(TSDataType.INT64);
|
||||
break;
|
||||
case DOUBLE:
|
||||
types.add(TSDataType.DOUBLE);
|
||||
break;
|
||||
case STRING:
|
||||
types.add(TSDataType.STRING);
|
||||
break;
|
||||
case DATE:
|
||||
types.add(TSDataType.DATE);
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("unsupported type:" + column.getType());
|
||||
try{
|
||||
// 构建List,一个设备批量写入
|
||||
String device = taskConf.getString(Key.DEVICE);
|
||||
List<Long> timestamps = new ArrayList<>();
|
||||
List<List<String>> measurementsList = new ArrayList<>();
|
||||
List<List<Object>> valuesList = new ArrayList<>();
|
||||
List<List<TSDataType>> typesList = new ArrayList<>();
|
||||
|
||||
// 获取Record记录,传输结束返回null
|
||||
int count; // 统计插入记录数
|
||||
for (count = 0; (record = lineReceiver.getFromReader()) != null; count++) {
|
||||
// LOG.info("record:" + record);
|
||||
// 处理时间列
|
||||
timestamps.add(record.getColumn(timeColumnPosition).asLong());
|
||||
// 处理测点
|
||||
List<String> measurements = Arrays.asList(taskConf.getString(Key.MEASUREMENTS).split(","));
|
||||
measurementsList.add(measurements);
|
||||
// 处理类型和值
|
||||
List<TSDataType> types = new ArrayList<>();
|
||||
List<Object> values = new ArrayList<>();
|
||||
// List<String> values = new ArrayList<>();
|
||||
for (int i = 0; i < record.getColumnNumber(); i++) {
|
||||
if (i == timeColumnPosition){
|
||||
continue; // 跳过时间列
|
||||
}
|
||||
Column col = record.getColumn(i);
|
||||
switch (col.getType()) {
|
||||
case BOOL:
|
||||
types.add(TSDataType.BOOLEAN);
|
||||
values.add(col.asBoolean());
|
||||
break;
|
||||
case INT:
|
||||
types.add(TSDataType.INT32);
|
||||
values.add((Integer) col.getRawData());
|
||||
break;
|
||||
case LONG:
|
||||
types.add(TSDataType.INT64);
|
||||
values.add(col.asLong());
|
||||
break;
|
||||
case DOUBLE:
|
||||
types.add(TSDataType.DOUBLE);
|
||||
values.add(col.asDouble());
|
||||
break;
|
||||
case STRING:
|
||||
types.add(TSDataType.STRING);
|
||||
values.add(col.asString());
|
||||
break;
|
||||
case DATE:
|
||||
types.add(TSDataType.DATE);
|
||||
values.add(col.asDate());
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("unsupported type:" + col.getType());
|
||||
}
|
||||
}
|
||||
typesList.add(types);
|
||||
valuesList.add(values);
|
||||
|
||||
if (count != 0 && count % insertBatchSize == 0) {
|
||||
session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList);
|
||||
LOG.info("已插入"+count+"条数据");
|
||||
measurementsList.clear();
|
||||
valuesList.clear();
|
||||
typesList.clear();
|
||||
timestamps.clear();
|
||||
valuesList.clear();
|
||||
}
|
||||
}
|
||||
long time = System.currentTimeMillis();
|
||||
try {
|
||||
|
||||
// // 创建测点时间序列
|
||||
// session.createMultiTimeseries(
|
||||
// paths, tsDataTypes, tsEncodings, compressionTypes, null, null, attributesList, null);
|
||||
|
||||
// 这个插入失败(报错)
|
||||
// WARN o.a.i.d.u.ErrorHandlingUtils:65 -
|
||||
// Status code: EXECUTE_STATEMENT_ERROR(301), operation: insertRecord failed
|
||||
// java.lang.ArrayIndexOutOfBoundsException: 11
|
||||
// session.insertRecord(device, time, measurements, types, values);
|
||||
// 这个插入成功,record读取一次只有一条数据,需要循环读取。
|
||||
session.insertRecord(device, time, measurements,values);
|
||||
} catch (IoTDBConnectionException | StatementExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
if (!timestamps.isEmpty()){
|
||||
session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList);
|
||||
LOG.info("已插入剩余数据:" + timestamps.size() + "条");
|
||||
}
|
||||
|
||||
// TODO 构建List,批量写入
|
||||
// session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList);
|
||||
|
||||
LOG.info("已插入所有数据:" + (count-1) + "条");
|
||||
}catch (IoTDBConnectionException | StatementExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2,12 +2,14 @@ package com.alibaba.datax.plugin.writer.iotdbwriter;
|
||||
|
||||
import com.alibaba.datax.common.spi.ErrorCode;
|
||||
|
||||
public class IoTDBWriterErrorCode implements ErrorCode {
|
||||
public enum IoTDBWriterErrorCode implements ErrorCode {
|
||||
|
||||
REQUIRED_VALUE("IoTDBWriter-00", "parameter value is missing");
|
||||
|
||||
private final String code;
|
||||
private final String description;
|
||||
|
||||
public IoTDBWriterErrorCode(String code, String description) {
|
||||
IoTDBWriterErrorCode(String code, String description) {
|
||||
this.code = code;
|
||||
this.description = description;
|
||||
}
|
||||
|
@ -0,0 +1,15 @@
|
||||
package com.alibaba.datax.plugin.writer.iotdbwriter;
|
||||
|
||||
public class Key {
|
||||
public static final String USERNAME = "username";
|
||||
public static final String PASSWORD = "password";
|
||||
public static final String HOST = "host";
|
||||
public static final String PORT = "port";
|
||||
public static final String FETCH_SIZE = "fetchSize";
|
||||
public static final String VERSION = "version";
|
||||
public static final String TIME_COLUMN_POSITION = "timeColumnPosition";
|
||||
public static final String DEVICE = "device";
|
||||
public static final String MEASUREMENTS = "measurements";
|
||||
public static final String DELETE_EXIST_TIMESERIES = "deleteExistTimeseries";
|
||||
public static final String INSERT_BATCH_SIZE = "insertBatchSize";
|
||||
}
|
@ -2,19 +2,16 @@
|
||||
"name": "iotdbwriter",
|
||||
"parameter": {
|
||||
"username": "root",
|
||||
"password": "toy123",
|
||||
"column": [
|
||||
""
|
||||
],
|
||||
"connection": [
|
||||
{
|
||||
"table": [
|
||||
""
|
||||
],
|
||||
"jdbcUrl": ""
|
||||
}
|
||||
],
|
||||
"batchSize": 1000,
|
||||
"ignoreTagsUnmatched": true
|
||||
"password": "root",
|
||||
"host": "192.168.150.100",
|
||||
"port": 6667,
|
||||
"fetchSize": 10000,
|
||||
"version": "V_1_0",
|
||||
"##": "注意是Reader插件读取到的数据中时间列的位置,不是该插件,默认0列",
|
||||
"timeColumnPosition": 0,
|
||||
"insertBatchSize": 1000,
|
||||
"device": "root.cgn.device",
|
||||
"measurements": "A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU",
|
||||
"deleteExistTimeseries": false
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user