diff --git a/rdb2graph/README.md b/rdb2graph/README.md
new file mode 100644
index 00000000..b0cedf64
--- /dev/null
+++ b/rdb2graph/README.md
@@ -0,0 +1,3 @@
+- 关系/图转换插件,该插件属于数据同构同步,模型一致,属性名称一致,优点是自动化高,不用多大干预,适合生成中间数据,
+ 特别是,关系数据库系统与图数据库系统不在同一网段,网络不稳定,同步尽量简单显得很重要
+- schema转换 4.3+
diff --git a/rdb2graph/mysqlreader4graph/doc/mysqlreader.md b/rdb2graph/mysqlreader4graph/doc/mysqlreader.md
new file mode 100644
index 00000000..3ae52afb
--- /dev/null
+++ b/rdb2graph/mysqlreader4graph/doc/mysqlreader.md
@@ -0,0 +1,368 @@
+
+# MysqlReader 插件文档
+
+
+___
+
+
+
+## 1 快速介绍
+
+MysqlReader插件实现了从Mysql读取数据。在底层实现上,MysqlReader通过JDBC连接远程Mysql数据库,并执行相应的sql语句将数据从mysql库中SELECT出来。
+
+**不同于其他关系型数据库,MysqlReader不支持FetchSize.**
+
+## 2 实现原理
+
+简而言之,MysqlReader通过JDBC连接器连接到远程的Mysql数据库,并根据用户配置的信息生成查询SELECT SQL语句,然后发送到远程Mysql数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。
+
+对于用户配置Table、Column、Where的信息,MysqlReader将其拼接为SQL语句发送到Mysql数据库;对于用户配置querySql信息,MysqlReader直接将其发送到Mysql数据库。
+
+
+## 3 功能说明
+
+### 3.1 配置样例
+
+* 配置一个从Mysql数据库同步抽取数据到本地的作业:
+
+```
+{
+ "job": {
+ "setting": {
+ "speed": {
+ "channel": 3
+ },
+ "errorLimit": {
+ "record": 0,
+ "percentage": 0.02
+ }
+ },
+ "content": [
+ {
+ "reader": {
+ "name": "mysqlreader",
+ "parameter": {
+ "username": "root",
+ "password": "root",
+ "column": [
+ "id",
+ "name"
+ ],
+ "splitPk": "db_id",
+ "connection": [
+ {
+ "table": [
+ "table"
+ ],
+ "jdbcUrl": [
+ "jdbc:mysql://127.0.0.1:3306/database"
+ ]
+ }
+ ]
+ }
+ },
+ "writer": {
+ "name": "streamwriter",
+ "parameter": {
+ "print":true
+ }
+ }
+ }
+ ]
+ }
+}
+
+```
+
+* 配置一个自定义SQL的数据库同步任务到本地内容的作业:
+
+```
+{
+ "job": {
+ "setting": {
+ "speed": {
+ "channel":1
+ }
+ },
+ "content": [
+ {
+ "reader": {
+ "name": "mysqlreader",
+ "parameter": {
+ "username": "root",
+ "password": "root",
+ "connection": [
+ {
+ "querySql": [
+ "select db_id,on_line_flag from db_info where db_id < 10;"
+ ],
+ "jdbcUrl": [
+ "jdbc:mysql://bad_ip:3306/database",
+ "jdbc:mysql://127.0.0.1:bad_port/database",
+ "jdbc:mysql://127.0.0.1:3306/database"
+ ]
+ }
+ ]
+ }
+ },
+ "writer": {
+ "name": "streamwriter",
+ "parameter": {
+ "print": false,
+ "encoding": "UTF-8"
+ }
+ }
+ }
+ ]
+ }
+}
+```
+
+
+### 3.2 参数说明
+
+* **jdbcUrl**
+
+ * 描述:描述的是到对端数据库的JDBC连接信息,使用JSON的数组描述,并支持一个库填写多个连接地址。之所以使用JSON数组描述连接信息,是因为阿里集团内部支持多个IP探测,如果配置了多个,MysqlReader可以依次探测ip的可连接性,直到选择一个合法的IP。如果全部连接失败,MysqlReader报错。 注意,jdbcUrl必须包含在connection配置单元中。对于阿里集团外部使用情况,JSON数组填写一个JDBC连接即可。
+
+ jdbcUrl按照Mysql官方规范,并可以填写连接附件控制信息。具体请参看[Mysql官方文档](http://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html)。
+
+ * 必选:是
+
+ * 默认值:无
+
+* **username**
+
+ * 描述:数据源的用户名
+
+ * 必选:是
+
+ * 默认值:无
+
+* **password**
+
+ * 描述:数据源指定用户名的密码
+
+ * 必选:是
+
+ * 默认值:无
+
+* **table**
+
+ * 描述:所选取的需要同步的表。使用JSON的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一schema结构,MysqlReader不予检查表是否同一逻辑表。注意,table必须包含在connection配置单元中。
+
+ * 必选:是
+
+ * 默认值:无
+
+* **column**
+
+ * 描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用\*代表默认使用所有列配置,例如['\*']。
+
+ 支持列裁剪,即列可以挑选部分列进行导出。
+
+ 支持列换序,即列可以不按照表schema信息进行导出。
+
+ 支持常量配置,用户需要按照Mysql SQL语法格式:
+ ["id", "\`table\`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"]
+	id为普通列名,\`table\`为包含保留字的列名,1为整型数字常量,'bazhen.csy'为字符串常量,null为空指针,to_char(a + 1)为表达式,2.3为浮点数,true为布尔值。
+
+ * 必选:是
+
+ * 默认值:无
+
+* **splitPk**
+
+	* 描述:MysqlReader进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提高数据同步的效能。
+
+ 推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。
+
+	目前splitPk仅支持整型数据切分,`不支持浮点、字符串、日期等其他类型`。如果用户指定其他非支持类型,MysqlReader将报错!
+
+ 如果splitPk不填写,包括不提供splitPk或者splitPk值为空,DataX视作使用单通道同步该表数据。
+
+ * 必选:否
+
+ * 默认值:空
+
+* **where**
+
+ * 描述:筛选条件,MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create > $bizdate 。注意:不可以将where条件指定为limit 10,limit不是SQL的合法where子句。
+
+ where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供where的key或者value,DataX均视作同步全量数据。
+
+ * 必选:否
+
+ * 默认值:无
+
+* **querySql**
+
+ * 描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id
+
+ `当用户配置querySql时,MysqlReader直接忽略table、column、where条件的配置`,querySql优先级大于table、column、where选项。
+
+ * 必选:否
+
+ * 默认值:无
+
+
+### 3.3 类型转换
+
+目前MysqlReader支持大部分Mysql类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。
+
+下面列出MysqlReader针对Mysql类型转换列表:
+
+
+| DataX 内部类型| Mysql 数据类型 |
+| -------- | ----- |
+| Long |int, tinyint, smallint, mediumint, int, bigint|
+| Double |float, double, decimal|
+| String |varchar, char, tinytext, text, mediumtext, longtext, year |
+| Date |date, datetime, timestamp, time |
+| Boolean |bit, bool |
+| Bytes |tinyblob, mediumblob, blob, longblob, varbinary |
+
+
+
+请注意:
+
+* `除上述罗列字段类型外,其他类型均不支持`。
+* `tinyint(1) DataX视作为整型`。
+* `year DataX视作为字符串类型`
+* `bit DataX属于未定义行为`。
+
+## 4 性能报告
+
+### 4.1 环境准备
+
+#### 4.1.1 数据特征
+建表语句:
+
+ CREATE TABLE `tc_biz_vertical_test_0000` (
+ `biz_order_id` bigint(20) NOT NULL COMMENT 'id',
+ `key_value` varchar(4000) NOT NULL COMMENT 'Key-value的内容',
+ `gmt_create` datetime NOT NULL COMMENT '创建时间',
+ `gmt_modified` datetime NOT NULL COMMENT '修改时间',
+ `attribute_cc` int(11) DEFAULT NULL COMMENT '防止并发修改的标志',
+ `value_type` int(11) NOT NULL DEFAULT '0' COMMENT '类型',
+ `buyer_id` bigint(20) DEFAULT NULL COMMENT 'buyerid',
+ `seller_id` bigint(20) DEFAULT NULL COMMENT 'seller_id',
+ PRIMARY KEY (`biz_order_id`,`value_type`),
+ KEY `idx_biz_vertical_gmtmodified` (`gmt_modified`)
+ ) ENGINE=InnoDB DEFAULT CHARSET=gbk COMMENT='tc_biz_vertical'
+
+
+单行记录类似于:
+
+ biz_order_id: 888888888
+ key_value: ;orderIds:20148888888,2014888888813800;
+ gmt_create: 2011-09-24 11:07:20
+ gmt_modified: 2011-10-24 17:56:34
+ attribute_cc: 1
+ value_type: 3
+ buyer_id: 8888888
+ seller_id: 1
+
+#### 4.1.2 机器参数
+
+* 执行DataX的机器参数为:
+ 1. cpu: 24核 Intel(R) Xeon(R) CPU E5-2630 0 @ 2.30GHz
+ 2. mem: 48GB
+ 3. net: 千兆双网卡
+ 4. disc: DataX 数据不落磁盘,不统计此项
+
+* Mysql数据库机器参数为:
+ 1. cpu: 32核 Intel(R) Xeon(R) CPU E5-2650 v2 @ 2.60GHz
+ 2. mem: 256GB
+ 3. net: 千兆双网卡
+ 4. disc: BTWL419303E2800RGN INTEL SSDSC2BB800G4 D2010370
+
+#### 4.1.3 DataX jvm 参数
+
+ -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
+
+
+### 4.2 测试报告
+
+#### 4.2.1 单表测试报告
+
+
+| 通道数| 是否按照主键切分| DataX速度(Rec/s)|DataX流量(MB/s)| DataX机器网卡进入流量(MB/s)|DataX机器运行负载|DB网卡流出流量(MB/s)|DB运行负载|
+|--------|--------| --------|--------|--------|--------|--------|--------|
+|1| 否 | 183185 | 18.11 | 29| 0.6 | 31| 0.6 |
+|1| 是 | 183185 | 18.11 | 29| 0.6 | 31| 0.6 |
+|4| 否 | 183185 | 18.11 | 29| 0.6 | 31| 0.6 |
+|4| 是 | 329733 | 32.60 | 58| 0.8 | 60| 0.76 |
+|8| 否 | 183185 | 18.11 | 29| 0.6 | 31| 0.6 |
+|8| 是 | 549556 | 54.33 | 115| 1.46 | 120| 0.78 |
+
+说明:
+
+1. 这里的单表,主键类型为 bigint(20),范围为:190247559466810-570722244711460,从主键范围划分看,数据分布均匀。
+2. 对单表如果没有按照主键切分,那么配置通道个数不会提升速度,效果与1个通道一样。
+
+
+#### 4.2.2 分表测试报告(2个分库,每个分库16张分表,共计32张分表)
+
+
+| 通道数| DataX速度(Rec/s)|DataX流量(MB/s)| DataX机器网卡进入流量(MB/s)|DataX机器运行负载|DB网卡流出流量(MB/s)|DB运行负载|
+|--------| --------|--------|--------|--------|--------|--------|
+|1| 202241 | 20.06 | 31.5| 1.0 | 32 | 1.1 |
+|4| 726358 | 72.04 | 123.9 | 3.1 | 132 | 3.6 |
+|8|1074405 | 106.56| 197 | 5.5 | 205| 5.1|
+|16| 1227892 | 121.79 | 229.2 | 8.1 | 233 | 7.3 |
+
+## 5 约束限制
+
+### 5.1 主备同步数据恢复问题
+
+主备同步问题指Mysql使用主从灾备,备库从主库不间断通过binlog恢复数据。由于主备数据同步存在一定的时间差,特别在于某些特定情况,例如网络延迟等问题,导致备库同步恢复的数据与主库有较大差别,导致从备库同步的数据不是一份当前时间的完整镜像。
+
+针对这个问题,我们提供了preSql功能,该功能待补充。
+
+### 5.2 一致性约束
+
+Mysql在数据存储划分中属于RDBMS系统,对外可以提供强一致性数据查询接口。例如当一次同步任务启动运行过程中,当该库存在其他数据写入方写入数据时,MysqlReader完全不会获取到写入更新数据,这是由于数据库本身的快照特性决定的。关于数据库快照特性,请参看[MVCC Wikipedia](https://en.wikipedia.org/wiki/Multiversion_concurrency_control)
+
+上述是在MysqlReader单线程模型下数据同步一致性的特性,由于MysqlReader可以根据用户配置信息使用了并发数据抽取,因此不能严格保证数据一致性:当MysqlReader根据splitPk进行数据切分后,会先后启动多个并发任务完成数据同步。由于多个并发任务相互之间不属于同一个读事务,同时多个并发任务存在时间间隔。因此这份数据并不是`完整的`、`一致的`数据快照信息。
+
+针对多线程的一致性快照需求,在技术上目前无法实现,只能从工程角度解决,工程化的方式存在取舍,我们提供几个解决思路给用户,用户可以自行选择:
+
+1. 使用单线程同步,即不再进行数据切片。缺点是速度比较慢,但是能够很好保证一致性。
+
+2. 关闭其他数据写入方,保证当前数据为静态数据,例如,锁表、关闭备库同步等等。缺点是可能影响在线业务。
+
+### 5.3 数据库编码问题
+
+Mysql本身的编码设置非常灵活,包括指定编码到库、表、字段级别,甚至可以均不同编码。优先级从高到低为字段、表、库、实例。我们不推荐数据库用户设置如此混乱的编码,最好在库级别就统一到UTF-8。
+
+MysqlReader底层使用JDBC进行数据抽取,JDBC天然适配各类编码,并在底层进行了编码转换。因此MysqlReader不需用户指定编码,可以自动获取编码并转码。
+
+对于Mysql底层写入编码和其设定的编码不一致的混乱情况,MysqlReader对此无法识别,对此也无法提供解决方案,对于这类情况,`导出有可能为乱码`。
+
+### 5.4 增量数据同步
+
+MysqlReader使用JDBC SELECT语句完成数据抽取工作,因此可以使用SELECT...WHERE...进行增量数据抽取,方式有多种:
+
+* 数据库在线应用写入数据库时,填充modify字段为更改时间戳,包括新增、更新、删除(逻辑删)。对于这类应用,MysqlReader只需要WHERE条件跟上一同步阶段时间戳即可。
+* 对于新增流水型数据,MysqlReader可以WHERE条件后跟上一阶段最大自增ID即可。
+
+对于业务上无字段区分新增、修改数据情况,MysqlReader也无法进行增量数据同步,只能同步全量数据。
+
+### 5.5 Sql安全性
+
+MysqlReader提供querySql语句交给用户自己实现SELECT抽取语句,MysqlReader本身对querySql不做任何安全性校验。这块交由DataX用户方自己保证。
+
+## 6 FAQ
+
+***
+
+**Q: MysqlReader同步报错,报错信息为XXX**
+
+ A: 网络或者权限问题,请使用mysql命令行测试:
+
+ mysql -u -p -h -D -e "select * from <表名>"
+
+如果上述命令也报错,那可以证实是环境问题,请联系你的DBA。
+
+
diff --git a/rdb2graph/mysqlreader4graph/pom.xml b/rdb2graph/mysqlreader4graph/pom.xml
new file mode 100644
index 00000000..a6238e30
--- /dev/null
+++ b/rdb2graph/mysqlreader4graph/pom.xml
@@ -0,0 +1,84 @@
+
+
+ 4.0.0
+
+ com.leehom.arch.datax.plugin
+ rdb2graph-parent
+ ${revision}
+
+ mysqlreader4graph
+ jar
+
+
+
+ com.alibaba.datax
+ datax-common
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+ ch.qos.logback
+ logback-classic
+
+
+
+ mysql
+ mysql-connector-java
+ ${mysql.driver.version}
+
+
+
+ com.leehom.arch.datax.plugin
+ rdb2graph-rdbms-util
+ ${revision}
+
+
+ com.leehom.arch.datax.plugin
+ rdb2graph-common
+ ${revision}
+
+
+
+
+
+
+
+ maven-compiler-plugin
+
+ ${java.version}
+ ${java.version}
+ ${project-sourceEncoding}
+
+
+
+
+ maven-assembly-plugin
+
+
+ src/main/assembly/package.xml
+
+ datax
+
+
+
+ dwzip
+ package
+
+ single
+
+
+
+
+
+
+
diff --git a/rdb2graph/mysqlreader4graph/src/main/assembly/package.xml b/rdb2graph/mysqlreader4graph/src/main/assembly/package.xml
new file mode 100644
index 00000000..1d81c407
--- /dev/null
+++ b/rdb2graph/mysqlreader4graph/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+
+
+
+ dir
+
+ false
+
+
+ src/main/resources
+
+ plugin.json
+ plugin_job_template.json
+
+ plugin/reader/mysqlreader4graph
+
+
+ target/
+
+ mysqlreader4graph-1.0.0-SNAPSHOT.jar
+
+ plugin/reader/mysqlreader4graph
+
+
+
+
+
+ false
+ plugin/reader/mysqlreader4graph/libs
+ runtime
+
+
+
diff --git a/rdb2graph/mysqlreader4graph/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/reader/mysqlreader/MysqlReader4Graph.java b/rdb2graph/mysqlreader4graph/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/reader/mysqlreader/MysqlReader4Graph.java
new file mode 100644
index 00000000..5e33c212
--- /dev/null
+++ b/rdb2graph/mysqlreader4graph/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/reader/mysqlreader/MysqlReader4Graph.java
@@ -0,0 +1,110 @@
+package com.leehom.arch.datax.plugin.rdb2graph.reader.mysqlreader;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.datax.common.spi.Reader;
+import com.alibaba.datax.common.util.Configuration;
+import com.leehom.arch.datax.plugin.rdb2graph.rdbms.reader.CommonRdbms2GraphReader;
+import com.leehom.arch.datax.plugin.rdb2graph.rdbms.reader.Constant;
+import com.leehom.arch.datax.plugin.rdb2graph.rdbms.util.DataBaseType;
+
+/**
+ * @class MysqlReader4Graph
+ * @description MySQL reader for relational-to-graph synchronization.
+ *   1. Table-data phase: DataX "table" mode; the schema's tables are written
+ *      into the DataX configuration.
+ *   2. Relationship phase: grouped by the table-connection graph, one pair of
+ *      related tables per group; each table join generates a querySql that is
+ *      written into the DataX "querySql" mode configuration.
+ *
+ * @author leehom
+ * @Date 2022年4月30日 上午12:38:34
+ * Change log:
+ *
+ * @see
+ */
+public class MysqlReader4Graph extends Reader {
+
+	// database type handed to the common rdb2graph reader delegate
+	private static final DataBaseType DATABASE_TYPE = DataBaseType.MySql;
+
+	public static class Job extends Reader.Job {
+		private static final Logger LOG = LoggerFactory
+				.getLogger(Job.class);
+
+		private Configuration originalConfig = null;
+		private CommonRdbms2GraphReader.Job commonRdbmsReaderJob;
+
+		/** Reads the job configuration and initializes the delegate reader job. */
+		@Override
+		public void init() {
+			this.originalConfig = super.getPluginJobConf();
+
+			// MySQL does not honor a user-supplied fetchSize (see plugin doc); warn and ignore it
+			Integer userConfigedFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE);
+			if (userConfigedFetchSize != null) {
+				LOG.warn("对 mysqlreader 不需要配置 fetchSize, mysqlreader 将会忽略这项配置. 如果您不想再看到此警告,请去除fetchSize 配置.");
+			}
+
+			// Integer.MIN_VALUE is the MySQL Connector/J convention for row-streaming result sets
+			this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);
+
+			this.commonRdbmsReaderJob = new CommonRdbms2GraphReader.Job(DATABASE_TYPE);
+			this.commonRdbmsReaderJob.init(this.originalConfig);
+		}
+
+		/**
+		 * Connectivity/configuration pre-check. Calls init() first because the
+		 * job container does not invoke init() before preCheck().
+		 */
+		@Override
+		public void preCheck(){
+			init();
+			this.commonRdbmsReaderJob.preCheck(this.originalConfig,DATABASE_TYPE);
+
+		}
+
+		/** Splits the job into {@code adviceNumber}-guided task configurations (delegated). */
+		@Override
+		public List split(int adviceNumber) {
+			return this.commonRdbmsReaderJob.split(this.originalConfig, adviceNumber);
+		}
+
+		/** Job-level post-processing (delegated). */
+		@Override
+		public void post() {
+			this.commonRdbmsReaderJob.post(this.originalConfig);
+		}
+
+		/** Releases job-level resources (delegated). */
+		@Override
+		public void destroy() {
+			this.commonRdbmsReaderJob.destroy(this.originalConfig);
+		}
+
+	}
+
+	public static class Task extends Reader.Task {
+
+		private Configuration readerSliceConfig;
+		private CommonRdbms2GraphReader.Task commonRdbmsReaderTask;
+
+		/** Reads the task-slice configuration and initializes the delegate task. */
+		@Override
+		public void init() {
+			this.readerSliceConfig = super.getPluginJobConf();
+			this.commonRdbmsReaderTask = new CommonRdbms2GraphReader.Task(DATABASE_TYPE,super.getTaskGroupId(), super.getTaskId());
+			this.commonRdbmsReaderTask.init(this.readerSliceConfig);
+
+		}
+
+		/** Streams rows to the framework via the delegate reader. */
+		@Override
+		public void startRead(RecordSender recordSender) {
+			// FETCH_SIZE was forced to Integer.MIN_VALUE in Job.init()
+			int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);
+
+			this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,
+					super.getTaskPluginCollector(), fetchSize);
+		}
+
+		/** Task-level post-processing (delegated). */
+		@Override
+		public void post() {
+			this.commonRdbmsReaderTask.post(this.readerSliceConfig);
+		}
+
+		/** Releases task-level resources (delegated). */
+		@Override
+		public void destroy() {
+			this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);
+		}
+
+	}
+
+}
diff --git a/rdb2graph/mysqlreader4graph/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/reader/mysqlreader/MysqlReaderErrorCode.java b/rdb2graph/mysqlreader4graph/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/reader/mysqlreader/MysqlReaderErrorCode.java
new file mode 100644
index 00000000..8cda39a9
--- /dev/null
+++ b/rdb2graph/mysqlreader4graph/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/reader/mysqlreader/MysqlReaderErrorCode.java
@@ -0,0 +1,31 @@
+package com.leehom.arch.datax.plugin.rdb2graph.reader.mysqlreader;
+
+import com.alibaba.datax.common.spi.ErrorCode;
+
+public enum MysqlReaderErrorCode implements ErrorCode {
+	// Placeholder: no error codes are defined yet. Constants added later should
+	// follow the ("MysqlReader4Graph-NN", "description") convention used by the
+	// sibling writer error codes.
+	;
+
+	private final String code;        // stable machine-readable error code
+	private final String description; // human-readable description
+
+	private MysqlReaderErrorCode(String code, String description) {
+		this.code = code;
+		this.description = description;
+	}
+
+	/** @return the stable error-code string. */
+	@Override
+	public String getCode() {
+		return this.code;
+	}
+
+	/** @return the human-readable error description. */
+	@Override
+	public String getDescription() {
+		return this.description;
+	}
+
+	@Override
+	public String toString() {
+		return String.format("Code:[%s], Description:[%s]. ", this.code,
+				this.description);
+	}
+}
diff --git a/rdb2graph/mysqlreader4graph/src/main/resources/plugin.json b/rdb2graph/mysqlreader4graph/src/main/resources/plugin.json
new file mode 100644
index 00000000..3844d545
--- /dev/null
+++ b/rdb2graph/mysqlreader4graph/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+ "name": "mysqlreader4graph",
+ "class": "com.leehom.arch.datax.plugin.rdb2graph.reader.mysqlreader.MysqlReader4Graph",
+ "description": "",
+ "developer": "leehom"
+}
\ No newline at end of file
diff --git a/rdb2graph/mysqlreader4graph/src/main/resources/plugin_job_template.json b/rdb2graph/mysqlreader4graph/src/main/resources/plugin_job_template.json
new file mode 100644
index 00000000..c330e9fb
--- /dev/null
+++ b/rdb2graph/mysqlreader4graph/src/main/resources/plugin_job_template.json
@@ -0,0 +1,17 @@
+{
+ "name": "mysqlreader4graph",
+ "parameter": {
+ "username": "",
+ "password": "",
+ "phase": "",
+ "schemaUri": "",
+ "column": [],
+ "connection": [
+ {
+ "jdbcUrl": [],
+ "table": []
+ }
+ ],
+ "where": ""
+ }
+}
\ No newline at end of file
diff --git a/rdb2graph/neo4jwriter/README.md b/rdb2graph/neo4jwriter/README.md
new file mode 100644
index 00000000..fd40910d
--- /dev/null
+++ b/rdb2graph/neo4jwriter/README.md
@@ -0,0 +1,4 @@
+
+
+
+
diff --git a/rdb2graph/neo4jwriter/pom.xml b/rdb2graph/neo4jwriter/pom.xml
new file mode 100644
index 00000000..d19df925
--- /dev/null
+++ b/rdb2graph/neo4jwriter/pom.xml
@@ -0,0 +1,84 @@
+
+
+
+ com.leehom.arch.datax.plugin
+ rdb2graph-parent
+ ${revision}
+
+ 4.0.0
+
+ neo4jwriter
+
+
+
+ com.alibaba.datax
+ datax-common
+ ${datax.version}
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+ ch.qos.logback
+ logback-classic
+
+
+ com.alibaba
+ fastjson
+
+
+ junit
+ junit
+ test
+
+
+ com.leehom.arch.datax.plugin
+ rdb2graph-scanner
+ 1.0.0-SNAPSHOT
+
+
+ com.leehom.arch.datax.plugin
+ rdb2graph-common
+ 1.0.0-SNAPSHOT
+
+
+
+
+
+
+ maven-compiler-plugin
+
+ ${java.version}
+ ${java.version}
+ ${project-sourceEncoding}
+
+
+
+
+ maven-assembly-plugin
+
+
+ src/main/assembly/package.xml
+
+ datax
+
+
+
+ dwzip
+ package
+
+ single
+
+
+
+
+
+
+
diff --git a/rdb2graph/neo4jwriter/src/main/assembly/package.xml b/rdb2graph/neo4jwriter/src/main/assembly/package.xml
new file mode 100644
index 00000000..2d0cb951
--- /dev/null
+++ b/rdb2graph/neo4jwriter/src/main/assembly/package.xml
@@ -0,0 +1,34 @@
+
+
+
+ dir
+
+ false
+
+
+ src/main/resources
+
+ plugin.json
+
+ plugin/writer/neo4jwriter
+
+
+ target/
+
+ neo4jwriter-1.0.0-SNAPSHOT.jar
+
+ plugin/writer/neo4jwriter
+
+
+
+
+
+ false
+ plugin/writer/neo4jwriter/libs
+ runtime
+
+
+
diff --git a/rdb2graph/neo4jwriter/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/writer/neo4jwriter/Key.java b/rdb2graph/neo4jwriter/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/writer/neo4jwriter/Key.java
new file mode 100644
index 00000000..ae397da2
--- /dev/null
+++ b/rdb2graph/neo4jwriter/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/writer/neo4jwriter/Key.java
@@ -0,0 +1,121 @@
+package com.leehom.arch.datax.plugin.rdb2graph.writer.neo4jwriter;
+
+import com.alibaba.datax.common.util.Configuration;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @class Key
+ * @description Configuration keys: typed accessors (with defaults) for the
+ *              neo4j writer's job-configuration entries.
+ *
+ * @author leehom
+ * @Date 2022年4月27日 下午2:49:13
+ * Change log:
+ *
+ * @see
+ */
+public final class Key {
+
+	// NOTE(review): utility class — a private constructor would prevent
+	// accidental instantiation.
+
+	/** Target neo4j database name; "" means the server's default database. */
+	public static String database(Configuration conf) {
+		return conf.getString("database", "");
+	}
+
+	/** URI of the serialized relational-schema resource; defaults to "". */
+	public static String schemaUri(Configuration conf) {
+		return conf.getString("schemaUri", "");
+	}
+
+	/** neo4j connection URI (e.g. bolt/neo4j scheme); defaults to "". */
+	public static String uri(Configuration conf) {
+		return conf.getString("uri", "");
+	}
+
+	/** neo4j user name; defaults to "". */
+	public static String userName(Configuration conf) {
+		return conf.getString("username", "");
+	}
+
+	/** neo4j password; defaults to "". */
+	public static String password(Configuration conf) {
+		return conf.getString("password", "");
+	}
+
+	/** Records per write batch; defaults to 1000. */
+	public static int batchSize(Configuration conf) {
+		return conf.getInt("batchSize", 1000);
+	}
+
+	/** Retry count; defaults to 30. */
+	public static int getTrySize(Configuration conf) {
+		return conf.getInt("trySize", 30);
+	}
+
+	/** Timeout in milliseconds; defaults to 600000 (10 min). */
+	public static int getTimeout(Configuration conf) {
+		return conf.getInt("timeout", 600000);
+	}
+
+	/** Whether to clean up the target before writing; defaults to false. */
+	public static boolean isCleanup(Configuration conf) {
+		return conf.getBool("cleanup", false);
+	}
+
+	/** Defaults to false. */
+	public static boolean isDiscovery(Configuration conf) {
+		return conf.getBool("discovery", false);
+	}
+
+	/** Defaults to true. */
+	public static boolean isCompression(Configuration conf) {
+		return conf.getBool("compression", true);
+	}
+
+	/** Defaults to true. */
+	public static boolean isMultiThread(Configuration conf) {
+		return conf.getBool("multiThread", true);
+	}
+
+	// NOTE(review): the index/type/alias accessors below look carried over from
+	// an elasticsearch writer — confirm they are actually read by this plugin.
+
+	/** Mandatory "index" entry; throws BAD_CONFIG_VALUE when absent. */
+	public static String getIndexName(Configuration conf) {
+		return conf.getNecessaryValue("index", Neo4jWriterErrorCode.BAD_CONFIG_VALUE);
+	}
+
+	/** "indexType", falling back to "type", falling back to the index name. */
+	public static String getTypeName(Configuration conf) {
+		String indexType = conf.getString("indexType");
+		if(StringUtils.isBlank(indexType)){
+			indexType = conf.getString("type", getIndexName(conf));
+		}
+		return indexType;
+	}
+
+
+	/** Defaults to false. */
+	public static boolean isIgnoreWriteError(Configuration conf) {
+		return conf.getBool("ignoreWriteError", false);
+	}
+
+	/** Defaults to true. */
+	public static boolean isIgnoreParseError(Configuration conf) {
+		return conf.getBool("ignoreParseError", true);
+	}
+
+
+	/** True when "mode" is exactly "highspeed"; defaults to false. */
+	public static boolean isHighSpeedMode(Configuration conf) {
+		if ("highspeed".equals(conf.getString("mode", ""))) {
+			return true;
+		}
+		return false;
+	}
+
+	/** Alias name; defaults to "". */
+	public static String getAlias(Configuration conf) {
+		return conf.getString("alias", "");
+	}
+
+	/** True when "aliasMode" is "exclusive"; default mode is "append". */
+	public static boolean isNeedCleanAlias(Configuration conf) {
+		String mode = conf.getString("aliasMode", "append");
+		if ("exclusive".equals(mode)) {
+			return true;
+		}
+		return false;
+	}
+
+	/** Free-form "settings" map; defaults to an empty map. */
+	public static Map getSettings(Configuration conf) {
+		return conf.getMap("settings", new HashMap());
+	}
+
+	/** Field splitter; defaults to "-,-". */
+	public static String getSplitter(Configuration conf) {
+		return conf.getString("splitter", "-,-");
+	}
+
+	/** Defaults to false. */
+	public static boolean getDynamic(Configuration conf) {
+		return conf.getBool("dynamic", false);
+	}
+}
diff --git a/rdb2graph/neo4jwriter/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/writer/neo4jwriter/Neo4jWriter.java b/rdb2graph/neo4jwriter/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/writer/neo4jwriter/Neo4jWriter.java
new file mode 100644
index 00000000..edf6aef5
--- /dev/null
+++ b/rdb2graph/neo4jwriter/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/writer/neo4jwriter/Neo4jWriter.java
@@ -0,0 +1,424 @@
+package com.leehom.arch.datax.plugin.rdb2graph.writer.neo4jwriter;
+
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.sql.JDBCType;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.reflect.MethodUtils;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Query;
+import org.neo4j.driver.exceptions.Neo4jException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.Column.Type;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+import com.leehom.arch.datax.plugin.rdb2graph.common.ByteAndStreamUtils;
+import com.leehom.arch.datax.plugin.rdb2graph.common.ResourceLoaderUtil;
+import com.leehom.arch.datax.plugin.rdb2graph.common.serializer.Serializer;
+import com.leehom.arch.datax.plugin.rdb2graph.scanner.config.ScannerSerializerConfig;
+import com.leehom.arch.datax.plugin.rdb2graph.scanner.neo4j.ds.Neo4jDao;
+import com.leehom.arch.datax.plugin.rdb2graph.scanner.neo4j.ds.Neo4jQueryPattern;
+import com.leehom.arch.datax.plugin.rdb2graph.scanner.neo4j.ds.ParamsUtils;
+import com.leehom.arch.datax.plugin.rdb2graph.scanner.rdb.DbSchema;
+import com.leehom.arch.datax.plugin.rdb2graph.scanner.rdb.FieldMetadata;
+import com.leehom.arch.datax.plugin.rdb2graph.scanner.rdb.TableMetadata;
+import com.leehom.arch.datax.plugin.rdb2graph.scanner.rdb.constraint.fk.FKConstraintMetadata;
+import com.leehom.arch.datax.plugin.rdb2graph.scanner.rdb.constraint.fk.FKField;
+
+/**
+ * @类名: Neo4jWriter
+ * @说明: neo4j写入,支持两个阶段
+ *
+ *
+ * @author leehom
+ * @Date 2022年4月21日 下午11:14:32
+ * 修改记录:
+ *
+ * TODO
+ * 1. 支持写入模式,@WriteMode replace/update
+ *
+ * @see
+ */
+public class Neo4jWriter extends Writer {
+
+	public static class Job extends Writer.Job {
+
+		private static final Logger log = LoggerFactory.getLogger(Job.class);
+
+		private Configuration conf = null;
+
+		/** Initializes the job environment: reads the plugin job configuration. */
+		@Override
+		public void init() {
+			this.conf = super.getPluginJobConf();
+		}
+
+		/** Preparation hook; schema conversion could be performed here. */
+		@Override
+		public void prepare() {
+			//
+		}
+
+		/**
+		 * There is only one target database, so splitting just enables parallel
+		 * writing: every task slice reuses the same configuration object.
+		 */
+		@Override
+		public List split(int mandatoryNumber) {
+			List configurations = new ArrayList(mandatoryNumber);
+			for (int i = 0; i < mandatoryNumber; i++) {
+				configurations.add(conf);
+			}
+			return configurations;
+		}
+
+		/** Post-processing hook; nothing to do. */
+		@Override
+		public void post() {
+			// nothing
+		}
+
+		/** Resource release hook; nothing to do. */
+		@Override
+		public void destroy() {
+			// nothing
+		}
+	}
+
+	public static class Task extends Writer.Task {
+
+		// BUGFIX: was LoggerFactory.getLogger(Job.class), which attributed every
+		// task-side log line to the Job class.
+		private static final Logger log = LoggerFactory.getLogger(Task.class);
+
+		private Configuration conf;     // task-slice configuration
+		private Neo4jDao client = null; // neo4j access client
+		private int batchSize;          // records per batched write
+		private DbSchema rdbSchema;     // relational schema, loaded in prepare()
+		private WriteMode writeMode;    // write mode (reserved; see class-level TODO)
+		private Serializer ser;         // schema deserializer
+
+		/** Builds the neo4j driver/client and reads batch settings from the slice config. */
+		@Override
+		public void init() {
+			this.conf = super.getPluginJobConf();
+			// build the Neo4jDao from connection settings
+			String uri = Key.uri(conf);
+			String un = Key.userName(conf);
+			String pw = Key.password(conf);
+			String db = Key.database(conf);
+			Driver driver = GraphDatabase.driver(uri, AuthTokens.basic(un, pw));
+			client = new Neo4jDao();
+			client.setDriver(driver);
+			client.setDatabase(db);
+			// serializer for the relational database schema
+			ser = ScannerSerializerConfig.rdbSchemaXmlSerializer();
+			// batch size for writes
+			batchSize = Key.batchSize(conf);
+		}
+
+		/**
+		 * Loads the relational schema referenced by schemaUri.
+		 * Calls init() first because the job container does not invoke it
+		 * before prepare().
+		 */
+		@Override
+		public void prepare() {
+			init(); // 作业容器并没有调用init方法
+			// load the relational database schema
+			String schemaUri = Key.schemaUri(conf);
+			InputStream is;
+			try {
+				is = ResourceLoaderUtil.getResourceStream(schemaUri);
+				byte[] bytes = ByteAndStreamUtils.StreamToBytes(is);
+				rdbSchema = (DbSchema) ser.Unmarshal(bytes);
+			} catch (Exception e) {
+				// BUGFIX: the exception was built but never thrown, silently
+				// swallowing schema-load failures and leaving rdbSchema null,
+				// which caused an NPE later in doWrite().
+				throw DataXException.asDataXException(Neo4jWriterErrorCode.ERROR_LOAD_RDBSCHEMA, e);
+			}
+		}
+
+		/**
+		 * Consumes records from the exchanger and writes them in batches of
+		 * batchSize. Two record flavors are handled: table records (nodes)
+		 * and relationship records.
+		 */
+		@Override
+		public void startWrite(RecordReceiver recordReceiver) {
+			List<Record> writerBuffer = new ArrayList<>(this.batchSize);
+			Record record = null;
+			// the exchanger returns null once a TerminateRecord is received
+			while ((record = recordReceiver.getFromReader()) != null) {
+				writerBuffer.add(record);
+				if (writerBuffer.size() >= this.batchSize) {
+					this.doWrite(writerBuffer);
+					writerBuffer.clear();
+				}
+			}
+			// flush the trailing partial batch
+			if (!writerBuffer.isEmpty()) {
+				this.doWrite(writerBuffer);
+				writerBuffer.clear();
+			}
+		}
+
+		// Dispatches a batch to node or relationship writing based on the record's
+		// runtime type. TableRecord/RelRecord are project classes matched by simple
+		// name to avoid a hard compile-time dependency.
+		private void doWrite(List<Record> writerBuffer) {
+			Record record = writerBuffer.get(0);
+			try {
+				if ("TableRecord".equals(record.getClass().getSimpleName())) {
+					doBatchWriteNode(writerBuffer);
+					return;
+				}
+				if ("RelRecord".equals(record.getClass().getSimpleName())) {
+					doBatchWriteRel(writerBuffer);
+					return;
+				}
+				// unknown record type: back off briefly
+				try {
+					Thread.sleep(300);
+				} catch (InterruptedException e) {
+					// restore the interrupt flag so the framework can observe it
+					Thread.currentThread().interrupt();
+				}
+			} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+				// reflective access failure; should not happen in practice, log only
+				log.error(e.getMessage());
+			} catch (DataXException e) {
+				// illegal field type: log and continue with the next batch
+				log.error(e.getMessage());
+			} catch (Neo4jException e) {
+				// neo4j access errors are fatal for the task
+				throw DataXException.asDataXException(Neo4jWriterErrorCode.WRONG_NEO4j_CLIENT,
+						Neo4jWriterErrorCode.WRONG_NEO4j_CLIENT.getDescription(), e);
+			}
+		}
+
+		// Writes one batch of table records as nodes. Link (join) tables are
+		// skipped here — they are materialized as relationships instead.
+		private void doBatchWriteNode(final List<Record> writerBuffer)
+				throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+			List<Query> queries = new ArrayList<>();
+			for (Record record : writerBuffer) {
+				String tn = (String) MethodUtils.invokeMethod(record, "getTable");
+				TableMetadata tbmd = rdbSchema.findTable(tn);
+				if (tbmd.isLinkTable())
+					continue;
+				// node properties from the record's columns
+				Map<String, Object> props = fillProperties(tbmd, record);
+				Query q = calcNodeWriteQuery(tbmd, props);
+				queries.add(q);
+			}
+			client.reTryRunInTransaction(queries, 5);
+		}
+
+		// Writes one batch of relationship records. Two shapes are handled:
+		// a link (join) table becomes a relationship between its two referenced
+		// tables; a plain FK record becomes a relationship from the owning table
+		// to the referenced table.
+		private void doBatchWriteRel(final List<Record> writerBuffer)
+				throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+			List<Query> queries = new ArrayList<>();
+			for (Record record : writerBuffer) {
+				// table name and FK name carried by the RelRecord (read reflectively)
+				String tn = (String) MethodUtils.invokeMethod(record, "getFromTable");
+				String fk = (String) MethodUtils.invokeMethod(record, "getFk");
+				TableMetadata tbmd = rdbSchema.findTable(tn);
+				// relationship endpoints and type
+				TableMetadata from;
+				TableMetadata to;
+				String fkName;
+				Map<String, Object> props;
+				if (tbmd.isLinkTable()) {
+					// a link table has exactly two FKs: one marks the relationship
+					// start, the other the relationship end
+					FKConstraintMetadata fromFkmd = tbmd.getLinkFrom();
+					from = fromFkmd.getRefTable();
+					FKConstraintMetadata toFkmd = tbmd.getFks().get(0).getFkName().equals(fromFkmd.getFkName())
+							? tbmd.getFks().get(1)
+							: tbmd.getFks().get(0);
+					to = toFkmd.getRefTable();
+					fkName = tbmd.getName(); // relationship type is the link-table name
+					props = fillProperties(tbmd, record);
+					Query q = calcLinkRelWriteQuery(from, to, tbmd, fromFkmd, toFkmd, props);
+					queries.add(q);
+				} else {
+					from = tbmd;
+					FKConstraintMetadata fkmd = from.findFk(fk);
+					to = fkmd.getRefTable();
+					fkName = fkmd.getFkName();
+					props = new HashMap<>();
+					Query q = calcRelWriteQuery(from, to, record, fkName, props);
+					queries.add(q);
+				}
+			}
+			client.reTryRunInTransaction(queries, 5);
+		}
+
+		// Builds a CREATE-node query: the node label is the table name, the
+		// properties are the converted column values.
+		// TODO honor writeMode (replace/update) instead of always creating.
+		private Query calcNodeWriteQuery(TableMetadata tbmd, Map<String, Object> props) {
+			String propsStr = ParamsUtils.params2String(props);
+			String cql = MessageFormat.format(Neo4jQueryPattern.CREATE_NODE, tbmd.getName(), propsStr);
+			return new Query(cql);
+		}
+
+		// Builds a CREATE-relationship query for a plain FK relationship; the
+		// from-node ("a") is matched by its PK columns and the to-node ("b") by
+		// the FK's referenced columns.
+		private Query calcRelWriteQuery(TableMetadata from, TableMetadata to, Record record,
+				String fkName, Map<String, Object> props) {
+			// relationship properties
+			String propsStr = ParamsUtils.params2String(props);
+			// node filter items, matched by primary key
+			String nodeWherePattern = "{0}.{1} = {2}";
+			List<String> nodeWhereItems = new ArrayList<>();
+			List<FieldMetadata> pkfs = from.getPk().getFields();
+			FKConstraintMetadata fk = from.findFk(fkName);
+			List<FKField> fkfs = fk.getFkFields();
+			// "a": from-node. Record columns are assumed ordered PK fields first,
+			// then FK fields — TODO confirm against the RelRecord producer.
+			int i = 0;
+			for (FieldMetadata f : pkfs) {
+				Column col = record.getColumn(i);
+				String item;
+				// NOTE(review): DOUBLE goes through asLong(), truncating the
+				// fraction — verify whether double-typed keys can occur here.
+				if (col.getType() == Type.INT || col.getType() == Type.LONG || col.getType() == Type.DOUBLE) {
+					item = MessageFormat.format(nodeWherePattern, "a", f.getName(), col.asLong().toString());
+				} else { // anything else is quoted as a string
+					item = MessageFormat.format(nodeWherePattern, "a", f.getName(), "'" + col.asString() + "'");
+				}
+				nodeWhereItems.add(item);
+				i++;
+			}
+			// "b": to-node, matched by the FK's referenced columns
+			for (FKField fkf : fkfs) {
+				Column col = record.getColumn(i);
+				String item;
+				if (col.getType() == Type.INT || col.getType() == Type.LONG || col.getType() == Type.DOUBLE) {
+					item = MessageFormat.format(nodeWherePattern, "b", fkf.getRefField().getName(), col.asLong().toString());
+				} else { // anything else is quoted as a string
+					item = MessageFormat.format(nodeWherePattern, "b", fkf.getRefField().getName(), "'" + col.asString() + "'");
+				}
+				nodeWhereItems.add(item);
+				i++;
+			}
+			String nodeWhere = Utils.relWhere("", nodeWhereItems, " and ");
+			String cql = MessageFormat.format(Neo4jQueryPattern.CREATE_REL,
+					from.getName(), to.getName(), nodeWhere, fkName, propsStr);
+			return new Query(cql);
+		}
+
+		// Builds a CREATE-relationship query for a link (join) table: the
+		// relationship connects the two referenced tables and is typed by the
+		// link-table name; the link table's own columns become its properties.
+		private Query calcLinkRelWriteQuery(TableMetadata from, TableMetadata to, TableMetadata link,
+				FKConstraintMetadata fromFkmd, FKConstraintMetadata toFkmd,
+				Map<String, Object> props) {
+			// relationship properties
+			String propsStr = ParamsUtils.params2String(props);
+			// node filter items, matched by the FK-referenced columns
+			String nodeWherePattern = "{0}.{1} = {2}";
+			List<String> nodeWhereItems = new ArrayList<>();
+			List<FKField> fromFkFs = fromFkmd.getFkFields();
+			List<FKField> toFkFs = toFkmd.getFkFields();
+			// "a": from-node, matched via the link-table columns of the from-FK
+			for (FKField fkf : fromFkFs) {
+				String item;
+				// fn: column name in the referenced table; ln: name in the link table
+				String fn = fkf.getRefField().getName();
+				String ln = fkf.getField().getName();
+				Object v = props.get(ln);
+				if (v instanceof Long || v instanceof Double) {
+					item = MessageFormat.format(nodeWherePattern, "a", fn, v.toString());
+				} else { // anything else is quoted as a string
+					item = MessageFormat.format(nodeWherePattern, "a", fn, "'" + v.toString() + "'");
+				}
+				nodeWhereItems.add(item);
+			}
+			// "b": to-node, matched via the link-table columns of the to-FK
+			for (FKField fkf : toFkFs) {
+				String item;
+				String fn = fkf.getRefField().getName();
+				String ln = fkf.getField().getName();
+				Object v = props.get(ln);
+				if (v instanceof Long || v instanceof Double) {
+					item = MessageFormat.format(nodeWherePattern, "b", fn, v.toString());
+				} else { // anything else is quoted as a string
+					item = MessageFormat.format(nodeWherePattern, "b", fn, "'" + v.toString() + "'");
+				}
+				nodeWhereItems.add(item);
+			}
+			String nodeWhere = Utils.relWhere("", nodeWhereItems, " and ");
+			// relationship direction: from -> to
+			String cql = MessageFormat.format(Neo4jQueryPattern.CREATE_REL,
+					from.getName(), to.getName(), nodeWhere, link.getName(), propsStr);
+			return new Query(cql);
+		}
+
+		// Converts one record into a property map keyed by column name, translating
+		// DataX column types to Java values. BYTES columns are skipped with a warning
+		// (no binary property support); other unexpected types raise UNSUPPORTED_TYPE.
+		protected Map<String, Object> fillProperties(TableMetadata tbmd, Record record) {
+			Map<String, Object> props = new HashMap<>();
+			int i = 0;
+			for (FieldMetadata fmd : tbmd.getFields()) {
+				String columnName = fmd.getName();
+				JDBCType jdbcType = fmd.getType();
+				Column column = record.getColumn(i);
+				/* BAD, NULL, INT, LONG, DOUBLE, STRING, BOOL, DATE, BYTES */
+				switch (column.getType()) {
+				case INT:
+				case LONG:
+					props.put(columnName, column.asLong());
+					break;
+				case DOUBLE:
+					props.put(columnName, column.asDouble());
+					break;
+				case STRING:
+					// escape characters that would break the generated cypher
+					String col = Utils.strFieldEscape(column.asString());
+					props.put(columnName, col);
+					break;
+				case BOOL:
+					props.put(columnName, column.asBoolean());
+					break;
+				case DATE:
+					props.put(columnName, column.asDate());
+					break;
+				case BYTES:
+					log.warn(String.format("neo4j不支持二进制属性类型, 字段名:[%s], 字段类型:[%s]. ",
+							fmd.getName(),
+							jdbcType.getName()));
+					break;
+				default: // unsupported column type
+					throw DataXException.asDataXException(Neo4jWriterErrorCode.UNSUPPORTED_TYPE,
+							String.format("neo4j不支持属性类型, 字段名:[%s], 字段类型:[%s]. ",
+									fmd.getName(),
+									jdbcType.getName()));
+				} // end switch
+				i++; // next field
+			} // end for tbmd
+			return props;
+		}
+
+		/** Post-processing hook; nothing to do. */
+		@Override
+		public void post() {
+		}
+
+		/** Resource release hook; nothing to do. */
+		@Override
+		public void destroy() {
+
+		}
+	}
+}
diff --git a/rdb2graph/neo4jwriter/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/writer/neo4jwriter/Neo4jWriterErrorCode.java b/rdb2graph/neo4jwriter/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/writer/neo4jwriter/Neo4jWriterErrorCode.java
new file mode 100644
index 00000000..87cb95a9
--- /dev/null
+++ b/rdb2graph/neo4jwriter/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/writer/neo4jwriter/Neo4jWriterErrorCode.java
@@ -0,0 +1,36 @@
+package com.leehom.arch.datax.plugin.rdb2graph.writer.neo4jwriter;
+
+import com.alibaba.datax.common.spi.ErrorCode;
+
+public enum Neo4jWriterErrorCode implements ErrorCode {
+ BAD_CONFIG_VALUE("Neo4jWriter-00", "配置的值不合法."),
+ ERROR_LOAD_RDBSCHEMA("Neo4jWriter-01", "载入关系模式异常."),
+ UNSUPPORTED_TYPE("Neo4jWriter-02", "不支持字段类型."),
+ WRONG_RECORD_TYPE("Neo4jWriter-03, {}, {}", "错误记录类型."),
+ WRONG_NEO4j_CLIENT("Neo4jWriter-04", "neo4j client访问异常."),
+ ;
+
+ private final String code;
+ private final String description;
+
+ Neo4jWriterErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return this.code;
+ }
+
+ @Override
+ public String getDescription() {
+ return this.description;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Code:[%s], Description:[%s]. ", this.code,
+ this.description);
+ }
+}
\ No newline at end of file
diff --git a/rdb2graph/neo4jwriter/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/writer/neo4jwriter/Utils.java b/rdb2graph/neo4jwriter/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/writer/neo4jwriter/Utils.java
new file mode 100644
index 00000000..cfb59bf0
--- /dev/null
+++ b/rdb2graph/neo4jwriter/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/writer/neo4jwriter/Utils.java
@@ -0,0 +1,61 @@
+package com.leehom.arch.datax.plugin.rdb2graph.writer.neo4jwriter;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.leehom.arch.datax.plugin.rdb2graph.common.StringUtils;
+
+public class Utils {
+
+    /** Separator placed between predicate fragments when building a WHERE clause. */
+    public static final String FIELD_SEQ = " and ";
+
+    // relationship WHERE clause, no field prefix
+    public static String relWhere(List fields) {
+        return relWhere(null, fields);
+    }
+
+    public static String relWhere(String prefix, List fields) {
+        return relWhere(prefix, fields, FIELD_SEQ);
+    }
+
+    /**
+     * Joins the given field fragments into a WHERE clause, optionally qualifying
+     * each fragment as "prefix.field", separated by {@code seq}.
+     *
+     * Bug fix: because of ternary precedence, the original appended {@code seq}
+     * only when no prefix was supplied, yet unconditionally stripped
+     * seq-length characters from the tail -- corrupting the result (or throwing
+     * StringIndexOutOfBoundsException) whenever a prefix was used.
+     *
+     * @param prefix optional qualifier prepended to every field (may be null/empty)
+     * @param fields predicate fragments; null or empty yields ""
+     * @param seq    separator between fragments
+     * @return the joined clause
+     */
+    public static String relWhere(String prefix, List fields, String seq) {
+        if (fields == null || fields.isEmpty())
+            return "";
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < fields.size(); i++) {
+            String qualified = StringUtils.isNotEmpty(prefix)
+                    ? prefix + "." + fields.get(i)
+                    : String.valueOf(fields.get(i));
+            sb.append(qualified);
+            // append the separator between fragments only, never after the last one
+            if (i < fields.size() - 1)
+                sb.append(seq);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Escapes a string field so it can be embedded in a Cypher string literal:
+     * '\' becomes '\\' and '"' becomes '""'.
+     *
+     * Uses literal String.replace (not regex replaceAll), which avoids the
+     * regex-escaping pitfalls of the previous map-driven implementation; the
+     * two substitutions are independent, so a fixed order is safe.
+     *
+     * @param fieldString raw field value; returned unchanged when null/empty
+     * @return escaped value
+     */
+    public static String strFieldEscape(String fieldString) {
+        if (StringUtils.isEmpty(fieldString))
+            return fieldString;
+        return fieldString.replace("\\", "\\\\").replace("\"", "\"\"");
+    }
+
+}
diff --git a/rdb2graph/neo4jwriter/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/writer/neo4jwriter/WriteMode.java b/rdb2graph/neo4jwriter/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/writer/neo4jwriter/WriteMode.java
new file mode 100644
index 00000000..c8e00b4a
--- /dev/null
+++ b/rdb2graph/neo4jwriter/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/writer/neo4jwriter/WriteMode.java
@@ -0,0 +1,26 @@
+/**
+ * %%
+ * %%
+ */
+package com.leehom.arch.datax.plugin.rdb2graph.writer.neo4jwriter;
+
+/**
+ * WriteMode: write mode for the Neo4j writer.
+ *
+ * @author leehom
+ * @Date 2020-09-03
+ */
+public enum WriteMode {
+
+    // CLEAR_BEFORE_INSERT, // clear the target before inserting
+    INSERT, // plain insert
+    INSERT_NOT_EXIST, // insert only when the element does not already exist
+    REPALCE, // delete then insert each time; NOTE(review): typo for REPLACE, kept for source compatibility
+    UPDATE // update mode; inserts when absent
+    ;
+
+}
diff --git a/rdb2graph/neo4jwriter/src/main/resources/plugin.json b/rdb2graph/neo4jwriter/src/main/resources/plugin.json
new file mode 100644
index 00000000..2b4bff47
--- /dev/null
+++ b/rdb2graph/neo4jwriter/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+ "name": "neo4jwriter",
+ "class": "com.leehom.arch.datax.plugin.rdb2graph.writer.neo4jwriter.Neo4jWriter",
+ "description": "",
+ "developer": "leehom"
+}
\ No newline at end of file
diff --git a/rdb2graph/neo4jwriter/src/test/java/com/leehom/arch/datax/rdb2graph/writer/neo4jwriter/EscapeTest.java b/rdb2graph/neo4jwriter/src/test/java/com/leehom/arch/datax/rdb2graph/writer/neo4jwriter/EscapeTest.java
new file mode 100644
index 00000000..564a885e
--- /dev/null
+++ b/rdb2graph/neo4jwriter/src/test/java/com/leehom/arch/datax/rdb2graph/writer/neo4jwriter/EscapeTest.java
@@ -0,0 +1,13 @@
+package com.leehom.arch.datax.rdb2graph.writer.neo4jwriter;
+
+import com.leehom.arch.datax.plugin.rdb2graph.writer.neo4jwriter.Utils;
+
+public class EscapeTest {
+
+    /** Ad-hoc manual check: prints the escaped form of a sample string containing a backslash. */
+    public static void main(String[] args) {
+        String sample = "xx\\Gxx";
+        String escaped = Utils.strFieldEscape(sample);
+        System.out.print(escaped);
+    }
+}
diff --git a/rdb2graph/oraclereader4graph/oraclereader.md b/rdb2graph/oraclereader4graph/oraclereader.md
new file mode 100644
index 00000000..bf35ff72
--- /dev/null
+++ b/rdb2graph/oraclereader4graph/oraclereader.md
@@ -0,0 +1,350 @@
+
+# OracleReader 插件文档
+
+
+___
+
+
+## 1 快速介绍
+
+OracleReader插件实现了从Oracle读取数据。在底层实现上,OracleReader通过JDBC连接远程Oracle数据库,并执行相应的sql语句将数据从Oracle库中SELECT出来。
+
+## 2 实现原理
+
+简而言之,OracleReader通过JDBC连接器连接到远程的Oracle数据库,并根据用户配置的信息生成查询SELECT SQL语句并发送到远程Oracle数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。
+
+对于用户配置Table、Column、Where的信息,OracleReader将其拼接为SQL语句发送到Oracle数据库;对于用户配置querySql信息,OracleReader直接将其发送到Oracle数据库。
+
+
+## 3 功能说明
+
+### 3.1 配置样例
+
+* 配置一个从Oracle数据库同步抽取数据到本地的作业:
+
+```
+{
+ "job": {
+ "setting": {
+ "speed": {
+ //设置传输速度 byte/s 尽量逼近这个速度但是不高于它.
+ // channel 表示通道数量,byte表示通道速度,如果单通道速度1MB,配置byte为1048576表示一个channel
+ "byte": 1048576
+ },
+ //出错限制
+ "errorLimit": {
+ //先选择record
+ "record": 0,
+ //百分比 1表示100%
+ "percentage": 0.02
+ }
+ },
+ "content": [
+ {
+ "reader": {
+ "name": "oraclereader",
+ "parameter": {
+ // 数据库连接用户名
+ "username": "root",
+ // 数据库连接密码
+ "password": "root",
+ "column": [
+ "id","name"
+ ],
+ //切分主键
+ "splitPk": "db_id",
+ "connection": [
+ {
+ "table": [
+ "table"
+ ],
+ "jdbcUrl": [
+ "jdbc:oracle:thin:@[HOST_NAME]:PORT:[DATABASE_NAME]"
+ ]
+ }
+ ]
+ }
+ },
+ "writer": {
+ //writer类型
+ "name": "streamwriter",
+ // 是否打印内容
+ "parameter": {
+ "print": true
+ }
+ }
+ }
+ ]
+ }
+}
+
+```
+
+* 配置一个自定义SQL的数据库同步任务到本地内容的作业:
+
+```
+{
+ "job": {
+ "setting": {
+ "speed": {
+ "channel": 5
+ }
+ },
+ "content": [
+ {
+ "reader": {
+ "name": "oraclereader",
+ "parameter": {
+ "username": "root",
+ "password": "root",
+ "where": "",
+ "connection": [
+ {
+ "querySql": [
+ "select db_id,on_line_flag from db_info where db_id < 10"
+ ],
+ "jdbcUrl": [
+ "jdbc:oracle:thin:@[HOST_NAME]:PORT:[DATABASE_NAME]"
+ ]
+ }
+ ]
+ }
+ },
+ "writer": {
+ "name": "streamwriter",
+ "parameter": {
+ "visible": false,
+ "encoding": "UTF-8"
+ }
+ }
+ }
+ ]
+ }
+}
+```
+
+
+### 3.2 参数说明
+
+* **jdbcUrl**
+
+ * 描述:描述的是到对端数据库的JDBC连接信息,使用JSON的数组描述,并支持一个库填写多个连接地址。之所以使用JSON数组描述连接信息,是因为阿里集团内部支持多个IP探测,如果配置了多个,OracleReader可以依次探测ip的可连接性,直到选择一个合法的IP。如果全部连接失败,OracleReader报错。 注意,jdbcUrl必须包含在connection配置单元中。对于阿里集团外部使用情况,JSON数组填写一个JDBC连接即可。
+
+ jdbcUrl按照Oracle官方规范,并可以填写连接附件控制信息。具体请参看[Oracle官方文档](http://www.oracle.com/technetwork/database/enterprise-edition/documentation/index.html)。
+
+ * 必选:是
+
+ * 默认值:无
+
+* **username**
+
+ * 描述:数据源的用户名
+
+ * 必选:是
+
+ * 默认值:无
+
+* **password**
+
+ * 描述:数据源指定用户名的密码
+
+ * 必选:是
+
+ * 默认值:无
+
+* **table**
+
+ * 描述:所选取的需要同步的表。使用JSON的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一schema结构,OracleReader不予检查表是否同一逻辑表。注意,table必须包含在connection配置单元中。
+
+ * 必选:是
+
+ * 默认值:无
+
+* **column**
+
+ * 描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用\*代表默认使用所有列配置,例如['\*']。
+
+ 支持列裁剪,即列可以挑选部分列进行导出。
+
+ 支持列换序,即列可以不按照表schema信息进行导出。
+
+ 支持常量配置,用户需要按照JSON格式:
+ ["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"]
+ id为普通列名,\`table\`为包含保留字的列名,1为整形数字常量,'bazhen.csy'为字符串常量,null为空指针,to_char(a + 1)为表达式,2.3为浮点数,true为布尔值。
+
+ Column必须显式填写,不允许为空!
+
+ * 必选:是
+
+ * 默认值:无
+
+* **splitPk**
+
+ * 描述:OracleReader进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提升数据同步的效能。
+
+ 推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。
+
+ 目前splitPk仅支持整形、字符串型数据切分,`不支持浮点、日期等其他类型`。如果用户指定其他非支持类型,OracleReader将报错!
+
+ splitPk如果不填写,将视作用户不对单表进行切分,OracleReader使用单通道同步全量数据。
+
+ * 必选:否
+
+ * 默认值:无
+
+* **where**
+
+ * 描述:筛选条件,OracleReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create > $bizdate 。注意:不可以将where条件指定为limit 10,limit不是SQL的合法where子句。
+
+ where条件可以有效地进行业务增量同步。
+
+ * 必选:否
+
+ * 默认值:无
+
+* **querySql**
+
+ * 描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id
+
+ `当用户配置querySql时,OracleReader直接忽略table、column、where条件的配置`。
+
+ * 必选:否
+
+ * 默认值:无
+
+* **fetchSize**
+
+ * 描述:该配置项定义了插件和数据库服务器端每次批量数据获取条数,该值决定了DataX和服务器端的网络交互次数,能够较大的提升数据抽取性能。
+
+ `注意,该值过大(>2048)可能造成DataX进程OOM。`。
+
+ * 必选:否
+
+ * 默认值:1024
+
+* **session**
+
+ * 描述:控制写入数据的时间格式,时区等的配置,如果表中有时间字段,配置该值以明确告知写入 oracle 的时间格式。通常配置的参数为:NLS_DATE_FORMAT,NLS_TIME_FORMAT。其配置的值为 json 格式,例如:
+```
+"session": [
+ "alter session set NLS_DATE_FORMAT='yyyy-mm-dd hh24:mi:ss'",
+ "alter session set NLS_TIMESTAMP_FORMAT='yyyy-mm-dd hh24:mi:ss'",
+ "alter session set NLS_TIMESTAMP_TZ_FORMAT='yyyy-mm-dd hh24:mi:ss'",
+ "alter session set TIME_ZONE='US/Pacific'"
+ ]
+```
+ `(注意"是 " 的转义字符串)`。
+
+ * 必选:否
+
+ * 默认值:无
+
+
+### 3.3 类型转换
+
+目前OracleReader支持大部分Oracle类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。
+
+下面列出OracleReader针对Oracle类型转换列表:
+
+
+| DataX 内部类型| Oracle 数据类型 |
+| -------- | ----- |
+| Long |NUMBER,INTEGER,INT,SMALLINT|
+| Double |NUMERIC,DECIMAL,FLOAT,DOUBLE PRECISION,REAL|
+| String |LONG,CHAR,NCHAR,VARCHAR,VARCHAR2,NVARCHAR2,CLOB,NCLOB,CHARACTER,CHARACTER VARYING,CHAR VARYING,NATIONAL CHARACTER,NATIONAL CHAR,NATIONAL CHARACTER VARYING,NATIONAL CHAR VARYING,NCHAR VARYING |
+| Date |TIMESTAMP,DATE |
+| Boolean |bit, bool |
+| Bytes |BLOB,BFILE,RAW,LONG RAW |
+
+
+
+请注意:
+
+* `除上述罗列字段类型外,其他类型均不支持`。
+
+
+## 4 性能报告
+
+### 4.1 环境准备
+
+#### 4.1.1 数据特征
+
+为了模拟线上真实数据,我们设计两个Oracle数据表,分别为:
+
+#### 4.1.2 机器参数
+
+* 执行DataX的机器参数为:
+
+* Oracle数据库机器参数为:
+
+### 4.2 测试报告
+
+#### 4.2.1 表1测试报告
+
+
+| 并发任务数| DataX速度(Rec/s)|DataX流量|网卡流量|DataX运行负载|DB运行负载|
+|--------| --------|--------|--------|--------|--------|
+|1| DataX 统计速度(Rec/s)|DataX统计流量|网卡流量|DataX运行负载|DB运行负载|
+
+## 5 约束限制
+
+### 5.1 主备同步数据恢复问题
+
+主备同步问题指Oracle使用主从灾备,备库从主库不间断通过binlog恢复数据。由于主备数据同步存在一定的时间差,特别在于某些特定情况,例如网络延迟等问题,导致备库同步恢复的数据与主库有较大差别,导致从备库同步的数据不是一份当前时间的完整镜像。
+
+针对这个问题,我们提供了preSql功能,该功能待补充。
+
+### 5.2 一致性约束
+
+Oracle在数据存储划分中属于RDBMS系统,对外可以提供强一致性数据查询接口。例如当一次同步任务启动运行过程中,当该库存在其他数据写入方写入数据时,OracleReader完全不会获取到写入更新数据,这是由于数据库本身的快照特性决定的。关于数据库快照特性,请参看[MVCC Wikipedia](https://en.wikipedia.org/wiki/Multiversion_concurrency_control)
+
+上述是在OracleReader单线程模型下数据同步一致性的特性,由于OracleReader可以根据用户配置信息使用了并发数据抽取,因此不能严格保证数据一致性:当OracleReader根据splitPk进行数据切分后,会先后启动多个并发任务完成数据同步。由于多个并发任务相互之间不属于同一个读事务,同时多个并发任务存在时间间隔。因此这份数据并不是`完整的`、`一致的`数据快照信息。
+
+针对多线程的一致性快照需求,在技术上目前无法实现,只能从工程角度解决,工程化的方式存在取舍,我们提供几个解决思路给用户,用户可以自行选择:
+
+1. 使用单线程同步,即不再进行数据切片。缺点是速度比较慢,但是能够很好保证一致性。
+
+2. 关闭其他数据写入方,保证当前数据为静态数据,例如,锁表、关闭备库同步等等。缺点是可能影响在线业务。
+
+### 5.3 数据库编码问题
+
+
+OracleReader底层使用JDBC进行数据抽取,JDBC天然适配各类编码,并在底层进行了编码转换。因此OracleReader不需用户指定编码,可以自动获取编码并转码。
+
+对于Oracle底层写入编码和其设定的编码不一致的混乱情况,OracleReader对此无法识别,对此也无法提供解决方案,对于这类情况,`导出有可能为乱码`。
+
+### 5.4 增量数据同步
+
+OracleReader使用JDBC SELECT语句完成数据抽取工作,因此可以使用SELECT...WHERE...进行增量数据抽取,方式有多种:
+
+* 数据库在线应用写入数据库时,填充modify字段为更改时间戳,包括新增、更新、删除(逻辑删)。对于这类应用,OracleReader只需要WHERE条件跟上一同步阶段时间戳即可。
+* 对于新增流水型数据,OracleReader可以WHERE条件后跟上一阶段最大自增ID即可。
+
+对于业务上无字段区分新增、修改数据情况,OracleReader也无法进行增量数据同步,只能同步全量数据。
+
+### 5.5 Sql安全性
+
+OracleReader提供querySql语句交给用户自己实现SELECT抽取语句,OracleReader本身对querySql不做任何安全性校验。这块交由DataX用户方自己保证。
+
+## 6 FAQ
+
+***
+
+**Q: OracleReader同步报错,报错信息为XXX**
+
+ A: 网络或者权限问题,请使用Oracle命令行测试:
+ sqlplus username/password@//host:port/sid
+
+
+如果上述命令也报错,那可以证实是环境问题,请联系你的DBA。
+
+
+**Q: OracleReader抽取速度很慢怎么办?**
+
+ A: 影响抽取时间的原因大概有如下几个:(来自专业 DBA 卫绾)
+ 1. 由于SQL的plan异常,导致的抽取时间长; 在抽取时,尽可能使用全表扫描代替索引扫描;
+ 2. 合理sql的并发度,减少抽取时间;根据表的大小,
+ <50G可以不用并发,
+ <100G添加如下hint: parallel(a,2),
+ >100G添加如下hint : parallel(a,4);
+ 3. 抽取sql要简单,尽量不用replace等函数,这个非常消耗cpu,会严重影响抽取速度;
diff --git a/rdb2graph/oraclereader4graph/pom.xml b/rdb2graph/oraclereader4graph/pom.xml
new file mode 100644
index 00000000..f044e904
--- /dev/null
+++ b/rdb2graph/oraclereader4graph/pom.xml
@@ -0,0 +1,90 @@
+
+
+
+ com.leehom.arch.datax.plugin
+ rdb2graph-parent
+ ${revision}
+
+ 4.0.0
+
+ oraclereader4graph
+ jar
+
+
+
+ com.alibaba.datax
+ datax-common
+ ${datax.version}
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+ ch.qos.logback
+ logback-classic
+
+
+
+
+ com.oracle
+ ojdbc6
+ 11.2.0.3
+ system
+ ${basedir}/src/main/lib/ojdbc6-11.2.0.3.jar
+
+
+
+ com.leehom.arch.datax.plugin
+ rdb2graph-scanner
+ 1.0.0-SNAPSHOT
+
+
+ com.leehom.arch.datax.plugin
+ rdb2graph-rdbms-util
+ 1.0.0-SNAPSHOT
+
+
+
+
+
+
+
+ maven-compiler-plugin
+
+ ${java.version}
+ ${java.version}
+ ${project-sourceEncoding}
+
+
+
+
+ maven-assembly-plugin
+
+
+ src/main/assembly/package.xml
+
+ datax
+
+
+
+ dwzip
+ package
+
+ single
+
+
+
+
+
+
+
+
diff --git a/rdb2graph/oraclereader4graph/src/main/assembly/package.xml b/rdb2graph/oraclereader4graph/src/main/assembly/package.xml
new file mode 100644
index 00000000..8c681e5c
--- /dev/null
+++ b/rdb2graph/oraclereader4graph/src/main/assembly/package.xml
@@ -0,0 +1,42 @@
+
+
+
+ dir
+
+ false
+
+
+ src/main/resources
+
+ plugin.json
+ plugin_job_template.json
+
+ plugin/reader/oraclereader4graph
+
+
+ src/main/lib
+
+ ojdbc6-11.2.0.3.jar
+
+ plugin/reader/oraclereader4graph/libs
+
+
+ target/
+
+ oraclereader4graph-1.0.0-SNAPSHOT.jar
+
+ plugin/reader/oraclereader4graph
+
+
+
+
+
+ false
+ plugin/reader/oraclereader4graph/libs
+ runtime
+
+
+
diff --git a/rdb2graph/oraclereader4graph/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/reader/oraclereader/Constant.java b/rdb2graph/oraclereader4graph/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/reader/oraclereader/Constant.java
new file mode 100644
index 00000000..099f1795
--- /dev/null
+++ b/rdb2graph/oraclereader4graph/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/reader/oraclereader/Constant.java
@@ -0,0 +1,7 @@
+package com.leehom.arch.datax.plugin.rdb2graph.reader.oraclereader;
+
+public class Constant {
+
+ public static final int DEFAULT_FETCH_SIZE = 1024;
+
+}
diff --git a/rdb2graph/oraclereader4graph/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/reader/oraclereader/OracleReader4Graph.java b/rdb2graph/oraclereader4graph/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/reader/oraclereader/OracleReader4Graph.java
new file mode 100644
index 00000000..4c95a666
--- /dev/null
+++ b/rdb2graph/oraclereader4graph/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/reader/oraclereader/OracleReader4Graph.java
@@ -0,0 +1,143 @@
+package com.leehom.arch.datax.plugin.rdb2graph.reader.oraclereader;
+
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.datax.common.spi.Reader;
+import com.alibaba.datax.common.util.Configuration;
+import com.leehom.arch.datax.plugin.rdb2graph.rdbms.reader.CommonRdbms2GraphReader;
+import com.leehom.arch.datax.plugin.rdb2graph.rdbms.reader.Key;
+import com.leehom.arch.datax.plugin.rdb2graph.rdbms.reader.util.HintUtil;
+import com.leehom.arch.datax.plugin.rdb2graph.rdbms.util.DBUtilErrorCode;
+import com.leehom.arch.datax.plugin.rdb2graph.rdbms.util.DataBaseType;
+
+/**
+ * OracleReader4Graph: Oracle reader for rdb2graph.
+ *
+ * 1. Table data sync: DataX "table" mode -- schema tables are written into the
+ *    DataX configuration.
+ * 2. Table relationship sync: grouped by the table-connection graph, one pair of
+ *    related tables per group; the join relationships generate a querySql that is
+ *    written into DataX "querySql" mode configuration.
+ *
+ * @author leehom
+ * @Date 2022-04-28
+ */
+public class OracleReader4Graph extends Reader {
+
+	private static final DataBaseType DATABASE_TYPE = DataBaseType.Oracle;
+
+	public static class Job extends Reader.Job {
+		private static final Logger LOG = LoggerFactory.getLogger(OracleReader4Graph.Job.class);
+
+		// raw job configuration and the shared rdbms-to-graph reader it drives
+		private Configuration originalConfig = null;
+		private CommonRdbms2GraphReader.Job commonRdbmsReaderJob;
+
+		@Override
+		public void init() {
+			// load the job configuration
+			this.originalConfig = super.getPluginJobConf();
+			// validate/normalize fetchSize before the common reader sees it
+			dealFetchSize(this.originalConfig);
+			// delegate the heavy lifting to the shared rdbms-to-graph reader
+			this.commonRdbmsReaderJob = new CommonRdbms2GraphReader.Job(DATABASE_TYPE);
+			this.commonRdbmsReaderJob.init(this.originalConfig);
+
+			// NOTE: must run after commonRdbmsReaderJob.init(...) so that the
+			// table-mode/querySql-mode flag has been resolved
+			dealHint(this.originalConfig);
+		}
+
+		// verify the connection and tables are queryable
+		@Override
+		public void preCheck(){
+			init();
+			this.commonRdbmsReaderJob.preCheck(this.originalConfig, DATABASE_TYPE);
+		}
+
+		// split into task slices: distributes tables across tasks and splits single tables
+		@Override
+		public List split(int adviceNumber) {
+			return this.commonRdbmsReaderJob.split(this.originalConfig, adviceNumber);
+		}
+
+		@Override
+		public void post() {
+			this.commonRdbmsReaderJob.post(this.originalConfig);
+		}
+
+		@Override
+		public void destroy() {
+			this.commonRdbmsReaderJob.destroy(this.originalConfig);
+		}
+
+		// validate the configured fetch size (defaults to Constant.DEFAULT_FETCH_SIZE)
+		// and write the resolved value back into the configuration
+		private void dealFetchSize(Configuration originalConfig) {
+			int fetchSize = originalConfig.getInt(
+					com.leehom.arch.datax.plugin.rdb2graph.rdbms.reader.Constant.FETCH_SIZE,
+					Constant.DEFAULT_FETCH_SIZE);
+			if (fetchSize < 1) {
+				throw DataXException
+						.asDataXException(DBUtilErrorCode.REQUIRED_VALUE,
+								String.format("您配置的 fetchSize 有误,fetchSize:[%d] 值不能小于 1.",
+										fetchSize));
+			}
+			originalConfig.set(
+					com.leehom.arch.datax.plugin.rdb2graph.rdbms.reader.Constant.FETCH_SIZE, fetchSize);
+		}
+
+		// an Oracle HINT is only legal in table mode (not querySql mode)
+		private void dealHint(Configuration originalConfig) {
+			String hint = originalConfig.getString(Key.HINT);
+			if (StringUtils.isNotBlank(hint)) {
+				boolean isTableMode = originalConfig.getBool(com.leehom.arch.datax.plugin.rdb2graph.rdbms.reader.Constant.IS_TABLE_MODE).booleanValue();
+				if(!isTableMode){
+					throw DataXException.asDataXException(OracleReaderErrorCode.HINT_ERROR, "当且仅当非 querySql 模式读取 oracle 时才能配置 HINT.");
+				}
+				HintUtil.initHintConf(DATABASE_TYPE, originalConfig);
+			}
+		}
+	}
+
+	public static class Task extends Reader.Task {
+
+		private Configuration readerSliceConfig;
+		private CommonRdbms2GraphReader.Task commonRdbmsReaderTask;
+
+		@Override
+		public void init() {
+			this.readerSliceConfig = super.getPluginJobConf();
+			this.commonRdbmsReaderTask = new CommonRdbms2GraphReader.Task(
+					DATABASE_TYPE ,super.getTaskGroupId(), super.getTaskId());
+			this.commonRdbmsReaderTask.init(this.readerSliceConfig);
+		}
+
+		// stream records to the framework using the validated fetch size
+		@Override
+		public void startRead(RecordSender recordSender) {
+			int fetchSize = this.readerSliceConfig
+					.getInt(com.leehom.arch.datax.plugin.rdb2graph.rdbms.reader.Constant.FETCH_SIZE);
+
+			this.commonRdbmsReaderTask.startRead(this.readerSliceConfig,
+					recordSender, super.getTaskPluginCollector(), fetchSize);
+		}
+
+		@Override
+		public void post() {
+			this.commonRdbmsReaderTask.post(this.readerSliceConfig);
+		}
+
+		@Override
+		public void destroy() {
+			this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);
+		}
+
+	}
+
+}
diff --git a/rdb2graph/oraclereader4graph/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/reader/oraclereader/OracleReaderErrorCode.java b/rdb2graph/oraclereader4graph/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/reader/oraclereader/OracleReaderErrorCode.java
new file mode 100644
index 00000000..f4441011
--- /dev/null
+++ b/rdb2graph/oraclereader4graph/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/reader/oraclereader/OracleReaderErrorCode.java
@@ -0,0 +1,32 @@
+package com.leehom.arch.datax.plugin.rdb2graph.reader.oraclereader;
+
+import com.alibaba.datax.common.spi.ErrorCode;
+
+/** Error codes reported by the Oracle reader plugin. */
+public enum OracleReaderErrorCode implements ErrorCode {
+    /** Invalid HINT configuration. */
+    HINT_ERROR("Oraclereader-00", "您的 Hint 配置出错."),
+    ;
+
+    private final String code;
+    private final String description;
+
+    private OracleReaderErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return this.code;
+    }
+
+    @Override
+    public String getDescription() {
+        return this.description;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Code:[%s], Description:[%s]. ", this.code,
+                this.description);
+    }
+}
diff --git a/rdb2graph/oraclereader4graph/src/main/resources/plugin.json b/rdb2graph/oraclereader4graph/src/main/resources/plugin.json
new file mode 100644
index 00000000..eab3d3bf
--- /dev/null
+++ b/rdb2graph/oraclereader4graph/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+ "name": "oraclereader4graph",
+ "class": "com.leehom.arch.datax.plugin.rdb2graph.reader.oraclereader.OracleReader4Graph",
+ "description": "",
+ "developer": "leehom"
+}
\ No newline at end of file
diff --git a/rdb2graph/oraclereader4graph/src/main/resources/plugin_job_template.json b/rdb2graph/oraclereader4graph/src/main/resources/plugin_job_template.json
new file mode 100644
index 00000000..cc06128a
--- /dev/null
+++ b/rdb2graph/oraclereader4graph/src/main/resources/plugin_job_template.json
@@ -0,0 +1,16 @@
+{
+ "name": "oraclereader4graph",
+ "parameter": {
+ "username": "",
+ "password": "",
+ "phase": "",
+ "schemaUri": "",
+ "column": [],
+ "connection": [
+ {
+ "table": [],
+ "jdbcUrl": []
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git a/rdb2graph/pom.xml b/rdb2graph/pom.xml
new file mode 100644
index 00000000..e222d0f4
--- /dev/null
+++ b/rdb2graph/pom.xml
@@ -0,0 +1,146 @@
+
+
+ 4.0.0
+
+ com.alibaba.datax
+ datax-all
+ 0.0.1-SNAPSHOT
+
+
+ com.leehom.arch.datax.plugin
+ rdb2graph-parent
+ pom
+ ${revision}
+
+
+ 1.0.0-SNAPSHOT
+ 1.8
+ 1.2.36
+ 1.4.10
+ 27.0.1-jre
+ 0.0.1-SNAPSHOT
+ 1.18.12
+ 1.1.23
+ 1.18.12
+ 1.0.1-SNAPSHOT
+
+
+
+
+ rdb2graph-scanner
+ rdb2graph-transformer
+ neo4jwriter
+ mysqlreader4graph
+ oraclereader4graph
+ rdb2grpah-rdbms-util
+ rdb2graph-common
+ rdb2graph-datax
+
+
+
+
+ rdc-releases
+ https://repo.rdc.aliyun.com/repository/128947-release-JP7RVf/
+
+
+ rdc-snapshots
+ https://repo.rdc.aliyun.com/repository/128947-snapshot-2gBBfM/
+
+
+
+
+
+
+ com.alibaba
+ fastjson
+ ${fastjson.version}
+
+
+ com.alibaba.datax
+ datax-core
+ 0.0.1-SNAPSHOT
+
+
+ com.alibaba.datax
+ datax-common
+ 0.0.1-SNAPSHOT
+
+
+ org.projectlombok
+ lombok
+ ${lombok.version}
+ provided
+
+
+ com.alibaba
+ druid
+ ${druid.version}
+
+
+ com.google.guava
+ guava
+ ${guava.version}
+
+
+ com.thoughtworks.xstream
+ xstream
+ ${xstream.version}
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.1
+
+ ${java.version}
+ ${java.version}
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ ${spring-boot.version}
+
+ true
+
+
+
+
+
+
+ org.codehaus.mojo
+ flatten-maven-plugin
+ 1.1.0
+
+
+ true
+ oss
+
+
+
+ flatten
+ process-resources
+
+ flatten
+
+
+
+ flatten.clean
+ clean
+
+ clean
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/rdb2graph/rdb2graph-common/pom.xml b/rdb2graph/rdb2graph-common/pom.xml
new file mode 100644
index 00000000..a7e9b567
--- /dev/null
+++ b/rdb2graph/rdb2graph-common/pom.xml
@@ -0,0 +1,54 @@
+
+ 4.0.0
+
+ com.leehom.arch.datax.plugin
+ rdb2graph-parent
+ ${revision}
+
+ rdb2graph-common
+ jar
+
+
+ 0.9.0
+ UTF-8
+
+
+
+
+
+ com.google.guava
+ guava
+
+
+
+ org.projectlombok
+ lombok
+
+
+
+ com.alibaba.datax
+ datax-core
+
+
+ com.thoughtworks.xstream
+ xstream
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+ 3.0.2
+
+
+ logback.xml
+ application.yml
+
+
+
+
+
+
diff --git a/rdb2graph/rdb2graph-common/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/common/BeanUtils.java b/rdb2graph/rdb2graph-common/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/common/BeanUtils.java
new file mode 100644
index 00000000..6da1aebf
--- /dev/null
+++ b/rdb2graph/rdb2graph-common/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/common/BeanUtils.java
@@ -0,0 +1,158 @@
+/**
+ *
+ */
+package com.leehom.arch.datax.plugin.rdb2graph.common;
+
+import java.io.PrintStream;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.RoundingMode;
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * 类名: BeanUtils
+ * 说明: Bean工具类
+ *
+ * @author leehom
+ * @Date Nov 30, 2009 4:09:36 PM
+ * 修改记录:
+ *
+ * @see
+ */
+public class BeanUtils {
+
+    /** Prints the bean's toString() to System.out. */
+    public static void printBean(Object bean) {
+        printBean(bean, System.out);
+    }
+
+    public static void printBean(Object bean, PrintStream out) {
+        out.println(bean.toString());
+    }
+
+    /**
+     * Prints every readable property of the bean (via commons-beanutils
+     * describe) to the stream, one "name:value" line each.
+     *
+     * @throws RuntimeException wrapping any reflection failure
+     */
+    public static void printBeanDeep(Object bean) {
+        printBeanDeep(bean, System.out);
+    }
+
+    public static void printBeanDeep(Object bean, PrintStream out) {
+        try {
+            Map m = org.apache.commons.beanutils.BeanUtils.describe(bean);
+            for (Object o : m.keySet()) {
+                if (o == null) {
+                    out.println("Null value field");
+                    continue;
+                }
+                out.println(o.toString() + ":" + m.get(o));
+            }
+        } catch (Exception ex) {
+            // Fix: keep the original message but preserve the cause instead of
+            // discarding the stack trace as the previous code did.
+            throw new RuntimeException(ex.getMessage(), ex);
+        }
+    }
+
+    /* Null-safe numeric narrowing/widening conversions. */
+
+    public static Long number2Long(Number num) {
+        if (num != null)
+            return num.longValue();
+        return null;
+    }
+
+    public static Integer number2Integer(Number num) {
+        if (num != null)
+            return num.intValue();
+        return null;
+    }
+
+    public static Double number2Double(Number num) {
+        if (num != null)
+            return num.doubleValue();
+        return null;
+    }
+
+    public static Short number2Short(Number num) {
+        if (num != null)
+            return num.shortValue();
+        return null;
+    }
+
+    public static Byte number2Byte(Number num) {
+        if (num != null)
+            return num.byteValue();
+        return null;
+    }
+
+    public static Double bigDecimal2Double(BigDecimal num) {
+        if (num != null)
+            return num.doubleValue();
+        return null;
+    }
+
+    public static Long bigDecimal2Long(BigDecimal num) {
+        if (num != null)
+            return num.longValue();
+        return null;
+    }
+
+    public static Integer bigDecimal2Integer(BigDecimal num) {
+        if (num != null)
+            return num.intValue();
+        return null;
+    }
+
+    public static Integer bigInteger2Integer(BigInteger num) {
+        if (num != null)
+            return num.intValue();
+        return null;
+    }
+
+    public static Long bigInteger2Long(BigInteger num) {
+        if (num != null)
+            return num.longValue();
+        return null;
+    }
+
+    /**
+     * Null-safe String -> Date using the default pattern.
+     * NOTE(review): the clazz parameter is unused; kept for signature compatibility.
+     */
+    public static Date stringToDate(String dateStr, Class<?> clazz) {
+        if (dateStr == null)
+            return null;
+        return DateTimeUtils.StringToDate(dateStr);
+    }
+
+    /**
+     * Rounds d to the given number of decimal places using HALF_DOWN.
+     * RoundingMode replaces the deprecated BigDecimal.ROUND_HALF_DOWN constant.
+     * NOTE(review): new BigDecimal(double) is kept deliberately -- switching to
+     * BigDecimal.valueOf would change results for some inputs.
+     */
+    public static double doublePrecision(double d, short precision) {
+        BigDecimal bg = new BigDecimal(d);
+        return bg.setScale(precision, RoundingMode.HALF_DOWN).doubleValue();
+    }
+
+    /**
+     * Resolves the first generic type argument declared by the object's direct
+     * superclass, e.g. for {@code class Foo extends Bar<Baz>} returns Baz;
+     * returns null when the superclass is not parameterized.
+     */
+    public static Class getTemplateType(Object obj) {
+        Class clazz = null;
+        Class<?> c = obj.getClass();
+        Type t = c.getGenericSuperclass();
+        if (t instanceof ParameterizedType) {
+            Type[] p = ((ParameterizedType) t).getActualTypeArguments();
+            if (p[0] instanceof ParameterizedType)
+                clazz = (Class) ((ParameterizedType) p[0]).getRawType();
+            else
+                clazz = (Class) p[0];
+        }
+        return clazz;
+    }
+}
diff --git a/rdb2graph/rdb2graph-common/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/common/ByteAndStreamUtils.java b/rdb2graph/rdb2graph-common/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/common/ByteAndStreamUtils.java
new file mode 100644
index 00000000..0ce1775a
--- /dev/null
+++ b/rdb2graph/rdb2graph-common/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/common/ByteAndStreamUtils.java
@@ -0,0 +1,125 @@
+/**
+ * %流程框架%
+ * %1.0%
+ */
+package com.leehom.arch.datax.plugin.rdb2graph.common;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * @类名: ByteAndStreamUtils
+ * @说明: 字节与流工具类
+ *
+ * @author leehom
+ * @Date 2010-4-2 下午02:19:25
+ * 修改记录:
+ *
+ * @see
+ */
+public class ByteAndStreamUtils {
+
+    /**
+     * Reads all remaining bytes of the stream into an array, closing the stream
+     * afterwards; returns null if reading fails.
+     *
+     * Fixes: the original sized the buffer with InputStream.available(), which
+     * is only an estimate and could silently truncate the data, and it returned
+     * null from the finally block when close() failed, discarding bytes already
+     * read. The null-on-failure contract for read errors is preserved.
+     */
+    public static byte[] StreamToBytes(InputStream is) {
+        try {
+            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+            byte[] chunk = new byte[8192];
+            int read;
+            while ((read = is.read(chunk)) != -1) {
+                buffer.write(chunk, 0, read);
+            }
+            return buffer.toByteArray();
+        } catch (Exception e) {
+            return null;
+        } finally {
+            try {
+                is.close();
+            } catch (IOException e) {
+                // data is already in memory; a close failure is not fatal
+            }
+        }
+    }
+
+    /**
+     * Reads up to size bytes starting at offset from the stream; returns null
+     * on failure.
+     * NOTE(review): like the original, the actual block size is capped by
+     * available() (an estimate) and the stream is NOT closed -- callers own it.
+     */
+    public static byte[] StreamBlockToBytes(InputStream is, long offset, int size) {
+        try {
+            InputStream in = new BufferedInputStream(is);
+            // skip to the requested start position
+            in.skip(offset);
+            // actual number of bytes we will attempt to read
+            int datSize = in.available() < size ? in.available() : size;
+            byte[] bytes = new byte[datSize];
+            int dataOffset = 0;
+            int read;
+            while (dataOffset < datSize
+                    && (read = in.read(bytes, dataOffset, datSize - dataOffset)) > 0) {
+                dataOffset += read;
+            }
+            return bytes;
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
+    /**
+     * Deserializes an object from the given bytes; returns null for null/empty input.
+     * SECURITY: Java native deserialization must never be fed untrusted bytes --
+     * callers are responsible for trusting the source.
+     */
+    public static Object objectFromBytes(byte[] objBytes) throws IOException, ClassNotFoundException {
+        if (objBytes == null || objBytes.length == 0) {
+            return null;
+        }
+        ByteArrayInputStream bi = new ByteArrayInputStream(objBytes);
+        ObjectInputStream oi = new ObjectInputStream(bi);
+        return oi.readObject();
+    }
+
+    /** Serializes the object to a byte array; returns null for null input. */
+    public static byte[] objectToBytes(Serializable obj) throws IOException {
+        if (obj == null) {
+            return null;
+        }
+        ByteArrayOutputStream bo = new ByteArrayOutputStream();
+        ObjectOutputStream oo = new ObjectOutputStream(bo);
+        oo.writeObject(obj);
+        return bo.toByteArray();
+    }
+
+    /** Deep-clones a Serializable object via a serialize/deserialize round trip. */
+    public static Object deepClone(Serializable obj) throws IOException, ClassNotFoundException {
+        ByteArrayOutputStream bo = new ByteArrayOutputStream();
+        ObjectOutputStream oo = new ObjectOutputStream(bo);
+        oo.writeObject(obj);
+        //
+        ByteArrayInputStream bi = new ByteArrayInputStream(bo.toByteArray());
+        ObjectInputStream oi = new ObjectInputStream(bi);
+        return oi.readObject();
+    }
+
+}
diff --git a/rdb2graph/rdb2graph-common/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/common/DateTimeUtils.java b/rdb2graph/rdb2graph-common/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/common/DateTimeUtils.java
new file mode 100644
index 00000000..ea6662f3
--- /dev/null
+++ b/rdb2graph/rdb2graph-common/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/common/DateTimeUtils.java
@@ -0,0 +1,306 @@
+package com.leehom.arch.datax.plugin.rdb2graph.common;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * DateTimeUtils: date/time helpers built on {@link Calendar} and
+ * {@link SimpleDateFormat}.
+ *
+ * NOTE: SimpleDateFormat is not thread-safe; each method creates its own
+ * instance, so this class is safe for concurrent use.
+ *
+ * @author leehom
+ * @Date Apr 23, 2009 5:36:41 AM
+ *
+ * @see
+ */
+public class DateTimeUtils {
+
+    /** Default date-time pattern. */
+    public static final String defaultPatten = "yyyy-MM-dd HH:mm:ss";
+    public static final String defaultPattenMillis = "yyyy-MM-dd HH:mm:ss.SSS";
+    public static final String defaultDatePatten = "yyyyMMdd";
+    public static final String DatePatten2 = "yyyy-MM-dd";
+    // ISO8601 Date Type Pattern
+    public static final String ISO8601Patten = "yyyy-MM-dd'T'HH:mm:ssZZ";
+    public static final String ISO8601PattenNoZone = "yyyy-MM-dd'T'HH:mm:ss";
+    public static final String ISO8601PattenWithMillis = "yyyy-MM-dd'T'HH:mm:ss.SSSZZ";
+
+    // Compiled once; validates strings like "2012-12-15T00:00:00+08:00".
+    // FIX: the original character class [\+|\-] also accepted '|' as a
+    // zone sign ('|' is a literal inside a character class, not alternation).
+    private static final Pattern ISO_OFFSET_PATTERN =
+            Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}[+-]\\d{2}:\\d{2}");
+
+    /** Formats a date with {@link #defaultPatten}. */
+    public static String DateToString(Date date) {
+        return DateToString(date, defaultPatten);
+    }
+
+    /** Formats a date with the given pattern. */
+    public static String DateToString(Date date, String pattern) {
+        return new SimpleDateFormat(pattern).format(date);
+    }
+
+    /** Parses with {@link #defaultPatten}; returns null on null input or parse failure. */
+    public static Date StringToDate(String dateStr) {
+        return StringToDate(dateStr, defaultPatten);
+    }
+
+    /** Parses with the given pattern; returns null on null input or parse failure. */
+    public static Date StringToDate(String dateStr, String pattern) {
+        if (dateStr == null)
+            return null;
+        try {
+            return new SimpleDateFormat(pattern).parse(dateStr);
+        } catch (Exception ex) {
+            // Preserves the original contract: report the failure, return null.
+            ex.printStackTrace();
+            return null;
+        }
+    }
+
+    /** Formats "now"; a null/empty pattern falls back to {@link #defaultPatten}. */
+    public static String getCurrentDateTimeText(String pattern) {
+        if ((pattern == null) || (pattern.equals(""))) {
+            pattern = defaultPatten;
+        }
+        return DateToString(new Date(), pattern);
+    }
+
+    public static String getCurrentDateTimeText() {
+        return getCurrentDateTimeText(defaultPatten);
+    }
+
+    /** Shared helper: date shifted by amount on the given Calendar field. */
+    private static Date add(Date date, int field, int amount) {
+        Calendar c = Calendar.getInstance();
+        c.setTime(date);
+        c.add(field, amount);
+        return c.getTime();
+    }
+
+    /** Formats one hour before now with the given pattern. */
+    public static String getPreHourText(String pattern) {
+        return getPreHourText(new Date(), pattern);
+    }
+
+    /** Formats one hour before the given date with the given pattern. */
+    public static String getPreHourText(Date date, String pattern) {
+        return DateToString(getPreHour(date), pattern);
+    }
+
+    public static Date getPreHour(Date date) {
+        return getPreHourN(date, 1);
+    }
+
+    /** Time n hours before date. */
+    public static Date getPreHourN(Date date, int n) {
+        return add(date, Calendar.HOUR_OF_DAY, -n);
+    }
+
+    public static Date getNextHour(Date date) {
+        return getNextHourN(date, 1);
+    }
+
+    /** Time n hours after date. */
+    public static Date getNextHourN(Date date, int n) {
+        return add(date, Calendar.HOUR_OF_DAY, n);
+    }
+
+    /** Formats yesterday (relative to now) with the given pattern. */
+    public static String getPreDayText(String pattern) {
+        return DateToString(getPreDay(new Date()), pattern);
+    }
+
+    public static Date getPreDay(Date date) {
+        return getPreDayN(date, 1);
+    }
+
+    /** Time n days before date. */
+    public static Date getPreDayN(Date date, int n) {
+        return add(date, Calendar.DATE, -n);
+    }
+
+    public static Date getNextDay(Date date) {
+        return getNextDayN(date, 1);
+    }
+
+    /** Time n days after date. */
+    public static Date getNextDayN(Date date, int n) {
+        return add(date, Calendar.DATE, n);
+    }
+
+    /** Time one week (7 days) before date. */
+    public static Date getPreWeek(Date date) {
+        return add(date, Calendar.DATE, -7);
+    }
+
+    /** Formats one month before now with the given pattern. */
+    public static String getPreMonthText(String pattern) {
+        return DateToString(getPreMonth(new Date()), pattern);
+    }
+
+    public static Date getPreMonth(Date date) {
+        return getPreMonthN(date, 1);
+    }
+
+    /** Time n months before date. */
+    public static Date getPreMonthN(Date date, int n) {
+        return add(date, Calendar.MONTH, -n);
+    }
+
+    public static Date getNextMonth(Date date) {
+        return getNextMonthN(date, 1);
+    }
+
+    /** Time n months after date. */
+    public static Date getNextMonthN(Date date, int n) {
+        return add(date, Calendar.MONTH, n);
+    }
+
+    /** Formats one year before now with the given pattern. */
+    public static String getPreYearText(String pattern) {
+        return DateToString(getPreYear(new Date()), pattern);
+    }
+
+    public static Date getPreYear(Date date) {
+        return getPreYearN(date, 1);
+    }
+
+    /** Time n years before date. */
+    public static Date getPreYearN(Date date, int n) {
+        return add(date, Calendar.YEAR, -n);
+    }
+
+    /** Time one year after date. */
+    public static Date getNextYear(Date date) {
+        return getNextYearN(date, 1);
+    }
+
+    /** Time n years after date. */
+    public static Date getNextYearN(Date date, int n) {
+        return add(date, Calendar.YEAR, n);
+    }
+
+    /**
+     * Validates an ISO8601-with-offset timestamp such as
+     * "2012-12-15T00:00:00+08:00".
+     *
+     * @throws Exception when dateString does not match the expected shape
+     */
+    public static void validateDateFormat(String dateString) throws Exception {
+        Matcher matcher = ISO_OFFSET_PATTERN.matcher(dateString);
+        if (!matcher.matches()) {
+            throw new Exception("'" + dateString + "' is not a validate " +
+                    "date string. Please follow this sample:2012-12-15T00:00:00+08:00");
+        }
+    }
+}
diff --git a/rdb2graph/rdb2graph-common/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/common/RelRecord.java b/rdb2graph/rdb2graph-common/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/common/RelRecord.java
new file mode 100644
index 00000000..49ecef30
--- /dev/null
+++ b/rdb2graph/rdb2graph-common/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/common/RelRecord.java
@@ -0,0 +1,43 @@
+/**
+ * %datax-graph%
+ * %v1.0%
+ */
+package com.leehom.arch.datax.plugin.rdb2graph.common;
+
+import com.alibaba.datax.core.transport.record.DefaultRecord;
+
+/**
+ * RelRecord: a DataX record that additionally carries relationship metadata —
+ * the table the relationship starts from and the foreign key describing it.
+ *
+ * @author leehom
+ * @Date 2022-04-26 19:31:18
+ *
+ * @see
+ */
+public class RelRecord extends DefaultRecord {
+
+    /** Start table of the relationship, or the join table itself. */
+    private String fromTable;
+    /**
+     * For a join table: the FK pointing at the relationship's start table.
+     * For an ordinary table: the FK forming the link (a table may have
+     * several foreign keys).
+     */
+    private String fk;
+
+    public String getFromTable() {
+        return fromTable;
+    }
+
+    public void setFromTable(String fromTable) {
+        this.fromTable = fromTable;
+    }
+
+    public String getFk() {
+        return fk;
+    }
+
+    public void setFk(String fk) {
+        this.fk = fk;
+    }
+}
diff --git a/rdb2graph/rdb2graph-common/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/common/ResourceLoaderUtil.java b/rdb2graph/rdb2graph-common/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/common/ResourceLoaderUtil.java
new file mode 100644
index 00000000..1abddd41
--- /dev/null
+++ b/rdb2graph/rdb2graph-common/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/common/ResourceLoaderUtil.java
@@ -0,0 +1,251 @@
+package com.leehom.arch.datax.plugin.rdb2graph.common;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ResourceLoaderUtil: classpath resource loading helpers — classes, streams,
+ * URLs, properties files — including "../"-relative lookups that climb out of
+ * the classpath root.
+ *
+ * @author leehom
+ * @Date 2018-9-9 上午12:05:39
+ *
+ * @see
+ */
+public class ResourceLoaderUtil {
+
+    /**
+     * Loads the class with the given fully-qualified name.
+     *
+     * @param className fully-qualified class name
+     * @return the loaded class
+     * @throws RuntimeException (wrapping ClassNotFoundException) when absent
+     */
+    public static Class<?> loadClass(String className) {
+        try {
+            return getClassLoader().loadClass(className);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("class not found '" + className + "'", e);
+        }
+    }
+
+    /** Returns the class loader that loaded this utility class. */
+    public static ClassLoader getClassLoader() {
+        return ResourceLoaderUtil.class.getClassLoader();
+    }
+
+    /**
+     * Opens a stream for a resource path relative to the classpath.
+     * Paths containing "../" are resolved against ancestors of the classpath
+     * root, e.g. getResourceStream("../resource/xml/target.xml") — see
+     * {@link #getExtendResource(String)}.
+     */
+    public static InputStream getResourceStream(String relativePath) throws MalformedURLException, IOException {
+        if (!relativePath.contains("../")) {
+            return getClassLoader().getResourceAsStream(relativePath);
+        }
+        return getStreamByExtendResource(relativePath);
+    }
+
+    /** Opens the URL's stream; returns null for a null URL. */
+    public static InputStream getStream(URL url) throws IOException {
+        return url == null ? null : url.openStream();
+    }
+
+    /** Stream variant of {@link #getExtendResource(String)}. */
+    public static InputStream getStreamByExtendResource(String relativePath) throws MalformedURLException, IOException {
+        return getStream(getExtendResource(relativePath));
+    }
+
+    /**
+     * Loads a properties file from the classpath.
+     *
+     * @param resource classpath-relative path of the properties file
+     * @throws RuntimeException when the resource is missing or unreadable
+     */
+    public static Properties getProperties(String resource) {
+        Properties properties = new Properties();
+        // try-with-resources: the original leaked the stream on every call and
+        // threw a bare NPE when the resource was missing.
+        try (InputStream in = getResourceStream(resource)) {
+            if (in == null) {
+                throw new IOException("resource not found on classpath");
+            }
+            properties.load(in);
+        } catch (IOException e) {
+            throw new RuntimeException("couldn't load properties file '" + resource + "'", e);
+        }
+        return properties;
+    }
+
+    /**
+     * Absolute path of the classpath root of this class's loader.
+     * NOTE(review): getResource("") can be null in some packagings (e.g. when
+     * running from a jar) — the "../" machinery assumes an exploded classpath;
+     * confirm for the target deployment.
+     */
+    public static String getAbsolutePathOfClassLoaderClassPath() {
+        return ResourceLoaderUtil.getClassLoader().getResource("").getPath();
+    }
+
+    /**
+     * Resolves a path that may climb out of the classpath root via leading
+     * "../" segments into an absolute URL.
+     *
+     * @throws MalformedURLException if the resolved absolute path is not a URL
+     */
+    public static URL getExtendResource(String relativePath) throws MalformedURLException {
+        if (!relativePath.contains("../")) {
+            return ResourceLoaderUtil.getResource(relativePath);
+        }
+        String classPathAbsolutePath = ResourceLoaderUtil.getAbsolutePathOfClassLoaderClassPath();
+        if (relativePath.startsWith("/")) {
+            relativePath = relativePath.substring(1);
+        }
+        // Split into the leading run of "../" markers and the trailing path.
+        String wildcardString = relativePath.substring(0, relativePath.lastIndexOf("../") + 3);
+        relativePath = relativePath.substring(relativePath.lastIndexOf("../") + 3);
+        int containSum = ResourceLoaderUtil.containSum(wildcardString, "../");
+        // Climb one directory per "../" marker.
+        classPathAbsolutePath = ResourceLoaderUtil.cutLastString(classPathAbsolutePath, "/", containSum);
+        return new URL(classPathAbsolutePath + relativePath);
+    }
+
+    /** Counts how many times dest repeats at the head of source. */
+    private static int containSum(String source, String dest) {
+        int count = 0;
+        int destLength = dest.length();
+        while (source.contains(dest)) {
+            count++;
+            source = source.substring(destLength);
+        }
+        return count;
+    }
+
+    /** Strips num trailing path segments, keeping the trailing separator. */
+    private static String cutLastString(String source, String dest, int num) {
+        for (int i = 0; i < num; i++) {
+            source = source.substring(0, source.lastIndexOf(dest, source.length() - 2) + 1);
+        }
+        return source;
+    }
+
+    /** Classpath URL for the resource, or null when absent. */
+    public static URL getResource(String resource) {
+        return ResourceLoaderUtil.getClassLoader().getResource(resource);
+    }
+
+    /** Resource as a File, or null when absent. */
+    public static File getFile(String resource) throws URISyntaxException {
+        URL url = ResourceLoaderUtil.getClassLoader().getResource(resource);
+        if (url == null)
+            return null;
+        return new File(url.getPath());
+    }
+
+}
diff --git a/rdb2graph/rdb2graph-common/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/common/StringUtils.java b/rdb2graph/rdb2graph-common/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/common/StringUtils.java
new file mode 100644
index 00000000..23cf5a8e
--- /dev/null
+++ b/rdb2graph/rdb2graph-common/src/main/java/com/leehom/arch/datax/plugin/rdb2graph/common/StringUtils.java
@@ -0,0 +1,221 @@
+/**
+ * %utils%
+ * %1.0%
+ */
+package com.leehom.arch.datax.plugin.rdb2graph.common;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * @类名: StringUtils
+ * @说明:
+ *
+ * @author leehom
+ * @Date 2021年11月27日 下午4:47:54
+ * 修改记录:
+ *
+ * @see
+ */
+public class StringUtils {
+
+ public static final String EMPTY = "";
+
+    /**
+     * Pads String {@code s} to take up {@code n} characters, padding with
+     * char {@code c} on the left ({@code paddingLeft == true}) or on the
+     * right ({@code false}). Returns null if passed a null String; returns
+     * s unchanged when it is already at least n characters long.
+     * (Javadoc reconstructed — the original's markup was garbled.)
+     */
+    public static String paddingString(String s, int n, char c, boolean paddingLeft) {
+        if (s == null) {
+            return s;
+        }
+        int add = n - s.length(); // may overflow int size... should not be a
+        // problem in real life
+        if (add <= 0) {
+            return s;
+        }
+        // StringBuilder: no synchronization needed for a method-local buffer
+        StringBuilder str = new StringBuilder(s);
+        char[] ch = new char[add];
+        Arrays.fill(ch, c);
+        if (paddingLeft) {
+            str.insert(0, ch);
+        } else {
+            str.append(ch);
+        }
+        return str.toString();
+    }
+
+    /** Left-pads s with c up to length n (delegates to paddingString). */
+    public static String padLeft(String s, int n, char c) {
+        return paddingString(s, n, c, true);
+    }
+
+    /** Right-pads s with c up to length n (delegates to paddingString). */
+    public static String padRight(String s, int n, char c) {
+        return paddingString(s, n, c, false);
+    }
+
+    /**
+     * Returns true when s parses as a decimal int (fits the 32-bit signed
+     * range); false otherwise, including for null input.
+     */
+    public static boolean isInt(String s) {
+        try {
+            Integer.parseInt(s);
+            return true;
+        } catch (NumberFormatException ex) {
+            // parseInt throws only NumberFormatException (also for null),
+            // so the original broad catch of Exception was unnecessarily wide
+            return false;
+        }
+    }
+
+    /**
+     * Re-decodes a string that was read as ISO-8859-1 into UTF-8.
+     * NOTE(review): the name suggests GBK but the target charset is UTF-8 —
+     * confirm against callers' expectations.
+     *
+     * @param src the mis-decoded string; may be null
+     * @return the re-decoded string, or null on null input / unsupported charset
+     */
+    public static String strCoderGBk(String src) {
+        if (src == null) {
+            return null;
+        }
+        try {
+            return new String(src.getBytes("ISO-8859-1"), "utf8");
+        } catch (UnsupportedEncodingException e) {
+            return null;
+        }
+    }
+
+    /**
+     * Joins a set of strings with the given separator.
+     * (Generic parameter restored — the diff's markup stripping ate it.)
+     *
+     * @return the joined string, or null when the set is null or empty
+     */
+    public static String stringFromSet(Set<String> ss, String spliter) {
+        if (ss == null || ss.size() == 0)
+            return null;
+        // String.join replaces the original hand-rolled StringBuffer loop
+        return String.join(spliter, ss);
+    }
+
+    /** Splits s on spliter into a Set; returns null for null/empty input. */
+    public static Set<String> str2Set(String s, String spliter) {
+        if (s == null || s.equals(""))
+            return null;
+        return new HashSet<>(Arrays.asList(s.split(spliter)));
+    }
+
+    // NOTE(review): this region was reconstructed — the diff's markup
+    // stripping ate the text between '<' and '>', fusing these two methods.
+
+    /** Splits s on spliter into a String[]; empty array for null/empty input. */
+    public static String[] str2Strings(String s, String spliter) {
+        if (s == null || s.equals(""))
+            return new String[0];
+        String[] sa = s.split(spliter);
+        String[] ss = new String[sa.length];
+        for (int i = 0; i < sa.length; i++) {
+            ss[i] = sa[i];
+        }
+        return ss;
+    }
+
+    /** Wraps the array in a fixed-size List view (Arrays.asList semantics). */
+    public static List<String> array2List(String[] strArry) {
+        return Arrays.asList(strArry);
+    }
+
+    /**
+     * Splits s on spliter into a fixed-size List; null/empty input yields an
+     * empty list (via str2Strings).
+     */
+    public static List<String> str2List(String s, String spliter) {
+        String[] arrayStr = str2Strings(s, spliter);
+        return Arrays.asList(arrayStr);
+    }
+
+    /**
+     * Joins the array with default decoration — presumably wrapping each
+     * element in double quotes and separating with commas; the 3-arg
+     * overload is defined later in this file (outside this view) — confirm
+     * its exact semantics there.
+     */
+    public static String array2Str(Object[] objs) {
+        return array2Str(objs, "\"", ",");
+    }
+
+ public static String list2Str(List