Merge pull request #7 from taosdata/feature/TD-10787

TD-10787: [datax] support migrating data from MySQL to TDengine
This commit is contained in:
Shuduo Sang 2021-11-24 18:10:19 +08:00 committed by GitHub
commit 7ef6475a53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 246 additions and 50 deletions

View File

@ -60,7 +60,8 @@ DataX目前已经有了比较全面的插件体系主流的RDBMS数据库、N
| | FTP | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md)| | | FTP | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md)|
| | HDFS | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md)| | | HDFS | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md)|
| | Elasticsearch | | √ |[写](https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md)| | | Elasticsearch | | √ |[写](https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md)|
| 时间序列数据库 | OpenTSDB | √ | |[读](https://github.com/alibaba/DataX/blob/master/opentsdbreader/doc/opentsdbreader.md)| | 时间序列数据库 | TDengine | √ | √ |[读](https://github.com/taosdata/DataX/blob/master/tdenginereader/doc/tdenginereader.md) 、[写](https://github.com/taosdata/DataX/blob/master/tdenginewriter/doc/tdenginewriter.md)|
| | OpenTSDB | √ | |[读](https://github.com/alibaba/DataX/blob/master/opentsdbreader/doc/opentsdbreader.md)|
| | TSDB | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/tsdbreader/doc/tsdbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/tsdbwriter/doc/tsdbhttpwriter.md)| | | TSDB | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/tsdbreader/doc/tsdbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/tsdbwriter/doc/tsdbhttpwriter.md)|
# 阿里云DataWorks数据集成 # 阿里云DataWorks数据集成

View File

@ -97,6 +97,18 @@
<artifactId>groovy-all</artifactId> <artifactId>groovy-all</artifactId>
<version>2.1.9</version> <version>2.1.9</version>
</dependency> </dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.driver.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.34</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -6,19 +6,18 @@
"name": "mysqlreader", "name": "mysqlreader",
"parameter": { "parameter": {
"username": "root", "username": "root",
"password": "root", "password": "passw0rd",
"column": [ "column": [
"id", "*"
"name"
], ],
"splitPk": "db_id", "splitPk": "station",
"connection": [ "connection": [
{ {
"table": [ "table": [
"test" "weather"
], ],
"jdbcUrl": [ "jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/database" "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8"
] ]
} }
] ]
@ -27,12 +26,25 @@
"writer": { "writer": {
"name": "tdenginewriter", "name": "tdenginewriter",
"parameter": { "parameter": {
"host": "192.168.56.105", "host": "127.0.0.1",
"port": 6030, "port": 6030,
"dbname": "test", "dbname": "test",
"user": "root", "user": "root",
"password": "taosdata", "password": "taosdata",
"batchSize": 1000 "batchSize": 1000,
"stable": "weather",
"tagColumn": {
"station": 0
},
"fieldColumn": {
"latitude": 1,
"longtitude": 2,
"tmax": 4,
"tmin": 5
},
"timestampColumn":{
"date": 3
}
} }
} }
} }

View File

@ -1,31 +0,0 @@
{
"job": {
"content": [
{
"reader": {
"name": "opentsdbreader",
"parameter": {
"endpoint": "http://192.168.1.180:4242",
"column": [
"weather.temperature"
],
"beginDateTime": "2021-01-01 00:00:00",
"endDateTime": "2021-01-01 01:00:00"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -9,7 +9,7 @@ public class EngineTest {
public void test() { public void test() {
System.out.println(System.getProperty("java.library.path")); System.out.println(System.getProperty("java.library.path"));
// String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/main/job/opentsdb2tdengine.json"}; // String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/main/job/opentsdb2tdengine.json"};
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/main/job/stream2tdengine.json"}; String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/main/job/mysql2tdengine.json"};
System.setProperty("datax.home", "../target/datax/datax"); System.setProperty("datax.home", "../target/datax/datax");
try { try {
Engine.entry(params); Engine.entry(params);

View File

@ -0,0 +1,104 @@
package com.alibaba.datax.core;
import org.junit.Test;
import java.sql.*;
import java.util.Arrays;
import java.util.Calendar;
import java.util.List;
import java.util.Random;
/**
 * Manual migration test: MySQL -> TDengine.
 * <p>
 * Generates random sample data in a local MySQL instance, prepares a local
 * TDengine target database, then runs the DataX engine with the
 * {@code mysql2tdengine.json} job. Requires MySQL at localhost (root/passw0rd)
 * and TDengine at 127.0.0.1:6030 (root/taosdata); not suitable for CI.
 */
public class TestMysql2TDengine {

    /**
     * Creates the MySQL source table {@code test.weather} and fills it with
     * random rows via {@link #genRandomData(PreparedStatement)}.
     *
     * @throws SQLException if MySQL is unreachable or a statement fails
     */
    @Test
    public void genTestData() throws SQLException {
        // try-with-resources closes the Connection and Statements on every path;
        // the previous version leaked the Connection and swallowed close() errors.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost/mysql?user=root&password=passw0rd");
             Statement stmt = conn.createStatement()) {
            stmt.execute("create database if not exists test");
            stmt.execute("use test");
            // "if exists" keeps the test re-runnable on a fresh database,
            // where a bare DROP TABLE would throw.
            stmt.execute("drop table if exists weather");
            stmt.execute("CREATE TABLE IF NOT EXISTS weather(station varchar(100), latitude DOUBLE, longtitude DOUBLE, `date` DATETIME, tmax INT, tmin INT)");
            try (PreparedStatement pstmt = conn.prepareStatement(
                    "insert into weather(station, latitude, longtitude, `date`, tmax, tmin) values (?, ?, ?, ?, ?, ?)")) {
                genRandomData(pstmt);
            }
        }
    }

    /**
     * Inserts 10*100*24 hourly timestamps (starting 1990-01-01) with one row
     * per station, flushing the JDBC batch every 1000 outer iterations.
     *
     * @param psmt prepared insert for the 6-column weather table
     * @throws SQLException if a batch execution fails
     */
    private void genRandomData(PreparedStatement psmt) throws SQLException {
        Random random = new Random();
        Calendar calendar = Calendar.getInstance();
        calendar.set(1990, 0, 1, 1, 0, 0);
        List<String> stations = Arrays.asList("STA", "STB", "STC");
        for (int i = 0; i < (10 * 100 * 24); i++) {
            for (int j = 0; j < 3; j++) {
                psmt.setString(1, stations.get(j));
                psmt.setDouble(2, random.nextDouble() * 1000);
                psmt.setDouble(3, random.nextDouble() * 1000);
                psmt.setTimestamp(4, new java.sql.Timestamp(calendar.getTime().getTime()));
                psmt.setInt(5, random.nextInt(100));
                psmt.setInt(6, random.nextInt(100));
                psmt.addBatch();
            }
            calendar.add(Calendar.MINUTE, 60);
            if (i % 1000 == 0) {
                psmt.executeBatch();
            }
        }
        // flush the remaining partial batch
        psmt.executeBatch();
    }

    /**
     * Recreates the TDengine target database {@code test} with a long
     * retention ({@code keep 36500} days) so the 1990-era rows generated above
     * can be inserted; the default retention only keeps ~10 years and older
     * timestamps fail with "failed to execute batch bind".
     *
     * @throws SQLException if TDengine is unreachable or a statement fails
     */
    @Test
    public void prepareTDengine() throws SQLException {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:TAOS://127.0.0.1:6030/log?user=root&password=taosdata");
             Statement stmt = conn.createStatement()) {
            stmt.execute("drop database if exists test");
            stmt.execute("create database if not exists test keep 36500");
            stmt.execute("drop stable if exists test.weather");
        }
    }

    /**
     * Runs the DataX engine in standalone mode with the mysql2tdengine job.
     * Exceptions are printed rather than rethrown so the other test methods
     * can still be inspected when the environment is incomplete.
     */
    @Test
    public void test() {
        System.out.println(System.getProperty("java.library.path"));
        String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/main/job/mysql2tdengine.json"};
        System.setProperty("datax.home", "../target/datax/datax");
        try {
            Engine.entry(params);
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
}

View File

@ -181,8 +181,8 @@ TDengineWriter 通过 DataX 框架获取 Reader生成的协议数据根据rea
| fieldColumn | 字段列的列名和位置 | 否 | 无 | | | fieldColumn | 字段列的列名和位置 | 否 | 无 | |
| timestampColumn | 时间戳列的列名和位置 | 否 | 无 | 时间戳列只能有一个 | | timestampColumn | 时间戳列的列名和位置 | 否 | 无 | 时间戳列只能有一个 |
#### 3.3.3 自动建表规则 #### 3.2.3 自动建表规则
##### 3.3.3.1 超级表创建规则 ##### 3.2.3.1 超级表创建规则
如果配置了tagColumn、 fieldColumn和timestampColumn将会在插入第一条数据前自动创建超级表。<br> 如果配置了tagColumn、 fieldColumn和timestampColumn将会在插入第一条数据前自动创建超级表。<br>
数据列的类型从第1条记录自动推断, 标签列默认类型为`NCHAR(64)`, 比如示例配置,可能生成以下建表语句: 数据列的类型从第1条记录自动推断, 标签列默认类型为`NCHAR(64)`, 比如示例配置,可能生成以下建表语句:
@ -201,14 +201,14 @@ TAGS(
); );
``` ```
##### 3.3.3.2 子表创建规则 ##### 3.2.3.2 子表创建规则
子表结果与超表相同,子表表名生成规则: 子表结果与超表相同,子表表名生成规则:
1. 将标签的value 组合成为如下的字符串: `tag_value1!tag_value2!tag_value3` 1. 将标签的value 组合成为如下的字符串: `tag_value1!tag_value2!tag_value3`
2. 计算该字符串的 MD5 散列值 "md5_val"。 2. 计算该字符串的 MD5 散列值 "md5_val"。
3. "t_md5val"作为子表名。其中的 "t" 是固定的前缀。 3. "t_md5val"作为子表名。其中的 "t" 是固定的前缀。
#### 3.3.4 用户提前建表 #### 3.2.4 用户提前建表
如果你已经创建好目标超级表那么tagColumn、 fieldColumn和timestampColumn三个字段均可省略, 插件将通过执行通过`describe stableName`获取表结构的信息。 如果你已经创建好目标超级表那么tagColumn、 fieldColumn和timestampColumn三个字段均可省略, 插件将通过执行通过`describe stableName`获取表结构的信息。
此时要求接收到的Record中Column的顺序和执行`describe stableName`返回的列顺序相同, 比如通过`describe stableName`返回以下内容: 此时要求接收到的Record中Column的顺序和执行`describe stableName`返回的列顺序相同, 比如通过`describe stableName`返回以下内容:
@ -221,13 +221,13 @@ TAGS(
``` ```
那么插件收到的数据第1列必须代表时间戳第2列必须代表电流第3列必须代表位置。 那么插件收到的数据第1列必须代表时间戳第2列必须代表电流第3列必须代表位置。
#### 3.3.5 注意事项 #### 3.2.5 注意事项
1. tagColumn、 fieldColumn和timestampColumn三个字段用于描述目标表的结构信息这三个配置字段必须同时存在或同时省略。 1. tagColumn、 fieldColumn和timestampColumn三个字段用于描述目标表的结构信息这三个配置字段必须同时存在或同时省略。
2. 如果存在以上三个配置,且目标表也已经存在,则两者必须一致。**一致性**由用户自己保证,插件不做检查。不一致可能会导致插入失败或插入数据错乱。 2. 如果存在以上三个配置,且目标表也已经存在,则两者必须一致。**一致性**由用户自己保证,插件不做检查。不一致可能会导致插入失败或插入数据错乱。
3. 插件优先使用配置文件中指定的表结构。 3. 插件优先使用配置文件中指定的表结构。
#### 3.3.6 类型转换 #### 3.2.6 类型转换
| MongoDB 数据类型 | DataX 内部类型 | TDengine 数据类型 | | MongoDB 数据类型 | DataX 内部类型 | TDengine 数据类型 |
| ---------------- | -------------- | ----------------- | | ---------------- | -------------- | ----------------- |
@ -238,6 +238,84 @@ TAGS(
| boolean | Boolean | BOOL | | boolean | Boolean | BOOL |
| bytes | Bytes | BINARY | | bytes | Bytes | BINARY |
### 3.3 从关系型数据库到TDengine
writer部分的配置规则和上述MongoDB的示例是一样的这里给出一个MySQL的示例。
#### 3.3.1 MySQL中表结构
```sql
CREATE TABLE IF NOT EXISTS weather(
station varchar(100),
latitude DOUBLE,
longtitude DOUBLE,
`date` DATE,
TMAX int,
TMIN int
)
```
#### 3.3.2 配置文件示例
```json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "passw0rd",
"column": [
"*"
],
"splitPk": "station",
"connection": [
{
"table": [
"weather"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8"
]
}
]
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"host": "127.0.0.1",
"port": 6030,
"dbname": "test",
"user": "root",
"password": "taosdata",
"batchSize": 1000,
"stable": "weather",
"tagColumn": {
"station": 0
},
"fieldColumn": {
"latitude": 1,
"longtitude": 2,
"tmax": 4,
"tmin": 5
},
"timestampColumn":{
"date": 3
}
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
```
## 4 性能报告 ## 4 性能报告
@ -313,4 +391,8 @@ TDengine要求每个表第一列是时间戳列后边是普通字段最后
### 插件如何确定各列的数据类型? ### 插件如何确定各列的数据类型?
根据收到的第一批数据自动推断各列的类型。 根据收到的第一批数据自动推断各列的类型。
### 为什么插入10年前的数据会抛异常`TDengine ERROR (2350): failed to execute batch bind` ?
因为创建数据库的时候默认保留10年的数据。可以手动指定要保留多长时间的数据比如:`CREATE DATABASE power KEEP 36500;`。

View File

@ -36,6 +36,12 @@
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.34</version>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>

View File

@ -1,6 +1,7 @@
package com.alibaba.datax.plugin.writer.tdenginewriter; package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.plugin.RecordReceiver; import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.plugin.TaskPluginCollector;
import java.util.Properties; import java.util.Properties;

View File

@ -16,6 +16,14 @@ public class TDengineWriter extends Writer {
private static final String PEER_PLUGIN_NAME = "peerPluginName"; private static final String PEER_PLUGIN_NAME = "peerPluginName";
static {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
public static class Job extends Writer.Job { public static class Job extends Writer.Job {
private Configuration originalConfig; private Configuration originalConfig;
@ -33,7 +41,7 @@ public class TDengineWriter extends Writer {
@Override @Override
public List<Configuration> split(int mandatoryNumber) { public List<Configuration> split(int mandatoryNumber) {
List<Configuration> writerSplitConfigs = new ArrayList<Configuration>(); List<Configuration> writerSplitConfigs = new ArrayList<>();
for (int i = 0; i < mandatoryNumber; i++) { for (int i = 0; i < mandatoryNumber; i++) {
writerSplitConfigs.add(this.originalConfig); writerSplitConfigs.add(this.originalConfig);
} }
@ -49,6 +57,7 @@ public class TDengineWriter extends Writer {
@Override @Override
public void init() { public void init() {
this.writerSliceConfig = getPluginJobConf(); this.writerSliceConfig = getPluginJobConf();
} }
@Override @Override
@ -70,7 +79,7 @@ public class TDengineWriter extends Writer {
if (!keys.contains(Key.PASSWORD)) { if (!keys.contains(Key.PASSWORD)) {
properties.setProperty(Key.PASSWORD, "taosdata"); properties.setProperty(Key.PASSWORD, "taosdata");
} }
LOG.debug("========================properties==========================\n" + properties.toString()); LOG.debug("========================properties==========================\n" + properties);
String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME); String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
LOG.debug("start to handle record from: " + peerPluginName); LOG.debug("start to handle record from: " + peerPluginName);
DataHandler handler = DataHandlerFactory.build(peerPluginName); DataHandler handler = DataHandlerFactory.build(peerPluginName);