commit 2785234d3d — authored by 1486756632, 2025-04-10 16:21:43 +08:00, committed by GitHub
150 changed files with 6862 additions and 108 deletions

View File

@ -37,6 +37,7 @@ As a data synchronization framework, DataX abstracts synchronizing heterogeneous data sources into reading from a source
DataX already has a fairly comprehensive plugin ecosystem: mainstream RDBMS databases, NoSQL stores, and big-data computing systems are all supported. The currently supported data sources are listed below; for details, see the [DataX data source reference guide](https://github.com/alibaba/DataX/wiki/DataX-all-data-channels).
| Type | Data Source | Reader | Writer | Docs |
|------|-------------|:------:|:------:|:----:|
| RDBMS (relational) | MySQL | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md), [write](https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md) |

View File

@ -132,7 +132,14 @@
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>tdenginereader/target/datax/</directory>
<directory>tdengine20reader/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>tdengine30reader/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
@ -267,7 +274,14 @@
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>tdenginewriter/target/datax/</directory>
<directory>tdengine20writer/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>tdengine30writer/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>

View File

@ -74,7 +74,8 @@
<module>clickhousereader</module>
<module>mongodbreader</module>
<module>tdenginereader</module>
<module>tdengine20reader</module>
<module>tdengine30reader</module>
<module>gdbreader</module>
<module>tsdbreader</module>
<module>opentsdbreader</module>
@ -115,7 +116,8 @@
<module>elasticsearchwriter</module>
<module>mongodbwriter</module>
<module>tdenginewriter</module>
<module>tdengine20writer</module>
<module>tdengine30writer</module>
<module>ocswriter</module>
<module>tsdbwriter</module>
<module>gdbwriter</module>

View File

@ -20,7 +20,7 @@ TDengineReader fetches data by querying through TDengine's JDBC driver.
"content": [
{
"reader": {
"name": "tdenginereader",
"name": "tdengine20reader",
"parameter": {
"username": "root",
"password": "taosdata",
@ -71,7 +71,7 @@ TDengineReader fetches data by querying through TDengine's JDBC driver.
"content": [
{
"reader": {
"name": "tdenginereader",
"name": "tdengine20reader",
"parameter": {
"user": "root",
"password": "taosdata",

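As the context lines above note, the reader fetches data through TDengine's JDBC driver. A minimal standalone sketch of that access path (host, database, and credentials below are placeholders drawn from the sample configs, not fixed values):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Minimal sketch of the JDBC access path the reader builds on.
// jdbc:TAOS-RS is TDengine's RESTful driver; host and credentials are placeholders.
public class JdbcQuerySketch {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:TAOS-RS://localhost:6041/test?timestampFormat=TIMESTAMP";
        try (Connection conn = DriverManager.getConnection(url, "root", "taosdata");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select ts, current, voltage, phase from meters")) {
            while (rs.next()) {
                // each row becomes one DataX record in the real plugin
                System.out.println(rs.getTimestamp(1) + " " + rs.getDouble(2));
            }
        }
    }
}
```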
tdengine20reader/pom.xml Normal file
View File

@ -0,0 +1,123 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>tdengine20reader</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.datax.tdengine20writer</groupId>
<artifactId>tdengine20writer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.42</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<!-- dependency on the DM8 JDBC jar -->
<!-- <dependency>-->
<!-- <groupId>com.dameng</groupId>-->
<!-- <artifactId>dm-jdbc</artifactId>-->
<!-- <version>1.8</version>-->
<!-- <scope>system</scope>-->
<!-- <systemPath>${project.basedir}/src/test/resources/DmJdbcDriver18.jar</systemPath>-->
<!-- </dependency>-->
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12.4</version>
<configuration>
<!-- test cases to include -->
<includes>
<include>**/*Test.java</include>
</includes>
<!-- test cases to exclude -->
<excludes>
</excludes>
<testFailureIgnore>true</testFailureIgnore>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -13,21 +13,21 @@
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/reader/tdenginereader</outputDirectory>
<outputDirectory>plugin/reader/tdengine20reader</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>tdenginereader-0.0.1-SNAPSHOT.jar</include>
<include>tdengine20reader-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/reader/tdenginereader</outputDirectory>
<outputDirectory>plugin/reader/tdengine20reader</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/reader/tdenginereader/libs</outputDirectory>
<outputDirectory>plugin/reader/tdengine20reader/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>

View File

@ -1,11 +1,12 @@
package com.alibaba.datax.plugin.reader;
package com.alibaba.datax.plugin.tdengine20reader;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.tdenginewriter.Key;
import com.alibaba.datax.plugin.writer.tdengine20writer.Key;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -15,8 +16,6 @@ import java.sql.*;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class TDengineReader extends Reader {

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.reader;
package com.alibaba.datax.plugin.tdengine20reader;
import com.alibaba.datax.common.spi.ErrorCode;

View File

@ -1,6 +1,6 @@
{
"name": "tdenginereader",
"class": "com.alibaba.datax.plugin.reader.TDengineReader",
"name": "tdengine20reader",
"class": "com.alibaba.datax.plugin.tdengine20reader.TDengineReader",
"description": {
"useScene": "data migration from tdengine",
"mechanism": "use JDBC to read data from tdengine."

View File

@ -1,5 +1,5 @@
{
"name": "tdenginereader",
"name": "tdengine20reader",
"parameter": {
"user": "",
"password": "",

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.reader;
package com.alibaba.datax.plugin.tdengine20reader;
import com.alibaba.datax.core.Engine;
import org.junit.Ignore;

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.reader;
package com.alibaba.datax.plugin.tdengine20reader;
import com.alibaba.datax.core.Engine;
import org.junit.Ignore;

View File

@ -1,7 +1,7 @@
package com.alibaba.datax.plugin.reader;
package com.alibaba.datax.plugin.tdengine20reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.tdenginewriter.Key;
import com.alibaba.datax.plugin.writer.tdengine20writer.Key;
import org.junit.Assert;
import org.junit.Test;

View File

@ -3,7 +3,7 @@
"content": [
{
"reader": {
"name": "tdenginereader",
"name": "tdengine20reader",
"parameter": {
"username": "root",
"password": "taosdata",

View File

@ -3,7 +3,7 @@
"content": [
{
"reader": {
"name": "tdenginereader",
"name": "tdengine20reader",
"parameter": {
"username": "root",
"password": "taosdata",

View File

@ -3,7 +3,7 @@
"content": [
{
"reader": {
"name": "tdenginereader",
"name": "tdengine20reader",
"parameter": {
"username": "root",
"password": "taosdata",

View File

@ -1,6 +1,6 @@
# DataX TDengineWriter
Simplified Chinese | [English](./tdenginewriter.md)
Simplified Chinese | [English](./tdengine20writer.md)
## 1 Quick Introduction
@ -80,7 +80,7 @@ create table test.weather (ts timestamp, temperature int, humidity double) tags(
}
},
"writer": {
"name": "tdenginewriter",
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",

View File

@ -1,6 +1,6 @@
# DataX TDengineWriter
[Simplified Chinese](./tdenginewriter-CN.md) | English
[Simplified Chinese](./tdengine20writer-CN.md) | English
## 1 Quick Introduction
@ -82,7 +82,7 @@ Write data to TDengine using the following Job configuration:
}
},
"writer": {
"name": "tdenginewriter",
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",

View File

@ -9,8 +9,8 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.alibaba.datax.tdenginewriter</groupId>
<artifactId>tdenginewriter</artifactId>
<groupId>com.alibaba.datax.tdengine20writer</groupId>
<artifactId>tdengine20writer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
@ -23,7 +23,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.39</version>
<version>2.0.42</version>
</dependency>
<dependency>

View File

@ -13,21 +13,21 @@
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/tdenginewriter</outputDirectory>
<outputDirectory>plugin/writer/tdengine20writer</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>tdenginewriter-0.0.1-SNAPSHOT.jar</include>
<include>tdengine20writer-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/tdenginewriter</outputDirectory>
<outputDirectory>plugin/writer/tdengine20writer</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/tdenginewriter/libs</outputDirectory>
<outputDirectory>plugin/writer/tdengine20writer/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
public class ColumnMeta {
String field;

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
public class Constants {
public static final String DEFAULT_USERNAME = "root";

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
@ -82,6 +82,18 @@ public class DefaultDataHandler implements DataHandler {
// prepare table_name -> column_meta
this.tbnameColumnMetasMap = schemaManager.loadColumnMetas(tables);
// filter column
for (String tableName : tbnameColumnMetasMap.keySet()) {
List<ColumnMeta> columnMetaList = tbnameColumnMetasMap.get(tableName);
Iterator<ColumnMeta> iterator = columnMetaList.iterator();
while (iterator.hasNext()) {
ColumnMeta columnMeta = iterator.next();
if (!this.columns.contains(columnMeta.field)) {
iterator.remove();
}
}
}
List<Record> recordBatch = new ArrayList<>();
Record record;
for (int i = 1; (record = lineReceiver.getFromReader()) != null; i++) {
@ -226,14 +238,18 @@ public class DefaultDataHandler implements DataHandler {
ColumnMeta columnMeta = columnMetas.get(colIndex);
if (columnMeta.isTag) {
Column column = record.getColumn(colIndex);
switch (columnMeta.type) {
case "TINYINT":
case "SMALLINT":
case "INT":
case "BIGINT":
return column.asLong().toString();
default:
return column.asString();
try {
switch (columnMeta.type) {
case "TINYINT":
case "SMALLINT":
case "INT":
case "BIGINT":
return column.asLong().toString();
default:
return column.asString();
}
} catch (Exception e) {
LOG.error("failed to get Tag, colIndex: " + colIndex + ", ColumnMeta: " + columnMeta + ", record: " + record, e);
}
}
return "";
@ -250,8 +266,8 @@ public class DefaultDataHandler implements DataHandler {
StringBuilder sb = new StringBuilder("insert into");
for (Record record : recordBatch) {
sb.append(" ").append(record.getColumn(indexOf("tbname")).asString())
.append(" using ").append(table)
sb.append(" `").append(record.getColumn(indexOf("tbname")).asString())
.append("` using ").append(table)
.append(" tags")
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return colMeta.isTag;
@ -470,7 +486,7 @@ public class DefaultDataHandler implements DataHandler {
List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);
StringBuilder sb = new StringBuilder();
sb.append("insert into ").append(table).append(" ")
sb.append("insert into `").append(table).append("` ")
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return !colMeta.isTag;
}).map(colMeta -> {
@ -540,8 +556,8 @@ public class DefaultDataHandler implements DataHandler {
List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);
StringBuilder sb = new StringBuilder();
sb.append("insert into ").append(table)
.append(" ")
sb.append("insert into `").append(table)
.append("` ")
.append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
return colMeta.field;
}).collect(Collectors.joining(",", "(", ")")))
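
The change above wraps sub-table and table names in backticks because the name comes from each record's tbname column and may contain reserved words or special characters that would otherwise break the generated SQL. A minimal standalone sketch of the quoting (helper names are illustrative, not the plugin's API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Minimal sketch of backtick-quoted insert generation; a raw name like
// "t1-0" would be a syntax error without the quotes.
public class QuotedInsertSketch {
    static String buildInsert(String table, List<String> fields) {
        return "insert into `" + table + "` "
                + fields.stream().collect(Collectors.joining(",", "(", ")"))
                + " values (?, ?, ?)";
    }

    public static void main(String[] args) {
        // prints: insert into `t1-0` (ts,f1,f2) values (?, ?, ?)
        System.out.println(buildInsert("t1-0", Arrays.asList("ts", "f1", "f2")));
    }
}
```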

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
public class Key {
public static final String USERNAME = "username";

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
@ -14,7 +14,6 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class OpentsdbDataHandler implements DataHandler {
private static final Logger LOG = LoggerFactory.getLogger(OpentsdbDataHandler.class);

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
import com.alibaba.datax.common.exception.DataXException;
import org.apache.commons.lang3.StringUtils;

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
import com.alibaba.datax.common.spi.ErrorCode;

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
public class TableMeta {
TableType tableType;

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
public enum TableType {
SUP_TABLE, SUB_TABLE, NML_TABLE

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
public enum TimestampPrecision {
MILLISEC, MICROSEC, NANOSEC

View File

@ -1,6 +1,6 @@
{
"name": "tdenginewriter",
"class": "com.alibaba.datax.plugin.writer.tdenginewriter.TDengineWriter",
"name": "tdengine20writer",
"class": "com.alibaba.datax.plugin.writer.tdengine20writer.TDengineWriter",
"description": {
"useScene": "data migration to tdengine",
"mechanism": "use taos-jdbcdriver to write data."

View File

@ -1,5 +1,5 @@
{
"name": "tdenginewriter",
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
import com.alibaba.datax.core.Engine;
import org.junit.Ignore;

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
import com.alibaba.datax.core.Engine;
import org.junit.Before;

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.LongColumn;

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
import com.alibaba.datax.core.Engine;
import org.junit.Test;

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
import com.alibaba.datax.core.Engine;
import org.junit.Before;
@ -11,7 +11,7 @@ import java.util.Random;
public class Mysql2TDengineTest {
private static final String host1 = "192.168.56.105";
private static final String host2 = "192.168.1.93";
private static final String host2 = "192.168.56.105";
private static final Random random = new Random(System.currentTimeMillis());
@Test
@ -21,6 +21,13 @@ public class Mysql2TDengineTest {
Engine.entry(params);
}
@Test
public void test2() throws Throwable {
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/m2t-2.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Before
public void before() throws SQLException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@ -50,7 +57,7 @@ public class Mysql2TDengineTest {
stmt.close();
}
final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041/";
final String url2 = "jdbc:TAOS://" + host2 + ":6030/";
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
Statement stmt = conn.createStatement();

View File

@ -0,0 +1,66 @@
package com.alibaba.datax.plugin.writer.tdengine20writer;
import com.alibaba.datax.core.Engine;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Random;
public class Mysql2TDengineTest2 {
private static final String host1 = "192.168.56.105";
private static final String host2 = "192.168.56.105";
private static final Random random = new Random(System.currentTimeMillis());
@Test
public void test2() throws Throwable {
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/m2t-2.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Before
public void before() throws SQLException {
final String[] tagList = {"北京", "海淀", "上海", "河北", "天津"};
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String ts = sdf.format(new Date(System.currentTimeMillis()));
final String url = "jdbc:mysql://" + host1 + ":3306/?useSSL=false&useUnicode=true&charset=UTF-8&generateSimpleParameterMetadata=true";
try (Connection conn = DriverManager.getConnection(url, "root", "123456")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db1");
stmt.execute("create database if not exists db1");
stmt.execute("use db1");
stmt.execute("create table stb1(id int primary key AUTO_INCREMENT, " +
"f1 int, f2 float, f3 double, f4 varchar(100), t1 varchar(100), ts timestamp)");
for (int i = 1; i <= 10; i++) {
String sql = "insert into stb1(f1, f2, f3, f4, t1, ts) values("
+ random.nextInt(100) + "," + random.nextFloat() * 100 + "," + random.nextDouble() * 100
+ ",'" + RandomStringUtils.randomAlphanumeric(10)
+ "', '" + tagList[random.nextInt(tagList.length)]
+ "', '" + (ts + i * 1000) + "')";
stmt.execute(sql);
}
stmt.close();
}
final String url2 = "jdbc:TAOS://" + host2 + ":6030/";
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db2");
stmt.execute("create database if not exists db2");
stmt.execute("create table db2.stb2(ts timestamp, f1 int, f2 float, f3 double, f4 nchar(100)) tags(t1 nchar(100))");
stmt.close();
}
}
}

View File

@ -0,0 +1,76 @@
package com.alibaba.datax.plugin.writer.tdengine20writer;
import com.alibaba.datax.core.Engine;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class Mysql2TDengineTest3 {
private static final String host1 = "192.168.56.105";
private static final String host2 = "192.168.56.105";
private static final Random random = new Random(System.currentTimeMillis());
@Test
public void test2() throws Throwable {
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/m2t-3.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Before
public void before() throws SQLException {
// given
long ts_start = new Date(System.currentTimeMillis()).getTime();
final int columnSize = 10;
final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
final String url = "jdbc:mysql://" + host1 + ":3306/?useSSL=false&useUnicode=true&charset=UTF-8&generateSimpleParameterMetadata=true";
try (Connection conn = DriverManager.getConnection(url, "root", "123456")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db1");
stmt.execute("create database if not exists db1");
stmt.execute("use db1");
stmt.execute("create table stb1(id int primary key AUTO_INCREMENT, "
+ IntStream.range(1, columnSize).mapToObj(i -> "f" + i + " int").collect(Collectors.joining(",")) + ", "
+ IntStream.range(1, columnSize).mapToObj(i -> "t" + i + " varchar(20)").collect(Collectors.joining(",")) + ", ts timestamp)");
for (int i = 1; i <= 10; i++) {
String sql = "insert into stb1("
+ IntStream.range(1, columnSize).mapToObj(index -> "f" + index).collect(Collectors.joining(",")) + ", "
+ IntStream.range(1, columnSize).mapToObj(index -> "t" + index).collect(Collectors.joining(","))
+ ", ts) values("
+ IntStream.range(1, columnSize).mapToObj(index -> random.nextInt(10) + "").collect(Collectors.joining(","))
+ ","
+ IntStream.range(1, columnSize).mapToObj(index -> "'" + RandomStringUtils.randomAlphanumeric(15) + "'").collect(Collectors.joining(","))
+ ", '" + sdf.format(new Date(ts_start + i * 1000)) + "')";
stmt.execute(sql);
}
stmt.close();
}
final String url2 = "jdbc:TAOS://" + host2 + ":6030/";
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db2");
stmt.execute("create database if not exists db2");
stmt.execute("create table db2.stb2(ts timestamp, "
+ IntStream.range(1, 101).mapToObj(i -> "f" + i + " int").collect(Collectors.joining(",")) + ") tags("
+ IntStream.range(1, 101).mapToObj(i -> "t" + i + " nchar(20)").collect(Collectors.joining(","))
+ ")"
);
stmt.close();
}
}
}

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
import com.alibaba.datax.core.Engine;
import org.junit.Assert;

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
import org.junit.AfterClass;
import org.junit.Assert;

View File

@ -1,7 +1,6 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
import com.alibaba.datax.core.Engine;
import org.junit.Before;
import org.junit.Test;
import java.sql.Connection;

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
import com.alibaba.datax.core.Engine;
import org.junit.Before;

View File

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
package com.alibaba.datax.plugin.writer.tdengine20writer;
import com.alibaba.datax.common.util.Configuration;
import org.junit.Assert;

View File

@ -6,7 +6,7 @@
"name": "txtfilereader",
"parameter": {
"path": [
"/Users/yangzy/IdeaProjects/DataX/tdenginewriter/src/test/resources/weather.csv"
"/Users/yangzy/IdeaProjects/DataX/tdengine20writer/src/test/resources/weather.csv"
],
"encoding": "UTF-8",
"column": [
@ -44,7 +44,7 @@
}
},
"writer": {
"name": "tdenginewriter",
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",

View File

@ -161,7 +161,7 @@
}
},
"writer": {
"name": "tdenginewriter",
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",

View File

@ -21,7 +21,7 @@
}
},
"writer": {
"name": "tdenginewriter",
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",

View File

@ -0,0 +1,62 @@
{
"job": {
"content": [
{
"reader": {
"name": "rdbmsreader",
"parameter": {
"username": "TESTUSER",
"password": "test123456",
"connection": [
{
"querySql": [
"select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1;"
],
"jdbcUrl": [
"jdbc:dm://192.168.0.72:5236"
]
}
],
"fetchSize": 1024
}
},
"writer": {
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"tbname",
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"f10"
],
"connection": [
{
"table": [
"t1_0"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -36,7 +36,7 @@
}
},
"writer": {
"name": "tdenginewriter",
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",
@ -58,7 +58,7 @@
"table": [
"stb2"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
"jdbcUrl": "jdbc:TAOS://192.168.1.93:6030/db2"
}
],
"batchSize": 1000,
@ -73,4 +73,4 @@
}
}
}
}
}

View File

@ -21,7 +21,7 @@
}
},
"writer": {
"name": "tdenginewriter",
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",

View File

@ -65,7 +65,7 @@
}
},
"writer": {
"name": "tdenginewriter",
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",

View File

@ -16,7 +16,7 @@
}
},
"writer": {
"name": "tdenginewriter",
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",

View File

@ -21,7 +21,7 @@
}
},
"writer": {
"name": "tdenginewriter",
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",

View File

@ -21,7 +21,7 @@
}
},
"writer": {
"name": "tdenginewriter",
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",

View File

@ -22,7 +22,7 @@
}
},
"writer": {
"name": "tdenginewriter",
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",

View File

@ -3,7 +3,7 @@
"content": [
{
"reader": {
"name": "tdenginereader",
"name": "tdengine20reader",
"parameter": {
"username": "root",
"password": "taosdata",

View File

@ -3,7 +3,7 @@
"content": [
{
"reader": {
"name": "tdenginereader",
"name": "tdengine20reader",
"parameter": {
"username": "root",
"password": "taosdata",

View File

@ -33,7 +33,7 @@
}
},
"writer": {
"name": "tdenginewriter",
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",

View File

@ -0,0 +1,57 @@
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"splitPk": "id",
"connection": [
{
"querySql": [
"select t1 as tbname, ts, f1,f2,f3,f4,t1 from stb1"
],
"jdbcUrl": [
"jdbc:mysql://192.168.56.105:3306/db1?useSSL=false&useUnicode=true&characterEncoding=utf8"
]
}
]
}
},
"writer": {
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"tbname",
"ts",
"f1",
"f2",
"f3",
"f4",
"t1"
],
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:TAOS://192.168.56.105:6030/db2"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,53 @@
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"splitPk": "id",
"connection": [
{
"querySql": [
"select ts,f1,t1 from stb1"
],
"jdbcUrl": [
"jdbc:mysql://192.168.56.105:3306/db1?useSSL=false&useUnicode=true&characterEncoding=utf8"
]
}
]
}
},
"writer": {
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"ts",
"f1",
"t1"
],
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:TAOS://192.168.56.105:6030/db2"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -33,7 +33,7 @@
}
},
"writer": {
"name": "tdenginewriter",
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "hmdata",

View File

@ -11,7 +11,7 @@
}
},
"writer": {
"name": "tdenginewriter",
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",

View File

@ -0,0 +1,94 @@
{
"job": {
"content": [
{
"reader": {
"name": "tdengine20reader",
"parameter": {
"username": "root",
"password": "taosdata",
"connection": [
{
"table": [
"stb1"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1?timestampFormat=TIMESTAMP"
}
],
"column": [
"tbname",
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"t1",
"t2",
"t3",
"t4",
"t5",
"t6",
"t7",
"t8",
"t9",
"t10"
],
"beginDateTime": "2022-02-15 00:00:00",
"endDateTime": "2022-02-16 00:00:00",
"splitInterval": "1d"
}
},
"writer": {
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"tbname",
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"t1",
"t2",
"t3",
"t4",
"t5",
"t6",
"t7",
"t8",
"t9",
"t10"
],
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2?timestampFormat=TIMESTAMP"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,92 @@
{
"job": {
"content": [
{
"reader": {
"name": "tdengine20reader",
"parameter": {
"username": "root",
"password": "taosdata",
"connection": [
{
"table": [
"stb1"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1?timestampFormat=TIMESTAMP"
}
],
"column": [
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"t1",
"t2",
"t3",
"t4",
"t5",
"t6",
"t7",
"t8",
"t9",
"t10"
],
"beginDateTime": "2022-02-15 00:00:00",
"endDateTime": "2022-02-16 00:00:00",
"splitInterval": "1d"
}
},
"writer": {
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"t1",
"t2",
"t3",
"t4",
"t5",
"t6",
"t7",
"t8",
"t9",
"t10"
],
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:TAOS://192.168.1.93:6030/db2"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,92 @@
{
"job": {
"content": [
{
"reader": {
"name": "tdengine20reader",
"parameter": {
"username": "root",
"password": "taosdata",
"connection": [
{
"table": [
"stb1"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1?timestampFormat=TIMESTAMP"
}
],
"column": [
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"t1",
"t2",
"t3",
"t4",
"t5",
"t6",
"t7",
"t8",
"t9",
"t10"
],
"beginDateTime": "2022-02-15 00:00:00",
"endDateTime": "2022-02-16 00:00:00",
"splitInterval": "1d"
}
},
"writer": {
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"t1",
"t2",
"t3",
"t4",
"t5",
"t6",
"t7",
"t8",
"t9",
"t10"
],
"connection": [
{
"table": [
"t1"
],
"jdbcUrl": "jdbc:TAOS://192.168.1.93:6030/db2?timestampFormat=TIMESTAMP"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,72 @@
{
"job": {
"content": [
{
"reader": {
"name": "tdengine20reader",
"parameter": {
"username": "root",
"password": "taosdata",
"connection": [
{
"table": [
"stb1"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1?timestampFormat=TIMESTAMP"
}
],
"column": [
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9"
],
"beginDateTime": "2022-02-15 00:00:00",
"endDateTime": "2022-02-16 00:00:00",
"splitInterval": "1d"
}
},
"writer": {
"name": "tdengine20writer",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9"
],
"connection": [
{
"table": [
"weather"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,195 @@
# DataX TDengineReader
## 1 Quick Introduction
The TDengineReader plugin implements reading data out of TDengine.
## 2 Implementation
TDengineReader fetches data by querying through TDengine's JDBC driver.
## 3 Features
### 3.1 Sample Configurations
* Configure a job that extracts data from TDengine:
```json
{
"job": {
"content": [
{
"reader": {
"name": "tdengine30reader",
"parameter": {
"username": "root",
"password": "taosdata",
"connection": [
{
"table": [
"meters"
],
"jdbcUrl": [
"jdbc:TAOS-RS://192.168.56.105:6041/test?timestampFormat=TIMESTAMP"
]
}
],
"column": [
"ts",
"current",
"voltage",
"phase"
],
"where": "ts>=0",
"beginDateTime": "2017-07-14 10:40:00",
"endDateTime": "2017-08-14 10:40:00"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
```
* Configure an extraction job with a custom SQL query:
```json
{
"job": {
"content": [
{
"reader": {
"name": "tdengine30reader",
"parameter": {
"user": "root",
"password": "taosdata",
"connection": [
{
"querySql": [
"select * from test.meters"
],
"jdbcUrl": [
"jdbc:TAOS-RS://192.168.56.105:6041/test?timestampFormat=TIMESTAMP"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
```
### 3.2 Parameters
* **username**
  * Description: username for the TDengine instance <br />
  * Required: yes <br />
  * Default: none <br />
* **password**
  * Description: password for the TDengine instance <br />
  * Required: yes <br />
  * Default: none <br />
* **jdbcUrl**
  * Description: JDBC connection information for the TDengine database. Note that jdbcUrl must be nested inside a connection block; see the official TDengine documentation for the exact jdbcUrl format. <br />
  * Required: yes <br />
  * Default: none <br />
* **querySql**
  * Description: in some business scenarios the where option is not expressive enough to describe the filter, and this option lets the user supply a custom filtering SQL statement. Once querySql is configured, TDengineReader ignores the table, column, where, beginDateTime, and endDateTime options and filters the data with this statement directly. For example, to synchronize data after a multi-table join, use: select a,b from table_a join table_b on table_a.id = table_b.id <br />
  * Required: no <br />
  * Default: none <br />
* **table**
  * Description: the tables to synchronize, given as a JSON array, so multiple tables can be extracted at once. When several tables are configured, the user must ensure they share the same schema; TDengineReader does not check whether they belong to the same logical table. Note that table must be nested inside a connection block. <br />
  * Required: yes <br />
  * Default: none <br />
* **where**
  * Description: the where clause of the filter condition. TDengineReader concatenates a SQL statement from the configured column, table, where, beginDateTime, and endDateTime options and extracts the data with it (see the sketch after this list). <br />
  * Required: no <br />
  * Default: none <br />
* **beginDateTime**
  * Description: start time of the data; the job migrates data from beginDateTime to endDateTime, formatted as yyyy-MM-dd HH:mm:ss <br />
  * Required: no <br />
  * Default: none <br />
* **endDateTime**
  * Description: end time of the data; the job migrates data from beginDateTime to endDateTime, formatted as yyyy-MM-dd HH:mm:ss <br />
  * Required: no <br />
  * Default: none <br />
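
Concretely, the reader stitches these options into one query per configured table. A minimal sketch of that assembly, mirroring the startRead logic in this commit's TDengineReader (the class and main method below are illustrative):

```java
import java.util.Arrays;
import java.util.List;

// Minimal sketch of how table/column/where/beginDateTime/endDateTime are
// combined into the query the reader executes (mirrors the plugin's startRead).
public class QuerySketch {
    static String buildQuery(List<String> columns, String table, String where,
                             String beginDateTime, String endDateTime) {
        StringBuilder sb = new StringBuilder();
        sb.append("select ").append(String.join(",", columns))
          .append(" from ").append(table)
          .append(" where ").append(where); // plugin default: "_c0 > " + Long.MIN_VALUE
        if (beginDateTime != null && !beginDateTime.isEmpty())
            sb.append(" and _c0 >= '").append(beginDateTime).append("'"); // _c0 is the first (timestamp) column
        if (endDateTime != null && !endDateTime.isEmpty())
            sb.append(" and _c0 < '").append(endDateTime).append("'");
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildQuery(Arrays.asList("ts", "current", "voltage", "phase"),
                "meters", "ts >= 0", "2017-07-14 10:40:00", "2017-08-14 10:40:00"));
        // select ts,current,voltage,phase from meters where ts >= 0
        //   and _c0 >= '2017-07-14 10:40:00' and _c0 < '2017-08-14 10:40:00'
    }
}
```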
### 3.3 Type Conversion
| TDengine data type | DataX internal type |
| ------------------ | ------------------- |
| TINYINT            | Long                |
| SMALLINT           | Long                |
| INTEGER            | Long                |
| BIGINT             | Long                |
| FLOAT              | Double              |
| DOUBLE             | Double              |
| BOOLEAN            | Bool                |
| TIMESTAMP          | Date                |
| BINARY             | Bytes               |
| NCHAR              | String              |
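
This table corresponds to the switch over java.sql.Types in the reader's buildRecord method, which appears in full later in this commit; a condensed sketch of the mapping (using the DataX column classes the plugin itself imports):

```java
import com.alibaba.datax.common.element.*;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;

// Condensed sketch of the type mapping above (mirrors buildRecord in TDengineReader).
class TypeMappingSketch {
    static Column toColumn(ResultSet rs, int i, int sqlType) throws SQLException {
        switch (sqlType) {
            case Types.TINYINT: case Types.SMALLINT:
            case Types.INTEGER: case Types.BIGINT:
                return new LongColumn(rs.getString(i));    // integer family -> Long
            case Types.FLOAT: case Types.DOUBLE:
                return new DoubleColumn(rs.getString(i));  // floating point -> Double
            case Types.BOOLEAN:
                return new BoolColumn(rs.getBoolean(i));   // BOOLEAN -> Bool
            case Types.TIMESTAMP:
                return new DateColumn(rs.getTimestamp(i)); // TIMESTAMP -> Date
            case Types.BINARY: case Types.VARCHAR:
                return new BytesColumn(rs.getBytes(i));    // BINARY -> Bytes
            default:
                return new StringColumn(rs.getString(i));  // NCHAR -> String
        }
    }
}
```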
## 4 Performance Report
### 4.1 Environment Setup
#### 4.1.1 Data characteristics
#### 4.1.2 Machine specs
#### 4.1.3 DataX JVM options
-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
### 4.2 Test Report
#### 4.2.1 Single-table test report
| Channels | DataX speed (Rec/s) | DataX throughput (MB/s) | DataX NIC outbound (MB/s) | DataX machine load | DB NIC inbound (MB/s) | DB load | DB TPS |
|--------|--------|--------|--------|--------|--------|--------|--------|
|1| | | | | | | |
|4| | | | | | | |
|8| | | | | | | |
|16| | | | | | | |
|32| | | | | | | |
Notes:
#### 4.2.4 Performance test summary
1.
2.
## 5 Constraints
## FAQ

View File

@ -9,7 +9,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>tdenginereader</artifactId>
<artifactId>tdengine30reader</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
@ -28,20 +28,17 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.datax.tdenginewriter</groupId>
<artifactId>tdenginewriter</artifactId>
<groupId>com.alibaba.datax.tdengine30writer</groupId>
<artifactId>tdengine30writer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.39</version>
<version>3.2.9</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -0,0 +1,34 @@
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/reader/tdengine30reader</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>tdengine30reader-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/reader/tdengine30reader</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/reader/tdengine30reader/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,266 @@
package com.alibaba.datax.plugin.tdengine30reader;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.tdengine30writer.Key;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.sql.*;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
public class TDengineReader extends Reader {
private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
public static class Job extends Reader.Job {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration originalConfig;
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
// check username
String username = this.originalConfig.getString(Key.USERNAME);
if (StringUtils.isBlank(username))
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.USERNAME + "] is not set.");
// check password
String password = this.originalConfig.getString(Key.PASSWORD);
if (StringUtils.isBlank(password))
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.PASSWORD + "] is not set.");
// check connection
List<Configuration> connectionList = this.originalConfig.getListConfiguration(Key.CONNECTION);
if (connectionList == null || connectionList.isEmpty())
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.CONNECTION + "] is not set.");
for (int i = 0; i < connectionList.size(); i++) {
Configuration conn = connectionList.get(i);
// check jdbcUrl
List<Object> jdbcUrlList = conn.getList(Key.JDBC_URL);
if (jdbcUrlList == null || jdbcUrlList.isEmpty()) {
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.JDBC_URL + "] of connection[" + (i + 1) + "] is not set.");
}
// check table/querySql
List<Object> querySqlList = conn.getList(Key.QUERY_SQL);
if (querySqlList == null || querySqlList.isEmpty()) {
String querySql = conn.getString(Key.QUERY_SQL);
if (StringUtils.isBlank(querySql)) {
List<Object> table = conn.getList(Key.TABLE);
if (table == null || table.isEmpty())
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.TABLE + "] of connection[" + (i + 1) + "] is not set.");
}
}
}
SimpleDateFormat format = new SimpleDateFormat(DATETIME_FORMAT);
// check beginDateTime
String beginDatetime = this.originalConfig.getString(Key.BEGIN_DATETIME);
long start = Long.MIN_VALUE;
if (!StringUtils.isBlank(beginDatetime)) {
try {
start = format.parse(beginDatetime).getTime();
} catch (ParseException e) {
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE,
"The parameter [" + Key.BEGIN_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format.");
}
}
// check endDateTime
String endDatetime = this.originalConfig.getString(Key.END_DATETIME);
long end = Long.MAX_VALUE;
if (!StringUtils.isBlank(endDatetime)) {
try {
end = format.parse(endDatetime).getTime();
} catch (ParseException e) {
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE,
"The parameter [" + Key.END_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format.");
}
}
if (start >= end)
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE,
"The parameter [" + Key.BEGIN_DATETIME + "] should be less than the parameter [" + Key.END_DATETIME + "].");
}
@Override
public void destroy() {
}
@Override
public List<Configuration> split(int adviceNumber) {
List<Configuration> configurations = new ArrayList<>();
List<Configuration> connectionList = this.originalConfig.getListConfiguration(Key.CONNECTION);
for (Configuration conn : connectionList) {
List<String> jdbcUrlList = conn.getList(Key.JDBC_URL, String.class);
for (String jdbcUrl : jdbcUrlList) {
Configuration clone = this.originalConfig.clone();
clone.set(Key.JDBC_URL, jdbcUrl);
clone.set(Key.TABLE, conn.getList(Key.TABLE));
clone.set(Key.QUERY_SQL, conn.getList(Key.QUERY_SQL));
clone.remove(Key.CONNECTION);
configurations.add(clone);
}
}
LOG.info("Configuration: {}", configurations);
return configurations;
}
}
public static class Task extends Reader.Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private Configuration readerSliceConfig;
private String mandatoryEncoding;
private Connection conn;
private List<String> tables;
private List<String> columns;
private String startTime;
private String endTime;
private String where;
private List<String> querySql;
static {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
} catch (ClassNotFoundException ignored) {
LOG.warn(ignored.getMessage(), ignored);
}
}
@Override
public void init() {
this.readerSliceConfig = super.getPluginJobConf();
String user = readerSliceConfig.getString(Key.USERNAME);
String password = readerSliceConfig.getString(Key.PASSWORD);
String url = readerSliceConfig.getString(Key.JDBC_URL);
try {
this.conn = DriverManager.getConnection(url, user, password);
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineReaderErrorCode.CONNECTION_FAILED,
"The parameter [" + Key.JDBC_URL + "] : " + url + " failed to connect since: " + e.getMessage(), e);
}
this.tables = readerSliceConfig.getList(Key.TABLE, String.class);
this.columns = readerSliceConfig.getList(Key.COLUMN, String.class);
this.startTime = readerSliceConfig.getString(Key.BEGIN_DATETIME);
this.endTime = readerSliceConfig.getString(Key.END_DATETIME);
this.where = readerSliceConfig.getString(Key.WHERE, "_c0 > " + Long.MIN_VALUE);
this.querySql = readerSliceConfig.getList(Key.QUERY_SQL, String.class);
this.mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, "UTF-8");
}
@Override
public void destroy() {
try {
if (conn != null)
conn.close();
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
}
@Override
public void startRead(RecordSender recordSender) {
List<String> sqlList = new ArrayList<>();
if (querySql == null || querySql.isEmpty()) {
for (String table : tables) {
StringBuilder sb = new StringBuilder();
sb.append("select ").append(StringUtils.join(columns, ",")).append(" from ").append(table).append(" ");
sb.append("where ").append(where);
if (!StringUtils.isBlank(startTime)) {
sb.append(" and _c0 >= '").append(startTime).append("'");
}
if (!StringUtils.isBlank(endTime)) {
sb.append(" and _c0 < '").append(endTime).append("'");
}
String sql = sb.toString().trim();
sqlList.add(sql);
}
} else {
sqlList.addAll(querySql);
}
for (String sql : sqlList) {
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
Record record = buildRecord(recordSender, rs, mandatoryEncoding);
recordSender.sendToWriter(record);
}
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
}
}
private Record buildRecord(RecordSender recordSender, ResultSet rs, String mandatoryEncoding) {
Record record = recordSender.createRecord();
try {
ResultSetMetaData metaData = rs.getMetaData();
for (int i = 1; i <= metaData.getColumnCount(); i++) {
int columnType = metaData.getColumnType(i);
switch (columnType) {
case Types.SMALLINT:
case Types.TINYINT:
case Types.INTEGER:
case Types.BIGINT:
record.addColumn(new LongColumn(rs.getString(i)));
break;
case Types.FLOAT:
case Types.DOUBLE:
record.addColumn(new DoubleColumn(rs.getString(i)));
break;
case Types.BOOLEAN:
record.addColumn(new BoolColumn(rs.getBoolean(i)));
break;
case Types.TIMESTAMP:
record.addColumn(new DateColumn(rs.getTimestamp(i)));
break;
case Types.BINARY:
case Types.VARCHAR:
record.addColumn(new BytesColumn(rs.getBytes(i)));
break;
case Types.NCHAR:
String rawData;
if (StringUtils.isBlank(mandatoryEncoding)) {
rawData = rs.getString(i);
} else {
rawData = new String((rs.getBytes(i) == null ? new byte[0] : rs.getBytes(i)), mandatoryEncoding);
}
record.addColumn(new StringColumn(rawData));
break;
}
}
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "database query error", e);
} catch (UnsupportedEncodingException e) {
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "illegal mandatoryEncoding", e);
}
return record;
}
}
}
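
The test classes that follow drive this plugin by launching the DataX engine in standalone mode; a minimal sketch of that invocation (the datax.home value and job-file path follow the tests' repository layout):

```java
import com.alibaba.datax.core.Engine;

// Minimal sketch of launching a DataX job in standalone mode,
// exactly as the tests in this commit do (the job path is illustrative).
public class RunJobSketch {
    public static void main(String[] args) throws Throwable {
        System.setProperty("datax.home", "../target/datax/datax");
        String[] params = {"-mode", "standalone", "-jobid", "-1",
                "-job", "src/test/resources/t2stream-1.json"};
        Engine.entry(params);
    }
}
```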

View File

@ -0,0 +1,34 @@
package com.alibaba.datax.plugin.tdengine30reader;
import com.alibaba.datax.common.spi.ErrorCode;
public enum TDengineReaderErrorCode implements ErrorCode {
REQUIRED_VALUE("TDengineReader-00", "parameter value is missing"),
ILLEGAL_VALUE("TDengineReader-01", "invalid parameter value"),
CONNECTION_FAILED("TDengineReader-02", "connection error"),
RUNTIME_EXCEPTION("TDengineReader-03", "runtime exception");
private final String code;
private final String description;
TDengineReaderErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
}
}

View File

@ -0,0 +1,9 @@
{
"name": "tdengine30reader",
"class": "com.alibaba.datax.plugin.tdengine30reader.TDengineReader",
"description": {
"useScene": "data migration from tdengine",
"mechanism": "use JDBC to read data from tdengine."
},
"developer": "zyyang-taosdata"
}

View File

@ -0,0 +1,23 @@
{
"name": "tdengine30reader",
"parameter": {
"user": "",
"password": "",
"connection": [
{
"table": [
""
],
"jdbcUrl": [
""
]
}
],
"column": [
""
],
"beginDateTime": "",
"endDateTime": "",
"where": ""
}
}

View File

@ -0,0 +1,86 @@
package com.alibaba.datax.plugin.tdengine30reader;
import com.alibaba.datax.core.Engine;
import org.junit.Ignore;
import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Random;
@Ignore
public class TDengine2DMTest {
private static final String host1 = "192.168.56.105";
private static final String host2 = "192.168.0.72";
private final Random random = new Random(System.currentTimeMillis());
@Test
public void t2dm_case01() throws Throwable {
// given
createSupTable("ms");
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2dm.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Test
public void t2dm_case02() throws Throwable {
// given
createSupTable("us");
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2dm.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Test
public void t2dm_case03() throws Throwable {
// given
createSupTable("ns");
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2dm.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
private void createSupTable(String precision) throws SQLException {
final String url = "jdbc:TAOS-RS://" + host1 + ":6041/";
try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db1");
stmt.execute("create database if not exists db1 precision '" + precision + "'");
stmt.execute("create table db1.stb1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, " +
"f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, t3 smallint, " +
"t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(100))");
for (int i = 1; i <= 10; i++) {
stmt.execute("insert into db1.tb" + i + " using db1.stb1 tags(now, " + random.nextInt(10) + "," +
random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextInt(10) + "," +
random.nextFloat() + "," + random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123'," +
"'北京朝阳望京') values(now+" + i + "s, " + random.nextInt(10) + "," + random.nextInt(10) + "," +
+random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextFloat() + "," +
random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123','北京朝阳望京')");
}
stmt.close();
}
final String url2 = "jdbc:dm://" + host2 + ":5236";
try (Connection conn = DriverManager.getConnection(url2, "TESTUSER", "test123456")) {
conn.setAutoCommit(true);
Statement stmt = conn.createStatement();
stmt.execute("drop table if exists stb2");
stmt.execute("create table stb2(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, " +
"f6 double, f7 BIT, f8 VARCHAR(100), f9 VARCHAR2(200), t1 timestamp, t2 tinyint, t3 smallint, " +
"t4 int, t5 bigint, t6 float, t7 double, t8 BIT, t9 VARCHAR(100), t10 VARCHAR2(200))");
}
}
}

View File

@ -0,0 +1,66 @@
package com.alibaba.datax.plugin.tdengine30reader;
import com.alibaba.datax.core.Engine;
import org.junit.Ignore;
import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Random;
@Ignore
public class TDengine2StreamTest {
private static final String host = "192.168.56.105";
private static final Random random = new Random(System.currentTimeMillis());
@Test
public void case01() throws Throwable {
// given
prepare("ms");
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2stream-1.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Test
public void case02() throws Throwable {
// given
prepare("ms");
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2stream-2.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
private void prepare(String precision) throws SQLException {
final String url = "jdbc:TAOS-RS://" + host + ":6041/";
try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db1");
stmt.execute("create database if not exists db1 precision '" + precision + "'");
stmt.execute("create table db1.stb1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, " +
"f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, t3 smallint, " +
"t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(100))");
for (int i = 1; i <= 10; i++) {
stmt.execute("insert into db1.tb" + i + " using db1.stb1 tags(now, " + random.nextInt(10) + "," +
random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextInt(10) + "," +
random.nextFloat() + "," + random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123'," +
"'北京朝阳望京') values(now+" + i + "s, " + random.nextInt(10) + "," + random.nextInt(10) + "," +
+random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextFloat() + "," +
random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123','北京朝阳望京')");
}
stmt.close();
}
}
}

View File

@ -0,0 +1,154 @@
package com.alibaba.datax.plugin.tdengine30reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.tdengine30reader.TDengineReader;
import com.alibaba.datax.plugin.writer.tdengine30writer.Key;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
public class TDengineReaderTest {
@Test
public void jobInit_case01() {
// given
TDengineReader.Job job = new TDengineReader.Job();
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," +
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," +
"\"where\":\"_c0 > 0\"," +
"\"beginDateTime\": \"2021-01-01 00:00:00\"," +
"\"endDateTime\": \"2021-01-01 12:00:00\"" +
"}");
job.setPluginJobConf(configuration);
// when
job.init();
// assert
Configuration conf = job.getPluginJobConf();
Assert.assertEquals("root", conf.getString(Key.USERNAME));
Assert.assertEquals("taosdata", conf.getString("password"));
Assert.assertEquals("weather", conf.getString("connection[0].table[0]"));
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl[0]"));
Assert.assertEquals("2021-01-01 00:00:00", conf.getString("beginDateTime"));
Assert.assertEquals("2021-01-01 12:00:00", conf.getString("endDateTime"));
Assert.assertEquals("_c0 > 0", conf.getString("where"));
}
@Test
public void jobInit_case02() {
// given
TDengineReader.Job job = new TDengineReader.Job();
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"connection\": [{\"querySql\":[\"select * from weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," +
"}");
job.setPluginJobConf(configuration);
// when
job.init();
// assert
Configuration conf = job.getPluginJobConf();
Assert.assertEquals("root", conf.getString(Key.USERNAME));
Assert.assertEquals("taosdata", conf.getString("password"));
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl[0]"));
Assert.assertEquals("select * from weather", conf.getString("connection[0].querySql[0]"));
}
@Test
public void jobSplit_case01() {
// given
TDengineReader.Job job = new TDengineReader.Job();
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," +
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," +
"\"where\":\"_c0 > 0\"," +
"\"beginDateTime\": \"2021-01-01 00:00:00\"," +
"\"endDateTime\": \"2021-01-01 12:00:00\"" +
"}");
job.setPluginJobConf(configuration);
// when
job.init();
List<Configuration> configurationList = job.split(1);
// assert
Assert.assertEquals(1, configurationList.size());
Configuration conf = configurationList.get(0);
Assert.assertEquals("root", conf.getString("username"));
Assert.assertEquals("taosdata", conf.getString("password"));
Assert.assertEquals("_c0 > 0", conf.getString("where"));
Assert.assertEquals("weather", conf.getString("table[0]"));
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl"));
}
@Test
public void jobSplit_case02() {
// given
TDengineReader.Job job = new TDengineReader.Job();
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"connection\": [{\"querySql\":[\"select * from weather\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\"]}]," +
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," +
"}");
job.setPluginJobConf(configuration);
// when
job.init();
List<Configuration> configurationList = job.split(1);
// assert
Assert.assertEquals(1, configurationList.size());
Configuration conf = configurationList.get(0);
Assert.assertEquals("root", conf.getString("username"));
Assert.assertEquals("taosdata", conf.getString("password"));
Assert.assertEquals("select * from weather", conf.getString("querySql[0]"));
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl"));
}
@Test
public void jobSplit_case03() {
// given
TDengineReader.Job job = new TDengineReader.Job();
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"connection\": [{\"querySql\":[\"select * from weather\",\"select * from test.meters\"],\"jdbcUrl\":[\"jdbc:TAOS-RS://master:6041/test\", \"jdbc:TAOS://master:6030/test\"]}]," +
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," +
"}");
job.setPluginJobConf(configuration);
// when
job.init();
List<Configuration> configurationList = job.split(1);
// assert
Assert.assertEquals(2, configurationList.size());
Configuration conf = configurationList.get(0);
Assert.assertEquals("root", conf.getString("username"));
Assert.assertEquals("taosdata", conf.getString("password"));
Assert.assertEquals("select * from weather", conf.getString("querySql[0]"));
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl"));
Configuration conf1 = configurationList.get(1);
Assert.assertEquals("root", conf1.getString("username"));
Assert.assertEquals("taosdata", conf1.getString("password"));
Assert.assertEquals("select * from weather", conf1.getString("querySql[0]"));
Assert.assertEquals("select * from test.meters", conf1.getString("querySql[1]"));
Assert.assertEquals("jdbc:TAOS://master:6030/test", conf1.getString("jdbcUrl"));
}
}

View File

@ -0,0 +1,52 @@
{
"job": {
"content": [
{
"reader": {
"name": "tdengine30reader",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"*"
],
"connection": [
{
"table": [
"stb1"
],
"jdbcUrl": [
"jdbc:TAOS-RS://192.168.56.105:6041/db1"
]
}
]
}
},
"writer": {
"name": "rdbmswriter",
"parameter": {
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:dm://192.168.0.72:5236"
}
],
"username": "TESTUSER",
"password": "test123456",
"table": "stb2",
"column": [
"*"
]
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,47 @@
{
"job": {
"content": [
{
"reader": {
"name": "tdengine30reader",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"ts",
"f1",
"f2",
"t1",
"t2"
],
"connection": [
{
"table": [
"stb1"
],
"jdbcUrl": [
"jdbc:TAOS-RS://192.168.56.105:6041/db1"
]
}
],
"where": "t10 = '北京朝阳望京'",
"beginDateTime": "2022-03-07 12:00:00",
"endDateTime": "2022-03-07 19:00:00"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,37 @@
{
"job": {
"content": [
{
"reader": {
"name": "tdengine30reader",
"parameter": {
"username": "root",
"password": "taosdata",
"connection": [
{
"querySql": [
"select * from stb1 where t10 = '北京朝阳望京' and _c0 >= '2022-03-07 12:00:00' and _c0 < '2022-03-07 19:00:00'"
],
"jdbcUrl": [
"jdbc:TAOS-RS://192.168.56.105:6041/db1"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,245 @@
# DataX TDengineWriter
简体中文 | [English](./tdengine30writer.md)
## 1 快速介绍
TDengineWriter插件实现了写入数据到TDengine数据库目标表的功能。底层实现上,TDengineWriter通过JDBC连接TDengine,按照TDengine的SQL语法,执行insert语句/schemaless语句,将数据写入TDengine。
TDengineWriter可以作为数据迁移工具,供DBA将其它数据库的数据导入到TDengine。
## 2 实现原理
TDengineWriter 通过 DataX 框架获取 Reader 生成的协议数据,通过 JDBC Driver 连接 TDengine,执行 insert 语句/schemaless 语句,将数据写入 TDengine。
在TDengine中,table可以分成超级表、子表、普通表三种类型:超级表和子表包括column和tag,子表的tag列的值为固定值;普通表与关系型数据库中表的概念一致。详细请参考:[数据模型](https://www.taosdata.com/docs/cn/v2.0/architecture#model)
TDengineWriter支持向超级表、子表、普通表中写入数据,按照table的类型和column参数中是否包含tbname,使用以下方法进行写入(语句形式可参考列表后的示意):
1. table为超级表,column中指定tbname:使用自动建表的insert语句,以tbname作为子表的名称。
2. table为超级表,column中未指定tbname:使用schemaless写入,TDengine会根据超级表名、tag值计算一个子表名称。
3. table为子表:使用insert语句写入,ignoreTagsUnmatched参数为true时,忽略record中tag值与table的tag值不一致的数据。
4. table为普通表:使用insert语句写入。
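下面是一个简要示意(表结构为假设:超级表stb1含字段ts、f1、f2和标签t1,表名与数值仅为示例,并非插件的真实执行日志):
```sql
-- 方式1:超级表且column中含tbname:自动建表insert
insert into `tb001` using stb1 tags('beijing') (ts, f1, f2) values(1645329601000, 1, 2.0);

-- 方式2:超级表且column中不含tbname:行协议schemaless写入(非SQL,此处仅示意格式)
-- stb1,t1=beijing f1=1i32,f2=2.0f64 1645329601000

-- 方式3/方式4:子表或普通表:普通insert
insert into `tb001` (ts, f1, f2) values(1645329601000, 1, 2.0);
```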
## 3 功能说明
### 3.1 配置样例
配置一个写入TDengine的作业。
先在TDengine上创建超级表:
```sql
create database if not exists test;
create table test.weather (ts timestamp, temperature int, humidity double) tags(is_normal bool, device_id binary(100), address nchar(100));
```
使用下面的Job配置,将数据写入TDengine:
```json
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"type": "string",
"value": "tb1"
},
{
"type": "date",
"value": "2022-02-20 12:00:01"
},
{
"type": "long",
"random": "0, 10"
},
{
"type": "double",
"random": "0, 10"
},
{
"type": "bool",
"random": "0, 50"
},
{
"type": "bytes",
"value": "abcABC123"
},
{
"type": "string",
"value": "北京朝阳望京"
}
],
"sliceRecordCount": 1
}
},
"writer": {
"name": "tdengine30writer",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"tbname",
"ts",
"temperature",
"humidity",
"is_normal",
"device_id",
"address"
],
"connection": [
{
"table": [
"weather"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/test"
}
],
"batchSize": 100,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
```
### 3.2 参数说明
* jdbcUrl
* 描述:数据源的JDBC连接信息,TDengine的JDBC信息请参考:[Java连接器的使用](https://www.taosdata.com/docs/cn/v2.0/connector/java#url)
* 必选:是
* 默认值:无
* username
* 描述:用户名
* 必选:是
* 默认值:无
* password
* 描述:用户名的密码
* 必选:是
* 默认值:无
* table
* 描述:表名的集合,table应该包含column参数中的所有列(tbname除外)。注意:column中的tbname会被当作TDengine中子表名使用。
* 必选:是
* 默认值:无
* column
* 描述:字段名的集合,字段的顺序应该与record中column的顺序保持一致
* 必选:是
* 默认值:无
* batchSize
* 描述:每batchSize条record为一个batch进行写入(多值insert的形式可参考本节末尾的示意)
* 必选:否
* 默认值1
* ignoreTagsUnmatched
* 描述:当table为TDengine中的一张子表时,table具有固定的tag值。如果数据的tag值与table的tag值不相等,数据不会写入到table中。
* 必选:否
* 默认值false
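作为示意(假设demo为一张普通表,列为ts、f1,时间戳与数值均为虚构),当batchSize=3时,插件会把3条record拼成一条多值insert一次提交:
```sql
insert into `demo` (ts, f1) values
(1645329601000, 1)
(1645329602000, 2)
(1645329603000, 3);
```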
### 3.3 类型转换
DataX中的数据类型可以映射到TDengine的数据类型:
| DataX 内部类型 | TDengine 数据类型 |
| -------------- | ----------------------------------------- |
| INT | TINYINT, SMALLINT, INT |
| LONG | TIMESTAMP, TINYINT, SMALLINT, INT, BIGINT |
| DOUBLE | FLOAT, DOUBLE |
| STRING | TIMESTAMP, BINARY, NCHAR |
| BOOL | BOOL |
| DATE | TIMESTAMP |
| BYTES | BINARY |
### 3.4 各数据源到TDengine的参考示例
下面是一些数据源到TDengine进行数据迁移的示例:
| 数据迁移示例 | 配置的示例 |
| ------------------ | ------------------------------------------------------------ |
| TDengine到TDengine | [超级表到超级表指定tbname](../src/test/resources/t2t-1.json) |
| TDengine到TDengine | [超级表到超级表不指定tbname](../src/test/resources/t2t-2.json) |
| TDengine到TDengine | [超级表到子表](../src/test/resources/t2t-3.json) |
| TDengine到TDengine | [普通表到普通表](../src/test/resources/t2t-4.json) |
| RDBMS到TDengine | [普通表到超级表指定tbname](../src/test/resources/dm2t-1.json) |
| RDBMS到TDengine | [普通表到超级表不指定tbname](../src/test/resources/dm2t-3.json) |
| RDBMS到TDengine | [普通表到子表](../src/test/resources/dm2t-2.json) |
| RDBMS到TDengine | [普通表到普通表](../src/test/resources/dm2t-4.json) |
| OpenTSDB到TDengine | [metric到普通表](../src/test/resources/o2t-1.json) |
## 4 性能报告
### 4.1 环境准备
#### 4.1.1 数据特征
建表语句:
单行记录类似于:
#### 4.1.2 机器参数
* 执行DataX的机器参数为:
1. cpu:
2. mem:
3. net: 千兆双网卡
4. disc: DataX 数据不落磁盘,不统计此项
* TDengine数据库机器参数为:
1. cpu:
2. mem:
3. net: 千兆双网卡
4. disc:
#### 4.1.3 DataX jvm 参数
-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
### 4.2 测试报告
#### 4.2.1 单表测试报告
| 通道数 | DataX速度(Rec/s) | DataX流量(MB/s) | DataX机器网卡流出流量(MB/s) | DataX机器运行负载 | DB网卡进入流量(MB/s) | DB运行负载 | DB TPS |
| ------ | ---------------- | --------------- | --------------------------- | ----------------- | -------------------- | ---------- | ------ |
| 1 | | | | | | | |
| 4 | | | | | | | |
| 8 | | | | | | | |
| 16 | | | | | | | |
| 32 | | | | | | | |
说明:
1.
#### 4.2.4 性能测试小结
## 5 约束限制
1.
## FAQ
### 源表和目标表的字段顺序一致吗?
是的,TDengineWriter按照column中字段的顺序解析来自DataX的数据。

View File

@ -0,0 +1,196 @@
# DataX TDengineWriter
[简体中文](./tdengine30writer-CN.md) | English
## 1 Quick Introduction
The TDengineWriter plugin enables writing data to the target table of a TDengine database. Under the hood, TDengineWriter connects to TDengine through JDBC and, following TDengine's SQL syntax, executes insert or schemaless statements to write the data into TDengine.
TDengineWriter can be used as a data migration tool for DBAs to import data from other databases into TDengine.
## 2 Implementation
TDengineWriter obtains the protocol data generated by a Reader through the DataX framework, connects to TDengine through the JDBC driver, executes insert or schemaless statements, and writes the data to TDengine.
In TDengine, a table can be a super table, a sub-table, or an ordinary table. Super tables and sub-tables contain columns and tags, and the tag columns of a sub-table hold fixed values. (For details, please refer to: [data model](https://www.taosdata.com/docs/cn/v2.0/architecture#model))
TDengineWriter can write data to super tables, sub-tables, and ordinary tables using the following methods, based on the type of the table and whether the column parameter contains tbname (see the SQL sketch after this list):
1. Table is a super table and column contains tbname: use an insert statement with automatic table creation, taking tbname as the name of the sub-table.
2. Table is a super table and column does not contain tbname: use schemaless writing; TDengine computes a sub-table name from the super table name and the tag values.
3. Table is a sub-table: use an insert statement to write; when the ignoreTagsUnmatched parameter is true, records whose tag values do not match the table's tag values are ignored.
4. Table is an ordinary table: use an insert statement to write.
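A rough sketch of the statements each method produces, assuming a hypothetical super table stb1 with fields ts, f1, f2 and tag t1 (table names and values are made up; this is not the plugin's actual log output):
```sql
-- Method 1: super table, tbname in column: insert with automatic table creation
insert into `tb001` using stb1 tags('beijing') (ts, f1, f2) values(1645329601000, 1, 2.0);

-- Method 2: super table, no tbname in column: schemaless line protocol (not SQL; format only)
-- stb1,t1=beijing f1=1i32,f2=2.0f64 1645329601000

-- Methods 3 and 4: sub-table or ordinary table: plain insert
insert into `tb001` (ts, f1, f2) values(1645329601000, 1, 2.0);
```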
## 3 Features Introduction
### 3.1 Sample
Configure a job to write to TDengine
Create a supertable on TDengine:
```sql
create database if not exists test;
create table test.weather (ts timestamp, temperature int, humidity double) tags(is_normal bool, device_id binary(100), address nchar(100));
```
Write data to TDengine using the following Job configuration:
```json
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"type": "string",
"value": "tb1"
},
{
"type": "date",
"value": "2022-02-20 12:00:01"
},
{
"type": "long",
"random": "0, 10"
},
{
"type": "double",
"random": "0, 10"
},
{
"type": "bool",
"random": "0, 50"
},
{
"type": "bytes",
"value": "abcABC123"
},
{
"type": "string",
"value": "北京朝阳望京"
}
],
"sliceRecordCount": 1
}
},
"writer": {
"name": "tdengine30writer",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"tbname",
"ts",
"temperature",
"humidity",
"is_normal",
"device_id",
"address"
],
"connection": [
{
"table": [
"weather"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/test"
}
],
"batchSize": 100,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
```
### 3.2 Configuration
* jdbcUrl
* Description: Data source JDBC connection information. For TDengine JDBC information, please refer to: [Java connector](https://www.taosdata.com/docs/cn/v2.0/connector/java#url)
* Required: yes
* Default: none
* username
* Description: username
* Required: yes
* Default: none
* password
* Description: password of the username
* Required: yes
* Default: none
* table
* Description: A list of table names; each table should contain all of the columns in the column parameter (except tbname). Note that tbname in column is used as the TDengine sub-table name.
* Required: yes
* Default: none
* column
* Description: A list of field names; the order of the fields should match the order of the columns in the record
* Required: yes
* Default: none
* batchSize
* Description: Records are written in batches of batchSize
* Required: no
* Default: 1
* ignoreTagsUnmatched
* Description: Used when table is a sub-table in TDengine and therefore has fixed tag values. If the tag values of a record are not equal to the tag values of the table, the record is not written to the table.
* Required: no
* Default: false
### 3.3 Type Conversion
Data types in DataX can be mapped to data types in TDengine as follows:
| DataX Type | TDengine Type |
| ---------- | ----------------------------------------- |
| INT | TINYINT, SMALLINT, INT |
| LONG | TIMESTAMP, TINYINT, SMALLINT, INT, BIGINT |
| DOUBLE | FLOAT, DOUBLE |
| STRING | TIMESTAMP, BINARY, NCHAR |
| BOOL | BOOL |
| DATE | TIMESTAMP |
| BYTES | BINARY |
### 3.4 Migration Examples to TDengine
Here are some examples of migrating data from other sources to TDengine:
| Sample | Configuration |
| -------------------- | ------------------------------------------------------------ |
| TDengine to TDengine | [super table to super table with tbname](../src/test/resources/t2t-1.json) |
| TDengine to TDengine | [super table to super table without tbname](../src/test/resources/t2t-2.json) |
| TDengine to TDengine | [super table to sub-table](../src/test/resources/t2t-3.json) |
| TDengine to TDengine | [table to table](../src/test/resources/t2t-4.json) |
| RDBMS to TDengine | [table to super table with tbname](../src/test/resources/dm2t-1.json) |
| RDBMS to TDengine | [table to super table without tbname](../src/test/resources/dm2t-2.json) |
| RDBMS to TDengine | [table to sub-table](../src/test/resources/dm2t-3.json) |
| RDBMS to TDengine | [table to table](../src/test/resources/dm2t-4.json) |
| OpenTSDB to TDengine | [metric to table](../src/test/resources/o2t-1.json) |
## 4 Restrictions
## FAQ
### Must the columns in the source table and the target table be in the same order?
Yes. TDengineWriter parses the data from DataX in the order of the fields in the column parameter.

tdengine30writer/pom.xml Normal file
View File

@ -0,0 +1,112 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.alibaba.datax.tdengine30writer</groupId>
<artifactId>tdengine30writer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.2.9</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3-version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12.4</version>
<configuration>
<!-- 包含哪些测试用例 -->
<includes>
<include>**/*Test.java</include>
</includes>
<!-- 不包含哪些测试用例 -->
<excludes>
</excludes>
<testFailureIgnore>true</testFailureIgnore>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,34 @@
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/tdengine30writer</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>tdengine30writer-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/tdengine30writer</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/tdengine30writer/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,22 @@
package com.alibaba.datax.plugin.writer.tdengine30writer;
public class ColumnMeta {
String field;
String type;
int length;
String note;
boolean isTag;
boolean isPrimaryKey;
Object value;
@Override
public String toString() {
return "ColumnMeta{" + "field='" + field + '\'' + ", type='" + type + '\'' + ", length=" + length + ", note='" +
note + '\'' + ", isTag=" + isTag + ", isPrimaryKey=" + isPrimaryKey + ", value=" + value + '}';
}
@Override
public boolean equals(Object obj) {
return obj instanceof ColumnMeta && this.field.equals(((ColumnMeta) obj).field);
}
}

View File

@ -0,0 +1,38 @@
package com.alibaba.datax.plugin.writer.tdengine30writer;
public class Constants {
public static final String DEFAULT_USERNAME = "root";
public static final String DEFAULT_PASSWORD = "taosdata";
public static final int DEFAULT_BATCH_SIZE = 1;
public static final boolean DEFAULT_IGNORE_TAGS_UNMATCHED = false;
// ----------------- tdengine version -------------------
public static final String SERVER_VERSION_2 = "2";
public static final String SERVER_VERSION_3 = "3";
public static final String SERVER_VERSION = "server_version()";
// ----------------- schema -------------------
public static final String INFORMATION_SCHEMA = "information_schema";
public static final String INFORMATION_SCHEMA_TABLE_INS_DATABASES = "ins_databases";
public static final String INFORMATION_SCHEMA_TABLE_INS_STABLES = "ins_stables";
public static final String INFORMATION_SCHEMA_TABLE_INS_TABLES = "ins_tables";
public static final String INFORMATION_SCHEMA_COMMA = ".";
// ----------------- table meta -------------------
public static final String TABLE_META_DB_NAME = "db_name";
public static final String TABLE_META_SUP_TABLE_NAME = "stable_name";
public static final String TABLE_META_TABLE_NAME = "table_name";
public static final String TABLE_META_COLUMNS = "columns";
public static final String TABLE_META_TAGS = "tags";
public static final String COLUMN_META_FIELD = "field";
public static final String COLUMN_META_TYPE = "type";
public static final String COLUMN_META_LENGTH = "length";
public static final String COLUMN_META_NOTE = "note";
public static final String COLUMN_META_NOTE_TAG = "TAG";
// ----------------- database meta -------------------
public static final String DATABASE_META_PRECISION = "precision";
}

View File

@ -0,0 +1,8 @@
package com.alibaba.datax.plugin.writer.tdengine30writer;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
public interface DataHandler {
int handle(RecordReceiver lineReceiver, TaskPluginCollector collector);
}

View File

@ -0,0 +1,610 @@
package com.alibaba.datax.plugin.writer.tdengine30writer;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.taosdata.jdbc.SchemalessWriter;
import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;
import com.taosdata.jdbc.utils.StringUtils;
import com.taosdata.jdbc.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.*;
import java.util.Date;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class DefaultDataHandler implements DataHandler {
private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
static {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
} catch (ClassNotFoundException e) {
LOG.error(e.getMessage(), e);
}
}
private final TaskPluginCollector taskPluginCollector;
private final String username;
private final String password;
private final String jdbcUrl;
private final int batchSize;
private final boolean ignoreTagsUnmatched;
private final List<String> tables;
private final List<String> columns;
private SchemaManager schemaManager;
private final SchemaCache schemaCache;
//private Map<String, TableMeta> tableMetas;
private Map<String, List<ColumnMeta>> tbnameColumnMetasMap;
public void setSchemaManager(SchemaManager schemaManager) {
this.schemaManager = schemaManager;
}
public DefaultDataHandler(Configuration configuration, TaskPluginCollector taskPluginCollector) {
this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME);
this.password = configuration.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD);
this.jdbcUrl = configuration.getString(Key.JDBC_URL);
this.batchSize = configuration.getInt(Key.BATCH_SIZE, Constants.DEFAULT_BATCH_SIZE);
this.tables = configuration.getList(Key.TABLE, String.class);
this.columns = configuration.getList(Key.COLUMN, String.class);
this.ignoreTagsUnmatched = configuration.getBool(Key.IGNORE_TAGS_UNMATCHED,
Constants.DEFAULT_IGNORE_TAGS_UNMATCHED);
this.taskPluginCollector = taskPluginCollector;
this.schemaCache = SchemaCache.getInstance(configuration);
}
@Override
public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) {
int count = 0;
int affectedRows = 0;
try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established.");
String dbname = TDengineWriter.parseDatabaseFromJdbcUrl(jdbcUrl);
Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery("select " + Constants.SERVER_VERSION);
// 光标移动一行
resultSet.next();
String serverVersion = resultSet.getString(Constants.SERVER_VERSION);
if (StringUtils.isEmpty(dbname)) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION,
"error dbname parsed from jdbcUrl");
}
LOG.info("tdengine server version[{}] dbname[{}]", serverVersion, dbname);
// 版本判断,确定使用的 SchemaManager 实例
if (serverVersion.startsWith(Constants.SERVER_VERSION_2)) {
this.schemaManager = new SchemaManager(conn);
} else {
this.schemaManager = new Schema3_0Manager(conn, dbname);
}
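// 攒批写入:每凑满batchSize条record提交一次批量写入,批量失败时退化为逐行写入并收集脏数据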
List<Record> recordBatch = new ArrayList<>();
Record record;
for (int i = 1; (record = lineReceiver.getFromReader()) != null; i++) {
if (i % batchSize != 0) {
recordBatch.add(record);
} else {
try {
recordBatch.add(record);
affectedRows += writeBatch(conn, recordBatch);
} catch (SQLException e) {
LOG.warn("use one row insert. because:" + e.getMessage());
affectedRows += writeEachRow(conn, recordBatch);
}
recordBatch.clear();
}
count++;
}
if (!recordBatch.isEmpty()) {
try {
affectedRows += writeBatch(conn, recordBatch);
} catch (SQLException e) {
LOG.warn("use one row insert. because:" + e.getMessage());
affectedRows += writeEachRow(conn, recordBatch);
}
recordBatch.clear();
}
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
if (affectedRows != count) {
LOG.error(
"write record missing or incorrect happened, affectedRows: " + affectedRows + ", total: " + count);
}
return affectedRows;
}
private int writeEachRow(Connection conn, List<Record> recordBatch) {
int affectedRows = 0;
for (Record record : recordBatch) {
List<Record> recordList = new ArrayList<>();
recordList.add(record);
try {
affectedRows += writeBatch(conn, recordList);
} catch (SQLException e) {
LOG.error(e.getMessage());
this.taskPluginCollector.collectDirtyRecord(record, e);
}
}
return affectedRows;
}
/**
* table: [ "stb1", "stb2", "tb1", "tb2", "t1" ]
* stb1[ts,f1,f2] tags:[t1]
* stb2[ts,f1,f2,f3] tags:[t1,t2]
* 1. tables 表的类型分成stb(super table)/tb(sub table)/t(original table)
* 2. 对于stb自动建表/schemaless
* 2.1: data中有tbname字段, 例如data: [ts, f1, f2, f3, t1, t2, tbname] tbColumn: [ts, f1, f2, t1] => insert into tbname using stb1 tags(t1) values(ts, f1, f2)
* 2.2: data中没有tbname字段例如data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => schemaless: stb1,t1=t1 f1=f1,f2=f2 ts, 没有批量写
* 3. 对于tb拼sql例如data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => insert into tb(ts, f1, f2) values(ts, f1, f2)
* 4. 对于t拼sql例如data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, f3, t1, t2] insert into t(ts, f1, f2, f3, t1, t2) values(ts, f1, f2, f3, t1, t2)
*/
public int writeBatch(Connection conn, List<Record> recordBatch) throws SQLException {
int affectedRows = 0;
for (String table : tables) {
TableMeta tableMeta = this.schemaCache.getTableMeta(table);
switch (tableMeta.tableType) {
case SUP_TABLE: {
if (columns.contains("tbname")) {
affectedRows += writeBatchToSupTableBySQL(conn, table, recordBatch);
} else {
Map<String, String> tag2Tbname = schemaManager.loadTagTableNameMap(table);
affectedRows += writeBatchToSupTableWithoutTbname(conn, table, recordBatch, tag2Tbname);
}
}
break;
case SUB_TABLE:
affectedRows += writeBatchToSubTable(conn, table, recordBatch);
break;
case NML_TABLE:
default:
affectedRows += writeBatchToNormalTable(conn, table, recordBatch);
}
}
return affectedRows;
}
private int writeBatchToSupTableWithoutTbname(Connection conn, String table, List<Record> recordBatch,
Map<String, String> tag2Tbname) throws SQLException {
List<ColumnMeta> columnMetas = schemaCache.getColumnMetaList(table, TableType.SUP_TABLE);
List<Record> subTableExist = filterSubTableExistRecords(recordBatch, columnMetas, tag2Tbname);
List<Record> subTableNotExist = filterSubTableNotExistRecords(recordBatch, columnMetas, tag2Tbname);
int affectedRows = 0;
Map<String, List<Record>> subTableRecordsMap = splitRecords(subTableExist, columnMetas, tag2Tbname);
for (String subTable : subTableRecordsMap.keySet()) {
List<Record> subTableRecords = subTableRecordsMap.get(subTable);
affectedRows += writeBatchToNormalTable(conn, subTable, subTableRecords);
}
if (!subTableNotExist.isEmpty())
affectedRows += writeBatchToSupTableBySchemaless(conn, table, subTableNotExist);
return affectedRows;
}
private List<Record> filterSubTableExistRecords(List<Record> recordBatch, List<ColumnMeta> columnMetas,
Map<String, String> tag2Tbname) {
return recordBatch.stream().filter(record -> {
String tagStr = getTagString(columnMetas, record);
return tag2Tbname.containsKey(tagStr);
}).collect(Collectors.toList());
}
private List<Record> filterSubTableNotExistRecords(List<Record> recordBatch, List<ColumnMeta> columnMetas,
Map<String, String> tag2Tbname) {
return recordBatch.stream().filter(record -> {
String tagStr = getTagString(columnMetas, record);
return !tag2Tbname.containsKey(tagStr);
}).collect(Collectors.toList());
}
private Map<String, List<Record>> splitRecords(List<Record> subTableExist, List<ColumnMeta> columnMetas,
Map<String, String> tag2Tbname) {
Map<String, List<Record>> ret = new HashMap<>();
for (Record record : subTableExist) {
String tagstr = getTagString(columnMetas, record);
String tbname = tag2Tbname.get(tagstr);
if (ret.containsKey(tbname)) {
ret.get(tbname).add(record);
} else {
List<Record> list = new ArrayList<>();
list.add(record);
ret.put(tbname, list);
}
}
return ret;
}
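/**
 * 将record中的tag列拼接成查找子表名用的key:数值类型的tag统一按long取值,
 * 需与SchemaManager.loadTagTableNameMap生成key的方式保持一致
 */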
private String getTagString(List<ColumnMeta> columnMetas, Record record) {
return IntStream.range(0, columnMetas.size()).mapToObj(colIndex -> {
ColumnMeta columnMeta = columnMetas.get(colIndex);
if (columnMeta.isTag) {
Column column = record.getColumn(colIndex);
try {
switch (columnMeta.type) {
case "TINYINT":
case "SMALLINT":
case "INT":
case "BIGINT":
return column.asLong().toString();
default:
return column.asString();
}
} catch (Exception e) {
LOG.error(
"failed to get Tag, colIndex: " + colIndex + ", ColumnMeta: " + columnMeta + ", record: " +
record, e);
}
}
return "";
}).collect(Collectors.joining());
}
/**
* insert into record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
* record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
* record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
*/
private int writeBatchToSupTableBySQL(Connection conn, String table, List<Record> recordBatch) throws SQLException {
List<ColumnMeta> columnMetas = this.schemaCache.getColumnMetaList(table, TableType.SUP_TABLE);
StringBuilder sb = new StringBuilder("insert into");
for (Record record : recordBatch) {
sb.append(" `")
.append(record.getColumn(indexOf("tbname")).asString())
.append("` using ")
.append(table)
.append(" tags")
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return colMeta.isTag;
}).map(colMeta -> {
return buildColumnValue(colMeta, record);
}).collect(Collectors.joining(",", "(", ")")))
.append(" ")
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return !colMeta.isTag;
}).map(colMeta -> {
return colMeta.field;
}).collect(Collectors.joining(",", "(", ")")))
.append(" values")
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return !colMeta.isTag;
}).map(colMeta -> {
return buildColumnValue(colMeta, record);
}).collect(Collectors.joining(",", "(", ")")));
}
String sql = sb.toString();
try {
return executeUpdate(conn, sql);
} catch (SQLException e) {
LOG.error("failed to writeBatchToSupTableBySQL, table: " + table + ", column meta: " + columnMetas +
", cause: " + e.getMessage(), e);
throw e;
}
}
private int executeUpdate(Connection conn, String sql) throws SQLException {
int count;
try (Statement stmt = conn.createStatement()) {
LOG.debug(">>> " + sql);
count = stmt.executeUpdate(sql);
}
return count;
}
private String buildColumnValue(ColumnMeta colMeta, Record record) {
Column column = record.getColumn(indexOf(colMeta.field));
TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
switch (column.getType()) {
case DATE: {
Date value = column.asDate();
switch (timestampPrecision) {
case MILLISEC:
return "" + (value.getTime());
case MICROSEC:
return "" + (value.getTime() * 1000);
case NANOSEC:
return "" + (value.getTime() * 1000_000);
default:
return "'" + column.asString() + "'";
}
}
case BYTES:
case STRING:
if (colMeta.type.equals("TIMESTAMP"))
return "\"" + column.asString() + "\"";
String value = column.asString();
if (value == null)
return "NULL";
return "\'" + Utils.escapeSingleQuota(value) + "\'";
case NULL:
case BAD:
return "NULL";
case BOOL:
case DOUBLE:
case INT:
case LONG:
default:
return column.asString();
}
}
/**
* table: ["stb1"], column: ["ts", "f1", "f2", "t1"]
* data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => schemaless: stb1,t1=t1 f1=f1,f2=f2 ts
*/
private int writeBatchToSupTableBySchemaless(Connection conn, String table,
List<Record> recordBatch) throws SQLException {
int count = 0;
TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
List<ColumnMeta> columnMetaList = this.schemaCache.getColumnMetaList(table, TableType.SUP_TABLE);
ColumnMeta ts = columnMetaList.stream().filter(colMeta -> colMeta.isPrimaryKey).findFirst().get();
List<String> lines = new ArrayList<>();
for (Record record : recordBatch) {
StringBuilder sb = new StringBuilder();
sb.append(table)
.append(",")
.append(columnMetaList.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return colMeta.isTag;
}).map(colMeta -> {
String value = record.getColumn(indexOf(colMeta.field)).asString();
if (value.contains(" "))
value = value.replace(" ", "\\ ");
return colMeta.field + "=" + value;
}).collect(Collectors.joining(",")))
.append(" ")
.append(columnMetaList.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return !colMeta.isTag && !colMeta.isPrimaryKey;
}).map(colMeta -> {
return colMeta.field + "=" + buildSchemalessColumnValue(colMeta, record);
// return colMeta.field + "=" + record.getColumn(indexOf(colMeta.field)).asString();
}).collect(Collectors.joining(",")))
.append(" ");
// timestamp
Column column = record.getColumn(indexOf(ts.field));
Object tsValue = column.getRawData();
if (column.getType() == Column.Type.DATE && tsValue instanceof Date) {
long time = column.asDate().getTime();
switch (timestampPrecision) {
case NANOSEC:
sb.append(time * 1000000);
break;
case MICROSEC:
sb.append(time * 1000);
break;
case MILLISEC:
default:
sb.append(time);
}
} else if (column.getType() == Column.Type.STRING) {
sb.append(Utils.parseTimestamp(column.asString()));
} else {
sb.append(column.asLong());
}
String line = sb.toString();
LOG.debug(">>> " + line);
lines.add(line);
count++;
}
SchemalessWriter writer = new SchemalessWriter(conn);
SchemalessTimestampType timestampType;
switch (timestampPrecision) {
case NANOSEC:
timestampType = SchemalessTimestampType.NANO_SECONDS;
break;
case MICROSEC:
timestampType = SchemalessTimestampType.MICRO_SECONDS;
break;
case MILLISEC:
timestampType = SchemalessTimestampType.MILLI_SECONDS;
break;
default:
timestampType = SchemalessTimestampType.NOT_CONFIGURED;
}
writer.write(lines, SchemalessProtocolType.LINE, timestampType);
LOG.warn("schemalessWriter does not return affected rows!");
return count;
}
private long dateAsLong(Column column) {
TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
long time = column.asDate().getTime();
switch (timestampPrecision) {
case NANOSEC:
return time * 1000000;
case MICROSEC:
return time * 1000;
case MILLISEC:
default:
return time;
}
}
private String buildSchemalessColumnValue(ColumnMeta colMeta, Record record) {
Column column = record.getColumn(indexOf(colMeta.field));
switch (column.getType()) {
case DATE:
if (colMeta.type.equals("TIMESTAMP"))
return dateAsLong(column) + "i64";
return "L'" + column.asString() + "'";
case NULL:
case BAD:
return "NULL";
case DOUBLE: {
if (colMeta.type.equals("FLOAT"))
return column.asString() + "f32";
if (colMeta.type.equals("DOUBLE"))
return column.asString() + "f64";
}
case INT:
case LONG: {
if (colMeta.type.equals("TINYINT"))
return column.asString() + "i8";
if (colMeta.type.equals("SMALLINT"))
return column.asString() + "i16";
if (colMeta.type.equals("INT"))
return column.asString() + "i32";
if (colMeta.type.equals("BIGINT"))
return column.asString() + "i64";
}
case BYTES:
case STRING:
if (colMeta.type.equals("TIMESTAMP"))
return column.asString() + "i64";
String value = column.asString();
value = value.replace("\"", "\\\"");
if (colMeta.type.startsWith("BINARY") || colMeta.type.startsWith("VARCHAR"))
return "\"" + value + "\"";
if (colMeta.type.startsWith("NCHAR"))
return "L\"" + value + "\"";
case BOOL:
default:
return column.asString();
}
}
/**
* table: ["tb1"], column: [tbname, ts, f1, f2, t1]
* if contains("tbname") and tbname != tb1 continue;
* else if t1 != record[idx(t1)] or t2 != record[idx(t2)]... continue;
* else
* insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)])
*/
private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
List<ColumnMeta> columnMetas = this.schemaCache.getColumnMetaList(table, TableType.SUB_TABLE);
StringBuilder sb = new StringBuilder();
sb.append("insert into `")
.append(table)
.append("` ")
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return !colMeta.isTag;
}).map(colMeta -> {
return colMeta.field;
}).collect(Collectors.joining(",", "(", ")")))
.append(" values");
int validRecords = 0;
for (Record record : recordBatch) {
if (columns.contains("tbname") && !table.equals(record.getColumn(indexOf("tbname")).asString()))
continue;
boolean tagsAllMatch = columnMetas.stream()
.filter(colMeta -> columns.contains(colMeta.field))
.filter(colMeta -> {
return colMeta.isTag;
})
.allMatch(colMeta -> {
Column column = record.getColumn(indexOf(colMeta.field));
boolean equals = equals(column, colMeta);
return equals;
});
if (ignoreTagsUnmatched && !tagsAllMatch)
continue;
sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return !colMeta.isTag;
}).map(colMeta -> {
return buildColumnValue(colMeta, record);
}).collect(Collectors.joining(", ", "(", ") ")));
validRecords++;
}
if (validRecords == 0) {
LOG.warn("no valid records in this batch");
return 0;
}
String sql = sb.toString();
return executeUpdate(conn, sql);
}
private boolean equals(Column column, ColumnMeta colMeta) {
switch (column.getType()) {
case BOOL:
return column.asBoolean().equals(Boolean.valueOf(colMeta.value.toString()));
case INT:
case LONG:
return column.asLong().equals(Long.valueOf(colMeta.value.toString()));
case DOUBLE:
return column.asDouble().equals(Double.valueOf(colMeta.value.toString()));
case NULL:
return colMeta.value == null;
case DATE:
return column.asDate().getTime() == ((Timestamp) colMeta.value).getTime();
case BAD:
case BYTES:
return Arrays.equals(column.asBytes(), (byte[]) colMeta.value);
case STRING:
default:
return column.asString().equals(colMeta.value.toString());
}
}
/**
* table: ["weather"], column: ["ts, f1, f2, f3, t1, t2"]
* sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...)
*/
private int writeBatchToNormalTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
List<ColumnMeta> columnMetas = this.schemaCache.getColumnMetaList(table, TableType.NML_TABLE);
StringBuilder sb = new StringBuilder();
sb.append("insert into `")
.append(table)
.append("` ")
.append(columnMetas.stream()
.filter(colMeta -> !colMeta.isTag)
.filter(colMeta -> columns.contains(colMeta.field))
.map(colMeta -> {
return colMeta.field;
})
.collect(Collectors.joining(",", "(", ")")))
.append(" values ");
for (Record record : recordBatch) {
sb.append(columnMetas.stream()
.filter(colMeta -> !colMeta.isTag)
.filter(colMeta -> columns.contains(colMeta.field))
.map(colMeta -> {
return buildColumnValue(colMeta, record);
})
.collect(Collectors.joining(",", "(", ")")));
}
String sql = sb.toString();
return executeUpdate(conn, sql);
}
private int indexOf(String colName) throws DataXException {
for (int i = 0; i < columns.size(); i++) {
if (columns.get(i).equals(colName))
return i;
}
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION,
"cannot find col: " + colName + " in columns: " + columns);
}
}

View File

@ -0,0 +1,18 @@
package com.alibaba.datax.plugin.writer.tdengine30writer;
public class Key {
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String CONNECTION = "connection";
public static final String BATCH_SIZE = "batchSize";
public static final String TABLE = "table";
public static final String JDBC_URL = "jdbcUrl";
public static final String COLUMN = "column";
public static final String IGNORE_TAGS_UNMATCHED = "ignoreTagsUnmatched";
public static final String BEGIN_DATETIME = "beginDateTime";
public static final String END_DATETIME = "endDateTime";
public static final String WHERE = "where";
public static final String QUERY_SQL = "querySql";
public static final String MANDATORY_ENCODING = "mandatoryEncoding";
}

View File

@ -0,0 +1,97 @@
package com.alibaba.datax.plugin.writer.tdengine30writer;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.taosdata.jdbc.SchemalessWriter;
import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
public class OpentsdbDataHandler implements DataHandler {
private static final Logger LOG = LoggerFactory.getLogger(OpentsdbDataHandler.class);
private SchemalessWriter writer;
private String jdbcUrl;
private String user;
private String password;
int batchSize;
public OpentsdbDataHandler(Configuration config) {
// opentsdb json protocol use JNI and schemaless API to write
this.jdbcUrl = config.getString(Key.JDBC_URL);
this.user = config.getString(Key.USERNAME, "root");
this.password = config.getString(Key.PASSWORD, "taosdata");
this.batchSize = config.getInt(Key.BATCH_SIZE, Constants.DEFAULT_BATCH_SIZE);
}
@Override
public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) {
int count = 0;
try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);) {
LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + user + "] established.");
writer = new SchemalessWriter(conn);
count = write(lineReceiver, batchSize);
} catch (Exception e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e);
}
return count;
}
private int write(RecordReceiver lineReceiver, int batchSize) throws DataXException {
int recordIndex = 1;
try {
Record record;
StringBuilder sb = new StringBuilder();
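// batchSize==1时逐条写入JSON;否则每batchSize条拼成一个JSON数组一次性提交,循环结束后再提交不足一批的剩余数据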
while ((record = lineReceiver.getFromReader()) != null) {
if (batchSize == 1) {
String jsonData = recordToString(record);
LOG.debug(">>> " + jsonData);
writer.write(jsonData, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
} else if (recordIndex % batchSize == 1) {
sb.append("[").append(recordToString(record)).append(",");
} else if (recordIndex % batchSize == 0) {
sb.append(recordToString(record)).append("]");
String jsonData = sb.toString();
LOG.debug(">>> " + jsonData);
writer.write(jsonData, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
sb.delete(0, sb.length());
} else {
sb.append(recordToString(record)).append(",");
}
recordIndex++;
}
if (sb.length() != 0 && sb.charAt(0) == '[') {
String jsonData = sb.deleteCharAt(sb.length() - 1).append("]").toString();
LOG.debug(">>> " + jsonData);
writer.write(jsonData, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
}
} catch (Exception e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e);
}
return recordIndex - 1;
}
private String recordToString(Record record) {
int recordLength = record.getColumnNumber();
if (0 == recordLength) {
return "";
}
Column column;
StringBuilder sb = new StringBuilder();
for (int i = 0; i < recordLength; i++) {
column = record.getColumn(i);
sb.append(column.asString()).append("\t");
}
sb.setLength(sb.length() - 1);
return sb.toString();
}
}

View File

@ -0,0 +1,190 @@
package com.alibaba.datax.plugin.writer.tdengine30writer;
import com.alibaba.datax.common.exception.DataXException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* 适配 TDengine 3.0 SchemaManager
*/
public class Schema3_0Manager extends SchemaManager {
private static final Logger LOG = LoggerFactory.getLogger(Schema3_0Manager.class);
private final String dbname;
public Schema3_0Manager(Connection conn, String dbname) {
super(conn);
this.dbname = dbname;
}
@Override
public Map<String, TableMeta> loadTableMeta(List<String> tables) throws DataXException {
Map<String, TableMeta> tableMetas = new HashMap<>();
try (Statement stmt = conn.createStatement()) {
StringBuilder sb = new StringBuilder();
sb.append("select * from ")
.append(Constants.INFORMATION_SCHEMA)
.append(Constants.INFORMATION_SCHEMA_COMMA)
.append(Constants.INFORMATION_SCHEMA_TABLE_INS_STABLES)
.append(" where db_name = ")
.append(getDbnameForSqlQuery())
.append(" and stable_name in ")
.append(tables.stream().map(t -> "'" + t + "'").collect(Collectors.joining(",", "(", ")")));
ResultSet rs = stmt.executeQuery(sb.toString());
while (rs.next()) {
TableMeta tableMeta = buildSupTableMeta(rs);
if (!tables.contains(tableMeta.tbname))
continue;
tableMetas.put(tableMeta.tbname, tableMeta);
}
sb = new StringBuilder();
sb.append("select * from ")
.append(Constants.INFORMATION_SCHEMA)
.append(Constants.INFORMATION_SCHEMA_COMMA)
.append(Constants.INFORMATION_SCHEMA_TABLE_INS_TABLES)
.append(" where db_name = ")
.append(getDbnameForSqlQuery())
.append(" and table_name in ")
.append(tables.stream().map(t -> "'" + t + "'").collect(Collectors.joining(",", "(", ")")));
rs = stmt.executeQuery(sb.toString());
while (rs.next()) {
TableMeta tableMeta = buildSubTableMeta(rs);
if (!tables.contains(tableMeta.tbname))
continue;
tableMetas.put(tableMeta.tbname, tableMeta);
}
for (String tbname : tables) {
if (!tableMetas.containsKey(tbname)) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION,
"table metadata of " + tbname + " is empty!");
}
}
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
return tableMetas;
}
private String getDbnameForSqlQuery() {
return "\"" + dbname + "\"";
}
@Override
protected TableMeta buildSupTableMeta(ResultSet rs) throws SQLException {
TableMeta tableMeta = new TableMeta();
tableMeta.tableType = TableType.SUP_TABLE;
tableMeta.tbname = rs.getString(Constants.TABLE_META_SUP_TABLE_NAME);
tableMeta.columns = rs.getInt(Constants.TABLE_META_COLUMNS);
tableMeta.tags = rs.getInt(Constants.TABLE_META_TAGS);
// tableMeta.tables = rs.getInt("tables"); // 直接从 ins_stables 查不到子表数量
LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta);
return tableMeta;
}
@Override
protected TableMeta buildSubTableMeta(ResultSet rs) throws SQLException {
TableMeta tableMeta = new TableMeta();
String stable_name = rs.getString(Constants.TABLE_META_SUP_TABLE_NAME);
tableMeta.tableType = StringUtils.isBlank(stable_name) ? TableType.NML_TABLE : TableType.SUB_TABLE;
tableMeta.tbname = rs.getString(Constants.TABLE_META_TABLE_NAME);
tableMeta.columns = rs.getInt(Constants.TABLE_META_COLUMNS);
tableMeta.stable_name = StringUtils.isBlank(stable_name) ? null : stable_name;
LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta);
return tableMeta;
}
@Override
protected ColumnMeta buildColumnMeta(ResultSet rs, boolean isPrimaryKey) throws SQLException {
ColumnMeta columnMeta = new ColumnMeta();
columnMeta.field = rs.getString(Constants.COLUMN_META_FIELD);
columnMeta.type = rs.getString(Constants.COLUMN_META_TYPE);
columnMeta.length = rs.getInt(Constants.COLUMN_META_LENGTH);
columnMeta.note = rs.getString(Constants.COLUMN_META_NOTE);
columnMeta.isTag = Constants.COLUMN_META_NOTE_TAG.equals(columnMeta.note);
// columnMeta.isPrimaryKey = "ts".equals(columnMeta.field);
columnMeta.isPrimaryKey = isPrimaryKey;
return columnMeta;
}
@Override
public TimestampPrecision loadDatabasePrecision() throws DataXException {
if (this.precision != null)
return this.precision;
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery(
"select * from " + Constants.INFORMATION_SCHEMA + Constants.INFORMATION_SCHEMA_COMMA +
Constants.INFORMATION_SCHEMA_TABLE_INS_DATABASES + " where name = " +
getDbnameForSqlQuery());
while (rs.next()) {
String precision = rs.getString(Constants.DATABASE_META_PRECISION);
switch (precision) {
case "ns":
this.precision = TimestampPrecision.NANOSEC;
break;
case "us":
this.precision = TimestampPrecision.MICROSEC;
break;
case "ms":
default:
this.precision = TimestampPrecision.MILLISEC;
}
}
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
return this.precision;
}
@Override
public Map<String, String> loadTagTableNameMap(String table) throws SQLException {
if (tags2tbnameMaps.containsKey(table))
return tags2tbnameMaps.get(table);
Map<String, String> tags2tbname = new HashMap<>();
try (Statement stmt = conn.createStatement()) {
// describe table
List<String> tags = new ArrayList<>();
ResultSet rs = stmt.executeQuery("describe " + table);
while (rs.next()) {
String note = rs.getString(Constants.COLUMN_META_NOTE);
if (Constants.COLUMN_META_NOTE_TAG.equals(note)) {
tags.add(rs.getString(Constants.COLUMN_META_FIELD));
}
}
// select distinct tbname, t1, t2 from stb
rs = stmt.executeQuery("select distinct " + String.join(",", tags) + ",tbname from " + table);
while (rs.next()) {
ResultSet finalRs = rs;
String tagStr = tags.stream().map(t -> {
try {
return finalRs.getString(t);
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
return "NULL";
}).collect(Collectors.joining(TAG_TABLE_NAME_MAP_KEY_SPLITTER));
String tbname = rs.getString("tbname");
tags2tbname.put(tagStr, tbname);
}
}
tags2tbnameMaps.put(table, tags2tbname);
return tags2tbname;
}
}

View File

@ -0,0 +1,162 @@
package com.alibaba.datax.plugin.writer.tdengine30writer;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* Schema cache for TDengine 3.X
*/
public final class SchemaCache {
private static final Logger log = LoggerFactory.getLogger(SchemaCache.class);
private static volatile SchemaCache instance;
private static Configuration config;
private static Connection conn;
private static String dbname;
// table name -> TableMeta
private static final Map<String, TableMeta> tableMetas = new LinkedHashMap<>();
// table name ->List<ColumnMeta>
private static final Map<String, List<ColumnMeta>> columnMetas = new LinkedHashMap<>();
private SchemaCache(Configuration config) {
SchemaCache.config = config;
// connect
final String user = config.getString(Key.USERNAME, Constants.DEFAULT_USERNAME);
final String pass = config.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD);
Configuration conn = Configuration.from(config.getList(Key.CONNECTION).get(0).toString());
final String url = conn.getString(Key.JDBC_URL);
try {
SchemaCache.conn = DriverManager.getConnection(url, user, pass);
} catch (SQLException e) {
throw DataXException.asDataXException(
"failed to connect to url: " + url + ", cause: {" + e.getMessage() + "}");
}
dbname = TDengineWriter.parseDatabaseFromJdbcUrl(url);
SchemaManager schemaManager = new Schema3_0Manager(SchemaCache.conn, dbname);
// init table meta cache and load
final List<String> tables = conn.getList(Key.TABLE, String.class);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
// init column meta cache
SchemaCache.tableMetas.putAll(tableMetas);
for (String table : tableMetas.keySet()) {
SchemaCache.columnMetas.put(table, new ArrayList<>());
}
}
public static SchemaCache getInstance(Configuration originConfig) {
if (instance == null) {
synchronized (SchemaCache.class) {
if (instance == null) {
instance = new SchemaCache(originConfig);
}
}
}
return instance;
}
public TableMeta getTableMeta(String table_name) {
if (!tableMetas.containsKey(table_name)) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION,
"table metadata of " + table_name + " is empty!");
}
return tableMetas.get(table_name);
}
public List<ColumnMeta> getColumnMetaList(String tbname, TableType tableType) {
if (columnMetas.get(tbname).isEmpty()) {
synchronized (SchemaCache.class) {
if (columnMetas.get(tbname).isEmpty()) {
List<ColumnMeta> colMetaList = getColumnMetaListFromDb(tbname, tableType);
if (colMetaList.isEmpty()) {
throw DataXException.asDataXException("column metadata of table: " + tbname + " is empty!");
}
columnMetas.get(tbname).addAll(colMetaList);
}
}
}
return columnMetas.get(tbname);
}
private List<ColumnMeta> getColumnMetaListFromDb(String tableName, TableType tableType) {
List<ColumnMeta> columnMetaList = new ArrayList<>();
List<String> column_name = config.getList(Key.COLUMN, String.class)
.stream()
.map(String::toLowerCase)
.collect(Collectors.toList());
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("describe " + tableName);
for (int i = 0; rs.next(); i++) {
ColumnMeta columnMeta = buildColumnMeta(rs, i == 0);
if (column_name.contains(columnMeta.field.toLowerCase())) {
columnMetaList.add(columnMeta);
}
}
rs.close();
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
// 如果是子表才需要获取 tag
if (tableType == TableType.SUB_TABLE) {
for (ColumnMeta colMeta : columnMetaList) {
if (!colMeta.isTag)
continue;
Object tagValue = getTagValue(tableName, colMeta.field);
colMeta.value = tagValue;
}
}
return columnMetaList;
}
private Object getTagValue(String tableName, String tagName) {
String sql = "select " + tagName + " from " + tableName + " limit 1";
Object tagValue = null;
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
tagValue = rs.getObject(tagName);
}
} catch (SQLException e) {
log.error("failed to get tag value, use NULL, cause: {" + e.getMessage() + "}");
}
return tagValue;
}
private ColumnMeta buildColumnMeta(ResultSet rs, boolean isPrimaryKey) throws SQLException {
ColumnMeta columnMeta = new ColumnMeta();
columnMeta.field = rs.getString(Constants.COLUMN_META_FIELD);
columnMeta.type = rs.getString(Constants.COLUMN_META_TYPE);
columnMeta.length = rs.getInt(Constants.COLUMN_META_LENGTH);
columnMeta.note = rs.getString(Constants.COLUMN_META_NOTE);
columnMeta.isTag = Constants.COLUMN_META_NOTE_TAG.equals(columnMeta.note);
// columnMeta.isPrimaryKey = "ts".equals(columnMeta.field);
columnMeta.isPrimaryKey = isPrimaryKey;
return columnMeta;
}
}

View File

@ -0,0 +1,208 @@
package com.alibaba.datax.plugin.writer.tdengine30writer;
import com.alibaba.datax.common.exception.DataXException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.*;
import java.util.stream.Collectors;
public class SchemaManager {
private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class);
protected static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = "";
protected final Connection conn;
protected TimestampPrecision precision;
protected Map<String, Map<String, String>> tags2tbnameMaps = new HashMap<>();
public SchemaManager(Connection conn) {
this.conn = conn;
}
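/**
 * Resolve and cache the timestamp precision (ms/us/ns) of the database bound
 * to this connection; the writer needs it to interpret long-valued timestamps.
 */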
public TimestampPrecision loadDatabasePrecision() throws DataXException {
if (this.precision != null)
return this.precision;
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("select database()");
String dbname = null;
while (rs.next()) {
dbname = rs.getString("database()");
}
if (dbname == null)
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION,
"Database not specified or available");
rs = stmt.executeQuery("show databases");
while (rs.next()) {
String name = rs.getString("name");
if (!name.equalsIgnoreCase(dbname))
continue;
String precision = rs.getString("precision");
switch (precision) {
case "ns":
this.precision = TimestampPrecision.NANOSEC;
break;
case "us":
this.precision = TimestampPrecision.MICROSEC;
break;
case "ms":
default:
this.precision = TimestampPrecision.MILLISEC;
}
}
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
return this.precision;
}
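/**
 * Load metadata for every configured table from "show stables" and
 * "show tables", failing if any configured table is absent so that a typo
 * in the job file is reported before any data is written.
 */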
public Map<String, TableMeta> loadTableMeta(List<String> tables) throws DataXException {
Map<String, TableMeta> tableMetas = new HashMap<>();
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("show stables");
while (rs.next()) {
TableMeta tableMeta = buildSupTableMeta(rs);
if (!tables.contains(tableMeta.tbname))
continue;
tableMetas.put(tableMeta.tbname, tableMeta);
}
rs = stmt.executeQuery("show tables");
while (rs.next()) {
TableMeta tableMeta = buildSubTableMeta(rs);
if (!tables.contains(tableMeta.tbname))
continue;
tableMetas.put(tableMeta.tbname, tableMeta);
}
for (String tbname : tables) {
if (!tableMetas.containsKey(tbname)) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION,
"table metadata of " + tbname + " is empty!");
}
}
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
return tableMetas;
}
@Deprecated
public Map<String, List<ColumnMeta>> loadColumnMetas(List<String> tables) throws DataXException {
Map<String, List<ColumnMeta>> ret = new HashMap<>();
for (String table : tables) {
List<ColumnMeta> columnMetaList = new ArrayList<>();
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("describe " + table);
for (int i = 0; rs.next(); i++) {
ColumnMeta columnMeta = buildColumnMeta(rs, i == 0);
columnMetaList.add(columnMeta);
}
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
if (columnMetaList.isEmpty()) {
LOG.error("column metadata of " + table + " is empty!");
continue;
}
columnMetaList.stream().filter(colMeta -> colMeta.isTag).forEach(colMeta -> {
String sql = "select " + colMeta.field + " from " + table;
Object value = null;
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery(sql);
for (int i = 0; rs.next(); i++) {
value = rs.getObject(colMeta.field);
if (i > 0) {
value = null;
break;
}
}
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
colMeta.value = value;
});
LOG.debug("load column metadata of " + table + ": " + Arrays.toString(columnMetaList.toArray()));
ret.put(table, columnMetaList);
}
return ret;
}
protected TableMeta buildSupTableMeta(ResultSet rs) throws SQLException {
TableMeta tableMeta = new TableMeta();
tableMeta.tableType = TableType.SUP_TABLE;
tableMeta.tbname = rs.getString("name");
tableMeta.columns = rs.getInt("columns");
tableMeta.tags = rs.getInt("tags");
tableMeta.tables = rs.getInt("tables");
LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta);
return tableMeta;
}
protected TableMeta buildSubTableMeta(ResultSet rs) throws SQLException {
TableMeta tableMeta = new TableMeta();
String stable_name = rs.getString("stable_name");
tableMeta.tableType = StringUtils.isBlank(stable_name) ? TableType.NML_TABLE : TableType.SUB_TABLE;
tableMeta.tbname = rs.getString("table_name");
tableMeta.columns = rs.getInt("columns");
tableMeta.stable_name = StringUtils.isBlank(stable_name) ? null : stable_name;
LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta);
return tableMeta;
}
protected ColumnMeta buildColumnMeta(ResultSet rs, boolean isPrimaryKey) throws SQLException {
ColumnMeta columnMeta = new ColumnMeta();
columnMeta.field = rs.getString("Field");
columnMeta.type = rs.getString("Type");
columnMeta.length = rs.getInt("Length");
columnMeta.note = rs.getString("Note");
columnMeta.isTag = "TAG".equals(columnMeta.note);
columnMeta.isPrimaryKey = isPrimaryKey;
return columnMeta;
}
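/**
 * Build a map from the concatenation of a super table's tag values to the
 * matching sub-table name, used to route records to the right sub-table.
 * Note: with the empty splitter above, distinct tag tuples such as
 * ("ab", "c") and ("a", "bc") produce the same key and would collide.
 */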
public Map<String, String> loadTagTableNameMap(String table) throws SQLException {
if (tags2tbnameMaps.containsKey(table))
return tags2tbnameMaps.get(table);
Map<String, String> tags2tbname = new HashMap<>();
try (Statement stmt = conn.createStatement()) {
// describe table
List<String> tags = new ArrayList<>();
ResultSet rs = stmt.executeQuery("describe " + table);
while (rs.next()) {
String note = rs.getString("Note");
if ("TAG".equals(note)) {
tags.add(rs.getString("Field"));
}
}
// select distinct tbname, t1, t2 from stb
rs = stmt.executeQuery("select distinct " + String.join(",", tags) + ",tbname from " + table);
while (rs.next()) {
ResultSet finalRs = rs;
String tagStr = tags.stream().map(t -> {
try {
return finalRs.getString(t);
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
return "NULL";
}).collect(Collectors.joining(TAG_TABLE_NAME_MAP_KEY_SPLITTER));
String tbname = rs.getString("tbname");
tags2tbname.put(tagStr, tbname);
}
}
tags2tbnameMaps.put(table, tags2tbname);
return tags2tbname;
}
}
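The manager can also be exercised outside of DataX over a plain JDBC connection, which is handy when debugging schema problems. A hedged sketch, assuming a local TDengine instance, the taos-jdbcdriver on the classpath, and placeholder URL, credentials, and table name:
// Illustrative only; not part of the commit.
static void probeSchema() throws SQLException {
    try (Connection conn = DriverManager.getConnection(
            "jdbc:TAOS-RS://localhost:6041/test", "root", "taosdata")) {
        SchemaManager manager = new SchemaManager(conn);
        TimestampPrecision precision = manager.loadDatabasePrecision();
        Map<String, TableMeta> metas = manager.loadTableMeta(Collections.singletonList("meters"));
        Map<String, String> routes = manager.loadTagTableNameMap("meters");
        System.out.println(precision + ", " + metas.keySet() + ", " + routes.size() + " sub-tables");
    }
}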

View File

@@ -0,0 +1,130 @@
package com.alibaba.datax.plugin.writer.tdengine30writer;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
public class TDengineWriter extends Writer {
private static final String PEER_PLUGIN_NAME = "peerPluginName";
public static class Job extends Writer.Job {
private Configuration originalConfig;
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
this.originalConfig.set(PEER_PLUGIN_NAME, getPeerPluginName());
// check username
String user = this.originalConfig.getString(Key.USERNAME);
if (StringUtils.isBlank(user))
throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.USERNAME + "] is not set.");
// check password
String password = this.originalConfig.getString(Key.PASSWORD);
if (StringUtils.isBlank(password))
throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.PASSWORD + "] is not set.");
// check connection
List<Object> connection = this.originalConfig.getList(Key.CONNECTION);
if (connection == null || connection.isEmpty())
throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.CONNECTION + "] is not set.");
if (connection.size() > 1)
LOG.warn("connection.size is " + connection.size() + " and only connection[0] will be used.");
Configuration conn = Configuration.from(connection.get(0).toString());
String jdbcUrl = conn.getString(Key.JDBC_URL);
if (StringUtils.isBlank(jdbcUrl))
throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.JDBC_URL + "] of connection is not set.");
}
@Override
public void destroy() {
}
@Override
public List<Configuration> split(int mandatoryNumber) {
List<Configuration> writerSplitConfigs = new ArrayList<>();
for (int i = 0; i < mandatoryNumber; i++) {
Configuration clone = this.originalConfig.clone();
Configuration config = Configuration.from(
this.originalConfig.getList(Key.CONNECTION).get(0).toString());
String jdbcUrl = config.getString(Key.JDBC_URL);
clone.set(Key.JDBC_URL, jdbcUrl);
clone.set(Key.TABLE, config.getList(Key.TABLE));
writerSplitConfigs.add(clone);
}
return writerSplitConfigs;
}
}
public static class Task extends Writer.Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private Configuration writerConfig;
private TaskPluginCollector taskPluginCollector;
@Override
public void init() {
this.writerConfig = getPluginJobConf();
this.taskPluginCollector = super.getTaskPluginCollector();
}
@Override
public void destroy() {
}
@Override
public void startWrite(RecordReceiver lineReceiver) {
String peerPluginName = this.writerConfig.getString(PEER_PLUGIN_NAME);
LOG.debug("start to handle record from: " + peerPluginName);
DataHandler handler;
if (peerPluginName.equals("opentsdbreader"))
handler = new OpentsdbDataHandler(this.writerConfig);
else
handler = new DefaultDataHandler(this.writerConfig, this.taskPluginCollector);
long records = handler.handle(lineReceiver, getTaskPluginCollector());
LOG.debug("handle data finished, records: " + records);
}
}
/**
 * Parse the database name from a jdbcUrl.
 *
 * @param jdbcUrl a URL of the form jdbc:<protocol>://<host>:<port>/<dbname>[?optional parameters]
 * @return the database name
 */
public static String parseDatabaseFromJdbcUrl(String jdbcUrl) {
// indexOf returns -1 when no '?' is present, which covers both URL shapes
int questionMarkIndex = jdbcUrl.indexOf('?');
return questionMarkIndex == -1 ? jdbcUrl.substring(jdbcUrl.lastIndexOf("/") + 1)
        : jdbcUrl.substring(jdbcUrl.lastIndexOf("/") + 1, questionMarkIndex);
}
}
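As a quick sanity check on the helper above, both URL shapes named in the Javadoc resolve to the same database name; the two lines below are worked examples, not part of the commit:
System.out.println(TDengineWriter.parseDatabaseFromJdbcUrl("jdbc:TAOS-RS://localhost:6041/test?timezone=UTC")); // prints: test
System.out.println(TDengineWriter.parseDatabaseFromJdbcUrl("jdbc:TAOS://localhost:6030/test"));                 // prints: test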

Some files were not shown because too many files have changed in this diff