feat: support databend writer for dataX

Squashed commit messages:

- add databend writer plugin
- add official documentation (CN and EN)
- fix internationalization
- fix guava dependency conflict
zhihanz committed 2023-01-12 00:56:12 +08:00
parent c353e81656 · commit f641e46ccf
12 changed files with 838 additions and 42 deletions

README.md

@@ -3,7 +3,7 @@
# DataX
DataX is the open-source version of Alibaba Cloud [DataWorks Data Integration](https://www.aliyun.com/product/bigdata/ide), an offline data synchronization tool/platform widely used throughout Alibaba Group. DataX implements efficient data synchronization between heterogeneous data sources including MySQL, Oracle, OceanBase, SqlServer, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), Hologres, DRDS, and Databend.
# DataX Commercial Edition
Alibaba Cloud DataWorks Data Integration is the DataX team's commercial product on Alibaba Cloud, dedicated to fast, stable data movement across complex network environments and a rich set of heterogeneous data sources, as well as data synchronization solutions for complex business scenarios. It currently serves nearly 3,000 cloud customers and synchronizes more than 3 trillion records per day. DataWorks Data Integration supports 50+ offline data sources and offers whole-database migration, batch cloud migration, incremental synchronization, and sharded database/table solutions. Real-time synchronization was added in 2020, supporting arbitrary read/write combinations of 10+ data sources, along with one-click full-plus-incremental synchronization from sources such as MySQL and Oracle into Alibaba Cloud big data engines such as MaxCompute and Hologres.
@@ -36,45 +36,46 @@ As a data synchronization framework, DataX abstracts the synchronization of different data sources into…
DataX already has a fairly comprehensive plugin ecosystem: mainstream RDBMS databases, NoSQL stores, and big data compute systems are all supported. The currently supported data sources are listed below; for details see the [DataX data source reference guide](https://github.com/alibaba/DataX/wiki/DataX-all-data-channels).
| Type | Data Source | Reader | Writer | Docs |
|--------------|---------------------------|:------:|:------:|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------:|
| RDBMS (relational databases) | MySQL | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md), [Write](https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md) |
| | Oracle | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/oraclereader/doc/oraclereader.md), [Write](https://github.com/alibaba/DataX/blob/master/oraclewriter/doc/oraclewriter.md) |
| | OceanBase | √ | √ | [Read](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase), [Write](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase) |
| | SQLServer | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/sqlserverreader/doc/sqlserverreader.md), [Write](https://github.com/alibaba/DataX/blob/master/sqlserverwriter/doc/sqlserverwriter.md) |
| | PostgreSQL | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/postgresqlreader/doc/postgresqlreader.md), [Write](https://github.com/alibaba/DataX/blob/master/postgresqlwriter/doc/postgresqlwriter.md) |
| | DRDS | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md), [Write](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) |
| | Kingbase | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md), [Write](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) |
| | Generic RDBMS (all relational databases) | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/rdbmsreader/doc/rdbmsreader.md), [Write](https://github.com/alibaba/DataX/blob/master/rdbmswriter/doc/rdbmswriter.md) |
| Alibaba Cloud data warehouses | ODPS | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/odpsreader/doc/odpsreader.md), [Write](https://github.com/alibaba/DataX/blob/master/odpswriter/doc/odpswriter.md) |
| | ADB | | √ | [Write](https://github.com/alibaba/DataX/blob/master/adbmysqlwriter/doc/adbmysqlwriter.md) |
| | ADS | | √ | [Write](https://github.com/alibaba/DataX/blob/master/adswriter/doc/adswriter.md) |
| | OSS | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/ossreader/doc/ossreader.md), [Write](https://github.com/alibaba/DataX/blob/master/osswriter/doc/osswriter.md) |
| | OCS | | √ | [Write](https://github.com/alibaba/DataX/blob/master/ocswriter/doc/ocswriter.md) |
| | Hologres | | √ | [Write](https://github.com/alibaba/DataX/blob/master/hologresjdbcwriter/doc/hologresjdbcwriter.md) |
| | AnalyticDB For PostgreSQL | | √ | Write |
| Alibaba Cloud middleware | datahub | √ | √ | Read, Write |
| | SLS | √ | √ | Read, Write |
| Alibaba Cloud graph database | GDB | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/gdbreader/doc/gdbreader.md), [Write](https://github.com/alibaba/DataX/blob/master/gdbwriter/doc/gdbwriter.md) |
| NoSQL data stores | OTS | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/otsreader/doc/otsreader.md), [Write](https://github.com/alibaba/DataX/blob/master/otswriter/doc/otswriter.md) |
| | Hbase0.94 | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/hbase094xreader/doc/hbase094xreader.md), [Write](https://github.com/alibaba/DataX/blob/master/hbase094xwriter/doc/hbase094xwriter.md) |
| | Hbase1.1 | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/hbase11xreader/doc/hbase11xreader.md), [Write](https://github.com/alibaba/DataX/blob/master/hbase11xwriter/doc/hbase11xwriter.md) |
| | Phoenix4.x | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/hbase11xsqlreader/doc/hbase11xsqlreader.md), [Write](https://github.com/alibaba/DataX/blob/master/hbase11xsqlwriter/doc/hbase11xsqlwriter.md) |
| | Phoenix5.x | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/hbase20xsqlreader/doc/hbase20xsqlreader.md), [Write](https://github.com/alibaba/DataX/blob/master/hbase20xsqlwriter/doc/hbase20xsqlwriter.md) |
| | MongoDB | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md), [Write](https://github.com/alibaba/DataX/blob/master/mongodbwriter/doc/mongodbwriter.md) |
| | Cassandra | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/cassandrareader/doc/cassandrareader.md), [Write](https://github.com/alibaba/DataX/blob/master/cassandrawriter/doc/cassandrawriter.md) |
| Data warehouses | StarRocks | √ | √ | Read, [Write](https://github.com/alibaba/DataX/blob/master/starrockswriter/doc/starrockswriter.md) |
| | ApacheDoris | | √ | [Write](https://github.com/alibaba/DataX/blob/master/doriswriter/doc/doriswriter.md) |
| | ClickHouse | | √ | Write |
| | Databend | | √ | [Write](https://github.com/alibaba/DataX/blob/master/databendwriter/doc/databendwriter.md) |
| | Hive | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md), [Write](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) |
| | kudu | | √ | [Write](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) |
| Unstructured data storage | TxtFile | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md), [Write](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md) |
| | FTP | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md), [Write](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md) |
| | HDFS | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md), [Write](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) |
| | Elasticsearch | | √ | [Write](https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md) |
| Time-series databases | OpenTSDB | √ | | [Read](https://github.com/alibaba/DataX/blob/master/opentsdbreader/doc/opentsdbreader.md) |
| | TSDB | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/tsdbreader/doc/tsdbreader.md), [Write](https://github.com/alibaba/DataX/blob/master/tsdbwriter/doc/tsdbhttpwriter.md) |
| | TDengine | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/tdenginereader/doc/tdenginereader-CN.md), [Write](https://github.com/alibaba/DataX/blob/master/tdenginewriter/doc/tdenginewriter-CN.md) |
# Alibaba Cloud DataWorks Data Integration

databendwriter/doc/databendwriter-CN.md

@@ -0,0 +1,171 @@
# DataX DatabendWriter
[简体中文](./databendwriter-CN.md) | [English](./databendwriter.md)
## 1 Quick Introduction
Databend Writer is a DataX plugin that writes data from DataX into Databend tables.
The plugin is based on the [databend JDBC driver](https://github.com/databendcloud/databend-jdbc), which uses the [RESTful HTTP protocol](https://databend.rs/doc/integrations/api/rest)
to execute queries against open-source Databend and [Databend Cloud](https://app.databend.com/).
In each write batch, Databend Writer uploads the batch to an internal S3 stage and then executes the corresponding INSERT SQL to load the data into the Databend table.
For the best experience with the Databend community distribution, adopt [S3](https://aws.amazon.com/s3/), [MinIO](https://min.io/), or [OSS](https://www.alibabacloud.com/product/object-storage-service) as the underlying storage layer, since
they support pre-signed uploads; otherwise you may incur unnecessary data-transfer costs.
See the [deployment documentation](https://databend.rs/doc/deploy/deploying-databend) for more details.
## 2 Implementation
Databend Writer uses the DataX framework to fetch records generated by the configured DataX Reader and batch-inserts them into the specified columns of the Databend table.
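Concretely, for the sample job in section 3.1 below, the writer issues a statement of the following shape (a sketch derived from the DatabendWriterUtil code in this commit; there are no `?` placeholders because the databend-jdbc driver batches the bound rows itself and uploads them through the internal stage described above):
```sql
-- Statement template built from column = ["a", "b", "c", "d", "e", "f", "g"];
-- DataX substitutes the configured table name, and the driver appends
-- the batched rows when the statement executes.
INSERT INTO sample1(a,b,c,d,e,f,g) VALUES
```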
## 3 Features
### 3.1 Example Configuration
* The following job reads generated in-memory data and uploads it into a Databend table, as sketched above.
#### Preparation
```sql
-- create table in databend
drop table if exists datax.sample1;
drop database if exists datax;
create database if not exists datax;
create table if not exists datax.sample1(a string, b int64, c date, d timestamp, e bool, f string, g variant);
```
#### Job Configuration
```json
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column" : [
{
"value": "DataX",
"type": "string"
},
{
"value": 19880808,
"type": "long"
},
{
"value": "1926-08-08 08:08:08",
"type": "date"
},
{
"value": "1988-08-08 08:08:08",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
},
{
"value": "{\"type\": \"variant\", \"value\": \"test\"}",
"type": "string"
}
],
"sliceRecordCount": 10000
}
},
"writer": {
"name": "databendwriter",
"parameter": {
"username": "databend",
"password": "databend",
"column": ["a", "b", "c", "d", "e", "f", "g"],
"batchSize": 1000,
"preSql": [
],
"postSql": [
],
"connection": [
{
"jdbcUrl": "jdbc:databend://localhost:8000/datax",
"table": [
"sample1"
]
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
```
### 3.2 Parameter Description
* jdbcUrl
* Description: JDBC data source URL. See the [databend-jdbc documentation](https://github.com/databendcloud/databend-jdbc) for the detailed format.
* Required: yes
* Default: none
* Example: jdbc:databend://localhost:8000/datax
* username
* Description: JDBC data source username
* Required: yes
* Default: none
* Example: databend
* password
* Description: JDBC data source password
* Required: yes
* Default: none
* Example: databend
* table
* Description: A list of destination table names; each table should contain all of the columns listed in the column parameter.
* Required: yes
* Default: none
* Example: ["sample1"]
* column
* Description: The list of column names to write; the field order must correspond to the column types in the reader's records.
* Required: yes
* Default: none
* Example: ["a", "b", "c", "d", "e", "f", "g"]
* batchSize
* Description: Number of records per write batch
* Required: no
* Default: 1000
* Example: 1000
* preSql
* Description: SQL statements executed before the data is written
* Required: no
* Default: none
* Example: ["delete from datax.sample1"]
* postSql
* Description: SQL statements executed after the data is written
* Required: no
* Default: none
* Example: ["select count(*) from datax.sample1"]
### 3.3 Type Conversion
DataX internal data types are converted to the corresponding Databend data types. The following table shows the correspondence between the two.
| DataX Internal Type | Databend Data Type |
|------------|-----------------------------------------------------------|
| INT | TINYINT, INT8, SMALLINT, INT16, INT, INT32, BIGINT, INT64 |
| LONG | TINYINT, INT8, SMALLINT, INT16, INT, INT32, BIGINT, INT64 |
| STRING | STRING, VARCHAR |
| DOUBLE | FLOAT, DOUBLE |
| BOOL | BOOLEAN, BOOL |
| DATE | DATE, TIMESTAMP |
| BYTES | STRING, VARCHAR |
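For example, after running the sample job from section 3.1, the resulting mapping can be checked directly (a quick verification sketch, assuming the sample table from the preparation step):
```sql
-- DataX DATE values land in both the DATE column c and the TIMESTAMP column d;
-- the BYTES column f is stored as STRING.
DESC datax.sample1;
SELECT a, b, c, d, e, f, g FROM datax.sample1 LIMIT 1;
```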
## 4 Performance Test
## 5 Restrictions
Support for complex data types is currently unstable. If you want to use complex data types such as tuple or array, check for newer releases of Databend and the JDBC driver.
## FAQ

databendwriter/doc/databendwriter.md

@@ -0,0 +1,166 @@
# DataX DatabendWriter
[简体中文](./databendwriter-CN.md) | [English](./databendwriter.md)
## 1 Introduction
Databend Writer is a DataX plugin that writes data from DataX records into Databend tables.
The plugin is based on the [databend JDBC driver](https://github.com/databendcloud/databend-jdbc), which uses the [RESTful HTTP protocol](https://databend.rs/doc/integrations/api/rest)
to execute queries against open-source Databend and [Databend Cloud](https://app.databend.com/).
During each write batch, Databend Writer uploads the batch into an internal S3 stage and then executes the corresponding INSERT SQL to load the data into the Databend table.
For the best experience with the Databend community distribution, adopt [S3](https://aws.amazon.com/s3/), [MinIO](https://min.io/), or [OSS](https://www.alibabacloud.com/product/object-storage-service) as the underlying storage layer, since
they support pre-signed uploads; otherwise you may incur unnecessary data-transfer costs.
See the [deployment doc](https://databend.rs/doc/deploy/deploying-databend) for more details.
## 2 Detailed Implementation
Databend Writer uses the DataX framework to fetch records generated by the configured DataX Reader and batch-inserts them into the designated columns of your Databend table.
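Concretely, for the sample job in section 3.1 below, the writer issues a statement of the following shape (a sketch derived from the DatabendWriterUtil code in this commit; there are no `?` placeholders because the databend-jdbc driver batches the bound rows itself and uploads them through the internal stage described above):
```sql
-- Statement template built from column = ["a", "b", "c", "d", "e", "f", "g"];
-- DataX substitutes the configured table name, and the driver appends
-- the batched rows when the statement executes.
INSERT INTO sample1(a,b,c,d,e,f,g) VALUES
```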
## 3 Features
### 3.1 Example Configurations
* The following job reads generated in-memory data and uploads it into a Databend table, as sketched above.
#### Preparation
```sql
-- create table in databend
drop table if exists datax.sample1;
drop database if exists datax;
create database if not exists datax;
create table if not exists datax.sample1(a string, b int64, c date, d timestamp, e bool, f string, g variant);
```
#### Configurations
```json
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column" : [
{
"value": "DataX",
"type": "string"
},
{
"value": 19880808,
"type": "long"
},
{
"value": "1926-08-08 08:08:08",
"type": "date"
},
{
"value": "1988-08-08 08:08:08",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
},
{
"value": "{\"type\": \"variant\", \"value\": \"test\"}",
"type": "string"
}
],
"sliceRecordCount": 10000
}
},
"writer": {
"name": "databendwriter",
"parameter": {
"username": "databend",
"password": "databend",
"column": ["a", "b", "c", "d", "e", "f", "g"],
"batchSize": 1000,
"preSql": [
],
"postSql": [
],
"connection": [
{
"jdbcUrl": "jdbc:databend://localhost:8000/datax",
"table": [
"sample1"
]
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
```
### 3.2 Configuration Description
* jdbcUrl
* Description: JDBC data source URL. See the [databend-jdbc documentation](https://github.com/databendcloud/databend-jdbc) for the detailed format.
* Required: yes
* Default: none
* Example: jdbc:databend://localhost:8000/datax
* username
* Description: Databend user name
* Required: yes
* Default: none
* Example: databend
* password
* Description: Databend user password
* Required: yes
* Default: none
* Example: databend
* table
* Description: A list of destination table names; each table should contain all of the columns listed in the column parameter.
* Required: yes
* Default: none
* Example: ["sample1"]
* column
* Description: A list of column field names to insert into the table. To insert all column fields, use `["*"]` instead.
* Required: yes
* Default: none
* Example: ["a", "b", "c", "d", "e", "f", "g"]
* batchSize
* Description: The number of records to be inserted in each batch.
* Required: no
* Default: 1024
* preSql
* Description: A list of SQL statements that will be executed before the write operation.
* Required: no
* Default: none
* postSql
* Description: A list of SQL statements that will be executed after the write operation.
* Required: no
* Default: none
### 3.3 Type Conversion
Data types in DataX are converted to the corresponding data types in Databend. The following table shows the correspondence between the two.
| DataX Type | Databend Type |
|------------|-----------------------------------------------------------|
| INT | TINYINT, INT8, SMALLINT, INT16, INT, INT32, BIGINT, INT64 |
| LONG | TINYINT, INT8, SMALLINT, INT16, INT, INT32, BIGINT, INT64 |
| STRING | STRING, VARCHAR |
| DOUBLE | FLOAT, DOUBLE |
| BOOL | BOOLEAN, BOOL |
| DATE | DATE, TIMESTAMP |
| BYTES | STRING, VARCHAR |
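For example, after running the sample job from section 3.1, the resulting mapping can be checked directly (a quick verification sketch, assuming the sample table from the preparation step):
```sql
-- DataX DATE values land in both the DATE column c and the TIMESTAMP column d;
-- the BYTES column f is stored as STRING.
DESC datax.sample1;
SELECT a, b, c, d, e, f, g FROM datax.sample1 LIMIT 1;
```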
## 4 Performance Test
## 5 Restrictions
Currently, support for complex data types is not stable. If you want to use complex data types such as tuple or array, please check newer releases of Databend and the JDBC driver.
## FAQ

databendwriter/pom.xml (new file, 101 lines)

@@ -0,0 +1,101 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>databendwriter</artifactId>
<name>databendwriter</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.databend</groupId>
<artifactId>databend-jdbc</artifactId>
<version>0.0.4</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

databendwriter/src/main/assembly/package.xml

@@ -0,0 +1,34 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/databendwriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>databendwriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/databendwriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/databendwriter/libs</outputDirectory>
</dependencySet>
</dependencySets>
</assembly>

databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java

@@ -0,0 +1,248 @@
package com.alibaba.datax.plugin.writer.databendwriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.CommonErrorCode;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import com.alibaba.datax.plugin.writer.databendwriter.util.DatabendWriterUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.List;
import java.util.regex.Pattern;
public class DatabendWriter extends Writer
{
private static final DataBaseType DATABASE_TYPE = DataBaseType.Databend;
public static class Job
extends Writer.Job
{
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration originalConfig;
private CommonRdbmsWriter.Job commonRdbmsWriterMaster;
@Override
public void init()
{
this.originalConfig = super.getPluginJobConf();
this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE);
this.commonRdbmsWriterMaster.init(this.originalConfig);
// placeholder currently not supported by databend driver, needs special treatment
DatabendWriterUtil.dealWriteMode(this.originalConfig);
}
@Override
public void preCheck()
{
this.init();
this.commonRdbmsWriterMaster.writerPreCheck(this.originalConfig, DATABASE_TYPE);
}
@Override
public void prepare() {
this.commonRdbmsWriterMaster.prepare(this.originalConfig);
}
@Override
public List<Configuration> split(int mandatoryNumber) {
return this.commonRdbmsWriterMaster.split(this.originalConfig, mandatoryNumber);
}
@Override
public void post() {
this.commonRdbmsWriterMaster.post(this.originalConfig);
}
@Override
public void destroy() {
this.commonRdbmsWriterMaster.destroy(this.originalConfig);
}
}
public static class Task extends Writer.Task
{
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private Configuration writerSliceConfig;
private CommonRdbmsWriter.Task commonRdbmsWriterSlave;
@Override
public void init()
{
this.writerSliceConfig = super.getPluginJobConf();
this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DataBaseType.Databend){
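// The databend-jdbc driver binds values through explicit, typed setters rather than
// generic placeholder typing, so override the per-column fill hook with Databend-specific mappings.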
@Override
protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqltype, String typeName, Column column) throws SQLException {
try {
if (column.getRawData() == null) {
preparedStatement.setNull(columnIndex + 1, columnSqltype);
return preparedStatement;
}
java.util.Date utilDate;
switch (columnSqltype) {
case Types.TINYINT:
case Types.SMALLINT:
case Types.INTEGER:
preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue());
break;
case Types.BIGINT:
preparedStatement.setLong(columnIndex + 1, column.asLong());
break;
case Types.DECIMAL:
preparedStatement.setBigDecimal(columnIndex + 1, column.asBigDecimal());
break;
case Types.FLOAT:
case Types.REAL:
preparedStatement.setFloat(columnIndex + 1, column.asDouble().floatValue());
break;
case Types.DOUBLE:
preparedStatement.setDouble(columnIndex + 1, column.asDouble());
break;
case Types.DATE:
java.sql.Date sqlDate = null;
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"Date type conversion error: [%s]", column));
}
if (null != utilDate) {
sqlDate = new java.sql.Date(utilDate.getTime());
}
preparedStatement.setDate(columnIndex + 1, sqlDate);
break;
case Types.TIME:
java.sql.Time sqlTime = null;
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"Date type conversion error: [%s]", column));
}
if (null != utilDate) {
sqlTime = new java.sql.Time(utilDate.getTime());
}
preparedStatement.setTime(columnIndex + 1, sqlTime);
break;
case Types.TIMESTAMP:
Timestamp sqlTimestamp = null;
if (column instanceof StringColumn && column.asString() != null) {
String timeStampStr = column.asString();
// Java Timestamp input must be in the "2017-07-12 14:39:00.123566" format
String pattern = "^\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+";
boolean isMatch = Pattern.matches(pattern, timeStampStr);
if (isMatch) {
sqlTimestamp = Timestamp.valueOf(timeStampStr);
preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp);
break;
}
}
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"Date type conversion error: [%s]", column));
}
if (null != utilDate) {
sqlTimestamp = new Timestamp(
utilDate.getTime());
}
preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp);
break;
case Types.BINARY:
case Types.VARBINARY:
case Types.BLOB:
case Types.LONGVARBINARY:
preparedStatement.setBytes(columnIndex + 1, column
.asBytes());
break;
case Types.BOOLEAN:
// warn: bit(1) -> Types.BIT, use setBoolean
// warn: bit(>1) -> Types.VARBINARY, use setBytes
case Types.BIT:
if (this.dataBaseType == DataBaseType.MySql) {
Boolean asBoolean = column.asBoolean();
if (asBoolean != null) {
preparedStatement.setBoolean(columnIndex + 1, asBoolean);
} else {
preparedStatement.setNull(columnIndex + 1, Types.BIT);
}
} else {
preparedStatement.setString(columnIndex + 1, column.asString());
}
break;
default:
// cast variant / array into string is fine.
preparedStatement.setString(columnIndex + 1, column.asString());
break;
}
return preparedStatement;
} catch (DataXException e) {
// fix: when type conversion or overflow fails, report exactly which column failed
if (e.getErrorCode() == CommonErrorCode.CONVERT_NOT_SUPPORT ||
e.getErrorCode() == CommonErrorCode.CONVERT_OVER_FLOW) {
throw DataXException
.asDataXException(
e.getErrorCode(),
String.format(
"type conversion error. columnName: [%s], columnType:[%d], columnJavaType: [%s]. please change the data type in given column field or do not sync on the column.",
this.resultSetMetaData.getLeft()
.get(columnIndex),
this.resultSetMetaData.getMiddle()
.get(columnIndex),
this.resultSetMetaData.getRight()
.get(columnIndex)));
} else {
throw e;
}
}
}
};
this.commonRdbmsWriterSlave.init(this.writerSliceConfig);
}
@Override
public void destroy()
{
this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig);
}
@Override
public void prepare() {
this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig);
}
@Override
public void post() {
this.commonRdbmsWriterSlave.post(this.writerSliceConfig);
}
@Override
public void startWrite(RecordReceiver lineReceiver)
{
this.commonRdbmsWriterSlave.startWrite(lineReceiver, this.writerSliceConfig, this.getTaskPluginCollector());
}
}
}

databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java

@@ -0,0 +1,40 @@
package com.alibaba.datax.plugin.writer.databendwriter.util;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.StringJoiner;
public final class DatabendWriterUtil
{
private static final Logger LOG = LoggerFactory.getLogger(DatabendWriterUtil.class);
private DatabendWriterUtil() {}
public static void dealWriteMode(Configuration originalConfig)
{
List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
String jdbcUrl = originalConfig.getString(String.format("%s[0].%s",
Constant.CONN_MARK, Key.JDBC_URL));
String writeMode = originalConfig.getString(Key.WRITE_MODE, "INSERT");
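// Note: writeMode is read here, but only an INSERT template is ever generated below;
// REPLACE/UPDATE modes are not implemented by this writer.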
StringBuilder writeDataSqlTemplate = new StringBuilder();
writeDataSqlTemplate.append("INSERT INTO %s");
StringJoiner columnString = new StringJoiner(",");
for (String column : columns) {
columnString.add(column);
}
writeDataSqlTemplate.append(String.format("(%s)", columnString));
writeDataSqlTemplate.append(" VALUES");
LOG.info("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);
originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate.toString());
}
}
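For the sample job in the writer docs, dealWriteMode stores a template like the following under INSERT_OR_REPLACE_TEMPLATE_MARK (a worked example; CommonRdbmsWriter substitutes the configured table name for `%s` at write time):
```sql
-- stored template: INSERT INTO %s(a,b,c,d,e,f,g) VALUES
-- after table substitution:
INSERT INTO sample1(a,b,c,d,e,f,g) VALUES
```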

databendwriter/src/main/resources/plugin.json

@@ -0,0 +1,6 @@
{
"name": "databendwriter",
"class": "com.alibaba.datax.plugin.writer.databendwriter.DatabendWriter",
"description": "execute batch insert sql to write dataX data into databend",
"developer": "databend"
}

databendwriter/src/main/resources/plugin_job_template.json

@@ -0,0 +1,19 @@
{
"name": "databendwriter",
"parameter": {
"username": "username",
"password": "password",
"column": ["col1", "col2", "col3"],
"connection": [
{
"jdbcUrl": "jdbc:databend://<host>:<port>[/<database>]",
"table": "table1"
}
],
"preSql": [],
"postSql": [],
"maxBatchRows": 65536,
"maxBatchSize": 134217728
}
}

package.xml

@@ -434,6 +434,13 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>databendwriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>oscarwriter/target/datax/</directory>
<includes>

plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java

@@ -24,8 +24,8 @@ public enum DataBaseType {
KingbaseES("kingbasees", "com.kingbase8.Driver"),
Oscar("oscar", "com.oscar.Driver"),
OceanBase("oceanbase", "com.alipay.oceanbase.jdbc.Driver"),
StarRocks("starrocks", "com.mysql.jdbc.Driver"),
Databend("databend", "com.databend.jdbc.DatabendDriver");
private String typeName;
private String driverClassName;
@@ -118,6 +118,8 @@ public enum DataBaseType {
break;
case RDBMS:
break;
case Databend:
break;
case KingbaseES:
break;
case Oscar:

pom.xml

@@ -84,6 +84,7 @@
<module>mysqlwriter</module>
<module>starrockswriter</module>
<module>drdswriter</module>
<module>databendwriter</module>
<module>oraclewriter</module>
<module>sqlserverwriter</module>
<module>postgresqlwriter</module>