From 1dbd39e053cc5f29d217ff0e5b4d181a5de2d0b7 Mon Sep 17 00:00:00 2001
From: Liu Jianping
Date: Mon, 2 Sep 2019 18:06:15 +0800
Subject: [PATCH] add gdbwriter plugin

---
 gdbwriter/doc/gdbwriter.md                      | 370 ++++++++++++++++++
 gdbwriter/pom.xml                               | 103 +++++
 gdbwriter/src/main/assembly/package.xml         |  35 ++
 .../plugin/writer/gdbwriter/GdbWriter.java      | 251 ++++++++++++
 .../writer/gdbwriter/GdbWriterErrorCode.java    |  33 ++
 .../datax/plugin/writer/gdbwriter/Key.java      | 141 +++++++
 .../gdbwriter/client/GdbGraphManager.java       |  39 ++
 .../gdbwriter/client/GdbWriterConfig.java       |  41 ++
 .../gdbwriter/mapping/DefaultGdbMapper.java     | 190 +++++++++
 .../writer/gdbwriter/mapping/GdbMapper.java     |  17 +
 .../writer/gdbwriter/mapping/MappingRule.java   |  41 ++
 .../gdbwriter/mapping/MappingRuleFactory.java   | 181 +++++++++
 .../writer/gdbwriter/mapping/ValueType.java     |  71 ++++
 .../gdbwriter/model/AbstractGdbGraph.java       | 151 +++++++
 .../writer/gdbwriter/model/GdbEdge.java         |  20 +
 .../writer/gdbwriter/model/GdbElement.java      |  20 +
 .../writer/gdbwriter/model/GdbGraph.java        |  20 +
 .../writer/gdbwriter/model/GdbVertex.java       |  17 +
 .../gdbwriter/model/ScriptGdbGraph.java         | 196 ++++++++++
 .../writer/gdbwriter/util/ConfigHelper.java     |  59 +++
 .../util/GdbDuplicateIdException.java           |  23 ++
 gdbwriter/src/main/resources/plugin.json        |   6 +
 .../main/resources/plugin_job_template.json     |  74 ++++
 pom.xml                                         |   1 +
 24 files changed, 2100 insertions(+)
 create mode 100644 gdbwriter/doc/gdbwriter.md
 create mode 100644 gdbwriter/pom.xml
 create mode 100644 gdbwriter/src/main/assembly/package.xml
 create mode 100644 gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriter.java
 create mode 100644 gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriterErrorCode.java
 create mode 100644 gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/Key.java
 create mode 100644 gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbGraphManager.java
 create mode 100644 gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbWriterConfig.java
 create mode 100644 gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/DefaultGdbMapper.java
 create mode 100644 gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/GdbMapper.java
 create mode 100644 gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRule.java
 create mode 100644 gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRuleFactory.java
 create mode 100644 gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/ValueType.java
 create mode 100644 gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/AbstractGdbGraph.java
 create mode 100644 gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbEdge.java
 create mode 100644 gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbElement.java
 create mode 100644 gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbGraph.java
 create mode 100644 gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbVertex.java
 create mode 100644 gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/ScriptGdbGraph.java
 create mode 100644 gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/ConfigHelper.java
 create mode 100644 gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/GdbDuplicateIdException.java
 create mode 100644 gdbwriter/src/main/resources/plugin.json
 create mode 100644 gdbwriter/src/main/resources/plugin_job_template.json

diff --git a/gdbwriter/doc/gdbwriter.md b/gdbwriter/doc/gdbwriter.md
new file mode 100644
index 00000000..82cdd899
--- /dev/null
+++ b/gdbwriter/doc/gdbwriter.md
@@ -0,0 +1,370 @@
# DataX GDBWriter

## 1 Quick Introduction

The GDBWriter plugin writes data into a GDB instance. GDBWriter connects to the remote GDB instance through a `Gremlin Client`, takes the records produced by the Reader, generates the corresponding write DSL statements, and writes the data into GDB.

## 2 Implementation

GDBWriter receives the protocol records generated by the Reader from the DataX framework and writes them into the GDB instance with statements of the form `g.addV/E(GDB___label).property(id, GDB___id).property(GDB___PK1, GDB___PV1)...`.

The `Gremlin Client` can be configured to work in session mode, in which the client controls the transaction and a whole batch of records is written within a single transaction.
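To make this concrete, the sketch below shows the rough shape of the parameterized DSL that one vertex write expands to (an illustration of the statement form above, not output captured from the plugin); the `GDB___`-prefixed names are Gremlin binding variables filled in from the mapped record:

```groovy
// one vertex with two properties; values travel as bindings instead of being inlined
g.addV(GDB___label).property(id, GDB___id)
 .property(GDB___PK0, GDB___PV0)
 .property(GDB___PK1, GDB___PV1)
// example bindings: GDB___label = "person", GDB___id = "person-1",
//                   GDB___PK0 = "name", GDB___PV0 = "tom", GDB___PK1 = "age", GDB___PV1 = 20
```

Because the script text stays fixed while only the bindings change, records with the same number of properties can reuse the same compiled script on the server side.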
## 3 Functionality
Vertices and edges are configured differently in GDB, so an import has to distinguish vertex configuration from edge configuration.

### 3.1 Vertex configuration sample
* A configuration that generates vertex data in memory and imports it into a GDB instance:

```json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column" : [
                            {
                                "random": "1,100",
                                "type": "double"
                            },
                            {
                                "random": "1000,1200",
                                "type": "long"
                            },
                            {
                                "random": "60,64",
                                "type": "string"
                            }
                        ],
                        "sliceRecordCount": 1000
                    }
                },
                "writer": {
                    "name": "gdbwriter",
                    "parameter": {
                        "host": "gdb-endpoint",
                        "port": 8182,
                        "username": "root",
                        "password": "***",
                        "writeMode": "INSERT",
                        "labelType": "VERTEX",
                        "label": "${1}",
                        "idTransRule": "none",
                        "session": true,
                        "maxRecordsInBatch": 64,
                        "column": [
                            {
                                "name": "id",
                                "value": "${0}",
                                "type": "string",
                                "columnType": "primaryKey"
                            },
                            {
                                "name": "vertex_propKey",
                                "value": "${2}",
                                "type": "string",
                                "columnType": "vertexProperty"
                            }
                        ]
                    }
                }
            }
        ]
    }
}
```

### 3.2 Edge configuration sample
* A configuration that generates edge data in memory and imports it into a GDB instance:
> **Note**
> The edge import below requires the related vertices to have been written to the GDB instance beforehand: vertices with ids `person-{{i}}` and `book-{{i}}` must exist, with i ranging over 0~100.

```json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column" : [
                            {
                                "random": "100,200",
                                "type": "double"
                            },
                            {
                                "random": "1,100",
                                "type": "long"
                            },
                            {
                                "random": "1,100",
                                "type": "long"
                            },
                            {
                                "random": "2000,2200",
                                "type": "long"
                            },
                            {
                                "random": "60,64",
                                "type": "string"
                            }
                        ],
                        "sliceRecordCount": 1000
                    }
                },
                "writer": {
                    "name": "gdbwriter",
                    "parameter": {
                        "host": "gdb-endpoint",
                        "port": 8182,
                        "username": "root",
                        "password": "***",
                        "writeMode": "INSERT",
                        "labelType": "EDGE",
                        "label": "${3}",
                        "idTransRule": "none",
                        "srcIdTransRule": "labelPrefix",
                        "dstIdTransRule": "labelPrefix",
                        "srcLabel": "person-",
                        "dstLabel": "book-",
                        "session": false,
                        "column": [
                            {
                                "name": "id",
                                "value": "${0}",
                                "type": "string",
                                "columnType": "primaryKey"
                            },
                            {
                                "name": "id",
                                "value": "${1}",
                                "type": "string",
                                "columnType": "srcPrimaryKey"
                            },
                            {
                                "name": "id",
                                "value": "${2}",
                                "type": "string",
                                "columnType": "dstPrimaryKey"
                            },
                            {
                                "name": "edge_propKey",
                                "value": "${4}",
                                "type": "string",
                                "columnType": "edgeProperty"
                            }
                        ]
                    }
                }
            }
        ]
    }
}
```

### 3.3 Parameters

* **host**
    * Description: connection address of the GDB instance, i.e. the "internal network address" shown in the Alibaba Cloud console under "图数据库 GDB" (Graph Database GDB) -> "实例管理" (Instance Management) -> "基本信息" (Basic Information);
    * Required: yes
    * Default: none

* **port**
    * Description: connection port of the GDB instance
    * Required: yes
    * Default: 8182

* **username**
    * Description: account name of the GDB instance
    * Required: yes
    * Default: none

* **password**
    * Description: password of that account
    * Required: yes
    * Default: none

* **label**
    * Description: the label, i.e. the vertex/edge name; label can be read from a source column, e.g. ${0} takes the first source column as the label. Source column indexes start at 0;
    * Required: yes
    * Default: none

* **labelType**
    * Description: type of the label;
        * "VERTEX" for vertices
        * "EDGE" for edges
    * Required: yes
    * Default: none

* **srcLabel**
    * Description: when importing edges, the label of the source vertex; srcLabel can be read from a source column, e.g. ${0} takes the first source column as the label. Source column indexes start at 0;
    * Required: may be omitted when labelType is EDGE and srcIdTransRule is none; otherwise required
    * Default: none

* **dstLabel**
    * Description: when importing edges, the label of the destination vertex; dstLabel can be read from a source column, e.g. ${0} takes the first source column as the label. Source column indexes start at 0;
    * Required: may be omitted when labelType is EDGE and dstIdTransRule is none; otherwise required
    * Default: none

* **writeMode**
    * Description: how a duplicate id is handled during import;
        * "INSERT": an error is reported and the error-record count grows by 1;
        * "MERGE": the property values are updated; not counted as an error;
        * "SKIP": the record is skipped; not counted as an error
    * Required: yes
    * Default: INSERT

* **idTransRule**
    * Description: transformation rule for the primary-key id;
        * "labelPrefix": transform the mapped value into {label}{source field}
        * "none": keep the mapped value unchanged
    * Required: yes
    * Default: "none"

* **srcIdTransRule**
    * Description: when importing edges, the transformation rule for the source vertex's primary-key id;
        * "labelPrefix": transform the mapped value into {label}{source field}
        * "none": keep the mapped value unchanged; srcLabel may be omitted in this case
    * Required: required when labelType is EDGE
    * Default: "none"

* **dstIdTransRule**
    * Description: when importing edges, the transformation rule for the destination vertex's primary-key id;
        * "labelPrefix": transform the mapped value into {label}{source field}
        * "none": keep the mapped value unchanged; dstLabel may be omitted in this case
    * Required: required when labelType is EDGE
    * Default: "none"

* **session**
    * Description: whether to write through the session mode of the `Gremlin Client`
    * Required: no
    * Default: false

* **maxRecordsInBatch**
    * Description: number of records handled in one transaction when session mode is used
    * Required: no
    * Default: 16

* **column**
    * Description: mapping configuration for vertex/edge fields
    * Required: yes
    * Default: none

* **column -> name**
    * Description: field name in the vertex/edge mapping
    * Required: yes
    * Default: none

* **column -> value**
    * Description: field value in the vertex/edge mapping;
        * ${N} maps the source value directly; N is the source column index, starting at 0; ${0} maps the first source column;
        * test-${0} concatenates a fixed string with the source value; fixed text may be added before and/or after ${0};
        * ${0}-${1} concatenates several fields; fixed strings may be inserted at any position, e.g. test-${0}-test1-${1}-test2
    * Required: yes
    * Default: none

* **column -> type**
    * Description: value type of the field in the vertex/edge mapping;
        * the primary-key id only supports string; the GDBWriter plugin casts it forcibly, so the source id must be convertible to string;
        * normal properties support: int, long, float, double, boolean, string
    * Required: yes
    * Default: none

* **column -> columnType**
    * Description: role of the field in the GDB vertex/edge data; the following enumeration values are supported:
        * common:
            * primaryKey: the field is the primary-key id
        * vertex-only:
            * vertexProperty: when labelType is VERTEX, the field is a normal vertex property
            * vertexJsonProperty: when labelType is VERTEX, the field is a JSON property of the vertex; see the **json properties sample** in the remark for the value layout; at most one JSON property is allowed per vertex configuration;
        * edge-only:
            * srcPrimaryKey: when labelType is EDGE, the field is the primary-key id of the source vertex
            * dstPrimaryKey: when labelType is EDGE, the field is the primary-key id of the destination vertex
            * edgeProperty: when labelType is EDGE, the field is a normal edge property
            * edgeJsonProperty: when labelType is EDGE, the field is a JSON property of the edge; see the **json properties sample** in the remark; at most one JSON property is allowed per edge configuration;
    * Required: yes
    * Default: none
    * Remark: **json properties sample**
    > ```json
    > {"properties":[
    >  {"k":"name","t":"string","v":"tom"},
    >  {"k":"age","t":"int","v":"20"},
    >  {"k":"sex","t":"string","v":"male"}
    > ]}
    > ```
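As a worked example of these rules (the source values are hypothetical), suppose the edge job from section 3.2 reads the record `[123.4, 7, 42, 2100, "abcd"]`. With `idTransRule` set to `none`, `srcIdTransRule`/`dstIdTransRule` set to `labelPrefix`, and `srcLabel`/`dstLabel` set to `person-`/`book-`, the mapped edge becomes:

```
label = "2100"         <- ${3}
id    = "123.4"        <- ${0}, idTransRule = none
from  = "person-7"     <- srcLabel + ${1}
to    = "book-42"      <- dstLabel + ${2}
edge_propKey = "abcd"  <- ${4}
```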
## 4 Performance Report
### 4.1 Environment
GDB instance
- 16core 128GB, 1TB SSD

DataX load-generator host
- cpu: 4 * Intel(R) Xeon(R) Platinum 8163 CPU @ 2.50GHz
- mem: 16GB
- net: dual gigabit NICs
- os: CentOS 7, 3.10.0-957.5.1.el7.x86_64
- jvm: -Xms4g -Xmx4g

### 4.2 Data characteristics

```
{
  id: random double(1~10000)
  from: random long(1~40000000)
  to: random long(1~40000000)
  label: random long(20000000 ~ 20005000)
  propertyKey: random string(len: 120~128)
  propertyName: random string(len: 120~128)
}
```
- every vertex/edge carries one property; both the property key and value are random strings of 120~128 bytes
- the label is a random integer in 20000000 ~ 20005000 converted to a string
- the id is a floating-point number converted to a string, to avoid duplicates
- edges reference their source and destination vertices; before the edge test, the vertex data of the twitter dataset (42 million vertices) was imported

### 4.3 Job configuration
Vertices and edges are configured separately; both configurations resemble the samples above, with the following key differences (merged into a snippet after this list):

- more concurrent channels
> "channel": 32

- session mode enabled
> "session": true

- more records per transaction batch
> "maxRecordsInBatch": 128
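Placed into the job layout from section 3, those overrides sit as follows (a sketch; the remaining reader and writer parameters are elided):

```json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 32
            }
        },
        "content": [
            {
                "writer": {
                    "name": "gdbwriter",
                    "parameter": {
                        "session": true,
                        "maxRecordsInBatch": 128
                    }
                }
            }
        ]
    }
}
```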
### 4.4 Results

Vertex import:
- average throughput: 4.07MB/s
- total elapsed time: 412s
- record write speed: 15609rec/s
- records read: 6400000

Edge import:
- average throughput: 2.76MB/s
- total elapsed time: 1602s
- record write speed: 10000rec/s
- records read: 16000000

## 5 Constraints
- The source and destination vertices of an edge must already exist in GDB before the edge record is imported
- GDBWriter uses the same GDB instance port as user query DSL, so an import may affect query performance
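Once a job file is written, it is launched like any other DataX job; a typical invocation, assuming a standard DataX deployment (the path placeholder and job file name are illustrative):

```
python {DATAX_HOME}/bin/datax.py gdbwriter-job.json
```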
## FAQ
None

diff --git a/gdbwriter/pom.xml b/gdbwriter/pom.xml
new file mode 100644
index 00000000..a5d3f319
--- /dev/null
+++ b/gdbwriter/pom.xml
@@ -0,0 +1,103 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>datax-all</artifactId>
        <groupId>com.alibaba.datax</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <artifactId>gdbwriter</artifactId>
    <name>gdbwriter</name>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <gremlin.version>3.4.1</gremlin.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>${datax-project-version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-core</artifactId>
            <version>${datax-project-version}</version>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.tinkerpop</groupId>
            <artifactId>gremlin-driver</artifactId>
            <version>${gremlin.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.8</version>
        </dependency>
        <dependency>
            <groupId>com.github.ben-manes.caffeine</groupId>
            <artifactId>caffeine</artifactId>
            <version>2.4.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>${project-sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                    <finalName>datax</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>dwzip</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

diff --git a/gdbwriter/src/main/assembly/package.xml b/gdbwriter/src/main/assembly/package.xml
new file mode 100644
index 00000000..c0bc26fd
--- /dev/null
+++ b/gdbwriter/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/writer/gdbwriter</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>gdbwriter-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/writer/gdbwriter</outputDirectory>
        </fileSet>
    </fileSets>

    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/writer/gdbwriter/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>

diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriter.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriter.java
new file mode 100644
index 00000000..753d89fc
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriter.java
@@ -0,0 +1,251 @@
package com.alibaba.datax.plugin.writer.gdbwriter;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Function;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.client.GdbGraphManager;
import com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig;
import com.alibaba.datax.plugin.writer.gdbwriter.mapping.DefaultGdbMapper;
import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRule;
import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRuleFactory;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbGraph;

import groovy.lang.Tuple2;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GdbWriter extends Writer {
    private static final Logger log = LoggerFactory.getLogger(GdbWriter.class);

    private static Function<Record, GdbElement> mapper = null;
    private static GdbGraph globalGraph = null;
    private static boolean session = false;

    /**
     * The methods of Job run exactly once; the framework starts multiple Task
     * threads that run the Task methods in parallel.
     *
     * The overall Writer flow is:
     *
     * <pre>
     * Job: init --> prepare --> split
     *
     *                  Task: init --> prepare --> startWrite --> post --> destroy
     *                  Task: init --> prepare --> startWrite --> post --> destroy
     *
     *                                                        Job: post --> destroy
     * </pre>
     */
    public static class Job extends Writer.Job {
        private static final Logger LOG = LoggerFactory
                .getLogger(Job.class);

        private Configuration jobConfig = null;

        @Override
        public void init() {
            LOG.info("GDB datax plugin writer job init begin ...");
            this.jobConfig = getPluginJobConf();
            GdbWriterConfig.of(this.jobConfig);
            LOG.info("GDB datax plugin writer job init end.");

            /**
             * Note: this method runs only once.
             * Best practice: validate the user configuration here -- missing required
             * options? invalid values? unrelated options? -- and report clear errors or
             * warnings. Validation is usually delegated to a static utility class to
             * keep this class readable.
             */
        }

        @Override
        public void prepare() {
            /**
             * Note: this method runs only once.
             * Best practice: do any work needed before the data sync here; drop the
             * method if there is nothing to do.
             */
            super.prepare();

            MappingRule rule = MappingRuleFactory.getInstance().createV2(jobConfig);

            mapper = new DefaultGdbMapper().getMapper(rule);
            session = jobConfig.getBool(Key.SESSION_STATE, false);

            /**
             * client connect check before task
             */
            try {
                globalGraph = GdbGraphManager.instance().getGraph(jobConfig, false);
            } catch (RuntimeException e) {
                throw DataXException.asDataXException(GdbWriterErrorCode.FAIL_CLIENT_CONNECT, e.getMessage());
            }
        }

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            /**
             * Note: this method runs only once.
             * Best practice: use a static utility class to split the Job configuration
             * into multiple Task configurations. mandatoryNumber is the number of
             * splits that must be produced.
             */
            LOG.info("split begin...");
            List<Configuration> configurationList = new ArrayList<Configuration>();
            for (int i = 0; i < mandatoryNumber; i++) {
                configurationList.add(this.jobConfig.clone());
            }
            LOG.info("split end...");
            return configurationList;
        }

        @Override
        public void post() {
            /**
             * Note: this method runs only once.
             * Best practice: do any post-sync follow-up work of the Job here.
             */
            globalGraph.close();
        }

        @Override
        public void destroy() {
            /**
             * Note: this method runs only once.
             * Best practice: release the Job's resources here, together with post().
             */
        }

    }

    @Slf4j
    public static class Task extends Writer.Task {

        private Configuration taskConfig;

        private int failed = 0;
        private int batchRecords;
        private ExecutorService submitService = null;
        private GdbGraph graph;

        @Override
        public void init() {
            /**
             * Note: this method runs once per Task.
             * Best practice: read taskConfig here and initialize the resources that
             * startWrite() will need.
             */
            this.taskConfig = super.getPluginJobConf();
            batchRecords = taskConfig.getInt(Key.MAX_RECORDS_IN_BATCH, GdbWriterConfig.DEFAULT_RECORD_NUM_IN_BATCH);
            submitService = new ThreadPoolExecutor(1, 1, 0L,
                    TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(), new DefaultThreadFactory("submit-dsl"));

            if (!session) {
                graph = globalGraph;
            } else {
                /**
                 * Create the session clients in staggered batches, because server-side
                 * groovy compilation is a performance bottleneck.
                 */
                try {
                    Thread.sleep((getTaskId()/10)*10000);
                } catch (Exception e) {
                    // ...
                }
                graph = GdbGraphManager.instance().getGraph(taskConfig, session);
            }
        }
        @Override
        public void prepare() {
            /**
             * Note: this method runs once per Task.
             * Best practice: do any work needed before the data sync here; drop the
             * method if there is nothing to do.
             */
            super.prepare();
        }

        @Override
        public void startWrite(RecordReceiver recordReceiver) {
            /**
             * Note: this method runs once per Task.
             * Best practice: keep the write loop concise and well factored.
             */
            Record r;
            Future<Boolean> future = null;
            List<Tuple2<Record, GdbElement>> records = new ArrayList<>(batchRecords);

            while ((r = recordReceiver.getFromReader()) != null) {
                records.add(new Tuple2<>(r, mapper.apply(r)));

                if (records.size() >= batchRecords) {
                    wait4Submit(future);

                    final List<Tuple2<Record, GdbElement>> batch = records;
                    future = submitService.submit(() -> batchCommitRecords(batch));
                    records = new ArrayList<>(batchRecords);
                }
            }

            wait4Submit(future);
            if (!records.isEmpty()) {
                final List<Tuple2<Record, GdbElement>> batch = records;
                future = submitService.submit(() -> batchCommitRecords(batch));
                wait4Submit(future);
            }
        }

        private void wait4Submit(Future<Boolean> future) {
            if (future == null) {
                return;
            }

            try {
                future.get();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private boolean batchCommitRecords(final List<Tuple2<Record, GdbElement>> records) {
            TaskPluginCollector collector = getTaskPluginCollector();
            try {
                List<Tuple2<Record, Exception>> errors = graph.add(records);
                errors.forEach(t -> collector.collectDirtyRecord(t.getFirst(), t.getSecond()));
                failed += errors.size();
            } catch (Exception e) {
                records.forEach(t -> collector.collectDirtyRecord(t.getFirst(), e));
                failed += records.size();
            }

            records.clear();
            return true;
        }

        @Override
        public void post() {
            /**
             * Note: this method runs once per Task.
             * Best practice: do any post-sync follow-up work of the Task here.
             */
            log.info("Task done, dirty record count - {}", failed);
        }

        @Override
        public void destroy() {
            /**
             * Note: this method runs once per Task.
             * Best practice: release the Task's resources here, together with post().
             */
            if (session) {
                graph.close();
            }
            submitService.shutdown();
        }

    }

}
\ No newline at end of file

diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriterErrorCode.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriterErrorCode.java
new file mode 100644
index 00000000..a6f506ef
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriterErrorCode.java
@@ -0,0 +1,33 @@
package com.alibaba.datax.plugin.writer.gdbwriter;

import com.alibaba.datax.common.spi.ErrorCode;

public enum GdbWriterErrorCode implements ErrorCode {
    BAD_CONFIG_VALUE("GdbWriter-00", "您配置的值不合法."),
    CONFIG_ITEM_MISS("GdbWriter-01", "您配置项缺失."),
    FAIL_CLIENT_CONNECT("GdbWriter-02", "GDB连接异常."),;

    private final String code;
    private final String description;

    private GdbWriterErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode() {
        return this.code;
    }

    @Override
    public String getDescription() {
        return this.description;
    }

    @Override
    public String toString() {
        return String.format("Code:[%s], Description:[%s]. ", this.code,
                this.description);
    }
}
\ No newline at end of file

diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/Key.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/Key.java
new file mode 100644
index 00000000..f2e37005
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/Key.java
@@ -0,0 +1,141 @@
package com.alibaba.datax.plugin.writer.gdbwriter;

public final class Key {

    /**
     * Configuration keys that the plugin expects its user to provide.
     */

    public final static String HOST = "host";
    public final static String PORT = "port";
    public final static String USERNAME = "username";
    public static final String PASSWORD = "password";

    /**
     * import type and mode
     */
    public static final String IMPORT_TYPE = "labelType";
    public static final String UPDATE_MODE = "writeMode";

    /**
     * label prefix issue
     */
    public static final String ID_TRANS_RULE = "idTransRule";
    public static final String SRC_ID_TRANS_RULE = "srcIdTransRule";
    public static final String DST_ID_TRANS_RULE = "dstIdTransRule";

    public static final String LABEL = "label";
    public static final String SRC_LABEL = "srcLabel";
    public static final String DST_LABEL = "dstLabel";

    public static final String MAPPING = "mapping";

    /**
     * column define in Gdb
     */
    public static final String COLUMN = "column";
    public static final String COLUMN_NAME = "name";
    public static final String COLUMN_VALUE = "value";
    public static final String COLUMN_TYPE = "type";
    public static final String COLUMN_NODE_TYPE = "columnType";

    /**
     * Gdb Vertex/Edge elements
     */
    public static final String ID = "id";
    public static final String FROM = "from";
    public static final String TO = "to";
    public static final String PROPERTIES = "properties";
    public static final String PROP_KEY = "name";
    public static final String PROP_VALUE = "value";
    public static final String PROP_TYPE = "type";

    public static final String PROPERTIES_JSON_STR = "propertiesJsonStr";
    public static final String MAX_PROPERTIES_BATCH_NUM = "maxPropertiesBatchNumber";

    /**
     * session less client configure for connect pool
     */
    public static final String MAX_IN_PROCESS_PER_CONNECTION = "maxInProcessPerConnection";
    public static final String MAX_CONNECTION_POOL_SIZE = "maxConnectionPoolSize";
    public static final String MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = "maxSimultaneousUsagePerConnection";

    public static final String MAX_RECORDS_IN_BATCH = "maxRecordsInBatch";
    public static final String SESSION_STATE = "session";

    public static enum ImportType {
        /**
         * Import vertices
         */
        VERTEX,
        /**
         * Import edges
         */
        EDGE;
    }

    public static enum UpdateMode {
        /**
         * Insert new records, fail if exists
         */
        INSERT,
        /**
         * Skip this record if exists
         */
        SKIP,
        /**
         * Update property of this record if exists
         */
        MERGE;
    }

    public static enum ColumnType {
        /**
         * vertex or edge id
         */
        primaryKey,

        /**
         * vertex property
         */
        vertexProperty,

        /**
         * start vertex id of edge
         */
        srcPrimaryKey,

        /**
         * end vertex id of edge
         */
        dstPrimaryKey,

        /**
         * edge property
         */
        edgeProperty,

        /**
         * vertex json style property
         */
        vertexJsonProperty,

        /**
         * edge json style property
         */
        edgeJsonProperty
    }

    public static enum IdTransRule {
        /**
         * vertex or edge id with 'label' prefix
         */
        labelPrefix,

        /**
         * vertex or edge id raw
         */
        none
    }

}
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbGraphManager.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbGraphManager.java
new file mode 100644
index 00000000..ac06013c
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbGraphManager.java
@@ -0,0 +1,39 @@
/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.client;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbGraph;
import com.alibaba.datax.plugin.writer.gdbwriter.model.ScriptGdbGraph;

import java.util.ArrayList;
import java.util.List;

/**
 * @author jerrywang
 *
 */
public class GdbGraphManager implements AutoCloseable {
    private static final GdbGraphManager instance = new GdbGraphManager();

    private List<GdbGraph> graphs = new ArrayList<>();

    public static GdbGraphManager instance() {
        return instance;
    }

    public GdbGraph getGraph(Configuration config, boolean session) {
        GdbGraph graph = new ScriptGdbGraph(config, session);
        graphs.add(graph);
        return graph;
    }

    @Override
    public void close() {
        for(GdbGraph graph : graphs) {
            graph.close();
        }
        graphs.clear();
    }
}

diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbWriterConfig.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbWriterConfig.java
new file mode 100644
index 00000000..0266a010
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbWriterConfig.java
@@ -0,0 +1,41 @@
/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.client;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;

import static com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper.*;

/**
 * @author jerrywang
 *
 */
public class GdbWriterConfig {
    public static final int DEFAULT_MAX_IN_PROCESS_PER_CONNECTION = 4;
    public static final int DEFAULT_MAX_CONNECTION_POOL_SIZE = 8;
    public static final int DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = 8;
    public static final int DEFAULT_BATCH_PROPERTY_NUM = 30;
    public static final int DEFAULT_RECORD_NUM_IN_BATCH = 16;

    private Configuration config;

    private GdbWriterConfig(Configuration config) {
        this.config = config;

        validate();
    }

    private void validate() {
        assertHasContent(config, Key.HOST);
        assertConfig(Key.PORT, () -> config.getInt(Key.PORT) > 0);

        assertHasContent(config, Key.USERNAME);
        assertHasContent(config, Key.PASSWORD);
    }

    public static GdbWriterConfig of(Configuration config) {
        return new GdbWriterConfig(config);
    }
}

diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/DefaultGdbMapper.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/DefaultGdbMapper.java
new file mode 100644
index 00000000..f5957295
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/DefaultGdbMapper.java
@@ -0,0 +1,190 @@
/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.mapping;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.alibaba.datax.common.element.Record;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbEdge;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbVertex;

import lombok.extern.slf4j.Slf4j;

import static com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType.VERTEX;

/**
 * @author jerrywang
 *
 */
@Slf4j
public class DefaultGdbMapper implements GdbMapper {
    private static final Pattern STR_PATTERN = Pattern.compile("\\$\\{(\\d+)}");
    private static final Pattern NORMAL_PATTERN = Pattern.compile("^\\$\\{(\\d+)}$");

    @Override
    public Function<Record, GdbElement> getMapper(MappingRule rule) {
        return r -> {
            GdbElement e = (rule.getImportType() == VERTEX) ? new GdbVertex() : new GdbEdge();
            forElement(rule).accept(r, e);
            return e;
        };
    }

    private static BiConsumer<Record, GdbElement> forElement(MappingRule rule) {
        List<BiConsumer<Record, GdbElement>> properties = new ArrayList<>();
        for (MappingRule.PropertyMappingRule propRule : rule.getProperties()) {
            Function<Record, String> keyFunc = forStrColumn(propRule.getKey());

            if (propRule.getValueType() == ValueType.STRING) {
                final Function<Record, String> valueFunc = forStrColumn(propRule.getValue());
                properties.add((r, e) -> {
                    String k = keyFunc.apply(r);
                    String v = valueFunc.apply(r);
                    if (k != null && v != null) {
                        e.getProperties().put(k, v);
                    }
                });
            } else {
                final Function<Record, Object> valueFunc = forObjColumn(propRule.getValue(), propRule.getValueType());
                properties.add((r, e) -> {
                    String k = keyFunc.apply(r);
                    Object v = valueFunc.apply(r);
                    if (k != null && v != null) {
                        e.getProperties().put(k, v);
                    }
                });
            }
        }

        if (rule.getPropertiesJsonStr() != null) {
            Function<Record, String> jsonFunc = forStrColumn(rule.getPropertiesJsonStr());
            properties.add((r, e) -> {
                String propertiesStr = jsonFunc.apply(r);
                JSONObject root = (JSONObject)JSONObject.parse(propertiesStr);
                JSONArray propertiesList = root.getJSONArray("properties");

                for (Object object : propertiesList) {
                    JSONObject jsonObject = (JSONObject)object;
                    String key = jsonObject.getString("k");
                    String name = jsonObject.getString("v");
                    String type = jsonObject.getString("t");

                    if (key == null || name == null) {
                        continue;
                    }
                    addToProperties(e, key, name, type);
                }
            });
        }

        BiConsumer<Record, GdbElement> ret = (r, e) -> {
            String label = forStrColumn(rule.getLabel()).apply(r);
            String id = forStrColumn(rule.getId()).apply(r);

            if (rule.getImportType() == Key.ImportType.EDGE) {
                String to = forStrColumn(rule.getTo()).apply(r);
                String from = forStrColumn(rule.getFrom()).apply(r);
                if (to == null || from == null) {
                    log.error("invalid record to: {} , from: {}", to, from);
                    throw new IllegalArgumentException("to or from missed in edge");
                }
                ((GdbEdge)e).setTo(to);
                ((GdbEdge)e).setFrom(from);

                // generate UUID for edge
                if (id == null) {
                    id = UUID.randomUUID().toString();
                }
            }

            if (id == null || label == null) {
                log.error("invalid record id: {} , label: {}", id, label);
                throw new IllegalArgumentException("id or label missed");
            }

            e.setId(id);
            e.setLabel(label);

            properties.forEach(p -> p.accept(r, e));
        };
        return ret;
    }

    static Function<Record, Object> forObjColumn(String rule, ValueType type) {
        Matcher m = NORMAL_PATTERN.matcher(rule);
        if (m.matches()) {
            int index = Integer.valueOf(m.group(1));
            return r -> type.applyColumn(r.getColumn(index));
        } else {
            return r -> type.fromStrFunc(rule);
        }
    }

    static Function<Record, String> forStrColumn(String rule) {
        List<BiConsumer<StringBuilder, Record>> list = new ArrayList<>();
        Matcher m = STR_PATTERN.matcher(rule);
        int last = 0;
        while (m.find()) {
            String index = m.group(1);
            // as simple integer index.
            int i = Integer.parseInt(index);

            final int tmp = last;
            final int start = m.start();
            list.add((sb, record) -> {
                sb.append(rule.subSequence(tmp, start));
                if(record.getColumn(i) != null && record.getColumn(i).getByteSize() > 0) {
                    sb.append(record.getColumn(i).asString());
                }
            });

            last = m.end();
        }

        final int tmp = last;
        list.add((sb, record) -> {
            sb.append(rule.subSequence(tmp, rule.length()));
        });

        return r -> {
            StringBuilder sb = new StringBuilder();
            list.forEach(c -> c.accept(sb, r));
            String res = sb.toString();
            return res.isEmpty() ? null : res;
        };
    }

    static boolean addToProperties(GdbElement e, String key, String value, String type) {
        ValueType valueType = ValueType.fromShortName(type);

        if(valueType == ValueType.STRING) {
            e.getProperties().put(key, value);
        } else if (valueType == ValueType.INT) {
            e.getProperties().put(key, Integer.valueOf(value));
        } else if (valueType == ValueType.LONG) {
            e.getProperties().put(key, Long.valueOf(value));
        } else if (valueType == ValueType.DOUBLE) {
            e.getProperties().put(key, Double.valueOf(value));
        } else if (valueType == ValueType.FLOAT) {
            e.getProperties().put(key, Float.valueOf(value));
        } else if (valueType == ValueType.BOOLEAN) {
            e.getProperties().put(key, Boolean.valueOf(value));
        } else {
            log.error("invalid property key {}, value {}, type {}", key, value, type);
            return false;
        }

        return true;
    }
}

diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/GdbMapper.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/GdbMapper.java
new file mode 100644
index 00000000..3282f203
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/GdbMapper.java
@@ -0,0 +1,17 @@
/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.mapping;

import java.util.function.Function;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;

/**
 * @author jerrywang
 *
 */
public interface GdbMapper {
    Function<Record, GdbElement> getMapper(MappingRule rule);
}

diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRule.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRule.java
new file mode 100644
index 00000000..c0c58d88
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRule.java
@@ -0,0 +1,41 @@
/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.mapping;

import java.util.ArrayList;
import java.util.List;

import com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType;

import lombok.Data;

/**
 * @author jerrywang
 *
 */
@Data
public class MappingRule {
    private String id = null;

    private String label = null;

    private ImportType importType = null;

    private String from = null;

    private String to = null;

    private List<PropertyMappingRule> properties = new ArrayList<>();

    private String propertiesJsonStr = null;

    @Data
    public static class PropertyMappingRule {
        private String key = null;

        private String value = null;

        private ValueType valueType = null;
    }
}
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRuleFactory.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRuleFactory.java
new file mode 100644
index 00000000..0738ac17
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRuleFactory.java
@@ -0,0 +1,181 @@
/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.mapping;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.GdbWriterErrorCode;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType;
import com.alibaba.datax.plugin.writer.gdbwriter.Key.IdTransRule;
import com.alibaba.datax.plugin.writer.gdbwriter.Key.ColumnType;
import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRule.PropertyMappingRule;
import com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
 * @author jerrywang
 *
 */
@Slf4j
public class MappingRuleFactory {
    private static final MappingRuleFactory instance = new MappingRuleFactory();

    public static final MappingRuleFactory getInstance() {
        return instance;
    }

    @Deprecated
    public MappingRule create(Configuration config, ImportType type) {
        MappingRule rule = new MappingRule();
        rule.setId(config.getString(Key.ID));
        rule.setLabel(config.getString(Key.LABEL));
        if (type == ImportType.EDGE) {
            rule.setFrom(config.getString(Key.FROM));
            rule.setTo(config.getString(Key.TO));
        }

        rule.setImportType(type);

        List<Configuration> configurations = config.getListConfiguration(Key.PROPERTIES);
        if (configurations != null) {
            for (Configuration prop : config.getListConfiguration(Key.PROPERTIES)) {
                PropertyMappingRule propRule = new PropertyMappingRule();
                propRule.setKey(prop.getString(Key.PROP_KEY));
                propRule.setValue(prop.getString(Key.PROP_VALUE));
                propRule.setValueType(ValueType.fromShortName(prop.getString(Key.PROP_TYPE).toLowerCase()));
                rule.getProperties().add(propRule);
            }
        }

        String propertiesJsonStr = config.getString(Key.PROPERTIES_JSON_STR, null);
        if (propertiesJsonStr != null) {
            rule.setPropertiesJsonStr(propertiesJsonStr);
        }

        return rule;
    }

    public MappingRule createV2(Configuration config) {
        try {
            ImportType type = ImportType.valueOf(config.getString(Key.IMPORT_TYPE));
            return createV2(config, type);
        } catch (NullPointerException e) {
            throw DataXException.asDataXException(GdbWriterErrorCode.CONFIG_ITEM_MISS, Key.IMPORT_TYPE);
        } catch (IllegalArgumentException e) {
            throw DataXException.asDataXException(GdbWriterErrorCode.BAD_CONFIG_VALUE, Key.IMPORT_TYPE);
        }
    }

    public MappingRule createV2(Configuration config, ImportType type) {
        MappingRule rule = new MappingRule();

        ConfigHelper.assertHasContent(config, Key.LABEL);
        rule.setLabel(config.getString(Key.LABEL));
        rule.setImportType(type);

        IdTransRule srcTransRule = IdTransRule.none;
        IdTransRule dstTransRule = IdTransRule.none;
        if (type == ImportType.EDGE) {
            ConfigHelper.assertHasContent(config, Key.SRC_ID_TRANS_RULE);
            ConfigHelper.assertHasContent(config, Key.DST_ID_TRANS_RULE);

            srcTransRule = IdTransRule.valueOf(config.getString(Key.SRC_ID_TRANS_RULE));
            dstTransRule = IdTransRule.valueOf(config.getString(Key.DST_ID_TRANS_RULE));

            if (srcTransRule == IdTransRule.labelPrefix) {
                ConfigHelper.assertHasContent(config, Key.SRC_LABEL);
            }

            if (dstTransRule == IdTransRule.labelPrefix) {
                ConfigHelper.assertHasContent(config, Key.DST_LABEL);
            }
        }
        ConfigHelper.assertHasContent(config, Key.ID_TRANS_RULE);
        IdTransRule transRule = IdTransRule.valueOf(config.getString(Key.ID_TRANS_RULE));

        List<Configuration> configurationList = config.getListConfiguration(Key.COLUMN);
        ConfigHelper.assertConfig(Key.COLUMN, () -> (configurationList != null && !configurationList.isEmpty()));
        for (Configuration column : configurationList) {
            ConfigHelper.assertHasContent(column, Key.COLUMN_NAME);
            ConfigHelper.assertHasContent(column, Key.COLUMN_VALUE);
            ConfigHelper.assertHasContent(column, Key.COLUMN_TYPE);
            ConfigHelper.assertHasContent(column, Key.COLUMN_NODE_TYPE);

            String columnValue = column.getString(Key.COLUMN_VALUE);
            ColumnType columnType = ColumnType.valueOf(column.getString(Key.COLUMN_NODE_TYPE));
            if (columnValue == null || columnValue.isEmpty()) {
                // only allow edge empty id
                ConfigHelper.assertConfig("empty column value",
                        () -> (type == ImportType.EDGE && columnType == ColumnType.primaryKey));
            }

            if (columnType == ColumnType.primaryKey) {
                ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
                ConfigHelper.assertConfig("only string is allowed in primary key", () -> (propType == ValueType.STRING));

                if (transRule == IdTransRule.labelPrefix) {
                    rule.setId(config.getString(Key.LABEL) + columnValue);
                } else {
                    rule.setId(columnValue);
                }
            } else if (columnType == ColumnType.edgeJsonProperty || columnType == ColumnType.vertexJsonProperty) {
                // only support one json property in column
                ConfigHelper.assertConfig("multi JsonProperty", () -> (rule.getPropertiesJsonStr() == null));

                rule.setPropertiesJsonStr(columnValue);
            } else if (columnType == ColumnType.vertexProperty || columnType == ColumnType.edgeProperty) {
                PropertyMappingRule propertyMappingRule = new PropertyMappingRule();

                propertyMappingRule.setKey(column.getString(Key.COLUMN_NAME));
                propertyMappingRule.setValue(columnValue);
                ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
                ConfigHelper.assertConfig("unsupported property type", () -> propType != null);

                propertyMappingRule.setValueType(propType);
                rule.getProperties().add(propertyMappingRule);
            } else if (columnType == ColumnType.srcPrimaryKey) {
                if (type != ImportType.EDGE) {
                    continue;
                }

                ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
                ConfigHelper.assertConfig("only string is allowed in primary key", () -> (propType == ValueType.STRING));

                if (srcTransRule == IdTransRule.labelPrefix) {
                    rule.setFrom(config.getString(Key.SRC_LABEL) + columnValue);
                } else {
                    rule.setFrom(columnValue);
                }
            } else if (columnType == ColumnType.dstPrimaryKey) {
                if (type != ImportType.EDGE) {
                    continue;
                }

                ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
                ConfigHelper.assertConfig("only string is allowed in primary key", () -> (propType == ValueType.STRING));

                if (dstTransRule == IdTransRule.labelPrefix) {
                    rule.setTo(config.getString(Key.DST_LABEL) + columnValue);
                } else {
                    rule.setTo(columnValue);
                }
            }
        }

        if (rule.getImportType() == ImportType.EDGE) {
            if (rule.getId() == null) {
                rule.setId("");
                log.info("edge id is missing, a random UUID will be used by default");
            }
            ConfigHelper.assertConfig("to needed in edge", () -> (rule.getTo() != null));
            ConfigHelper.assertConfig("from needed in edge", () -> (rule.getFrom() != null));
        }
        ConfigHelper.assertConfig("id needed", () -> (rule.getId() != null));

        return rule;
    }
}

diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/ValueType.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/ValueType.java
new file mode 100644
index 00000000..9ad8bd8d
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/ValueType.java
@@ -0,0 +1,71 @@
/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.mapping;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import com.alibaba.datax.common.element.Column;
import lombok.extern.slf4j.Slf4j;

/**
 * @author jerrywang
 *
 */
@Slf4j
public enum ValueType {
    INT(Integer.class, "int", Column::asLong, Integer::valueOf),
    LONG(Long.class, "long", Column::asLong, Long::valueOf),
    DOUBLE(Double.class, "double", Column::asDouble, Double::valueOf),
    FLOAT(Float.class, "float", Column::asDouble, Float::valueOf),
    BOOLEAN(Boolean.class, "boolean", Column::asBoolean, Boolean::valueOf),
    STRING(String.class, "string", Column::asString, String::valueOf);

    private Class<?> type = null;
    private String shortName = null;
    private Function<Column, Object> columnFunc = null;
    private Function<String, Object> fromStrFunc = null;

    private ValueType(Class<?> type, String name, Function<Column, Object> columnFunc, Function<String, Object> fromStrFunc) {
        this.type = type;
        this.shortName = name;
        this.columnFunc = columnFunc;
        this.fromStrFunc = fromStrFunc;

        ValueTypeHolder.shortName2type.put(name, this);
    }

    public static ValueType fromShortName(String name) {
        return ValueTypeHolder.shortName2type.get(name);
    }

    public Class<?> type() {
        return this.type;
    }

    public String shortName() {
        return this.shortName;
    }

    public Object applyColumn(Column column) {
        try {
            if (column == null) {
                return null;
            }
            return columnFunc.apply(column);
        } catch (Exception e) {
            log.error("applyColumn error {}, column {}", e.toString(), column);
            throw e;
        }
    }

    public Object fromStrFunc(String str) {
        return fromStrFunc.apply(str);
    }

    private static class ValueTypeHolder {
        private static Map<String, ValueType> shortName2type = new HashMap<>();
    }
}
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/AbstractGdbGraph.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/AbstractGdbGraph.java
new file mode 100644
index 00000000..0c31c644
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/AbstractGdbGraph.java
@@ -0,0 +1,151 @@
/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig;

import lombok.extern.slf4j.Slf4j;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.RequestOptions;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author jerrywang
 *
 */
@Slf4j
public abstract class AbstractGdbGraph implements GdbGraph {
    private final static int DEFAULT_TIMEOUT = 30000;

    protected Client client = null;
    protected Key.UpdateMode updateMode = Key.UpdateMode.INSERT;
    protected int propertiesBatchNum = GdbWriterConfig.DEFAULT_BATCH_PROPERTY_NUM;
    protected boolean session = false;


    protected AbstractGdbGraph() {}

    protected AbstractGdbGraph(Configuration config, boolean session) {
        initClient(config, session);
    }

    protected void initClient(Configuration config, boolean session) {
        updateMode = Key.UpdateMode.valueOf(config.getString(Key.UPDATE_MODE, "INSERT"));
        log.info("init graphdb client");
        String host = config.getString(Key.HOST);
        int port = config.getInt(Key.PORT);
        String username = config.getString(Key.USERNAME);
        String password = config.getString(Key.PASSWORD);
        int maxDepthPerConnection = config.getInt(Key.MAX_IN_PROCESS_PER_CONNECTION,
                GdbWriterConfig.DEFAULT_MAX_IN_PROCESS_PER_CONNECTION);

        int maxConnectionPoolSize = config.getInt(Key.MAX_CONNECTION_POOL_SIZE,
                GdbWriterConfig.DEFAULT_MAX_CONNECTION_POOL_SIZE);

        int maxSimultaneousUsagePerConnection = config.getInt(Key.MAX_SIMULTANEOUS_USAGE_PER_CONNECTION,
                GdbWriterConfig.DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION);

        this.session = session;
        if (this.session) {
            maxConnectionPoolSize = GdbWriterConfig.DEFAULT_MAX_CONNECTION_POOL_SIZE;
            maxDepthPerConnection = GdbWriterConfig.DEFAULT_MAX_IN_PROCESS_PER_CONNECTION;
            maxSimultaneousUsagePerConnection = GdbWriterConfig.DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION;
        }

        try {
            Cluster cluster = Cluster.build(host).port(port).credentials(username, password)
                    .serializer(Serializers.GRAPHBINARY_V1D0)
                    .maxContentLength(1048576)
                    .maxInProcessPerConnection(maxDepthPerConnection)
                    .minInProcessPerConnection(0)
                    .maxConnectionPoolSize(maxConnectionPoolSize)
                    .minConnectionPoolSize(maxConnectionPoolSize)
                    .maxSimultaneousUsagePerConnection(maxSimultaneousUsagePerConnection)
                    .resultIterationBatchSize(64)
                    .create();
            client = session ? cluster.connect(UUID.randomUUID().toString()).init() : cluster.connect().init();
            warmClient(maxConnectionPoolSize*maxDepthPerConnection);
        } catch (RuntimeException e) {
            log.error("Failed to connect to GDB {}:{}, due to {}", host, port, e);
            throw e;
        }

        propertiesBatchNum = config.getInt(Key.MAX_PROPERTIES_BATCH_NUM, GdbWriterConfig.DEFAULT_BATCH_PROPERTY_NUM);
    }


    /**
     * @param dsl
     * @param parameters
     */
    protected void runInternal(String dsl, final Map<String, Object> parameters) throws Exception {
        RequestOptions.Builder options = RequestOptions.build().timeout(DEFAULT_TIMEOUT);
        if (parameters != null && !parameters.isEmpty()) {
            parameters.forEach(options::addParameter);
        }

        ResultSet results = client.submitAsync(dsl, options.create()).get(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
        results.all().get(DEFAULT_TIMEOUT + 1000, TimeUnit.MILLISECONDS);
    }

    void beginTx() {
        if (!session) {
            return;
        }

        String dsl = "g.tx().open()";
        client.submit(dsl).all().join();
    }

    void doCommit() {
        if (!session) {
            return;
        }

        try {
            String dsl = "g.tx().commit()";
            client.submit(dsl).all().join();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    void doRollback() {
        if (!session) {
            return;
        }

        String dsl = "g.tx().rollback()";
        client.submit(dsl).all().join();
    }

    private void warmClient(int num) {
        try {
            beginTx();
            runInternal("g.V('test')", null);
            doCommit();
            log.info("warm graphdb client over");
        } catch (Exception e) {
            log.error("warmClient error");
            doRollback();
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
        if (client != null) {
            log.info("close graphdb client");
            client.close();
        }
    }
}

diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbEdge.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbEdge.java
new file mode 100644
index 00000000..d42c9182
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbEdge.java
@@ -0,0 +1,20 @@
/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
 * @author jerrywang
 *
 */
@Data
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class GdbEdge extends GdbElement {
    private String from = null;
    private String to = null;
}

diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbElement.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbElement.java
new file mode 100644
index 00000000..af3c7090
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbElement.java
@@ -0,0 +1,20 @@
/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import java.util.HashMap;
import java.util.Map;

import lombok.Data;

/**
 * @author jerrywang
 *
 */
@Data
public class GdbElement {
    String id = null;
    String label = null;
    Map<String, Object> properties = new HashMap<>();
}

diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbGraph.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbGraph.java
new file mode 100644
index 00000000..5b98c502
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbGraph.java
@@ -0,0 +1,20 @@
/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import com.alibaba.datax.common.element.Record;
import groovy.lang.Tuple2;

import java.util.List;

/**
 * @author jerrywang
 *
 */
public interface GdbGraph extends AutoCloseable {
    List<Tuple2<Record, Exception>> add(List<Tuple2<Record, GdbElement>> records);

    @Override
    void close();
}

diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbVertex.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbVertex.java
new file mode 100644
index 00000000..0bc762bd
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbVertex.java
@@ -0,0 +1,17 @@
/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
 * @author jerrywang
 *
 */
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class GdbVertex extends GdbElement {

}
diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/ScriptGdbGraph.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/ScriptGdbGraph.java
new file mode 100644
index 00000000..7f898431
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/ScriptGdbGraph.java
@@ -0,0 +1,196 @@
/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import java.util.*;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.util.Configuration;

import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.util.GdbDuplicateIdException;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import groovy.lang.Tuple2;
import lombok.extern.slf4j.Slf4j;

/**
 * @author jerrywang
 *
 */
@Slf4j
public class ScriptGdbGraph extends AbstractGdbGraph {
    private static final String VAR_PREFIX = "GDB___";
    private static final String VAR_ID = VAR_PREFIX + "id";
    private static final String VAR_LABEL = VAR_PREFIX + "label";
    private static final String VAR_FROM = VAR_PREFIX + "from";
    private static final String VAR_TO = VAR_PREFIX + "to";
    private static final String VAR_PROP_KEY = VAR_PREFIX + "PK";
    private static final String VAR_PROP_VALUE = VAR_PREFIX + "PV";
    private static final String ADD_V_START = "g.addV(" + VAR_LABEL + ").property(id, " + VAR_ID + ")";
    private static final String ADD_E_START = "g.addE(" + VAR_LABEL + ").property(id, " + VAR_ID + ").from(V(" +
            VAR_FROM + ")).to(V(" + VAR_TO + "))";

    private static final String UPDATE_V_START = "g.V("+VAR_ID+")";
    private static final String UPDATE_E_START = "g.E("+VAR_ID+")";

    private Cache<Integer, String> propertyCache;
    private Random random;

    public ScriptGdbGraph() {
        propertyCache = Caffeine.newBuilder().maximumSize(1024).build();
        random = new Random();
    }

    public ScriptGdbGraph(Configuration config, boolean session) {
        super(config, session);

        propertyCache = Caffeine.newBuilder().maximumSize(1024).build();
        random = new Random();

        log.info("Init as ScriptGdbGraph.");
    }

    /**
     * Apply list of {@link GdbElement} to GDB, return the failed records
     * @param records list of element to apply
     * @return
     */
    @Override
    public List<Tuple2<Record, Exception>> add(List<Tuple2<Record, GdbElement>> records) {
        List<Tuple2<Record, Exception>> errors = new ArrayList<>();
        try {
            beginTx();
            for (Tuple2<Record, GdbElement> elementTuple2 : records) {
                try {
                    addInternal(elementTuple2.getSecond());
                } catch (Exception e) {
                    errors.add(new Tuple2<>(elementTuple2.getFirst(), e));
                }
            }
            doCommit();
        } catch (Exception ex) {
            doRollback();
            throw new RuntimeException(ex);
        }
        return errors;
    }

    private void addInternal(GdbElement element) {
        try {
            addInternal(element, false);
        } catch (GdbDuplicateIdException e) {
            if (updateMode == Key.UpdateMode.SKIP) {
                log.debug("Skip duplicate id {}", element.getId());
            } else if (updateMode == Key.UpdateMode.INSERT) {
                throw new RuntimeException(e);
            } else if (updateMode == Key.UpdateMode.MERGE) {
                if (element.getProperties().isEmpty()) {
                    return;
                }

                try {
                    addInternal(element, true);
                } catch (GdbDuplicateIdException e1) {
                    log.error("duplicate id {} while update...", element.getId());
                    throw new RuntimeException(e1);
                }
            }
        }
    }

    private void addInternal(GdbElement element, boolean update) throws GdbDuplicateIdException {
        Map<String, Object> params = element.getProperties();
        Map<String, Object> subParams = new HashMap<>(propertiesBatchNum);
        boolean firstAdd = !update;
        boolean isVertex = (element instanceof GdbVertex);

        for (Map.Entry<String, Object> entry : params.entrySet()) {
            subParams.put(entry.getKey(), entry.getValue());
            if (subParams.size() >= propertiesBatchNum) {
                setGraphDbElement(element, subParams, isVertex, firstAdd);
                firstAdd = false;
                subParams.clear();
            }
        }
        if (!subParams.isEmpty() || firstAdd) {
            setGraphDbElement(element, subParams, isVertex, firstAdd);
        }
    }

    private Tuple2<String, Map<String, Object>> buildDsl(GdbElement element,
                                                         Map<String, Object> properties,
                                                         boolean isVertex, boolean firstAdd) {
        Map<String, Object> params = new HashMap<>();

        String dslPropertyPart = propertyCache.get(properties.size(), keys -> {
            final StringBuilder sb = new StringBuilder();
            for (int i = 0; i < keys; i++) {
                sb.append(".property(").append(VAR_PROP_KEY).append(i)
                        .append(", ").append(VAR_PROP_VALUE).append(i).append(")");
            }
            return sb.toString();
        });

        String dsl;
        if (isVertex) {
            dsl = (firstAdd ? ADD_V_START : UPDATE_V_START) + dslPropertyPart;
        } else {
            dsl = (firstAdd ? ADD_E_START : UPDATE_E_START) + dslPropertyPart;
            if (firstAdd) {
                params.put(VAR_FROM, ((GdbEdge)element).getFrom());
                params.put(VAR_TO, ((GdbEdge)element).getTo());
            }
        }

        int index = 0;
        for (Map.Entry<String, Object> entry : properties.entrySet()) {
            params.put(VAR_PROP_KEY+index, entry.getKey());
            params.put(VAR_PROP_VALUE+index, entry.getValue());
            index++;
        }

        if (firstAdd) {
            params.put(VAR_LABEL, element.getLabel());
        }
        params.put(VAR_ID, element.getId());

        return new Tuple2<>(dsl, params);
    }

    private void setGraphDbElement(GdbElement element, Map<String, Object> properties,
                                   boolean isVertex, boolean firstAdd) throws GdbDuplicateIdException {
        int retry = 10;
        int idleTime = random.nextInt(10) + 10;
        Tuple2<String, Map<String, Object>> elementDsl = buildDsl(element, properties, isVertex, firstAdd);

        while (retry > 0) {
            try {
                runInternal(elementDsl.getFirst(), elementDsl.getSecond());
                log.debug("AddElement {}", element.getId());
                return;
            } catch (Exception e) {
                String cause = e.getCause() == null ? "" : e.getCause().toString();
                if (cause.contains("rejected from")) {
                    retry--;
                    try {
                        Thread.sleep(idleTime);
                    } catch (InterruptedException e1) {
                        // ...
                    }
                    idleTime = Math.min(idleTime * 2, 2000);
                    continue;
                } else if (firstAdd && cause.contains("GraphDB id exists")) {
                    throw new GdbDuplicateIdException(e);
                }
                log.error("Add Failed id {}, dsl {}, params {}, e {}", element.getId(),
                        elementDsl.getFirst(), elementDsl.getSecond(), e);
                throw new RuntimeException(e);
            }
        }
        log.error("Add Failed id {}, dsl {}, params {}", element.getId(),
                elementDsl.getFirst(), elementDsl.getSecond());
        throw new RuntimeException("failed to queue new element to server");
    }
}

diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/ConfigHelper.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/ConfigHelper.java
new file mode 100644
index 00000000..77175197
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/ConfigHelper.java
@@ -0,0 +1,59 @@
/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.util;

import java.io.IOException;
import java.io.InputStream;
import java.util.function.Supplier;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.GdbWriterErrorCode;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import org.apache.commons.lang3.StringUtils;

/**
 * @author jerrywang
 *
 */
public interface ConfigHelper {
    static void assertConfig(String key, Supplier<Boolean> f) {
        if (!f.get()) {
            throw DataXException.asDataXException(GdbWriterErrorCode.BAD_CONFIG_VALUE, key);
        }
    }

    static void assertHasContent(Configuration config, String key) {
        assertConfig(key, () -> StringUtils.isNotBlank(config.getString(key)));
    }

    /**
     * NOTE: {@code Configuration::get(String, Class)} doesn't work.
     *
     * @param conf Configuration
     * @param key key path to configuration
     * @param cls Class of result type
     * @return the target configuration object of type T
     */
    static <T> T getConfig(Configuration conf, String key, Class<T> cls) {
        JSONObject j = (JSONObject) conf.get(key);
        return JSON.toJavaObject(j, cls);
    }

    /**
     * Create a configuration from the specified file on the classpath.
     *
     * @param name file name
     * @return Configuration instance.
     */
    static Configuration fromClasspath(String name) {
        try (InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(name)) {
            return Configuration.from(is);
        } catch (IOException e) {
            throw new IllegalArgumentException("File not found: " + name);
        }
    }
}

diff --git a/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/GdbDuplicateIdException.java b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/GdbDuplicateIdException.java
new file mode 100644
index 00000000..e531d51b
--- /dev/null
+++ b/gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/GdbDuplicateIdException.java
@@ -0,0 +1,23 @@
/*
 * (C) 2019-present Alibaba Group Holding Limited.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 as
 * published by the Free Software Foundation.
 */
package com.alibaba.datax.plugin.writer.gdbwriter.util;

/**
 * @author : Liu Jianping
 * @date : 2019/8/3
 */

public class GdbDuplicateIdException extends Exception {
    public GdbDuplicateIdException(Exception e) {
        super(e);
    }

    public GdbDuplicateIdException() {
        super();
    }
}

diff --git a/gdbwriter/src/main/resources/plugin.json b/gdbwriter/src/main/resources/plugin.json
new file mode 100644
index 00000000..52d1b11d
--- /dev/null
+++ b/gdbwriter/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
{
    "name": "gdbwriter",
    "class": "com.alibaba.datax.plugin.writer.gdbwriter.GdbWriter",
    "description": "useScene: prod. mechanism: connect GDB with gremlin-client, execute DSL as 'g.addV() or g.addE()' to write record",
    "developer": "alibaba"
}

diff --git a/gdbwriter/src/main/resources/plugin_job_template.json b/gdbwriter/src/main/resources/plugin_job_template.json
new file mode 100644
index 00000000..09a03142
--- /dev/null
+++ b/gdbwriter/src/main/resources/plugin_job_template.json
@@ -0,0 +1,74 @@
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },

        "content": [
            {
                "reader": {
                    "name": "odpsreader"
                },

                "writer": {
                    "name": "gdbwriter",
                    "parameter": {
                        "host": "localhost",
                        "port": 8182,
                        "username": "username",
                        "password": "password",
                        "label": "test-label",
                        "srcLabel": "test-srcLabel-",
                        "dstLabel": "test-dstLabel-",
                        "labelType": "EDGE",
                        "writeMode": "INSERT",
                        "idTransRule": "labelPrefix",
                        "srcIdTransRule": "labelPrefix",
                        "dstIdTransRule": "labelPrefix",

                        "column": [
                            {
                                "name": "id",
                                "value": "-test-${0}",
                                "type": "string",
                                "columnType": "primaryKey"
                            },
                            {
                                "name": "id",
                                "value": "from-id-${2}",
                                "type": "string",
                                "columnType": "srcPrimaryKey"
                            },
                            {
                                "name": "id",
                                "value": "to-id-${3}",
                                "type": "string",
                                "columnType": "dstPrimaryKey"
                            },
                            {
                                "name": "strValue-${2}-key",
                                "value": "strValue-${2}-value",
                                "type": "string",
                                "columnType": "edgeProperty"
                            },
                            {
                                "name": "intProp",
                                "value": "${3}",
                                "type": "int",
                                "columnType": "edgeProperty"
                            },
                            {
                                "name": "booleanProp",
                                "value": "${5}",
                                "type": "boolean",
                                "columnType": "edgeProperty"
                            }
                        ]
                    }
                }
            }
        ]
    }
}

diff --git a/pom.xml b/pom.xml
index 79865473..50ee2080 100755
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
         <module>elasticsearchwriter</module>
         <module>tsdbwriter</module>
         <module>adbpgwriter</module>
+        <module>gdbwriter</module>
 
         <module>plugin-rdbms-util</module>