add clickhousereader & OceanBase 4.0 support

penglin 2023-07-18 16:03:49 +08:00
parent 5028b2c478
commit 8eb19c8cda
21 changed files with 850 additions and 63 deletions

README.md

@ -37,50 +37,48 @@
DataX already has a fairly comprehensive plugin ecosystem; mainstream RDBMS databases, NoSQL stores, and big-data compute systems are all connected. The currently supported data sources are listed below; for details, see the [DataX data source reference guide](https://github.com/alibaba/DataX/wiki/DataX-all-data-channels)
| Type | Data source | Reader | Writer | Docs |
| ------------------------------ | ------------------------------------------ | :----: | :----: | :--: |
| RDBMS (relational databases) | MySQL | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md), [write](https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md) |
| | Oracle | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/oraclereader/doc/oraclereader.md), [write](https://github.com/alibaba/DataX/blob/master/oraclewriter/doc/oraclewriter.md) |
| | OceanBase | √ | √ | [read](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase), [write](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase) |
| | SQLServer | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/sqlserverreader/doc/sqlserverreader.md), [write](https://github.com/alibaba/DataX/blob/master/sqlserverwriter/doc/sqlserverwriter.md) |
| | PostgreSQL | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/postgresqlreader/doc/postgresqlreader.md), [write](https://github.com/alibaba/DataX/blob/master/postgresqlwriter/doc/postgresqlwriter.md) |
| | DRDS | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md), [write](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) |
| | Kingbase | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md), [write](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) |
| | Generic RDBMS (any relational database) | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/rdbmsreader/doc/rdbmsreader.md), [write](https://github.com/alibaba/DataX/blob/master/rdbmswriter/doc/rdbmswriter.md) |
| Alibaba Cloud data warehouse storage | ODPS | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/odpsreader/doc/odpsreader.md), [write](https://github.com/alibaba/DataX/blob/master/odpswriter/doc/odpswriter.md) |
| | ADB | | √ | [write](https://github.com/alibaba/DataX/blob/master/adbmysqlwriter/doc/adbmysqlwriter.md) |
| | ADS | | √ | [write](https://github.com/alibaba/DataX/blob/master/adswriter/doc/adswriter.md) |
| | OSS | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/ossreader/doc/ossreader.md), [write](https://github.com/alibaba/DataX/blob/master/osswriter/doc/osswriter.md) |
| | OCS | | √ | [write](https://github.com/alibaba/DataX/blob/master/ocswriter/doc/ocswriter.md) |
| | Hologres | | √ | [write](https://github.com/alibaba/DataX/blob/master/hologresjdbcwriter/doc/hologresjdbcwriter.md) |
| | AnalyticDB For PostgreSQL | | √ | write |
| Alibaba Cloud middleware | datahub | √ | √ | read, write |
| | SLS | √ | √ | read, write |
| Graph databases | Alibaba Cloud GDB | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/gdbreader/doc/gdbreader.md), [write](https://github.com/alibaba/DataX/blob/master/gdbwriter/doc/gdbwriter.md) |
| | Neo4j | | √ | [write](https://github.com/alibaba/DataX/blob/master/neo4jwriter/doc/neo4jwriter.md) |
| NoSQL data storage | OTS | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/otsreader/doc/otsreader.md), [write](https://github.com/alibaba/DataX/blob/master/otswriter/doc/otswriter.md) |
| | Hbase0.94 | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/hbase094xreader/doc/hbase094xreader.md), [write](https://github.com/alibaba/DataX/blob/master/hbase094xwriter/doc/hbase094xwriter.md) |
| | Hbase1.1 | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/hbase11xreader/doc/hbase11xreader.md), [write](https://github.com/alibaba/DataX/blob/master/hbase11xwriter/doc/hbase11xwriter.md) |
| | Phoenix4.x | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/hbase11xsqlreader/doc/hbase11xsqlreader.md), [write](https://github.com/alibaba/DataX/blob/master/hbase11xsqlwriter/doc/hbase11xsqlwriter.md) |
| | Phoenix5.x | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/hbase20xsqlreader/doc/hbase20xsqlreader.md), [write](https://github.com/alibaba/DataX/blob/master/hbase20xsqlwriter/doc/hbase20xsqlwriter.md) |
| | MongoDB | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md), [write](https://github.com/alibaba/DataX/blob/master/mongodbwriter/doc/mongodbwriter.md) |
| | Cassandra | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/cassandrareader/doc/cassandrareader.md), [write](https://github.com/alibaba/DataX/blob/master/cassandrawriter/doc/cassandrawriter.md) |
| Data warehouse storage | StarRocks | √ | √ | read, [write](https://github.com/alibaba/DataX/blob/master/starrockswriter/doc/starrockswriter.md) |
| | ApacheDoris | | √ | [write](https://github.com/alibaba/DataX/blob/master/doriswriter/doc/doriswriter.md) |
| | ClickHouse | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/clickhousereader/doc/clickhousereader.md), [write](https://github.com/alibaba/DataX/blob/master/clickhousewriter/doc/clickhousewriter.md) |
| | Databend | | √ | [write](https://github.com/alibaba/DataX/blob/master/databendwriter/doc/databendwriter.md) |
| | Hive | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md), [write](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) |
| | kudu | | √ | [write](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) |
| | selectdb | | √ | [write](https://github.com/alibaba/DataX/blob/master/selectdbwriter/doc/selectdbwriter.md) |
| Unstructured data storage | TxtFile | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md), [write](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md) |
| | FTP | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md), [write](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md) |
| | HDFS | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md), [write](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) |
| | Elasticsearch | | √ | [write](https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md) |
| Time-series databases | OpenTSDB | √ | | [read](https://github.com/alibaba/DataX/blob/master/opentsdbreader/doc/opentsdbreader.md) |
| | TSDB | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/tsdbreader/doc/tsdbreader.md), [write](https://github.com/alibaba/DataX/blob/master/tsdbwriter/doc/tsdbhttpwriter.md) |
| | TDengine | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/tdenginereader/doc/tdenginereader-CN.md), [write](https://github.com/alibaba/DataX/blob/master/tdenginewriter/doc/tdenginewriter-CN.md) |
# Alibaba Cloud DataWorks Data Integration
@ -97,11 +95,11 @@
- New additions such as DB2, Kafka, Hologres, MetaQ, SAP HANA, Dameng (DM), and more, with the list continuously growing
- Data sources supported by offline synchronization: https://help.aliyun.com/document_detail/137670.html
- Complete synchronization solutions:
  - Solution system: https://help.aliyun.com/document_detail/171765.html
  - One-click full plus incremental synchronization: https://help.aliyun.com/document_detail/175676.html
  - Whole-database migration: https://help.aliyun.com/document_detail/137809.html
  - Batch migration to the cloud: https://help.aliyun.com/document_detail/146671.html
- For updates and more capabilities, visit: https://help.aliyun.com/document_detail/137663.html
# Developing a New Plugin
@ -122,10 +120,10 @@
  - Channel capability updates (OceanBase, TDengine, Doris, etc.)
- [datax_v202209](https://github.com/alibaba/DataX/releases/tag/datax_v202209)
  - Channel capability updates (MaxCompute, Datahub, SLS, etc.), security vulnerability fixes, general packaging updates, and more
- [datax_v202205](https://github.com/alibaba/DataX/releases/tag/datax_v202205)
  - Channel capability updates (MaxCompute, Hologres, OSS, TDengine, etc.), security vulnerability fixes, general packaging updates, and more
# Project Members

clickhousereader/doc/clickhousereader.md

@ -0,0 +1,344 @@
# ClickhouseReader Plugin Documentation

___

## 1 Quick Introduction

The ClickhouseReader plugin reads data from Clickhouse. Internally, ClickhouseReader connects to a remote Clickhouse database via JDBC and executes SQL statements that SELECT the data out of the Clickhouse database.

## 2 Implementation

In short, ClickhouseReader connects to a remote Clickhouse database through a JDBC connector, generates a SELECT SQL statement from the user configuration, sends it to the remote database, executes it, assembles the returned results into DataX's custom data types as an abstract data set, and hands them to the downstream Writer for processing.

For a configuration of table, column, and where, ClickhouseReader assembles them into a SQL statement and sends it to the Clickhouse database; for a configured querySql, ClickhouseReader sends it to the Clickhouse database directly.
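As an illustration, a minimal sketch of the assembled statement (the table, columns, and filter here are hypothetical; the exact SQL the plugin generates may differ in detail):

```
-- With column = ["id", "name"], table = ["t_order"] and where = "id > 100",
-- the reader issues a query equivalent to:
SELECT id, name FROM t_order WHERE id > 100;
```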
## 3 Features

### 3.1 Sample Configuration

* A job that extracts data from a Clickhouse database and synchronizes it locally:

```
{
    "job": {
        "setting": {
            "speed": {
                // Transfer rate limit in byte/s; DataX approaches this rate as closely as possible without exceeding it.
                // "channel" is the number of channels and "byte" the rate; with a single-channel rate of 1MB/s, "byte": 1048576 means one channel.
                "byte": 1048576
            },
            // Error limits
            "errorLimit": {
                // The record limit is applied first
                "record": 0,
                // Percentage: 1 means 100%
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "clickhousereader",
                    "parameter": {
                        // Database username
                        "username": "root",
                        // Database password
                        "password": "root",
                        "column": [
                            "id","name"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "table"
                                ],
                                "jdbcUrl": [
                                    "jdbc:clickhouse://[HOST_NAME]:PORT/[DATABASE_NAME]"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    // Writer type
                    "name": "streamwriter",
                    // Whether to print the content
                    "parameter": {
                        "print": true
                    }
                }
            }
        ]
    }
}
```
* A job that runs a user-defined SQL query against the database and synchronizes the result locally:
```
{
"job": {
"setting": {
"speed": {
"channel": 5
}
},
"content": [
{
"reader": {
"name": "clickhousereader",
"parameter": {
"username": "root",
"password": "root",
"where": "",
"connection": [
{
"querySql": [
"select db_id,on_line_flag from db_info where db_id < 10"
],
"jdbcUrl": [
"jdbc:clickhouse://1.1.1.1:8123/default"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"visible": false,
"encoding": "UTF-8"
}
}
}
]
}
}
```
### 3.2 Parameter Description

* **jdbcUrl**

* Description: JDBC connection information for the source database, given as a JSON array; one database may be configured with multiple connection addresses. A JSON array is used because Alibaba Group internally supports multi-IP probing: if multiple addresses are configured, ClickhouseReader probes them in order until it finds a usable one, and reports an error if all connections fail. Note: jdbcUrl must be nested inside a connection element. For use outside Alibaba Group, simply put a single JDBC connection in the JSON array.

jdbcUrl follows the official Clickhouse specification, and additional connection-control options may be appended. See the [Clickhouse documentation](https://clickhouse.com/docs/en/engines/table-engines/integrations/jdbc) for details.

* Required: yes <br />

* Default: none <br />

* **username**

* Description: username for the data source <br />

* Required: yes <br />

* Default: none <br />

* **password**

* Description: password for the specified username of the data source <br />

* Required: yes <br />

* Default: none <br />

* **table**

* Description: the table(s) to synchronize, given as a JSON array, so multiple tables can be extracted at once. When multiple tables are configured, the user must ensure they all share the same schema; ClickhouseReader does not verify that they form one consistent logical table. Note: table must be nested inside a connection element.<br />

* Required: yes <br />

* Default: none <br />

* **column**

* Description: the set of columns to synchronize from the configured table, described as a JSON array of field names. Use \* to select all columns, e.g. ['\*'].

Column pruning is supported: you may export only a subset of columns.

Column reordering is supported: columns need not be exported in schema order.

Constants are supported, in JSON format:

["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"]

Here id is an ordinary column name, \`table\` is a column name that is a reserved word, 1 is an integer constant, 'bazhen.csy' is a string constant, null is a null value, to_char(a + 1) is an expression, 2.3 is a floating-point constant, and true is a boolean value.

column must be specified explicitly and may not be empty!

* Required: yes <br />

* Default: none <br />

* **splitPk**

* Description: if splitPk is specified, data extraction is sharded on the given field, and DataX launches concurrent tasks to synchronize the data, which can greatly improve throughput.

It is recommended to use the table's primary key as splitPk, because primary keys are usually evenly distributed, so the resulting shards rarely develop data hot spots.

Currently splitPk supports sharding on integer fields only; `floating-point, date, and other types are not supported`. If an unsupported type is specified, ClickhouseReader reports an error!

If splitPk is left unset, the table is not split and ClickhouseReader synchronizes the full data over a single channel.

* Required: no <br />

* Default: none <br />
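A rough sketch of how sharding behaves (the table, column, and boundary values are hypothetical; the actual queries are generated by the DataX framework):

```
-- With splitPk = "id" and two channels, DataX first determines the primary key range:
SELECT MIN(id), MAX(id) FROM t_order;
-- and then assigns each concurrent task its own sub-range, roughly:
SELECT id, name FROM t_order WHERE id >= 0 AND id < 500000;
SELECT id, name FROM t_order WHERE id >= 500000 AND id <= 1000000;
```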
* **where**

* Description: the filter condition. ClickhouseReader assembles a SQL statement from the configured column, table, and where and extracts data according to it. In typical business scenarios, the current day's data is synchronized, with the where condition set to gmt_create > $bizdate. Note: do not set where to limit 10; limit is not a legal WHERE clause in SQL.<br />

The where condition is an effective way to implement incremental business synchronization.

* Required: no <br />

* Default: none <br />
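As a sketch, with a hypothetical business date as the cut-off:

```
-- With where = "gmt_create > '2023-07-18 00:00:00'", the assembled query is equivalent to:
SELECT id, name FROM t_order WHERE gmt_create > '2023-07-18 00:00:00';
-- By contrast, "limit 10" cannot go here: "... WHERE limit 10" is not valid SQL.
```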
* **querySql**

* Description: in some business scenarios, where is not expressive enough to describe the filter, so this option lets the user define a custom filtering SQL. When it is configured, DataX ignores the table and column options and filters the data with this SQL directly. For example, to synchronize the result of a multi-table join: select a,b from table_a join table_b on table_a.id = table_b.id <br />

`When querySql is configured, ClickhouseReader ignores the table, column, and where configurations.`

* Required: no <br />

* Default: none <br />

* **fetchSize**

* Description: the number of rows fetched per batch between the plugin and the database server. This value determines how many network round trips DataX makes to the server and can significantly improve extraction performance.<br />

`Note: a value that is too large (>2048) may cause the DataX process to run out of memory (OOM).`

* Required: no <br />

* Default: 1024 <br />

* **session**

* Description: controls session settings such as the time format and time zone used when writing data; if the table has time fields, configure this value to make the time format explicit. Typical parameters are NLS_DATE_FORMAT and NLS_TIME_FORMAT, given in JSON format, for example:

```
"session": [
    "alter session set NLS_DATE_FORMAT='yyyy-mm-dd hh24:mi:ss'",
    "alter session set NLS_TIMESTAMP_FORMAT='yyyy-mm-dd hh24:mi:ss'",
    "alter session set NLS_TIMESTAMP_TZ_FORMAT='yyyy-mm-dd hh24:mi:ss'",
    "alter session set TIME_ZONE='US/Pacific'"
]
```

`(Note: &quot; is the escaped form of ")`

* Required: no <br />

* Default: none <br />
### 3.3 Type Conversion

ClickhouseReader supports most Clickhouse types, but a few are not covered; please check your types.

ClickhouseReader converts Clickhouse types as follows:

| DataX internal type | Clickhouse data type |
| ------------------- | -------------------------------------------------------------------------------------------- |
| Long | UInt8, UInt16, UInt32, UInt64, UInt128, UInt256, Int8, Int16, Int32, Int64, Int128, Int256 |
| Double | Float32, Float64, Decimal |
| String | String, FixedString |
| Date | DATE, Date32, DateTime, DateTime64 |
| Boolean | Boolean |
| Bytes | BLOB, BFILE, RAW, LONG RAW |

Please note:

* `All types not listed above are unsupported.`
## 4 Performance Report

### 4.1 Test Setup

#### 4.1.1 Data Characteristics

To simulate real production data, we designed two Clickhouse tables:

#### 4.1.2 Machine Specifications

* Specs of the machine running DataX:

* Specs of the Clickhouse database machine:

### 4.2 Test Results

#### 4.2.1 Results for Table 1

| Concurrent tasks | DataX rate (Rec/s) | DataX throughput | NIC traffic | DataX load | DB load |
| ---------------- | ------------------ | ---------------- | ----------- | ---------- | ------- |
| 1 | DataX rate (Rec/s) | DataX throughput | NIC traffic | DataX load | DB load |
## 5 Constraints and Limitations

### 5.1 Primary/Standby Synchronization and Data Recovery

Primary/standby synchronization refers to Clickhouse running a primary/standby disaster-recovery setup in which the standby continuously recovers data from the primary via binlog. Because of replication lag, especially under conditions such as network latency, the data recovered on the standby can differ significantly from the primary, so data synchronized from the standby is not a complete point-in-time image of the primary.

To address this we provide the preSql feature; its documentation is still to be added.

### 5.2 Consistency Constraints

Clickhouse, as an RDBMS-style system, can provide strongly consistent query interfaces. If other writers insert data while a synchronization task is running, ClickhouseReader will not pick up the newly written data, because of the database's snapshot semantics. On database snapshots, see [MVCC Wikipedia](https://en.wikipedia.org/wiki/Multiversion_concurrency_control).

The above holds for ClickhouseReader's single-threaded model. Because ClickhouseReader may extract data concurrently according to the configuration, strict consistency cannot be guaranteed: after ClickhouseReader shards the table by splitPk, it launches multiple concurrent tasks, and these tasks do not belong to the same read transaction and are separated in time, so the result is not a `complete`, `consistent` data snapshot.

A multi-threaded consistent snapshot is currently technically impossible, so the problem can only be addressed from an engineering angle, and every engineering workaround involves trade-offs. We offer a few options for users to choose from:

1. Use single-threaded synchronization, i.e., do not shard the data. The downside is slower speed, but consistency is well guaranteed.
2. Shut out other writers so the data is static, e.g., lock the table or pause standby replication. The downside is possible impact on online business.

### 5.3 Database Encoding

ClickhouseReader extracts data via JDBC, which adapts to all encodings natively and performs encoding conversion at a lower layer. Therefore ClickhouseReader does not need an encoding setting from the user; it detects and converts encodings automatically.

If data was written to the underlying Clickhouse storage in an encoding inconsistent with the configured one, ClickhouseReader cannot detect this and cannot offer a solution; in such cases `the exported data may be garbled`.

### 5.4 Incremental Synchronization

ClickhouseReader extracts data with JDBC SELECT statements, so incremental extraction is possible with SELECT...WHERE..., in several ways:

* When the online application writes to the database, it fills a modify field with the change timestamp, covering inserts, updates, and deletes (logical deletes). For such applications, ClickhouseReader only needs a WHERE condition on the timestamp of the previous synchronization phase.
* For append-only data, ClickhouseReader only needs a WHERE condition on the maximum auto-increment ID of the previous phase.

When the business has no field that distinguishes new and changed data, ClickhouseReader cannot perform incremental synchronization and can only synchronize the full data set.
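Both increment patterns above come down to a WHERE predicate on the extraction query; a sketch with hypothetical column names and cut-off values:

```
-- Timestamp-based increment: rows changed since the previous synchronization run
SELECT * FROM t_order WHERE gmt_modified > '2023-07-17 16:00:00';
-- Auto-increment-based increment: rows appended since the previous run
SELECT * FROM t_order WHERE id > 1000000;
```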
### 5.5 SQL Security

ClickhouseReader lets users supply their own querySql SELECT statements, and performs no security validation on querySql itself; this is left to the DataX user to guarantee.

## 6 FAQ

***

**Q: ClickhouseReader reports an error; the message is XXX**

A: It is a network or permission problem; please test with the Clickhouse command line.

If that command also fails, the environment is at fault; please contact your DBA.

**Q: ClickhouseReader extraction is very slow. What can I do?**

A: Common causes of long extraction times are the following: (from professional DBA Wei Wan)

1. Long extraction caused by an abnormal SQL plan: during extraction, prefer full-table scans to index scans;
2. Use a reasonable SQL concurrency to reduce extraction time;
3. Keep the extraction SQL simple and avoid functions such as replace; they are very CPU-intensive and severely slow down extraction.

clickhousereader/pom.xml

@ -0,0 +1,91 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>clickhousereader</artifactId>
<name>clickhousereader</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>${datax-project-version}</version>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

clickhousereader/src/main/assembly/package.xml

@ -0,0 +1,35 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id>datax</id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/reader/clickhousereader</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>clickhousereader-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/reader/clickhousereader</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/reader/clickhousereader/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

clickhousereader/src/main/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReader.java

@ -0,0 +1,87 @@
package com.alibaba.datax.plugin.reader.clickhousereader;
import java.sql.Array;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.MessageSource;
import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.fastjson2.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClickhouseReader extends Reader {
private static final DataBaseType DATABASE_TYPE = DataBaseType.ClickHouse;
private static final Logger LOG = LoggerFactory.getLogger(ClickhouseReader.class);
public static class Job extends Reader.Job {
private static MessageSource MESSAGE_SOURCE = MessageSource.loadResourceBundle(ClickhouseReader.class);
private Configuration jobConfig = null;
private CommonRdbmsReader.Job commonRdbmsReaderMaster;
@Override
public void init() {
this.jobConfig = super.getPluginJobConf();
this.commonRdbmsReaderMaster = new CommonRdbmsReader.Job(DATABASE_TYPE);
this.commonRdbmsReaderMaster.init(this.jobConfig);
}
@Override
public List<Configuration> split(int mandatoryNumber) {
return this.commonRdbmsReaderMaster.split(this.jobConfig, mandatoryNumber);
}
@Override
public void post() {
this.commonRdbmsReaderMaster.post(this.jobConfig);
}
@Override
public void destroy() {
this.commonRdbmsReaderMaster.destroy(this.jobConfig);
}
}
public static class Task extends Reader.Task {
private Configuration jobConfig;
private CommonRdbmsReader.Task commonRdbmsReaderSlave;
@Override
public void init() {
this.jobConfig = super.getPluginJobConf();
this.commonRdbmsReaderSlave = new CommonRdbmsReader.Task(DATABASE_TYPE, super.getTaskGroupId(), super.getTaskId());
this.commonRdbmsReaderSlave.init(this.jobConfig);
}
@Override
public void startRead(RecordSender recordSender) {
int fetchSize = this.jobConfig.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE, 1000);
this.commonRdbmsReaderSlave.startRead(this.jobConfig, recordSender, super.getTaskPluginCollector(), fetchSize);
}
@Override
public void post() {
this.commonRdbmsReaderSlave.post(this.jobConfig);
}
@Override
public void destroy() {
this.commonRdbmsReaderSlave.destroy(this.jobConfig);
}
}
}

clickhousereader/src/main/resources/plugin.json

@ -0,0 +1,6 @@
{
"name": "clickhousereader",
"class": "com.alibaba.datax.plugin.reader.clickhousereader.ClickhouseReader",
"description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql.",
"developer": "alibaba"
}

clickhousereader/src/main/resources/plugin_job_template.json

@ -0,0 +1,16 @@
{
"name": "clickhousereader",
"parameter": {
"username": "username",
"password": "password",
"column": ["col1", "col2", "col3"],
"connection": [
{
"jdbcUrl": "jdbc:clickhouse://<host>:<port>[/<database>]",
"table": ["table1", "table2"]
}
],
"preSql": [],
"postSql": []
}
}

clickhousereader/src/test/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReaderTest.java

@ -0,0 +1,74 @@
package com.alibaba.datax.plugin.reader.clickhousereader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.dataxservice.face.eventcenter.EventLogStore;
import com.alibaba.datax.dataxservice.face.eventcenter.RuntimeContext;
import com.alibaba.datax.test.simulator.BasicReaderPluginTest;
import com.alibaba.datax.test.simulator.junit.extend.log.LoggedRunner;
import com.alibaba.datax.test.simulator.junit.extend.log.TestLogger;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.ArrayUtils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(LoggedRunner.class)
@Ignore
public class ClickhouseReaderTest extends BasicReaderPluginTest {
@TestLogger(log = "Test basic1.json: constant configuration.")
@Test
public void testBasic1() {
RuntimeContext.setGlobalJobId(-1);
EventLogStore.init();
List<Record> noteRecordForTest = new ArrayList<Record>();
List<Configuration> subjobs = super.doReaderTest("basic1.json", 1, noteRecordForTest);
Assert.assertEquals(1, subjobs.size());
Assert.assertEquals(1, noteRecordForTest.size());
Assert.assertEquals("[8,16,32,64,-8,-16,-32,-64,\"3.2\",\"6.4\",1,\"str_col\",\"abc\"," + "\"417ddc5d-e556-4d27-95dd-a34d84e46a50\",1580745600000,1580752800000,\"hello\",\"[1,2,3]\"," + "\"[\\\"abc\\\",\\\"cde\\\"]\",\"(8,'uint8_type')\",null,\"[1,2]\",\"[\\\"x\\\",\\\"y\\\"]\",\"127.0.0.1\",\"::\",\"23.345\"]", JSON.toJSONString(listData(noteRecordForTest.get(0))));
}
@Override
protected OutputStream buildDataOutput(String optionalOutputName) {
File f = new File(optionalOutputName + "-output.txt");
try {
return new FileOutputStream(f);
} catch (FileNotFoundException e) {
e.printStackTrace();
}
return null;
}
@Override
public String getTestPluginName() {
return "clickhousereader";
}
private Object[] listData(Record record) {
if (null == record) {
return ArrayUtils.EMPTY_OBJECT_ARRAY;
}
Object[] arr = new Object[record.getColumnNumber()];
for (int i = 0; i < arr.length; i++) {
Column col = record.getColumn(i);
if (null != col) {
arr[i] = col.getRawData();
}
}
return arr;
}
}

clickhousereader/src/test/resources/basic1.json

@ -0,0 +1,57 @@
{
"job": {
"setting": {
"speed": {
"channel": 5
}
},
"content": [
{
"reader": {
"name": "clickhousereader",
"parameter": {
"username": "XXXX",
"password": "XXXX",
"column": [
"uint8_col",
"uint16_col",
"uint32_col",
"uint64_col",
"int8_col",
"int16_col",
"int32_col",
"int64_col",
"float32_col",
"float64_col",
"bool_col",
"str_col",
"fixedstr_col",
"uuid_col",
"date_col",
"datetime_col",
"enum_col",
"ary_uint8_col",
"ary_str_col",
"tuple_col",
"nullable_col",
"nested_col.nested_id",
"nested_col.nested_str",
"ipv4_col",
"ipv6_col",
"decimal_col"
],
"connection": [
{
"table": [
"all_type_tbl"
],
"jdbcUrl":["jdbc:clickhouse://XXXX:8123/default"]
}
]
}
},
"writer": {}
}
]
}
}


@ -0,0 +1,34 @@
CREATE TABLE IF NOT EXISTS default.all_type_tbl
(
`uint8_col` UInt8,
`uint16_col` UInt16,
uint32_col UInt32,
uint64_col UInt64,
int8_col Int8,
int16_col Int16,
int32_col Int32,
int64_col Int64,
float32_col Float32,
float64_col Float64,
bool_col UInt8,
str_col String,
fixedstr_col FixedString(3),
uuid_col UUID,
date_col Date,
datetime_col DateTime,
enum_col Enum('hello' = 1, 'world' = 2),
ary_uint8_col Array(UInt8),
ary_str_col Array(String),
tuple_col Tuple(UInt8, String),
nullable_col Nullable(UInt8),
nested_col Nested
(
nested_id UInt32,
nested_str String
),
ipv4_col IPv4,
ipv6_col IPv6,
decimal_col Decimal(5,3)
)
ENGINE = MergeTree()
ORDER BY (uint8_col);
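-- The reader test above asserts against one seeded row. A hypothetical seed statement
-- (values chosen for illustration only; the commit itself does not include one):
INSERT INTO default.all_type_tbl VALUES
(8, 16, 32, 64, -8, -16, -32, -64, 3.2, 6.4, 1, 'str_col', 'abc',
'417ddc5d-e556-4d27-95dd-a34d84e46a50', '2020-02-04', '2020-02-04 02:00:00',
'hello', [1,2,3], ['abc','cde'], (8, 'uint8_type'), NULL, [1,2], ['x','y'],
'127.0.0.1', '::', 23.345);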


@ -1,6 +1,7 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.util;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.plugin.rdbms.reader.util.ObVersion;
import com.alibaba.datax.plugin.rdbms.reader.util.SingleTableSplitUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;


@ -3,6 +3,7 @@ package com.alibaba.datax.plugin.reader.oceanbasev10reader.util;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.reader.Constant;
import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.reader.util.ObVersion;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.ext.ObReaderKey;

OceanBaseV10Writer.java

@ -86,6 +86,7 @@ public class OceanBaseV10Writer extends Writer {
if (tableNumber == 1) {
this.commonJob.prepare(this.originalConfig);
final String version = fetchServerVersion(originalConfig);
ObWriterUtils.setObVersion(version);
originalConfig.set(Config.OB_VERSION, version);
}
}
@ -187,8 +188,9 @@
}
private String fetchServerVersion(Configuration config) {
// version_comment is expected to look like "OceanBase 4.0.0.0 ..."; the second token is the version number
final String fetchVersionSql = "show variables like 'version_comment'";
String versionComment = DbUtils.fetchSingleValueWithRetry(config, fetchVersionSql);
return versionComment.split(" ")[1];
}
private void checkCompatibleMode(Configuration configure) {

DbUtils.java

@ -3,18 +3,17 @@ package com.alibaba.datax.plugin.writer.oceanbasev10writer.util;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DbUtils {
@ -25,7 +24,7 @@ public class DbUtils {
final String password = config.getString(Key.PASSWORD);
String jdbcUrl = config.getString(Key.JDBC_URL);
if (jdbcUrl == null) {
List<Object> conns = config.getList(Constant.CONN_MARK, Object.class);
Configuration connConf = Configuration.from(conns.get(0).toString());
jdbcUrl = connConf.getString(Key.JDBC_URL);
@ -34,9 +33,9 @@ public class DbUtils {
Connection conn = null;
PreparedStatement stmt = null;
ResultSet result = null;
String value = null;
int retry = 0;
int failTryCount = config.getInt(Config.FAIL_TRY_COUNT, Config.DEFAULT_FAIL_TRY_COUNT);
do {
try {
if (retry > 0) {
@ -58,13 +57,12 @@ public class DbUtils {
LOG.info("value for query [{}] is [{}]", query, value);
break;
} catch (SQLException e) {
++retry;
LOG.warn("fetch value with {} error {}", query, e);
} finally {
DBUtil.closeDBResources(result, stmt, conn);
}
} while (retry < failTryCount);
return value;
}

ObWriterUtils.java

@ -1,5 +1,6 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.util;
import com.alibaba.datax.plugin.rdbms.reader.util.ObVersion;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config;
@ -18,8 +19,11 @@ public class ObWriterUtils {
private static final String ORACLE_KEYWORDS = "ACCESS,ADD,ALL,ALTER,AND,ANY,ARRAYLEN,AS,ASC,AUDIT,BETWEEN,BY,CHAR,CHECK,CLUSTER,COLUMN,COMMENT,COMPRESS,CONNECT,CREATE,CURRENT,DATE,DECIMAL,DEFAULT,DELETE,DESC,DISTINCT,DROP,ELSE,EXCLUSIVE,EXISTS,FILE,FLOAT,FOR,FROM,GRANT,GROUP,HAVING,IDENTIFIED,IMMEDIATE,IN,INCREMENT,INDEX,INITIAL,INSERT,INTEGER,INTERSECT,INTO,IS,LEVEL,LIKE,LOCK,LONG,MAXEXTENTS,MINUS,MODE,MODIFY,NOAUDIT,NOCOMPRESS,NOT,NOTFOUND,NOWAIT,NULL,NUMBER,OF,OFFLINE,ON,ONLINE,OPTION,OR,ORDER,PCTFREE,PRIOR,PRIVILEGES,PUBLIC,RAW,RENAME,RESOURCE,REVOKE,ROW,ROWID,ROWLABEL,ROWNUM,ROWS,SELECT,SESSION,SET,SHARE,SIZE,SMALLINT,SQLBUF,START,SUCCESSFUL,SYNONYM,TABLE,THEN,TO,TRIGGER,UID,UNION,UNIQUE,UPDATE,USER,VALIDATE,VALUES,VARCHAR,VARCHAR2,VIEW,WHENEVER,WHERE,WITH";
private static String CHECK_MEMSTORE = "select 1 from %s.gv$memstore t where t.total>t.mem_limit * ?";
private static final String CHECK_MEMSTORE_4_0 = "select 1 from %s.gv$ob_memstore t where t.MEMSTORE_USED>t.MEMSTORE_LIMIT * ?";
private static Set<String> databaseKeywords;
private static String compatibleMode = null;
private static String obVersion = null;
protected static final Logger LOG = LoggerFactory.getLogger(Task.class);
private static Set<String> keywordsFromString2HashSet(final String keywords) {
return new HashSet(Arrays.asList(keywords.split(",")));
@ -61,7 +65,7 @@ public class ObWriterUtils {
if (isOracleMode()) {
sysDbName = "sys";
}
ps = conn.prepareStatement(String.format(getMemStoreSql(), sysDbName));
ps.setDouble(1, memstoreThreshold);
rs = ps.executeQuery();
// If any row matches, some server's memstore in the current tenant is close to full
@ -81,6 +85,14 @@ public class ObWriterUtils {
return (compatibleMode.equals(Config.OB_COMPATIBLE_MODE_ORACLE));
}
private static String getMemStoreSql() {
if (ObVersion.valueOf(obVersion).compareTo(ObVersion.V4000) >= 0) {
return CHECK_MEMSTORE_4_0;
} else {
return CHECK_MEMSTORE;
}
}
public static String getCompatibleMode() {
return compatibleMode;
}
@ -89,6 +101,10 @@ public class ObWriterUtils {
compatibleMode = mode;
}
public static void setObVersion(String version) {
obVersion = version;
}
private static String buildDeleteSql(String tableName, List<String> columns) {
StringBuilder builder = new StringBuilder("DELETE FROM ");
builder.append(tableName).append(" WHERE ");

package.xml

@ -145,6 +145,13 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>clickhousereader/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>hdfsreader/target/datax/</directory>
<includes>

ObVersion.java

@ -1,4 +1,4 @@
package com.alibaba.datax.plugin.rdbms.reader.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

SingleTableSplitUtil.java

@ -7,6 +7,7 @@ import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.util.*;
import com.alibaba.fastjson2.JSON;
import java.text.MessageFormat;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@ -20,6 +21,7 @@ import java.sql.ResultSetMetaData;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import static org.apache.commons.lang3.StringUtils.EMPTY;
public class SingleTableSplitUtil {
private static final Logger LOG = LoggerFactory
@ -277,7 +279,24 @@ public class SingleTableSplitUtil {
String splitPK = configuration.getString(Key.SPLIT_PK).trim();
String table = configuration.getString(Key.TABLE).trim();
String where = configuration.getString(Key.WHERE, null);
String obMode = configuration.getString("obCompatibilityMode");
// OceanBase does not rewrite "SELECT MIN(%s),MAX(%s) FROM %s" and will run a full-table scan, which is very slow or even times out on large tables.
// For OceanBase, rewrite the template to query the minimum and maximum separately; this improves performance by orders of magnitude.
if (DATABASE_TYPE == DataBaseType.OceanBase && StringUtils.isNotEmpty(obMode)) {
boolean isOracleMode = "ORACLE".equalsIgnoreCase(obMode);
String minMaxTemplate = isOracleMode ? "select v2.id as min_a, v1.id as max_a from ("
+ "select * from (select %s as id from %s {0} order by id desc) where rownum =1 ) v1,"
+ "(select * from (select %s as id from %s order by id asc) where rownum =1 ) v2;" :
"select v2.id as min_a, v1.id as max_a from (select %s as id from %s {0} order by id desc limit 1) v1,"
+ "(select %s as id from %s order by id asc limit 1) v2;";
String pkRangeSQL = String.format(minMaxTemplate, splitPK, table, splitPK, table);
String whereString = StringUtils.isNotBlank(where) ? String.format("WHERE (%s AND %s IS NOT NULL)", where, splitPK) : EMPTY;
pkRangeSQL = MessageFormat.format(pkRangeSQL, whereString);
return pkRangeSQL;
}
return genPKSql(splitPK, table, where);
}
public static String genPKSql(String splitPK, String table, String where){

pom.xml

@ -70,6 +70,7 @@
<module>ftpreader</module>
<module>txtfilereader</module>
<module>streamreader</module>
<module>clickhousereader</module>
<module>mongodbreader</module>
<module>tdenginereader</module>