mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-03 01:30:50 +08:00
Merge pull request #4 from taosdata/feature/TD-10725
[TD-10725]<feature>: add tdenginereader
This commit is contained in:
commit
bb0d715eae
46
core/src/main/job/mysql2tdengine.json
Normal file
46
core/src/main/job/mysql2tdengine.json
Normal file
@ -0,0 +1,46 @@
|
||||
{
|
||||
"job": {
|
||||
"content": [
|
||||
{
|
||||
"reader": {
|
||||
"name": "mysqlreader",
|
||||
"parameter": {
|
||||
"username": "root",
|
||||
"password": "root",
|
||||
"column": [
|
||||
"id",
|
||||
"name"
|
||||
],
|
||||
"splitPk": "db_id",
|
||||
"connection": [
|
||||
{
|
||||
"table": [
|
||||
"test"
|
||||
],
|
||||
"jdbcUrl": [
|
||||
"jdbc:mysql://127.0.0.1:3306/database"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"writer": {
|
||||
"name": "tdenginewriter",
|
||||
"parameter": {
|
||||
"host": "192.168.56.105",
|
||||
"port": 6030,
|
||||
"dbname": "test",
|
||||
"user": "root",
|
||||
"password": "taosdata",
|
||||
"batchSize": 1000
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"setting": {
|
||||
"speed": {
|
||||
"channel": 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
38
core/src/main/job/tdengine2tdengine.json
Normal file
38
core/src/main/job/tdengine2tdengine.json
Normal file
@ -0,0 +1,38 @@
|
||||
{
|
||||
"job": {
|
||||
"content": [
|
||||
{
|
||||
"reader": {
|
||||
"name": "tdenginereader",
|
||||
"parameter": {
|
||||
"host": "192.168.1.82",
|
||||
"port": 6030,
|
||||
"db": "test",
|
||||
"user": "root",
|
||||
"password": "taosdata",
|
||||
"sql": "select * from weather",
|
||||
"beginDateTime": "2021-01-01 00:00:00",
|
||||
"endDateTime": "2021-01-02 00:00:00",
|
||||
"splitInterval": "1h"
|
||||
}
|
||||
},
|
||||
"writer": {
|
||||
"name": "tdenginewriter",
|
||||
"parameter": {
|
||||
"host": "192.168.56.105",
|
||||
"port": 6030,
|
||||
"dbname": "test",
|
||||
"user": "root",
|
||||
"password": "taosdata",
|
||||
"batchSize": 1000
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"setting": {
|
||||
"speed": {
|
||||
"channel": 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
41
package.xml
41
package.xml
@ -103,13 +103,13 @@
|
||||
</includes>
|
||||
<outputDirectory>datax</outputDirectory>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>otsstreamreader/target/datax/</directory>
|
||||
<includes>
|
||||
<include>**/*.*</include>
|
||||
</includes>
|
||||
<outputDirectory>datax</outputDirectory>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>otsstreamreader/target/datax/</directory>
|
||||
<includes>
|
||||
<include>**/*.*</include>
|
||||
</includes>
|
||||
<outputDirectory>datax</outputDirectory>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>txtfilereader/target/datax/</directory>
|
||||
<includes>
|
||||
@ -138,7 +138,7 @@
|
||||
</includes>
|
||||
<outputDirectory>datax</outputDirectory>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<fileSet>
|
||||
<directory>ftpreader/target/datax/</directory>
|
||||
<includes>
|
||||
<include>**/*.*</include>
|
||||
@ -180,6 +180,13 @@
|
||||
</includes>
|
||||
<outputDirectory>datax</outputDirectory>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>tdenginereader/target/datax/</directory>
|
||||
<includes>
|
||||
<include>**/*.*</include>
|
||||
</includes>
|
||||
<outputDirectory>datax</outputDirectory>
|
||||
</fileSet>
|
||||
|
||||
<!-- writer -->
|
||||
<fileSet>
|
||||
@ -238,13 +245,6 @@
|
||||
</includes>
|
||||
<outputDirectory>datax</outputDirectory>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>tdenginewriter/target/datax/</directory>
|
||||
<includes>
|
||||
<include>**/*.*</include>
|
||||
</includes>
|
||||
<outputDirectory>datax</outputDirectory>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>otswriter/target/datax/</directory>
|
||||
<includes>
|
||||
@ -259,7 +259,7 @@
|
||||
</includes>
|
||||
<outputDirectory>datax</outputDirectory>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<fileSet>
|
||||
<directory>oraclewriter/target/datax/</directory>
|
||||
<includes>
|
||||
<include>**/*.*</include>
|
||||
@ -273,7 +273,7 @@
|
||||
</includes>
|
||||
<outputDirectory>datax</outputDirectory>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<fileSet>
|
||||
<directory>postgresqlwriter/target/datax/</directory>
|
||||
<includes>
|
||||
<include>**/*.*</include>
|
||||
@ -399,5 +399,12 @@
|
||||
</includes>
|
||||
<outputDirectory>datax</outputDirectory>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>tdenginewriter/target/datax/</directory>
|
||||
<includes>
|
||||
<include>**/*.*</include>
|
||||
</includes>
|
||||
<outputDirectory>datax</outputDirectory>
|
||||
</fileSet>
|
||||
</fileSets>
|
||||
</assembly>
|
||||
|
3
pom.xml
3
pom.xml
@ -47,7 +47,7 @@
|
||||
<module>transformer</module>
|
||||
|
||||
<!-- reader -->
|
||||
<!-- <module>mysqlreader</module>-->
|
||||
<module>mysqlreader</module>
|
||||
<!-- <module>drdsreader</module>-->
|
||||
<!-- <module>sqlserverreader</module>-->
|
||||
<!-- <module>postgresqlreader</module>-->
|
||||
@ -109,6 +109,7 @@
|
||||
<!-- <module>hbase20xsqlwriter</module>-->
|
||||
<!-- <module>kuduwriter</module>-->
|
||||
<module>tdenginewriter</module>
|
||||
<module>tdenginereader</module>
|
||||
</modules>
|
||||
|
||||
<dependencyManagement>
|
||||
|
145
tdenginereader/doc/tdenginereader.md
Normal file
145
tdenginereader/doc/tdenginereader.md
Normal file
@ -0,0 +1,145 @@
|
||||
# DataX TDengineReader
|
||||
|
||||
## 1 快速介绍
|
||||
|
||||
TDengineReader 插件实现了 TDengine 读取数据的功能。
|
||||
|
||||
## 2 实现原理
|
||||
|
||||
TDengineReader 通过TDengine的JDBC driver查询获取数据。
|
||||
|
||||
## 3 功能说明
|
||||
|
||||
### 3.1 配置样例
|
||||
|
||||
```json
|
||||
{
|
||||
"job": {
|
||||
"content": [
|
||||
{
|
||||
"reader": {
|
||||
"name": "tdenginereader",
|
||||
"parameter": {
|
||||
"host": "192.168.1.82",
|
||||
"port": 6030,
|
||||
"db": "test",
|
||||
"user": "root",
|
||||
"password": "taosdata",
|
||||
"sql": "select * from weather",
|
||||
"beginDateTime": "2021-01-01 00:00:00",
|
||||
"endDateTime": "2021-01-02 00:00:00",
|
||||
"splitInterval": "1h"
|
||||
}
|
||||
},
|
||||
"writer": {
|
||||
"name": "streamwriter",
|
||||
"parameter": {
|
||||
"encoding": "UTF-8",
|
||||
"print": true
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"setting": {
|
||||
"speed": {
|
||||
"channel": 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 3.2 参数说明
|
||||
|
||||
* **host**
|
||||
* 描述:TDengine实例的host。
|
||||
* 必选:是 <br />
|
||||
* 默认值:无 <br />
|
||||
* **port**
|
||||
* 描述:TDengine实例的port。
|
||||
* 必选:是 <br />
|
||||
* 默认值:无 <br />
|
||||
* **db**
|
||||
* 描述:源数据库的名称。
|
||||
* 必选:是 <br />
|
||||
* 默认值:无 <br />
|
||||
* **user**
|
||||
* 描述:TDengine实例的用户名 <br />
|
||||
* 必选:是 <br />
|
||||
* 默认值:无 <br />
|
||||
* **password**
|
||||
* 描述:TDengine实例的密码 <br />
|
||||
* 必选:是 <br />
|
||||
* 默认值:无 <br />
|
||||
* **sql**
|
||||
* 描述:用来筛选迁移数据的sql <br />
|
||||
* 必选:是 <br />
|
||||
* 默认值:无 <br />
|
||||
* **beginDateTime**
|
||||
* 描述:读取数据的起始时间,格式为 yyyy-MM-dd HH:mm:ss <br />
|
||||
* 必选:是 <br />
|
||||
* 默认值:无 <br />
|
||||
* **endDateTime**
|
||||
* 描述:读取数据的结束时间,格式为 yyyy-MM-dd HH:mm:ss <br />
|
||||
* 必选:是 <br />
|
||||
* 默认值:无 <br />
|
||||
* **splitInterval**
|
||||
* 描述:按照splitInterval来划分task, 每splitInterval创建一个task <br />
|
||||
* 必选:否 <br />
|
||||
* 默认值:1h <br />
|
||||
|
||||
### 3.3 类型转换
|
||||
|
||||
|
||||
## 4 性能报告
|
||||
|
||||
### 4.1 环境准备
|
||||
|
||||
#### 4.1.1 数据特征
|
||||
|
||||
建表语句:
|
||||
|
||||
单行记录类似于:
|
||||
|
||||
#### 4.1.2 机器参数
|
||||
|
||||
* 执行DataX的机器参数为:
|
||||
1. cpu:
|
||||
2. mem:
|
||||
3. net: 千兆双网卡
|
||||
4. disc: DataX 数据不落磁盘,不统计此项
|
||||
|
||||
* TDengine数据库机器参数为:
|
||||
1. cpu:
|
||||
2. mem:
|
||||
3. net: 千兆双网卡
|
||||
4. disc:
|
||||
|
||||
#### 4.1.3 DataX jvm 参数
|
||||
|
||||
-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
|
||||
|
||||
### 4.2 测试报告
|
||||
|
||||
#### 4.2.1 单表测试报告
|
||||
|
||||
| 通道数| DataX速度(Rec/s)|DataX流量(MB/s)| DataX机器网卡流出流量(MB/s)|DataX机器运行负载|DB网卡进入流量(MB/s)|DB运行负载|DB TPS|
|
||||
|--------| --------|--------|--------|--------|--------|--------|--------|
|
||||
|1| | | | | | | |
|
||||
|4| | | | | | | |
|
||||
|8| | | | | | | |
|
||||
|16| | | | | | | |
|
||||
|32| | | | | | | |
|
||||
|
||||
说明:
|
||||
1. 这里的单表,主键类型为 bigint(20),自增。
|
||||
2. batchSize 和 通道个数,对性能影响较大。
|
||||
|
||||
#### 4.2.2 性能测试小结
|
||||
|
||||
1.
|
||||
2.
|
||||
|
||||
## 5 约束限制
|
||||
|
||||
## 6 FAQ
|
90
tdenginereader/pom.xml
Normal file
90
tdenginereader/pom.xml
Normal file
@ -0,0 +1,90 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>datax-all</artifactId>
|
||||
<groupId>com.alibaba.datax</groupId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>tdenginereader</artifactId>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>8</maven.compiler.source>
|
||||
<maven.compiler.target>8</maven.compiler.target>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.datax</groupId>
|
||||
<artifactId>datax-common</artifactId>
|
||||
<version>${datax-project-version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
<groupId>org.slf4j</groupId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>${junit-version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<!-- compiler plugin -->
|
||||
<plugin>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<configuration>
|
||||
<source>${jdk-version}</source>
|
||||
<target>${jdk-version}</target>
|
||||
<encoding>${project-sourceEncoding}</encoding>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<configuration>
|
||||
<descriptors>
|
||||
<descriptor>src/main/assembly/package.xml</descriptor>
|
||||
</descriptors>
|
||||
<finalName>datax</finalName>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>dwzip</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>single</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<version>2.12.4</version>
|
||||
<configuration>
|
||||
<!-- 包含哪些测试用例 -->
|
||||
<includes>
|
||||
<include>**/*Test.java</include>
|
||||
</includes>
|
||||
<!-- 不包含哪些测试用例 -->
|
||||
<excludes>
|
||||
</excludes>
|
||||
<testFailureIgnore>true</testFailureIgnore>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
34
tdenginereader/src/main/assembly/package.xml
Executable file
34
tdenginereader/src/main/assembly/package.xml
Executable file
@ -0,0 +1,34 @@
|
||||
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
|
||||
<id></id>
|
||||
<formats>
|
||||
<format>dir</format>
|
||||
</formats>
|
||||
<includeBaseDirectory>false</includeBaseDirectory>
|
||||
<fileSets>
|
||||
<fileSet>
|
||||
<directory>src/main/resources</directory>
|
||||
<includes>
|
||||
<include>plugin.json</include>
|
||||
<include>plugin_job_template.json</include>
|
||||
</includes>
|
||||
<outputDirectory>plugin/reader/tdenginereader</outputDirectory>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>target/</directory>
|
||||
<includes>
|
||||
<include>tdenginereader-0.0.1-SNAPSHOT.jar</include>
|
||||
</includes>
|
||||
<outputDirectory>plugin/reader/tdenginereader</outputDirectory>
|
||||
</fileSet>
|
||||
</fileSets>
|
||||
|
||||
<dependencySets>
|
||||
<dependencySet>
|
||||
<useProjectArtifact>false</useProjectArtifact>
|
||||
<outputDirectory>plugin/reader/tdenginereader/libs</outputDirectory>
|
||||
<scope>runtime</scope>
|
||||
</dependencySet>
|
||||
</dependencySets>
|
||||
</assembly>
|
@ -0,0 +1,47 @@
|
||||
package com.alibaba.datax.plugin.reader;
|
||||
|
||||
import com.alibaba.datax.common.plugin.RecordSender;
|
||||
import com.alibaba.datax.common.spi.Reader;
|
||||
import com.alibaba.datax.common.util.Configuration;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
 * Skeleton of the DataX reader plugin for TDengine.
 *
 * This class only declares the Job and Task halves of the reader; every
 * lifecycle method below is an empty stub and no data is read yet.
 */
public class TDengineReader extends Reader {
|
||||
|
||||
/** Job-side part of the reader. All methods are placeholders. */
public static class Job extends Reader.Job {
|
||||
|
||||
/** Job initialization hook; currently does nothing. */
@Override
|
||||
public void init() {
|
||||
|
||||
}
|
||||
|
||||
/** Job teardown hook; currently does nothing. */
@Override
|
||||
public void destroy() {
|
||||
|
||||
}
|
||||
|
||||
/**
 * Splits the job into task configurations.
 *
 * @param adviceNumber ignored by this stub
 * @return always null in this stub; no task configurations are produced
 */
@Override
|
||||
public List<Configuration> split(int adviceNumber) {
|
||||
// NOTE(review): placeholder result; callers presumably expect a non-null list - confirm before use.
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/** Task-side part of the reader. All methods are placeholders. */
public static class Task extends Reader.Task {
|
||||
|
||||
/** Task initialization hook; currently does nothing. */
@Override
|
||||
public void init() {
|
||||
|
||||
}
|
||||
|
||||
/** Task teardown hook; currently does nothing. */
@Override
|
||||
public void destroy() {
|
||||
|
||||
}
|
||||
|
||||
/**
 * Read entry point; currently sends no records.
 *
 * @param recordSender unused by this stub
 */
@Override
|
||||
public void startRead(RecordSender recordSender) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
9
tdenginereader/src/main/resources/plugin.json
Executable file
9
tdenginereader/src/main/resources/plugin.json
Executable file
@ -0,0 +1,9 @@
|
||||
{
|
||||
"name": "tdenginereader",
|
||||
"class": "com.alibaba.datax.plugin.reader.TDengineReader",
|
||||
"description": {
|
||||
"useScene": "data migration from tdengine",
|
||||
"mechanism": "use JNI to read data from tdengine."
|
||||
},
|
||||
"developer": "zyyang-taosdata"
|
||||
}
|
14
tdenginereader/src/main/resources/plugin_job_template.json
Normal file
14
tdenginereader/src/main/resources/plugin_job_template.json
Normal file
@ -0,0 +1,14 @@
|
||||
{
|
||||
"name": "tdenginereader",
|
||||
"parameter": {
|
||||
"host": "127.0.0.1",
|
||||
"port": 6030,
|
||||
"db": "test",
|
||||
"user": "root",
|
||||
"password": "taosdata",
|
||||
"sql": "select * from weather",
|
||||
"beginDateTime": "2021-01-01 00:00:00",
|
||||
"endDateTime": "2021-01-02 00:00:00",
|
||||
"splitInterval": "1h"
|
||||
}
|
||||
}
|
@ -11,7 +11,7 @@
|
||||
|
||||
<groupId>com.alibaba.datax.tdenginewriter</groupId>
|
||||
<artifactId>tdenginewriter</artifactId>
|
||||
<version>1.0.0</version>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>8</maven.compiler.source>
|
||||
|
@ -18,7 +18,7 @@
|
||||
<fileSet>
|
||||
<directory>target/</directory>
|
||||
<includes>
|
||||
<include>tdenginewriter-1.0.0.jar</include>
|
||||
<include>tdenginewriter-0.0.1-SNAPSHOT.jar</include>
|
||||
</includes>
|
||||
<outputDirectory>plugin/writer/tdenginewriter</outputDirectory>
|
||||
</fileSet>
|
||||
|
@ -51,11 +51,15 @@ public class JniConnection {
|
||||
if (this.conn == JNI_NULL_POINTER) {
|
||||
throw new Exception("JNI connection is NULL");
|
||||
}
|
||||
long code = insertOpentsdbJson(json, this.conn);
|
||||
if (code != JNI_SUCCESSFUL) {
|
||||
String errMsg = getErrMsgByCode(code);
|
||||
|
||||
long result = insertOpentsdbJson(json, this.conn);
|
||||
int errCode = getErrCodeImp(this.conn, result);
|
||||
if (errCode != JNI_SUCCESSFUL) {
|
||||
String errMsg = getErrMsgImp(result);
|
||||
freeResultSetImp(this.conn, result);
|
||||
throw new Exception(errMsg);
|
||||
}
|
||||
freeResultSetImp(this.conn, result);
|
||||
}
|
||||
|
||||
public void close() throws Exception {
|
||||
@ -70,19 +74,13 @@ public class JniConnection {
|
||||
|
||||
private static native int setOptions(int optionIndex, String optionValue);
|
||||
|
||||
private static native String getTsCharset();
|
||||
|
||||
private native long connectImp(String host, int port, String dbName, String user, String password);
|
||||
|
||||
private native long executeQueryImp(byte[] sqlBytes, long connection);
|
||||
|
||||
private native int getErrCodeImp(long connection, long pSql);
|
||||
|
||||
private native String getErrMsgImp(long pSql);
|
||||
|
||||
private native String getErrMsgByCode(long code);
|
||||
|
||||
private native int getAffectedRowsImp(long connection, long pSql);
|
||||
private native void freeResultSetImp(long connection, long pSql);
|
||||
|
||||
private native int closeConnectionImp(long connection);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user