diff --git a/README.md b/README.md index 01bbc3ea..0ab85594 100644 --- a/README.md +++ b/README.md @@ -37,47 +37,48 @@ DataX本身作为数据同步框架,将不同数据源的同步抽象为从源 DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图,详情请点击:[DataX数据源参考指南](https://github.com/alibaba/DataX/wiki/DataX-all-data-channels) -| 类型 | 数据源 | Reader(读) | Writer(写) | 文档 | +| 类型 | 数据源 | Reader(读) | Writer(写) | 文档 | |--------------|---------------------------|:---------:|:---------:|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------:| -| RDBMS 关系型数据库 | MySQL | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md) | -| | Oracle | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/oraclereader/doc/oraclereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/oraclewriter/doc/oraclewriter.md) | -| | OceanBase | √ | √ | [读](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase) 、[写](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase) | -| | SQLServer | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/sqlserverreader/doc/sqlserverreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/sqlserverwriter/doc/sqlserverwriter.md) | -| | PostgreSQL | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/postgresqlreader/doc/postgresqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/postgresqlwriter/doc/postgresqlwriter.md) | -| | DRDS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) | -| | Kingbase | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) | -| | 通用RDBMS(支持所有关系型数据库) | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/rdbmsreader/doc/rdbmsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/rdbmswriter/doc/rdbmswriter.md) | -| 阿里云数仓数据存储 | ODPS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/odpsreader/doc/odpsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/odpswriter/doc/odpswriter.md) | -| | ADB | | √ | [写](https://github.com/alibaba/DataX/blob/master/adbmysqlwriter/doc/adbmysqlwriter.md) | -| | ADS | | √ | [写](https://github.com/alibaba/DataX/blob/master/adswriter/doc/adswriter.md) | -| | OSS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/ossreader/doc/ossreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/osswriter/doc/osswriter.md) | -| | OCS | | √ | [写](https://github.com/alibaba/DataX/blob/master/ocswriter/doc/ocswriter.md) | -| | Hologres | | √ | [写](https://github.com/alibaba/DataX/blob/master/hologresjdbcwriter/doc/hologresjdbcwriter.md) | -| | AnalyticDB For PostgreSQL | | √ | 写 | -| 阿里云中间件 | datahub | √ | √ | 读 、写 | -| | SLS | √ | √ | 读 、写 | -| 阿里云图数据库 | GDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/gdbreader/doc/gdbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/gdbwriter/doc/gdbwriter.md) | -| NoSQL数据存储 | OTS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/otsreader/doc/otsreader.md) 
、[写](https://github.com/alibaba/DataX/blob/master/otswriter/doc/otswriter.md) | -| | Hbase0.94 | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase094xreader/doc/hbase094xreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase094xwriter/doc/hbase094xwriter.md) | -| | Hbase1.1 | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase11xreader/doc/hbase11xreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase11xwriter/doc/hbase11xwriter.md) | -| | Phoenix4.x | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase11xsqlreader/doc/hbase11xsqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase11xsqlwriter/doc/hbase11xsqlwriter.md) | -| | Phoenix5.x | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase20xsqlreader/doc/hbase20xsqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase20xsqlwriter/doc/hbase20xsqlwriter.md) | -| | MongoDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mongodbwriter/doc/mongodbwriter.md) | -| | Cassandra | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/cassandrareader/doc/cassandrareader.md) 、[写](https://github.com/alibaba/DataX/blob/master/cassandrawriter/doc/cassandrawriter.md) | -| 数仓数据存储 | StarRocks | √ | √ | 读 、[写](https://github.com/alibaba/DataX/blob/master/starrockswriter/doc/starrockswriter.md) | -| | ApacheDoris | | √ | [写](https://github.com/alibaba/DataX/blob/master/doriswriter/doc/doriswriter.md) | -| | ClickHouse | | √ | 写 | -| | Databend | | √ | [写](https://github.com/alibaba/DataX/blob/master/databendwriter/doc/databendwriter.md) | -| | Hive | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | -| | kudu | | √ | [写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | -| | selectdb | | √ | [写](https://github.com/alibaba/DataX/blob/master/selectdbwriter/doc/selectdbwriter.md) | -| 无结构化数据存储 | TxtFile | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md) | -| | FTP | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md) | -| | HDFS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | -| | Elasticsearch | | √ | [写](https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md) | -| 时间序列数据库 | OpenTSDB | √ | | [读](https://github.com/alibaba/DataX/blob/master/opentsdbreader/doc/opentsdbreader.md) | -| | TSDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/tsdbreader/doc/tsdbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/tsdbwriter/doc/tsdbhttpwriter.md) | -| | TDengine | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/tdenginereader/doc/tdenginereader-CN.md) 、[写](https://github.com/alibaba/DataX/blob/master/tdenginewriter/doc/tdenginewriter-CN.md) | +| RDBMS 关系型数据库 | MySQL | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md) | +| | Oracle | √ | √ | 
[读](https://github.com/alibaba/DataX/blob/master/oraclereader/doc/oraclereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/oraclewriter/doc/oraclewriter.md) | +| | OceanBase | √ | √ | [读](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase) 、[写](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase) | +| | SQLServer | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/sqlserverreader/doc/sqlserverreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/sqlserverwriter/doc/sqlserverwriter.md) | +| | PostgreSQL | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/postgresqlreader/doc/postgresqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/postgresqlwriter/doc/postgresqlwriter.md) | +| | DRDS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) | +| | Kingbase | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) | +| | 通用RDBMS(支持所有关系型数据库) | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/rdbmsreader/doc/rdbmsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/rdbmswriter/doc/rdbmswriter.md) | +| 阿里云数仓数据存储 | ODPS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/odpsreader/doc/odpsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/odpswriter/doc/odpswriter.md) | +| | ADB | | √ | [写](https://github.com/alibaba/DataX/blob/master/adbmysqlwriter/doc/adbmysqlwriter.md) | +| | ADS | | √ | [写](https://github.com/alibaba/DataX/blob/master/adswriter/doc/adswriter.md) | +| | OSS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/ossreader/doc/ossreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/osswriter/doc/osswriter.md) | +| | OCS | | √ | [写](https://github.com/alibaba/DataX/blob/master/ocswriter/doc/ocswriter.md) | +| | Hologres | | √ | [写](https://github.com/alibaba/DataX/blob/master/hologresjdbcwriter/doc/hologresjdbcwriter.md) | +| | AnalyticDB For PostgreSQL | | √ | 写 | +| 阿里云中间件 | datahub | √ | √ | 读 、写 | +| | SLS | √ | √ | 读 、写 | +| 图数据库 | 阿里云 GDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/gdbreader/doc/gdbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/gdbwriter/doc/gdbwriter.md) | +| | Neo4j | | √ | [写](https://github.com/alibaba/DataX/blob/master/neo4jwriter/doc/neo4jwriter.md) | +| NoSQL数据存储 | OTS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/otsreader/doc/otsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/otswriter/doc/otswriter.md) | +| | Hbase0.94 | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase094xreader/doc/hbase094xreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase094xwriter/doc/hbase094xwriter.md) | +| | Hbase1.1 | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase11xreader/doc/hbase11xreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase11xwriter/doc/hbase11xwriter.md) | +| | Phoenix4.x | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase11xsqlreader/doc/hbase11xsqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase11xsqlwriter/doc/hbase11xsqlwriter.md) | +| | Phoenix5.x | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase20xsqlreader/doc/hbase20xsqlreader.md) 
、[写](https://github.com/alibaba/DataX/blob/master/hbase20xsqlwriter/doc/hbase20xsqlwriter.md) | +| | MongoDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mongodbwriter/doc/mongodbwriter.md) | +| | Cassandra | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/cassandrareader/doc/cassandrareader.md) 、[写](https://github.com/alibaba/DataX/blob/master/cassandrawriter/doc/cassandrawriter.md) | +| 数仓数据存储 | StarRocks | √ | √ | 读 、[写](https://github.com/alibaba/DataX/blob/master/starrockswriter/doc/starrockswriter.md) | +| | ApacheDoris | | √ | [写](https://github.com/alibaba/DataX/blob/master/doriswriter/doc/doriswriter.md) | +| | ClickHouse | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/clickhousereader/doc/clickhousereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/clickhousewriter/doc/clickhousewriter.md) | +| | Databend | | √ | [写](https://github.com/alibaba/DataX/blob/master/databendwriter/doc/databendwriter.md) | +| | Hive | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | +| | kudu | | √ | [写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | +| | selectdb | | √ | [写](https://github.com/alibaba/DataX/blob/master/selectdbwriter/doc/selectdbwriter.md) | +| 无结构化数据存储 | TxtFile | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md) | +| | FTP | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md) | +| | HDFS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | +| | Elasticsearch | | √ | [写](https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md) | +| 时间序列数据库 | OpenTSDB | √ | | [读](https://github.com/alibaba/DataX/blob/master/opentsdbreader/doc/opentsdbreader.md) | +| | TSDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/tsdbreader/doc/tsdbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/tsdbwriter/doc/tsdbhttpwriter.md) | +| | TDengine | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/tdenginereader/doc/tdenginereader-CN.md) 、[写](https://github.com/alibaba/DataX/blob/master/tdenginewriter/doc/tdenginewriter-CN.md) | # 阿里云DataWorks数据集成 @@ -108,6 +109,11 @@ DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、N # 重要版本更新说明 DataX 后续计划月度迭代更新,也欢迎感兴趣的同学提交 Pull requests,月度更新内容会介绍介绍如下。 +- [datax_v202306](https://github.com/alibaba/DataX/releases/tag/datax_v202306) + - 精简代码 + - 新增插件(neo4jwriter、clickhousewriter) + - 优化插件、修复问题(oceanbase、hdfs、databend、txtfile) + - [datax_v202303](https://github.com/alibaba/DataX/releases/tag/datax_v202303) - 精简代码 diff --git a/clickhousereader/doc/clickhousereader.md b/clickhousereader/doc/clickhousereader.md new file mode 100644 index 00000000..bf3cd203 --- /dev/null +++ b/clickhousereader/doc/clickhousereader.md @@ -0,0 +1,344 @@ + +# ClickhouseReader 插件文档 + + +___ + + +## 1 快速介绍 + +ClickhouseReader插件实现了从Clickhouse读取数据。在底层实现上,ClickhouseReader通过JDBC连接远程Clickhouse数据库,并执行相应的sql语句将数据从Clickhouse库中SELECT出来。 + +## 2 实现原理 + +简而言之,ClickhouseReader通过JDBC连接器连接到远程的Clickhouse数据库,并根据用户配置的信息生成查询SELECT 
SQL语句并发送到远程Clickhouse数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。 + +对于用户配置Table、Column、Where的信息,ClickhouseReader将其拼接为SQL语句发送到Clickhouse数据库;对于用户配置querySql信息,Clickhouse直接将其发送到Clickhouse数据库。 + + +## 3 功能说明 + +### 3.1 配置样例 + +* 配置一个从Clickhouse数据库同步抽取数据到本地的作业: + +``` +{ + "job": { + "setting": { + "speed": { + //设置传输速度 byte/s 尽量逼近这个速度但是不高于它. + // channel 表示通道数量,byte表示通道速度,如果单通道速度1MB,配置byte为1048576表示一个channel + "byte": 1048576 + }, + //出错限制 + "errorLimit": { + //先选择record + "record": 0, + //百分比 1表示100% + "percentage": 0.02 + } + }, + "content": [ + { + "reader": { + "name": "clickhousereader", + "parameter": { + // 数据库连接用户名 + "username": "root", + // 数据库连接密码 + "password": "root", + "column": [ + "id","name" + ], + "connection": [ + { + "table": [ + "table" + ], + "jdbcUrl": [ + "jdbc:clickhouse://[HOST_NAME]:PORT/[DATABASE_NAME]" + ] + } + ] + } + }, + "writer": { + //writer类型 + "name": "streamwriter", + // 是否打印内容 + "parameter": { + "print": true + } + } + } + ] + } +} + +``` + +* 配置一个自定义SQL的数据库同步任务到本地内容的作业: + +``` +{ + "job": { + "setting": { + "speed": { + "channel": 5 + } + }, + "content": [ + { + "reader": { + "name": "clickhousereader", + "parameter": { + "username": "root", + "password": "root", + "where": "", + "connection": [ + { + "querySql": [ + "select db_id,on_line_flag from db_info where db_id < 10" + ], + "jdbcUrl": [ + "jdbc:clickhouse://1.1.1.1:8123/default" + ] + } + ] + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "visible": false, + "encoding": "UTF-8" + } + } + } + ] + } +} +``` + + +### 3.2 参数说明 + +* **jdbcUrl** + + * 描述:描述的是到对端数据库的JDBC连接信息,使用JSON的数组描述,并支持一个库填写多个连接地址。之所以使用JSON数组描述连接信息,是因为阿里集团内部支持多个IP探测,如果配置了多个,ClickhouseReader可以依次探测ip的可连接性,直到选择一个合法的IP。如果全部连接失败,ClickhouseReader报错。 注意,jdbcUrl必须包含在connection配置单元中。对于阿里集团外部使用情况,JSON数组填写一个JDBC连接即可。 + + jdbcUrl按照Clickhouse官方规范,并可以填写连接附件控制信息。具体请参看[Clickhouse官方文档](https://clickhouse.com/docs/en/engines/table-engines/integrations/jdbc)。 + + * 必选:是
+ + * 默认值:无
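+
+下面给出一个 connection 配置的示意片段,演示 jdbcUrl 以 JSON 数组填写多个连接地址的写法,ClickhouseReader 会依次探测各地址的可连接性(示例中的主机、端口与库名均为假设值):
+
+```json
+"connection": [
+    {
+        "table": ["table"],
+        "jdbcUrl": [
+            "jdbc:clickhouse://192.168.0.1:8123/default",
+            "jdbc:clickhouse://192.168.0.2:8123/default"
+        ]
+    }
+]
+```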
+ +* **username** + + * 描述:数据源的用户名
+ + * 必选:是
+ + * 默认值:无
+ +* **password** + + * 描述:数据源指定用户名的密码
+ + * 必选:是
+ + * 默认值:无
+ +* **table** + + * 描述:所选取的需要同步的表。使用JSON的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户需自行保证多张表具有相同的schema结构,ClickhouseReader不会检查这些表是否为同一逻辑表。注意,table必须包含在connection配置单元中。
+ + * 必选:是
+ + * 默认值:无
+ +* **column** + + * 描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用\*代表默认使用所有列配置,例如['\*']。 + + 支持列裁剪,即列可以挑选部分列进行导出。 + + 支持列换序,即列可以不按照表schema信息进行导出。 + + 支持常量配置,用户需要按照JSON格式: + ["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"] + id为普通列名,\`table\`为包含保留字的列名,1为整型数字常量,'bazhen.csy'为字符串常量,null为空指针,to_char(a + 1)为表达式,2.3为浮点数,true为布尔值。 + + column必须显式填写,不允许为空! + + * 必选:是
+ + * 默认值:无
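+
+下面给出一个 column 配置的示意片段,演示普通列与常量混合填写的写法(其中 id、name 为假设的列名,"1"、"'datax'"、"null" 分别表示整型常量、字符串常量与空值):
+
+```json
+"column": [
+    "id",
+    "name",
+    "1",
+    "'datax'",
+    "null"
+]
+```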
+ +* **splitPk** + + * 描述:ClickhouseReader进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提高数据同步的效能。 + + 推荐splitPk使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。 + + 目前splitPk仅支持整型数据切分,`不支持浮点、日期等其他类型`。如果用户指定其他非支持类型,ClickhouseReader将报错! + + splitPk如果不填写,将视作用户不对单表进行切分,ClickhouseReader使用单通道同步全量数据。具体写法可参考下方的示意片段。 + + * 必选:否
+ + * 默认值:无
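+
+下面给出一个使用 splitPk 做并发抽取的示意配置片段:splitPk 指向整型主键字段,实际并发度由 job.setting.speed.channel 决定(示例中的表名、字段名与地址均为假设值):
+
+```json
+"parameter": {
+    "column": ["id", "name"],
+    "splitPk": "id",
+    "connection": [
+        {
+            "table": ["orders"],
+            "jdbcUrl": ["jdbc:clickhouse://127.0.0.1:8123/default"]
+        }
+    ]
+}
+```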
+ +* **where** + + * 描述:筛选条件,ClickhouseReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create > $bizdate 。注意:不可以将where条件指定为limit 10,limit不是SQL的合法where子句。增量同步的写法可参考下方的示意片段。
+ + where条件可以有效地进行业务增量同步。 + + * 必选:否
+ + * 默认值:无
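+
+下面给出一个按时间字段做业务增量同步的 where 配置示意片段(其中 gmt_create 为假设的时间字段,$bizdate 为日期变量,需由调度系统或手工替换为具体日期):
+
+```json
+"parameter": {
+    "column": ["id", "name", "gmt_create"],
+    "where": "gmt_create > '$bizdate'",
+    "connection": [
+        {
+            "table": ["orders"],
+            "jdbcUrl": ["jdbc:clickhouse://127.0.0.1:8123/default"]
+        }
+    ]
+}
+```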
+ +* **querySql** + + * 描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置项来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table、column这些配置项,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据时,可使用select a,b from table_a join table_b on table_a.id = table_b.id。
+ + `当用户配置querySql时,ClickhouseReader直接忽略table、column、where条件的配置`。 + + * 必选:否
+ + * 默认值:无
+ +* **fetchSize** + + * 描述:该配置项定义了插件和数据库服务器端每次批量获取数据的条数,该值决定了DataX和服务器端的网络交互次数,适当设置能够较大地提升数据抽取性能。
+ + `注意,该值过大(>2048)可能造成DataX进程OOM`。 + + * 必选:否
+ + * 默认值:1024
+ +* **session** + + * 描述:控制写入数据的时间格式,时区等的配置,如果表中有时间字段,配置该值以明确告知写入 clickhouse 的时间格式。通常配置的参数为:NLS_DATE_FORMAT,NLS_TIME_FORMAT。其配置的值为 json 格式,例如: +``` +"session": [ + "alter session set NLS_DATE_FORMAT='yyyy-mm-dd hh24:mi:ss'", + "alter session set NLS_TIMESTAMP_FORMAT='yyyy-mm-dd hh24:mi:ss'", + "alter session set NLS_TIMESTAMP_TZ_FORMAT='yyyy-mm-dd hh24:mi:ss'", + "alter session set TIME_ZONE='US/Pacific'" + ] +``` + `(注意"是 " 的转义字符串)`。 + + * 必选:否
+ + * 默认值:无
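+
+最后给出一份将上述可选参数组合在一起的完整作业配置示例,包含 splitPk、where 与 fetchSize,writer 仍以 streamwriter 打印到本地为例(账号、表名、字段名与地址均为假设值,仅供参考):
+
+```json
+{
+    "job": {
+        "setting": {
+            "speed": {
+                "channel": 3
+            }
+        },
+        "content": [
+            {
+                "reader": {
+                    "name": "clickhousereader",
+                    "parameter": {
+                        "username": "root",
+                        "password": "root",
+                        "column": ["id", "name", "gmt_create"],
+                        "splitPk": "id",
+                        "where": "gmt_create > '$bizdate'",
+                        "fetchSize": 1024,
+                        "connection": [
+                            {
+                                "table": ["orders"],
+                                "jdbcUrl": ["jdbc:clickhouse://127.0.0.1:8123/default"]
+                            }
+                        ]
+                    }
+                },
+                "writer": {
+                    "name": "streamwriter",
+                    "parameter": {
+                        "print": true
+                    }
+                }
+            }
+        ]
+    }
+}
+```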
+ + +### 3.3 类型转换 + +目前ClickhouseReader支持大部分Clickhouse类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。 + +下面列出ClickhouseReader针对Clickhouse类型转换列表: + + +| DataX 内部类型| Clickhouse 数据类型 | +| -------- |--------------------------------------------------------------------------------------------| +| Long | UInt8, UInt16, UInt32, UInt64, UInt128, UInt256, Int8, Int16, Int32, Int64, Int128, Int256 | +| Double | Float32, Float64, Decimal | +| String | String, FixedString | +| Date | DATE, Date32, DateTime, DateTime64 | +| Boolean | Boolean | +| Bytes | BLOB,BFILE,RAW,LONG RAW | + + + +请注意: + +* `除上述罗列字段类型外,其他类型均不支持`。 + + +## 4 性能报告 + +### 4.1 环境准备 + +#### 4.1.1 数据特征 + +为了模拟线上真实数据,我们设计两个Clickhouse数据表,分别为: + +#### 4.1.2 机器参数 + +* 执行DataX的机器参数为: + +* Clickhouse数据库机器参数为: + +### 4.2 测试报告 + +#### 4.2.1 表1测试报告 + + +| 并发任务数| DataX速度(Rec/s)|DataX流量|网卡流量|DataX运行负载|DB运行负载| +|--------| --------|--------|--------|--------|--------| +|1| DataX 统计速度(Rec/s)|DataX统计流量|网卡流量|DataX运行负载|DB运行负载| + +## 5 约束限制 + +### 5.1 主备同步数据恢复问题 + +主备同步问题指Clickhouse使用主从灾备,备库从主库不间断通过binlog恢复数据。由于主备数据同步存在一定的时间差,特别在于某些特定情况,例如网络延迟等问题,导致备库同步恢复的数据与主库有较大差别,导致从备库同步的数据不是一份当前时间的完整镜像。 + +针对这个问题,我们提供了preSql功能,该功能待补充。 + +### 5.2 一致性约束 + +Clickhouse在数据存储划分中属于RDBMS系统,对外可以提供强一致性数据查询接口。例如当一次同步任务启动运行过程中,当该库存在其他数据写入方写入数据时,ClickhouseReader完全不会获取到写入更新数据,这是由于数据库本身的快照特性决定的。关于数据库快照特性,请参看[MVCC Wikipedia](https://en.wikipedia.org/wiki/Multiversion_concurrency_control) + +上述是在ClickhouseReader单线程模型下数据同步一致性的特性,由于ClickhouseReader可以根据用户配置信息使用了并发数据抽取,因此不能严格保证数据一致性:当ClickhouseReader根据splitPk进行数据切分后,会先后启动多个并发任务完成数据同步。由于多个并发任务相互之间不属于同一个读事务,同时多个并发任务存在时间间隔。因此这份数据并不是`完整的`、`一致的`数据快照信息。 + +针对多线程的一致性快照需求,在技术上目前无法实现,只能从工程角度解决,工程化的方式存在取舍,我们提供几个解决思路给用户,用户可以自行选择: + +1. 使用单线程同步,即不再进行数据切片。缺点是速度比较慢,但是能够很好保证一致性。 + +2. 关闭其他数据写入方,保证当前数据为静态数据,例如,锁表、关闭备库同步等等。缺点是可能影响在线业务。 + +### 5.3 数据库编码问题 + + +ClickhouseReader底层使用JDBC进行数据抽取,JDBC天然适配各类编码,并在底层进行了编码转换。因此ClickhouseReader不需用户指定编码,可以自动获取编码并转码。 + +对于Clickhouse底层写入编码和其设定的编码不一致的混乱情况,ClickhouseReader对此无法识别,对此也无法提供解决方案,对于这类情况,`导出有可能为乱码`。 + +### 5.4 增量数据同步 + +ClickhouseReader使用JDBC SELECT语句完成数据抽取工作,因此可以使用SELECT...WHERE...进行增量数据抽取,方式有多种: + +* 数据库在线应用写入数据库时,填充modify字段为更改时间戳,包括新增、更新、删除(逻辑删)。对于这类应用,ClickhouseReader只需要WHERE条件跟上一同步阶段时间戳即可。 +* 对于新增流水型数据,ClickhouseReader可以WHERE条件后跟上一阶段最大自增ID即可。 + +对于业务上无字段区分新增、修改数据情况,ClickhouseReader也无法进行增量数据同步,只能同步全量数据。 + +### 5.5 Sql安全性 + +ClickhouseReader提供querySql语句交给用户自己实现SELECT抽取语句,ClickhouseReader本身对querySql不做任何安全性校验。这块交由DataX用户方自己保证。 + +## 6 FAQ + +*** + +**Q: ClickhouseReader同步报错,报错信息为XXX** + + A: 网络或者权限问题,请使用Clickhouse命令行测试 + + +如果上述命令也报错,那可以证实是环境问题,请联系你的DBA。 + + +**Q: ClickhouseReader抽取速度很慢怎么办?** + + A: 影响抽取时间的原因大概有如下几个:(来自专业 DBA 卫绾) + 1. 由于SQL的plan异常,导致的抽取时间长; 在抽取时,尽可能使用全表扫描代替索引扫描; + 2. 合理sql的并发度,减少抽取时间; + 3. 
抽取sql要简单,尽量不用replace等函数,这个非常消耗cpu,会严重影响抽取速度; diff --git a/clickhousereader/pom.xml b/clickhousereader/pom.xml new file mode 100644 index 00000000..4b095796 --- /dev/null +++ b/clickhousereader/pom.xml @@ -0,0 +1,91 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + + 4.0.0 + clickhousereader + clickhousereader + jar + + + + ru.yandex.clickhouse + clickhouse-jdbc + 0.2.4 + + + com.alibaba.datax + datax-core + ${datax-project-version} + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + org.slf4j + slf4j-api + + + + ch.qos.logback + logback-classic + + + + com.alibaba.datax + plugin-rdbms-util + ${datax-project-version} + + + + + + + src/main/java + + **/*.properties + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + + + \ No newline at end of file diff --git a/clickhousereader/src/main/assembly/package.xml b/clickhousereader/src/main/assembly/package.xml new file mode 100644 index 00000000..2053ff75 --- /dev/null +++ b/clickhousereader/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + datax + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/reader/clickhousereader + + + target/ + + clickhousereader-0.0.1-SNAPSHOT.jar + + plugin/reader/clickhousereader + + + + + + false + plugin/reader/clickhousereader/libs + runtime + + + \ No newline at end of file diff --git a/clickhousereader/src/main/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReader.java b/clickhousereader/src/main/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReader.java new file mode 100644 index 00000000..bf3cad12 --- /dev/null +++ b/clickhousereader/src/main/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReader.java @@ -0,0 +1,87 @@ +package com.alibaba.datax.plugin.reader.clickhousereader; + +import java.sql.Array; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Types; +import java.util.List; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.common.spi.Reader; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.common.util.MessageSource; +import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.fastjson2.JSON; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClickhouseReader extends Reader { + + private static final DataBaseType DATABASE_TYPE = DataBaseType.ClickHouse; + private static final Logger LOG = LoggerFactory.getLogger(ClickhouseReader.class); + + public static class Job extends Reader.Job { + private static MessageSource MESSAGE_SOURCE = MessageSource.loadResourceBundle(ClickhouseReader.class); + + private Configuration jobConfig = null; + private CommonRdbmsReader.Job commonRdbmsReaderMaster; + + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + this.commonRdbmsReaderMaster = new CommonRdbmsReader.Job(DATABASE_TYPE); + this.commonRdbmsReaderMaster.init(this.jobConfig); + } + + @Override + public List split(int mandatoryNumber) { + return this.commonRdbmsReaderMaster.split(this.jobConfig, 
mandatoryNumber); + } + + @Override + public void post() { + this.commonRdbmsReaderMaster.post(this.jobConfig); + } + + @Override + public void destroy() { + this.commonRdbmsReaderMaster.destroy(this.jobConfig); + } + } + + public static class Task extends Reader.Task { + + private Configuration jobConfig; + private CommonRdbmsReader.Task commonRdbmsReaderSlave; + + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + this.commonRdbmsReaderSlave = new CommonRdbmsReader.Task(DATABASE_TYPE, super.getTaskGroupId(), super.getTaskId()); + this.commonRdbmsReaderSlave.init(this.jobConfig); + } + + @Override + public void startRead(RecordSender recordSender) { + int fetchSize = this.jobConfig.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE, 1000); + + this.commonRdbmsReaderSlave.startRead(this.jobConfig, recordSender, super.getTaskPluginCollector(), fetchSize); + } + + @Override + public void post() { + this.commonRdbmsReaderSlave.post(this.jobConfig); + } + + @Override + public void destroy() { + this.commonRdbmsReaderSlave.destroy(this.jobConfig); + } + } +} diff --git a/clickhousereader/src/main/resources/plugin.json b/clickhousereader/src/main/resources/plugin.json new file mode 100644 index 00000000..5d608f6c --- /dev/null +++ b/clickhousereader/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "clickhousereader", + "class": "com.alibaba.datax.plugin.reader.clickhousereader.ClickhouseReader", + "description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql.", + "developer": "alibaba" +} \ No newline at end of file diff --git a/clickhousereader/src/main/resources/plugin_job_template.json b/clickhousereader/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..1814e510 --- /dev/null +++ b/clickhousereader/src/main/resources/plugin_job_template.json @@ -0,0 +1,16 @@ +{ + "name": "clickhousereader", + "parameter": { + "username": "username", + "password": "password", + "column": ["col1", "col2", "col3"], + "connection": [ + { + "jdbcUrl": "jdbc:clickhouse://:[/]", + "table": ["table1", "table2"] + } + ], + "preSql": [], + "postSql": [] + } +} \ No newline at end of file diff --git a/clickhousereader/src/test/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReaderTest.java b/clickhousereader/src/test/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReaderTest.java new file mode 100644 index 00000000..a4094020 --- /dev/null +++ b/clickhousereader/src/test/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReaderTest.java @@ -0,0 +1,74 @@ +package com.alibaba.datax.plugin.reader.clickhousereader; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.dataxservice.face.eventcenter.EventLogStore; +import com.alibaba.datax.dataxservice.face.eventcenter.RuntimeContext; +import com.alibaba.datax.test.simulator.BasicReaderPluginTest; +import com.alibaba.datax.test.simulator.junit.extend.log.LoggedRunner; +import com.alibaba.datax.test.simulator.junit.extend.log.TestLogger; +import com.alibaba.fastjson.JSON; + +import org.apache.commons.lang3.ArrayUtils; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import 
org.junit.runner.RunWith; + + +@RunWith(LoggedRunner.class) +@Ignore +public class ClickhouseReaderTest extends BasicReaderPluginTest { + @TestLogger(log = "测试basic1.json. 配置常量.") + @Test + public void testBasic1() { + RuntimeContext.setGlobalJobId(-1); + EventLogStore.init(); + List noteRecordForTest = new ArrayList(); + + List subjobs = super.doReaderTest("basic1.json", 1, noteRecordForTest); + + Assert.assertEquals(1, subjobs.size()); + Assert.assertEquals(1, noteRecordForTest.size()); + + Assert.assertEquals("[8,16,32,64,-8,-16,-32,-64,\"3.2\",\"6.4\",1,\"str_col\",\"abc\"," + "\"417ddc5d-e556-4d27-95dd-a34d84e46a50\",1580745600000,1580752800000,\"hello\",\"[1,2,3]\"," + "\"[\\\"abc\\\",\\\"cde\\\"]\",\"(8,'uint8_type')\",null,\"[1,2]\",\"[\\\"x\\\",\\\"y\\\"]\",\"127.0.0.1\",\"::\",\"23.345\"]", JSON.toJSONString(listData(noteRecordForTest.get(0)))); + } + + @Override + protected OutputStream buildDataOutput(String optionalOutputName) { + File f = new File(optionalOutputName + "-output.txt"); + try { + return new FileOutputStream(f); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } + return null; + } + + @Override + public String getTestPluginName() { + return "clickhousereader"; + } + + private Object[] listData(Record record) { + if (null == record) { + return ArrayUtils.EMPTY_OBJECT_ARRAY; + } + Object[] arr = new Object[record.getColumnNumber()]; + for (int i = 0; i < arr.length; i++) { + Column col = record.getColumn(i); + if (null != col) { + arr[i] = col.getRawData(); + } + } + return arr; + } +} diff --git a/clickhousereader/src/test/resources/basic1.json b/clickhousereader/src/test/resources/basic1.json new file mode 100755 index 00000000..c45a45e7 --- /dev/null +++ b/clickhousereader/src/test/resources/basic1.json @@ -0,0 +1,57 @@ +{ + "job": { + "setting": { + "speed": { + "channel": 5 + } + }, + "content": [ + { + "reader": { + "name": "clickhousereader", + "parameter": { + "username": "XXXX", + "password": "XXXX", + "column": [ + "uint8_col", + "uint16_col", + "uint32_col", + "uint64_col", + "int8_col", + "int16_col", + "int32_col", + "int64_col", + "float32_col", + "float64_col", + "bool_col", + "str_col", + "fixedstr_col", + "uuid_col", + "date_col", + "datetime_col", + "enum_col", + "ary_uint8_col", + "ary_str_col", + "tuple_col", + "nullable_col", + "nested_col.nested_id", + "nested_col.nested_str", + "ipv4_col", + "ipv6_col", + "decimal_col" + ], + "connection": [ + { + "table": [ + "all_type_tbl" + ], + "jdbcUrl":["jdbc:clickhouse://XXXX:8123/default"] + } + ] + } + }, + "writer": {} + } + ] + } +} \ No newline at end of file diff --git a/clickhousereader/src/test/resources/basic1.sql b/clickhousereader/src/test/resources/basic1.sql new file mode 100644 index 00000000..f937b889 --- /dev/null +++ b/clickhousereader/src/test/resources/basic1.sql @@ -0,0 +1,34 @@ +CREATE TABLE IF NOT EXISTS default.all_type_tbl +( +`uint8_col` UInt8, +`uint16_col` UInt16, +uint32_col UInt32, +uint64_col UInt64, +int8_col Int8, +int16_col Int16, +int32_col Int32, +int64_col Int64, +float32_col Float32, +float64_col Float64, +bool_col UInt8, +str_col String, +fixedstr_col FixedString(3), +uuid_col UUID, +date_col Date, +datetime_col DateTime, +enum_col Enum('hello' = 1, 'world' = 2), +ary_uint8_col Array(UInt8), +ary_str_col Array(String), +tuple_col Tuple(UInt8, String), +nullable_col Nullable(UInt8), +nested_col Nested + ( + nested_id UInt32, + nested_str String + ), +ipv4_col IPv4, +ipv6_col IPv6, +decimal_col Decimal(5,3) +) +ENGINE = MergeTree() +ORDER BY 
(uint8_col); \ No newline at end of file diff --git a/common/src/main/java/com/alibaba/datax/common/statistics/VMInfo.java b/common/src/main/java/com/alibaba/datax/common/statistics/VMInfo.java index cab42a4b..423c794e 100644 --- a/common/src/main/java/com/alibaba/datax/common/statistics/VMInfo.java +++ b/common/src/main/java/com/alibaba/datax/common/statistics/VMInfo.java @@ -77,8 +77,8 @@ public class VMInfo { garbageCollectorMXBeanList = java.lang.management.ManagementFactory.getGarbageCollectorMXBeans(); memoryPoolMXBeanList = java.lang.management.ManagementFactory.getMemoryPoolMXBeans(); - osInfo = runtimeMXBean.getVmVendor() + " " + runtimeMXBean.getSpecVersion() + " " + runtimeMXBean.getVmVersion(); - jvmInfo = osMXBean.getName() + " " + osMXBean.getArch() + " " + osMXBean.getVersion(); + jvmInfo = runtimeMXBean.getVmVendor() + " " + runtimeMXBean.getSpecVersion() + " " + runtimeMXBean.getVmVersion(); + osInfo = osMXBean.getName() + " " + osMXBean.getArch() + " " + osMXBean.getVersion(); totalProcessorCount = osMXBean.getAvailableProcessors(); //构建startPhyOSStatus diff --git a/core/src/main/java/com/alibaba/datax/core/transport/channel/memory/MemoryChannel.java b/core/src/main/java/com/alibaba/datax/core/transport/channel/memory/MemoryChannel.java index e49c7878..5bce085f 100755 --- a/core/src/main/java/com/alibaba/datax/core/transport/channel/memory/MemoryChannel.java +++ b/core/src/main/java/com/alibaba/datax/core/transport/channel/memory/MemoryChannel.java @@ -29,7 +29,7 @@ public class MemoryChannel extends Channel { private ReentrantLock lock; - private Condition notInsufficient, notEmpty; + private Condition notSufficient, notEmpty; public MemoryChannel(final Configuration configuration) { super(configuration); @@ -37,7 +37,7 @@ public class MemoryChannel extends Channel { this.bufferSize = configuration.getInt(CoreConstant.DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE); lock = new ReentrantLock(); - notInsufficient = lock.newCondition(); + notSufficient = lock.newCondition(); notEmpty = lock.newCondition(); } @@ -75,7 +75,7 @@ public class MemoryChannel extends Channel { lock.lockInterruptibly(); int bytes = getRecordBytes(rs); while (memoryBytes.get() + bytes > this.byteCapacity || rs.size() > this.queue.remainingCapacity()) { - notInsufficient.await(200L, TimeUnit.MILLISECONDS); + notSufficient.await(200L, TimeUnit.MILLISECONDS); } this.queue.addAll(rs); waitWriterTime += System.nanoTime() - startTime; @@ -116,7 +116,7 @@ public class MemoryChannel extends Channel { waitReaderTime += System.nanoTime() - startTime; int bytes = getRecordBytes(rs); memoryBytes.addAndGet(-bytes); - notInsufficient.signalAll(); + notSufficient.signalAll(); } catch (InterruptedException e) { throw DataXException.asDataXException( FrameworkErrorCode.RUNTIME_ERROR, e); diff --git a/neo4jwriter/doc/neo4jwriter.md b/neo4jwriter/doc/neo4jwriter.md new file mode 100644 index 00000000..0c6e356c --- /dev/null +++ b/neo4jwriter/doc/neo4jwriter.md @@ -0,0 +1,193 @@ +# DataX neo4jWriter 插件文档 + +## 功能简介 + +本目前市面上的neo4j 批量导入主要有Cypher Create,Load CSV,第三方或者官方提供的Batch Import。Load CSV支持节点10W级别一下,Batch Import 需要对数据库进行停机。要想实现不停机的数据写入,Cypher是最好的方式。 + +## 支持版本 + +支持Neo4j 4 和Neo4j 5,如果是Neo4j 3,需要自行将驱动降低至相对应的版本进行编译。 + +## 实现原理 + +将datax的数据转换成了neo4j驱动能识别的对象,利用 unwind 语法进行批量插入。 + +## 如何配置 + +### 配置项介绍 + +| 配置 | 说明 | 是否必须 | 默认值 | 示例 | +|:-------------------------------|--------------------| -------- | ------ | ---------------------------------------------------- | +| database | 数据库名字 | 是 | - | neo4j | +| uri | 数据库访问链接 | 是 | - 
| bolt://localhost:7687 | +| username | 访问用户名 | 是 | - | neo4j | +| password | 访问密码 | 是 | - | neo4j | +| bearerToken | 权限相关 | 否 | - | - | +| kerberosTicket | 权限相关 | 否 | - | - | +| cypher | 同步语句 | 是 | - | unwind $batch as row create(p) set p.name = row.name | +| batchDataVariableName | unwind 携带的数据变量名 | | | batch | +| properties | 定义neo4j中数据的属性名字和类型 | 是 | - | 见后续案例 | +| batchSize | 一批写入数据量 | 否 | 1000 | | +| maxTransactionRetryTimeSeconds | 事务运行最长时间 | 否 | 30秒 | 30 | +| maxConnectionTimeoutSeconds | 驱动最长链接时间 | 否 | 30秒 | 30 | +| retryTimes | 发生错误的重试次数 | 否 | 3次 | 3 | +| retrySleepMills | 重试失败后的等待时间 | 否 | 3秒 | 3 | + +### 支持的数据类型 +> 配置时均忽略大小写 +``` +BOOLEAN, +STRING, +LONG, +SHORT, +INTEGER, +DOUBLE, +FLOAT, +LOCAL_DATE, +LOCAL_TIME, +LOCAL_DATE_TIME, +LIST, +//map类型支持 . 属性表达式取值 +MAP, +CHAR_ARRAY, +BYTE_ARRAY, +BOOLEAN_ARRAY, +STRING_ARRAY, +LONG_ARRAY, +INT_ARRAY, +SHORT_ARRAY, +DOUBLE_ARRAY, +FLOAT_ARRAY, +Object_ARRAY +``` + +### 写节点 + +这里提供了一个写节点包含很多类型属性的例子。你可以在我的测试方法中运行。 + +```json +"writer": { + "name": "neo4jWriter", + "parameter": { + "uri": "neo4j://localhost:7687", + "username": "neo4j", + "password": "Test@12343", + "database": "neo4j", + "cypher": "unwind $batch as row create(p:Person) set p.pbool = row.pbool,p.pstring = row.pstring,p.plong = row.plong,p.pshort = row.pshort,p.pdouble=row.pdouble,p.pstringarr=row.pstringarr,p.plocaldate=row.plocaldate", + "batchDataVariableName": "batch", + "batchSize": "33", + "properties": [ + { + "name": "pbool", + "type": "BOOLEAN" + }, + { + "name": "pstring", + "type": "STRING" + }, + { + "name": "plong", + "type": "LONG" + }, + { + "name": "pshort", + "type": "SHORT" + }, + { + "name": "pdouble", + "type": "DOUBLE" + }, + { + "name": "pstringarr", + "type": "STRING_ARRAY", + "split": "," + }, + { + "name": "plocaldate", + "type": "LOCAL_DATE", + "dateFormat": "yyyy-MM-dd" + } + ] + } + } +``` + +### 写关系 + +```json +"writer": { + "name": "neo4jWriter", + "parameter": { + "uri": "neo4j://localhost:7687", + "username": "neo4j", + "password": "Test@12343", + "database": "neo4j", + "cypher": "unwind $batch as row match(p1:Person) where p1.id = row.startNodeId match(p2:Person) where p2.id = row.endNodeId create (p1)-[:LINK]->(p2)", + "batchDataVariableName": "batch", + "batch_size": "33", + "properties": [ + { + "name": "startNodeId", + "type": "STRING" + }, + { + "name": "endNodeId", + "type": "STRING" + } + ] + } + } +``` + +### 节点/关系类型动态写 + +> 需要使用AOPC函数拓展,如果你的数据库没有,请安装APOC函数拓展 + +```json + "writer": { + "name": "neo4jWriter", + "parameter": { + "uri": "bolt://localhost:7687", + "username": "yourUserName", + "password": "yourPassword", + "database": "yourDataBase", + "cypher": "unwind $batch as row CALL apoc.cypher.doIt( 'create (n:`' + row.Label + '`{id:$id})' ,{id: row.id} ) YIELD value RETURN 1 ", + "batchDataVariableName": "batch", + "batch_size": "1", + "properties": [ + { + "name": "Label", + "type": "STRING" + }, + { + "name": "id", + "type": "STRING" + } + ] + } + } +``` + +## 注意事项 + +* properties定义的顺序需要与reader端顺序一一对应。 +* 灵活使用map类型,可以免去很多数据加工的烦恼。在cypher中,可以根据 . 
属性访问符号一直取值。比如 unwind $batch as row create (p) set p.name = row.prop.name,set p.age = row.prop.age,在这个例子中,prop是map类型,包含name和age两个属性。 +* 如果提示事务超时,建议调大事务运行时间或者调小batchSize +* 如果用于更新场景,遇到死锁问题影响写入,建议二开源码加入死锁异常检测,并进行重试。 + +## 性能报告 + +**JVM参数** + +16G G1垃圾收集器 8核心 + +**Neo4j数据库配置** + +32核心,256G + +**datax 配置** + +* Channel 20 batchsize = 1000 +* 任务平均流量:15.23MB/s +* 记录写入速度:44440 rec/s +* 读出记录总数:2222013 diff --git a/neo4jwriter/pom.xml b/neo4jwriter/pom.xml new file mode 100644 index 00000000..a9ae43e9 --- /dev/null +++ b/neo4jwriter/pom.xml @@ -0,0 +1,90 @@ + + + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + 4.0.0 + + neo4jwriter + neo4jwriter + jar + + + 8 + 8 + UTF-8 + 4.4.9 + 4.13.2 + 1.17.6 + + + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + + + org.neo4j.driver + neo4j-java-driver + ${neo4j-java-driver.version} + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + + org.testcontainers + testcontainers + ${test.container.version} + + + + junit + junit + ${junit4.version} + test + + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + diff --git a/neo4jwriter/src/main/assembly/package.xml b/neo4jwriter/src/main/assembly/package.xml new file mode 100644 index 00000000..3acbe674 --- /dev/null +++ b/neo4jwriter/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/neo4jwriter + + + target/ + + neo4jwriter-0.0.1-SNAPSHOT.jar + + plugin/writer/neo4jwriter + + + + + + false + plugin/writer/neo4jwriter/libs + runtime + + + diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java new file mode 100644 index 00000000..4451bbdf --- /dev/null +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java @@ -0,0 +1,256 @@ +package com.alibaba.datax.plugin.writer.neo4jwriter; + + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.common.util.RetryUtil; +import com.alibaba.datax.plugin.writer.neo4jwriter.adapter.DateAdapter; +import com.alibaba.datax.plugin.writer.neo4jwriter.adapter.ValueAdapter; +import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jProperty; +import com.alibaba.datax.plugin.writer.neo4jwriter.exception.Neo4jErrorCode; +import com.alibaba.fastjson2.JSON; +import org.apache.commons.lang3.StringUtils; +import org.neo4j.driver.*; +import org.neo4j.driver.exceptions.Neo4jException; +import org.neo4j.driver.internal.value.MapValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import static com.alibaba.datax.plugin.writer.neo4jwriter.config.ConfigConstants.*; +import static com.alibaba.datax.plugin.writer.neo4jwriter.exception.Neo4jErrorCode.DATABASE_ERROR; + +public class Neo4jClient { + private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jClient.class); + private Driver driver; + + private WriteConfig writeConfig; + private RetryConfig retryConfig; + private TaskPluginCollector 
taskPluginCollector; + + private Session session; + + private List writerBuffer; + + + public Neo4jClient(Driver driver, + WriteConfig writeConfig, + RetryConfig retryConfig, + TaskPluginCollector taskPluginCollector) { + this.driver = driver; + this.writeConfig = writeConfig; + this.retryConfig = retryConfig; + this.taskPluginCollector = taskPluginCollector; + this.writerBuffer = new ArrayList<>(writeConfig.batchSize); + } + + public void init() { + String database = writeConfig.database; + //neo4j 3.x 没有数据库 + if (null != database && !"".equals(database)) { + this.session = driver.session(SessionConfig.forDatabase(database)); + } else { + this.session = driver.session(); + } + } + + public static Neo4jClient build(Configuration config, TaskPluginCollector taskPluginCollector) { + + Driver driver = buildNeo4jDriver(config); + String cypher = checkCypher(config); + String database = config.getString(DATABASE.getKey()); + String batchVariableName = config.getString(BATCH_DATA_VARIABLE_NAME.getKey(), + BATCH_DATA_VARIABLE_NAME.getDefaultValue()); + List neo4jProperties = JSON.parseArray(config.getString(NEO4J_PROPERTIES.getKey()), Neo4jProperty.class); + int batchSize = config.getInt(BATCH_SIZE.getKey(), BATCH_SIZE.getDefaultValue()); + int retryTimes = config.getInt(RETRY_TIMES.getKey(), RETRY_TIMES.getDefaultValue()); + + return new Neo4jClient(driver, + new WriteConfig(cypher, database, batchVariableName, neo4jProperties, batchSize), + new RetryConfig(retryTimes, config.getLong(RETRY_SLEEP_MILLS.getKey(), RETRY_SLEEP_MILLS.getDefaultValue())), + taskPluginCollector + ); + } + + private static String checkCypher(Configuration config) { + String cypher = config.getString(CYPHER.getKey()); + if (StringUtils.isBlank(cypher)) { + throw DataXException.asDataXException(Neo4jErrorCode.CONFIG_INVALID, "cypher must not null or empty"); + } + return cypher; + } + + private static Driver buildNeo4jDriver(Configuration config) { + + Config.ConfigBuilder configBuilder = Config.builder().withMaxConnectionPoolSize(1); + String uri = checkUriConfig(config); + + //connection timeout + //连接超时时间 + Long maxConnTime = config.getLong(MAX_CONNECTION_TIMEOUT_SECONDS.getKey(), MAX_TRANSACTION_RETRY_TIME.getDefaultValue()); + configBuilder + .withConnectionAcquisitionTimeout( + maxConnTime * 2, TimeUnit.SECONDS) + .withConnectionTimeout(maxConnTime, TimeUnit.SECONDS); + + + //transaction timeout + //事务运行超时时间 + Long txRetryTime = config.getLong(MAX_TRANSACTION_RETRY_TIME.getKey(), MAX_TRANSACTION_RETRY_TIME.getDefaultValue()); + configBuilder.withMaxTransactionRetryTime(txRetryTime, TimeUnit.SECONDS); + String username = config.getString(USERNAME.getKey()); + String password = config.getString(PASSWORD.getKey()); + String bearerToken = config.getString(BEARER_TOKEN.getKey()); + String kerberosTicket = config.getString(KERBEROS_TICKET.getKey()); + + if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) { + + return GraphDatabase.driver(uri, AuthTokens.basic(username, password), configBuilder.build()); + + } else if (StringUtils.isNotBlank(bearerToken)) { + + return GraphDatabase.driver(uri, AuthTokens.bearer(bearerToken), configBuilder.build()); + + } else if (StringUtils.isNotBlank(kerberosTicket)) { + + return GraphDatabase.driver(uri, AuthTokens.kerberos(kerberosTicket), configBuilder.build()); + + } + + throw DataXException.asDataXException(Neo4jErrorCode.CONFIG_INVALID, "Invalid Auth config."); + } + + private static String checkUriConfig(Configuration config) { + String uri = 
config.getString(URI.getKey()); + if (null == uri || uri.length() == 0) { + throw DataXException.asDataXException(Neo4jErrorCode.CONFIG_INVALID, "Invalid uri configuration"); + } + return uri; + } + + public void destroy() { + tryFlushBuffer(); + if (driver != null) { + driver.close(); + } + if (session != null) { + session.close(); + } + DateAdapter.destroy(); + } + + private void tryFlushBuffer() { + if (!writerBuffer.isEmpty()) { + doWrite(writerBuffer); + writerBuffer.clear(); + } + } + + private void tryBatchWrite() { + if (!writerBuffer.isEmpty() && writerBuffer.size() >= writeConfig.batchSize) { + doWrite(writerBuffer); + writerBuffer.clear(); + } + } + + private void doWrite(List values) { + Value batchValues = Values.parameters(this.writeConfig.batchVariableName, values); + Query query = new Query(this.writeConfig.cypher, batchValues); +// LOGGER.debug("query:{}", query.text()); +// LOGGER.debug("batch:{}", toUnwindStr(values)); + try { + RetryUtil.executeWithRetry(() -> { + session.writeTransaction(tx -> tx.run(query)); + return null; + }, this.retryConfig.retryTimes, retryConfig.retrySleepMills, true, + Collections.singletonList(Neo4jException.class)); + } catch (Exception e) { + LOGGER.error("an exception occurred while writing to the database,message:{}", e.getMessage()); + throw DataXException.asDataXException(DATABASE_ERROR, e.getMessage()); + } + + + } + + private String toUnwindStr(List values) { + StringJoiner joiner = new StringJoiner(","); + for (MapValue value : values) { + joiner.add(value.toString()); + } + return "[" + joiner + "]"; + } + + public void tryWrite(Record record) { + MapValue neo4jValue = checkAndConvert(record); + writerBuffer.add(neo4jValue); + tryBatchWrite(); + } + + private MapValue checkAndConvert(Record record) { + int sourceColNum = record.getColumnNumber(); + List neo4jProperties = writeConfig.neo4jProperties; + if (neo4jProperties == null || neo4jProperties.size() != sourceColNum) { + throw new DataXException(Neo4jErrorCode.CONFIG_INVALID, "the read and write columns do not match!"); + } + Map data = new HashMap<>(sourceColNum * 4 / 3); + for (int i = 0; i < sourceColNum; i++) { + Column column = record.getColumn(i); + Neo4jProperty neo4jProperty = neo4jProperties.get(i); + try { + + Value value = ValueAdapter.column2Value(column, neo4jProperty); + data.put(neo4jProperty.getName(), value); + } catch (Exception e) { + LOGGER.info("dirty record:{},message :{}", column, e.getMessage()); + this.taskPluginCollector.collectDirtyRecord(record, e.getMessage()); + } + } + return new MapValue(data); + } + + public List getNeo4jFields() { + return this.writeConfig.neo4jProperties; + } + + + static class RetryConfig { + int retryTimes; + long retrySleepMills; + + RetryConfig(int retryTimes, long retrySleepMills) { + this.retryTimes = retryTimes; + this.retrySleepMills = retrySleepMills; + } + } + + static class WriteConfig { + String cypher; + + String database; + + String batchVariableName; + + List neo4jProperties; + + int batchSize; + + public WriteConfig(String cypher, + String database, + String batchVariableName, + List neo4jProperties, + int batchSize) { + this.cypher = cypher; + this.database = database; + this.batchVariableName = batchVariableName; + this.neo4jProperties = neo4jProperties; + this.batchSize = batchSize; + } + + + } +} diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java new file mode 100644 index 
00000000..a89f4674 --- /dev/null +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java @@ -0,0 +1,63 @@ +package com.alibaba.datax.plugin.writer.neo4jwriter; + +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.common.element.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +public class Neo4jWriter extends Writer { + public static class Job extends Writer.Job { + private static final Logger LOGGER = LoggerFactory.getLogger(Job.class); + + private Configuration jobConf = null; + @Override + public void init() { + LOGGER.info("Neo4jWriter Job init success"); + } + + @Override + public void destroy() { + LOGGER.info("Neo4jWriter Job destroyed"); + } + + @Override + public List split(int mandatoryNumber) { + List configurations = new ArrayList(mandatoryNumber); + for (int i = 0; i < mandatoryNumber; i++) { + configurations.add(this.jobConf.clone()); + } + return configurations; + } + } + + public static class Task extends Writer.Task { + private static final Logger TASK_LOGGER = LoggerFactory.getLogger(Task.class); + private Neo4jClient neo4jClient; + @Override + public void init() { + Configuration taskConf = super.getPluginJobConf(); + this.neo4jClient = Neo4jClient.build(taskConf,getTaskPluginCollector()); + this.neo4jClient.init(); + TASK_LOGGER.info("neo4j writer task init success."); + } + + @Override + public void destroy() { + this.neo4jClient.destroy(); + TASK_LOGGER.info("neo4j writer task destroyed."); + } + + @Override + public void startWrite(RecordReceiver receiver) { + Record record; + while ((record = receiver.getFromReader()) != null){ + this.neo4jClient.tryWrite(record); + } + } + } +} diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/adapter/DateAdapter.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/adapter/DateAdapter.java new file mode 100644 index 00000000..51b214bd --- /dev/null +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/adapter/DateAdapter.java @@ -0,0 +1,70 @@ +package com.alibaba.datax.plugin.writer.neo4jwriter.adapter; + + +import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jProperty; +import org.testcontainers.shaded.com.google.common.base.Supplier; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; + +/** + * @author fuyouj + */ +public class DateAdapter { + private static final ThreadLocal LOCAL_DATE_FORMATTER_MAP = new ThreadLocal<>(); + private static final ThreadLocal LOCAL_TIME_FORMATTER_MAP = new ThreadLocal<>(); + private static final ThreadLocal LOCAL_DATE_TIME_FORMATTER_MAP = new ThreadLocal<>(); + private static final String DEFAULT_LOCAL_DATE_FORMATTER = "yyyy-MM-dd"; + private static final String DEFAULT_LOCAL_TIME_FORMATTER = "HH:mm:ss"; + private static final String DEFAULT_LOCAL_DATE_TIME_FORMATTER = "yyyy-MM-dd HH:mm:ss"; + + + public static LocalDate localDate(String text, Neo4jProperty neo4jProperty) { + if (LOCAL_DATE_FORMATTER_MAP.get() != null) { + return LocalDate.parse(text, LOCAL_DATE_FORMATTER_MAP.get()); + } + + String format = getOrDefault(neo4jProperty::getDateFormat, DEFAULT_LOCAL_DATE_FORMATTER); + DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(format); + 
LOCAL_DATE_FORMATTER_MAP.set(dateTimeFormatter); + return LocalDate.parse(text, dateTimeFormatter); + } + + public static String getOrDefault(Supplier dateFormat, String defaultFormat) { + String format = dateFormat.get(); + if (null == format || "".equals(format)) { + return defaultFormat; + } else { + return format; + } + } + + public static void destroy() { + LOCAL_DATE_FORMATTER_MAP.remove(); + LOCAL_TIME_FORMATTER_MAP.remove(); + LOCAL_DATE_TIME_FORMATTER_MAP.remove(); + } + + public static LocalTime localTime(String text, Neo4jProperty neo4JProperty) { + if (LOCAL_TIME_FORMATTER_MAP.get() != null) { + return LocalTime.parse(text, LOCAL_TIME_FORMATTER_MAP.get()); + } + + String format = getOrDefault(neo4JProperty::getDateFormat, DEFAULT_LOCAL_TIME_FORMATTER); + DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(format); + LOCAL_TIME_FORMATTER_MAP.set(dateTimeFormatter); + return LocalTime.parse(text, dateTimeFormatter); + } + + public static LocalDateTime localDateTime(String text, Neo4jProperty neo4JProperty) { + if (LOCAL_DATE_TIME_FORMATTER_MAP.get() != null){ + return LocalDateTime.parse(text,LOCAL_DATE_TIME_FORMATTER_MAP.get()); + } + String format = getOrDefault(neo4JProperty::getDateFormat, DEFAULT_LOCAL_DATE_TIME_FORMATTER); + DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(format); + LOCAL_DATE_TIME_FORMATTER_MAP.set(dateTimeFormatter); + return LocalDateTime.parse(text, dateTimeFormatter); + } +} diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/adapter/ValueAdapter.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/adapter/ValueAdapter.java new file mode 100644 index 00000000..d0f4044d --- /dev/null +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/adapter/ValueAdapter.java @@ -0,0 +1,95 @@ +package com.alibaba.datax.plugin.writer.neo4jwriter.adapter; + + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jProperty; +import com.alibaba.datax.plugin.writer.neo4jwriter.element.PropertyType; +import com.alibaba.fastjson2.JSON; +import org.neo4j.driver.Value; +import org.neo4j.driver.Values; +import org.neo4j.driver.internal.value.NullValue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +/** + * @author fuyouj + */ +public class ValueAdapter { + + + public static Value column2Value(final Column column, final Neo4jProperty neo4JProperty) { + + String typeStr = neo4JProperty.getType(); + PropertyType type = PropertyType.fromStrIgnoreCase(typeStr); + if (column.asString() == null) { + return NullValue.NULL; + } + + switch (type) { + case NULL: + return NullValue.NULL; + case MAP: + return Values.value(JSON.parseObject(column.asString(), Map.class)); + case BOOLEAN: + return Values.value(column.asBoolean()); + case STRING: + return Values.value(column.asString()); + case INTEGER: + case LONG: + return Values.value(column.asLong()); + case SHORT: + return Values.value(Short.valueOf(column.asString())); + case FLOAT: + case DOUBLE: + return Values.value(column.asDouble()); + case BYTE_ARRAY: + return Values.value(parseArrayType(neo4JProperty, column.asString(), Byte::valueOf)); + case CHAR_ARRAY: + return Values.value(parseArrayType(neo4JProperty, column.asString(), (s) -> s.charAt(0))); + case BOOLEAN_ARRAY: + return Values.value(parseArrayType(neo4JProperty, column.asString(), Boolean::valueOf)); + case 
STRING_ARRAY: + case Object_ARRAY: + case LIST: + return Values.value(parseArrayType(neo4JProperty, column.asString(), Function.identity())); + case LONG_ARRAY: + return Values.value(parseArrayType(neo4JProperty, column.asString(), Long::valueOf)); + case INT_ARRAY: + return Values.value(parseArrayType(neo4JProperty, column.asString(), Integer::valueOf)); + case SHORT_ARRAY: + return Values.value(parseArrayType(neo4JProperty, column.asString(), Short::valueOf)); + case DOUBLE_ARRAY: + case FLOAT_ARRAY: + return Values.value(parseArrayType(neo4JProperty, column.asString(), Double::valueOf)); + case LOCAL_DATE: + return Values.value(DateAdapter.localDate(column.asString(), neo4JProperty)); + case LOCAL_TIME: + return Values.value(DateAdapter.localTime(column.asString(), neo4JProperty)); + case LOCAL_DATE_TIME: + return Values.value(DateAdapter.localDateTime(column.asString(), neo4JProperty)); + default: + return Values.value(column.getRawData()); + + } + } + + + private static List parseArrayType(final Neo4jProperty neo4JProperty, + final String strValue, + final Function convertFunc) { + if (null == strValue || "".equals(strValue)) { + return Collections.emptyList(); + } + String split = neo4JProperty.getSplitOrDefault(); + String[] strArr = strValue.split(split); + List ans = new ArrayList<>(); + for (String s : strArr) { + ans.add(convertFunc.apply(s)); + } + return ans; + } +} diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/ConfigConstants.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/ConfigConstants.java new file mode 100644 index 00000000..eed3588e --- /dev/null +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/ConfigConstants.java @@ -0,0 +1,116 @@ +package com.alibaba.datax.plugin.writer.neo4jwriter.config; + + +import java.util.List; + +/** + * @author fuyouj + */ +public final class ConfigConstants { + + public static final Long DEFAULT_MAX_TRANSACTION_RETRY_SECONDS = 30L; + + public static final Long DEFAULT_MAX_CONNECTION_SECONDS = 30L; + + + + public static final Option RETRY_TIMES = + Option.builder() + .key("retryTimes") + .defaultValue(3) + .desc("The number of overwrites when an error occurs") + .build(); + + public static final Option RETRY_SLEEP_MILLS = + Option.builder() + .key("retrySleepMills") + .defaultValue(3000L) + .build(); + + /** + * cluster mode please reference + * how to connect cluster mode + */ + public static final Option URI = + Option.builder() + .key("uri") + .noDefaultValue() + .desc("uir of neo4j database") + .build(); + + public static final Option USERNAME = + Option.builder() + .key("username") + .noDefaultValue() + .desc("username for accessing the neo4j database") + .build(); + + public static final Option PASSWORD = + Option.builder() + .key("password") + .noDefaultValue() + .desc("password for accessing the neo4j database") + .build(); + + public static final Option BEARER_TOKEN = + Option.builder() + .key("bearerToken") + .noDefaultValue() + .desc("base64 encoded bearer token of the Neo4j. for Auth.") + .build(); + + public static final Option KERBEROS_TICKET = + Option.builder() + .key("kerberosTicket") + .noDefaultValue() + .desc("base64 encoded kerberos ticket of the Neo4j. 
for Auth.") + .build(); + + public static final Option DATABASE = + Option.builder() + .key("database") + .noDefaultValue() + .desc("database name.") + .build(); + + public static final Option CYPHER = + Option.builder() + .key("cypher") + .noDefaultValue() + .desc("cypher query.") + .build(); + + public static final Option MAX_TRANSACTION_RETRY_TIME = + Option.builder() + .key("maxTransactionRetryTimeSeconds") + .defaultValue(DEFAULT_MAX_TRANSACTION_RETRY_SECONDS) + .desc("maximum transaction retry time(seconds). transaction fail if exceeded.") + .build(); + public static final Option MAX_CONNECTION_TIMEOUT_SECONDS = + Option.builder() + .key("maxConnectionTimeoutSeconds") + .defaultValue(DEFAULT_MAX_CONNECTION_SECONDS) + .desc("The maximum amount of time to wait for a TCP connection to be established (seconds).") + .build(); + + public static final Option BATCH_DATA_VARIABLE_NAME = + Option.builder() + .key("batchDataVariableName") + .defaultValue("batch") + .desc("in a cypher statement, a variable name that represents a batch of data") + .build(); + + public static final Option> NEO4J_PROPERTIES = + Option.>builder() + .key("properties") + .noDefaultValue() + .desc("neo4j node or relation`s props") + .build(); + + public static final Option BATCH_SIZE = + Option.builder(). + key("batchSize") + .defaultValue(1000) + .desc("max batch size") + .build(); +} diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/Neo4jProperty.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/Neo4jProperty.java new file mode 100644 index 00000000..5c5867b3 --- /dev/null +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/Neo4jProperty.java @@ -0,0 +1,82 @@ +package com.alibaba.datax.plugin.writer.neo4jwriter.config; + +/** + * 由于dataX并不能传输数据的元数据,所以只能在writer端定义每列数据的名字 + * datax does not support data metadata, + * only the name of each column of data can be defined on neo4j writer + * + * @author fuyouj + */ +public class Neo4jProperty { + public static final String DEFAULT_SPLIT = ","; + + /** + * name of neo4j field + */ + private String name; + + /** + * neo4j type + * reference by org.neo4j.driver.Values + */ + private String type; + + /** + * for date + */ + private String dateFormat; + + /** + * for array type + */ + private String split; + + public Neo4jProperty() { + } + + public Neo4jProperty(String name, String type, String format, String split) { + this.name = name; + this.type = type; + this.dateFormat = format; + this.split = split; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getDateFormat() { + return dateFormat; + } + + public void setDateFormat(String dateFormat) { + this.dateFormat = dateFormat; + } + + public String getSplit() { + return getSplitOrDefault(); + } + + public String getSplitOrDefault() { + if (split == null || "".equals(split)) { + return DEFAULT_SPLIT; + } + return split; + } + + public void setSplit(String split) { + this.split = split; + } +} diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/Option.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/Option.java new file mode 100644 index 00000000..f22bd205 --- /dev/null +++ 
b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/Option.java @@ -0,0 +1,65 @@ +package com.alibaba.datax.plugin.writer.neo4jwriter.config; + + +public class Option { + + public static class Builder { + private String key; + private String desc; + + private T defaultValue; + + public Builder key(String key) { + this.key = key; + return this; + } + + public Builder desc(String desc) { + this.desc = desc; + return this; + } + + public Builder defaultValue(T defaultValue) { + this.defaultValue = defaultValue; + return this; + } + + public Builder noDefaultValue() { + return this; + } + + public Option build() { + return new Option<>(this.key, this.desc, this.defaultValue); + } + } + + private final String key; + private final String desc; + + private final T defaultValue; + + public Option(String key, String desc, T defaultValue) { + this.key = key; + this.desc = desc; + this.defaultValue = defaultValue; + } + + public static Builder builder(){ + return new Builder<>(); + } + + public String getKey() { + return key; + } + + public String getDesc() { + return desc; + } + + public T getDefaultValue() { + if (defaultValue == null){ + throw new IllegalStateException(key + ":defaultValue is null"); + } + return defaultValue; + } +} diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/element/PropertyType.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/element/PropertyType.java new file mode 100644 index 00000000..b3446de7 --- /dev/null +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/element/PropertyType.java @@ -0,0 +1,40 @@ +package com.alibaba.datax.plugin.writer.neo4jwriter.element; + +import java.util.Arrays; + +/** + * @see org.neo4j.driver.Values + * @author fuyouj + */ +public enum PropertyType { + NULL, + BOOLEAN, + STRING, + LONG, + SHORT, + INTEGER, + DOUBLE, + FLOAT, + LOCAL_DATE, + LOCAL_TIME, + LOCAL_DATE_TIME, + LIST, + MAP, + CHAR_ARRAY, + BYTE_ARRAY, + BOOLEAN_ARRAY, + STRING_ARRAY, + LONG_ARRAY, + INT_ARRAY, + SHORT_ARRAY, + DOUBLE_ARRAY, + FLOAT_ARRAY, + Object_ARRAY; + + public static PropertyType fromStrIgnoreCase(String typeStr) { + return Arrays.stream(PropertyType.values()) + .filter(e -> e.name().equalsIgnoreCase(typeStr)) + .findFirst() + .orElse(PropertyType.STRING); + } +} diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/exception/Neo4jErrorCode.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/exception/Neo4jErrorCode.java new file mode 100644 index 00000000..d7df79ff --- /dev/null +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/exception/Neo4jErrorCode.java @@ -0,0 +1,37 @@ +package com.alibaba.datax.plugin.writer.neo4jwriter.exception; + +import com.alibaba.datax.common.spi.ErrorCode; + + +public enum Neo4jErrorCode implements ErrorCode { + + /** + * Invalid configuration + * 配置校验异常 + */ + CONFIG_INVALID("NEO4J_ERROR_01","invalid configuration"), + /** + * database error + * 在执行写入到数据库时抛出的异常,可能是权限异常,也可能是连接超时,或者是配置到了从节点。 + * 如果是更新操作,还会有死锁异常。具体原因根据报错信息确定,但是这与dataX无关。 + */ + DATABASE_ERROR("NEO4J_ERROR_02","database error"); + + private final String code; + private final String description; + + @Override + public String getCode() { + return code; + } + + @Override + public String getDescription() { + return description; + } + + Neo4jErrorCode(String code, String description) { + this.code = code; + this.description = description; + } +} diff --git 
a/neo4jwriter/src/main/resources/plugin.json b/neo4jwriter/src/main/resources/plugin.json new file mode 100644 index 00000000..3c8878f6 --- /dev/null +++ b/neo4jwriter/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "neo4jWriter", + "class": "com.alibaba.datax.plugin.writer.neo4jwriter.Neo4jWriter", + "description": "dataX neo4j 写插件", + "developer": "付有杰" +} \ No newline at end of file diff --git a/neo4jwriter/src/main/resources/plugin_job_template.json b/neo4jwriter/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..45bf3c88 --- /dev/null +++ b/neo4jwriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,42 @@ +{ + "uri": "neo4j://localhost:7687", + "username": "neo4j", + "password": "Test@12343", + "database": "neo4j", + "cypher": "unwind $batch as row create(p:Person) set p.pbool = row.pbool,p.pstring = row.pstring,p.plong = row.plong,p.pshort = row.pshort,p.pdouble=row.pdouble,p.pstringarr=row.pstringarr,p.plocaldate=row.plocaldate", + "batchDataVariableName": "batch", + "batchSize": "33", + "properties": [ + { + "name": "pbool", + //type 忽略大小写 + "type": "BOOLEAN" + }, + { + "name": "pstring", + "type": "STRING" + }, + { + "name": "plong", + "type": "LONG" + }, + { + "name": "pshort", + "type": "SHORT" + }, + { + "name": "pdouble", + "type": "DOUBLE" + }, + { + "name": "pstringarr", + "type": "STRING_ARRAY", + "split": "," + }, + { + "name": "plocaldate", + "type": "LOCAL_DATE", + "dateFormat": "yyyy-MM-dd" + } + ] +} \ No newline at end of file diff --git a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java new file mode 100644 index 00000000..53c9235e --- /dev/null +++ b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java @@ -0,0 +1,257 @@ +package com.alibaba.datax.plugin.writer; + + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.writer.mock.MockRecord; +import com.alibaba.datax.plugin.writer.mock.MockUtil; +import com.alibaba.datax.plugin.writer.neo4jwriter.Neo4jClient; +import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jProperty; +import com.alibaba.datax.plugin.writer.neo4jwriter.element.PropertyType; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.neo4j.driver.*; +import org.neo4j.driver.types.Node; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; + +import java.io.File; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class Neo4jWriterTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jWriterTest.class); + private static final int MOCK_NUM = 100; + private static final String CONTAINER_IMAGE = "neo4j:5.9.0"; + + private static final String CONTAINER_HOST = "neo4j-host"; + 
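+    // default Neo4j ports: 7474 serves the HTTP browser UI, 7687 the Bolt protocol used by the driver in this test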
private static final int HTTP_PORT = 7474; + private static final int BOLT_PORT = 7687; + private static final String CONTAINER_NEO4J_USERNAME = "neo4j"; + private static final String CONTAINER_NEO4J_PASSWORD = "Test@12343"; + private static final URI CONTAINER_URI = URI.create("neo4j://localhost:" + BOLT_PORT); + + protected static final Network NETWORK = Network.newNetwork(); + + private GenericContainer container; + private Driver neo4jDriver; + private Session neo4jSession; + + @Before + public void init() { + DockerImageName imageName = DockerImageName.parse(CONTAINER_IMAGE); + container = + new GenericContainer<>(imageName) + .withNetwork(NETWORK) + .withNetworkAliases(CONTAINER_HOST) + .withExposedPorts(HTTP_PORT, BOLT_PORT) + .withEnv( + "NEO4J_AUTH", + CONTAINER_NEO4J_USERNAME + "/" + CONTAINER_NEO4J_PASSWORD) + .withEnv("apoc.export.file.enabled", "true") + .withEnv("apoc.import.file.enabled", "true") + .withEnv("apoc.import.file.use_neo4j_config", "true") + .withEnv("NEO4J_PLUGINS", "[\"apoc\"]") + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(CONTAINER_IMAGE))); + container.setPortBindings( + Arrays.asList( + String.format("%s:%s", HTTP_PORT, HTTP_PORT), + String.format("%s:%s", BOLT_PORT, BOLT_PORT))); + Startables.deepStart(Stream.of(container)).join(); + LOGGER.info("container started"); + Awaitility.given() + .ignoreExceptions() + .await() + .atMost(30, TimeUnit.SECONDS) + .untilAsserted(this::initConnection); + } + + @Test + public void testCreateNodeAllTypeField() { + final Result checkExists = neo4jSession.run("MATCH (p:Person) RETURN p limit 1"); + if (checkExists.hasNext()) { + neo4jSession.run("MATCH (p:Person) delete p"); + } + + Configuration configuration = Configuration.from(new File("src/test/resources/allTypeFieldNode.json")); + Neo4jClient neo4jClient = Neo4jClient.build(configuration, null); + + neo4jClient.init(); + for (int i = 0; i < MOCK_NUM; i++) { + neo4jClient.tryWrite(mockAllTypeFieldTestNode(neo4jClient.getNeo4jFields())); + } + neo4jClient.destroy(); + + + Result result = neo4jSession.run("MATCH (p:Person) return p"); + // nodes + assertTrue(result.hasNext()); + int cnt = 0; + while (result.hasNext()) { + org.neo4j.driver.Record record = result.next(); + record.get("p").get("pbool").asBoolean(); + record.get("p").get("pstring").asString(); + record.get("p").get("plong").asLong(); + record.get("p").get("pshort").asInt(); + record.get("p").get("pdouble").asDouble(); + List list = (List) record.get("p").get("pstringarr").asObject(); + record.get("p").get("plocaldate").asLocalDate(); + cnt++; + + } + assertEquals(cnt, MOCK_NUM); + } + + + /** + * 创建关系 必须先有节点 + * 所以先创建节点再模拟关系 + * A relationship requires both of its nodes to exist, + * so the nodes are created first and the relationships are written afterwards. + */ + @Test + public void testCreateRelation() { + final Result checkExists = neo4jSession.run("MATCH (p1:Person)-[r:LINK]->(p2:Person) return r limit 1"); + if (checkExists.hasNext()) { + neo4jSession.run("MATCH (p1:Person)-[r:LINK]->(p2:Person) delete r,p1,p2"); + } + + String createNodeCql = "create (p:Person) set p.id = '%s'"; + Configuration configuration = Configuration.from(new File("src/test/resources/relationship.json")); + + Neo4jClient neo4jClient = Neo4jClient.build(configuration, null); + neo4jClient.init(); + //创建节点为后续写关系做准备 + //Create nodes to prepare for subsequent write relationships + for (int i = 0; i < MOCK_NUM; i++) { + neo4jSession.run(String.format(createNodeCql, i + "start")); + neo4jSession.run(String.format(createNodeCql, i + "end")); + Record record = new MockRecord(); + record.addColumn(new StringColumn(i + "start")); + 
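+                // the record's columns are expected to map positionally to the properties configured in relationship.json: startNodeId first, then endNodeId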
record.addColumn(new StringColumn(i + "end")); + neo4jClient.tryWrite(record); + + } + neo4jClient.destroy(); + + Result result = neo4jSession.run("MATCH (start:Person)-[r:LINK]->(end:Person) return r,start,end"); + // relationships + assertTrue(result.hasNext()); + int cnt = 0; + while (result.hasNext()) { + org.neo4j.driver.Record record = result.next(); + + Node startNode = record.get("start").asNode(); + assertTrue(startNode.hasLabel("Person")); + assertTrue(startNode.asMap().containsKey("id")); + + Node endNode = record.get("end").asNode(); + assertTrue(startNode.hasLabel("Person")); + assertTrue(endNode.asMap().containsKey("id")); + + + String name = record.get("r").type().name(); + assertEquals("RELATIONSHIP", name); + cnt++; + } + assertEquals(cnt, MOCK_NUM); + } + + /** + * neo4j中,Label和关系类型,想动态的写,需要借助于apoc函数 + */ + @Test + public void testUseApocCreateDynamicLabel() { + List dynamicLabel = new ArrayList<>(); + for (int i = 0; i < MOCK_NUM; i++) { + dynamicLabel.add("Label" + i); + } + //删除原有数据 + //remove test data if exist + //这种占位符的方式不支持批量动态写,当然可以使用union拼接,但是性能不好 + String query = "match (p:%s) return p"; + String delete = "match (p:%s) delete p"; + for (String label : dynamicLabel) { + Result result = neo4jSession.run(String.format(query, label)); + if (result.hasNext()) { + neo4jSession.run(String.format(delete, label)); + } + } + + Configuration configuration = Configuration.from(new File("src/test/resources/dynamicLabel.json")); + Neo4jClient neo4jClient = Neo4jClient.build(configuration, null); + + neo4jClient.init(); + for (int i = 0; i < dynamicLabel.size(); i++) { + Record record = new MockRecord(); + record.addColumn(new StringColumn(dynamicLabel.get(i))); + record.addColumn(new StringColumn(String.valueOf(i))); + neo4jClient.tryWrite(record); + } + neo4jClient.destroy(); + + //校验脚本的批量写入是否正确 + int cnt = 0; + for (int i = 0; i < dynamicLabel.size(); i++) { + String label = dynamicLabel.get(i); + Result result = neo4jSession.run(String.format(query, label)); + while (result.hasNext()) { + org.neo4j.driver.Record record = result.next(); + Node node = record.get("p").asNode(); + assertTrue(node.hasLabel(label)); + assertEquals(node.asMap().get("id"), i + ""); + cnt++; + } + } + assertEquals(cnt, MOCK_NUM); + + } + + + private Record mockAllTypeFieldTestNode(List neo4JProperties) { + Record mock = new MockRecord(); + for (Neo4jProperty field : neo4JProperties) { + mock.addColumn(MockUtil.mockColumnByType(PropertyType.fromStrIgnoreCase(field.getType()))); + } + return mock; + } + + @After + public void destroy() { + if (neo4jSession != null) { + neo4jSession.close(); + } + if (neo4jDriver != null) { + neo4jDriver.close(); + } + if (container != null) { + container.close(); + } + } + + private void initConnection() { + neo4jDriver = + GraphDatabase.driver( + CONTAINER_URI, + AuthTokens.basic(CONTAINER_NEO4J_USERNAME, CONTAINER_NEO4J_PASSWORD)); + neo4jSession = neo4jDriver.session(SessionConfig.forDatabase("neo4j")); + } +} diff --git a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/MockRecord.java b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/MockRecord.java new file mode 100644 index 00000000..77d3f500 --- /dev/null +++ b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/MockRecord.java @@ -0,0 +1,104 @@ +package com.alibaba.datax.plugin.writer.mock; + + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.fastjson2.JSON; + +import java.util.ArrayList; +import 
java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class MockRecord implements Record { + private static final int RECORD_AVERGAE_COLUMN_NUMBER = 16; + + private List columns; + + private int byteSize; + + + private Map meta; + + public MockRecord() { + this.columns = new ArrayList<>(RECORD_AVERGAE_COLUMN_NUMBER); + } + + @Override + public void addColumn(Column column) { + columns.add(column); + incrByteSize(column); + } + + @Override + public Column getColumn(int i) { + if (i < 0 || i >= columns.size()) { + return null; + } + return columns.get(i); + } + + @Override + public void setColumn(int i, final Column column) { + if (i < 0) { + throw new IllegalArgumentException("不能给index小于0的column设置值"); + } + + if (i >= columns.size()) { + expandCapacity(i + 1); + } + + decrByteSize(getColumn(i)); + this.columns.set(i, column); + incrByteSize(getColumn(i)); + } + + @Override + public String toString() { + Map json = new HashMap(); + json.put("size", this.getColumnNumber()); + json.put("data", this.columns); + return JSON.toJSONString(json); + } + + @Override + public int getColumnNumber() { + return this.columns.size(); + } + + @Override + public int getByteSize() { + return byteSize; + } + + public int getMemorySize() { + throw new UnsupportedOperationException(); + } + + @Override + public void setMeta(Map meta) { + + } + + @Override + public Map getMeta() { + return null; + } + + private void decrByteSize(final Column column) { + } + + private void incrByteSize(final Column column) { + } + + private void expandCapacity(int totalSize) { + if (totalSize <= 0) { + return; + } + + int needToExpand = totalSize - columns.size(); + while (needToExpand-- > 0) { + this.columns.add(null); + } + } +} diff --git a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/MockUtil.java b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/MockUtil.java new file mode 100644 index 00000000..8f05f1e8 --- /dev/null +++ b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/MockUtil.java @@ -0,0 +1,50 @@ +package com.alibaba.datax.plugin.writer.mock; + + +import com.alibaba.datax.common.element.*; +import com.alibaba.datax.plugin.writer.neo4jwriter.element.PropertyType; +import com.alibaba.fastjson2.JSON; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +public class MockUtil { + + public static Column mockColumnByType(PropertyType type) { + Random random = new Random(); + switch (type) { + case SHORT: + return new StringColumn("1"); + case BOOLEAN: + return new BoolColumn(random.nextInt() % 2 == 0); + case INTEGER: + case LONG: + return new LongColumn(random.nextInt(Integer.MAX_VALUE)); + case FLOAT: + case DOUBLE: + return new DoubleColumn(random.nextDouble()); + case NULL: + return null; + case BYTE_ARRAY: + return new BytesColumn(new byte[]{(byte) (random.nextInt() % 2)}); + case LOCAL_DATE: + return new StringColumn(LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); + case MAP: + return new StringColumn(JSON.toJSONString(propmap())); + case STRING_ARRAY: + return new StringColumn("[1,1,1,1,1,1,1]"); + default: + return new StringColumn("randomStr" + random.nextInt(Integer.MAX_VALUE)); + } + } + + public static Map propmap() { + Map prop = new HashMap<>(); + prop.put("name", "neo4jWriter"); + prop.put("age", "1"); + return prop; + } +} diff --git a/neo4jwriter/src/test/resources/allTypeFieldNode.json 
b/neo4jwriter/src/test/resources/allTypeFieldNode.json new file mode 100644 index 00000000..6d504d79 --- /dev/null +++ b/neo4jwriter/src/test/resources/allTypeFieldNode.json @@ -0,0 +1,41 @@ +{ + "uri": "neo4j://localhost:7687", + "username":"neo4j", + "password":"Test@12343", + "database":"neo4j", + "cypher": "unwind $batch as row create(p:Person) set p.pbool = row.pbool,p.pstring = row.pstring,p.plong = row.plong,p.pshort = row.pshort,p.pdouble=row.pdouble,p.pstringarr=row.pstringarr,p.plocaldate=row.plocaldate", + "batchDataVariableName": "batch", + "batchSize": "33", + "properties": [ + { + "name": "pbool", + "type": "BOOLEAN" + }, + { + "name": "pstring", + "type": "STRING" + }, + { + "name": "plong", + "type": "LONG" + }, + { + "name": "pshort", + "type": "SHORT" + }, + { + "name": "pdouble", + "type": "DOUBLE" + }, + { + "name": "pstringarr", + "type": "STRING_ARRAY", + "split": "," + }, + { + "name": "plocaldate", + "type": "LOCAL_DATE", + "dateFormat": "yyyy-MM-dd" + } + ] +} \ No newline at end of file diff --git a/neo4jwriter/src/test/resources/dynamicLabel.json b/neo4jwriter/src/test/resources/dynamicLabel.json new file mode 100644 index 00000000..05ed3e76 --- /dev/null +++ b/neo4jwriter/src/test/resources/dynamicLabel.json @@ -0,0 +1,19 @@ +{ + "uri": "bolt://localhost:7687", + "username":"neo4j", + "password":"Test@12343", + "database":"neo4j", + "cypher": "unwind $batch as row CALL apoc.cypher.doIt( 'create (n:`' + row.Label + '`{id:$id})' ,{id: row.id} ) YIELD value RETURN 1 ", + "batchDataVariableName": "batch", + "batchSize": "33", + "properties": [ + { + "name": "Label", + "type": "string" + }, + { + "name": "id", + "type": "STRING" + } + ] +} \ No newline at end of file diff --git a/neo4jwriter/src/test/resources/relationship.json b/neo4jwriter/src/test/resources/relationship.json new file mode 100644 index 00000000..cb9bbdf4 --- /dev/null +++ b/neo4jwriter/src/test/resources/relationship.json @@ -0,0 +1,19 @@ +{ + "uri": "neo4j://localhost:7687", + "username":"neo4j", + "password":"Test@12343", + "database":"neo4j", + "cypher": "unwind $batch as row match(p1:Person) where p1.id = row.startNodeId match(p2:Person) where p2.id = row.endNodeId create (p1)-[:LINK]->(p2)", + "batchDataVariableName": "batch", + "batchSize": "33", + "properties": [ + { + "name": "startNodeId", + "type": "STRING" + }, + { + "name": "endNodeId", + "type": "STRING" + } + ] +} \ No newline at end of file diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java index 06d53108..d7b8f2ed 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java @@ -1,6 +1,7 @@ package com.alibaba.datax.plugin.reader.oceanbasev10reader.util; import com.alibaba.datax.common.element.*; +import com.alibaba.datax.plugin.rdbms.reader.util.ObVersion; import com.alibaba.datax.plugin.rdbms.reader.util.SingleTableSplitUtil; import com.alibaba.datax.plugin.rdbms.util.DBUtil; import com.alibaba.datax.plugin.rdbms.util.DataBaseType; diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartitionSplitUtil.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartitionSplitUtil.java index 
2929658a..ad165d99 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartitionSplitUtil.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartitionSplitUtil.java @@ -3,6 +3,7 @@ package com.alibaba.datax.plugin.reader.oceanbasev10reader.util; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.rdbms.reader.Constant; import com.alibaba.datax.plugin.rdbms.reader.Key; +import com.alibaba.datax.plugin.rdbms.reader.util.ObVersion; import com.alibaba.datax.plugin.rdbms.util.DBUtil; import com.alibaba.datax.plugin.rdbms.util.DataBaseType; import com.alibaba.datax.plugin.reader.oceanbasev10reader.ext.ObReaderKey; diff --git a/oceanbasev10reader/src/main/libs/oceanbase-client-1.1.10.jar b/oceanbasev10reader/src/main/libs/oceanbase-client-1.1.10.jar deleted file mode 100644 index 38162912..00000000 Binary files a/oceanbasev10reader/src/main/libs/oceanbase-client-1.1.10.jar and /dev/null differ diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java index 3bcc1019..06292db5 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java @@ -86,6 +86,7 @@ public class OceanBaseV10Writer extends Writer { if (tableNumber == 1) { this.commonJob.prepare(this.originalConfig); final String version = fetchServerVersion(originalConfig); + ObWriterUtils.setObVersion(version); originalConfig.set(Config.OB_VERSION, version); } @@ -187,8 +188,9 @@ public class OceanBaseV10Writer extends Writer { } private String fetchServerVersion(Configuration config) { - final String fetchVersionSql = "show variables like 'version'"; - return DbUtils.fetchSingleValueWithRetry(config, fetchVersionSql); + final String fetchVersionSql = "show variables like 'version_comment'"; + String versionComment = DbUtils.fetchSingleValueWithRetry(config, fetchVersionSql); + return versionComment.split(" ")[1]; } private void checkCompatibleMode(Configuration configure) { diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/DbUtils.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/DbUtils.java index e590fe6b..ec26e788 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/DbUtils.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/DbUtils.java @@ -3,18 +3,17 @@ package com.alibaba.datax.plugin.writer.oceanbasev10writer.util; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.rdbms.util.DBUtil; import com.alibaba.datax.plugin.rdbms.util.DataBaseType; -import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; import com.alibaba.datax.plugin.rdbms.writer.Constant; import com.alibaba.datax.plugin.rdbms.writer.Key; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; public class DbUtils { @@ -25,7 +24,7 @@ public class DbUtils { final String password = config.getString(Key.PASSWORD); String jdbcUrl = config.getString(Key.JDBC_URL); - if(jdbcUrl == null) { + if (jdbcUrl == null) { List conns = config.getList(Constant.CONN_MARK, Object.class); Configuration connConf = Configuration.from(conns.get(0).toString()); jdbcUrl = connConf.getString(Key.JDBC_URL); @@ -34,9 +33,9 @@ public class DbUtils { Connection conn = null; PreparedStatement stmt = null; ResultSet result = null; - boolean need_retry = false; String value = null; int retry = 0; + int failTryCount = config.getInt(Config.FAIL_TRY_COUNT, Config.DEFAULT_FAIL_TRY_COUNT); do { try { if (retry > 0) { @@ -58,13 +57,12 @@ public class DbUtils { LOG.info("value for query [{}] is [{}]", query, value); break; } catch (SQLException e) { - need_retry = true; ++retry; LOG.warn("fetch value with {} error {}", query, e); } finally { DBUtil.closeDBResources(result, stmt, conn); } - } while (need_retry); + } while (retry < failTryCount); return value; } diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java index edc4b236..037e4ce5 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java @@ -1,5 +1,6 @@ package com.alibaba.datax.plugin.writer.oceanbasev10writer.util; +import com.alibaba.datax.plugin.rdbms.reader.util.ObVersion; import com.alibaba.datax.plugin.rdbms.util.DBUtil; import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task; import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config; @@ -18,8 +19,11 @@ public class ObWriterUtils { private static final String ORACLE_KEYWORDS = "ACCESS,ADD,ALL,ALTER,AND,ANY,ARRAYLEN,AS,ASC,AUDIT,BETWEEN,BY,CHAR,CHECK,CLUSTER,COLUMN,COMMENT,COMPRESS,CONNECT,CREATE,CURRENT,DATE,DECIMAL,DEFAULT,DELETE,DESC,DISTINCT,DROP,ELSE,EXCLUSIVE,EXISTS,FILE,FLOAT,FOR,FROM,GRANT,GROUP,HAVING,IDENTIFIED,IMMEDIATE,IN,INCREMENT,INDEX,INITIAL,INSERT,INTEGER,INTERSECT,INTO,IS,LEVEL,LIKE,LOCK,LONG,MAXEXTENTS,MINUS,MODE,MODIFY,NOAUDIT,NOCOMPRESS,NOT,NOTFOUND,NOWAIT,NULL,NUMBER,OF,OFFLINE,ON,ONLINE,OPTION,OR,ORDER,PCTFREE,PRIOR,PRIVILEGES,PUBLIC,RAW,RENAME,RESOURCE,REVOKE,ROW,ROWID,ROWLABEL,ROWNUM,ROWS,SELECT,SESSION,SET,SHARE,SIZE,SMALLINT,SQLBUF,START,SUCCESSFUL,SYNONYM,TABLE,THEN,TO,TRIGGER,UID,UNION,UNIQUE,UPDATE,USER,VALIDATE,VALUES,VARCHAR,VARCHAR2,VIEW,WHENEVER,WHERE,WITH"; private static String CHECK_MEMSTORE = "select 1 from %s.gv$memstore t where t.total>t.mem_limit * ?"; + private static final String CHECK_MEMSTORE_4_0 = "select 1 from %s.gv$ob_memstore t where t.MEMSTORE_USED>t.MEMSTORE_LIMIT * ?"; + private static Set databaseKeywords; private static String compatibleMode = null; + private static String obVersion = null; protected static final Logger LOG = LoggerFactory.getLogger(Task.class); private static Set keywordsFromString2HashSet(final String keywords) { return new HashSet(Arrays.asList(keywords.split(","))); @@ -61,7 +65,7 @@ public class ObWriterUtils { if (isOracleMode()) { sysDbName = "sys"; } - ps = conn.prepareStatement(String.format(CHECK_MEMSTORE, sysDbName)); + ps = conn.prepareStatement(String.format(getMemStoreSql(), sysDbName)); ps.setDouble(1, memstoreThreshold); rs = 
ps.executeQuery(); // 只要有满足条件的,则表示当前租户 有个机器的memstore即将满 @@ -81,6 +85,14 @@ public class ObWriterUtils { return (compatibleMode.equals(Config.OB_COMPATIBLE_MODE_ORACLE)); } + private static String getMemStoreSql() { + if (ObVersion.valueOf(obVersion).compareTo(ObVersion.V4000) >= 0) { + return CHECK_MEMSTORE_4_0; + } else { + return CHECK_MEMSTORE; + } + } + public static String getCompatibleMode() { return compatibleMode; } @@ -89,6 +101,10 @@ public class ObWriterUtils { compatibleMode = mode; } + public static void setObVersion(String version) { + obVersion = version; + } + private static String buildDeleteSql (String tableName, List columns) { StringBuilder builder = new StringBuilder("DELETE FROM "); builder.append(tableName).append(" WHERE "); diff --git a/oceanbasev10writer/src/main/libs/oceanbase-client-1.1.10.jar b/oceanbasev10writer/src/main/libs/oceanbase-client-1.1.10.jar deleted file mode 100644 index 38162912..00000000 Binary files a/oceanbasev10writer/src/main/libs/oceanbase-client-1.1.10.jar and /dev/null differ diff --git a/package.xml b/package.xml old mode 100755 new mode 100644 index 4c1aff04..41cd9c55 --- a/package.xml +++ b/package.xml @@ -145,6 +145,13 @@ datax + + clickhousereader/target/datax/ + + **/*.* + + datax + hdfsreader/target/datax/ @@ -497,5 +504,12 @@ datax + + neo4jwriter/target/datax/ + + **/*.* + + datax + diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObVersion.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ObVersion.java similarity index 97% rename from oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObVersion.java rename to plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ObVersion.java index 2fc414ce..0eb34feb 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObVersion.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ObVersion.java @@ -1,4 +1,4 @@ -package com.alibaba.datax.plugin.reader.oceanbasev10reader.util; +package com.alibaba.datax.plugin.rdbms.reader.util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java old mode 100755 new mode 100644 index 10cfe795..844b6cfd --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java @@ -7,6 +7,7 @@ import com.alibaba.datax.plugin.rdbms.reader.Key; import com.alibaba.datax.plugin.rdbms.util.*; import com.alibaba.fastjson2.JSON; +import java.text.MessageFormat; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -20,6 +21,7 @@ import java.sql.ResultSetMetaData; import java.sql.Types; import java.util.ArrayList; import java.util.List; +import static org.apache.commons.lang3.StringUtils.EMPTY; public class SingleTableSplitUtil { private static final Logger LOG = LoggerFactory @@ -277,7 +279,24 @@ public class SingleTableSplitUtil { String splitPK = configuration.getString(Key.SPLIT_PK).trim(); String table = configuration.getString(Key.TABLE).trim(); String where = configuration.getString(Key.WHERE, 
null); - return genPKSql(splitPK,table,where); + String obMode = configuration.getString("obCompatibilityMode"); + // OceanBase对SELECT MIN(%s),MAX(%s) FROM %s这条sql没有做查询改写,会进行全表扫描,在数据量的时候查询耗时很大甚至超时; + // 所以对于OceanBase数据库,查询模板需要改写为分别查询最大值和最小值。这样可以提升查询数量级的性能。 + if (DATABASE_TYPE == DataBaseType.OceanBase && StringUtils.isNotEmpty(obMode)) { + boolean isOracleMode = "ORACLE".equalsIgnoreCase(obMode); + + String minMaxTemplate = isOracleMode ? "select v2.id as min_a, v1.id as max_a from (" + + "select * from (select %s as id from %s {0} order by id desc) where rownum =1 ) v1," + + "(select * from (select %s as id from %s order by id asc) where rownum =1 ) v2;" : + "select v2.id as min_a, v1.id as max_a from (select %s as id from %s {0} order by id desc limit 1) v1," + + "(select %s as id from %s order by id asc limit 1) v2;"; + + String pkRangeSQL = String.format(minMaxTemplate, splitPK, table, splitPK, table); + String whereString = StringUtils.isNotBlank(where) ? String.format("WHERE (%s AND %s IS NOT NULL)", where, splitPK) : EMPTY; + pkRangeSQL = MessageFormat.format(pkRangeSQL, whereString); + return pkRangeSQL; + } + return genPKSql(splitPK, table, where); } public static String genPKSql(String splitPK, String table, String where){ diff --git a/pom.xml b/pom.xml index 52f80372..8be96a05 100644 --- a/pom.xml +++ b/pom.xml @@ -70,6 +70,7 @@ ftpreader txtfilereader streamreader + clickhousereader mongodbreader tdenginereader @@ -125,7 +126,6 @@ doriswriter selectdbwriter adbmysqlwriter - sybasewriter plugin-rdbms-util diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriter.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriter.java index 75b2df3a..d5f2887a 100755 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriter.java +++ b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriter.java @@ -78,7 +78,7 @@ public class StarRocksWriter extends Writer { List renderedPostSqls = StarRocksWriterUtil.renderPreOrPostSqls(options.getPostSqlList(), options.getTable()); if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) { Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password); - LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPostSqls), jdbcUrl); + LOG.info("Begin to execute postSqls:[{}]. context info:{}.", String.join(";", renderedPostSqls), jdbcUrl); StarRocksWriterUtil.executeSqls(conn, renderedPostSqls); DBUtil.closeDBResources(null, null, conn); } diff --git a/tdenginewriter/doc/tdenginewriter.md b/tdenginewriter/doc/tdenginewriter.md index ba20fdb7..d23a2b4f 100644 --- a/tdenginewriter/doc/tdenginewriter.md +++ b/tdenginewriter/doc/tdenginewriter.md @@ -14,7 +14,7 @@ TDengineWriter can be used as a data migration tool for DBAs to import data from TDengineWriter obtains the protocol data generated by Reader through DataX framework, connects to TDengine through JDBC Driver, executes insert statement /schemaless statement, and writes the data to TDengine. -In TDengine, table can be divided into super table, sub-table and ordinary table. Super table and sub-table include Colum and Tag. The value of tag column of sub-table is fixed value. 
(details please refer to: [data model](https://www.taosdata.com/docs/cn/v2.0/architecture#model)) +In TDengine, table can be divided into super table, sub-table and ordinary table. Super table and sub-table include Column and Tag. The value of tag column of sub-table is fixed value. (details please refer to: [data model](https://www.taosdata.com/docs/cn/v2.0/architecture#model)) The TDengineWriter can write data to super tables, sub-tables, and ordinary tables using the following methods based on the type of the table and whether the column parameter contains TBName: diff --git a/transformer/doc/transformer.md b/transformer/doc/transformer.md index 260c0fb6..0a00dbaa 100644 --- a/transformer/doc/transformer.md +++ b/transformer/doc/transformer.md @@ -42,7 +42,7 @@ dx_substr(1,"5","10") column 1的value为“dataxTest”=>"Test" * 举例: ``` dx_replace(1,"2","4","****") column 1的value为“dataxTest”=>"da****est" -dx_replace(1,"5","10","****") column 1的value为“dataxTest”=>"data****" +dx_replace(1,"5","10","****") column 1的value为“dataxTest”=>"datax****" ``` 4. dx_filter (关联filter暂不支持,即多个字段的联合判断,函参太过复杂,用户难以使用。) * 参数: diff --git a/userGuid.md b/userGuid.md index 876bae99..2bd23876 100644 --- a/userGuid.md +++ b/userGuid.md @@ -17,7 +17,7 @@ DataX本身作为数据同步框架,将不同数据源的同步抽象为从源 * 工具部署 - * 方法一、直接下载DataX工具包:[DataX下载地址](https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz) + * 方法一、直接下载DataX工具包:[DataX下载地址](https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202306/datax.tar.gz) 下载后解压至本地某个目录,进入bin目录,即可运行同步作业: