From 11380b308396ba1c6262712699fd50b97dc16bee Mon Sep 17 00:00:00 2001 From: FuYouJ <1247908487@qq.com> Date: Sat, 16 Sep 2023 13:29:49 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9EwriteMode=E5=8F=82=E6=95=B0?= =?UTF-8?q?=EF=BC=8C=E6=9B=B4=E6=96=B0=E7=A4=BA=E4=BE=8B=E6=96=87=E6=A1=A3?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增writeMode参数,更新示例文档。 --- neo4jwriter/doc/neo4jwriter.md | 21 ++++++++------- .../writer/neo4jwriter/Neo4jClient.java | 26 +++++++++++++++++-- .../writer/neo4jwriter/Neo4jWriter.java | 5 ++++ .../neo4jwriter/config/ConfigConstants.java | 8 +++++- .../writer/neo4jwriter/config/WriteMode.java | 20 ++++++++++++++ 5 files changed, 67 insertions(+), 13 deletions(-) create mode 100644 neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/WriteMode.java diff --git a/neo4jwriter/doc/neo4jwriter.md b/neo4jwriter/doc/neo4jwriter.md index 0c6e356c..7cfbe692 100644 --- a/neo4jwriter/doc/neo4jwriter.md +++ b/neo4jwriter/doc/neo4jwriter.md @@ -16,8 +16,8 @@ ### 配置项介绍 -| 配置 | 说明 | 是否必须 | 默认值 | 示例 | -|:-------------------------------|--------------------| -------- | ------ | ---------------------------------------------------- | +| 配置 | 说明 | 是否必须 | 默认值 | 示例 | +|:-------------------------------|--------------------| -------- |--------|------------------------------------------------------| | database | 数据库名字 | 是 | - | neo4j | | uri | 数据库访问链接 | 是 | - | bolt://localhost:7687 | | username | 访问用户名 | 是 | - | neo4j | @@ -25,13 +25,14 @@ | bearerToken | 权限相关 | 否 | - | - | | kerberosTicket | 权限相关 | 否 | - | - | | cypher | 同步语句 | 是 | - | unwind $batch as row create(p) set p.name = row.name | -| batchDataVariableName | unwind 携带的数据变量名 | | | batch | -| properties | 定义neo4j中数据的属性名字和类型 | 是 | - | 见后续案例 | +| batchDataVariableName | unwind 携带的数据变量名 | | batch | batch | +| properties | 定义neo4j中数据的属性名字和类型 | 是 | - | 见后续案例 | | batchSize | 一批写入数据量 | 否 | 1000 | | -| maxTransactionRetryTimeSeconds | 事务运行最长时间 | 否 | 30秒 | 30 | -| maxConnectionTimeoutSeconds | 驱动最长链接时间 | 否 | 30秒 | 30 | -| retryTimes | 发生错误的重试次数 | 否 | 3次 | 3 | -| retrySleepMills | 重试失败后的等待时间 | 否 | 3秒 | 3 | +| maxTransactionRetryTimeSeconds | 事务运行最长时间 | 否 | 30秒 | 30 | +| maxConnectionTimeoutSeconds | 驱动最长链接时间 | 否 | 30秒 | 30 | +| retryTimes | 发生错误的重试次数 | 否 | 3次 | 3 | +| retrySleepMills | 重试失败后的等待时间 | 否 | 3秒 | 3 | +| writeMode | 写入模式 | 否 | INSERT | INSERT or UPDATE | ### 支持的数据类型 > 配置时均忽略大小写 @@ -124,7 +125,7 @@ Object_ARRAY "database": "neo4j", "cypher": "unwind $batch as row match(p1:Person) where p1.id = row.startNodeId match(p2:Person) where p2.id = row.endNodeId create (p1)-[:LINK]->(p2)", "batchDataVariableName": "batch", - "batch_size": "33", + "batchSize": "33", "properties": [ { "name": "startNodeId", @@ -153,7 +154,7 @@ Object_ARRAY "database": "yourDataBase", "cypher": "unwind $batch as row CALL apoc.cypher.doIt( 'create (n:`' + row.Label + '`{id:$id})' ,{id: row.id} ) YIELD value RETURN 1 ", "batchDataVariableName": "batch", - "batch_size": "1", + "batchSize": "1", "properties": [ { "name": "Label", diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java index 4451bbdf..d9068745 100644 --- a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java @@ -10,6 +10,7 @@ import com.alibaba.datax.common.util.RetryUtil; import com.alibaba.datax.plugin.writer.neo4jwriter.adapter.DateAdapter; import com.alibaba.datax.plugin.writer.neo4jwriter.adapter.ValueAdapter; import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jProperty; +import com.alibaba.datax.plugin.writer.neo4jwriter.config.WriteMode; import com.alibaba.datax.plugin.writer.neo4jwriter.exception.Neo4jErrorCode; import com.alibaba.fastjson2.JSON; import org.apache.commons.lang3.StringUtils; @@ -67,16 +68,28 @@ public class Neo4jClient { String batchVariableName = config.getString(BATCH_DATA_VARIABLE_NAME.getKey(), BATCH_DATA_VARIABLE_NAME.getDefaultValue()); List neo4jProperties = JSON.parseArray(config.getString(NEO4J_PROPERTIES.getKey()), Neo4jProperty.class); + int batchSize = config.getInt(BATCH_SIZE.getKey(), BATCH_SIZE.getDefaultValue()); int retryTimes = config.getInt(RETRY_TIMES.getKey(), RETRY_TIMES.getDefaultValue()); + long retrySleepMills = RETRY_SLEEP_MILLS.getDefaultValue(); + String writeMode = config.getString(WRITE_MODE.getKey(), WRITE_MODE.getDefaultValue()); + return new Neo4jClient(driver, new WriteConfig(cypher, database, batchVariableName, neo4jProperties, batchSize), - new RetryConfig(retryTimes, config.getLong(RETRY_SLEEP_MILLS.getKey(), RETRY_SLEEP_MILLS.getDefaultValue())), + new RetryConfig(retryTimes, + config.getLong(RETRY_SLEEP_MILLS.getKey(), retrySleepMills), + checkWriteMode(writeMode)), taskPluginCollector ); } + private static WriteMode checkWriteMode(String writeMode) { + Optional mode = WriteMode.from(writeMode); + return mode.orElseThrow(() -> + DataXException.asDataXException(Neo4jErrorCode.CONFIG_INVALID, "writeMode should be INSERT or UPDATE")); + } + private static String checkCypher(Configuration config) { String cypher = config.getString(CYPHER.getKey()); if (StringUtils.isBlank(cypher)) { @@ -177,6 +190,10 @@ public class Neo4jClient { } + public boolean supportFailOver() { + return WriteMode.UPDATE.equals(this.retryConfig.writeMode); + } + private String toUnwindStr(List values) { StringJoiner joiner = new StringJoiner(","); for (MapValue value : values) { @@ -222,9 +239,14 @@ public class Neo4jClient { int retryTimes; long retrySleepMills; - RetryConfig(int retryTimes, long retrySleepMills) { + //INSERT + //UPDATE + WriteMode writeMode; + + RetryConfig(int retryTimes, long retrySleepMills, WriteMode writeMode) { this.retryTimes = retryTimes; this.retrySleepMills = retrySleepMills; + this.writeMode = writeMode; } } diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java index 6a589c1d..e182d7a5 100644 --- a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java @@ -60,5 +60,10 @@ public class Neo4jWriter extends Writer { this.neo4jClient.tryWrite(record); } } + + @Override + public boolean supportFailOver() { + return this.neo4jClient.supportFailOver(); + } } } diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/ConfigConstants.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/ConfigConstants.java index eed3588e..ab66bd07 100644 --- a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/ConfigConstants.java +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/ConfigConstants.java @@ -13,7 +13,6 @@ public final class ConfigConstants { public static final Long DEFAULT_MAX_CONNECTION_SECONDS = 30L; - public static final Option RETRY_TIMES = Option.builder() .key("retryTimes") @@ -113,4 +112,11 @@ public final class ConfigConstants { .defaultValue(1000) .desc("max batch size") .build(); + + public static final Option WRITE_MODE = + Option.builder() + .key("writeMode") + .defaultValue(WriteMode.INSERT.name()) + .desc("The flag is either insert or update, and if it is update, a retry is performed") + .build(); } diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/WriteMode.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/WriteMode.java new file mode 100644 index 00000000..75d3fbbf --- /dev/null +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/WriteMode.java @@ -0,0 +1,20 @@ +package com.alibaba.datax.plugin.writer.neo4jwriter.config; + + +import java.util.Arrays; +import java.util.Optional; + +/** + * @author fuyouj + */ + +public enum WriteMode { + INSERT, + UPDATE; + + public static Optional from(String name){ + return Arrays.stream(WriteMode.values()) + .filter(e -> e.name().equalsIgnoreCase(name)) + .findFirst(); + } +}