mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 17:40:28 +08:00
Merge 153b2fcd65
into 0824b45c5e
This commit is contained in:
commit
6da8ae510c
85
datax-example/datax-example-iotdb/pom.xml
Normal file
85
datax-example/datax-example-iotdb/pom.xml
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<parent>
|
||||||
|
<artifactId>datax-example</artifactId>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<artifactId>datax-example-iotdb</artifactId>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<maven.compiler.source>8</maven.compiler.source>
|
||||||
|
<maven.compiler.target>8</maven.compiler.target>
|
||||||
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>iotdbreader</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
<scope>compile</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>iotdbwriter</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
<scope>compile</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>datax-example-core</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>streamreader</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>streamwriter</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
<scope>compile</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>txtfilereader</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
<scope>compile</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>txtfilewriter</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
<scope>compile</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>mysqlreader</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
<scope>compile</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>mysqlwriter</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
<scope>compile</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>mysql</groupId>
|
||||||
|
<artifactId>mysql-connector-java</artifactId>
|
||||||
|
<version>8.0.31</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
</project>
|
@ -0,0 +1,147 @@
|
|||||||
|
package com.alibaba.datax.example.iotdb;
|
||||||
|
|
||||||
|
import org.apache.iotdb.isession.util.Version;
|
||||||
|
import org.apache.iotdb.rpc.IoTDBConnectionException;
|
||||||
|
import org.apache.iotdb.rpc.StatementExecutionException;
|
||||||
|
import org.apache.iotdb.rpc.TSStatusCode;
|
||||||
|
import org.apache.iotdb.session.Session;
|
||||||
|
import org.apache.tsfile.enums.TSDataType;
|
||||||
|
import org.apache.tsfile.file.metadata.enums.CompressionType;
|
||||||
|
import org.apache.tsfile.file.metadata.enums.TSEncoding;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.FileReader;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.ZoneOffset;
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
public class TestCreateData {
|
||||||
|
private static Session session;
|
||||||
|
private static Random random = new Random();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void createAndInsert()
|
||||||
|
throws IoTDBConnectionException, StatementExecutionException {
|
||||||
|
// 创建测试数据
|
||||||
|
// session init
|
||||||
|
session =
|
||||||
|
new Session.Builder()
|
||||||
|
// .host("192.168.150.100")
|
||||||
|
.host("172.20.31.61")
|
||||||
|
.port(6667)
|
||||||
|
.username("root")
|
||||||
|
.password("root")
|
||||||
|
.version(Version.V_0_13)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// open session, close RPCCompression
|
||||||
|
session.open(false);
|
||||||
|
|
||||||
|
// set session fetchSize
|
||||||
|
session.setFetchSize(10000);
|
||||||
|
|
||||||
|
// 创建测点并插入数据
|
||||||
|
String filePath = "src/test/resources/testData.txt";
|
||||||
|
String database = "root.cgn";
|
||||||
|
try {
|
||||||
|
session.createDatabase(database);
|
||||||
|
} catch (StatementExecutionException e) {
|
||||||
|
if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
String device = "root.cgn.device";
|
||||||
|
createAndInsert2(filePath, device);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void createAndInsert2(String filePath, String device)
|
||||||
|
throws IoTDBConnectionException, StatementExecutionException {
|
||||||
|
// 读取文件(文件中无表头)
|
||||||
|
// 点的类型 点名 描述 量纲 量程下限 量程上限
|
||||||
|
// AX L2KRT008MA 测试性描述 % 0 1.00E+02
|
||||||
|
// AX L2ETY101MP 测试性描述 % 0 1.00E+02
|
||||||
|
List<List<String>> res = new ArrayList<>();
|
||||||
|
try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
|
||||||
|
String line;
|
||||||
|
while ((line = br.readLine()) != null) {
|
||||||
|
String[] words = line.split("\\s+");
|
||||||
|
List<String> wordList = new ArrayList<>(Arrays.asList(words));
|
||||||
|
res.add(wordList);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
// 准备传入的参数,构造时间序列
|
||||||
|
List<String> paths = new ArrayList<>();
|
||||||
|
List<String> measurements = new ArrayList<>();
|
||||||
|
List<Boolean> isDoubleList = new ArrayList<>();
|
||||||
|
List<TSDataType> tsDataTypes = new ArrayList<>();
|
||||||
|
List<TSEncoding> tsEncodings = new ArrayList<>();
|
||||||
|
List<CompressionType> compressionTypes = new ArrayList<>();
|
||||||
|
List<Map<String, String>> tagsList = new ArrayList<>();
|
||||||
|
List<Map<String, String>> attributesList = new ArrayList<>();
|
||||||
|
List<String> alias = new ArrayList<>();
|
||||||
|
|
||||||
|
for (int i = 0; i < res.size(); i++) {
|
||||||
|
measurements.add(res.get(i).get(1));
|
||||||
|
paths.add(device + "." + res.get(i).get(1));
|
||||||
|
boolean isDouble = "AX".equals(res.get(i).get(0));
|
||||||
|
isDoubleList.add(isDouble);
|
||||||
|
tsDataTypes.add(isDouble ? TSDataType.DOUBLE : TSDataType.BOOLEAN);
|
||||||
|
tsEncodings.add(isDouble ? TSEncoding.GORILLA : TSEncoding.RLE);
|
||||||
|
compressionTypes.add(CompressionType.SNAPPY);
|
||||||
|
Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put("描述", "测试性描述");
|
||||||
|
attributes.put("量纲", isDouble ? "%" : "");
|
||||||
|
attributes.put("量程下限", "0");
|
||||||
|
attributes.put("量程上限", isDouble ? "1.00E+02" : "2.00E+02");
|
||||||
|
attributesList.add(attributes);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 先删除已有的时间序列
|
||||||
|
if (session.checkTimeseriesExists(device + ".**")) {
|
||||||
|
session.deleteTimeseries(device + ".**");
|
||||||
|
System.out.println("删除已有的时间序列完成==============");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建测点时间序列
|
||||||
|
session.createMultiTimeseries(
|
||||||
|
paths, tsDataTypes, tsEncodings, compressionTypes, null, null, attributesList, null);
|
||||||
|
|
||||||
|
// 插入数据:每个测点里都写1万条数据,时间间隔1秒
|
||||||
|
List<List<String>> measurementsList = new ArrayList<>();
|
||||||
|
List<List<Object>> valuesList = new ArrayList<>();
|
||||||
|
List<Long> timestamps = new ArrayList<>();
|
||||||
|
List<List<TSDataType>> typesList = new ArrayList<>();
|
||||||
|
|
||||||
|
long startTime =
|
||||||
|
LocalDateTime.of(2024, 1, 1, 0, 0, 0, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
|
||||||
|
int count = 10000; // 每个测点插入数据条数
|
||||||
|
|
||||||
|
for (long time = startTime; count >= 0; time += 1000, count--) {
|
||||||
|
timestamps.add(time);
|
||||||
|
measurementsList.add(measurements); // 39个测点
|
||||||
|
typesList.add(tsDataTypes);
|
||||||
|
|
||||||
|
List<Object> randomValue = new ArrayList<>();
|
||||||
|
for (Boolean isDouble : isDoubleList) {
|
||||||
|
randomValue.add(isDouble ? random.nextDouble() * 100 : random.nextBoolean());
|
||||||
|
}
|
||||||
|
valuesList.add(randomValue); // 39个随机数
|
||||||
|
|
||||||
|
// 每1000次插入一批数据
|
||||||
|
if (count != 10000 && count % 1000 == 0) {
|
||||||
|
session.insertRecordsOfOneDevice(
|
||||||
|
device, timestamps, measurementsList, typesList, valuesList);
|
||||||
|
measurementsList.clear();
|
||||||
|
valuesList.clear();
|
||||||
|
typesList.clear();
|
||||||
|
timestamps.clear();
|
||||||
|
valuesList.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,64 @@
|
|||||||
|
package com.alibaba.datax.example.iotdb;
|
||||||
|
|
||||||
|
import com.alibaba.datax.example.ExampleContainer;
|
||||||
|
import com.alibaba.datax.example.util.PathUtil;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestIoTDB {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIoTDBReader2MySQLWriter() {
|
||||||
|
String path = "/iotdb2mysql.json";
|
||||||
|
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
|
||||||
|
ExampleContainer.start(jobPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMySQLReader2IoTDBWriter() {
|
||||||
|
String path = "/mysql2iotdb.json";
|
||||||
|
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
|
||||||
|
ExampleContainer.start(jobPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIoTDBReader2txtWriter() {
|
||||||
|
String path = "/iotdb2txt.json";
|
||||||
|
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
|
||||||
|
ExampleContainer.start(jobPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIoTDBReader2StreamWriter() {
|
||||||
|
String path = "/iotdb2stream.json";
|
||||||
|
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
|
||||||
|
ExampleContainer.start(jobPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMySQLReader2StreamWriter() {
|
||||||
|
String path = "/mysql2stream.json";
|
||||||
|
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
|
||||||
|
ExampleContainer.start(jobPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMySQLReader2txtWriter() {
|
||||||
|
String path = "/mysql2txt.json";
|
||||||
|
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
|
||||||
|
ExampleContainer.start(jobPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStreamReader2TxtWriter() {
|
||||||
|
String path = "/stream2txt.json";
|
||||||
|
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
|
||||||
|
ExampleContainer.start(jobPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStreamReader2StreamWriter() {
|
||||||
|
String path = "/stream2stream.json";
|
||||||
|
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
|
||||||
|
ExampleContainer.start(jobPath);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,56 @@
|
|||||||
|
{
|
||||||
|
"job": {
|
||||||
|
"setting": {
|
||||||
|
"speed": {
|
||||||
|
"channel": 3
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"reader": {
|
||||||
|
"name": "iotdbreader",
|
||||||
|
"parameter": {
|
||||||
|
"username": "root",
|
||||||
|
"password": "root",
|
||||||
|
"host": "172.20.31.61",
|
||||||
|
"port": 6667,
|
||||||
|
"fetchSize": 10000,
|
||||||
|
"version": "V_1_0",
|
||||||
|
"timeColumnPosition": 0,
|
||||||
|
"querySqls":[
|
||||||
|
],
|
||||||
|
"device": "root.cgn.device",
|
||||||
|
"measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
|
||||||
|
"where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"writer": {
|
||||||
|
"name": "mysqlwriter",
|
||||||
|
"parameter": {
|
||||||
|
"username": "root",
|
||||||
|
"password": "toy123",
|
||||||
|
"writeMode": "insert",
|
||||||
|
"#需要提前建表": "CREATE TABLE device (`time` DATETIME,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);",
|
||||||
|
"column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
|
||||||
|
"session": [
|
||||||
|
"set session sql_mode='ANSI'"
|
||||||
|
],
|
||||||
|
"preSql": [
|
||||||
|
"delete from device"
|
||||||
|
],
|
||||||
|
"connection": [
|
||||||
|
{
|
||||||
|
"table": [
|
||||||
|
"device"
|
||||||
|
],
|
||||||
|
"#": "下面的URL需要把中括号去掉,否则报错,mysqlwriter的bug,未修改",
|
||||||
|
"jdbcUrl": "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,42 @@
|
|||||||
|
{
|
||||||
|
"job": {
|
||||||
|
"setting": {
|
||||||
|
"speed": {
|
||||||
|
"channel": 3
|
||||||
|
},
|
||||||
|
"errorLimit": {
|
||||||
|
"record": 0,
|
||||||
|
"percentage": 0.02
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"reader": {
|
||||||
|
"name": "iotdbreader",
|
||||||
|
"parameter": {
|
||||||
|
"username": "root",
|
||||||
|
"password": "root",
|
||||||
|
"host": "192.168.150.100",
|
||||||
|
"port": 6667,
|
||||||
|
"fetchSize": 10000,
|
||||||
|
"version": "V_1_0",
|
||||||
|
"timeColumnPosition": 0,
|
||||||
|
"querySqls":[
|
||||||
|
|
||||||
|
],
|
||||||
|
"device": "root.cgn.device",
|
||||||
|
"measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
|
||||||
|
"where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"writer": {
|
||||||
|
"name": "streamwriter",
|
||||||
|
"parameter": {
|
||||||
|
"print":true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,42 @@
|
|||||||
|
{
|
||||||
|
"job": {
|
||||||
|
"setting": {
|
||||||
|
"speed": {
|
||||||
|
"channel": 3
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"reader": {
|
||||||
|
"name": "iotdbreader",
|
||||||
|
"parameter": {
|
||||||
|
"username": "root",
|
||||||
|
"password": "root",
|
||||||
|
"host": "172.20.31.61",
|
||||||
|
"port": 6667,
|
||||||
|
"fetchSize": 10000,
|
||||||
|
"version": "V_1_0",
|
||||||
|
"timeColumnPosition": 0,
|
||||||
|
"querySqls":[
|
||||||
|
"select * from root.cgn.device",
|
||||||
|
"select A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU from root.cgn.device"
|
||||||
|
],
|
||||||
|
"device": "root.cgn.device",
|
||||||
|
"measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
|
||||||
|
"where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"writer": {
|
||||||
|
"name": "txtfilewriter",
|
||||||
|
"parameter": {
|
||||||
|
"path": "D:/下载",
|
||||||
|
"fileName": "txtText",
|
||||||
|
"writeMode": "truncate",
|
||||||
|
"dateFormat": "yyyy-MM-dd"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,50 @@
|
|||||||
|
{
|
||||||
|
"job": {
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"reader": {
|
||||||
|
"name": "mysqlreader",
|
||||||
|
"parameter": {
|
||||||
|
"username": "root",
|
||||||
|
"password": "toy123",
|
||||||
|
"column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
|
||||||
|
"splitPk": "",
|
||||||
|
"connection": [
|
||||||
|
{
|
||||||
|
"table": [
|
||||||
|
"device"
|
||||||
|
],
|
||||||
|
"jdbcUrl": [
|
||||||
|
"jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"writer": {
|
||||||
|
"name": "iotdbwriter",
|
||||||
|
"parameter": {
|
||||||
|
"username": "root",
|
||||||
|
"password": "root",
|
||||||
|
"host": "172.20.31.61",
|
||||||
|
"port": 6667,
|
||||||
|
"fetchSize": 10000,
|
||||||
|
"version": "V_1_0",
|
||||||
|
"timeColumnPosition": 0,
|
||||||
|
"insertBatchSize": 1000,
|
||||||
|
"device": "root.cgn.device",
|
||||||
|
"measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
|
||||||
|
"preSql": [
|
||||||
|
"delete timeseries root.cgn.device.**"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"setting": {
|
||||||
|
"speed": {
|
||||||
|
"channel": 3
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,46 @@
|
|||||||
|
{
|
||||||
|
"job": {
|
||||||
|
"setting": {
|
||||||
|
"speed": {
|
||||||
|
"channel": 3
|
||||||
|
},
|
||||||
|
"errorLimit": {
|
||||||
|
"record": 0,
|
||||||
|
"percentage": 0.02
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"reader": {
|
||||||
|
"name": "mysqlreader",
|
||||||
|
"parameter": {
|
||||||
|
"username": "root",
|
||||||
|
"password": "toy123",
|
||||||
|
"column": [
|
||||||
|
"order_num",
|
||||||
|
"cust_id"
|
||||||
|
],
|
||||||
|
"splitPk": "",
|
||||||
|
"connection": [
|
||||||
|
{
|
||||||
|
"table": [
|
||||||
|
"orders"
|
||||||
|
],
|
||||||
|
"jdbcUrl": [
|
||||||
|
"jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"writer": {
|
||||||
|
"name": "streamwriter",
|
||||||
|
"parameter": {
|
||||||
|
"print":true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,41 @@
|
|||||||
|
{
|
||||||
|
"job": {
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"reader": {
|
||||||
|
"name": "mysqlreader",
|
||||||
|
"parameter": {
|
||||||
|
"username": "root",
|
||||||
|
"password": "toy123",
|
||||||
|
"column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
|
||||||
|
"splitPk": "",
|
||||||
|
"connection": [
|
||||||
|
{
|
||||||
|
"table": [
|
||||||
|
"device"
|
||||||
|
],
|
||||||
|
"jdbcUrl": [
|
||||||
|
"jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"writer": {
|
||||||
|
"name": "txtfilewriter",
|
||||||
|
"parameter": {
|
||||||
|
"path": "D:/下载",
|
||||||
|
"fileName": "txtText",
|
||||||
|
"writeMode": "truncate",
|
||||||
|
"dateFormat": "yyyy-MM-dd"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"setting": {
|
||||||
|
"speed": {
|
||||||
|
"channel": 5
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,36 @@
|
|||||||
|
{
|
||||||
|
"job": {
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"reader": {
|
||||||
|
"name": "streamreader",
|
||||||
|
"parameter": {
|
||||||
|
"sliceRecordCount": 10,
|
||||||
|
"column": [
|
||||||
|
{
|
||||||
|
"type": "long",
|
||||||
|
"value": "15"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "string",
|
||||||
|
"value": "hello,你好,世界-DataX"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"writer": {
|
||||||
|
"name": "streamwriter",
|
||||||
|
"parameter": {
|
||||||
|
"encoding": "UTF-8",
|
||||||
|
"print": true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"setting": {
|
||||||
|
"speed": {
|
||||||
|
"channel": 5
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,38 @@
|
|||||||
|
{
|
||||||
|
"job": {
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"reader": {
|
||||||
|
"name": "streamreader",
|
||||||
|
"parameter": {
|
||||||
|
"sliceRecordCount": 10,
|
||||||
|
"column": [
|
||||||
|
{
|
||||||
|
"type": "long",
|
||||||
|
"value": "10"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "string",
|
||||||
|
"value": "hello,你好,世界-DataX"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"writer": {
|
||||||
|
"name": "txtfilewriter",
|
||||||
|
"parameter": {
|
||||||
|
"path": "D:/下载",
|
||||||
|
"fileName": "txtText",
|
||||||
|
"writeMode": "truncate",
|
||||||
|
"dateFormat": "yyyy-MM-dd"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"setting": {
|
||||||
|
"speed": {
|
||||||
|
"channel": 5
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,39 @@
|
|||||||
|
AX L2KRT008MA 测试性描述 % 0 1.00E+02
|
||||||
|
AX L2ETY101MP 测试性描述 % 0 1.00E+02
|
||||||
|
AX L2SEC002MD 测试性描述 % 0 1.00E+02
|
||||||
|
AX L1ETY103MP 测试性描述 % 0 1.00E+02
|
||||||
|
AX D1KRT003EU 测试性描述 % 0 1.00E+02
|
||||||
|
AX L4EAS008MDX_XQ01 测试性描述 % 0 1.00E+02
|
||||||
|
AX L3RIC039KMX_XQ01 测试性描述 % 0 1.00E+02
|
||||||
|
AX L2VVP008MP 测试性描述 % 0 1.00E+02
|
||||||
|
AX D2RCV039MD 测试性描述 % 0 1.00E+02
|
||||||
|
DX D2RPA063EC 测试性描述 0 2.00E+00
|
||||||
|
AX L4KRT022MAK_XP01 测试性描述 % 0 1.00E+02
|
||||||
|
AX L3SEC002MPX_XQ01 测试性描述 % 0 1.00E+02
|
||||||
|
DX L4RCP651KSX_XG52 测试性描述 0 2.00E+00
|
||||||
|
AX L2RRI082MT 测试性描述 % 0 1.00E+02
|
||||||
|
AX A5STD 测试性描述 % 0 1.00E+02
|
||||||
|
DX D2RRI003PO 测试性描述 0 2.00E+00
|
||||||
|
AX L4RCP091MT_XQ01 测试性描述 % 0 1.00E+02
|
||||||
|
DX L1GCT132SM5 测试性描述 0 2.00E+00
|
||||||
|
DX L1GCT132SM3 测试性描述 0 2.00E+00
|
||||||
|
AX L2RIC035MT 测试性描述 % 0 1.00E+02
|
||||||
|
AX L1RGL018QM 测试性描述 % 0 1.00E+02
|
||||||
|
AX L1RIC037MT 测试性描述 % 0 1.00E+02
|
||||||
|
AX D2RCP042MD 测试性描述 % 0 1.00E+02
|
||||||
|
AX D2RGL001QM 测试性描述 % 0 1.00E+02
|
||||||
|
AX D2RIC020MT 测试性描述 % 0 1.00E+02
|
||||||
|
AX D1RIC022MT 测试性描述 % 0 1.00E+02
|
||||||
|
AX D1RGL003QM 测试性描述 % 0 1.00E+02
|
||||||
|
AX L3LGE005TU_XQ01 测试性描述 % 0 1.00E+02
|
||||||
|
DX D2RCP517EC 测试性描述 0 2.00E+00
|
||||||
|
AX D1RCP054MD 测试性描述 % 0 1.00E+02
|
||||||
|
AX L2RIS014MD 测试性描述 % 0 1.00E+02
|
||||||
|
DX D1LGE001JA 测试性描述 0 2.00E+00
|
||||||
|
DX D2SEC001PO 测试性描述 0 2.00E+00
|
||||||
|
DX D1SEC003PO 测试性描述 0 2.00E+00
|
||||||
|
AX D1RIS001MD 测试性描述 % 0 1.00E+02
|
||||||
|
AX L1RCP090MT 测试性描述 % 0 1.00E+02
|
||||||
|
DX L2VVP003SM5 测试性描述 0 2.00E+00
|
||||||
|
AX L3RIC022KMX_XQ01 测试性描述 % 0 1.00E+02
|
||||||
|
DX L1GCT501EC 测试性描述 0 2.00E+00
|
@ -15,6 +15,7 @@
|
|||||||
<module>datax-example-core</module>
|
<module>datax-example-core</module>
|
||||||
<module>datax-example-streamreader</module>
|
<module>datax-example-streamreader</module>
|
||||||
<module>datax-example-neo4j</module>
|
<module>datax-example-neo4j</module>
|
||||||
|
<module>datax-example-iotdb</module>
|
||||||
</modules>
|
</modules>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
|
212
iotdbreader/doc/iotdbreader-CN.md
Normal file
212
iotdbreader/doc/iotdbreader-CN.md
Normal file
@ -0,0 +1,212 @@
|
|||||||
|
# DataX IoTDBReader
|
||||||
|
|
||||||
|
## 1 快速介绍
|
||||||
|
|
||||||
|
IoTDBReader 插件实现了 IoTDB 读取数据的功能。
|
||||||
|
|
||||||
|
## 2 实现原理
|
||||||
|
|
||||||
|
IoTDBReader 通过 IoTDB 的 原生java session 查询获取数据。
|
||||||
|
|
||||||
|
## 3 功能说明
|
||||||
|
|
||||||
|
### 3.1 配置样例
|
||||||
|
|
||||||
|
* 配置一个从 IoTDB 抽取数据作业:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"job": {
|
||||||
|
"setting": {
|
||||||
|
"speed": {
|
||||||
|
"channel": 3
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"reader": {
|
||||||
|
"name": "iotdbreader",
|
||||||
|
"parameter": {
|
||||||
|
"username": "root",
|
||||||
|
"password": "root",
|
||||||
|
"host": "192.168.150.100",
|
||||||
|
"port": 6667,
|
||||||
|
"fetchSize": 10000,
|
||||||
|
"version": "V_1_0",
|
||||||
|
"##": "时间列插入DataX的Record中的位置,默认第0列",
|
||||||
|
"timeColumnPosition": 0,
|
||||||
|
"##":"写了querySqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句",
|
||||||
|
"querySqls":[
|
||||||
|
],
|
||||||
|
"device": "root.cgn.device",
|
||||||
|
"##":"时间列不属于测点",
|
||||||
|
"measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
|
||||||
|
"where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"writer": {
|
||||||
|
"name": "mysqlwriter",
|
||||||
|
"parameter": {
|
||||||
|
"username": "root",
|
||||||
|
"password": "toy123",
|
||||||
|
"writeMode": "insert",
|
||||||
|
"#需要提前建表": "CREATE TABLE device (`time` DATETIME,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);",
|
||||||
|
"column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
|
||||||
|
"session": [
|
||||||
|
"set session sql_mode='ANSI'"
|
||||||
|
],
|
||||||
|
"preSql": [
|
||||||
|
"delete from device"
|
||||||
|
],
|
||||||
|
"connection": [
|
||||||
|
{
|
||||||
|
"table": [
|
||||||
|
"device"
|
||||||
|
],
|
||||||
|
"#": "下面的URL需要把中括号去掉,否则报错,mysqlreader的bug,未修改",
|
||||||
|
"jdbcUrl": "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
* 配置一个自定义 SQL 的数据抽取作业:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"job": {
|
||||||
|
"setting": {
|
||||||
|
"speed": {
|
||||||
|
"channel": 3
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"reader": {
|
||||||
|
"name": "iotdbreader",
|
||||||
|
"parameter": {
|
||||||
|
"username": "root",
|
||||||
|
"password": "root",
|
||||||
|
"host": "192.168.150.100",
|
||||||
|
"port": 6667,
|
||||||
|
"fetchSize": 10000,
|
||||||
|
"version": "V_1_0",
|
||||||
|
"timeColumnPosition": 0,
|
||||||
|
"##":"写了querySqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句",
|
||||||
|
"querySqls":[
|
||||||
|
"select * from root.cgn.device",
|
||||||
|
"select A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU from root.cgn.device"
|
||||||
|
],
|
||||||
|
"device": "",
|
||||||
|
"measurements": "",
|
||||||
|
"where": ""
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"writer": {
|
||||||
|
"name": "txtfilewriter",
|
||||||
|
"parameter": {
|
||||||
|
"path": "D:/下载",
|
||||||
|
"fileName": "txtText",
|
||||||
|
"writeMode": "truncate",
|
||||||
|
"dateFormat": "yyyy-MM-dd"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 3.2 参数说明
|
||||||
|
* username
|
||||||
|
* 描述:用户名
|
||||||
|
* 必选:是
|
||||||
|
* 默认值:无
|
||||||
|
* password
|
||||||
|
* 描述:用户名的密码
|
||||||
|
* 必选:是
|
||||||
|
* 默认值:无
|
||||||
|
* host
|
||||||
|
* 描述:连接iotdb数据库的主机地址
|
||||||
|
* 必选:是
|
||||||
|
* 默认值:无
|
||||||
|
* port
|
||||||
|
* 描述:端口
|
||||||
|
* 必选:是
|
||||||
|
* 默认值:无
|
||||||
|
* version
|
||||||
|
* 描述:iotdb版本
|
||||||
|
* 必选:是
|
||||||
|
* 默认值:无
|
||||||
|
* timeColumnPosition
|
||||||
|
* 描述:时间列在Record中列的位置
|
||||||
|
* 必选:否
|
||||||
|
* 默认值:0
|
||||||
|
* querySqls
|
||||||
|
* 描述:直接写多行SQL,可以并行读取,此时下面的参数失效。
|
||||||
|
* 必选:否
|
||||||
|
* 默认值:
|
||||||
|
* device
|
||||||
|
* 描述:IoTDB中的概念,可理解为mysql中的表。
|
||||||
|
* 必选:querySqls为空时必选
|
||||||
|
* 默认值:无
|
||||||
|
* measurements
|
||||||
|
* 描述:IoTDB中的概念,可理解为mysql中的字段。
|
||||||
|
* 必选:querySqls为空时必选
|
||||||
|
* 默认值:无
|
||||||
|
* where
|
||||||
|
* 描述:查询条件
|
||||||
|
* 必选:否
|
||||||
|
* 默认值:无
|
||||||
|
|
||||||
|
### 3.3 类型转换
|
||||||
|
|
||||||
|
| IoTDB 数据类型 | DataX 内部类型 |
|
||||||
|
|-----------------|--------|
|
||||||
|
| INT32 | Int |
|
||||||
|
| INT64,TIMESTAMP | Long |
|
||||||
|
| FLOAT | FLOAT |
|
||||||
|
| DOUBLE | Double |
|
||||||
|
| BOOLEAN | Bool |
|
||||||
|
| DATE | Date |
|
||||||
|
| STRING,TEXT | String |
|
||||||
|
|
||||||
|
## 4 性能报告
|
||||||
|
|
||||||
|
### 4.1 环境准备
|
||||||
|
|
||||||
|
#### 4.1.1 数据特征
|
||||||
|
|
||||||
|
#### 4.1.2 机器参数
|
||||||
|
|
||||||
|
#### 4.1.3 DataX jvm 参数
|
||||||
|
|
||||||
|
-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
|
||||||
|
|
||||||
|
### 4.2 测试报告
|
||||||
|
|
||||||
|
#### 4.2.1 单表测试报告
|
||||||
|
|
||||||
|
| 通道数| DataX速度(Rec/s)|DataX流量(MB/s)| DataX机器网卡流出流量(MB/s)|DataX机器运行负载|DB网卡进入流量(MB/s)|DB运行负载|DB TPS|
|
||||||
|
|--------| --------|--------|--------|--------|--------|--------|--------|
|
||||||
|
|1| | | | | | | |
|
||||||
|
|4| | | | | | | |
|
||||||
|
|8| | | | | | | |
|
||||||
|
|16| | | | | | | |
|
||||||
|
|32| | | | | | | |
|
||||||
|
|
||||||
|
说明:
|
||||||
|
|
||||||
|
#### 4.2.4 性能测试小结
|
||||||
|
|
||||||
|
1.
|
||||||
|
2.
|
||||||
|
|
||||||
|
## 5 约束限制
|
||||||
|
|
||||||
|
## FAQ
|
102
iotdbreader/pom.xml
Normal file
102
iotdbreader/pom.xml
Normal file
@ -0,0 +1,102 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<parent>
|
||||||
|
<artifactId>datax-all</artifactId>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<artifactId>iotdbreader</artifactId>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<maven.compiler.source>8</maven.compiler.source>
|
||||||
|
<maven.compiler.target>8</maven.compiler.target>
|
||||||
|
<iotdb.session.version>1.3.3-SNAPSHOT</iotdb.session.version>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>datax-core</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>datax-common</artifactId>
|
||||||
|
<version>${datax-project-version}</version>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>slf4j-log4j12</artifactId>
|
||||||
|
<groupId>org.slf4j</groupId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.iotdb</groupId>
|
||||||
|
<artifactId>iotdb-session</artifactId>
|
||||||
|
<version>${iotdb.session.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.iotdb</groupId>
|
||||||
|
<artifactId>node-commons</artifactId>
|
||||||
|
<version>${iotdb.session.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<!-- compiler plugin -->
|
||||||
|
<plugin>
|
||||||
|
<artifactId>maven-compiler-plugin</artifactId>
|
||||||
|
<configuration>
|
||||||
|
<source>${jdk-version}</source>
|
||||||
|
<target>${jdk-version}</target>
|
||||||
|
<encoding>${project-sourceEncoding}</encoding>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
<plugin>
|
||||||
|
<artifactId>maven-assembly-plugin</artifactId>
|
||||||
|
<configuration>
|
||||||
|
<descriptors>
|
||||||
|
<descriptor>src/main/assembly/package.xml</descriptor>
|
||||||
|
</descriptors>
|
||||||
|
<finalName>datax</finalName>
|
||||||
|
</configuration>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<id>dwzip</id>
|
||||||
|
<phase>package</phase>
|
||||||
|
<goals>
|
||||||
|
<goal>single</goal>
|
||||||
|
</goals>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
|
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-surefire-plugin</artifactId>
|
||||||
|
<version>2.12.4</version>
|
||||||
|
<configuration>
|
||||||
|
<!-- 包含哪些测试用例 -->
|
||||||
|
<includes>
|
||||||
|
<include>**/*Test.java</include>
|
||||||
|
</includes>
|
||||||
|
<!-- 不包含哪些测试用例 -->
|
||||||
|
<excludes>
|
||||||
|
</excludes>
|
||||||
|
<testFailureIgnore>true</testFailureIgnore>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
|
||||||
|
</project>
|
34
iotdbreader/src/main/assembly/package.xml
Normal file
34
iotdbreader/src/main/assembly/package.xml
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
|
||||||
|
<id></id>
|
||||||
|
<formats>
|
||||||
|
<format>dir</format>
|
||||||
|
</formats>
|
||||||
|
<includeBaseDirectory>false</includeBaseDirectory>
|
||||||
|
<fileSets>
|
||||||
|
<fileSet>
|
||||||
|
<directory>src/main/resources</directory>
|
||||||
|
<includes>
|
||||||
|
<include>plugin.json</include>
|
||||||
|
<include>plugin_job_template.json</include>
|
||||||
|
</includes>
|
||||||
|
<outputDirectory>plugin/writer/iotdbreader</outputDirectory>
|
||||||
|
</fileSet>
|
||||||
|
<fileSet>
|
||||||
|
<directory>target/</directory>
|
||||||
|
<includes>
|
||||||
|
<include>iotdbreader-0.0.1-SNAPSHOT.jar</include>
|
||||||
|
</includes>
|
||||||
|
<outputDirectory>plugin/writer/iotdbreader</outputDirectory>
|
||||||
|
</fileSet>
|
||||||
|
</fileSets>
|
||||||
|
|
||||||
|
<dependencySets>
|
||||||
|
<dependencySet>
|
||||||
|
<useProjectArtifact>false</useProjectArtifact>
|
||||||
|
<outputDirectory>plugin/reader/iotdbreader/libs</outputDirectory>
|
||||||
|
<scope>runtime</scope>
|
||||||
|
</dependencySet>
|
||||||
|
</dependencySets>
|
||||||
|
</assembly>
|
@ -0,0 +1,252 @@
|
|||||||
|
package com.alibaba.datax.plugin.reader.iotdbreader;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.element.*;
|
||||||
|
import com.alibaba.datax.common.exception.DataXException;
|
||||||
|
import com.alibaba.datax.common.plugin.AbstractTaskPlugin;
|
||||||
|
import com.alibaba.datax.common.plugin.RecordSender;
|
||||||
|
import com.alibaba.datax.common.plugin.TaskPluginCollector;
|
||||||
|
import com.alibaba.datax.common.spi.Reader;
|
||||||
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
|
import org.apache.iotdb.isession.SessionDataSet;
|
||||||
|
import org.apache.iotdb.isession.util.Version;
|
||||||
|
import org.apache.iotdb.rpc.IoTDBConnectionException;
|
||||||
|
import org.apache.iotdb.rpc.StatementExecutionException;
|
||||||
|
import org.apache.iotdb.session.Session;
|
||||||
|
import org.apache.tsfile.enums.TSDataType;
|
||||||
|
import org.apache.tsfile.read.common.Field;
|
||||||
|
import org.apache.tsfile.read.common.RowRecord;
|
||||||
|
import org.apache.tsfile.read.common.block.column.NullColumn;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.sql.*;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class IoTDBReader extends Reader {
|
||||||
|
public static class Job extends Reader.Job {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
|
||||||
|
private Configuration jobConf;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Job对象初始化工作
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
// 通过super.getPluginJobConf()获取与本插件相关的配置。
|
||||||
|
this.jobConf = super.getPluginJobConf();
|
||||||
|
// 检查各种参数是否正确
|
||||||
|
String username = this.jobConf.getString(Key.USERNAME);
|
||||||
|
if (username == null || username.isEmpty()) {
|
||||||
|
throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.USERNAME + "] is not set.");
|
||||||
|
}
|
||||||
|
String password = this.jobConf.getString(Key.PASSWORD);
|
||||||
|
if (password == null || password.isEmpty()) {
|
||||||
|
throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PASSWORD + "] is not set.");
|
||||||
|
}
|
||||||
|
String host = this.jobConf.getString(Key.HOST);
|
||||||
|
if (host == null || host.isEmpty()) {
|
||||||
|
throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.HOST + "] is not set.");
|
||||||
|
}
|
||||||
|
String port = this.jobConf.getString(Key.PORT);
|
||||||
|
if (port == null || port.isEmpty()) {
|
||||||
|
throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PORT + "] is not set.");
|
||||||
|
}
|
||||||
|
String fetchSize = this.jobConf.getString(Key.FETCH_SIZE);
|
||||||
|
if (fetchSize == null || fetchSize.isEmpty()) {
|
||||||
|
throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.FETCH_SIZE + "] is not set.");
|
||||||
|
}
|
||||||
|
// 还有一部分参数没检查,没必要了。
|
||||||
|
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void prepare() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 将Job拆分Task。
|
||||||
|
* @param adviceNumber 框架建议的拆分数,一般是运行时所配置的并发度。
|
||||||
|
* @return Task的配置列表,一个配置文件对应一个task。
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public List<Configuration> split(int adviceNumber) {
|
||||||
|
// 每个config对应一个task
|
||||||
|
List<Configuration> configs = new ArrayList<>();
|
||||||
|
List<String> queryList = this.jobConf.getList(Key.QUERY_SQLS, String.class);
|
||||||
|
if (queryList == null || queryList.size() == 0){
|
||||||
|
Configuration clone = this.jobConf.clone();
|
||||||
|
String device = this.jobConf.getString(Key.DEVICE);
|
||||||
|
List<String> measurements = this.jobConf.getList(Key.MEASUREMENTS, String.class);
|
||||||
|
String where = this.jobConf.getString(Key.WHERE);
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append("select ").append(String.join(",", measurements));
|
||||||
|
sb.append(" from ").append(device);
|
||||||
|
if (where != null && !where.isEmpty()){
|
||||||
|
sb.append(" where ").append(where);
|
||||||
|
}
|
||||||
|
clone.set(Key.QUERY_SQL, sb.toString());
|
||||||
|
configs.add(clone);
|
||||||
|
//DataX中一个查询是单线程,实际上底层session中是多线程读取。
|
||||||
|
}else{
|
||||||
|
// 直接读取最终SQL
|
||||||
|
for (String query : queryList) {
|
||||||
|
Configuration clone = this.jobConf.clone();
|
||||||
|
clone.remove(Key.QUERY_SQLS);
|
||||||
|
clone.set(Key.QUERY_SQL, query);
|
||||||
|
configs.add(clone);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// LOG.info("configs: {}", configs);
|
||||||
|
return configs;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Job对象自身的销毁工作。
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void destroy() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 全局的后置工作。
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void post() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Task extends Reader.Task {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
|
||||||
|
|
||||||
|
private Configuration taskConf;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* IoTDB原生读写工具
|
||||||
|
*/
|
||||||
|
private Session session;
|
||||||
|
/**
|
||||||
|
* IoTDB中的时间列插入的位置,默认为0,即第一列。
|
||||||
|
*/
|
||||||
|
private int timeColumnPosition;
|
||||||
|
/**
|
||||||
|
* 最终的查询SQL,交给session执行。
|
||||||
|
*/
|
||||||
|
private String querySql;
|
||||||
|
private TaskPluginCollector taskPluginCollector;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
// 获取与本Task相关的配置,是Job的split方法返回的配置列表中的其中一个。
|
||||||
|
taskConf = super.getPluginJobConf();
|
||||||
|
|
||||||
|
// session init
|
||||||
|
session =
|
||||||
|
new Session.Builder()
|
||||||
|
.host(taskConf.getString(Key.HOST))
|
||||||
|
.port(taskConf.getInt(Key.PORT))
|
||||||
|
.username(taskConf.getString(Key.USERNAME))
|
||||||
|
.password(taskConf.getString(Key.PASSWORD))
|
||||||
|
.version(Version.valueOf(taskConf.getString(Key.VERSION)))
|
||||||
|
.build();
|
||||||
|
// open session, close RPCCompression
|
||||||
|
try {
|
||||||
|
session.open(false);
|
||||||
|
} catch (IoTDBConnectionException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// set session fetchSize
|
||||||
|
session.setFetchSize(taskConf.getInt(Key.FETCH_SIZE));
|
||||||
|
|
||||||
|
this.timeColumnPosition = (taskConf.getInt(Key.TIME_COLUMN_POSITION) == null) ? 0 : taskConf.getInt(Key.TIME_COLUMN_POSITION);
|
||||||
|
this.querySql = taskConf.getString(Key.QUERY_SQL);
|
||||||
|
taskPluginCollector = super.getTaskPluginCollector();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() {
|
||||||
|
// Task自身的销毁工作。
|
||||||
|
try {
|
||||||
|
if (session != null){
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
} catch (IoTDBConnectionException e) {
|
||||||
|
LOG.info(e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 从数据源读数据,写入到RecordSender中。
|
||||||
|
* @param recordSender 把数据写入连接Reader和Writer的缓存队列。
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void startRead(RecordSender recordSender) {
|
||||||
|
try {
|
||||||
|
SessionDataSet dataSet = session.executeQueryStatement(this.querySql);
|
||||||
|
while (dataSet.hasNext()) {
|
||||||
|
// DataX中的行record
|
||||||
|
Record record = recordSender.createRecord();
|
||||||
|
// IoTDB中的行
|
||||||
|
RowRecord rowRecord = dataSet.next();
|
||||||
|
List<Field> fields = rowRecord.getFields();
|
||||||
|
try {
|
||||||
|
// 除time列外的其他列遍历类型后转换
|
||||||
|
for (int i = 0; i < fields.size(); i++) {
|
||||||
|
if (i == timeColumnPosition){
|
||||||
|
// time列插入指定位置,时间列不在fields中,需要单独处理。不能为null
|
||||||
|
long timestamp = rowRecord.getTimestamp();
|
||||||
|
record.addColumn(new DateColumn(timestamp));
|
||||||
|
}
|
||||||
|
Field field = fields.get(i);
|
||||||
|
TSDataType dataType = field.getDataType();
|
||||||
|
if (dataType == null) {
|
||||||
|
// 需要写插件支持处理null数据,否则会空指向异常,这里先当成脏数据
|
||||||
|
// record.addColumn(null);
|
||||||
|
// continue;
|
||||||
|
throw new RuntimeException("null datatype");
|
||||||
|
}
|
||||||
|
switch (dataType) {
|
||||||
|
case BOOLEAN:
|
||||||
|
record.addColumn(new BoolColumn(field.getBoolV()));
|
||||||
|
break;
|
||||||
|
case INT32:
|
||||||
|
record.addColumn(new LongColumn(field.getIntV()));
|
||||||
|
break;
|
||||||
|
case INT64:
|
||||||
|
case TIMESTAMP:
|
||||||
|
record.addColumn(new LongColumn(field.getLongV()));
|
||||||
|
break;
|
||||||
|
case FLOAT:
|
||||||
|
record.addColumn(new DoubleColumn(field.getFloatV()));
|
||||||
|
break;
|
||||||
|
case DOUBLE:
|
||||||
|
record.addColumn(new DoubleColumn(field.getDoubleV()));
|
||||||
|
break;
|
||||||
|
case STRING:
|
||||||
|
case TEXT:
|
||||||
|
record.addColumn(new StringColumn(field.getStringValue()));
|
||||||
|
break;
|
||||||
|
case DATE:
|
||||||
|
record.addColumn(new DateColumn(Date.valueOf(field.getDateV())));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new RuntimeException("Unsupported data type: " + dataType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// 发送
|
||||||
|
recordSender.sendToWriter(record);
|
||||||
|
}catch (RuntimeException e){
|
||||||
|
LOG.info(e.getMessage());
|
||||||
|
this.taskPluginCollector.collectDirtyRecord(record, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (StatementExecutionException | IoTDBConnectionException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,33 @@
|
|||||||
|
package com.alibaba.datax.plugin.reader.iotdbreader;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.spi.ErrorCode;
|
||||||
|
|
||||||
|
public enum IoTDBReaderErrorCode implements ErrorCode {
|
||||||
|
|
||||||
|
REQUIRED_VALUE("IoTDBReader-00", "parameter value is missing"),
|
||||||
|
ILLEGAL_VALUE("IoTDBReader-01", "invalid parameter value"),
|
||||||
|
CONNECTION_FAILED("IoTDBReader-02", "connection error");
|
||||||
|
|
||||||
|
private final String code;
|
||||||
|
private final String description;
|
||||||
|
|
||||||
|
IoTDBReaderErrorCode(String code, String description) {
|
||||||
|
this.code = code;
|
||||||
|
this.description = description;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getCode() {
|
||||||
|
return this.code;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDescription() {
|
||||||
|
return this.description;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return String.format("Code:[%s], Description:[%s].", this.code, this.description);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,16 @@
|
|||||||
|
package com.alibaba.datax.plugin.reader.iotdbreader;
|
||||||
|
|
||||||
|
public class Key {
|
||||||
|
public static final String USERNAME = "username";
|
||||||
|
public static final String PASSWORD = "password";
|
||||||
|
public static final String HOST = "host";
|
||||||
|
public static final String PORT = "port";
|
||||||
|
public static final String FETCH_SIZE = "fetchSize";
|
||||||
|
public static final String VERSION = "version";
|
||||||
|
public static final String QUERY_SQLS = "querySqls";
|
||||||
|
public static final String QUERY_SQL = "querySql";
|
||||||
|
public static final String TIME_COLUMN_POSITION = "timeColumnPosition";
|
||||||
|
public static final String DEVICE = "device";
|
||||||
|
public static final String MEASUREMENTS = "measurements";
|
||||||
|
public static final String WHERE = "where";
|
||||||
|
}
|
10
iotdbreader/src/main/resources/plugin.json
Normal file
10
iotdbreader/src/main/resources/plugin.json
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
{
|
||||||
|
"name": "iotdbreader",
|
||||||
|
"class": "com.alibaba.datax.plugin.reader.iotdbreader.IoTDBReader",
|
||||||
|
"description": {
|
||||||
|
"useScene": "data migration to iotdb",
|
||||||
|
"mechanism": "use iotdb-java-session to write data."
|
||||||
|
},
|
||||||
|
"developer": "timecho.com"
|
||||||
|
}
|
||||||
|
|
25
iotdbreader/src/main/resources/plugin_job_template.json
Normal file
25
iotdbreader/src/main/resources/plugin_job_template.json
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
{
|
||||||
|
"name": "iotdbreader",
|
||||||
|
"parameter": {
|
||||||
|
"username": "root",
|
||||||
|
"password": "root",
|
||||||
|
"host": "192.168.150.100",
|
||||||
|
"port": 6667,
|
||||||
|
"fetchSize": 10000,
|
||||||
|
"version": "V_1_0",
|
||||||
|
"##": "时间列插入DataX的Record中的位置,默认第0列",
|
||||||
|
"timeColumnPosition": 0,
|
||||||
|
"##":"写了querySqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句",
|
||||||
|
"querySqls": [
|
||||||
|
"select * from root.cgn.device",
|
||||||
|
"select * from root.cgn.device",
|
||||||
|
"select * from root.cgn.device",
|
||||||
|
"select * from root.cgn.device",
|
||||||
|
"select * from root.cgn.device"
|
||||||
|
],
|
||||||
|
"device": "root.cgn.device",
|
||||||
|
"##":"时间列不属于测点",
|
||||||
|
"measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
|
||||||
|
"where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00"
|
||||||
|
}
|
||||||
|
}
|
200
iotdbwriter/doc/iotdbwriter-CN.md
Normal file
200
iotdbwriter/doc/iotdbwriter-CN.md
Normal file
@ -0,0 +1,200 @@
|
|||||||
|
# DataX IoTDBWriter
|
||||||
|
|
||||||
|
## 1 快速介绍
|
||||||
|
IoTDBWriter插件实现了写入数据到IoTDB数据库目标表(设备)的功能。
|
||||||
|
|
||||||
|
底层实现上,IoTDBWriter通过iotdb.session连接IoTDB,按照IoTDB的SQL语法,
|
||||||
|
执行session.insertRecordsOfOneDevice语句,将数据写入IoTDB。
|
||||||
|
|
||||||
|
IoTDBWriter可以作为数据迁移工具供DBA将其它数据库的数据导入到IoTDB。
|
||||||
|
|
||||||
|
## 2 实现原理
|
||||||
|
|
||||||
|
IoTDBWriter 通过 DataX 框架获取 Reader 生成的协议数据Record,通过Session连接IoTDB,执行insert语句,将数据写入IoTDB。
|
||||||
|
|
||||||
|
IoTDB中设备与列的概念见IoTDB官方文档。
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
## 3 功能说明
|
||||||
|
### 3.1 配置样例
|
||||||
|
|
||||||
|
配置一个MySQL数据写入IoTDB的作业
|
||||||
|
|
||||||
|
使用下面的Job配置,将数据写入IoTDB:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"job": {
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"reader": {
|
||||||
|
"name": "mysqlreader",
|
||||||
|
"parameter": {
|
||||||
|
"username": "root",
|
||||||
|
"password": "toy123",
|
||||||
|
"column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
|
||||||
|
"splitPk": "",
|
||||||
|
"connection": [
|
||||||
|
{
|
||||||
|
"table": [
|
||||||
|
"device"
|
||||||
|
],
|
||||||
|
"jdbcUrl": [
|
||||||
|
"jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"writer": {
|
||||||
|
"name": "iotdbwriter",
|
||||||
|
"parameter": {
|
||||||
|
"username": "root",
|
||||||
|
"password": "root",
|
||||||
|
"host": "192.168.150.100",
|
||||||
|
"port": 6667,
|
||||||
|
"version": "V_1_0",
|
||||||
|
"##": "Reader中时间列的位置,默认0列",
|
||||||
|
"timeColumnPosition": 0,
|
||||||
|
"batchSize": 1000,
|
||||||
|
"device": "root.cgn.device",
|
||||||
|
"measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
|
||||||
|
"###": "在插入之前,预先执行的SQL,默认为空",
|
||||||
|
"preSql": []
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"setting": {
|
||||||
|
"speed": {
|
||||||
|
"channel": 3
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
注意:mysqlreader插件,在src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java 中270行左右,修改了代码,将mysql中的tinyint(1)转为boolean类型
|
||||||
|
case Types.SMALLINT:
|
||||||
|
case Types.TINYINT:
|
||||||
|
// 将mysql中的tinyint(1)转为boolean类型
|
||||||
|
if (metaData.getPrecision(i) <= 3){
|
||||||
|
record.addColumn(new BoolColumn(rs.getBoolean(i)));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
### 3.2 参数说明
|
||||||
|
|
||||||
|
* username
|
||||||
|
* 描述:用户名
|
||||||
|
* 必选:是
|
||||||
|
* 默认值:无
|
||||||
|
* password
|
||||||
|
* 描述:用户名的密码
|
||||||
|
* 必选:是
|
||||||
|
* 默认值:无
|
||||||
|
* host
|
||||||
|
* 描述:连接iotdb数据库的主机地址
|
||||||
|
* 必选:是
|
||||||
|
* 默认值:无
|
||||||
|
* port
|
||||||
|
* 描述:端口
|
||||||
|
* 必选:是
|
||||||
|
* 默认值:无
|
||||||
|
* version
|
||||||
|
* 描述:iotdb版本
|
||||||
|
* 必选:是
|
||||||
|
* 默认值:无
|
||||||
|
* timeColumnPosition
|
||||||
|
* 描述:时间列在Record中列的位置
|
||||||
|
* 必选:否
|
||||||
|
* 默认值:0
|
||||||
|
* device
|
||||||
|
* 描述:iotdb中的概念,对应mysql中的表名
|
||||||
|
* 必选:是
|
||||||
|
* 默认值:无
|
||||||
|
* measurements
|
||||||
|
* 描述:iotdb中的概念,对应mysql中的字段集合,顺序应该与record中column的顺序相同
|
||||||
|
* 必选:是
|
||||||
|
* 默认值:无
|
||||||
|
* batchSize
|
||||||
|
* 描述:每batchSize条record为一个batch进行写入
|
||||||
|
* 必选:否
|
||||||
|
* 默认值:1000
|
||||||
|
* preSql
|
||||||
|
* 描述:插入前是否预先执行SQL
|
||||||
|
* 必选:否
|
||||||
|
* 默认值:无
|
||||||
|
|
||||||
|
### 3.3 类型转换
|
||||||
|
|
||||||
|
datax中的数据类型,映射到IoTDB的数据类型
|
||||||
|
|
||||||
|
| DataX 内部类型 | IoTDB 数据类型 |
|
||||||
|
| -------------- |------------------|
|
||||||
|
| INT | INT32 |
|
||||||
|
| LONG | TIMESTAMP, INT64 |
|
||||||
|
| DOUBLE | DOUBLE |
|
||||||
|
| STRING | STRING |
|
||||||
|
| BOOL | BOOL |
|
||||||
|
| DATE | TIMESTAMP,DATE |
|
||||||
|
| BYTES | BINARY |
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
### 3.4 各数据源到IoTDB的参考示例
|
||||||
|
见datax-example/datax-example-iotdb
|
||||||
|
|
||||||
|
|
||||||
|
## 4 性能报告
|
||||||
|
|
||||||
|
### 4.1 环境准备
|
||||||
|
|
||||||
|
#### 4.1.1 数据特征
|
||||||
|
|
||||||
|
建表语句:
|
||||||
|
|
||||||
|
单行记录类似于:
|
||||||
|
|
||||||
|
#### 4.1.2 机器参数
|
||||||
|
|
||||||
|
* 执行DataX的机器参数为:
|
||||||
|
1. cpu:
|
||||||
|
2. mem:
|
||||||
|
3. net: 千兆双网卡
|
||||||
|
4. disc: DataX 数据不落磁盘,不统计此项
|
||||||
|
|
||||||
|
* IoTDB数据库机器参数为:
|
||||||
|
1. cpu:
|
||||||
|
2. mem:
|
||||||
|
3. net: 千兆双网卡
|
||||||
|
4. disc:
|
||||||
|
|
||||||
|
#### 4.1.3 DataX jvm 参数
|
||||||
|
|
||||||
|
-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
|
||||||
|
|
||||||
|
### 4.2 测试报告
|
||||||
|
|
||||||
|
#### 4.2.1 单表测试报告
|
||||||
|
|
||||||
|
| 通道数 | DataX速度(Rec/s) | DataX流量(MB/s) | DataX机器网卡流出流量(MB/s) | DataX机器运行负载 | DB网卡进入流量(MB/s) | DB运行负载 | DB TPS |
|
||||||
|
| ------ | ---------------- | --------------- | --------------------------- | ----------------- | -------------------- | ---------- | ------ |
|
||||||
|
| 1 | | | | | | | |
|
||||||
|
| 4 | | | | | | | |
|
||||||
|
| 8 | | | | | | | |
|
||||||
|
| 16 | | | | | | | |
|
||||||
|
| 32 | | | | | | | |
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
#### 4.2.4 性能测试小结
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
## 5 约束限制
|
||||||
|
|
||||||
|
|
102
iotdbwriter/pom.xml
Normal file
102
iotdbwriter/pom.xml
Normal file
@ -0,0 +1,102 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<parent>
|
||||||
|
<artifactId>datax-all</artifactId>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<artifactId>iotdbwriter</artifactId>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<maven.compiler.source>8</maven.compiler.source>
|
||||||
|
<maven.compiler.target>8</maven.compiler.target>
|
||||||
|
<iotdb.session.version>1.3.3-SNAPSHOT</iotdb.session.version>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>datax-core</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>datax-common</artifactId>
|
||||||
|
<version>${datax-project-version}</version>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>slf4j-log4j12</artifactId>
|
||||||
|
<groupId>org.slf4j</groupId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.iotdb</groupId>
|
||||||
|
<artifactId>iotdb-session</artifactId>
|
||||||
|
<version>${iotdb.session.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.iotdb</groupId>
|
||||||
|
<artifactId>node-commons</artifactId>
|
||||||
|
<version>${iotdb.session.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<!-- compiler plugin -->
|
||||||
|
<plugin>
|
||||||
|
<artifactId>maven-compiler-plugin</artifactId>
|
||||||
|
<configuration>
|
||||||
|
<source>${jdk-version}</source>
|
||||||
|
<target>${jdk-version}</target>
|
||||||
|
<encoding>${project-sourceEncoding}</encoding>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
<plugin>
|
||||||
|
<artifactId>maven-assembly-plugin</artifactId>
|
||||||
|
<configuration>
|
||||||
|
<descriptors>
|
||||||
|
<descriptor>src/main/assembly/package.xml</descriptor>
|
||||||
|
</descriptors>
|
||||||
|
<finalName>datax</finalName>
|
||||||
|
</configuration>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<id>dwzip</id>
|
||||||
|
<phase>package</phase>
|
||||||
|
<goals>
|
||||||
|
<goal>single</goal>
|
||||||
|
</goals>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
|
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-surefire-plugin</artifactId>
|
||||||
|
<version>2.12.4</version>
|
||||||
|
<configuration>
|
||||||
|
<!-- 包含哪些测试用例 -->
|
||||||
|
<includes>
|
||||||
|
<include>**/*Test.java</include>
|
||||||
|
</includes>
|
||||||
|
<!-- 不包含哪些测试用例 -->
|
||||||
|
<excludes>
|
||||||
|
</excludes>
|
||||||
|
<testFailureIgnore>true</testFailureIgnore>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
|
||||||
|
</project>
|
34
iotdbwriter/src/main/assembly/package.xml
Normal file
34
iotdbwriter/src/main/assembly/package.xml
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
|
||||||
|
<id></id>
|
||||||
|
<formats>
|
||||||
|
<format>dir</format>
|
||||||
|
</formats>
|
||||||
|
<includeBaseDirectory>false</includeBaseDirectory>
|
||||||
|
<fileSets>
|
||||||
|
<fileSet>
|
||||||
|
<directory>src/main/resources</directory>
|
||||||
|
<includes>
|
||||||
|
<include>plugin.json</include>
|
||||||
|
<include>plugin_job_template.json</include>
|
||||||
|
</includes>
|
||||||
|
<outputDirectory>plugin/writer/iotdbwriter</outputDirectory>
|
||||||
|
</fileSet>
|
||||||
|
<fileSet>
|
||||||
|
<directory>target/</directory>
|
||||||
|
<includes>
|
||||||
|
<include>iotdbwriter-0.0.1-SNAPSHOT.jar</include>
|
||||||
|
</includes>
|
||||||
|
<outputDirectory>plugin/writer/iotdbwriter</outputDirectory>
|
||||||
|
</fileSet>
|
||||||
|
</fileSets>
|
||||||
|
|
||||||
|
<dependencySets>
|
||||||
|
<dependencySet>
|
||||||
|
<useProjectArtifact>false</useProjectArtifact>
|
||||||
|
<outputDirectory>plugin/writer/iotdbwriter/libs</outputDirectory>
|
||||||
|
<scope>runtime</scope>
|
||||||
|
</dependencySet>
|
||||||
|
</dependencySets>
|
||||||
|
</assembly>
|
@ -0,0 +1,246 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.iotdbwriter;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.element.Column;
|
||||||
|
import com.alibaba.datax.common.element.Record;
|
||||||
|
import com.alibaba.datax.common.exception.DataXException;
|
||||||
|
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||||
|
import com.alibaba.datax.common.plugin.TaskPluginCollector;
|
||||||
|
import com.alibaba.datax.common.spi.Writer;
|
||||||
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
|
import org.apache.iotdb.isession.util.Version;
|
||||||
|
import org.apache.iotdb.rpc.IoTDBConnectionException;
|
||||||
|
import org.apache.iotdb.rpc.StatementExecutionException;
|
||||||
|
import org.apache.iotdb.session.Session;
|
||||||
|
import org.apache.tsfile.enums.TSDataType;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class IoTDBWriter extends Writer {
|
||||||
|
|
||||||
|
public static class Job extends Writer.Job {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
|
||||||
|
private Configuration jobConf;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
this.jobConf = super.getPluginJobConf();
|
||||||
|
// 检查各种参数是否正确
|
||||||
|
String username = this.jobConf.getString(Key.USERNAME);
|
||||||
|
if (username == null || username.isEmpty()) {
|
||||||
|
throw DataXException.asDataXException(IoTDBWriterErrorCode.REQUIRED_VALUE, "The parameter [" + Key.USERNAME + "] is not set.");
|
||||||
|
}
|
||||||
|
String password = this.jobConf.getString(Key.PASSWORD);
|
||||||
|
if (password == null || password.isEmpty()) {
|
||||||
|
throw DataXException.asDataXException(IoTDBWriterErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PASSWORD + "] is not set.");
|
||||||
|
}
|
||||||
|
String host = this.jobConf.getString(Key.HOST);
|
||||||
|
if (host == null || host.isEmpty()) {
|
||||||
|
throw DataXException.asDataXException(IoTDBWriterErrorCode.REQUIRED_VALUE, "The parameter [" + Key.HOST + "] is not set.");
|
||||||
|
}
|
||||||
|
String port = this.jobConf.getString(Key.PORT);
|
||||||
|
if (port == null || port.isEmpty()) {
|
||||||
|
throw DataXException.asDataXException(IoTDBWriterErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PORT + "] is not set.");
|
||||||
|
}
|
||||||
|
// 还有一部分参数没检查,没必要了。
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void prepare(){
|
||||||
|
// 写入前准备,IOTDB不需要提前创建表。
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Configuration> split(int mandatoryNumber) {
|
||||||
|
List<Configuration> configs = new ArrayList<>();
|
||||||
|
// 根据源端划分多个task,每个写task对应一个读task,并行插入下放到session批次处理。
|
||||||
|
for (int i = 0; i < mandatoryNumber; i++) {
|
||||||
|
Configuration clone = this.jobConf.clone();
|
||||||
|
configs.add(clone);
|
||||||
|
}
|
||||||
|
// LOG.info("configs: {}", configs);
|
||||||
|
return configs;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() {
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Task extends Writer.Task {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
|
||||||
|
private Configuration taskConf;
|
||||||
|
|
||||||
|
// IoTDB原生读写工具
|
||||||
|
private Session session;
|
||||||
|
|
||||||
|
// 是否在插入前删除已有的时间序列,为""表示不执行
|
||||||
|
// private String deleteBeforeInsert;
|
||||||
|
|
||||||
|
// 插入批次大小
|
||||||
|
private int insertBatchSize;
|
||||||
|
|
||||||
|
// IoTDB中的时间列插入的位置,默认为0,即第一列。
|
||||||
|
private int timeColumnPosition;
|
||||||
|
|
||||||
|
// 处理脏数据
|
||||||
|
private TaskPluginCollector taskPluginCollector;
|
||||||
|
|
||||||
|
// 预先执行的SQL语句
|
||||||
|
private List<String> preSqls;
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
// 获取与本Task相关的配置,是Job的split方法返回的配置列表中的其中一个。
|
||||||
|
this.taskConf = super.getPluginJobConf();
|
||||||
|
|
||||||
|
// session init
|
||||||
|
session =
|
||||||
|
new Session.Builder()
|
||||||
|
.host(taskConf.getString(Key.HOST))
|
||||||
|
.port(taskConf.getInt(Key.PORT))
|
||||||
|
.username(taskConf.getString(Key.USERNAME))
|
||||||
|
.password(taskConf.getString(Key.PASSWORD))
|
||||||
|
.version(Version.valueOf(taskConf.getString(Key.VERSION)))
|
||||||
|
.build();
|
||||||
|
// open session, close RPCCompression
|
||||||
|
try {
|
||||||
|
session.open(false);
|
||||||
|
} catch (IoTDBConnectionException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 获取参数,否则默认值
|
||||||
|
insertBatchSize = (taskConf.getInt(Key.BATCH_SIZE) == null) ? 1000 : taskConf.getInt(Key.BATCH_SIZE);
|
||||||
|
timeColumnPosition = (taskConf.getInt(Key.TIME_COLUMN_POSITION) == null) ? 0 : taskConf.getInt(Key.TIME_COLUMN_POSITION);
|
||||||
|
preSqls = (taskConf.getList(Key.PRE_SQL, String.class) == null) ? new ArrayList<>() : taskConf.getList(Key.PRE_SQL, String.class);
|
||||||
|
taskPluginCollector = super.getTaskPluginCollector();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void prepare() {
|
||||||
|
if (preSqls.size() != 0){
|
||||||
|
for (String sql : preSqls) {
|
||||||
|
try {
|
||||||
|
session.executeNonQueryStatement(sql);
|
||||||
|
|
||||||
|
} catch (IoTDBConnectionException | StatementExecutionException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("=======Complated preSqls=======");
|
||||||
|
}
|
||||||
|
|
||||||
|
// IoTDB会自动创建时间序列,无需提前创建
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() {
|
||||||
|
try {
|
||||||
|
if (session != null){
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
} catch (IoTDBConnectionException e) {
|
||||||
|
LOG.info(e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 从RecordReceiver中读取数据,写入目标数据源。
|
||||||
|
* @param lineReceiver 数据来自Reader和Writer之间的缓存队列。
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void startWrite(RecordReceiver lineReceiver) {
|
||||||
|
// 往一个设备device中插入数据
|
||||||
|
Record record = null;
|
||||||
|
try{
|
||||||
|
// 构建List,一个设备批量写入
|
||||||
|
String device = taskConf.getString(Key.DEVICE);
|
||||||
|
List<Long> timestamps = new ArrayList<>();
|
||||||
|
List<List<String>> measurementsList = new ArrayList<>();
|
||||||
|
List<List<Object>> valuesList = new ArrayList<>();
|
||||||
|
List<List<TSDataType>> typesList = new ArrayList<>();
|
||||||
|
|
||||||
|
// 获取Record记录,传输结束返回null
|
||||||
|
int count; // 统计插入记录数
|
||||||
|
for (count = 0; (record = lineReceiver.getFromReader()) != null; count++) {
|
||||||
|
// 处理时间列
|
||||||
|
timestamps.add(record.getColumn(timeColumnPosition).asLong());
|
||||||
|
// 处理测点
|
||||||
|
List<String> measurements = taskConf.getList(Key.MEASUREMENTS, String.class);
|
||||||
|
measurementsList.add(measurements);
|
||||||
|
// 处理类型和值
|
||||||
|
List<TSDataType> types = new ArrayList<>();
|
||||||
|
List<Object> values = new ArrayList<>();
|
||||||
|
try{
|
||||||
|
for (int i = 0; i < record.getColumnNumber(); i++) {
|
||||||
|
if (i == timeColumnPosition){
|
||||||
|
continue; // 跳过时间列
|
||||||
|
}
|
||||||
|
Column col = record.getColumn(i);
|
||||||
|
switch (col.getType()) {
|
||||||
|
case BOOL:
|
||||||
|
types.add(TSDataType.BOOLEAN);
|
||||||
|
values.add(col.asBoolean());
|
||||||
|
break;
|
||||||
|
case INT:
|
||||||
|
types.add(TSDataType.INT32);
|
||||||
|
values.add((Integer) col.getRawData());
|
||||||
|
break;
|
||||||
|
case LONG:
|
||||||
|
types.add(TSDataType.INT64);
|
||||||
|
values.add(col.asLong());
|
||||||
|
break;
|
||||||
|
case DOUBLE:
|
||||||
|
types.add(TSDataType.DOUBLE);
|
||||||
|
values.add(col.asDouble());
|
||||||
|
break;
|
||||||
|
case NULL:
|
||||||
|
// IoTDB可以处理null
|
||||||
|
types.add(null);
|
||||||
|
values.add(null);
|
||||||
|
break;
|
||||||
|
case STRING:
|
||||||
|
types.add(TSDataType.STRING);
|
||||||
|
values.add(col.asString());
|
||||||
|
break;
|
||||||
|
case DATE:
|
||||||
|
types.add(TSDataType.DATE);
|
||||||
|
values.add(col.asDate());
|
||||||
|
break;
|
||||||
|
case BAD:
|
||||||
|
default:
|
||||||
|
throw new RuntimeException("unsupported type:" + col.getType());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
typesList.add(types);
|
||||||
|
valuesList.add(values);
|
||||||
|
}catch (RuntimeException e){
|
||||||
|
LOG.info(e.getMessage());
|
||||||
|
taskPluginCollector.collectDirtyRecord(record, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (count != 0 && count % insertBatchSize == 0) {
|
||||||
|
session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList);
|
||||||
|
timestamps.clear();
|
||||||
|
measurementsList.clear();
|
||||||
|
typesList.clear();
|
||||||
|
valuesList.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!timestamps.isEmpty()){
|
||||||
|
session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList);
|
||||||
|
}
|
||||||
|
LOG.info("========= task all data inserted, total record: " + (count-1));
|
||||||
|
}catch (IoTDBConnectionException | StatementExecutionException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,31 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.iotdbwriter;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.spi.ErrorCode;
|
||||||
|
|
||||||
|
public enum IoTDBWriterErrorCode implements ErrorCode {
|
||||||
|
|
||||||
|
REQUIRED_VALUE("IoTDBWriter-00", "parameter value is missing");
|
||||||
|
|
||||||
|
private final String code;
|
||||||
|
private final String description;
|
||||||
|
|
||||||
|
IoTDBWriterErrorCode(String code, String description) {
|
||||||
|
this.code = code;
|
||||||
|
this.description = description;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getCode() {
|
||||||
|
return this.code;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDescription() {
|
||||||
|
return this.description;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return String.format("Code:[%s], Description:[%s].", this.code, this.description);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,14 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.iotdbwriter;
|
||||||
|
|
||||||
|
public class Key {
|
||||||
|
public static final String USERNAME = "username";
|
||||||
|
public static final String PASSWORD = "password";
|
||||||
|
public static final String HOST = "host";
|
||||||
|
public static final String PORT = "port";
|
||||||
|
public static final String VERSION = "version";
|
||||||
|
public static final String TIME_COLUMN_POSITION = "timeColumnPosition";
|
||||||
|
public static final String DEVICE = "device";
|
||||||
|
public static final String MEASUREMENTS = "measurements";
|
||||||
|
public static final String PRE_SQL = "preSql";
|
||||||
|
public static final String BATCH_SIZE = "batchSize";
|
||||||
|
}
|
9
iotdbwriter/src/main/resources/plugin.json
Normal file
9
iotdbwriter/src/main/resources/plugin.json
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
{
|
||||||
|
"name": "iotdbwriter",
|
||||||
|
"class": "com.alibaba.datax.plugin.writer.iotdbwriter.IoTDBWriter",
|
||||||
|
"description": {
|
||||||
|
"useScene": "data migration to iotdb",
|
||||||
|
"mechanism": "use iotdb-java-session to write data."
|
||||||
|
},
|
||||||
|
"developer": "timecho.com"
|
||||||
|
}
|
19
iotdbwriter/src/main/resources/plugin_job_template.json
Normal file
19
iotdbwriter/src/main/resources/plugin_job_template.json
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
{
|
||||||
|
"name": "iotdbwriter",
|
||||||
|
"parameter": {
|
||||||
|
"username": "root",
|
||||||
|
"password": "root",
|
||||||
|
"host": "192.168.150.100",
|
||||||
|
"port": 6667,
|
||||||
|
"version": "V_1_0",
|
||||||
|
"##": "注意是Reader插件读取到的数据中时间列的位置,不是该插件,默认0列",
|
||||||
|
"timeColumnPosition": 0,
|
||||||
|
"batchSize": 1000,
|
||||||
|
"device": "root.cgn.device",
|
||||||
|
"measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
|
||||||
|
"###": "在插入之前,执行删除操作,为空或不配置表示不执行",
|
||||||
|
"preSql": [
|
||||||
|
"delete timeseries root.cgn.device.**"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
@ -269,6 +269,11 @@ public class CommonRdbmsReader {
|
|||||||
|
|
||||||
case Types.SMALLINT:
|
case Types.SMALLINT:
|
||||||
case Types.TINYINT:
|
case Types.TINYINT:
|
||||||
|
// 将mysql中的tinyint(1)转为boolean类型
|
||||||
|
if (metaData.getPrecision(i) <= 3){
|
||||||
|
record.addColumn(new BoolColumn(rs.getBoolean(i)));
|
||||||
|
break;
|
||||||
|
}
|
||||||
case Types.INTEGER:
|
case Types.INTEGER:
|
||||||
case Types.BIGINT:
|
case Types.BIGINT:
|
||||||
record.addColumn(new LongColumn(rs.getString(i)));
|
record.addColumn(new LongColumn(rs.getString(i)));
|
||||||
|
2
pom.xml
2
pom.xml
@ -74,6 +74,7 @@
|
|||||||
<module>clickhousereader</module>
|
<module>clickhousereader</module>
|
||||||
|
|
||||||
<module>mongodbreader</module>
|
<module>mongodbreader</module>
|
||||||
|
<module>iotdbreader</module>
|
||||||
<module>tdenginereader</module>
|
<module>tdenginereader</module>
|
||||||
<module>gdbreader</module>
|
<module>gdbreader</module>
|
||||||
<module>tsdbreader</module>
|
<module>tsdbreader</module>
|
||||||
@ -116,6 +117,7 @@
|
|||||||
<module>elasticsearchwriter</module>
|
<module>elasticsearchwriter</module>
|
||||||
<module>mongodbwriter</module>
|
<module>mongodbwriter</module>
|
||||||
<module>tdenginewriter</module>
|
<module>tdenginewriter</module>
|
||||||
|
<module>iotdbwriter</module>
|
||||||
<module>ocswriter</module>
|
<module>ocswriter</module>
|
||||||
<module>tsdbwriter</module>
|
<module>tsdbwriter</module>
|
||||||
<module>gdbwriter</module>
|
<module>gdbwriter</module>
|
||||||
|
Loading…
Reference in New Issue
Block a user