diff --git a/core/src/main/job/mysql2tdengine.json b/core/src/main/job/mysql2tdengine.json
new file mode 100644
index 00000000..530ee2b5
--- /dev/null
+++ b/core/src/main/job/mysql2tdengine.json
@@ -0,0 +1,46 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "mysqlreader",
+ "parameter": {
+ "username": "root",
+ "password": "root",
+ "column": [
+ "id",
+ "name"
+ ],
+ "splitPk": "db_id",
+ "connection": [
+ {
+ "table": [
+ "test"
+ ],
+ "jdbcUrl": [
+ "jdbc:mysql://127.0.0.1:3306/database"
+ ]
+ }
+ ]
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "host": "192.168.56.105",
+ "port": 6030,
+ "dbname": "test",
+ "user": "root",
+ "password": "taosdata",
+ "batchSize": 1000
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/job/tdengine2tdengine.json b/core/src/main/job/tdengine2tdengine.json
new file mode 100644
index 00000000..750ae202
--- /dev/null
+++ b/core/src/main/job/tdengine2tdengine.json
@@ -0,0 +1,38 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "tdenginereader",
+ "parameter": {
+ "host": "192.168.1.82",
+ "port": 6030,
+ "db": "test",
+ "user": "root",
+ "password": "taosdata",
+ "sql": "select * from weather",
+ "beginDateTime": "2021-01-01 00:00:00",
+ "endDateTime": "2021-01-02 00:00:00",
+ "splitInterval": "1h"
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "host": "192.168.56.105",
+ "port": 6030,
+ "dbname": "test",
+ "user": "root",
+ "password": "taosdata",
+ "batchSize": 1000
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/package.xml b/package.xml
index a93e945c..1b291a99 100755
--- a/package.xml
+++ b/package.xml
@@ -103,13 +103,13 @@
datax
-
- otsstreamreader/target/datax/
-
- **/*.*
-
- datax
-
+
+ otsstreamreader/target/datax/
+
+ **/*.*
+
+ datax
+
txtfilereader/target/datax/
@@ -138,7 +138,7 @@
datax
-
+
ftpreader/target/datax/
**/*.*
@@ -180,6 +180,13 @@
datax
+
+ tdenginereader/target/datax/
+
+ **/*.*
+
+ datax
+
@@ -238,13 +245,6 @@
datax
-
- tdenginewriter/target/datax/
-
- **/*.*
-
- datax
-
otswriter/target/datax/
@@ -259,7 +259,7 @@
datax
-
+
oraclewriter/target/datax/
**/*.*
@@ -273,7 +273,7 @@
datax
-
+
postgresqlwriter/target/datax/
**/*.*
@@ -399,5 +399,12 @@
datax
+
+ tdenginewriter/target/datax/
+
+ **/*.*
+
+ datax
+
diff --git a/pom.xml b/pom.xml
index cb635ad3..2358e212 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,7 +47,7 @@
transformer
-
+ mysqlreader
@@ -109,6 +109,7 @@
tdenginewriter
+ tdenginereader
diff --git a/tdenginereader/doc/tdenginereader.md b/tdenginereader/doc/tdenginereader.md
new file mode 100644
index 00000000..284b8e6d
--- /dev/null
+++ b/tdenginereader/doc/tdenginereader.md
@@ -0,0 +1,145 @@
+# DataX TDengineReader
+
+## 1 快速介绍
+
+TDengineReader 插件实现了 TDengine 读取数据的功能。
+
+## 2 实现原理
+
+TDengineReader 通过TDengine的JDBC driver查询获取数据。
+
+## 3 功能说明
+
+### 3.1 配置样例
+
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "tdenginereader",
+ "parameter": {
+ "host": "192.168.1.82",
+ "port": 6030,
+ "db": "test",
+ "user": "root",
+ "password": "taosdata",
+ "sql": "select * from weather",
+ "beginDateTime": "2021-01-01 00:00:00",
+ "endDateTime": "2021-01-02 00:00:00",
+ "splitInterval": "1h"
+ }
+ },
+ "writer": {
+ "name": "streamwriter",
+ "parameter": {
+ "encoding": "UTF-8",
+ "print": true
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
+```
+
+### 3.2 参数说明
+
+* **host**
+ * 描述:TDengine实例的host。
+ * 必选:是
+ * 默认值:无
+* **port**
+ * 描述:TDengine实例的port。
+ * 必选:是
+ * 默认值:无
+* **dbname**
+ * 描述:目的数据库的名称。
+ * 必选:是
+ * 默认值:无
+* **username**
+ * 描述:TDengine实例的用户名
+ * 必选:是
+ * 默认值:无
+* **password**
+ * 描述:TDengine实例的密码
+ * 必选:是
+ * 默认值:无
+* **sql**
+ * 描述:用来筛选迁移数据的sql
+ * 必选:是
+ * 默认值:无
+* **beginDateTime**
+ * 描述:TDengine实例的密码
+ * 必选:是
+ * 默认值:无
+* **endDateTime**
+ * 描述:
+ * 必选:是
+ * 默认值:无
+* **splitInterval**
+ * 描述:按照splitInterval来划分task, 每splitInterval创建一个task
+ * 必选:否
+ * 默认值:1h
+
+### 3.3 类型转换
+
+
+## 4 性能报告
+
+### 4.1 环境准备
+
+#### 4.1.1 数据特征
+
+建表语句:
+
+单行记录类似于:
+
+#### 4.1.2 机器参数
+
+* 执行DataX的机器参数为:
+ 1. cpu:
+ 2. mem:
+ 3. net: 千兆双网卡
+ 4. disc: DataX 数据不落磁盘,不统计此项
+
+* TDengine数据库机器参数为:
+ 1. cpu:
+ 2. mem:
+ 3. net: 千兆双网卡
+ 4. disc:
+
+#### 4.1.3 DataX jvm 参数
+
+ -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
+
+### 4.2 测试报告
+
+#### 4.2.1 单表测试报告
+
+| 通道数| DataX速度(Rec/s)|DataX流量(MB/s)| DataX机器网卡流出流量(MB/s)|DataX机器运行负载|DB网卡进入流量(MB/s)|DB运行负载|DB TPS|
+|--------| --------|--------|--------|--------|--------|--------|--------|
+|1| | | | | | | |
+|4| | | | | | | |
+|8| | | | | | | |
+|16| | | | | | | |
+|32| | | | | | | |
+
+说明:
+1. 这里的单表,主键类型为 bigint(20),自增。
+2. batchSize 和 通道个数,对性能影响较大。
+
+#### 4.2.4 性能测试小结
+
+1.
+2.
+
+## 5 约束限制
+
+## FAQ
\ No newline at end of file
diff --git a/tdenginereader/pom.xml b/tdenginereader/pom.xml
new file mode 100644
index 00000000..66c64eaf
--- /dev/null
+++ b/tdenginereader/pom.xml
@@ -0,0 +1,90 @@
+
+
+
+ datax-all
+ com.alibaba.datax
+ 0.0.1-SNAPSHOT
+
+ 4.0.0
+
+ tdenginereader
+
+
+ 8
+ 8
+
+
+
+
+ com.alibaba.datax
+ datax-common
+ ${datax-project-version}
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+
+
+
+ junit
+ junit
+ ${junit-version}
+ test
+
+
+
+
+
+
+
+
+ maven-compiler-plugin
+
+ ${jdk-version}
+ ${jdk-version}
+ ${project-sourceEncoding}
+
+
+
+ maven-assembly-plugin
+
+
+ src/main/assembly/package.xml
+
+ datax
+
+
+
+ dwzip
+ package
+
+ single
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.12.4
+
+
+
+ **/*Test.java
+
+
+
+
+ true
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/tdenginereader/src/main/assembly/package.xml b/tdenginereader/src/main/assembly/package.xml
new file mode 100755
index 00000000..b52f20fb
--- /dev/null
+++ b/tdenginereader/src/main/assembly/package.xml
@@ -0,0 +1,34 @@
+
+
+
+ dir
+
+ false
+
+
+ src/main/resources
+
+ plugin.json
+ plugin_job_template.json
+
+ plugin/reader/tdenginereader
+
+
+ target/
+
+ tdenginereader-0.0.1-SNAPSHOT.jar
+
+ plugin/reader/tdenginereader
+
+
+
+
+
+ false
+ plugin/reader/tdenginereader/libs
+ runtime
+
+
+
diff --git a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java
new file mode 100644
index 00000000..cec88eda
--- /dev/null
+++ b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java
@@ -0,0 +1,47 @@
+package com.alibaba.datax.plugin.reader;
+
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.datax.common.spi.Reader;
+import com.alibaba.datax.common.util.Configuration;
+
+import java.util.List;
+
+public class TDengineReader extends Reader {
+
+ public static class Job extends Reader.Job {
+
+ @Override
+ public void init() {
+
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+
+ @Override
+ public List split(int adviceNumber) {
+ return null;
+ }
+ }
+
+ public static class Task extends Reader.Task {
+
+ @Override
+ public void init() {
+
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+
+ @Override
+ public void startRead(RecordSender recordSender) {
+
+ }
+ }
+
+}
diff --git a/tdenginereader/src/main/resources/plugin.json b/tdenginereader/src/main/resources/plugin.json
new file mode 100755
index 00000000..dc91982c
--- /dev/null
+++ b/tdenginereader/src/main/resources/plugin.json
@@ -0,0 +1,9 @@
+{
+ "name": "tdenginereader",
+ "class": "com.alibaba.datax.plugin.reader.TDengineReader",
+ "description": {
+ "useScene": "data migration from tdengine",
+ "mechanism": "use JNI to read data from tdengine."
+ },
+ "developer": "zyyang-taosdata"
+}
\ No newline at end of file
diff --git a/tdenginereader/src/main/resources/plugin_job_template.json b/tdenginereader/src/main/resources/plugin_job_template.json
new file mode 100644
index 00000000..3e09dffc
--- /dev/null
+++ b/tdenginereader/src/main/resources/plugin_job_template.json
@@ -0,0 +1,14 @@
+{
+ "name": "tdenginereader",
+ "parameter": {
+ "host": "127.0.0.1",
+ "port": 6030,
+ "db": "test",
+ "user": "root",
+ "password": "taosdata",
+ "sql": "select * from weather",
+ "beginDateTime": "2021-01-01 00:00:00",
+ "endDateTime": "2021-01-02 00:00:00",
+ "splitInterval": "1h"
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/pom.xml b/tdenginewriter/pom.xml
index 2c294b8f..d658d4a2 100644
--- a/tdenginewriter/pom.xml
+++ b/tdenginewriter/pom.xml
@@ -11,7 +11,7 @@
com.alibaba.datax.tdenginewriter
tdenginewriter
- 1.0.0
+ 0.0.1-SNAPSHOT
8
diff --git a/tdenginewriter/src/main/assembly/package.xml b/tdenginewriter/src/main/assembly/package.xml
index f7a7d0bf..d3b75ea2 100755
--- a/tdenginewriter/src/main/assembly/package.xml
+++ b/tdenginewriter/src/main/assembly/package.xml
@@ -18,7 +18,7 @@
target/
- tdenginewriter-1.0.0.jar
+ tdenginewriter-0.0.1-SNAPSHOT.jar
plugin/writer/tdenginewriter
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java
index b1670633..3ce786e5 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/JniConnection.java
@@ -51,11 +51,15 @@ public class JniConnection {
if (this.conn == JNI_NULL_POINTER) {
throw new Exception("JNI connection is NULL");
}
- long code = insertOpentsdbJson(json, this.conn);
- if (code != JNI_SUCCESSFUL) {
- String errMsg = getErrMsgByCode(code);
+
+ long result = insertOpentsdbJson(json, this.conn);
+ int errCode = getErrCodeImp(this.conn, result);
+ if (errCode != JNI_SUCCESSFUL) {
+ String errMsg = getErrMsgImp(result);
+ freeResultSetImp(this.conn, result);
throw new Exception(errMsg);
}
+ freeResultSetImp(this.conn, result);
}
public void close() throws Exception {
@@ -70,19 +74,13 @@ public class JniConnection {
private static native int setOptions(int optionIndex, String optionValue);
- private static native String getTsCharset();
-
private native long connectImp(String host, int port, String dbName, String user, String password);
- private native long executeQueryImp(byte[] sqlBytes, long connection);
-
private native int getErrCodeImp(long connection, long pSql);
private native String getErrMsgImp(long pSql);
- private native String getErrMsgByCode(long code);
-
- private native int getAffectedRowsImp(long connection, long pSql);
+ private native void freeResultSetImp(long connection, long pSql);
private native int closeConnectionImp(long connection);