diff --git a/datax-example/datax-example-iotdb/pom.xml b/datax-example/datax-example-iotdb/pom.xml new file mode 100644 index 00000000..f6857e06 --- /dev/null +++ b/datax-example/datax-example-iotdb/pom.xml @@ -0,0 +1,85 @@ + + + + datax-example + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + datax-example-iotdb + + + 8 + 8 + UTF-8 + + + + + com.alibaba.datax + iotdbreader + 0.0.1-SNAPSHOT + compile + + + com.alibaba.datax + iotdbwriter + 0.0.1-SNAPSHOT + compile + + + + com.alibaba.datax + datax-example-core + 0.0.1-SNAPSHOT + + + com.alibaba.datax + streamreader + 0.0.1-SNAPSHOT + + + com.alibaba.datax + streamwriter + 0.0.1-SNAPSHOT + compile + + + + com.alibaba.datax + txtfilereader + 0.0.1-SNAPSHOT + compile + + + com.alibaba.datax + txtfilewriter + 0.0.1-SNAPSHOT + compile + + + + com.alibaba.datax + mysqlreader + 0.0.1-SNAPSHOT + compile + + + com.alibaba.datax + mysqlwriter + 0.0.1-SNAPSHOT + compile + + + + mysql + mysql-connector-java + 8.0.31 + + + + + \ No newline at end of file
diff --git a/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestCreateData.java b/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestCreateData.java new file mode 100644 index 00000000..c521d4d8 --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestCreateData.java @@ -0,0 +1,147 @@
package com.alibaba.datax.example.iotdb;

import org.apache.iotdb.isession.util.Version;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.Session;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.junit.Test;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.*;

/**
 * Manual fixture that seeds an IoTDB instance with the sample time series used by the
 * IoTDB example jobs: it creates one series per line of {@code testData.txt} under
 * {@code root.cgn.device} and fills each of them with random values.
 *
 * <p>The target server defaults to the address the fixture was written against and can be
 * overridden with the {@code iotdb.host} / {@code iotdb.port} system properties.
 */
public class TestCreateData {
    /** Number of rows written for every measurement. */
    private static final int ROWS_PER_MEASUREMENT = 10000;
    /** Number of rows sent to the server per insert call. */
    private static final int INSERT_BATCH_SIZE = 1000;
    /** Distance between two consecutive rows, in milliseconds. */
    private static final long ROW_INTERVAL_MS = 1000L;

    private static final Random RANDOM = new Random();

    /**
     * Opens a session, makes sure the {@code root.cgn} database exists, then (re)creates the
     * sample series and inserts the generated rows. The session is always closed afterwards.
     */
    @Test
    public void createAndInsert()
            throws IoTDBConnectionException, StatementExecutionException {
        Session session =
                new Session.Builder()
                        .host(System.getProperty("iotdb.host", "172.20.31.61"))
                        .port(Integer.getInteger("iotdb.port", 6667))
                        .username("root")
                        .password("root")
                        // NOTE(review): the example job files configure "V_1_0" - confirm which
                        // protocol version the target server expects.
                        .version(Version.V_0_13)
                        .build();

        // false = open the session with RPC compression disabled
        session.open(false);
        try {
            session.setFetchSize(10000);

            String database = "root.cgn";
            try {
                session.createDatabase(database);
            } catch (StatementExecutionException e) {
                // An already existing database is fine; anything else is a real failure.
                if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
                    throw e;
                }
            }

            createAndInsert2(session, "src/test/resources/testData.txt", "root.cgn.device");
        } finally {
            session.close();
        }
    }

    /**
     * Creates one time series per point definition found in {@code filePath} below
     * {@code device} (dropping any series that already exist there) and writes
     * {@link #ROWS_PER_MEASUREMENT} rows of random values, one row per second starting at
     * 2024-01-01T00:00 (UTC+8).
     *
     * <p>Definition file layout (no header row), whitespace separated:
     * point type, point name, description, unit, range lower bound, range upper bound.
     * Only the first two tokens are read; type {@code AX} becomes a DOUBLE series, every other
     * type a BOOLEAN series.
     */
    private static void createAndInsert2(Session session, String filePath, String device)
            throws IoTDBConnectionException, StatementExecutionException {
        List<List<String>> definitions = readPointDefinitions(filePath);
        if (definitions.isEmpty()) {
            throw new IllegalStateException("No point definitions found in " + filePath);
        }

        // Per-series arguments for createMultiTimeseries; all lists are index-aligned.
        List<String> paths = new ArrayList<>();
        List<String> measurements = new ArrayList<>();
        List<Boolean> isDoubleList = new ArrayList<>();
        List<TSDataType> tsDataTypes = new ArrayList<>();
        List<TSEncoding> tsEncodings = new ArrayList<>();
        List<CompressionType> compressionTypes = new ArrayList<>();
        List<Map<String, String>> attributesList = new ArrayList<>();

        for (List<String> definition : definitions) {
            String pointName = definition.get(1);
            boolean isDouble = "AX".equals(definition.get(0));

            measurements.add(pointName);
            paths.add(device + "." + pointName);
            isDoubleList.add(isDouble);
            tsDataTypes.add(isDouble ? TSDataType.DOUBLE : TSDataType.BOOLEAN);
            tsEncodings.add(isDouble ? TSEncoding.GORILLA : TSEncoding.RLE);
            compressionTypes.add(CompressionType.SNAPPY);

            Map<String, String> attributes = new HashMap<>();
            attributes.put("描述", "测试性描述");
            attributes.put("量纲", isDouble ? "%" : "");
            attributes.put("量程下限", "0");
            attributes.put("量程上限", isDouble ? "1.00E+02" : "2.00E+02");
            attributesList.add(attributes);
        }

        // Start from a clean slate: drop whatever series already exist under the device.
        if (session.checkTimeseriesExists(device + ".**")) {
            session.deleteTimeseries(device + ".**");
            System.out.println("删除已有的时间序列完成==============");
        }

        session.createMultiTimeseries(
                paths, tsDataTypes, tsEncodings, compressionTypes, null, null, attributesList, null);

        List<Long> timestamps = new ArrayList<>();
        List<List<String>> measurementsList = new ArrayList<>();
        List<List<TSDataType>> typesList = new ArrayList<>();
        List<List<Object>> valuesList = new ArrayList<>();

        long startTime =
                LocalDateTime.of(2024, 1, 1, 0, 0, 0, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();

        for (int row = 0; row < ROWS_PER_MEASUREMENT; row++) {
            timestamps.add(startTime + row * ROW_INTERVAL_MS);
            measurementsList.add(measurements);
            typesList.add(tsDataTypes);

            // One random value per series, matching the series' data type.
            List<Object> rowValues = new ArrayList<>(isDoubleList.size());
            for (Boolean isDouble : isDoubleList) {
                if (isDouble) {
                    rowValues.add(RANDOM.nextDouble() * 100);
                } else {
                    rowValues.add(RANDOM.nextBoolean());
                }
            }
            valuesList.add(rowValues);

            if (timestamps.size() == INSERT_BATCH_SIZE) {
                flush(session, device, timestamps, measurementsList, typesList, valuesList);
            }
        }
        // Send the tail when the row count is not a multiple of the batch size.
        if (!timestamps.isEmpty()) {
            flush(session, device, timestamps, measurementsList, typesList, valuesList);
        }
    }

    /** Sends the buffered rows to the server and empties all four buffers. */
    private static void flush(
            Session session,
            String device,
            List<Long> timestamps,
            List<List<String>> measurementsList,
            List<List<TSDataType>> typesList,
            List<List<Object>> valuesList)
            throws IoTDBConnectionException, StatementExecutionException {
        session.insertRecordsOfOneDevice(
                device, timestamps, measurementsList, typesList, valuesList);
        timestamps.clear();
        measurementsList.clear();
        typesList.clear();
        valuesList.clear();
    }

    /**
     * Reads the point-definition file as UTF-8 and splits every line on whitespace.
     * Lines with fewer than two tokens (for example blank lines) are skipped because both the
     * point type and the point name are required.
     *
     * @throws UncheckedIOException if the file cannot be read
     */
    private static List<List<String>> readPointDefinitions(String filePath) {
        List<List<String>> definitions = new ArrayList<>();
        try (BufferedReader reader =
                Files.newBufferedReader(Paths.get(filePath), StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] words = line.trim().split("\\s+");
                if (words.length < 2) {
                    continue;
                }
                definitions.add(new ArrayList<>(Arrays.asList(words)));
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Cannot read point definitions from " + filePath, e);
        }
        return definitions;
    }
}
diff --git a/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestIoTDB.java b/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestIoTDB.java new file mode 100644 index 00000000..9479373b --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/java/com/alibaba/datax/example/iotdb/TestIoTDB.java @@ -0,0 +1,64 @@
package com.alibaba.datax.example.iotdb;

import com.alibaba.datax.example.ExampleContainer;
import com.alibaba.datax.example.util.PathUtil;
import org.junit.Test;

/**
 * Runs the example DataX jobs shipped in this module's test resources. Each test resolves
 * one job file from the classpath and hands it to {@link ExampleContainer}.
 */
public class TestIoTDB {

    @Test
    public void testIoTDBReader2MySQLWriter() {
        runJob("/iotdb2mysql.json");
    }

    @Test
    public void testMySQLReader2IoTDBWriter() {
        runJob("/mysql2iotdb.json");
    }

    @Test
    public void testIoTDBReader2txtWriter() {
        runJob("/iotdb2txt.json");
    }

    @Test
    public void testIoTDBReader2StreamWriter() {
        runJob("/iotdb2stream.json");
    }

    @Test
    public void testMySQLReader2StreamWriter() {
        runJob("/mysql2stream.json");
    }

    @Test
    public void testMySQLReader2txtWriter() {
        runJob("/mysql2txt.json");
    }

    @Test
    public void testStreamReader2TxtWriter() {
        runJob("/stream2txt.json");
    }

    @Test
    public void testStreamReader2StreamWriter() {
        runJob("/stream2stream.json");
    }

    /** Resolves a classpath job file to an absolute path and starts the job. */
    private static void runJob(String classpathJobFile) {
        String jobPath = PathUtil.getAbsolutePathFromClassPath(classpathJobFile);
        ExampleContainer.start(jobPath);
    }
}
diff --git a/datax-example/datax-example-iotdb/src/test/resources/iotdb2mysql.json b/datax-example/datax-example-iotdb/src/test/resources/iotdb2mysql.json new file mode 100644 index 00000000..b77bfbee --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/resources/iotdb2mysql.json @@ -0,0 +1,56 @@ +{ + "job": { + "setting": { + "speed": { + "channel": 3 + } + }, + "content": [ + { + "reader": { + "name": "iotdbreader", + "parameter": { + "username": "root", + "password": "root", + "host": "172.20.31.61", + "port": 6667, + "fetchSize": 10000, + "version": "V_1_0", + "timeColumnPosition": 0, + "querySqls":[ + ], + "device": "root.cgn.device", + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00" + } + }, + "writer": { + "name": "mysqlwriter", + "parameter": { + "username": "root", + "password": "toy123", + "writeMode": "insert", + "#需要提前建表": "CREATE TABLE device (`time` DATETIME,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);", + "column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "session": [ + "set session sql_mode='ANSI'" + ], + "preSql": [ + "delete from device" + ], + "connection": [ + { + "table": [ + "device" + ], + "#": "下面的URL需要把中括号去掉,否则报错,mysqlwriter的bug,未修改", + "jdbcUrl": 
"jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" + } + ] + } + } + } + ] + } +} + diff --git a/datax-example/datax-example-iotdb/src/test/resources/iotdb2stream.json b/datax-example/datax-example-iotdb/src/test/resources/iotdb2stream.json new file mode 100644 index 00000000..ce8f401a --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/resources/iotdb2stream.json @@ -0,0 +1,42 @@ +{ + "job": { + "setting": { + "speed": { + "channel": 3 + }, + "errorLimit": { + "record": 0, + "percentage": 0.02 + } + }, + "content": [ + { + "reader": { + "name": "iotdbreader", + "parameter": { + "username": "root", + "password": "root", + "host": "192.168.150.100", + "port": 6667, + "fetchSize": 10000, + "version": "V_1_0", + "timeColumnPosition": 0, + "querySqls":[ + + ], + "device": "root.cgn.device", + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00" + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "print":true + } + } + } + ] + } +} + diff --git a/datax-example/datax-example-iotdb/src/test/resources/iotdb2txt.json b/datax-example/datax-example-iotdb/src/test/resources/iotdb2txt.json new file mode 100644 index 00000000..990411c5 --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/resources/iotdb2txt.json @@ -0,0 +1,42 @@ +{ + "job": { + "setting": { + "speed": { + "channel": 3 + } + }, + "content": [ + { + "reader": { + "name": "iotdbreader", + "parameter": { + "username": "root", + "password": "root", + "host": "172.20.31.61", + "port": 6667, + "fetchSize": 10000, + "version": "V_1_0", + "timeColumnPosition": 0, + "querySqls":[ + "select * from root.cgn.device", + "select A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU from root.cgn.device" + ], + "device": "root.cgn.device", + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "where": "time > 
2023-03-07 12:00:00 and time < 2024-03-07 19:00:00" + } + }, + "writer": { + "name": "txtfilewriter", + "parameter": { + "path": "D:/下载", + "fileName": "txtText", + "writeMode": "truncate", + "dateFormat": "yyyy-MM-dd" + } + } + } + ] + } +} + diff --git a/datax-example/datax-example-iotdb/src/test/resources/mysql2iotdb.json b/datax-example/datax-example-iotdb/src/test/resources/mysql2iotdb.json new file mode 100644 index 00000000..d66d9c94 --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/resources/mysql2iotdb.json @@ -0,0 +1,50 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "mysqlreader", + "parameter": { + "username": "root", + "password": "toy123", + "column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "splitPk": "", + "connection": [ + { + "table": [ + "device" + ], + "jdbcUrl": [ + "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" + ] + } + ] + } + }, + "writer": { + "name": "iotdbwriter", + "parameter": { + "username": "root", + "password": "root", + "host": "172.20.31.61", + "port": 6667, + "fetchSize": 10000, + "version": "V_1_0", + "timeColumnPosition": 0, + "insertBatchSize": 1000, + "device": "root.cgn.device", + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "preSql": [ + "delete timeseries root.cgn.device.**" + ] + } + } + } + ], + "setting": { + "speed": { + "channel": 3 + } + } + } +} \ No newline at end of file diff --git a/datax-example/datax-example-iotdb/src/test/resources/mysql2stream.json b/datax-example/datax-example-iotdb/src/test/resources/mysql2stream.json new file mode 100644 index 00000000..5414a5f4 --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/resources/mysql2stream.json @@ -0,0 +1,46 @@ +{ + "job": { + "setting": { + "speed": { + "channel": 3 + }, + "errorLimit": { + "record": 0, + "percentage": 0.02 + } + }, + "content": [ + { + "reader": { + "name": "mysqlreader", + 
"parameter": { + "username": "root", + "password": "toy123", + "column": [ + "order_num", + "cust_id" + ], + "splitPk": "", + "connection": [ + { + "table": [ + "orders" + ], + "jdbcUrl": [ + "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" + ] + } + ] + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "print":true + } + } + } + ] + } +} + diff --git a/datax-example/datax-example-iotdb/src/test/resources/mysql2txt.json b/datax-example/datax-example-iotdb/src/test/resources/mysql2txt.json new file mode 100644 index 00000000..879bcbc0 --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/resources/mysql2txt.json @@ -0,0 +1,41 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "mysqlreader", + "parameter": { + "username": "root", + "password": "toy123", + "column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "splitPk": "", + "connection": [ + { + "table": [ + "device" + ], + "jdbcUrl": [ + "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" + ] + } + ] + } + }, + "writer": { + "name": "txtfilewriter", + "parameter": { + "path": "D:/下载", + "fileName": "txtText", + "writeMode": "truncate", + "dateFormat": "yyyy-MM-dd" + } + } + } + ], + "setting": { + "speed": { + "channel": 5 + } + } + } +} \ No newline at end of file diff --git a/datax-example/datax-example-iotdb/src/test/resources/stream2stream.json b/datax-example/datax-example-iotdb/src/test/resources/stream2stream.json new file mode 100644 index 00000000..0193d822 --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/resources/stream2stream.json @@ -0,0 +1,36 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "streamreader", + "parameter": { + "sliceRecordCount": 10, + "column": [ + { + "type": "long", + "value": "15" + }, + { + "type": "string", + "value": "hello,你好,世界-DataX" + } + ] + } + }, + "writer": { + "name": 
"streamwriter", + "parameter": { + "encoding": "UTF-8", + "print": true + } + } + } + ], + "setting": { + "speed": { + "channel": 5 + } + } + } +} \ No newline at end of file diff --git a/datax-example/datax-example-iotdb/src/test/resources/stream2txt.json b/datax-example/datax-example-iotdb/src/test/resources/stream2txt.json new file mode 100644 index 00000000..40fd9e29 --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/resources/stream2txt.json @@ -0,0 +1,38 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "streamreader", + "parameter": { + "sliceRecordCount": 10, + "column": [ + { + "type": "long", + "value": "10" + }, + { + "type": "string", + "value": "hello,你好,世界-DataX" + } + ] + } + }, + "writer": { + "name": "txtfilewriter", + "parameter": { + "path": "D:/下载", + "fileName": "txtText", + "writeMode": "truncate", + "dateFormat": "yyyy-MM-dd" + } + } + } + ], + "setting": { + "speed": { + "channel": 5 + } + } + } +} \ No newline at end of file diff --git a/datax-example/datax-example-iotdb/src/test/resources/testData.txt b/datax-example/datax-example-iotdb/src/test/resources/testData.txt new file mode 100644 index 00000000..d0588b20 --- /dev/null +++ b/datax-example/datax-example-iotdb/src/test/resources/testData.txt @@ -0,0 +1,39 @@ +AX L2KRT008MA 测试性描述 % 0 1.00E+02 +AX L2ETY101MP 测试性描述 % 0 1.00E+02 +AX L2SEC002MD 测试性描述 % 0 1.00E+02 +AX L1ETY103MP 测试性描述 % 0 1.00E+02 +AX D1KRT003EU 测试性描述 % 0 1.00E+02 +AX L4EAS008MDX_XQ01 测试性描述 % 0 1.00E+02 +AX L3RIC039KMX_XQ01 测试性描述 % 0 1.00E+02 +AX L2VVP008MP 测试性描述 % 0 1.00E+02 +AX D2RCV039MD 测试性描述 % 0 1.00E+02 +DX D2RPA063EC 测试性描述 0 2.00E+00 +AX L4KRT022MAK_XP01 测试性描述 % 0 1.00E+02 +AX L3SEC002MPX_XQ01 测试性描述 % 0 1.00E+02 +DX L4RCP651KSX_XG52 测试性描述 0 2.00E+00 +AX L2RRI082MT 测试性描述 % 0 1.00E+02 +AX A5STD 测试性描述 % 0 1.00E+02 +DX D2RRI003PO 测试性描述 0 2.00E+00 +AX L4RCP091MT_XQ01 测试性描述 % 0 1.00E+02 +DX L1GCT132SM5 测试性描述 0 2.00E+00 +DX L1GCT132SM3 测试性描述 0 2.00E+00 +AX L2RIC035MT 测试性描述 % 0 1.00E+02 +AX L1RGL018QM 
测试性描述 % 0 1.00E+02 +AX L1RIC037MT 测试性描述 % 0 1.00E+02 +AX D2RCP042MD 测试性描述 % 0 1.00E+02 +AX D2RGL001QM 测试性描述 % 0 1.00E+02 +AX D2RIC020MT 测试性描述 % 0 1.00E+02 +AX D1RIC022MT 测试性描述 % 0 1.00E+02 +AX D1RGL003QM 测试性描述 % 0 1.00E+02 +AX L3LGE005TU_XQ01 测试性描述 % 0 1.00E+02 +DX D2RCP517EC 测试性描述 0 2.00E+00 +AX D1RCP054MD 测试性描述 % 0 1.00E+02 +AX L2RIS014MD 测试性描述 % 0 1.00E+02 +DX D1LGE001JA 测试性描述 0 2.00E+00 +DX D2SEC001PO 测试性描述 0 2.00E+00 +DX D1SEC003PO 测试性描述 0 2.00E+00 +AX D1RIS001MD 测试性描述 % 0 1.00E+02 +AX L1RCP090MT 测试性描述 % 0 1.00E+02 +DX L2VVP003SM5 测试性描述 0 2.00E+00 +AX L3RIC022KMX_XQ01 测试性描述 % 0 1.00E+02 +DX L1GCT501EC 测试性描述 0 2.00E+00 \ No newline at end of file diff --git a/datax-example/pom.xml b/datax-example/pom.xml index 9c4c9200..ac50d774 100644 --- a/datax-example/pom.xml +++ b/datax-example/pom.xml @@ -15,6 +15,7 @@ datax-example-core datax-example-streamreader datax-example-neo4j + datax-example-iotdb diff --git a/iotdbreader/doc/iotdbreader-CN.md b/iotdbreader/doc/iotdbreader-CN.md new file mode 100644 index 00000000..17f26b57 --- /dev/null +++ b/iotdbreader/doc/iotdbreader-CN.md @@ -0,0 +1,212 @@ +# DataX IoTDBReader + +## 1 快速介绍 + +IoTDBReader 插件实现了 IoTDB 读取数据的功能。 + +## 2 实现原理 + +IoTDBReader 通过 IoTDB 的 原生java session 查询获取数据。 + +## 3 功能说明 + +### 3.1 配置样例 + +* 配置一个从 IoTDB 抽取数据作业: + +```json +{ + "job": { + "setting": { + "speed": { + "channel": 3 + } + }, + "content": [ + { + "reader": { + "name": "iotdbreader", + "parameter": { + "username": "root", + "password": "root", + "host": "192.168.150.100", + "port": 6667, + "fetchSize": 10000, + "version": "V_1_0", + "##": "时间列插入DataX的Record中的位置,默认第0列", + "timeColumnPosition": 0, + "##":"写了querySqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句", + "querySqls":[ + ], + "device": "root.cgn.device", + "##":"时间列不属于测点", + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00" + } + }, + "writer": { + "name": "mysqlwriter", + "parameter": 
{ + "username": "root", + "password": "toy123", + "writeMode": "insert", + "#需要提前建表": "CREATE TABLE device (`time` DATETIME,`A5STD` DOUBLE,`L2RIS014MD` DOUBLE,`L2VVP003SM5` BOOLEAN,`D1RIS001MD` DOUBLE,`D1KRT003EU` DOUBLE);", + "column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "session": [ + "set session sql_mode='ANSI'" + ], + "preSql": [ + "delete from device" + ], + "connection": [ + { + "table": [ + "device" + ], + "#": "下面的URL需要把中括号去掉,否则报错,mysqlreader的bug,未修改", + "jdbcUrl": "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" + } + ] + } + } + } + ] + } +} +``` + +* 配置一个自定义 SQL 的数据抽取作业: + +```json +{ + "job": { + "setting": { + "speed": { + "channel": 3 + } + }, + "content": [ + { + "reader": { + "name": "iotdbreader", + "parameter": { + "username": "root", + "password": "root", + "host": "192.168.150.100", + "port": 6667, + "fetchSize": 10000, + "version": "V_1_0", + "timeColumnPosition": 0, + "##":"写了querySqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句", + "querySqls":[ + "select * from root.cgn.device", + "select A5STD,L2RIS014MD,L2VVP003SM5,D1RIS001MD,D1KRT003EU from root.cgn.device" + ], + "device": "", + "measurements": "", + "where": "" + } + }, + "writer": { + "name": "txtfilewriter", + "parameter": { + "path": "D:/下载", + "fileName": "txtText", + "writeMode": "truncate", + "dateFormat": "yyyy-MM-dd" + } + } + } + ] + } +} +``` + +### 3.2 参数说明 +* username + * 描述:用户名 + * 必选:是 + * 默认值:无 +* password + * 描述:用户名的密码 + * 必选:是 + * 默认值:无 +* host + * 描述:连接iotdb数据库的主机地址 + * 必选:是 + * 默认值:无 +* port + * 描述:端口 + * 必选:是 + * 默认值:无 +* version + * 描述:iotdb版本 + * 必选:是 + * 默认值:无 +* timeColumnPosition + * 描述:时间列在Record中列的位置 + * 必选:否 + * 默认值:0 +* querySqls + * 描述:直接写多行SQL,可以并行读取,此时下面的参数失效。 + * 必选:否 + * 默认值: +* device + * 描述:IoTDB中的概念,可理解为mysql中的表。 + * 必选:querySqls为空时必选 + * 默认值:无 +* measurements + * 描述:IoTDB中的概念,可理解为mysql中的字段。 + * 必选:querySqls为空时必选 + * 默认值:无 +* where + * 描述:查询条件 + * 必选:否 + * 
默认值:无 + +### 3.3 类型转换 + +| IoTDB 数据类型 | DataX 内部类型 | +|-----------------|--------| +| INT32 | Long | +| INT64,TIMESTAMP | Long | +| FLOAT | Double | +| DOUBLE | Double | +| BOOLEAN | Bool | +| DATE | Date | +| STRING,TEXT | String | + +## 4 性能报告 + +### 4.1 环境准备 + +#### 4.1.1 数据特征 + +#### 4.1.2 机器参数 + +#### 4.1.3 DataX jvm 参数 + + -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError + +### 4.2 测试报告 + +#### 4.2.1 单表测试报告 + +| 通道数| DataX速度(Rec/s)|DataX流量(MB/s)| DataX机器网卡流出流量(MB/s)|DataX机器运行负载|DB网卡进入流量(MB/s)|DB运行负载|DB TPS| +|--------| --------|--------|--------|--------|--------|--------|--------| +|1| | | | | | | | +|4| | | | | | | | +|8| | | | | | | | +|16| | | | | | | | +|32| | | | | | | | + +说明: + +#### 4.2.2 性能测试小结 + +1. +2. + +## 5 约束限制 + +## FAQ \ No newline at end of file diff --git a/iotdbreader/pom.xml b/iotdbreader/pom.xml new file mode 100644 index 00000000..b531891e --- /dev/null +++ b/iotdbreader/pom.xml @@ -0,0 +1,102 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + iotdbreader + + + 8 + 8 + 1.3.3-SNAPSHOT + + + + + com.alibaba.datax + datax-core + 0.0.1-SNAPSHOT + test + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.iotdb + iotdb-session + ${iotdb.session.version} + + + org.apache.iotdb + node-commons + ${iotdb.session.version} + + + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.12.4 + + + + **/*Test.java + + + + + true + + + + + + + \ No newline at end of file diff --git a/iotdbreader/src/main/assembly/package.xml b/iotdbreader/src/main/assembly/package.xml new file mode 100644 index 00000000..f01364f1 --- /dev/null +++ b/iotdbreader/src/main/assembly/package.xml @@ -0,0 +1,34 @@ + + + + dir + + false + + + src/main/resources + + 
plugin.json + plugin_job_template.json + + plugin/writer/iotdbreader + + + target/ + + iotdbreader-0.0.1-SNAPSHOT.jar + + plugin/writer/iotdbreader + + + + + + false + plugin/reader/iotdbreader/libs + runtime + + + diff --git a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java new file mode 100644 index 00000000..b285e906 --- /dev/null +++ b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReader.java @@ -0,0 +1,252 @@ +package com.alibaba.datax.plugin.reader.iotdbreader; + +import com.alibaba.datax.common.element.*; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.AbstractTaskPlugin; +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.common.spi.Reader; +import com.alibaba.datax.common.util.Configuration; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.isession.util.Version; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.Field; +import org.apache.tsfile.read.common.RowRecord; +import org.apache.tsfile.read.common.block.column.NullColumn; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; +import java.util.ArrayList; +import java.util.List; + +public class IoTDBReader extends Reader { + public static class Job extends Reader.Job { + private static final Logger LOG = LoggerFactory.getLogger(Job.class); + private Configuration jobConf; + + /** + * Job对象初始化工作 + */ + @Override + public void init() { + // 通过super.getPluginJobConf()获取与本插件相关的配置。 + this.jobConf = super.getPluginJobConf(); + // 检查各种参数是否正确 + String username = 
this.jobConf.getString(Key.USERNAME); + if (username == null || username.isEmpty()) { + throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.USERNAME + "] is not set."); + } + String password = this.jobConf.getString(Key.PASSWORD); + if (password == null || password.isEmpty()) { + throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PASSWORD + "] is not set."); + } + String host = this.jobConf.getString(Key.HOST); + if (host == null || host.isEmpty()) { + throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.HOST + "] is not set."); + } + String port = this.jobConf.getString(Key.PORT); + if (port == null || port.isEmpty()) { + throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PORT + "] is not set."); + } + String fetchSize = this.jobConf.getString(Key.FETCH_SIZE); + if (fetchSize == null || fetchSize.isEmpty()) { + throw DataXException.asDataXException(IoTDBReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.FETCH_SIZE + "] is not set."); + } + // 还有一部分参数没检查,没必要了。 + + } + @Override + public void prepare() { + } + + /** + * 将Job拆分Task。 + * @param adviceNumber 框架建议的拆分数,一般是运行时所配置的并发度。 + * @return Task的配置列表,一个配置文件对应一个task。 + */ + @Override + public List split(int adviceNumber) { + // 每个config对应一个task + List configs = new ArrayList<>(); + List queryList = this.jobConf.getList(Key.QUERY_SQLS, String.class); + if (queryList == null || queryList.size() == 0){ + Configuration clone = this.jobConf.clone(); + String device = this.jobConf.getString(Key.DEVICE); + List measurements = this.jobConf.getList(Key.MEASUREMENTS, String.class); + String where = this.jobConf.getString(Key.WHERE); + StringBuilder sb = new StringBuilder(); + sb.append("select ").append(String.join(",", measurements)); + sb.append(" from ").append(device); + if (where != null && !where.isEmpty()){ + sb.append(" where 
").append(where); + } + clone.set(Key.QUERY_SQL, sb.toString()); + configs.add(clone); + //DataX中一个查询是单线程,实际上底层session中是多线程读取。 + }else{ + // 直接读取最终SQL + for (String query : queryList) { + Configuration clone = this.jobConf.clone(); + clone.remove(Key.QUERY_SQLS); + clone.set(Key.QUERY_SQL, query); + configs.add(clone); + } + } + // LOG.info("configs: {}", configs); + return configs; + } + + /** + * Job对象自身的销毁工作。 + */ + @Override + public void destroy() { + + } + + /** + * 全局的后置工作。 + */ + @Override + public void post() { + + } + + + } + + public static class Task extends Reader.Task { + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + + private Configuration taskConf; + + /** + * IoTDB原生读写工具 + */ + private Session session; + /** + * IoTDB中的时间列插入的位置,默认为0,即第一列。 + */ + private int timeColumnPosition; + /** + * 最终的查询SQL,交给session执行。 + */ + private String querySql; + private TaskPluginCollector taskPluginCollector; + + @Override + public void init() { + // 获取与本Task相关的配置,是Job的split方法返回的配置列表中的其中一个。 + taskConf = super.getPluginJobConf(); + + // session init + session = + new Session.Builder() + .host(taskConf.getString(Key.HOST)) + .port(taskConf.getInt(Key.PORT)) + .username(taskConf.getString(Key.USERNAME)) + .password(taskConf.getString(Key.PASSWORD)) + .version(Version.valueOf(taskConf.getString(Key.VERSION))) + .build(); + // open session, close RPCCompression + try { + session.open(false); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } + + // set session fetchSize + session.setFetchSize(taskConf.getInt(Key.FETCH_SIZE)); + + this.timeColumnPosition = (taskConf.getInt(Key.TIME_COLUMN_POSITION) == null) ? 
0 : taskConf.getInt(Key.TIME_COLUMN_POSITION); + this.querySql = taskConf.getString(Key.QUERY_SQL); + taskPluginCollector = super.getTaskPluginCollector(); + } + + @Override + public void destroy() { + // Task自身的销毁工作。 + try { + if (session != null){ + session.close(); + } + } catch (IoTDBConnectionException e) { + LOG.info(e.getMessage()); + } + } + + /** + * 从数据源读数据,写入到RecordSender中。 + * @param recordSender 把数据写入连接Reader和Writer的缓存队列。 + */ + @Override + public void startRead(RecordSender recordSender) { + try { + SessionDataSet dataSet = session.executeQueryStatement(this.querySql); + while (dataSet.hasNext()) { + // DataX中的行record + Record record = recordSender.createRecord(); + // IoTDB中的行 + RowRecord rowRecord = dataSet.next(); + List fields = rowRecord.getFields(); + try { + // 除time列外的其他列遍历类型后转换 + for (int i = 0; i < fields.size(); i++) { + if (i == timeColumnPosition){ + // time列插入指定位置,时间列不在fields中,需要单独处理。不能为null + long timestamp = rowRecord.getTimestamp(); + record.addColumn(new DateColumn(timestamp)); + } + Field field = fields.get(i); + TSDataType dataType = field.getDataType(); + if (dataType == null) { + // 需要写插件支持处理null数据,否则会空指向异常,这里先当成脏数据 + // record.addColumn(null); + // continue; + throw new RuntimeException("null datatype"); + } + switch (dataType) { + case BOOLEAN: + record.addColumn(new BoolColumn(field.getBoolV())); + break; + case INT32: + record.addColumn(new LongColumn(field.getIntV())); + break; + case INT64: + case TIMESTAMP: + record.addColumn(new LongColumn(field.getLongV())); + break; + case FLOAT: + record.addColumn(new DoubleColumn(field.getFloatV())); + break; + case DOUBLE: + record.addColumn(new DoubleColumn(field.getDoubleV())); + break; + case STRING: + case TEXT: + record.addColumn(new StringColumn(field.getStringValue())); + break; + case DATE: + record.addColumn(new DateColumn(Date.valueOf(field.getDateV()))); + break; + default: + throw new RuntimeException("Unsupported data type: " + dataType); + } + } + // 发送 + 
recordSender.sendToWriter(record); + }catch (RuntimeException e){ + LOG.info(e.getMessage()); + this.taskPluginCollector.collectDirtyRecord(record, e); + } + } + } catch (StatementExecutionException | IoTDBConnectionException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReaderErrorCode.java b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReaderErrorCode.java new file mode 100644 index 00000000..96fa2ff6 --- /dev/null +++ b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/IoTDBReaderErrorCode.java @@ -0,0 +1,33 @@ +package com.alibaba.datax.plugin.reader.iotdbreader; + +import com.alibaba.datax.common.spi.ErrorCode; + +public enum IoTDBReaderErrorCode implements ErrorCode { + + REQUIRED_VALUE("IoTDBReader-00", "parameter value is missing"), + ILLEGAL_VALUE("IoTDBReader-01", "invalid parameter value"), + CONNECTION_FAILED("IoTDBReader-02", "connection error"); + + private final String code; + private final String description; + + IoTDBReaderErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s].", this.code, this.description); + } +} diff --git a/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/Key.java b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/Key.java new file mode 100644 index 00000000..0283be6a --- /dev/null +++ b/iotdbreader/src/main/java/com/alibaba/datax/plugin/reader/iotdbreader/Key.java @@ -0,0 +1,16 @@ +package com.alibaba.datax.plugin.reader.iotdbreader; + +public class Key { + public static final String USERNAME = "username"; + public static final String PASSWORD = "password"; + 
public static final String HOST = "host";
+    public static final String PORT = "port";
+    public static final String FETCH_SIZE = "fetchSize";
+    public static final String VERSION = "version";
+    public static final String QUERY_SQLS = "querySqls";
+    public static final String QUERY_SQL = "querySql";
+    public static final String TIME_COLUMN_POSITION = "timeColumnPosition";
+    public static final String DEVICE = "device";
+    public static final String MEASUREMENTS = "measurements";
+    public static final String WHERE = "where";
+}
diff --git a/iotdbreader/src/main/resources/plugin.json b/iotdbreader/src/main/resources/plugin.json
new file mode 100644
index 00000000..39d6b897
--- /dev/null
+++ b/iotdbreader/src/main/resources/plugin.json
@@ -0,0 +1,10 @@
+{
+    "name": "iotdbreader",
+    "class": "com.alibaba.datax.plugin.reader.iotdbreader.IoTDBReader",
+    "description": {
+        "useScene": "data migration from iotdb",
+        "mechanism": "use iotdb-java-session to read data."
+    },
+    "developer": "timecho.com"
+}
+
diff --git a/iotdbreader/src/main/resources/plugin_job_template.json b/iotdbreader/src/main/resources/plugin_job_template.json
new file mode 100644
index 00000000..14287ed7
--- /dev/null
+++ b/iotdbreader/src/main/resources/plugin_job_template.json
@@ -0,0 +1,25 @@
+{
+    "name": "iotdbreader",
+    "parameter": {
+        "username": "root",
+        "password": "root",
+        "host": "192.168.150.100",
+        "port": 6667,
+        "fetchSize": 10000,
+        "version": "V_1_0",
+        "##": "时间列插入DataX的Record中的位置,默认第0列",
+        "timeColumnPosition": 0,
+        "##":"写了querySqls 默认下面全部参数失效,适合开发人员, 多个session并行执行多条SQL语句",
+        "querySqls": [
+            "select * from root.cgn.device",
+            "select * from root.cgn.device",
+            "select * from root.cgn.device",
+            "select * from root.cgn.device",
+            "select * from root.cgn.device"
+        ],
+        "device": "root.cgn.device",
+        "##":"时间列不属于测点",
+        "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"],
+        "where": "time > 2023-03-07 12:00:00 and time < 2024-03-07 19:00:00"
+    }
+}
\ No newline
at end of file diff --git a/iotdbwriter/doc/iotdbwriter-CN.md b/iotdbwriter/doc/iotdbwriter-CN.md new file mode 100644 index 00000000..dc350cac --- /dev/null +++ b/iotdbwriter/doc/iotdbwriter-CN.md @@ -0,0 +1,200 @@ +# DataX IoTDBWriter + +## 1 快速介绍 +IoTDBWriter插件实现了写入数据到IoTDB数据库目标表(设备)的功能。 + +底层实现上,IoTDBWriter通过iotdb.session连接IoTDB,按照IoTDB的SQL语法, +执行session.insertRecordsOfOneDevice语句,将数据写入IoTDB。 + +IoTDBWriter可以作为数据迁移工具供DBA将其它数据库的数据导入到IoTDB。 + +## 2 实现原理 + +IoTDBWriter 通过 DataX 框架获取 Reader 生成的协议数据Record,通过Session连接IoTDB,执行insert语句,将数据写入IoTDB。 + +IoTDB中设备与列的概念见IoTDB官方文档。 + + + +## 3 功能说明 +### 3.1 配置样例 + +配置一个MySQL数据写入IoTDB的作业 + +使用下面的Job配置,将数据写入IoTDB: + +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "mysqlreader", + "parameter": { + "username": "root", + "password": "toy123", + "column": ["time","A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "splitPk": "", + "connection": [ + { + "table": [ + "device" + ], + "jdbcUrl": [ + "jdbc:mysql://localhost:3306/demodb?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" + ] + } + ] + } + }, + "writer": { + "name": "iotdbwriter", + "parameter": { + "username": "root", + "password": "root", + "host": "192.168.150.100", + "port": 6667, + "version": "V_1_0", + "##": "Reader中时间列的位置,默认0列", + "timeColumnPosition": 0, + "batchSize": 1000, + "device": "root.cgn.device", + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "###": "在插入之前,预先执行的SQL,默认为空", + "preSql": [] + } + } + } + ], + "setting": { + "speed": { + "channel": 3 + } + } + } +} +``` + +注意:mysqlreader插件,在src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java 中270行左右,修改了代码,将mysql中的tinyint(1)转为boolean类型 +case Types.SMALLINT: +case Types.TINYINT: + // 将mysql中的tinyint(1)转为boolean类型 + if (metaData.getPrecision(i) <= 3){ + record.addColumn(new BoolColumn(rs.getBoolean(i))); + break; + } + + +### 3.2 参数说明 + +* username + * 描述:用户名 + * 必选:是 + * 默认值:无 +* password + * 描述:用户名的密码 + 
* 必选:是
+  * 默认值:无
+* host
+  * 描述:连接iotdb数据库的主机地址
+  * 必选:是
+  * 默认值:无
+* port
+  * 描述:端口
+  * 必选:是
+  * 默认值:无
+* version
+  * 描述:iotdb版本
+  * 必选:是
+  * 默认值:无
+* timeColumnPosition
+  * 描述:时间列在Record中列的位置
+  * 必选:否
+  * 默认值:0
+* device
+  * 描述:iotdb中的概念,对应mysql中的表名
+  * 必选:是
+  * 默认值:无
+* measurements
+  * 描述:iotdb中的概念,对应mysql中的字段集合,顺序应该与record中column的顺序相同
+  * 必选:是
+  * 默认值:无
+* batchSize
+  * 描述:每batchSize条record为一个batch进行写入
+  * 必选:否
+  * 默认值:1000
+* preSql
+  * 描述:插入数据之前预先执行的SQL语句列表,为空或不配置表示不执行
+  * 必选:否
+  * 默认值:无
+
+### 3.3 类型转换
+
+datax中的数据类型,映射到IoTDB的数据类型
+
+| DataX 内部类型 | IoTDB 数据类型                 |
+| -------------- | ------------------------------ |
+| INT            | INT32                          |
+| LONG           | TIMESTAMP, INT64               |
+| DOUBLE         | DOUBLE                         |
+| STRING         | STRING                         |
+| BOOL           | BOOLEAN                        |
+| DATE           | TIMESTAMP,DATE                 |
+| BYTES          | 暂不支持(该记录按脏数据处理) |
+
+
+
+### 3.4 各数据源到IoTDB的参考示例
+见datax-example/datax-example-iotdb
+
+
+## 4 性能报告
+
+### 4.1 环境准备
+
+#### 4.1.1 数据特征
+
+建表语句:
+
+单行记录类似于:
+
+#### 4.1.2 机器参数
+
+* 执行DataX的机器参数为:
+    1. cpu:
+    2. mem:
+    3. net: 千兆双网卡
+    4. disc: DataX 数据不落磁盘,不统计此项
+
+* IoTDB数据库机器参数为:
+    1. cpu:
+    2. mem:
+    3. net: 千兆双网卡
+    4.
disc: + +#### 4.1.3 DataX jvm 参数 + + -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError + +### 4.2 测试报告 + +#### 4.2.1 单表测试报告 + +| 通道数 | DataX速度(Rec/s) | DataX流量(MB/s) | DataX机器网卡流出流量(MB/s) | DataX机器运行负载 | DB网卡进入流量(MB/s) | DB运行负载 | DB TPS | +| ------ | ---------------- | --------------- | --------------------------- | ----------------- | -------------------- | ---------- | ------ | +| 1 | | | | | | | | +| 4 | | | | | | | | +| 8 | | | | | | | | +| 16 | | | | | | | | +| 32 | | | | | | | | + + + +#### 4.2.4 性能测试小结 + + + + +## 5 约束限制 + + diff --git a/iotdbwriter/pom.xml b/iotdbwriter/pom.xml new file mode 100644 index 00000000..31c747be --- /dev/null +++ b/iotdbwriter/pom.xml @@ -0,0 +1,102 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + iotdbwriter + + + 8 + 8 + 1.3.3-SNAPSHOT + + + + + com.alibaba.datax + datax-core + 0.0.1-SNAPSHOT + test + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.iotdb + iotdb-session + ${iotdb.session.version} + + + org.apache.iotdb + node-commons + ${iotdb.session.version} + + + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.12.4 + + + + **/*Test.java + + + + + true + + + + + + + \ No newline at end of file diff --git a/iotdbwriter/src/main/assembly/package.xml b/iotdbwriter/src/main/assembly/package.xml new file mode 100644 index 00000000..ca1ba9ad --- /dev/null +++ b/iotdbwriter/src/main/assembly/package.xml @@ -0,0 +1,34 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/iotdbwriter + + + target/ + + iotdbwriter-0.0.1-SNAPSHOT.jar + + plugin/writer/iotdbwriter + + + + + + false + plugin/writer/iotdbwriter/libs + runtime + + + diff --git 
a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java
new file mode 100644
index 00000000..ddade9e3
--- /dev/null
+++ b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriter.java
@@ -0,0 +1,303 @@
+package com.alibaba.datax.plugin.writer.iotdbwriter;
+
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+import org.apache.iotdb.isession.util.Version;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.tsfile.enums.TSDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Date;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * DataX writer plugin that inserts records into one IoTDB device through the native session API.
+ */
+public class IoTDBWriter extends Writer {
+
+    /** Batch size used when "batchSize" is absent or not a positive number. */
+    private static final int DEFAULT_BATCH_SIZE = 1000;
+
+    /**
+     * Builds a session from the plugin configuration and opens it with RPC compression disabled.
+     *
+     * @param conf plugin configuration holding host, port, username, password and version
+     * @return an opened session; the caller is responsible for closing it
+     * @throws RuntimeException if the connection cannot be established
+     */
+    private static Session openSession(Configuration conf) {
+        Session session =
+                new Session.Builder()
+                        .host(conf.getString(Key.HOST))
+                        .port(conf.getInt(Key.PORT))
+                        .username(conf.getString(Key.USERNAME))
+                        .password(conf.getString(Key.PASSWORD))
+                        .version(Version.valueOf(conf.getString(Key.VERSION)))
+                        .build();
+        try {
+            session.open(false);
+        } catch (IoTDBConnectionException e) {
+            throw new RuntimeException(e);
+        }
+        return session;
+    }
+
+    /** Closes the session if present; a failure is only logged because nothing else can be done at shutdown. */
+    private static void closeQuietly(Session session, Logger log) {
+        if (session == null) {
+            return;
+        }
+        try {
+            session.close();
+        } catch (IoTDBConnectionException e) {
+            log.warn("Failed to close IoTDB session", e);
+        }
+    }
+
+    public static class Job extends Writer.Job {
+        private static final Logger LOG = LoggerFactory.getLogger(Job.class);
+        private Configuration jobConf;
+
+        /** Loads the job configuration and fails fast when a required parameter is missing. */
+        @Override
+        public void init() {
+            this.jobConf = super.getPluginJobConf();
+            String[] requiredKeys = {Key.USERNAME, Key.PASSWORD, Key.HOST, Key.PORT, Key.VERSION, Key.DEVICE};
+            for (String key : requiredKeys) {
+                String value = this.jobConf.getString(key);
+                if (value == null || value.isEmpty()) {
+                    throw missingParameter(key);
+                }
+            }
+            List<String> measurements = this.jobConf.getList(Key.MEASUREMENTS, String.class);
+            if (measurements == null || measurements.isEmpty()) {
+                throw missingParameter(Key.MEASUREMENTS);
+            }
+        }
+
+        private static DataXException missingParameter(String key) {
+            return DataXException.asDataXException(IoTDBWriterErrorCode.REQUIRED_VALUE, "The parameter [" + key + "] is not set.");
+        }
+
+        /**
+         * Runs the configured preSql statements once for the whole job, before any task writes.
+         * Running them in every task would let a late-starting task undo rows already written by
+         * other tasks (for example with a "delete timeseries" statement).
+         */
+        @Override
+        public void prepare() {
+            List<String> preSqls = this.jobConf.getList(Key.PRE_SQL, String.class);
+            if (preSqls == null || preSqls.isEmpty()) {
+                return;
+            }
+            Session session = openSession(this.jobConf);
+            try {
+                for (String sql : preSqls) {
+                    session.executeNonQueryStatement(sql);
+                }
+                LOG.info("=======Completed preSqls=======");
+            } catch (IoTDBConnectionException | StatementExecutionException e) {
+                throw new RuntimeException(e);
+            } finally {
+                closeQuietly(session, LOG);
+            }
+        }
+
+        /** Hands every task an identical copy of the job configuration. */
+        @Override
+        public List<Configuration> split(int mandatoryNumber) {
+            List<Configuration> configs = new ArrayList<>();
+            for (int i = 0; i < mandatoryNumber; i++) {
+                configs.add(this.jobConf.clone());
+            }
+            return configs;
+        }
+
+        @Override
+        public void destroy() {
+        }
+
+    }
+
+    public static class Task extends Writer.Task {
+        private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+        private Configuration taskConf;
+
+        // Native IoTDB session owned by this task.
+        private Session session;
+
+        // Number of records sent to IoTDB per insert call.
+        private int insertBatchSize;
+
+        // Index of the time column inside each incoming Record (0 = first column).
+        private int timeColumnPosition;
+
+        // Receives the records that cannot be converted.
+        private TaskPluginCollector taskPluginCollector;
+
+        /** Opens the session and reads the optional parameters, applying their defaults. */
+        @Override
+        public void init() {
+            this.taskConf = super.getPluginJobConf();
+            this.session = openSession(this.taskConf);
+
+            int configuredBatchSize = this.taskConf.getInt(Key.BATCH_SIZE, DEFAULT_BATCH_SIZE);
+            if (configuredBatchSize <= 0) {
+                // A zero or negative batch size cannot drive the flush test below.
+                LOG.warn("Invalid {}: {}, falling back to {}", Key.BATCH_SIZE, configuredBatchSize, DEFAULT_BATCH_SIZE);
+                configuredBatchSize = DEFAULT_BATCH_SIZE;
+            }
+            this.insertBatchSize = configuredBatchSize;
+            this.timeColumnPosition = this.taskConf.getInt(Key.TIME_COLUMN_POSITION, 0);
+            this.taskPluginCollector = super.getTaskPluginCollector();
+        }
+
+        @Override
+        public void prepare() {
+            // preSql runs once in Job.prepare(); IoTDB creates time series automatically, so nothing to do here.
+        }
+
+        @Override
+        public void destroy() {
+            closeQuietly(this.session, LOG);
+        }
+
+        /**
+         * Reads records from the reader-to-writer channel and writes them to the configured device in batches.
+         * A record that cannot be converted is reported as dirty and left out of the batch entirely.
+         *
+         * @param lineReceiver source of records; {@code getFromReader()} returns null when the transfer ends
+         */
+        @Override
+        public void startWrite(RecordReceiver lineReceiver) {
+            String device = this.taskConf.getString(Key.DEVICE);
+            List<String> measurements = this.taskConf.getList(Key.MEASUREMENTS, String.class);
+
+            // Parallel lists holding one entry per record of the pending batch.
+            List<Long> timestamps = new ArrayList<>();
+            List<List<String>> measurementsList = new ArrayList<>();
+            List<List<TSDataType>> typesList = new ArrayList<>();
+            List<List<Object>> valuesList = new ArrayList<>();
+
+            long written = 0;
+            long dirty = 0;
+            Record record;
+            try {
+                while ((record = lineReceiver.getFromReader()) != null) {
+                    Long timestamp;
+                    List<TSDataType> types = new ArrayList<>();
+                    List<Object> values = new ArrayList<>();
+                    try {
+                        timestamp = record.getColumn(this.timeColumnPosition).asLong();
+                        if (timestamp == null) {
+                            throw new IllegalArgumentException("time column is null");
+                        }
+                        for (int i = 0; i < record.getColumnNumber(); i++) {
+                            if (i != this.timeColumnPosition) {
+                                appendColumn(record.getColumn(i), types, values);
+                            }
+                        }
+                        if (values.size() != measurements.size()) {
+                            throw new IllegalArgumentException("record has " + values.size()
+                                    + " value columns but " + measurements.size() + " measurements are configured");
+                        }
+                    } catch (RuntimeException e) {
+                        // Nothing has been added to the batch yet, so the parallel lists stay aligned.
+                        LOG.warn("Dirty record skipped: {}", e.getMessage());
+                        this.taskPluginCollector.collectDirtyRecord(record, e);
+                        dirty++;
+                        continue;
+                    }
+
+                    timestamps.add(timestamp);
+                    // Fresh copy per record so that rows never share one measurement list instance.
+                    measurementsList.add(new ArrayList<>(measurements));
+                    typesList.add(types);
+                    valuesList.add(values);
+                    written++;
+
+                    if (timestamps.size() >= this.insertBatchSize) {
+                        flush(device, timestamps, measurementsList, typesList, valuesList);
+                    }
+                }
+                flush(device, timestamps, measurementsList, typesList, valuesList);
+                LOG.info("========= task all data inserted, total record: {}, dirty record: {}", written, dirty);
+            } catch (IoTDBConnectionException | StatementExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /** Sends the pending batch, if any, and empties the batch lists. */
+        private void flush(String device, List<Long> timestamps, List<List<String>> measurementsList,
+                           List<List<TSDataType>> typesList, List<List<Object>> valuesList)
+                throws IoTDBConnectionException, StatementExecutionException {
+            if (timestamps.isEmpty()) {
+                return;
+            }
+            this.session.insertRecordsOfOneDevice(device, timestamps, measurementsList, typesList, valuesList);
+            timestamps.clear();
+            measurementsList.clear();
+            typesList.clear();
+            valuesList.clear();
+        }
+
+        /**
+         * Converts one DataX column and appends its IoTDB type and value to the row being built.
+         *
+         * @throws RuntimeException if the column type is not supported or the value does not fit
+         */
+        private static void appendColumn(Column col, List<TSDataType> types, List<Object> values) {
+            switch (col.getType()) {
+                case BOOL:
+                    types.add(TSDataType.BOOLEAN);
+                    values.add(col.asBoolean());
+                    break;
+                case INT:
+                    types.add(TSDataType.INT32);
+                    // Go through asLong(): the raw data is not guaranteed to be an Integer.
+                    Long intValue = col.asLong();
+                    values.add(intValue == null ? null : Math.toIntExact(intValue));
+                    break;
+                case LONG:
+                    types.add(TSDataType.INT64);
+                    values.add(col.asLong());
+                    break;
+                case DOUBLE:
+                    types.add(TSDataType.DOUBLE);
+                    values.add(col.asDouble());
+                    break;
+                case NULL:
+                    // Null columns are passed through as a null type/value pair.
+                    types.add(null);
+                    values.add(null);
+                    break;
+                case STRING:
+                    types.add(TSDataType.STRING);
+                    values.add(col.asString());
+                    break;
+                case DATE:
+                    types.add(TSDataType.DATE);
+                    // NOTE(review): the session is expected to take java.time.LocalDate for DATE rather than java.util.Date - confirm against the IoTDB version in use.
+                    Date date = col.asDate();
+                    values.add(date == null ? null
+                            : Instant.ofEpochMilli(date.getTime()).atZone(ZoneId.systemDefault()).toLocalDate());
+                    break;
+                case BAD:
+                default:
+                    throw new RuntimeException("unsupported type:" + col.getType());
+            }
+        }
+    }
+}
diff --git a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriterErrorCode.java b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriterErrorCode.java
new file mode 100644
index 00000000..cfedb8fc
--- /dev/null
+++ b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/IoTDBWriterErrorCode.java
@@ -0,0 +1,31 @@
+package com.alibaba.datax.plugin.writer.iotdbwriter;
+
+import com.alibaba.datax.common.spi.ErrorCode;
+
+public enum IoTDBWriterErrorCode implements ErrorCode {
+
+
REQUIRED_VALUE("IoTDBWriter-00", "parameter value is missing");
+    // Code string and short description handed out by the getters below.
+    private final String code;
+    private final String description;
+
+    IoTDBWriterErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return this.code;
+    }
+
+    @Override
+    public String getDescription() {
+        return this.description;
+    }
+    /** Renders the constant as {@code Code:[code], Description:[description].} */
+    @Override
+    public String toString() {
+        return String.format("Code:[%s], Description:[%s].", this.code, this.description);
+    }
+}
diff --git a/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/Key.java b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/Key.java
new file mode 100644
index 00000000..552690f7
--- /dev/null
+++ b/iotdbwriter/src/main/java/com/alibaba/datax/plugin/writer/iotdbwriter/Key.java
@@ -0,0 +1,14 @@
+package com.alibaba.datax.plugin.writer.iotdbwriter;
+/** Names of the job-configuration parameters that the iotdbwriter plugin looks up. */
+public class Key {
+    public static final String USERNAME = "username";
+    public static final String PASSWORD = "password";
+    public static final String HOST = "host";
+    public static final String PORT = "port";
+    public static final String VERSION = "version"; // parsed with Version.valueOf by the writer
+    public static final String TIME_COLUMN_POSITION = "timeColumnPosition"; // optional, writer defaults to 0
+    public static final String DEVICE = "device";
+    public static final String MEASUREMENTS = "measurements";
+    public static final String PRE_SQL = "preSql"; // optional list of statements run before inserting
+    public static final String BATCH_SIZE = "batchSize"; // optional, writer defaults to 1000
+}
diff --git a/iotdbwriter/src/main/resources/plugin.json b/iotdbwriter/src/main/resources/plugin.json
new file mode 100644
index 00000000..9d034718
--- /dev/null
+++ b/iotdbwriter/src/main/resources/plugin.json
@@ -0,0 +1,9 @@
+{
+  "name": "iotdbwriter",
+  "class": "com.alibaba.datax.plugin.writer.iotdbwriter.IoTDBWriter",
+  "description": {
+    "useScene": "data migration to iotdb",
+    "mechanism": "use iotdb-java-session to write data."
+ }, + "developer": "timecho.com" +} \ No newline at end of file diff --git a/iotdbwriter/src/main/resources/plugin_job_template.json b/iotdbwriter/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..796cd135 --- /dev/null +++ b/iotdbwriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,19 @@ +{ + "name": "iotdbwriter", + "parameter": { + "username": "root", + "password": "root", + "host": "192.168.150.100", + "port": 6667, + "version": "V_1_0", + "##": "注意是Reader插件读取到的数据中时间列的位置,不是该插件,默认0列", + "timeColumnPosition": 0, + "batchSize": 1000, + "device": "root.cgn.device", + "measurements": ["A5STD","L2RIS014MD","L2VVP003SM5","D1RIS001MD","D1KRT003EU"], + "###": "在插入之前,执行删除操作,为空或不配置表示不执行", + "preSql": [ + "delete timeseries root.cgn.device.**" + ] + } +} \ No newline at end of file diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java index f3180402..29f02e46 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java @@ -269,6 +269,11 @@ public class CommonRdbmsReader { case Types.SMALLINT: case Types.TINYINT: + // 将mysql中的tinyint(1)转为boolean类型 + if (metaData.getPrecision(i) <= 3){ + record.addColumn(new BoolColumn(rs.getBoolean(i))); + break; + } case Types.INTEGER: case Types.BIGINT: record.addColumn(new LongColumn(rs.getString(i))); diff --git a/pom.xml b/pom.xml index 1b364a75..cf09819f 100644 --- a/pom.xml +++ b/pom.xml @@ -74,6 +74,7 @@ clickhousereader mongodbreader + iotdbreader tdenginereader gdbreader tsdbreader @@ -116,6 +117,7 @@ elasticsearchwriter mongodbwriter tdenginewriter + iotdbwriter ocswriter tsdbwriter gdbwriter