Merge branch 'alibaba:master' into mysqlwriter_doc_update

up_upup 2023-03-23 15:11:09 +08:00 committed by GitHub
commit cbb81962f5
148 changed files with 3683 additions and 579 deletions

View File

@ -1,9 +1,10 @@
![Datax-logo](https://github.com/alibaba/DataX/blob/master/images/DataX-logo.jpg)
# DataX
DataX is the open-source version of Alibaba Cloud [DataWorks Data Integration](https://www.aliyun.com/product/bigdata/ide), an offline data synchronization tool/platform widely used within Alibaba Group. DataX provides efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, OceanBase, SqlServer, Postgre, HDFS, Hive, ADS, HBase, TableStore(OTS), MaxCompute(ODPS), Hologres, DRDS, and more.
[![Leaderboard](https://img.shields.io/badge/DataX-%E6%9F%A5%E7%9C%8B%E8%B4%A1%E7%8C%AE%E6%8E%92%E8%A1%8C%E6%A6%9C-orange)](https://opensource.alibaba.com/contribution_leaderboard/details?projectValue=datax)
DataX is the open-source version of Alibaba Cloud [DataWorks Data Integration](https://www.aliyun.com/product/bigdata/ide), an offline data synchronization tool/platform widely used within Alibaba Group. DataX provides efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, OceanBase, SqlServer, Postgre, HDFS, Hive, ADS, HBase, TableStore(OTS), MaxCompute(ODPS), Hologres, DRDS, databend, and more.
# DataX Commercial Edition
Alibaba Cloud DataWorks Data Integration is the commercial product built by the DataX team on Alibaba Cloud. It is dedicated to fast and stable data movement across complex network environments and a rich set of heterogeneous data sources, and to data synchronization solutions for complex business scenarios. It currently serves nearly 3,000 cloud customers and synchronizes more than 3 trillion records per day. DataWorks Data Integration supports 50+ data sources for offline synchronization and offers solutions such as whole-database migration, batch cloud onboarding, incremental synchronization, and sharded databases/tables. Real-time synchronization, added in 2020, supports reads and writes of 10+ data sources in any combination and provides one-click full-plus-incremental synchronization from sources such as MySQL and Oracle into Alibaba Cloud big data engines such as MaxCompute and Hologres.
@ -25,7 +26,7 @@ DataX本身作为数据同步框架将不同数据源的同步抽象为从源
# Quick Start
##### Download [DataX download link](https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz)
##### Download [DataX download link](https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202303/datax.tar.gz)
##### Please see: [Quick Start](https://github.com/alibaba/DataX/blob/master/userGuid.md)
@ -37,7 +38,7 @@ DataX本身作为数据同步框架将不同数据源的同步抽象为从源
DataX already has a fairly comprehensive plugin ecosystem: mainstream RDBMS databases, NoSQL stores, and big data computing systems are all supported. The currently supported data sources are listed below; for details, see the [DataX Data Source Reference Guide](https://github.com/alibaba/DataX/wiki/DataX-all-data-channels)
| Type | Data Source | Reader | Writer | Docs |
|--------------|---------------------------|:---------:|:-------:|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------:|
|--------------|---------------------------|:---------:|:---------:|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------:|
| RDBMS (relational databases) | MySQL | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md), [Write](https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md) |
| | Oracle | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/oraclereader/doc/oraclereader.md), [Write](https://github.com/alibaba/DataX/blob/master/oraclewriter/doc/oraclewriter.md) |
| | OceanBase | √ | √ | [Read](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase), [Write](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase) |
@ -47,6 +48,7 @@ DataX目前已经有了比较全面的插件体系主流的RDBMS数据库、N
| | Kingbase | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md), [Write](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) |
| | Generic RDBMS (any relational database) | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/rdbmsreader/doc/rdbmsreader.md), [Write](https://github.com/alibaba/DataX/blob/master/rdbmswriter/doc/rdbmswriter.md) |
| Alibaba Cloud data warehouse storage | ODPS | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/odpsreader/doc/odpsreader.md), [Write](https://github.com/alibaba/DataX/blob/master/odpswriter/doc/odpswriter.md) |
| | ADB | | √ | [Write](https://github.com/alibaba/DataX/blob/master/adbmysqlwriter/doc/adbmysqlwriter.md) |
| | ADS | | √ | [Write](https://github.com/alibaba/DataX/blob/master/adswriter/doc/adswriter.md) |
| | OSS | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/ossreader/doc/ossreader.md), [Write](https://github.com/alibaba/DataX/blob/master/osswriter/doc/osswriter.md) |
| | OCS | | √ | [Write](https://github.com/alibaba/DataX/blob/master/ocswriter/doc/ocswriter.md) |
@ -65,8 +67,10 @@ DataX目前已经有了比较全面的插件体系主流的RDBMS数据库、N
| Data warehouse storage | StarRocks | √ | √ | Read, [Write](https://github.com/alibaba/DataX/blob/master/starrockswriter/doc/starrockswriter.md) |
| | ApacheDoris | | √ | [Write](https://github.com/alibaba/DataX/blob/master/doriswriter/doc/doriswriter.md) |
| | ClickHouse | | √ | Write |
| | Databend | | √ | [Write](https://github.com/alibaba/DataX/blob/master/databendwriter/doc/databendwriter.md) |
| | Hive | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md), [Write](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) |
| | kudu | | √ | [Write](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) |
| | selectdb | | √ | [Write](https://github.com/alibaba/DataX/blob/master/selectdbwriter/doc/selectdbwriter.md) |
| Unstructured data storage | TxtFile | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md), [Write](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md) |
| | FTP | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md), [Write](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md) |
| | HDFS | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md), [Write](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) |
@ -105,6 +109,12 @@ DataX目前已经有了比较全面的插件体系主流的RDBMS数据库、N
DataX plans to keep iterating with monthly releases, and interested contributors are welcome to submit pull requests. The contents of each monthly release are summarized below.
- [datax_v202303](https://github.com/alibaba/DataX/releases/tag/datax_v202303)
  - Code cleanup
  - New plugins: adbmysqlwriter, databendwriter, selectdbwriter
  - Plugin improvements and bug fixes: sqlserver, hdfs, cassandra, kudu, oss
  - Upgraded fastjson to fastjson2
- [datax_v202210](https://github.com/alibaba/DataX/releases/tag/datax_v202210)
  - Channel capability updates: OceanBase, Tdengine, Doris, etc.

View File

@ -0,0 +1,338 @@
# DataX AdbMysqlWriter
---
## 1 Quick Introduction
The AdbMysqlWriter plugin writes data into a target ADB MySQL table. Under the hood, AdbMysqlWriter connects to the remote ADB MySQL database via JDBC and executes the corresponding `insert into ...` (or `replace into ...`) SQL statements to write the data into ADB MySQL; internally, rows are committed to the database in batches.
AdbMysqlWriter is intended for ETL engineers, who use it to load data from data warehouses into ADB MySQL. It can also serve DBAs and other users as a data migration tool.
## 2 Implementation
AdbMysqlWriter receives the protocol records produced by the Reader through the DataX framework, connects to the remote ADB MySQL database via JDBC, and executes the corresponding `insert into ...` (or `replace into ...`) SQL statements to write the data into ADB MySQL.
* `insert into...` (on a duplicate primary key the incoming row is skipped and nothing is updated, which is equivalent to `insert ignore into`)
##### or
* `replace into...` (when there is no primary key / unique index conflict it behaves the same as insert into; on conflict, every column of the existing row is replaced with the new row). For performance, the writer uses `PreparedStatement + Batch` with `rewriteBatchedStatements=true`, buffering records in a per-thread buffer and only issuing the write request once the buffer reaches a configured threshold.
<br />
Note: the job needs at least the `insert/replace into...` privilege on the target; whether additional privileges are required depends on the statements you specify in preSql and postSql.
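As a rough illustration of the difference between the two write modes (a minimal sketch against a hypothetical table `t` with primary key `id`; not taken from the plugin itself):

```sql
-- assume: CREATE TABLE t (id INT PRIMARY KEY, val VARCHAR(16)); with an existing row (1, 'old')

-- writeMode = insert: the duplicate key is ignored, the existing row keeps val = 'old'
INSERT IGNORE INTO t (id, val) VALUES (1, 'new');

-- writeMode = replace: the conflicting row is replaced, val becomes 'new'
REPLACE INTO t (id, val) VALUES (1, 'new');
```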
## 3 Features
### 3.1 Sample Configuration
* The following job generates data in memory and imports it into ADB MySQL.
```json
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column" : [
{
"value": "DataX",
"type": "string"
},
{
"value": 19880808,
"type": "long"
},
{
"value": "1988-08-08 08:08:08",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
}
],
"sliceRecordCount": 1000
}
},
"writer": {
"name": "adbmysqlwriter",
"parameter": {
"writeMode": "replace",
"username": "root",
"password": "root",
"column": [
"*"
],
"preSql": [
"truncate table @table"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://ip:port/database?useUnicode=true",
"table": [
"test"
]
}
]
}
}
}
]
}
}
```
### 3.2 Parameters
* **jdbcUrl**
    * Description: JDBC connection information of the target database. At runtime, DataX appends the following properties to the jdbcUrl you provide: yearIsDateType=false&zeroDateTimeBehavior=convertToNull&rewriteBatchedStatements=true
      Notes: 1. Only one jdbcUrl can be configured per database.
      2. An AdbMySQL write job can be configured with only one jdbcUrl.
      3. The jdbcUrl follows the official MySQL format and may carry additional connection-control properties. For example, to use gbk as the connection encoding, append useUnicode=true&characterEncoding=gbk to the jdbcUrl. See the official MySQL documentation or consult your DBA for details.
    * Required: yes <br />
    * Default: none <br />
* **username**
    * Description: user name for the target database <br />
    * Required: yes <br />
    * Default: none <br />
* **password**
    * Description: password for the target database <br />
    * Required: yes <br />
    * Default: none <br />
* **table**
    * Description: name of the target table. Only one AdbMySQL table name can be configured.
      Note: table and jdbcUrl must be placed inside the connection configuration unit.
    * Required: yes <br />
    * Default: none <br />
* **column**
    * Description: the columns of the target table that data should be written into, separated by commas, e.g. "column": ["id", "name", "age"]. To write all columns in order, use `*`, e.g. `"column": ["*"]`.
      **The column option is mandatory and must not be left empty.**
      Notes: 1. We strongly discourage the `*` configuration, because your job may run incorrectly or fail once the target table's columns or types change.
      2. column cannot contain any constant values.
    * Required: yes <br />
    * Default: none <br />
* **session**
    * Description: SQL statements executed when DataX obtains an ADB MySQL connection, used to modify the session properties of that connection.
    * Required: no
    * Default: empty
* **preSql**
    * Description: standard SQL statements executed before data is written to the target table. If a statement refers to the table being written, use `@table` as a placeholder; it is replaced with the actual table name when the statement is executed. For example, to clear the table before importing, configure `"preSql":["truncate table @table"]`; the effect is that `truncate table <table name>` runs before data is written to each table (see the sketch after this list). <br />
    * Required: no <br />
    * Default: none <br />
* **postSql**
    * Description: standard SQL statements executed after data has been written to the target table (same mechanism as preSql). <br />
    * Required: no <br />
    * Default: none <br />
* **writeMode**
    * Description: controls whether data is written with `insert into`, `replace into`, or `ON DUPLICATE KEY UPDATE` statements.<br />
    * Required: yes <br />
    * Options: insert/replace/update <br />
    * Default: replace <br />
* **batchSize**
    * Description: number of records committed in one batch. A larger value greatly reduces the number of network round trips between DataX and Adb MySQL and improves overall throughput, but setting it too high may cause the DataX process to run out of memory.<br />
    * Required: no <br />
    * Default: 2048 <br />
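For instance, with the sample job in section 3.1 (`"table": ["test"]`), the `"preSql": ["truncate table @table"]` setting described above would be executed as the following statement before the write starts (a sketch of the placeholder substitution only):

```sql
-- "preSql": ["truncate table @table"] combined with "table": ["test"] runs:
truncate table test;
```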
### 3.3 Type Conversion
AdbMysqlWriter currently supports most MySQL types, but a few are not supported; please check your column types.
The type conversions performed by AdbMysqlWriter are listed below:
| DataX internal type | AdbMysql data type |
|---------------|---------------------------------|
| Long | tinyint, smallint, int, bigint |
| Double | float, double, decimal |
| String | varchar |
| Date | date, time, datetime, timestamp |
| Boolean | boolean |
| Bytes | binary |
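As a concrete illustration of this mapping, a hypothetical target table (not taken from the plugin docs) could exercise each DataX type like this:

```sql
CREATE TABLE type_demo (
  c_long   BIGINT,         -- DataX Long
  c_double DECIMAL(15, 2), -- DataX Double
  c_string VARCHAR(255),   -- DataX String
  c_date   DATETIME,       -- DataX Date
  c_bool   BOOLEAN,        -- DataX Boolean
  c_bytes  BINARY(16)      -- DataX Bytes
);
```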
## 4 Performance Report
### 4.1 Environment
#### 4.1.1 Data Profile
TPC-H lineitem table with 17 columns; 59,986,052 randomly generated rows; 7.3 GiB of uncompressed data.
DDL:
CREATE TABLE `datax_adbmysqlwriter_perf_lineitem` (
`l_orderkey` bigint NOT NULL COMMENT '',
`l_partkey` int NOT NULL COMMENT '',
`l_suppkey` int NOT NULL COMMENT '',
`l_linenumber` int NOT NULL COMMENT '',
`l_quantity` decimal(15,2) NOT NULL COMMENT '',
`l_extendedprice` decimal(15,2) NOT NULL COMMENT '',
`l_discount` decimal(15,2) NOT NULL COMMENT '',
`l_tax` decimal(15,2) NOT NULL COMMENT '',
`l_returnflag` varchar(1024) NOT NULL COMMENT '',
`l_linestatus` varchar(1024) NOT NULL COMMENT '',
`l_shipdate` date NOT NULL COMMENT '',
`l_commitdate` date NOT NULL COMMENT '',
`l_receiptdate` date NOT NULL COMMENT '',
`l_shipinstruct` varchar(1024) NOT NULL COMMENT '',
`l_shipmode` varchar(1024) NOT NULL COMMENT '',
`l_comment` varchar(1024) NOT NULL COMMENT '',
`dummy` varchar(1024),
PRIMARY KEY (`l_orderkey`, `l_linenumber`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='datax perf test';
A sample row looks like:
l_orderkey: 2122789
l_partkey: 1233571
l_suppkey: 8608
l_linenumber: 1
l_quantity: 35.00
l_extendedprice: 52657.85
l_discount: 0.02
l_tax: 0.07
l_returnflag: N
l_linestatus: O
l_shipdate: 1996-11-03
l_commitdate: 1996-12-07
l_receiptdate: 1996-11-16
l_shipinstruct: COLLECT COD
l_shipmode: FOB
l_comment: ld, regular theodolites.
dummy:
#### 4.1.2 Hardware
* DataX ECS: 24 cores / 48 GB
* Adb MySQL database
* Compute resources: 16 cores / 64 GB, cluster edition
* Elastic IO resources: 3
#### 4.1.3 DataX JVM Parameters
-Xms1G -Xmx10G -XX:+HeapDumpOnOutOfMemoryError
### 4.2 Test Results
| Channels | Batch size (rows) | DataX rate (rec/s) | DataX rate (MB/s) | Import time (s) |
|-----|-------|------------------|---------------|---------|
| 1 | 512 | 23071 | 2.34 | 2627 |
| 1 | 1024 | 26080 | 2.65 | 2346 |
| 1 | 2048 | 28162 | 2.86 | 2153 |
| 1 | 4096 | 28978 | 2.94 | 2119 |
| 4 | 512 | 56590 | 5.74 | 1105 |
| 4 | 1024 | 81062 | 8.22 | 763 |
| 4 | 2048 | 107117 | 10.87 | 605 |
| 4 | 4096 | 113181 | 11.48 | 579 |
| 8 | 512 | 81062 | 8.22 | 786 |
| 8 | 1024 | 127629 | 12.95 | 519 |
| 8 | 2048 | 187456 | 19.01 | 369 |
| 8 | 4096 | 206848 | 20.98 | 341 |
| 16 | 512 | 130404 | 13.23 | 513 |
| 16 | 1024 | 214235 | 21.73 | 335 |
| 16 | 2048 | 299930 | 30.42 | 253 |
| 16 | 4096 | 333255 | 33.80 | 227 |
| 32 | 512 | 206848 | 20.98 | 347 |
| 32 | 1024 | 315716 | 32.02 | 241 |
| 32 | 2048 | 399907 | 40.56 | 199 |
| 32 | 4096 | 461431 | 46.80 | 184 |
| 64 | 512 | 333255 | 33.80 | 231 |
| 64 | 1024 | 399907 | 40.56 | 204 |
| 64 | 2048 | 428471 | 43.46 | 199 |
| 64 | 4096 | 461431 | 46.80 | 187 |
| 128 | 512 | 333255 | 33.80 | 235 |
| 128 | 1024 | 399907 | 40.56 | 203 |
| 128 | 2048 | 425432 | 43.15 | 197 |
| 128 | 4096 | 387006 | 39.26 | 211 |
Notes:
1. DataX reads local files with txtfilereader, so the source side is not a performance bottleneck.
#### Performance Summary
1. The number of channels and batchSize have a significant impact on performance.
2. Using more than 32 channels when writing to the database is generally not recommended.
## 5 Constraints and Limitations
## FAQ
***
**Q: AdbMysqlWriter fails while executing a postSql statement. Has the data already been imported into the target database?**
A: A DataX import has three phases: the pre step, the import itself, and the post step. If any of them fails, the DataX job fails. Because DataX cannot run these phases in a single transaction, data may already have landed in the target by the time the error occurs.
***
**Q: So some dirty or partial data may end up in the database. What if that affects a production database?**
A: There are currently two approaches. First, configure a pre statement that clears the data imported that day, so each DataX run first cleans up the previous attempt and then imports a complete data set. Second, import into a temporary table and rename it onto the production table only after the import succeeds.
***
**Q: The second approach avoids impacting production data. How exactly do I do that?**
A: Configure the job to import into a temporary table, as sketched below.
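A minimal sketch of the temporary-table approach, assuming the production table is `test` and using a hypothetical staging table `test_tmp` (the table names and the exact swap syntax are illustrative; adapt them to your ADB MySQL setup):

```sql
-- preSql: prepare an empty staging table with the same schema as the production table
DROP TABLE IF EXISTS test_tmp;
CREATE TABLE test_tmp LIKE test;

-- the DataX job then writes into test_tmp ("table": ["test_tmp"])

-- postSql: swap the staging table into place only after the import succeeds
RENAME TABLE test TO test_old, test_tmp TO test;
DROP TABLE test_old;
```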

adbmysqlwriter/pom.xml (79 lines) Executable file
View File

@ -0,0 +1,79 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>adbmysqlwriter</artifactId>
<name>adbmysqlwriter</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.40</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,35 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/adbmysqlwriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>adbmysqlwriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/adbmysqlwriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/adbmysqlwriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,138 @@
package com.alibaba.datax.plugin.writer.adbmysqlwriter;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
public class AdbMysqlWriter extends Writer {
private static final DataBaseType DATABASE_TYPE = DataBaseType.ADB;
public static class Job extends Writer.Job {
private Configuration originalConfig = null;
private CommonRdbmsWriter.Job commonRdbmsWriterJob;
@Override
public void preCheck(){
this.init();
this.commonRdbmsWriterJob.writerPreCheck(this.originalConfig, DATABASE_TYPE);
}
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);
this.commonRdbmsWriterJob.init(this.originalConfig);
}
// Generally the pre statements need to be deferred to the task phase; the single-table case is the exception.
@Override
public void prepare() {
// privilege validation is not performed for real runs for now
//this.commonRdbmsWriterJob.privilegeValid(this.originalConfig, DATABASE_TYPE);
this.commonRdbmsWriterJob.prepare(this.originalConfig);
}
@Override
public List<Configuration> split(int mandatoryNumber) {
return this.commonRdbmsWriterJob.split(this.originalConfig, mandatoryNumber);
}
// Generally the post statements need to be deferred to the task phase; the single-table case is the exception.
@Override
public void post() {
this.commonRdbmsWriterJob.post(this.originalConfig);
}
@Override
public void destroy() {
this.commonRdbmsWriterJob.destroy(this.originalConfig);
}
}
public static class Task extends Writer.Task {
private Configuration writerSliceConfig;
private CommonRdbmsWriter.Task commonRdbmsWriterTask;
public static class DelegateClass extends CommonRdbmsWriter.Task {
private long writeTime = 0L;
private long writeCount = 0L;
private long lastLogTime = 0;
public DelegateClass(DataBaseType dataBaseType) {
super(dataBaseType);
}
@Override
protected void doBatchInsert(Connection connection, List<Record> buffer)
throws SQLException {
long startTime = System.currentTimeMillis();
super.doBatchInsert(connection, buffer);
writeCount = writeCount + buffer.size();
writeTime = writeTime + (System.currentTimeMillis() - startTime);
// log write metrics every 10 seconds
if (System.currentTimeMillis() - lastLogTime > 10000) {
lastLogTime = System.currentTimeMillis();
logTotalMetrics();
}
}
public void logTotalMetrics() {
LOG.info(Thread.currentThread().getName() + ", AdbMySQL writer take " + writeTime + " ms, write " + writeCount + " records.");
}
}
@Override
public void init() {
this.writerSliceConfig = super.getPluginJobConf();
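// writeMode defaults to REPLACE when not configured, so that retried writes stay idempotent (see supportFailOver below)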
if (StringUtils.isBlank(this.writerSliceConfig.getString(Key.WRITE_MODE))) {
this.writerSliceConfig.set(Key.WRITE_MODE, "REPLACE");
}
this.commonRdbmsWriterTask = new DelegateClass(DATABASE_TYPE);
this.commonRdbmsWriterTask.init(this.writerSliceConfig);
}
@Override
public void prepare() {
this.commonRdbmsWriterTask.prepare(this.writerSliceConfig);
}
//TODO switch to a connection pool to make sure every acquired connection is usable; note the session may need to be re-initialized for each connection
public void startWrite(RecordReceiver recordReceiver) {
this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig,
super.getTaskPluginCollector());
}
@Override
public void post() {
this.commonRdbmsWriterTask.post(this.writerSliceConfig);
}
@Override
public void destroy() {
this.commonRdbmsWriterTask.destroy(this.writerSliceConfig);
}
@Override
public boolean supportFailOver(){
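// fail-over (re-running a failed task and re-writing its data) is only safe when writes are idempotent, i.e. in replace mode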
String writeMode = writerSliceConfig.getString(Key.WRITE_MODE);
return "replace".equalsIgnoreCase(writeMode);
}
}
}

View File

@ -0,0 +1,6 @@
{
"name": "adbmysqlwriter",
"class": "com.alibaba.datax.plugin.writer.adbmysqlwriter.AdbMysqlWriter",
"description": "useScene: prod. mechanism: Jdbc connection using the database, execute insert sql. warn: The more you know about the database, the less problems you encounter.",
"developer": "alibaba"
}

View File

@ -0,0 +1,20 @@
{
"name": "adbmysqlwriter",
"parameter": {
"username": "username",
"password": "password",
"column": ["col1", "col2", "col3"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://<host>:<port>[/<database>]",
"table": ["table1", "table2"]
}
],
"preSql": [],
"postSql": [],
"batchSize": 65536,
"batchByteSize": 134217728,
"dryRun": false,
"writeMode": "insert"
}
}

View File

@ -110,7 +110,6 @@ DataX 将数据直连ADS接口利用ADS暴露的INSERT接口直写到ADS。
"account": "xxx@aliyun.com",
"odpsServer": "xxx",
"tunnelServer": "xxx",
"accountType": "aliyun",
"project": "transfer_project"
},
"writeMode": "load",

View File

@ -18,7 +18,7 @@ import com.alibaba.datax.plugin.writer.adswriter.AdsWriterErrorCode;
import com.alibaba.datax.plugin.writer.adswriter.ads.TableInfo;
import com.alibaba.datax.plugin.writer.adswriter.util.Constant;
import com.alibaba.datax.plugin.writer.adswriter.util.Key;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;

View File

@ -12,7 +12,6 @@ public class TransferProjectConf {
public final static String KEY_ACCOUNT = "odps.account";
public final static String KEY_ODPS_SERVER = "odps.odpsServer";
public final static String KEY_ODPS_TUNNEL = "odps.tunnelServer";
public final static String KEY_ACCOUNT_TYPE = "odps.accountType";
public final static String KEY_PROJECT = "odps.project";
private String accessId;
@ -20,7 +19,6 @@ public class TransferProjectConf {
private String account;
private String odpsServer;
private String odpsTunnel;
private String accountType;
private String project;
public static TransferProjectConf create(Configuration adsWriterConf) {
@ -30,7 +28,6 @@ public class TransferProjectConf {
res.account = adsWriterConf.getString(KEY_ACCOUNT);
res.odpsServer = adsWriterConf.getString(KEY_ODPS_SERVER);
res.odpsTunnel = adsWriterConf.getString(KEY_ODPS_TUNNEL);
res.accountType = adsWriterConf.getString(KEY_ACCOUNT_TYPE, "aliyun");
res.project = adsWriterConf.getString(KEY_PROJECT);
return res;
}
@ -55,9 +52,6 @@ public class TransferProjectConf {
return odpsTunnel;
}
public String getAccountType() {
return accountType;
}
public String getProject() {
return project;

View File

@ -23,7 +23,7 @@ import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.CodecRegistry;
@ -298,6 +298,7 @@ public class CassandraReaderHelper {
record.addColumn(new LongColumn(rs.getInt(i)));
break;
case COUNTER:
case BIGINT:
record.addColumn(new LongColumn(rs.getLong(i)));
break;
@ -558,26 +559,6 @@ public class CassandraReaderHelper {
String.format(
"配置信息有错误.列信息中需要包含'%s'字段 .",Key.COLUMN_NAME));
}
if( name.startsWith(Key.WRITE_TIME) ) {
String colName = name.substring(Key.WRITE_TIME.length(),name.length() - 1 );
ColumnMetadata col = tableMetadata.getColumn(colName);
if( col == null ) {
throw DataXException
.asDataXException(
CassandraReaderErrorCode.CONF_ERROR,
String.format(
"配置信息有错误.列'%s'不存在 .",colName));
}
} else {
ColumnMetadata col = tableMetadata.getColumn(name);
if( col == null ) {
throw DataXException
.asDataXException(
CassandraReaderErrorCode.CONF_ERROR,
String.format(
"配置信息有错误.列'%s'不存在 .",name));
}
}
}
}

View File

@ -18,10 +18,10 @@ import java.util.UUID;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONException;
import com.alibaba.fastjson2.JSONObject;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.CodecRegistry;
@ -204,7 +204,7 @@ public class CassandraWriterHelper {
case MAP: {
Map m = new HashMap();
for (JSONObject.Entry e : ((JSONObject)jsonObject).entrySet()) {
for (Map.Entry e : ((JSONObject)jsonObject).entrySet()) {
Object k = parseFromString((String) e.getKey(), type.getTypeArguments().get(0));
Object v = parseFromJson(e.getValue(), type.getTypeArguments().get(1));
m.put(k,v);
@ -233,7 +233,7 @@ public class CassandraWriterHelper {
case UDT: {
UDTValue t = ((UserType) type).newValue();
UserType userType = t.getType();
for (JSONObject.Entry e : ((JSONObject)jsonObject).entrySet()) {
for (Map.Entry e : ((JSONObject)jsonObject).entrySet()) {
DataType eleType = userType.getFieldType((String)e.getKey());
t.set((String)e.getKey(), parseFromJson(e.getValue(), eleType), registry.codecFor(eleType).getJavaType());
}

View File

@ -10,8 +10,8 @@ import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import java.sql.Array;
import java.sql.Connection;

View File

@ -17,8 +17,8 @@
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>

View File

@ -1,6 +1,6 @@
package com.alibaba.datax.common.element;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import java.math.BigDecimal;
import java.math.BigInteger;

View File

@ -31,7 +31,6 @@ public class PerfTrace {
private int taskGroupId;
private int channelNumber;
private int priority;
private int batchSize = 500;
private volatile boolean perfReportEnable = true;
@ -54,12 +53,12 @@ public class PerfTrace {
* @param taskGroupId
* @return
*/
public static PerfTrace getInstance(boolean isJob, long jobId, int taskGroupId, int priority, boolean enable) {
public static PerfTrace getInstance(boolean isJob, long jobId, int taskGroupId, boolean enable) {
if (instance == null) {
synchronized (lock) {
if (instance == null) {
instance = new PerfTrace(isJob, jobId, taskGroupId, priority, enable);
instance = new PerfTrace(isJob, jobId, taskGroupId, enable);
}
}
}
@ -76,22 +75,21 @@ public class PerfTrace {
LOG.error("PerfTrace instance not be init! must have some error! ");
synchronized (lock) {
if (instance == null) {
instance = new PerfTrace(false, -1111, -1111, 0, false);
instance = new PerfTrace(false, -1111, -1111, false);
}
}
}
return instance;
}
private PerfTrace(boolean isJob, long jobId, int taskGroupId, int priority, boolean enable) {
private PerfTrace(boolean isJob, long jobId, int taskGroupId, boolean enable) {
try {
this.perfTraceId = isJob ? "job_" + jobId : String.format("taskGroup_%s_%s", jobId, taskGroupId);
this.enable = enable;
this.isJob = isJob;
this.taskGroupId = taskGroupId;
this.instId = jobId;
this.priority = priority;
LOG.info(String.format("PerfTrace traceId=%s, isEnable=%s, priority=%s", this.perfTraceId, this.enable, this.priority));
LOG.info(String.format("PerfTrace traceId=%s, isEnable=%s", this.perfTraceId, this.enable));
} catch (Exception e) {
// do nothing
@ -398,7 +396,6 @@ public class PerfTrace {
jdo.setWindowEnd(this.windowEnd);
jdo.setJobStartTime(jobStartTime);
jdo.setJobRunTimeMs(System.currentTimeMillis() - jobStartTime.getTime());
jdo.setJobPriority(this.priority);
jdo.setChannelNum(this.channelNumber);
jdo.setCluster(this.cluster);
jdo.setJobDomain(this.jobDomain);
@ -609,7 +606,6 @@ public class PerfTrace {
private Date jobStartTime;
private Date jobEndTime;
private Long jobRunTimeMs;
private Integer jobPriority;
private Integer channelNum;
private String cluster;
private String jobDomain;
@ -680,10 +676,6 @@ public class PerfTrace {
return jobRunTimeMs;
}
public Integer getJobPriority() {
return jobPriority;
}
public Integer getChannelNum() {
return channelNum;
}
@ -816,10 +808,6 @@ public class PerfTrace {
this.jobRunTimeMs = jobRunTimeMs;
}
public void setJobPriority(Integer jobPriority) {
this.jobPriority = jobPriority;
}
public void setChannelNum(Integer channelNum) {
this.channelNum = channelNum;
}

View File

@ -3,8 +3,8 @@ package com.alibaba.datax.common.util;
import com.alibaba.datax.common.exception.CommonErrorCode;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.spi.ErrorCode;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONWriter;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.CharUtils;
import org.apache.commons.lang3.StringUtils;
@ -586,7 +586,7 @@ public class Configuration {
*/
public String beautify() {
return JSON.toJSONString(this.getInternal(),
SerializerFeature.PrettyFormat);
JSONWriter.Feature.PrettyFormat);
}
/**

View File

@ -1,62 +0,0 @@
package com.alibaba.datax.common.util;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.datax.common.exception.DataXException;
public class IdAndKeyRollingUtil {
private static Logger LOGGER = LoggerFactory.getLogger(IdAndKeyRollingUtil.class);
public static final String SKYNET_ACCESSID = "SKYNET_ACCESSID";
public static final String SKYNET_ACCESSKEY = "SKYNET_ACCESSKEY";
public final static String ACCESS_ID = "accessId";
public final static String ACCESS_KEY = "accessKey";
public static String parseAkFromSkynetAccessKey() {
Map<String, String> envProp = System.getenv();
String skynetAccessID = envProp.get(IdAndKeyRollingUtil.SKYNET_ACCESSID);
String skynetAccessKey = envProp.get(IdAndKeyRollingUtil.SKYNET_ACCESSKEY);
String accessKey = null;
// follow the original check condition:
// if SKYNET_ACCESSID/SKYNET_ACCESSKEY exist in the environment, having either one is treated as having both
// if (StringUtils.isNotBlank(skynetAccessID) ||
// StringUtils.isNotBlank(skynetAccessKey)) {
// stricter check: only proceed when the encrypted key string is non-empty; previously working jobs should never have an empty one
if (StringUtils.isNotBlank(skynetAccessKey)) {
LOGGER.info("Try to get accessId/accessKey from environment SKYNET_ACCESSKEY.");
accessKey = DESCipher.decrypt(skynetAccessKey);
if (StringUtils.isBlank(accessKey)) {
// present in the environment but could not be decrypted
throw DataXException.asDataXException(String.format(
"Failed to get the [accessId]/[accessKey] from the environment variable. The [accessId]=[%s]",
skynetAccessID));
}
}
if (StringUtils.isNotBlank(accessKey)) {
LOGGER.info("Get accessId/accessKey from environment variables SKYNET_ACCESSKEY successfully.");
}
return accessKey;
}
public static String getAccessIdAndKeyFromEnv(Configuration originalConfig) {
String accessId = null;
Map<String, String> envProp = System.getenv();
accessId = envProp.get(IdAndKeyRollingUtil.SKYNET_ACCESSID);
String accessKey = null;
if (StringUtils.isBlank(accessKey)) {
// the old logic did not throw here; it simply failed to obtain the access key
accessKey = IdAndKeyRollingUtil.parseAkFromSkynetAccessKey();
}
if (StringUtils.isNotBlank(accessKey)) {
// callers of this utility all follow the accessId/accessKey naming convention
originalConfig.set(IdAndKeyRollingUtil.ACCESS_ID, accessId);
originalConfig.set(IdAndKeyRollingUtil.ACCESS_KEY, accessKey);
}
return accessKey;
}
}

View File

@ -79,16 +79,9 @@ public class Engine {
perfReportEnable = false;
}
int priority = 0;
try {
priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
}catch (NumberFormatException e){
LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY"));
}
Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
//初始化PerfTrace
PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, traceEnable);
perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber);
container.start();

View File

@ -114,7 +114,7 @@ public final class JobAssignUtil {
* The desired effect, illustrated with an example:
* <pre>
* database a has tables 0, 1, 2
* database a has tables 3, 4
* database b has tables 3, 4
* database c has tables 5, 6, 7
*
* given 4 taskGroups

View File

@ -27,7 +27,7 @@ import com.alibaba.datax.core.util.container.ClassLoaderSwapper;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.datax.core.util.container.LoadUtil;
import com.alibaba.datax.dataxservice.face.domain.enums.ExecuteMode;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.slf4j.Logger;

View File

@ -2,7 +2,7 @@ package com.alibaba.datax.core.statistics.communication;
import com.alibaba.datax.common.statistics.PerfTrace;
import com.alibaba.datax.common.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import org.apache.commons.lang.Validate;
import java.text.DecimalFormat;

View File

@ -6,7 +6,7 @@ import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.datax.core.statistics.plugin.task.util.DirtyRecord;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;

View File

@ -4,7 +4,7 @@ import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import java.math.BigDecimal;
import java.math.BigInteger;

View File

@ -27,7 +27,7 @@ import com.alibaba.datax.core.util.TransformerUtil;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.datax.core.util.container.LoadUtil;
import com.alibaba.datax.dataxservice.face.domain.enums.State;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -5,7 +5,7 @@ import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.core.util.ClassSize;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import java.util.ArrayList;
import java.util.HashMap;

View File

@ -0,0 +1,171 @@
# DataX DatabendWriter
[简体中文](./databendwriter-CN.md) | [English](./databendwriter.md)
## 1 Quick Introduction
Databend Writer is a DataX plugin for writing data from DataX into Databend tables.
The plugin is based on the [databend JDBC driver](https://github.com/databendcloud/databend-jdbc), which uses the [RESTful http protocol](https://databend.rs/doc/integrations/api/rest)
to execute queries on open-source Databend and [databend cloud](https://app.databend.com/).
In each write batch, databend writer uploads the batch data into an internal S3 stage and then executes the corresponding insert SQL to load the data into the Databend table.
For the best experience, if you are using the Databend community edition, you should adopt [S3](https://aws.amazon.com/s3/)/[minio](https://min.io/)/[OSS](https://www.alibabacloud.com/product/object-storage-service) as the underlying storage layer, because
they support presigned uploads; otherwise you may incur unnecessary data-transfer cost.
See the [documentation](https://databend.rs/doc/deploy/deploying-databend) for more details.
## 2 Implementation
Databend Writer uses DataX to fetch the records generated by a DataX Reader and batch-inserts them into the specified columns of the Databend table.
## 3 Features
### 3.1 Sample Configuration
* The following job reads some generated data from memory and uploads it into a Databend table.
#### Preparation
```sql
--- create table in databend
drop table if exists datax.sample1;
drop database if exists datax;
create database if not exists datax;
create table if not exists datax.sample1(a string, b int64, c date, d timestamp, e bool, f string, g variant);
```
#### Sample Job
```json
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column" : [
{
"value": "DataX",
"type": "string"
},
{
"value": 19880808,
"type": "long"
},
{
"value": "1926-08-08 08:08:08",
"type": "date"
},
{
"value": "1988-08-08 08:08:08",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
},
{
"value": "{\"type\": \"variant\", \"value\": \"test\"}",
"type": "string"
}
],
"sliceRecordCount": 10000
}
},
"writer": {
"name": "databendwriter",
"parameter": {
"username": "databend",
"password": "databend",
"column": ["a", "b", "c", "d", "e", "f", "g"],
"batchSize": 1000,
"preSql": [
],
"postSql": [
],
"connection": [
{
"jdbcUrl": "jdbc:databend://localhost:8000/datax",
"table": [
"sample1"
]
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
```
### 3.2 Parameters
* jdbcUrl
  * Description: JDBC data source URL. See the detailed [documentation](https://github.com/databendcloud/databend-jdbc) in the driver repository.
  * Required: yes
  * Default: none
  * Example: jdbc:databend://localhost:8000/datax
* username
  * Description: JDBC data source user name
  * Required: yes
  * Default: none
  * Example: databend
* password
  * Description: JDBC data source password
  * Required: yes
  * Default: none
  * Example: databend
* table
  * Description: list of table names; the tables should contain all of the columns listed in the column parameter.
  * Required: yes
  * Default: none
  * Example: ["sample1"]
* column
  * Description: list of column names in the table; the field order should match the column types in the reader's records.
  * Required: yes
  * Default: none
  * Example: ["a", "b", "c", "d", "e", "f", "g"]
* batchSize
  * Description: number of records per batch
  * Required: no
  * Default: 1000
  * Example: 1000
* preSql
  * Description: SQL statements executed before the data is written
  * Required: no
  * Default: none
  * Example: ["delete from datax.sample1"]
* postSql
  * Description: SQL statements executed after the data is written
  * Required: no
  * Default: none
  * Example: ["select count(*) from datax.sample1"]
### 3.3 Type Conversion
DataX internal types are converted to the corresponding Databend data types. The table below shows the mapping between the two.
| DataX internal type | Databend data type |
|------------|-----------------------------------------------------------|
| INT | TINYINT, INT8, SMALLINT, INT16, INT, INT32, BIGINT, INT64 |
| LONG | TINYINT, INT8, SMALLINT, INT16, INT, INT32, BIGINT, INT64 |
| STRING | STRING, VARCHAR |
| DOUBLE | FLOAT, DOUBLE |
| BOOL | BOOLEAN, BOOL |
| DATE | DATE, TIMESTAMP |
| BYTES | STRING, VARCHAR |
## 4 Performance Test
## 5 Constraints and Limitations
Support for complex data types is currently unstable. If you want to use complex data types such as tuple or array, please check later releases of Databend and the JDBC driver.
## FAQ

View File

@ -0,0 +1,166 @@
# DataX DatabendWriter
[简体中文](./databendwriter-CN.md) | [English](./databendwriter.md)
## 1 Introduction
Databend Writer is a DataX plugin that writes DataX records into Databend tables.
The plugin is based on the [databend JDBC driver](https://github.com/databendcloud/databend-jdbc), which uses the [RESTful http protocol](https://databend.rs/doc/integrations/api/rest)
to execute queries on open source databend and [databend cloud](https://app.databend.com/).
During each write batch, databend writer will upload batch data into an internal S3 stage and execute the corresponding insert SQL to load the data into the databend table.
For the best user experience, if you are using the databend community distribution, you should try to adopt [S3](https://aws.amazon.com/s3/)/[minio](https://min.io/)/[OSS](https://www.alibabacloud.com/product/object-storage-service) as its underlying storage layer, since
they support presigned upload operations; otherwise you may incur unneeded cost on data transfer.
You can find more details in the [doc](https://databend.rs/doc/deploy/deploying-databend).
## 2 Detailed Implementation
Databend Writer uses DataX to fetch the records generated by a DataX Reader and then batch-inserts them into the designated columns of your databend table.
## 3 Features
### 3.1 Example Configurations
* The following configuration reads some generated data from memory and uploads it into a databend table.
#### Preparation
```sql
--- create table in databend
drop table if exists datax.sample1;
drop database if exists datax;
create database if not exists datax;
create table if not exists datax.sample1(a string, b int64, c date, d timestamp, e bool, f string, g variant);
```
#### Configurations
```json
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column" : [
{
"value": "DataX",
"type": "string"
},
{
"value": 19880808,
"type": "long"
},
{
"value": "1926-08-08 08:08:08",
"type": "date"
},
{
"value": "1988-08-08 08:08:08",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
},
{
"value": "{\"type\": \"variant\", \"value\": \"test\"}",
"type": "string"
}
],
"sliceRecordCount": 10000
}
},
"writer": {
"name": "databendwriter",
"parameter": {
"username": "databend",
"password": "databend",
"column": ["a", "b", "c", "d", "e", "f", "g"],
"batchSize": 1000,
"preSql": [
],
"postSql": [
],
"connection": [
{
"jdbcUrl": "jdbc:databend://localhost:8000/datax",
"table": [
"sample1"
]
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
```
### 3.2 Configuration Description
* jdbcUrl
* Description: JDBC Data source url in Databend. Please take a look at repository for detailed [doc](https://github.com/databendcloud/databend-jdbc)
* Required: yes
* Default: none
* Example: jdbc:databend://localhost:8000/datax
* username
* Description: Databend user name
* Required: yes
* Default: none
* Example: databend
* password
* Description: Databend user password
* Required: yes
* Default: none
* Example: databend
* table
* Description: A list of table names that should contain all of the columns in the column parameter.
* Required: yes
* Default: none
* Example: ["sample1"]
* column
* Description: A list of column field names that should be inserted into the table. if you want to insert all column fields use `["*"]` instead.
* Required: yes
* Default: none
* Example: ["a", "b", "c", "d", "e", "f", "g"]
* batchSize
* Description: The number of records to be inserted in each batch.
* Required: no
* Default: 1024
* preSql
* Description: A list of SQL statements that will be executed before the write operation.
* Required: no
* Default: none
* postSql
* Description: A list of SQL statements that will be executed after the write operation.
* Required: no
* Default: none
### 3.3 Type Convert
Data types in datax can be converted to the corresponding data types in databend. The following table shows the correspondence between the two types.
| DataX Type | Databend Type |
|------------|-----------------------------------------------------------|
| INT | TINYINT, INT8, SMALLINT, INT16, INT, INT32, BIGINT, INT64 |
| LONG | TINYINT, INT8, SMALLINT, INT16, INT, INT32, BIGINT, INT64 |
| STRING | STRING, VARCHAR |
| DOUBLE | FLOAT, DOUBLE |
| BOOL | BOOLEAN, BOOL |
| DATE | DATE, TIMESTAMP |
| BYTES | STRING, VARCHAR |
## 4 Performance Test
## 5 Restrictions
Currently, complex data type support is not stable, if you want to use complex data type such as tuple, array, please check further release version of databend and jdbc driver.
## FAQ

databendwriter/pom.xml (101 lines) Normal file
View File

@ -0,0 +1,101 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>databendwriter</artifactId>
<name>databendwriter</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.databend</groupId>
<artifactId>databend-jdbc</artifactId>
<version>0.0.5</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,34 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/databendwriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>databendwriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/databendwriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/databendwriter/libs</outputDirectory>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,248 @@
package com.alibaba.datax.plugin.writer.databendwriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.CommonErrorCode;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import com.alibaba.datax.plugin.writer.databendwriter.util.DatabendWriterUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.List;
import java.util.regex.Pattern;
public class DatabendWriter extends Writer
{
private static final DataBaseType DATABASE_TYPE = DataBaseType.Databend;
public static class Job
extends Writer.Job
{
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration originalConfig;
private CommonRdbmsWriter.Job commonRdbmsWriterMaster;
@Override
public void init()
{
this.originalConfig = super.getPluginJobConf();
this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE);
this.commonRdbmsWriterMaster.init(this.originalConfig);
// placeholder currently not supported by databend driver, needs special treatment
DatabendWriterUtil.dealWriteMode(this.originalConfig);
}
@Override
public void preCheck()
{
this.init();
this.commonRdbmsWriterMaster.writerPreCheck(this.originalConfig, DATABASE_TYPE);
}
@Override
public void prepare() {
this.commonRdbmsWriterMaster.prepare(this.originalConfig);
}
@Override
public List<Configuration> split(int mandatoryNumber) {
return this.commonRdbmsWriterMaster.split(this.originalConfig, mandatoryNumber);
}
@Override
public void post() {
this.commonRdbmsWriterMaster.post(this.originalConfig);
}
@Override
public void destroy() {
this.commonRdbmsWriterMaster.destroy(this.originalConfig);
}
}
public static class Task extends Writer.Task
{
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private Configuration writerSliceConfig;
private CommonRdbmsWriter.Task commonRdbmsWriterSlave;
@Override
public void init()
{
this.writerSliceConfig = super.getPluginJobConf();
this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DataBaseType.Databend){
@Override
protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqltype, String typeName, Column column) throws SQLException {
try {
if (column.getRawData() == null) {
preparedStatement.setNull(columnIndex + 1, columnSqltype);
return preparedStatement;
}
java.util.Date utilDate;
switch (columnSqltype) {
case Types.TINYINT:
case Types.SMALLINT:
case Types.INTEGER:
preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue());
break;
case Types.BIGINT:
preparedStatement.setLong(columnIndex + 1, column.asLong());
break;
case Types.DECIMAL:
preparedStatement.setBigDecimal(columnIndex + 1, column.asBigDecimal());
break;
case Types.FLOAT:
case Types.REAL:
preparedStatement.setFloat(columnIndex + 1, column.asDouble().floatValue());
break;
case Types.DOUBLE:
preparedStatement.setDouble(columnIndex + 1, column.asDouble());
break;
case Types.DATE:
java.sql.Date sqlDate = null;
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"Date type conversion error: [%s]", column));
}
if (null != utilDate) {
sqlDate = new java.sql.Date(utilDate.getTime());
}
preparedStatement.setDate(columnIndex + 1, sqlDate);
break;
case Types.TIME:
java.sql.Time sqlTime = null;
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"Date type conversion error: [%s]", column));
}
if (null != utilDate) {
sqlTime = new java.sql.Time(utilDate.getTime());
}
preparedStatement.setTime(columnIndex + 1, sqlTime);
break;
case Types.TIMESTAMP:
Timestamp sqlTimestamp = null;
if (column instanceof StringColumn && column.asString() != null) {
String timeStampStr = column.asString();
// a Java Timestamp argument must be in the "2017-07-12 14:39:00.123566" format
String pattern = "^\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+";
boolean isMatch = Pattern.matches(pattern, timeStampStr);
if (isMatch) {
sqlTimestamp = Timestamp.valueOf(timeStampStr);
preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp);
break;
}
}
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"Date type conversion error: [%s]", column));
}
if (null != utilDate) {
sqlTimestamp = new Timestamp(
utilDate.getTime());
}
preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp);
break;
case Types.BINARY:
case Types.VARBINARY:
case Types.BLOB:
case Types.LONGVARBINARY:
preparedStatement.setBytes(columnIndex + 1, column
.asBytes());
break;
case Types.BOOLEAN:
// warn: bit(1) -> Types.BIT, setBoolean can be used
// warn: bit(>1) -> Types.VARBINARY, setBytes can be used
case Types.BIT:
if (this.dataBaseType == DataBaseType.MySql) {
Boolean asBoolean = column.asBoolean();
if (asBoolean != null) {
preparedStatement.setBoolean(columnIndex + 1, asBoolean);
} else {
preparedStatement.setNull(columnIndex + 1, Types.BIT);
}
} else {
preparedStatement.setString(columnIndex + 1, column.asString());
}
break;
default:
// cast variant / array into string is fine.
preparedStatement.setString(columnIndex + 1, column.asString());
break;
}
return preparedStatement;
} catch (DataXException e) {
// fix: when a type conversion or overflow error occurs, report exactly which column caused it
if (e.getErrorCode() == CommonErrorCode.CONVERT_NOT_SUPPORT ||
e.getErrorCode() == CommonErrorCode.CONVERT_OVER_FLOW) {
throw DataXException
.asDataXException(
e.getErrorCode(),
String.format(
"type conversion error. columnName: [%s], columnType:[%d], columnJavaType: [%s]. please change the data type in given column field or do not sync on the column.",
this.resultSetMetaData.getLeft()
.get(columnIndex),
this.resultSetMetaData.getMiddle()
.get(columnIndex),
this.resultSetMetaData.getRight()
.get(columnIndex)));
} else {
throw e;
}
}
}
};
this.commonRdbmsWriterSlave.init(this.writerSliceConfig);
}
@Override
public void destroy()
{
this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig);
}
@Override
public void prepare() {
this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig);
}
@Override
public void post() {
this.commonRdbmsWriterSlave.post(this.writerSliceConfig);
}
@Override
public void startWrite(RecordReceiver lineReceiver)
{
this.commonRdbmsWriterSlave.startWrite(lineReceiver, this.writerSliceConfig, this.getTaskPluginCollector());
}
}
}

View File

@ -0,0 +1,40 @@
package com.alibaba.datax.plugin.writer.databendwriter.util;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.StringJoiner;
public final class DatabendWriterUtil
{
private static final Logger LOG = LoggerFactory.getLogger(DatabendWriterUtil.class);
private DatabendWriterUtil() {}
public static void dealWriteMode(Configuration originalConfig)
{
List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
String jdbcUrl = originalConfig.getString(String.format("%s[0].%s",
Constant.CONN_MARK, Key.JDBC_URL, String.class));
String writeMode = originalConfig.getString(Key.WRITE_MODE, "INSERT");
StringBuilder writeDataSqlTemplate = new StringBuilder();
writeDataSqlTemplate.append("INSERT INTO %s");
StringJoiner columnString = new StringJoiner(",");
for (String column : columns) {
columnString.add(column);
}
writeDataSqlTemplate.append(String.format("(%s)", columnString));
writeDataSqlTemplate.append(" VALUES");
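// the template built here looks like "INSERT INTO %s(col1,col2,...) VALUES"; the %s table placeholder is assumed to be filled in later by the common RDBMS writer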
LOG.info("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);
originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate);
}
}

View File

@ -0,0 +1,6 @@
{
"name": "databendwriter",
"class": "com.alibaba.datax.plugin.writer.databendwriter.DatabendWriter",
"description": "execute batch insert sql to write dataX data into databend",
"developer": "databend"
}

View File

@ -0,0 +1,19 @@
{
"name": "databendwriter",
"parameter": {
"username": "username",
"password": "password",
"column": ["col1", "col2", "col3"],
"connection": [
{
"jdbcUrl": "jdbc:databend://<host>:<port>[/<database>]",
"table": "table1"
}
],
"preSql": [],
"postSql": [],
"maxBatchRows": 65536,
"maxBatchSize": 134217728
}
}

View File

@ -1,8 +1,8 @@
package com.alibaba.datax.plugin.reader.datahubreader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.DatahubClientBuilder;
import com.aliyun.datahub.client.auth.Account;

View File

@ -3,8 +3,8 @@ package com.alibaba.datax.plugin.writer.datahubwriter;
import org.apache.commons.lang3.StringUtils;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.DatahubClientBuilder;
import com.aliyun.datahub.client.auth.Account;

View File

@ -8,7 +8,7 @@ import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.DataXCaseEnvUtil;
import com.alibaba.datax.common.util.RetryUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.model.FieldType;
import com.aliyun.datahub.client.model.GetTopicResult;

View File

@ -1,7 +1,7 @@
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.element.Record;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import java.util.HashMap;
import java.util.List;

View File

@ -1,6 +1,6 @@
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;

View File

@ -5,8 +5,8 @@ import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.elasticsearchwriter.jest.ClusterInfo;
import com.alibaba.datax.plugin.writer.elasticsearchwriter.jest.ClusterInfoResult;
import com.alibaba.datax.plugin.writer.elasticsearchwriter.jest.PutMapping7;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
@ -53,6 +53,8 @@ public class ElasticSearchClient {
public ElasticSearchClient(Configuration conf) {
this.conf = conf;
String endpoint = Key.getEndpoint(conf);
// ES supports cluster (multi-endpoint) writes
String[] endpoints = endpoint.split(",");
String user = Key.getUsername(conf);
String passwd = Key.getPassword(conf);
boolean multiThread = Key.isMultiThread(conf);
@ -63,7 +65,7 @@ public class ElasticSearchClient {
int totalConnection = this.conf.getInt("maxTotalConnection", 200);
JestClientFactory factory = new JestClientFactory();
Builder httpClientConfig = new HttpClientConfig
.Builder(endpoint)
.Builder(Arrays.asList(endpoints))
// .setPreemptiveAuth(new HttpHost(endpoint))
.multiThreaded(multiThread)
.connTimeout(readTimeout)
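The change above splits a comma-separated endpoint string and hands the resulting list to the Jest builder, so one job can target several ES nodes. A standalone sketch of just the splitting step, with a hypothetical endpoint value:

    import java.util.Arrays;
    import java.util.List;

    public class EndpointSplitSketch {
        public static void main(String[] args) {
            // Hypothetical comma-separated cluster endpoints from the job config.
            String endpoint = "http://es-node1:9200,http://es-node2:9200";
            String[] endpoints = endpoint.split(",");
            List<String> serverUris = Arrays.asList(endpoints);
            // HttpClientConfig.Builder accepts this collection instead of a single URI.
            System.out.println(serverUris);
        }
    }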

View File

@ -9,11 +9,11 @@ import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.DataXCaseEnvUtil;
import com.alibaba.datax.common.util.RetryUtil;
import com.alibaba.datax.plugin.writer.elasticsearchwriter.Key.ActionType;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.TypeReference;
import com.alibaba.fastjson2.JSONWriter;
import com.google.common.base.Joiner;
import io.searchbox.client.JestResult;
import io.searchbox.core.*;
@ -927,9 +927,8 @@ public class ElasticSearchWriter extends Writer {
Index.Builder builder = null;
if (this.enableWriteNull) {
builder = new Index.Builder(
JSONObject.toJSONString(data, SerializerFeature.WriteMapNullValue,
SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,
SerializerFeature.WriteEnumUsingToString, SerializerFeature.SortField));
JSONObject.toJSONString(data, JSONWriter.Feature.WriteMapNullValue,
JSONWriter.Feature.WriteEnumUsingToString));
} else {
builder = new Index.Builder(JSONObject.toJSONString(data));
}
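The hunks in this file replace fastjson 1.x SerializerFeature flags with their fastjson2 JSONWriter.Feature counterparts. A standalone sketch of the new call (not the plugin code itself), showing the effect of WriteMapNullValue:

    import com.alibaba.fastjson2.JSON;
    import com.alibaba.fastjson2.JSONWriter;

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class Fastjson2FeatureSketch {
        public static void main(String[] args) {
            Map<String, Object> data = new LinkedHashMap<>();
            data.put("a", "1");
            data.put("b", null);
            // fastjson2 equivalent of the removed SerializerFeature.WriteMapNullValue flag.
            System.out.println(JSON.toJSONString(data, JSONWriter.Feature.WriteMapNullValue)); // {"a":"1","b":null}
            // Without the feature, null values are dropped.
            System.out.println(JSON.toJSONString(data)); // {"a":"1"}
        }
    }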
@ -958,9 +957,8 @@ public class ElasticSearchWriter extends Writer {
if (this.enableWriteNull) {
// write: {a:"1",b:null}
update = new Update.Builder(
JSONObject.toJSONString(updateDoc, SerializerFeature.WriteMapNullValue,
SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,
SerializerFeature.WriteEnumUsingToString, SerializerFeature.SortField));
JSONObject.toJSONString(updateDoc, JSONWriter.Feature.WriteMapNullValue,
JSONWriter.Feature.WriteEnumUsingToString));
// on top of DEFAULT_GENERATE_FEATURE, only SerializerFeature.WRITE_MAP_NULL_FEATURES is added
} else {
// write: {"a":"1"}

View File

@ -2,7 +2,7 @@ package com.alibaba.datax.plugin.writer.elasticsearchwriter;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSONObject;
public class JsonPathUtil {

View File

@ -1,8 +1,8 @@
package com.alibaba.datax.plugin.writer.elasticsearchwriter;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONException;
import com.alibaba.fastjson2.JSONObject;
/**
* @author bozu

View File

@ -1,8 +1,8 @@
package com.alibaba.datax.plugin.writer.elasticsearchwriter;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import org.apache.commons.lang3.StringUtils;

View File

@ -24,7 +24,7 @@ FtpWriter实现了从DataX协议转为FTP文件功能FTP文件本身是无结
We cannot do the following:
1. A single file cannot support concurrent writes.
1. Concurrent writes to a single file.
## 3 功能说明

View File

@ -14,8 +14,8 @@ import org.slf4j.LoggerFactory;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.plugin.writer.ftpwriter.FtpWriterErrorCode;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONWriter;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
@ -251,7 +251,7 @@ public class SftpHelperImpl implements IFtpHelper {
@SuppressWarnings("rawtypes")
Vector allFiles = this.channelSftp.ls(dir);
LOG.debug(String.format("ls: %s", JSON.toJSONString(allFiles,
SerializerFeature.UseSingleQuotes)));
JSONWriter.Feature.UseSingleQuotes)));
for (int i = 0; i < allFiles.size(); i++) {
LsEntry le = (LsEntry) allFiles.get(i);
String strName = le.getFilename();

View File

@ -18,8 +18,8 @@ import org.slf4j.LoggerFactory;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.plugin.writer.ftpwriter.FtpWriterErrorCode;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONWriter;
public class StandardFtpHelperImpl implements IFtpHelper {
private static final Logger LOG = LoggerFactory
@ -244,7 +244,7 @@ public class StandardFtpHelperImpl implements IFtpHelper {
FTPFile[] fs = this.ftpClient.listFiles(dir);
// LOG.debug(JSON.toJSONString(this.ftpClient.listNames(dir)));
LOG.debug(String.format("ls: %s",
JSON.toJSONString(fs, SerializerFeature.UseSingleQuotes)));
JSON.toJSONString(fs, JSONWriter.Feature.UseSingleQuotes)));
for (FTPFile ff : fs) {
String strName = ff.getName();
if (strName.startsWith(prefixFileName)) {

View File

@ -19,8 +19,8 @@ import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbEdge;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbVertex;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;

View File

@ -12,8 +12,8 @@ import org.apache.commons.lang3.StringUtils;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.GdbWriterErrorCode;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
/**
* @author jerrywang

View File

@ -2,8 +2,8 @@ package com.alibaba.datax.plugin.reader.hbase094xreader;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.hadoop.fs.Path;

View File

@ -2,8 +2,8 @@ package com.alibaba.datax.plugin.writer.hbase094xwriter;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.hadoop.fs.Path;

View File

@ -2,8 +2,8 @@ package com.alibaba.datax.plugin.reader.hbase11xreader;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.hadoop.hbase.HBaseConfiguration;

View File

@ -2,8 +2,8 @@ package com.alibaba.datax.plugin.reader.hbase11xsqlreader;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.InputSplit;

View File

@ -2,8 +2,8 @@ package com.alibaba.datax.plugin.writer.hbase11xsqlwriter;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.util.Pair;

View File

@ -2,8 +2,8 @@ package com.alibaba.datax.plugin.writer.hbase11xwriter;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.hadoop.hbase.HBaseConfiguration;

View File

@ -8,8 +8,8 @@ import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.unstructuredstorage.reader.ColumnEntry;
import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderErrorCode;
import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@ -331,9 +331,11 @@ public class DFSUtil {
//If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds
//Each file as a split
//TODO multi threads
InputSplit[] splits = in.getSplits(conf, 1);
RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
// OrcInputFormat getSplits params numSplits not used, splits size = block numbers
InputSplit[] splits = in.getSplits(conf, -1);
for (InputSplit split : splits) {
{
RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);
Object key = reader.createKey();
Object value = reader.createValue();
// fetch column information
@ -351,6 +353,8 @@ public class DFSUtil {
taskPluginCollector, isReadAllColumns, nullFormat);
}
reader.close();
}
}
} catch (Exception e) {
String message = String.format("从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。"
, sourceOrcFilePath);

View File

@ -8,8 +8,8 @@ import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.unstructuredstorage.util.ColumnTypeUtil;
import com.alibaba.datax.plugin.unstructuredstorage.util.HdfsUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;

View File

@ -15,8 +15,8 @@ import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.writer.hologresjdbcwriter.util.ConfLoader;
import com.alibaba.datax.plugin.writer.hologresjdbcwriter.util.OriginalConfPretreatmentUtil;
import com.alibaba.datax.plugin.writer.hologresjdbcwriter.util.WriterUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.hologres.client.HoloClient;
import com.alibaba.hologres.client.HoloConfig;
import com.alibaba.hologres.client.Put;

View File

@ -3,7 +3,7 @@ package com.q1.datax.plugin.writer.kudu11xwriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.kudu.ColumnSchema;

View File

@ -134,7 +134,7 @@ public class KuduWriterTask {
break;
case BOOLEAN:
synchronized (lock) {
row.addBoolean(name, Boolean.getBoolean(rawData));
row.addBoolean(name, Boolean.parseBoolean(rawData));
}
break;
case STRING:
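The one-line fix above matters because Boolean.getBoolean(String) reads a JVM system property of that name rather than parsing the string, so a cell value of "true" would previously have been written as false. A quick standalone illustration:

    public class BooleanParseSketch {
        public static void main(String[] args) {
            String rawData = "true";
            // Boolean.getBoolean looks up System.getProperty("true"), which is normally unset -> false.
            System.out.println(Boolean.getBoolean(rawData));   // false
            // Boolean.parseBoolean actually parses the text -> true.
            System.out.println(Boolean.parseBoolean(rawData)); // true
        }
    }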

View File

@ -8,7 +8,7 @@ import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.DataXCaseEnvUtil;
import com.alibaba.datax.common.util.RetryUtil;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSONObject;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.*;

View File

@ -114,8 +114,7 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据通过主控的J
"accessKey": "********************",
"truncate": true,
"odpsServer": "xxx/api",
"tunnelServer": "xxx",
"accountType": "aliyun"
"tunnelServer": "xxx"
}
}
}

View File

@ -18,9 +18,9 @@ import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.reader.mongodbreader.util.CollectionSplitUtil;
import com.alibaba.datax.plugin.reader.mongodbreader.util.MongoUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;

View File

@ -7,9 +7,9 @@ import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import com.alibaba.datax.plugin.writer.mongodbwriter.util.MongoUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.google.common.base.Strings;
import com.mongodb.*;
import com.mongodb.client.MongoCollection;

View File

@ -10,7 +10,7 @@ import com.alibaba.datax.plugin.rdbms.reader.Constant;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.OceanBaseReader;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.PartitionSplitUtil;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -12,7 +12,7 @@ import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.task.ConcurrentTableWriterTask;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.DbUtils;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -14,20 +14,9 @@ public class Constant {
public static final String PARTITION_SPLIT_MODE = "partition";
public static final String DEFAULT_ACCOUNT_TYPE = "aliyun";
public static final String TAOBAO_ACCOUNT_TYPE = "taobao";
// constant-valued columns just need to be wrapped with COLUMN_CONSTANT_FLAG at both ends
public final static String COLUMN_CONSTANT_FLAG = "'";
/**
* Constants needed below for obtaining the accessKey / accessId
*/
public static final String SKYNET_ACCESSID = "SKYNET_ACCESSID";
public static final String SKYNET_ACCESSKEY = "SKYNET_ACCESSKEY";
public static final String PARTITION_COLUMNS = "partitionColumns";
public static final String PARSED_COLUMNS = "parsedColumns";

View File

@ -24,9 +24,6 @@ public class Key {
// when the value is "partition", split only down to partitions; when it is "record", keep splitting by record if partition-level splitting does not reach adviceNum
public final static String SPLIT_MODE = "splitMode";
// account type, "aliyun" by default; may also be "taobao" or another type
public final static String ACCOUNT_TYPE = "accountType";
public final static String PACKAGE_AUTHORIZED_PROJECT = "packageAuthorizedProject";
public final static String IS_COMPRESS = "isCompress";

View File

@ -7,7 +7,7 @@ import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.FilterUtil;
import com.alibaba.datax.common.util.MessageSource;
import com.alibaba.datax.plugin.reader.odpsreader.util.*;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.Table;
@ -42,12 +42,6 @@ public class OdpsReader extends Reader {
this.originalConfig = super.getPluginJobConf();
this.successOnNoPartition = this.originalConfig.getBool(Key.SUCCESS_ON_NO_PATITION, false);
// if the user did not configure accessId/accessKey, try to obtain them from environment variables
String accountType = originalConfig.getString(Key.ACCOUNT_TYPE, Constant.DEFAULT_ACCOUNT_TYPE);
if (Constant.DEFAULT_ACCOUNT_TYPE.equalsIgnoreCase(accountType)) {
this.originalConfig = IdAndKeyUtil.parseAccessIdAndKey(this.originalConfig);
}
// check the required configuration items
OdpsUtil.checkNecessaryConfig(this.originalConfig);
// check the retry-count configuration

View File

@ -6,7 +6,7 @@ import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.MessageSource;
import com.alibaba.datax.plugin.reader.odpsreader.util.OdpsUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.data.*;
@ -200,7 +200,7 @@ public class ReaderProxy {
}
if (IS_DEBUG) {
LOG.debug(String.format("partition value details: %s",
com.alibaba.fastjson.JSON.toJSONString(partitionMap)));
com.alibaba.fastjson2.JSON.toJSONString(partitionMap)));
}
return partitionMap;
}
@ -212,7 +212,7 @@ public class ReaderProxy {
// it's will never happen, but add this checking
if (!partitionMap.containsKey(partitionColumnName)) {
String errorMessage = MESSAGE_SOURCE.message("readerproxy.3",
com.alibaba.fastjson.JSON.toJSONString(partitionMap),
com.alibaba.fastjson2.JSON.toJSONString(partitionMap),
partitionColumnName);
throw DataXException.asDataXException(
OdpsReaderErrorCode.READ_DATA_FAIL, errorMessage);

View File

@ -1,65 +0,0 @@
/**
* (C) 2010-2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.datax.plugin.reader.odpsreader.util;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.IdAndKeyRollingUtil;
import com.alibaba.datax.common.util.MessageSource;
import com.alibaba.datax.plugin.reader.odpsreader.Key;
import com.alibaba.datax.plugin.reader.odpsreader.OdpsReaderErrorCode;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class IdAndKeyUtil {
private static Logger LOG = LoggerFactory.getLogger(IdAndKeyUtil.class);
private static MessageSource MESSAGE_SOURCE = MessageSource.loadResourceBundle(IdAndKeyUtil.class);
public static Configuration parseAccessIdAndKey(Configuration originalConfig) {
String accessId = originalConfig.getString(Key.ACCESS_ID);
String accessKey = originalConfig.getString(Key.ACCESS_KEY);
// as long as either accessId or accessKey is configured, we take it that the user intends to configure them manually
if (StringUtils.isNotBlank(accessId) || StringUtils.isNotBlank(accessKey)) {
LOG.info("Try to get accessId/accessKey from your config.");
// the following calls verify that both are actually configured
accessId = originalConfig.getNecessaryValue(Key.ACCESS_ID, OdpsReaderErrorCode.REQUIRED_VALUE);
accessKey = originalConfig.getNecessaryValue(Key.ACCESS_KEY, OdpsReaderErrorCode.REQUIRED_VALUE);
// checks passed, just return
return originalConfig;
} else {
Map<String, String> envProp = System.getenv();
return getAccessIdAndKeyFromEnv(originalConfig, envProp);
}
}
private static Configuration getAccessIdAndKeyFromEnv(Configuration originalConfig,
Map<String, String> envProp) {
// if the AK is found, getAccessIdAndKeyFromEnv has already set it into originalConfig
String accessKey = IdAndKeyRollingUtil.getAccessIdAndKeyFromEnv(originalConfig);
if (StringUtils.isBlank(accessKey)) {
// nowhere to obtain it: neither configured in the job nor present in the environment variables
throw DataXException.asDataXException(OdpsReaderErrorCode.GET_ID_KEY_FAIL,
MESSAGE_SOURCE.message("idandkeyutil.2"));
}
return originalConfig;
}
}

View File

@ -76,20 +76,13 @@ public final class OdpsUtil {
defaultProject = packageAuthorizedProject;
}
String accountType = originalConfig.getString(Key.ACCOUNT_TYPE,
Constant.DEFAULT_ACCOUNT_TYPE);
Account account = null;
if (accountType.equalsIgnoreCase(Constant.DEFAULT_ACCOUNT_TYPE)) {
if (StringUtils.isNotBlank(securityToken)) {
account = new StsAccount(accessId, accessKey, securityToken);
} else {
account = new AliyunAccount(accessId, accessKey);
}
} else {
throw DataXException.asDataXException(OdpsReaderErrorCode.ACCOUNT_TYPE_ERROR,
MESSAGE_SOURCE.message("odpsutil.3", accountType));
}
Odps odps = new Odps(account);
boolean isPreCheck = originalConfig.getBool("dryRun", false);

View File

@ -71,8 +71,7 @@ ODPSWriter插件用于实现往ODPS插入或者更新数据主要提供给etl
"accessKey": "xxxx",
"truncate": true,
"odpsServer": "http://sxxx/api",
"tunnelServer": "http://xxx",
"accountType": "aliyun"
"tunnelServer": "http://xxx"
}
}
}

View File

@ -2,13 +2,6 @@ package com.alibaba.datax.plugin.writer.odpswriter;
public class Constant {
public static final String SKYNET_ACCESSID = "SKYNET_ACCESSID";
public static final String SKYNET_ACCESSKEY = "SKYNET_ACCESSKEY";
public static final String DEFAULT_ACCOUNT_TYPE = "aliyun";
public static final String TAOBAO_ACCOUNT_TYPE = "taobao";
public static final String COLUMN_POSITION = "columnPosition";

View File

@ -30,8 +30,6 @@ public final class Key {
// boolean type, default: false
public final static String EMPTY_AS_NULL = "emptyAsNull";
public final static String ACCOUNT_TYPE = "accountType";
public final static String IS_COMPRESS = "isCompress";
// preSql

View File

@ -12,9 +12,9 @@ import com.alibaba.datax.common.util.MessageSource;
import com.alibaba.datax.plugin.writer.odpswriter.model.PartitionInfo;
import com.alibaba.datax.plugin.writer.odpswriter.model.UserDefinedFunction;
import com.alibaba.datax.plugin.writer.odpswriter.util.*;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.aliyun.odps.Odps;
import com.aliyun.odps.Table;
import com.aliyun.odps.TableSchema;
@ -62,7 +62,6 @@ public class OdpsWriter extends Writer {
private String tableName;
private String tunnelServer;
private String partition;
private String accountType;
private boolean truncate;
private String uploadId;
private TableTunnel.UploadSession masterUpload;
@ -104,8 +103,6 @@ public class OdpsWriter extends Writer {
this.tableName = this.originalConfig.getString(Key.TABLE);
this.tunnelServer = this.originalConfig.getString(Key.TUNNEL_SERVER, null);
this.dealAK();
// init odps config
this.odps = OdpsUtil.initOdpsProject(this.originalConfig);
@ -153,31 +150,6 @@ public class OdpsWriter extends Writer {
}
}
private void dealAK() {
this.accountType = this.originalConfig.getString(Key.ACCOUNT_TYPE,
Constant.DEFAULT_ACCOUNT_TYPE);
if (!Constant.DEFAULT_ACCOUNT_TYPE.equalsIgnoreCase(this.accountType) &&
!Constant.TAOBAO_ACCOUNT_TYPE.equalsIgnoreCase(this.accountType)) {
throw DataXException.asDataXException(OdpsWriterErrorCode.ACCOUNT_TYPE_ERROR,
MESSAGE_SOURCE.message("odpswriter.1", accountType));
}
this.originalConfig.set(Key.ACCOUNT_TYPE, this.accountType);
// check the accessId/accessKey configuration
if (Constant.DEFAULT_ACCOUNT_TYPE
.equalsIgnoreCase(this.accountType)) {
this.originalConfig = IdAndKeyUtil.parseAccessIdAndKey(this.originalConfig);
String accessId = this.originalConfig.getString(Key.ACCESS_ID);
String accessKey = this.originalConfig.getString(Key.ACCESS_KEY);
if (IS_DEBUG) {
LOG.debug("accessId:[{}], accessKey:[{}] .", accessId,
accessKey);
}
LOG.info("accessId:[{}] .", accessId);
}
}
private void dealDynamicPartition() {
/*
* if supportDynamicPartition is explicitly configured, that setting takes precedence
@ -241,20 +213,6 @@ public class OdpsWriter extends Writer {
@Override
public void prepare() {
String accessId = null;
String accessKey = null;
if (Constant.DEFAULT_ACCOUNT_TYPE
.equalsIgnoreCase(this.accountType)) {
this.originalConfig = IdAndKeyUtil.parseAccessIdAndKey(this.originalConfig);
accessId = this.originalConfig.getString(Key.ACCESS_ID);
accessKey = this.originalConfig.getString(Key.ACCESS_KEY);
if (IS_DEBUG) {
LOG.debug("accessId:[{}], accessKey:[{}] .", accessId,
accessKey);
}
LOG.info("accessId:[{}] .", accessId);
}
// init odps config
this.odps = OdpsUtil.initOdpsProject(this.originalConfig);

View File

@ -6,9 +6,9 @@ import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.MessageSource;
import com.alibaba.datax.plugin.writer.odpswriter.util.OdpsUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.data.ArrayRecord;

View File

@ -4,7 +4,7 @@ import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.odpswriter.model.PartitionInfo;
import com.alibaba.datax.plugin.writer.odpswriter.model.UserDefinedFunction;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import com.google.common.base.Joiner;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;

View File

@ -1,65 +0,0 @@
/**
* (C) 2010-2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.datax.plugin.writer.odpswriter.util;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.IdAndKeyRollingUtil;
import com.alibaba.datax.common.util.MessageSource;
import com.alibaba.datax.plugin.writer.odpswriter.Key;
import com.alibaba.datax.plugin.writer.odpswriter.OdpsWriterErrorCode;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class IdAndKeyUtil {
private static Logger LOG = LoggerFactory.getLogger(IdAndKeyUtil.class);
private static final MessageSource MESSAGE_SOURCE = MessageSource.loadResourceBundle(IdAndKeyUtil.class);
public static Configuration parseAccessIdAndKey(Configuration originalConfig) {
String accessId = originalConfig.getString(Key.ACCESS_ID);
String accessKey = originalConfig.getString(Key.ACCESS_KEY);
// as long as either accessId or accessKey is configured, we take it that the user intends to configure them manually
if (StringUtils.isNotBlank(accessId) || StringUtils.isNotBlank(accessKey)) {
LOG.info("Try to get accessId/accessKey from your config.");
// the following calls verify that both are actually configured
accessId = originalConfig.getNecessaryValue(Key.ACCESS_ID, OdpsWriterErrorCode.REQUIRED_VALUE);
accessKey = originalConfig.getNecessaryValue(Key.ACCESS_KEY, OdpsWriterErrorCode.REQUIRED_VALUE);
// checks passed, just return
return originalConfig;
} else {
Map<String, String> envProp = System.getenv();
return getAccessIdAndKeyFromEnv(originalConfig, envProp);
}
}
private static Configuration getAccessIdAndKeyFromEnv(Configuration originalConfig,
Map<String, String> envProp) {
// if the AK is found, getAccessIdAndKeyFromEnv has already set it into originalConfig
String accessKey = IdAndKeyRollingUtil.getAccessIdAndKeyFromEnv(originalConfig);
if (StringUtils.isBlank(accessKey)) {
// nowhere to obtain it: neither configured in the job nor present in the environment variables
throw DataXException.asDataXException(OdpsWriterErrorCode.GET_ID_KEY_FAIL,
MESSAGE_SOURCE.message("idandkeyutil.2"));
}
return originalConfig;
}
}

View File

@ -79,7 +79,6 @@ public class OdpsUtil {
public static Odps initOdpsProject(Configuration originalConfig) {
String accountType = originalConfig.getString(Key.ACCOUNT_TYPE);
String accessId = originalConfig.getString(Key.ACCESS_ID);
String accessKey = originalConfig.getString(Key.ACCESS_KEY);
@ -88,16 +87,11 @@ public class OdpsUtil {
String securityToken = originalConfig.getString(Key.SECURITY_TOKEN);
Account account;
if (accountType.equalsIgnoreCase(Constant.DEFAULT_ACCOUNT_TYPE)) {
if (StringUtils.isNotBlank(securityToken)) {
account = new com.aliyun.odps.account.StsAccount(accessId, accessKey, securityToken);
} else {
account = new AliyunAccount(accessId, accessKey);
}
} else {
throw DataXException.asDataXException(OdpsWriterErrorCode.ACCOUNT_TYPE_ERROR,
MESSAGE_SOURCE.message("odpsutil.4", accountType));
}
Odps odps = new Odps(account);
boolean isPreCheck = originalConfig.getBool("dryRun", false);

View File

@ -44,10 +44,6 @@
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>fastjson</artifactId>
<groupId>com.alibaba</groupId>
</exclusion>
<exclusion>
<artifactId>commons-math3</artifactId>
<groupId>org.apache.commons</groupId>
@ -89,8 +85,8 @@
<!-- json -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
</dependency>
<!-- opentsdb -->

View File

@ -1,6 +1,6 @@
package com.alibaba.datax.plugin.reader.conn;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import java.util.Map;

View File

@ -2,7 +2,7 @@ package com.alibaba.datax.plugin.reader.conn;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.plugin.reader.util.TSDBUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import org.apache.commons.lang3.StringUtils;
import java.util.List;

View File

@ -1,7 +1,7 @@
package com.alibaba.datax.plugin.reader.conn;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import net.opentsdb.core.TSDB;
import net.opentsdb.utils.Config;

View File

@ -6,7 +6,7 @@ import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.reader.conn.OpenTSDBConnection;
import com.alibaba.datax.plugin.reader.util.TimeUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;

View File

@ -1,6 +1,6 @@
package com.alibaba.datax.plugin.reader.util;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import org.apache.http.client.fluent.Content;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.ContentType;

View File

@ -1,7 +1,7 @@
package com.alibaba.datax.plugin.reader.util;
import com.alibaba.datax.plugin.reader.conn.DataPoint4TSDB;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -12,8 +12,8 @@ import com.alibaba.datax.plugin.unstructuredstorage.FileFormat;
import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil;
import com.alibaba.datax.plugin.unstructuredstorage.reader.binaryFileUtil.BinaryFileReaderUtil;
import com.alibaba.datax.plugin.unstructuredstorage.reader.split.StartEndPair;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.aliyun.oss.ClientException;
import com.aliyun.oss.OSSClient;
import com.aliyun.oss.OSSException;

View File

@ -2,8 +2,8 @@ package com.alibaba.datax.plugin.reader.ossreader.util;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.reader.ossreader.Key;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
/**
* @Author: guxuan

View File

@ -7,8 +7,8 @@ import com.alibaba.datax.plugin.unstructuredstorage.reader.Key;
import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderErrorCode;
import com.alibaba.datax.plugin.unstructuredstorage.reader.split.StartEndPair;
import com.alibaba.datax.plugin.unstructuredstorage.reader.split.UnstructuredSplitUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.aliyun.oss.OSSClient;
import com.aliyun.oss.model.GetObjectRequest;
import com.aliyun.oss.model.OSSObject;

View File

@ -14,7 +14,7 @@ import com.alibaba.datax.plugin.unstructuredstorage.writer.binaryFileUtil.Binary
import com.alibaba.datax.plugin.writer.hdfswriter.HdfsWriter;
import com.alibaba.datax.plugin.writer.osswriter.util.HandlerUtil;
import com.alibaba.datax.plugin.writer.osswriter.util.HdfsParquetUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import com.aliyun.oss.model.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;

View File

@ -5,9 +5,9 @@ import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.plugin.unstructuredstorage.writer.Key;
import com.alibaba.datax.plugin.writer.osswriter.Constant;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;

View File

@ -3,8 +3,8 @@ package com.alibaba.datax.plugin.writer.osswriter.util;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.hdfswriter.HdfsWriter;
import com.alibaba.datax.plugin.writer.osswriter.Key;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.hadoop.fs.FileSystem;

View File

@ -434,6 +434,13 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>databendwriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>oscarwriter/target/datax/</directory>
<includes>
@ -483,5 +490,12 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>selectdbwriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
</fileSets>
</assembly>

View File

@ -261,7 +261,7 @@ public final class OriginalConfPretreatmentUtil {
// table and querySql configured at the same time
if (!ListUtil.checkIfValueSame(tableModeFlags)
|| !ListUtil.checkIfValueSame(tableModeFlags)) {
|| !ListUtil.checkIfValueSame(querySqlModeFlags)) {
throw DataXException.asDataXException(DBUtilErrorCode.TABLE_QUERYSQL_MIXED,
"您配置凌乱了. 不能同时既配置table又配置querySql. 请检查您的配置并作出修改.");
}

View File

@ -5,7 +5,7 @@ import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.reader.Constant;
import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.util.*;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;

View File

@ -720,6 +720,11 @@ public final class DBUtil {
new ArrayList<String>(), String.class);
DBUtil.doDealWithSessionConfig(conn, sessionConfig, message);
break;
case SQLServer:
sessionConfig = config.getList(Key.SESSION,
new ArrayList<String>(), String.class);
DBUtil.doDealWithSessionConfig(conn, sessionConfig, message);
break;
default:
break;
}
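The new SQLServer branch reuses the existing session-config handling, i.e. each statement in the job's session list is executed on the connection before the job's own SQL runs. A minimal sketch of that pattern (hypothetical helper, not DBUtil itself):

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.List;

    public class SessionConfigSketch {
        // Runs each configured session statement on a freshly opened connection.
        static void applySession(Connection conn, List<String> sessionConfig) throws SQLException {
            try (Statement stmt = conn.createStatement()) {
                for (String sql : sessionConfig) {
                    // e.g. "SET LOCK_TIMEOUT 3000" for SQLServer
                    stmt.execute(sql);
                }
            }
        }
    }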

View File

@ -18,13 +18,14 @@ public enum DataBaseType {
PostgreSQL("postgresql", "org.postgresql.Driver"),
RDBMS("rdbms", "com.alibaba.datax.plugin.rdbms.util.DataBaseType"),
DB2("db2", "com.ibm.db2.jcc.DB2Driver"),
ADB("adb","com.mysql.jdbc.Driver"),
ADS("ads","com.mysql.jdbc.Driver"),
ClickHouse("clickhouse", "ru.yandex.clickhouse.ClickHouseDriver"),
KingbaseES("kingbasees", "com.kingbase8.Driver"),
Oscar("oscar", "com.oscar.Driver"),
OceanBase("oceanbase", "com.alipay.oceanbase.jdbc.Driver"),
StarRocks("starrocks", "com.mysql.jdbc.Driver");
StarRocks("starrocks", "com.mysql.jdbc.Driver"),
Databend("databend", "com.databend.jdbc.DatabendDriver");
private String typeName;
private String driverClassName;
@ -89,6 +90,14 @@ public enum DataBaseType {
result = jdbc + "?" + suffix;
}
break;
case ADB:
suffix = "yearIsDateType=false&zeroDateTimeBehavior=convertToNull&rewriteBatchedStatements=true&tinyInt1isBit=false";
if (jdbc.contains("?")) {
result = jdbc + "&" + suffix;
} else {
result = jdbc + "?" + suffix;
}
break;
case DRDS:
suffix = "yearIsDateType=false&zeroDateTimeBehavior=convertToNull";
if (jdbc.contains("?")) {
@ -109,6 +118,8 @@ public enum DataBaseType {
break;
case RDBMS:
break;
case Databend:
break;
case KingbaseES:
break;
case Oscar:
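The new ADB case appends the same MySQL-style connection suffix, choosing '&' or '?' depending on whether the url already carries parameters. A standalone sketch with hypothetical jdbcUrl values:

    public class JdbcSuffixSketch {
        public static void main(String[] args) {
            String suffix = "yearIsDateType=false&zeroDateTimeBehavior=convertToNull"
                    + "&rewriteBatchedStatements=true&tinyInt1isBit=false";
            // Hypothetical ADB endpoints, only to show the two branches.
            String plain = "jdbc:mysql://adb-host:3306/demo";
            String withParams = "jdbc:mysql://adb-host:3306/demo?useSSL=false";
            System.out.println(plain.contains("?") ? plain + "&" + suffix : plain + "?" + suffix);
            System.out.println(withParams.contains("?") ? withParams + "&" + suffix : withParams + "?" + suffix);
        }
    }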

View File

@ -5,7 +5,7 @@ import java.text.SimpleDateFormat;
import org.apache.commons.lang3.StringUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
public class ColumnEntry {
private Integer index;

View File

@ -5,9 +5,9 @@ import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.TypeReference;
import com.csvreader.CsvReader;
import org.apache.commons.beanutils.BeanUtils;
import io.airlift.compress.snappy.SnappyCodec;

Some files were not shown because too many files have changed in this diff