From f641e46ccfa0743b57dc245d5c6688127f2ffb7c Mon Sep 17 00:00:00 2001 From: zhihanz Date: Thu, 12 Jan 2023 00:56:12 +0800 Subject: [PATCH] feat: support databend writer for dataX datax save save save save add official documenations add doc add doc fix internationalization fix guava --- README.md | 81 +++--- databendwriter/doc/databendwriter-CN.md | 171 ++++++++++++ databendwriter/doc/databendwriter.md | 166 ++++++++++++ databendwriter/pom.xml | 101 +++++++ databendwriter/src/main/assembly/package.xml | 34 +++ .../writer/databendwriter/DatabendWriter.java | 248 ++++++++++++++++++ .../util/DatabendWriterUtil.java | 40 +++ databendwriter/src/main/resources/plugin.json | 6 + .../main/resources/plugin_job_template.json | 19 ++ package.xml | 7 + .../datax/plugin/rdbms/util/DataBaseType.java | 6 +- pom.xml | 1 + 12 files changed, 838 insertions(+), 42 deletions(-) create mode 100644 databendwriter/doc/databendwriter-CN.md create mode 100644 databendwriter/doc/databendwriter.md create mode 100644 databendwriter/pom.xml create mode 100755 databendwriter/src/main/assembly/package.xml create mode 100644 databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java create mode 100644 databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java create mode 100644 databendwriter/src/main/resources/plugin.json create mode 100644 databendwriter/src/main/resources/plugin_job_template.json diff --git a/README.md b/README.md index 10fb9eb2..5acf4cc1 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ # DataX -DataX 是阿里云 [DataWorks数据集成](https://www.aliyun.com/product/bigdata/ide) 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。 +DataX 是阿里云 [DataWorks数据集成](https://www.aliyun.com/product/bigdata/ide) 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能。 # DataX 商业版本 阿里云DataWorks数据集成是DataX团队在阿里云上的商业化产品,致力于提供复杂网络环境下、丰富的异构数据源之间高速稳定的数据移动能力,以及繁杂业务背景下的数据同步解决方案。目前已经支持云上近3000家客户,单日同步数据超过3万亿条。DataWorks数据集成目前支持离线50+种数据源,可以进行整库迁移、批量上云、增量同步、分库分表等各类同步解决方案。2020年更新实时同步能力,支持10+种数据源的读写任意组合。提供MySQL,Oracle等多种数据源到阿里云MaxCompute,Hologres等大数据引擎的一键全增量同步解决方案。 @@ -36,45 +36,46 @@ DataX本身作为数据同步框架,将不同数据源的同步抽象为从源 DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图,详情请点击:[DataX数据源参考指南](https://github.com/alibaba/DataX/wiki/DataX-all-data-channels) -| 类型 | 数据源 | Reader(读) |Writer(写)| 文档 | -|--------------|---------------------------|:---------:|:-------:|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------:| -| RDBMS 关系型数据库 | MySQL |√|√| [读](https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md) | -| | Oracle |√|√| [读](https://github.com/alibaba/DataX/blob/master/oraclereader/doc/oraclereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/oraclewriter/doc/oraclewriter.md) | -| | OceanBase |√|√| [读](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase) 、[写](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase) | -| | SQLServer |√|√| [读](https://github.com/alibaba/DataX/blob/master/sqlserverreader/doc/sqlserverreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/sqlserverwriter/doc/sqlserverwriter.md) | -| | PostgreSQL |√|√| [读](https://github.com/alibaba/DataX/blob/master/postgresqlreader/doc/postgresqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/postgresqlwriter/doc/postgresqlwriter.md) | -| | DRDS |√|√| [读](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) | -| | Kingbase |√|√| [读](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) | -| | 通用RDBMS(支持所有关系型数据库) |√|√| [读](https://github.com/alibaba/DataX/blob/master/rdbmsreader/doc/rdbmsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/rdbmswriter/doc/rdbmswriter.md) | -| 阿里云数仓数据存储 | ODPS |√|√| [读](https://github.com/alibaba/DataX/blob/master/odpsreader/doc/odpsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/odpswriter/doc/odpswriter.md) | -| | ADB | |√| [写](https://github.com/alibaba/DataX/blob/master/adbmysqlwriter/doc/adbmysqlwriter.md) | -| | ADS | |√| [写](https://github.com/alibaba/DataX/blob/master/adswriter/doc/adswriter.md) | -| | OSS |√|√| [读](https://github.com/alibaba/DataX/blob/master/ossreader/doc/ossreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/osswriter/doc/osswriter.md) | -| | OCS | |√| [写](https://github.com/alibaba/DataX/blob/master/ocswriter/doc/ocswriter.md) | -| | Hologres | |√| [写](https://github.com/alibaba/DataX/blob/master/hologresjdbcwriter/doc/hologresjdbcwriter.md) | -| | AnalyticDB For PostgreSQL | |√| 写 | -| 阿里云中间件 | datahub |√|√| 读 、写 | -| | SLS |√|√| 读 、写 | -| 阿里云图数据库 | GDB |√|√| [读](https://github.com/alibaba/DataX/blob/master/gdbreader/doc/gdbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/gdbwriter/doc/gdbwriter.md) | -| NoSQL数据存储 | OTS |√|√| [读](https://github.com/alibaba/DataX/blob/master/otsreader/doc/otsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/otswriter/doc/otswriter.md) | -| | Hbase0.94 |√|√| [读](https://github.com/alibaba/DataX/blob/master/hbase094xreader/doc/hbase094xreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase094xwriter/doc/hbase094xwriter.md) | -| | Hbase1.1 |√|√| [读](https://github.com/alibaba/DataX/blob/master/hbase11xreader/doc/hbase11xreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase11xwriter/doc/hbase11xwriter.md) | -| | Phoenix4.x |√|√| [读](https://github.com/alibaba/DataX/blob/master/hbase11xsqlreader/doc/hbase11xsqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase11xsqlwriter/doc/hbase11xsqlwriter.md) | -| | Phoenix5.x |√|√| [读](https://github.com/alibaba/DataX/blob/master/hbase20xsqlreader/doc/hbase20xsqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase20xsqlwriter/doc/hbase20xsqlwriter.md) | -| | MongoDB |√|√| [读](https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mongodbwriter/doc/mongodbwriter.md) | -| | Cassandra |√|√| [读](https://github.com/alibaba/DataX/blob/master/cassandrareader/doc/cassandrareader.md) 、[写](https://github.com/alibaba/DataX/blob/master/cassandrawriter/doc/cassandrawriter.md) | -| 数仓数据存储 | StarRocks |√|√| 读 、[写](https://github.com/alibaba/DataX/blob/master/starrockswriter/doc/starrockswriter.md) | -| | ApacheDoris | |√| [写](https://github.com/alibaba/DataX/blob/master/doriswriter/doc/doriswriter.md) | -| | ClickHouse | |√| 写| -| | Hive |√|√| [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | -| | kudu | |√| [写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | -| 无结构化数据存储 | TxtFile |√|√| [读](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md) | -| | FTP |√|√| [读](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md) | -| | HDFS |√|√| [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | -| | Elasticsearch | |√| [写](https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md) | -| 时间序列数据库 | OpenTSDB |√| | [读](https://github.com/alibaba/DataX/blob/master/opentsdbreader/doc/opentsdbreader.md) | -| | TSDB |√|√| [读](https://github.com/alibaba/DataX/blob/master/tsdbreader/doc/tsdbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/tsdbwriter/doc/tsdbhttpwriter.md) | -| | TDengine |√|√| [读](https://github.com/alibaba/DataX/blob/master/tdenginereader/doc/tdenginereader-CN.md) 、[写](https://github.com/alibaba/DataX/blob/master/tdenginewriter/doc/tdenginewriter-CN.md) | +| 类型 | 数据源 | Reader(读) | Writer(写) | 文档 | +|--------------|---------------------------|:---------:|:---------:|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------:| +| RDBMS 关系型数据库 | MySQL | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md) | +| | Oracle | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/oraclereader/doc/oraclereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/oraclewriter/doc/oraclewriter.md) | +| | OceanBase | √ | √ | [读](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase) 、[写](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase) | +| | SQLServer | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/sqlserverreader/doc/sqlserverreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/sqlserverwriter/doc/sqlserverwriter.md) | +| | PostgreSQL | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/postgresqlreader/doc/postgresqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/postgresqlwriter/doc/postgresqlwriter.md) | +| | DRDS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) | +| | Kingbase | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) | +| | 通用RDBMS(支持所有关系型数据库) | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/rdbmsreader/doc/rdbmsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/rdbmswriter/doc/rdbmswriter.md) | +| 阿里云数仓数据存储 | ODPS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/odpsreader/doc/odpsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/odpswriter/doc/odpswriter.md) | +| | ADB | | √ | [写](https://github.com/alibaba/DataX/blob/master/adbmysqlwriter/doc/adbmysqlwriter.md) | +| | ADS | | √ | [写](https://github.com/alibaba/DataX/blob/master/adswriter/doc/adswriter.md) | +| | OSS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/ossreader/doc/ossreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/osswriter/doc/osswriter.md) | +| | OCS | | √ | [写](https://github.com/alibaba/DataX/blob/master/ocswriter/doc/ocswriter.md) | +| | Hologres | | √ | [写](https://github.com/alibaba/DataX/blob/master/hologresjdbcwriter/doc/hologresjdbcwriter.md) | +| | AnalyticDB For PostgreSQL | | √ | 写 | +| 阿里云中间件 | datahub | √ | √ | 读 、写 | +| | SLS | √ | √ | 读 、写 | +| 阿里云图数据库 | GDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/gdbreader/doc/gdbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/gdbwriter/doc/gdbwriter.md) | +| NoSQL数据存储 | OTS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/otsreader/doc/otsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/otswriter/doc/otswriter.md) | +| | Hbase0.94 | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase094xreader/doc/hbase094xreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase094xwriter/doc/hbase094xwriter.md) | +| | Hbase1.1 | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase11xreader/doc/hbase11xreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase11xwriter/doc/hbase11xwriter.md) | +| | Phoenix4.x | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase11xsqlreader/doc/hbase11xsqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase11xsqlwriter/doc/hbase11xsqlwriter.md) | +| | Phoenix5.x | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase20xsqlreader/doc/hbase20xsqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase20xsqlwriter/doc/hbase20xsqlwriter.md) | +| | MongoDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mongodbwriter/doc/mongodbwriter.md) | +| | Cassandra | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/cassandrareader/doc/cassandrareader.md) 、[写](https://github.com/alibaba/DataX/blob/master/cassandrawriter/doc/cassandrawriter.md) | +| 数仓数据存储 | StarRocks | √ | √ | 读 、[写](https://github.com/alibaba/DataX/blob/master/starrockswriter/doc/starrockswriter.md) | +| | ApacheDoris | | √ | [写](https://github.com/alibaba/DataX/blob/master/doriswriter/doc/doriswriter.md) | +| | ClickHouse | | √ | 写 | +| | Databend | | √ | [写](https://github.com/alibaba/DataX/blob/master/databendwriter/doc/databendwriter.md) | +| | Hive | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | +| | kudu | | √ | [写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | +| 无结构化数据存储 | TxtFile | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md) | +| | FTP | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md) | +| | HDFS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | +| | Elasticsearch | | √ | [写](https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md) | +| 时间序列数据库 | OpenTSDB | √ | | [读](https://github.com/alibaba/DataX/blob/master/opentsdbreader/doc/opentsdbreader.md) | +| | TSDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/tsdbreader/doc/tsdbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/tsdbwriter/doc/tsdbhttpwriter.md) | +| | TDengine | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/tdenginereader/doc/tdenginereader-CN.md) 、[写](https://github.com/alibaba/DataX/blob/master/tdenginewriter/doc/tdenginewriter-CN.md) | # 阿里云DataWorks数据集成 diff --git a/databendwriter/doc/databendwriter-CN.md b/databendwriter/doc/databendwriter-CN.md new file mode 100644 index 00000000..d6a8f1f3 --- /dev/null +++ b/databendwriter/doc/databendwriter-CN.md @@ -0,0 +1,171 @@ +# DataX DatabendWriter +[简体中文](./databendwriter-CN.md) | [English](./databendwriter.md) + +## 1 快速介绍 + +Databend Writer 是一个 DataX 的插件,用于从 DataX 中写入数据到 Databend 表中。 +该插件基于[databend JDBC driver](https://github.com/databendcloud/databend-jdbc) ,它使用 [RESTful http protocol](https://databend.rs/doc/integrations/api/rest) +在开源的 databend 和 [databend cloud](https://app.databend.com/) 上执行查询。 + +在每个写入批次中,databend writer 将批量数据上传到内部的 S3 stage,然后执行相应的 insert SQL 将数据上传到 databend 表中。 + +为了最佳的用户体验,如果您使用的是 databend 社区版本,您应该尝试采用 [S3](https://aws.amazon.com/s3/)/[minio](https://min.io/)/[OSS](https://www.alibabacloud.com/product/object-storage-service) 作为其底层存储层,因为 +它们支持预签名上传操作,否则您可能会在数据传输上浪费不必要的成本。 + +您可以在[文档](https://databend.rs/doc/deploy/deploying-databend)中了解更多详细信息 + +## 2 实现原理 + +Databend Writer 将使用 DataX 从 DataX Reader 中获取生成的记录,并将记录批量插入到 databend 表中指定的列中。 + +## 3 功能说明 + +### 3.1 配置样例 + +* 以下配置将从内存中读取一些生成的数据,并将数据上传到databend表中 + +#### 准备工作 +```sql +--- create table in databend +drop table if exists datax.sample1; +drop database if exists datax; +create database if not exists datax; +create table if not exsits datax.sample1(a string, b int64, c date, d timestamp, e bool, f string, g variant); +``` + +#### 配置样例 +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "streamreader", + "parameter": { + "column" : [ + { + "value": "DataX", + "type": "string" + }, + { + "value": 19880808, + "type": "long" + }, + { + "value": "1926-08-08 08:08:08", + "type": "date" + }, + { + "value": "1988-08-08 08:08:08", + "type": "date" + }, + { + "value": true, + "type": "bool" + }, + { + "value": "test", + "type": "bytes" + }, + { + "value": "{\"type\": \"variant\", \"value\": \"test\"}", + "type": "string" + } + + ], + "sliceRecordCount": 10000 + } + }, + "writer": { + "name": "databendwriter", + "parameter": { + "username": "databend", + "password": "databend", + "column": ["a", "b", "c", "d", "e", "f", "g"], + "batchSize": 1000, + "preSql": [ + ], + "postSql": [ + ], + "connection": [ + { + "jdbcUrl": "jdbc:databend://localhost:8000/datax", + "table": [ + "sample1" + ] + } + ] + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} +``` + +### 3.2 参数说明 +* jdbcUrl + * 描述: JDBC 数据源 url。请参阅仓库中的详细[文档](https://github.com/databendcloud/databend-jdbc) + * 必选: 是 + * 默认值: 无 + * 示例: jdbc:databend://localhost:8000/datax +* username + * 描述: JDBC 数据源用户名 + * 必选: 是 + * 默认值: 无 + * 示例: databend +* password + * 描述: JDBC 数据源密码 + * 必选: 是 + * 默认值: 无 + * 示例: databend +* table + * 描述: 表名的集合,table应该包含column参数中的所有列。 + * 必选: 是 + * 默认值: 无 + * 示例: ["sample1"] +* column + * 描述: 表中的列名集合,字段顺序应该与reader的record中的column类型对应 + * 必选: 是 + * 默认值: 无 + * 示例: ["a", "b", "c", "d", "e", "f", "g"] +* batchSize + * 描述: 每个批次的记录数 + * 必选: 否 + * 默认值: 1000 + * 示例: 1000 +* preSql + * 描述: 在写入数据之前执行的SQL语句 + * 必选: 否 + * 默认值: 无 + * 示例: ["delete from datax.sample1"] +* postSql + * 描述: 在写入数据之后执行的SQL语句 + * 必选: 否 + * 默认值: 无 + * 示例: ["select count(*) from datax.sample1"] + +### 3.3 类型转化 +DataX中的数据类型可以转换为databend中的相应数据类型。下表显示了两种类型之间的对应关系。 + +| DataX 内部类型 | Databend 数据类型 | +|------------|-----------------------------------------------------------| +| INT | TINYINT, INT8, SMALLINT, INT16, INT, INT32, BIGINT, INT64 | +| LONG | TINYINT, INT8, SMALLINT, INT16, INT, INT32, BIGINT, INT64 | +| STRING | STRING, VARCHAR | +| DOUBLE | FLOAT, DOUBLE | +| BOOL | BOOLEAN, BOOL | +| DATE | DATE, TIMESTAMP | +| BYTES | STRING, VARCHAR | + +## 4 性能测试 + +## 5 约束限制 +目前,复杂数据类型支持不稳定,如果您想使用复杂数据类型,例如元组,数组,请检查databend和jdbc驱动程序的进一步版本。 + +## FAQ \ No newline at end of file diff --git a/databendwriter/doc/databendwriter.md b/databendwriter/doc/databendwriter.md new file mode 100644 index 00000000..0b57bf13 --- /dev/null +++ b/databendwriter/doc/databendwriter.md @@ -0,0 +1,166 @@ +# DataX DatabendWriter +[简体中文](./databendwriter-CN.md) | [English](./databendwriter.md) + +## 1 Introduction +Databend Writer is a plugin for DataX to write data to Databend Table from dataX records. +The plugin is based on [databend JDBC driver](https://github.com/databendcloud/databend-jdbc) which use [RESTful http protocol](https://databend.rs/doc/integrations/api/rest) +to execute query on open source databend and [databend cloud](https://app.databend.com/). + +During each write batch, databend writer will upload batch data into internal S3 stage and execute corresponding insert SQL to upload data into databend table. + +For best user experience, if you are using databend community distribution, you should try to adopt [S3](https://aws.amazon.com/s3/)/[minio](https://min.io/)/[OSS](https://www.alibabacloud.com/product/object-storage-service) as its underlying storage layer since +they support presign upload operation otherwise you may expend unneeded cost on data transfer. + +You could see more details on the [doc](https://databend.rs/doc/deploy/deploying-databend) + +## 2 Detailed Implementation +Databend Writer would use DataX to fetch records generated by DataX Reader, and then batch insert records to the designated columns for your databend table. + +## 3 Features +### 3.1 Example Configurations +* the following configuration would read some generated data in memory and upload data into databend table + +#### Preparation +```sql +--- create table in databend +drop table if exists datax.sample1; +drop database if exists datax; +create database if not exists datax; +create table if not exsits datax.sample1(a string, b int64, c date, d timestamp, e bool, f string, g variant); +``` + +#### Configurations +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "streamreader", + "parameter": { + "column" : [ + { + "value": "DataX", + "type": "string" + }, + { + "value": 19880808, + "type": "long" + }, + { + "value": "1926-08-08 08:08:08", + "type": "date" + }, + { + "value": "1988-08-08 08:08:08", + "type": "date" + }, + { + "value": true, + "type": "bool" + }, + { + "value": "test", + "type": "bytes" + }, + { + "value": "{\"type\": \"variant\", \"value\": \"test\"}", + "type": "string" + } + + ], + "sliceRecordCount": 10000 + } + }, + "writer": { + "name": "databendwriter", + "parameter": { + "username": "databend", + "password": "databend", + "column": ["a", "b", "c", "d", "e", "f", "g"], + "batchSize": 1000, + "preSql": [ + ], + "postSql": [ + ], + "connection": [ + { + "jdbcUrl": "jdbc:databend://localhost:8000/datax", + "table": [ + "sample1" + ] + } + ] + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} +``` + +### 3.2 Configuration Description +* jdbcUrl + * Description: JDBC Data source url in Databend. Please take a look at repository for detailed [doc](https://github.com/databendcloud/databend-jdbc) + * Required: yes + * Default: none + * Example: jdbc:databend://localhost:8000/datax +* username + * Description: Databend user name + * Required: yes + * Default: none + * Example: databend +* password + * Description: Databend user password + * Required: yes + * Default: none + * Example: databend +* table + * Description: A list of table names that should contain all of the columns in the column parameter. + * Required: yes + * Default: none + * Example: ["sample1"] +* column + * Description: A list of column field names that should be inserted into the table. if you want to insert all column fields use `["*"]` instead. + * Required: yes + * Default: none + * Example: ["a", "b", "c", "d", "e", "f", "g"] +* batchSize + * Description: The number of records to be inserted in each batch. + * Required: no + * Default: 1024 +* preSql + * Description: A list of SQL statements that will be executed before the write operation. + * Required: no + * Default: none +* postSql + * Description: A list of SQL statements that will be executed after the write operation. + * Required: no + * Default: none + +### 3.3 Type Convert +Data types in datax can be converted to the corresponding data types in databend. The following table shows the correspondence between the two types. + +| DataX Type | Databend Type | +|------------|-----------------------------------------------------------| +| INT | TINYINT, INT8, SMALLINT, INT16, INT, INT32, BIGINT, INT64 | +| LONG | TINYINT, INT8, SMALLINT, INT16, INT, INT32, BIGINT, INT64 | +| STRING | STRING, VARCHAR | +| DOUBLE | FLOAT, DOUBLE | +| BOOL | BOOLEAN, BOOL | +| DATE | DATE, TIMESTAMP | +| BYTES | STRING, VARCHAR | + + +## 4 Performance Test + + +## 5 Restrictions +Currently, complex data type support is not stable, if you want to use complex data type such as tuple, array, please check further release version of databend and jdbc driver. + +## FAQ diff --git a/databendwriter/pom.xml b/databendwriter/pom.xml new file mode 100644 index 00000000..1c1ef309 --- /dev/null +++ b/databendwriter/pom.xml @@ -0,0 +1,101 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + + 4.0.0 + databendwriter + databendwriter + jar + + + + com.databend + databend-jdbc + 0.0.4 + + + com.alibaba.datax + datax-core + ${datax-project-version} + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + org.slf4j + slf4j-api + + + + ch.qos.logback + logback-classic + + + + com.alibaba.datax + plugin-rdbms-util + ${datax-project-version} + + + com.google.guava + guava + + + + + + + junit + junit + test + + + + + + src/main/java + + **/*.properties + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + \ No newline at end of file diff --git a/databendwriter/src/main/assembly/package.xml b/databendwriter/src/main/assembly/package.xml new file mode 100755 index 00000000..8a9ba1b2 --- /dev/null +++ b/databendwriter/src/main/assembly/package.xml @@ -0,0 +1,34 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/databendwriter + + + target/ + + databendwriter-0.0.1-SNAPSHOT.jar + + plugin/writer/databendwriter + + + + + + false + plugin/writer/databendwriter/libs + + + diff --git a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java new file mode 100644 index 00000000..a4222f08 --- /dev/null +++ b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java @@ -0,0 +1,248 @@ +package com.alibaba.datax.plugin.writer.databendwriter; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.common.exception.CommonErrorCode; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; +import com.alibaba.datax.plugin.writer.databendwriter.util.DatabendWriterUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; +import java.util.List; +import java.util.regex.Pattern; + +public class DatabendWriter extends Writer +{ + private static final DataBaseType DATABASE_TYPE = DataBaseType.Databend; + + public static class Job + extends Writer.Job + { + private static final Logger LOG = LoggerFactory.getLogger(Job.class); + private Configuration originalConfig; + private CommonRdbmsWriter.Job commonRdbmsWriterMaster; + + @Override + public void init() + { + this.originalConfig = super.getPluginJobConf(); + this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE); + this.commonRdbmsWriterMaster.init(this.originalConfig); + // placeholder currently not supported by databend driver, needs special treatment + DatabendWriterUtil.dealWriteMode(this.originalConfig); + } + + @Override + public void preCheck() + { + this.init(); + this.commonRdbmsWriterMaster.writerPreCheck(this.originalConfig, DATABASE_TYPE); + } + + @Override + public void prepare() { + this.commonRdbmsWriterMaster.prepare(this.originalConfig); + } + + @Override + public List split(int mandatoryNumber) { + return this.commonRdbmsWriterMaster.split(this.originalConfig, mandatoryNumber); + } + + @Override + public void post() { + this.commonRdbmsWriterMaster.post(this.originalConfig); + } + + @Override + public void destroy() { + this.commonRdbmsWriterMaster.destroy(this.originalConfig); + } + } + + + public static class Task extends Writer.Task + { + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + + private Configuration writerSliceConfig; + + private CommonRdbmsWriter.Task commonRdbmsWriterSlave; + + @Override + public void init() + { + this.writerSliceConfig = super.getPluginJobConf(); + + this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DataBaseType.Databend){ + @Override + protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqltype, String typeName, Column column) throws SQLException { + try { + if (column.getRawData() == null) { + preparedStatement.setNull(columnIndex + 1, columnSqltype); + return preparedStatement; + } + + java.util.Date utilDate; + switch (columnSqltype) { + + case Types.TINYINT: + case Types.SMALLINT: + case Types.INTEGER: + preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); + break; + case Types.BIGINT: + preparedStatement.setLong(columnIndex + 1, column.asLong()); + break; + case Types.DECIMAL: + preparedStatement.setBigDecimal(columnIndex + 1, column.asBigDecimal()); + break; + case Types.FLOAT: + case Types.REAL: + preparedStatement.setFloat(columnIndex + 1, column.asDouble().floatValue()); + break; + case Types.DOUBLE: + preparedStatement.setDouble(columnIndex + 1, column.asDouble()); + break; + case Types.DATE: + java.sql.Date sqlDate = null; + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "Date type conversion error: [%s]", column)); + } + + if (null != utilDate) { + sqlDate = new java.sql.Date(utilDate.getTime()); + } + preparedStatement.setDate(columnIndex + 1, sqlDate); + break; + + case Types.TIME: + java.sql.Time sqlTime = null; + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "Date type conversion error: [%s]", column)); + } + + if (null != utilDate) { + sqlTime = new java.sql.Time(utilDate.getTime()); + } + preparedStatement.setTime(columnIndex + 1, sqlTime); + break; + + case Types.TIMESTAMP: + Timestamp sqlTimestamp = null; + if (column instanceof StringColumn && column.asString() != null) { + String timeStampStr = column.asString(); + // JAVA TIMESTAMP 类型入参必须是 "2017-07-12 14:39:00.123566" 格式 + String pattern = "^\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+"; + boolean isMatch = Pattern.matches(pattern, timeStampStr); + if (isMatch) { + sqlTimestamp = Timestamp.valueOf(timeStampStr); + preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp); + break; + } + } + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "Date type conversion error: [%s]", column)); + } + + if (null != utilDate) { + sqlTimestamp = new Timestamp( + utilDate.getTime()); + } + preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp); + break; + + case Types.BINARY: + case Types.VARBINARY: + case Types.BLOB: + case Types.LONGVARBINARY: + preparedStatement.setBytes(columnIndex + 1, column + .asBytes()); + break; + + case Types.BOOLEAN: + + // warn: bit(1) -> Types.BIT 可使用setBoolean + // warn: bit(>1) -> Types.VARBINARY 可使用setBytes + case Types.BIT: + if (this.dataBaseType == DataBaseType.MySql) { + Boolean asBoolean = column.asBoolean(); + if (asBoolean != null) { + preparedStatement.setBoolean(columnIndex + 1, asBoolean); + } else { + preparedStatement.setNull(columnIndex + 1, Types.BIT); + } + } else { + preparedStatement.setString(columnIndex + 1, column.asString()); + } + break; + + default: + // cast variant / array into string is fine. + preparedStatement.setString(columnIndex + 1, column.asString()); + break; + } + return preparedStatement; + } catch (DataXException e) { + // fix类型转换或者溢出失败时,将具体哪一列打印出来 + if (e.getErrorCode() == CommonErrorCode.CONVERT_NOT_SUPPORT || + e.getErrorCode() == CommonErrorCode.CONVERT_OVER_FLOW) { + throw DataXException + .asDataXException( + e.getErrorCode(), + String.format( + "type conversion error. columnName: [%s], columnType:[%d], columnJavaType: [%s]. please change the data type in given column field or do not sync on the column.", + this.resultSetMetaData.getLeft() + .get(columnIndex), + this.resultSetMetaData.getMiddle() + .get(columnIndex), + this.resultSetMetaData.getRight() + .get(columnIndex))); + } else { + throw e; + } + } + } + + }; + this.commonRdbmsWriterSlave.init(this.writerSliceConfig); + } + + @Override + public void destroy() + { + this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig); + } + + @Override + public void prepare() { + this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig); + } + + @Override + public void post() { + this.commonRdbmsWriterSlave.post(this.writerSliceConfig); + } + @Override + public void startWrite(RecordReceiver lineReceiver) + { + this.commonRdbmsWriterSlave.startWrite(lineReceiver, this.writerSliceConfig, this.getTaskPluginCollector()); + } + + } +} diff --git a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java new file mode 100644 index 00000000..a862e920 --- /dev/null +++ b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java @@ -0,0 +1,40 @@ +package com.alibaba.datax.plugin.writer.databendwriter.util; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.writer.Constant; +import com.alibaba.datax.plugin.rdbms.writer.Key; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.StringJoiner; + +public final class DatabendWriterUtil +{ + private static final Logger LOG = LoggerFactory.getLogger(DatabendWriterUtil.class); + + private DatabendWriterUtil() {} + public static void dealWriteMode(Configuration originalConfig) + { + List columns = originalConfig.getList(Key.COLUMN, String.class); + + String jdbcUrl = originalConfig.getString(String.format("%s[0].%s", + Constant.CONN_MARK, Key.JDBC_URL, String.class)); + + String writeMode = originalConfig.getString(Key.WRITE_MODE, "INSERT"); + + StringBuilder writeDataSqlTemplate = new StringBuilder(); + writeDataSqlTemplate.append("INSERT INTO %s"); + StringJoiner columnString = new StringJoiner(","); + + for (String column : columns) { + columnString.add(column); + } + writeDataSqlTemplate.append(String.format("(%s)", columnString)); + writeDataSqlTemplate.append(" VALUES"); + + LOG.info("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl); + + originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate); + } +} \ No newline at end of file diff --git a/databendwriter/src/main/resources/plugin.json b/databendwriter/src/main/resources/plugin.json new file mode 100644 index 00000000..bab0130d --- /dev/null +++ b/databendwriter/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "databendwriter", + "class": "com.alibaba.datax.plugin.writer.databendwriter.DatabendWriter", + "description": "execute batch insert sql to write dataX data into databend", + "developer": "databend" +} \ No newline at end of file diff --git a/databendwriter/src/main/resources/plugin_job_template.json b/databendwriter/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..34d4b251 --- /dev/null +++ b/databendwriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,19 @@ +{ + "name": "databendwriter", + "parameter": { + "username": "username", + "password": "password", + "column": ["col1", "col2", "col3"], + "connection": [ + { + "jdbcUrl": "jdbc:databend://:[/]", + "table": "table1" + } + ], + "preSql": [], + "postSql": [], + + "maxBatchRows": 65536, + "maxBatchSize": 134217728 + } +} \ No newline at end of file diff --git a/package.xml b/package.xml index 96b52c30..f602b96f 100755 --- a/package.xml +++ b/package.xml @@ -434,6 +434,13 @@ datax + + databendwriter/target/datax/ + + **/*.* + + datax + oscarwriter/target/datax/ diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java index 2e145d02..1b46a8bc 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java @@ -24,8 +24,8 @@ public enum DataBaseType { KingbaseES("kingbasees", "com.kingbase8.Driver"), Oscar("oscar", "com.oscar.Driver"), OceanBase("oceanbase", "com.alipay.oceanbase.jdbc.Driver"), - StarRocks("starrocks", "com.mysql.jdbc.Driver"); - + StarRocks("starrocks", "com.mysql.jdbc.Driver"), + Databend("databend", "com.databend.jdbc.DatabendDriver"); private String typeName; private String driverClassName; @@ -118,6 +118,8 @@ public enum DataBaseType { break; case RDBMS: break; + case Databend: + break; case KingbaseES: break; case Oscar: diff --git a/pom.xml b/pom.xml index 228e850e..45587263 100644 --- a/pom.xml +++ b/pom.xml @@ -84,6 +84,7 @@ mysqlwriter starrockswriter drdswriter + databendwriter oraclewriter sqlserverwriter postgresqlwriter