From 11380b308396ba1c6262712699fd50b97dc16bee Mon Sep 17 00:00:00 2001 From: FuYouJ <1247908487@qq.com> Date: Sat, 16 Sep 2023 13:29:49 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E6=96=B0=E5=A2=9EwriteMode=E5=8F=82?= =?UTF-8?q?=E6=95=B0=EF=BC=8C=E6=9B=B4=E6=96=B0=E7=A4=BA=E4=BE=8B=E6=96=87?= =?UTF-8?q?=E6=A1=A3=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增writeMode参数,更新示例文档。 --- neo4jwriter/doc/neo4jwriter.md | 21 ++++++++------- .../writer/neo4jwriter/Neo4jClient.java | 26 +++++++++++++++++-- .../writer/neo4jwriter/Neo4jWriter.java | 5 ++++ .../neo4jwriter/config/ConfigConstants.java | 8 +++++- .../writer/neo4jwriter/config/WriteMode.java | 20 ++++++++++++++ 5 files changed, 67 insertions(+), 13 deletions(-) create mode 100644 neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/WriteMode.java diff --git a/neo4jwriter/doc/neo4jwriter.md b/neo4jwriter/doc/neo4jwriter.md index 0c6e356c..7cfbe692 100644 --- a/neo4jwriter/doc/neo4jwriter.md +++ b/neo4jwriter/doc/neo4jwriter.md @@ -16,8 +16,8 @@ ### 配置项介绍 -| 配置 | 说明 | 是否必须 | 默认值 | 示例 | -|:-------------------------------|--------------------| -------- | ------ | ---------------------------------------------------- | +| 配置 | 说明 | 是否必须 | 默认值 | 示例 | +|:-------------------------------|--------------------| -------- |--------|------------------------------------------------------| | database | 数据库名字 | 是 | - | neo4j | | uri | 数据库访问链接 | 是 | - | bolt://localhost:7687 | | username | 访问用户名 | 是 | - | neo4j | @@ -25,13 +25,14 @@ | bearerToken | 权限相关 | 否 | - | - | | kerberosTicket | 权限相关 | 否 | - | - | | cypher | 同步语句 | 是 | - | unwind $batch as row create(p) set p.name = row.name | -| batchDataVariableName | unwind 携带的数据变量名 | | | batch | -| properties | 定义neo4j中数据的属性名字和类型 | 是 | - | 见后续案例 | +| batchDataVariableName | unwind 携带的数据变量名 | | batch | batch | +| properties | 定义neo4j中数据的属性名字和类型 | 是 | - | 见后续案例 | | batchSize | 一批写入数据量 | 否 | 1000 | | -| 
maxTransactionRetryTimeSeconds | 事务运行最长时间 | 否 | 30秒 | 30 | -| maxConnectionTimeoutSeconds | 驱动最长链接时间 | 否 | 30秒 | 30 | -| retryTimes | 发生错误的重试次数 | 否 | 3次 | 3 | -| retrySleepMills | 重试失败后的等待时间 | 否 | 3秒 | 3 | +| maxTransactionRetryTimeSeconds | 事务运行最长时间 | 否 | 30秒 | 30 | +| maxConnectionTimeoutSeconds | 驱动最长链接时间 | 否 | 30秒 | 30 | +| retryTimes | 发生错误的重试次数 | 否 | 3次 | 3 | +| retrySleepMills | 重试失败后的等待时间 | 否 | 3秒 | 3 | +| writeMode | 写入模式 | 否 | INSERT | INSERT or UPDATE | ### 支持的数据类型 > 配置时均忽略大小写 @@ -124,7 +125,7 @@ Object_ARRAY "database": "neo4j", "cypher": "unwind $batch as row match(p1:Person) where p1.id = row.startNodeId match(p2:Person) where p2.id = row.endNodeId create (p1)-[:LINK]->(p2)", "batchDataVariableName": "batch", - "batch_size": "33", + "batchSize": "33", "properties": [ { "name": "startNodeId", @@ -153,7 +154,7 @@ Object_ARRAY "database": "yourDataBase", "cypher": "unwind $batch as row CALL apoc.cypher.doIt( 'create (n:`' + row.Label + '`{id:$id})' ,{id: row.id} ) YIELD value RETURN 1 ", "batchDataVariableName": "batch", - "batch_size": "1", + "batchSize": "1", "properties": [ { "name": "Label", diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java index 4451bbdf..d9068745 100644 --- a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java @@ -10,6 +10,7 @@ import com.alibaba.datax.common.util.RetryUtil; import com.alibaba.datax.plugin.writer.neo4jwriter.adapter.DateAdapter; import com.alibaba.datax.plugin.writer.neo4jwriter.adapter.ValueAdapter; import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jProperty; +import com.alibaba.datax.plugin.writer.neo4jwriter.config.WriteMode; import com.alibaba.datax.plugin.writer.neo4jwriter.exception.Neo4jErrorCode; import com.alibaba.fastjson2.JSON; 
import org.apache.commons.lang3.StringUtils; @@ -67,16 +68,28 @@ public class Neo4jClient { String batchVariableName = config.getString(BATCH_DATA_VARIABLE_NAME.getKey(), BATCH_DATA_VARIABLE_NAME.getDefaultValue()); List neo4jProperties = JSON.parseArray(config.getString(NEO4J_PROPERTIES.getKey()), Neo4jProperty.class); + int batchSize = config.getInt(BATCH_SIZE.getKey(), BATCH_SIZE.getDefaultValue()); int retryTimes = config.getInt(RETRY_TIMES.getKey(), RETRY_TIMES.getDefaultValue()); + long retrySleepMills = RETRY_SLEEP_MILLS.getDefaultValue(); + String writeMode = config.getString(WRITE_MODE.getKey(), WRITE_MODE.getDefaultValue()); + return new Neo4jClient(driver, new WriteConfig(cypher, database, batchVariableName, neo4jProperties, batchSize), - new RetryConfig(retryTimes, config.getLong(RETRY_SLEEP_MILLS.getKey(), RETRY_SLEEP_MILLS.getDefaultValue())), + new RetryConfig(retryTimes, + config.getLong(RETRY_SLEEP_MILLS.getKey(), retrySleepMills), + checkWriteMode(writeMode)), taskPluginCollector ); } + private static WriteMode checkWriteMode(String writeMode) { + Optional mode = WriteMode.from(writeMode); + return mode.orElseThrow(() -> + DataXException.asDataXException(Neo4jErrorCode.CONFIG_INVALID, "writeMode should be INSERT or UPDATE")); + } + private static String checkCypher(Configuration config) { String cypher = config.getString(CYPHER.getKey()); if (StringUtils.isBlank(cypher)) { @@ -177,6 +190,10 @@ public class Neo4jClient { } + public boolean supportFailOver() { + return WriteMode.UPDATE.equals(this.retryConfig.writeMode); + } + private String toUnwindStr(List values) { StringJoiner joiner = new StringJoiner(","); for (MapValue value : values) { @@ -222,9 +239,14 @@ public class Neo4jClient { int retryTimes; long retrySleepMills; - RetryConfig(int retryTimes, long retrySleepMills) { + //INSERT + //UPDATE + WriteMode writeMode; + + RetryConfig(int retryTimes, long retrySleepMills, WriteMode writeMode) { this.retryTimes = retryTimes; 
this.retrySleepMills = retrySleepMills; + this.writeMode = writeMode; } } diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java index 6a589c1d..e182d7a5 100644 --- a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java @@ -60,5 +60,10 @@ public class Neo4jWriter extends Writer { this.neo4jClient.tryWrite(record); } } + + @Override + public boolean supportFailOver() { + return this.neo4jClient.supportFailOver(); + } } } diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/ConfigConstants.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/ConfigConstants.java index eed3588e..ab66bd07 100644 --- a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/ConfigConstants.java +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/ConfigConstants.java @@ -13,7 +13,6 @@ public final class ConfigConstants { public static final Long DEFAULT_MAX_CONNECTION_SECONDS = 30L; - public static final Option RETRY_TIMES = Option.builder() .key("retryTimes") @@ -113,4 +112,11 @@ public final class ConfigConstants { .defaultValue(1000) .desc("max batch size") .build(); + + public static final Option WRITE_MODE = + Option.builder() + .key("writeMode") + .defaultValue(WriteMode.INSERT.name()) + .desc("The flag is either insert or update, and if it is update, a retry is performed") + .build(); } diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/WriteMode.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/WriteMode.java new file mode 100644 index 00000000..75d3fbbf --- /dev/null +++ 
b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/WriteMode.java @@ -0,0 +1,20 @@ +package com.alibaba.datax.plugin.writer.neo4jwriter.config; + + +import java.util.Arrays; +import java.util.Optional; + +/** + * @author fuyouj + */ + +public enum WriteMode { + INSERT, + UPDATE; + + public static Optional from(String name){ + return Arrays.stream(WriteMode.values()) + .filter(e -> e.name().equalsIgnoreCase(name)) + .findFirst(); + } +} From 9e4cd42cf84ff6acb8b71413bb55997996009803 Mon Sep 17 00:00:00 2001 From: FuYouJ <1247908487@qq.com> Date: Fri, 13 Oct 2023 20:43:45 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E6=9B=B4=E6=96=B0neo4jWriter=E6=96=87?= =?UTF-8?q?=E6=A1=A3=EF=BC=8C=E8=A1=A5=E5=85=85=E5=A4=8D=E6=9D=82=E7=9A=84?= =?UTF-8?q?=E5=86=99=E5=85=A5=E6=A1=88=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- neo4jwriter/doc/neo4jwriter.md | 130 +++++++++++++++++++++++++++++++++ 1 file changed, 130 insertions(+) diff --git a/neo4jwriter/doc/neo4jwriter.md b/neo4jwriter/doc/neo4jwriter.md index 7cfbe692..96f4238e 100644 --- a/neo4jwriter/doc/neo4jwriter.md +++ b/neo4jwriter/doc/neo4jwriter.md @@ -168,6 +168,136 @@ Object_ARRAY } } ``` +> 同步数据时,每一条数据的Label均不相同的情况下,写入到neo4j似乎有点麻烦。因为cypher语句中 Label 不支持动态引用变量,不得不使用字符串拼接 关系或者节点的 Label. + +假设现在同步节点,然后同步关系。 + +**节点源头表** + +| 类型(TYPE) | 姓名属性(NAME) | uid属性(UID) | +| ---------- | -------------- | ------------ | +| Boy | 小付 | 1 | +| Girl | 小杰 | 2 | + +假设以上两条数据,是节点数据,他们的 Label 分别是 Boy 和 Girl. 
+ +那么我们的writer这样配置。 + +```json + "writer": { + "name": "neo4jWriter", + "parameter": { + "uri": "bolt://localhost:7687", + "username": "yourUserName", + "password": "yourPassword", + "database": "yourDataBase", + "cypher": "unwind $batch as row CALL apoc.cypher.doIt( 'create (n:`' + row.type + '`{uid:$uid}) set n.name = $name' ,{uid: row.uid,name:row.name,type:row.type} ) YIELD value RETURN 1", + "batchDataVariableName": "batch", + "batchSize": "1", + "properties": [ + { + "name": "type", + "type": "STRING" + }, + { + "name": "name", + "type": "STRING" + }, + { + "name":"uid", + "type":"STRING" + } + ] + } + } +//注意字符串拼接的规则。 +前面的语句`'+要拼接的类型+'`后面的语句. +``` + +我们将每一行的属性都作为参数传递给了apoc函数,在使用类型的地方,使用了字符串拼接。注意字符串拼接的规则。 + +实际上,以上语句最后到neo4j会被解析如下: + +```cypher +unwind [{type:'Boy',uid:'1',name:'小付'},{type:'Girl',uid:'2',name:'小杰'}] as row + CALL apoc.cypher.doIt( 'create (n:`' + row.type + '`{uid:$uid}) set n.name = $name' ,{uid: row.uid,name:row.name,type:row.type} ) YIELD value RETURN 1 +``` + +假设节点同步成功后,我们开始同步关系。 + +**关系源头描述表** + +| 开始节点id | 结束节点id | 关系id | 开始节点类型type | 结束节点类型type | 关系类型type | 关系属性name | +| ---------- | ---------- | ------ | ---------------- | ---------------- | ------------ | ------------ | +| 1 | 2 | 3 | Boy | Girl | Link | link | + +我们根据开始节点和结束节点建立起连接关系。 + +```json + "writer": { + "name": "neo4jWriter", + "parameter": { + "uri": "bolt://localhost:7687", + "username": "yourUserName", + "password": "yourPassword", + "database": "yourDataBase", + "cypher": "unwind $batch as row CALL apoc.cypher.doIt( +'match(start:`'+row.startType+'`) where start.uid = $startId +match(end:`'+row.endType+'`{uid:$endId}) create (start)-[r:`'+row.rType+'`]-> +(end) set r.rid = $rid,r.name=$name' , +{rType:row.rType,startType:row.startType,endType:row.endType,startId:row.startId +,endId:row.endId,name:row.name,rid:row.rid} ) YIELD value RETURN 1", + "batchDataVariableName": "batch", + "batchSize": "1000", + "properties": [ + { + "name": "rType", + "type": "STRING" + }, + { + 
"name": "startType", + "type": "STRING" + }, + { + "name":"endType", + "type":"STRING" + }, + { + "name":"startId", + "type":"STRING" + }, + { + "name":"endId", + "type":"STRING" + }, + { + "name":"name", + "type":"STRING" + } + ] + } + } +//注意字符串拼接的规则。 +前面的语句`'+要拼接的类型+'`后面的语句. +``` + +在配置中,我们解析每一行的数据,根据类型和id找到开始节点和结束节点,并将他们链接起来。 + +实际的cypher会被解析为: + +```cypher +unwind +[{rType:'Link',startType:'Boy',endType:'Girl',startId:'1',endId:'2', +name:'link',rid:'3'}] as row +CALL apoc.cypher.doIt( 'match(start:`'+row.startType+'`) where +start.uid = $startId match(end:`'+row.endType+'`{uid:$endId}) create (start)- +[r:`'+row.rType+'`]->(end) set r.rid = $rid,r.name=$name' , +{rType:row.rType,startType:row.startType,endType:row.endType,startId:row.startId +,endId:row.endId,name:row.name,rid:row.rid} ) YIELD value RETURN 1 +``` + +* 动态写入Label的语法确实比较复杂,请用户复制以上案例到测试环境方便理解为何要使用字符串拼接。 +* 如果觉得这种写法太过于复杂,后续可能会引入其他方式。 ## 注意事项