diff --git a/core/src/main/job/mysql2tdengine.json b/core/src/main/job/mysql2tdengine.json new file mode 100644 index 00000000..530ee2b5 --- /dev/null +++ b/core/src/main/job/mysql2tdengine.json @@ -0,0 +1,46 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "mysqlreader", + "parameter": { + "username": "root", + "password": "root", + "column": [ + "id", + "name" + ], + "splitPk": "db_id", + "connection": [ + { + "table": [ + "test" + ], + "jdbcUrl": [ + "jdbc:mysql://127.0.0.1:3306/database" + ] + } + ] + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "host": "192.168.56.105", + "port": 6030, + "dbname": "test", + "user": "root", + "password": "taosdata", + "batchSize": 1000 + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/core/src/main/job/tdengine2tdengine.json b/core/src/main/job/tdengine2tdengine.json new file mode 100644 index 00000000..750ae202 --- /dev/null +++ b/core/src/main/job/tdengine2tdengine.json @@ -0,0 +1,38 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "tdenginereader", + "parameter": { + "host": "192.168.1.82", + "port": 6030, + "db": "test", + "user": "root", + "password": "taosdata", + "sql": "select * from weather", + "beginDateTime": "2021-01-01 00:00:00", + "endDateTime": "2021-01-02 00:00:00", + "splitInterval": "1h" + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "host": "192.168.56.105", + "port": 6030, + "dbname": "test", + "user": "root", + "password": "taosdata", + "batchSize": 1000 + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/package.xml b/package.xml index a93e945c..1b291a99 100755 --- a/package.xml +++ b/package.xml @@ -103,13 +103,13 @@ datax - - otsstreamreader/target/datax/ - - **/*.* - - datax - + + otsstreamreader/target/datax/ + + **/*.* + + datax + txtfilereader/target/datax/ @@ -138,7 +138,7 @@ datax - + ftpreader/target/datax/ **/*.* @@ -180,6 +180,13 @@ datax + + tdenginereader/target/datax/ + + **/*.* + + datax + @@ -238,13 +245,6 @@ datax - - tdenginewriter/target/datax/ - - **/*.* - - datax - otswriter/target/datax/ @@ -259,7 +259,7 @@ datax - + oraclewriter/target/datax/ **/*.* @@ -273,7 +273,7 @@ datax - + postgresqlwriter/target/datax/ **/*.* @@ -399,5 +399,12 @@ datax + + tdenginewriter/target/datax/ + + **/*.* + + datax + diff --git a/pom.xml b/pom.xml index cb635ad3..2358e212 100644 --- a/pom.xml +++ b/pom.xml @@ -47,7 +47,7 @@ transformer - + mysqlreader @@ -109,6 +109,7 @@ tdenginewriter + tdenginereader diff --git a/tdenginereader/doc/tdenginereader.md b/tdenginereader/doc/tdenginereader.md new file mode 100644 index 00000000..284b8e6d --- /dev/null +++ b/tdenginereader/doc/tdenginereader.md @@ -0,0 +1,145 @@ +# DataX TDengineReader + +## 1 快速介绍 + +TDengineReader 插件实现了 TDengine 读取数据的功能。 + +## 2 实现原理 + +TDengineReader 通过TDengine的JDBC driver查询获取数据。 + +## 3 功能说明 + +### 3.1 配置样例 + +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "tdenginereader", + "parameter": { + "host": "192.168.1.82", + "port": 6030, + "db": "test", + "user": "root", + "password": "taosdata", + "sql": "select * from weather", + "beginDateTime": "2021-01-01 00:00:00", + "endDateTime": "2021-01-02 00:00:00", + "splitInterval": "1h" + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "encoding": "UTF-8", + "print": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} +``` + +### 3.2 参数说明 + +* **host** + * 描述:TDengine实例的host。 + * 必选:是
+ * 默认值:无
+* **port** + * 描述:TDengine实例的port。 + * 必选:是
+ * 默认值:无
+* **dbname** + * 描述:目的数据库的名称。 + * 必选:是
+ * 默认值:无
+* **username** + * 描述:TDengine实例的用户名
+ * 必选:是
+ * 默认值:无
+* **password** + * 描述:TDengine实例的密码
+ * 必选:是
+ * 默认值:无
+* **sql** + * 描述:用来筛选迁移数据的sql
+ * 必选:是
+ * 默认值:无
+* **beginDateTime** + * 描述:TDengine实例的密码
+ * 必选:是
+ * 默认值:无
+* **endDateTime** + * 描述:
+ * 必选:是
+ * 默认值:无
+* **splitInterval** + * 描述:按照splitInterval来划分task, 每splitInterval创建一个task
+ * 必选:否
+ * 默认值:1h
+ +### 3.3 类型转换 + + +## 4 性能报告 + +### 4.1 环境准备 + +#### 4.1.1 数据特征 + +建表语句: + +单行记录类似于: + +#### 4.1.2 机器参数 + +* 执行DataX的机器参数为: + 1. cpu: + 2. mem: + 3. net: 千兆双网卡 + 4. disc: DataX 数据不落磁盘,不统计此项 + +* TDengine数据库机器参数为: + 1. cpu: + 2. mem: + 3. net: 千兆双网卡 + 4. disc: + +#### 4.1.3 DataX jvm 参数 + + -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError + +### 4.2 测试报告 + +#### 4.2.1 单表测试报告 + +| 通道数| DataX速度(Rec/s)|DataX流量(MB/s)| DataX机器网卡流出流量(MB/s)|DataX机器运行负载|DB网卡进入流量(MB/s)|DB运行负载|DB TPS| +|--------| --------|--------|--------|--------|--------|--------|--------| +|1| | | | | | | | +|4| | | | | | | | +|8| | | | | | | | +|16| | | | | | | | +|32| | | | | | | | + +说明: +1. 这里的单表,主键类型为 bigint(20),自增。 +2. batchSize 和 通道个数,对性能影响较大。 + +#### 4.2.4 性能测试小结 + +1. +2. + +## 5 约束限制 + +## FAQ \ No newline at end of file diff --git a/tdenginereader/pom.xml b/tdenginereader/pom.xml new file mode 100644 index 00000000..66c64eaf --- /dev/null +++ b/tdenginereader/pom.xml @@ -0,0 +1,90 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + tdenginereader + + + 8 + 8 + + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + + junit + junit + ${junit-version} + test + + + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.12.4 + + + + **/*Test.java + + + + + true + + + + + + + \ No newline at end of file diff --git a/tdenginereader/src/main/assembly/package.xml b/tdenginereader/src/main/assembly/package.xml new file mode 100755 index 00000000..b52f20fb --- /dev/null +++ b/tdenginereader/src/main/assembly/package.xml @@ -0,0 +1,34 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/reader/tdenginereader + + + target/ + + tdenginereader-0.0.1-SNAPSHOT.jar + + plugin/reader/tdenginereader + + + + + + false + plugin/reader/tdenginereader/libs + runtime + + + diff --git a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java new file mode 100644 index 00000000..cec88eda --- /dev/null +++ b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java @@ -0,0 +1,47 @@ +package com.alibaba.datax.plugin.reader; + +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.spi.Reader; +import com.alibaba.datax.common.util.Configuration; + +import java.util.List; + +public class TDengineReader extends Reader { + + public static class Job extends Reader.Job { + + @Override + public void init() { + + } + + @Override + public void destroy() { + + } + + @Override + public List split(int adviceNumber) { + return null; + } + } + + public static class Task extends Reader.Task { + + @Override + public void init() { + + } + + @Override + public void destroy() { + + } + + @Override + public void startRead(RecordSender recordSender) { + + } + } + +} diff --git a/tdenginereader/src/main/resources/plugin.json b/tdenginereader/src/main/resources/plugin.json new file mode 100755 index 00000000..dc91982c --- /dev/null +++ b/tdenginereader/src/main/resources/plugin.json @@ -0,0 +1,9 @@ +{ + "name": "tdenginereader", + "class": "com.alibaba.datax.plugin.reader.TDengineReader", + "description": { + "useScene": "data migration from tdengine", + "mechanism": "use JNI to read data from tdengine." + }, + "developer": "zyyang-taosdata" +} \ No newline at end of file diff --git a/tdenginereader/src/main/resources/plugin_job_template.json b/tdenginereader/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..3e09dffc --- /dev/null +++ b/tdenginereader/src/main/resources/plugin_job_template.json @@ -0,0 +1,14 @@ +{ + "name": "tdenginereader", + "parameter": { + "host": "127.0.0.1", + "port": 6030, + "db": "test", + "user": "root", + "password": "taosdata", + "sql": "select * from weather", + "beginDateTime": "2021-01-01 00:00:00", + "endDateTime": "2021-01-02 00:00:00", + "splitInterval": "1h" + } +} \ No newline at end of file diff --git a/tdenginewriter/pom.xml b/tdenginewriter/pom.xml index 2c294b8f..d658d4a2 100644 --- a/tdenginewriter/pom.xml +++ b/tdenginewriter/pom.xml @@ -11,7 +11,7 @@ com.alibaba.datax.tdenginewriter tdenginewriter - 1.0.0 + 0.0.1-SNAPSHOT 8 diff --git a/tdenginewriter/src/main/assembly/package.xml b/tdenginewriter/src/main/assembly/package.xml index f7a7d0bf..d3b75ea2 100755 --- a/tdenginewriter/src/main/assembly/package.xml +++ b/tdenginewriter/src/main/assembly/package.xml @@ -18,7 +18,7 @@ target/ - tdenginewriter-1.0.0.jar + tdenginewriter-0.0.1-SNAPSHOT.jar plugin/writer/tdenginewriter diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java index b1670633..3ce786e5 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java @@ -51,11 +51,15 @@ public class JniConnection { if (this.conn == JNI_NULL_POINTER) { throw new Exception("JNI connection is NULL"); } - long code = insertOpentsdbJson(json, this.conn); - if (code != JNI_SUCCESSFUL) { - String errMsg = getErrMsgByCode(code); + + long result = insertOpentsdbJson(json, this.conn); + int errCode = getErrCodeImp(this.conn, result); + if (errCode != JNI_SUCCESSFUL) { + String errMsg = getErrMsgImp(result); + freeResultSetImp(this.conn, result); throw new Exception(errMsg); } + freeResultSetImp(this.conn, result); } public void close() throws Exception { @@ -70,19 +74,13 @@ public class JniConnection { private static native int setOptions(int optionIndex, String optionValue); - private static native String getTsCharset(); - private native long connectImp(String host, int port, String dbName, String user, String password); - private native long executeQueryImp(byte[] sqlBytes, long connection); - private native int getErrCodeImp(long connection, long pSql); private native String getErrMsgImp(long pSql); - private native String getErrMsgByCode(long code); - - private native int getAffectedRowsImp(long connection, long pSql); + private native void freeResultSetImp(long connection, long pSql); private native int closeConnectionImp(long connection);