Add the writeMode parameter and update the example documentation.

FuYouJ 2023-09-16 13:29:49 +08:00
parent 5d437e4de4
commit 11380b3083
5 changed files with 67 additions and 13 deletions

neo4jwriter plugin documentation

@@ -16,8 +16,8 @@
 ### Configuration Options
 | Option | Description | Required | Default | Example |
 |:-------|-------------|----------|---------|---------|
 | database | database name | Yes | - | neo4j |
 | uri | database connection URI | Yes | - | bolt://localhost:7687 |
 | username | access username | Yes | - | neo4j |
@@ -25,13 +25,14 @@
 | bearerToken | authentication related | No | - | - |
 | kerberosTicket | authentication related | No | - | - |
 | cypher | Cypher statement used for the write | Yes | - | unwind $batch as row create(p) set p.name = row.name |
-| batchDataVariableName | variable name that carries the unwind data | | | batch |
+| batchDataVariableName | variable name that carries the unwind data | | batch | batch |
 | properties | property names and types of the data written to neo4j | Yes | - | see the examples below |
 | batchSize | number of records written per batch | No | 1000 | |
 | maxTransactionRetryTimeSeconds | maximum transaction running time | No | 30s | 30 |
 | maxConnectionTimeoutSeconds | maximum driver connection timeout | No | 30s | 30 |
 | retryTimes | number of retries when an error occurs | No | 3 | 3 |
 | retrySleepMills | wait time after a failed attempt | No | 3s | 3 |
+| writeMode | write mode | No | INSERT | INSERT or UPDATE |

 ### Supported Data Types
 > Case is ignored in the configuration
@@ -124,7 +125,7 @@ Object_ARRAY
 "database": "neo4j",
 "cypher": "unwind $batch as row match(p1:Person) where p1.id = row.startNodeId match(p2:Person) where p2.id = row.endNodeId create (p1)-[:LINK]->(p2)",
 "batchDataVariableName": "batch",
-"batch_size": "33",
+"batchSize": "33",
 "properties": [
   {
     "name": "startNodeId",
@@ -153,7 +154,7 @@ Object_ARRAY
 "database": "yourDataBase",
 "cypher": "unwind $batch as row CALL apoc.cypher.doIt( 'create (n:`' + row.Label + '`{id:$id})' ,{id: row.id} ) YIELD value RETURN 1 ",
 "batchDataVariableName": "batch",
-"batch_size": "1",
+"batchSize": "1",
 "properties": [
   {
     "name": "Label",

Neo4jClient.java

@@ -10,6 +10,7 @@ import com.alibaba.datax.common.util.RetryUtil;
 import com.alibaba.datax.plugin.writer.neo4jwriter.adapter.DateAdapter;
 import com.alibaba.datax.plugin.writer.neo4jwriter.adapter.ValueAdapter;
 import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jProperty;
+import com.alibaba.datax.plugin.writer.neo4jwriter.config.WriteMode;
 import com.alibaba.datax.plugin.writer.neo4jwriter.exception.Neo4jErrorCode;
 import com.alibaba.fastjson2.JSON;
 import org.apache.commons.lang3.StringUtils;
@@ -67,16 +68,28 @@ public class Neo4jClient {
         String batchVariableName = config.getString(BATCH_DATA_VARIABLE_NAME.getKey(),
                 BATCH_DATA_VARIABLE_NAME.getDefaultValue());
         List<Neo4jProperty> neo4jProperties = JSON.parseArray(config.getString(NEO4J_PROPERTIES.getKey()), Neo4jProperty.class);
         int batchSize = config.getInt(BATCH_SIZE.getKey(), BATCH_SIZE.getDefaultValue());
         int retryTimes = config.getInt(RETRY_TIMES.getKey(), RETRY_TIMES.getDefaultValue());
+        long retrySleepMills = RETRY_SLEEP_MILLS.getDefaultValue();
+        String writeMode = config.getString(WRITE_MODE.getKey(), WRITE_MODE.getDefaultValue());

         return new Neo4jClient(driver,
                 new WriteConfig(cypher, database, batchVariableName, neo4jProperties, batchSize),
-                new RetryConfig(retryTimes, config.getLong(RETRY_SLEEP_MILLS.getKey(), RETRY_SLEEP_MILLS.getDefaultValue())),
+                new RetryConfig(retryTimes,
+                        config.getLong(RETRY_SLEEP_MILLS.getKey(), retrySleepMills),
+                        checkWriteMode(writeMode)),
                 taskPluginCollector
         );
     }

+    private static WriteMode checkWriteMode(String writeMode) {
+        Optional<WriteMode> mode = WriteMode.from(writeMode);
+        return mode.orElseThrow(() ->
+                DataXException.asDataXException(Neo4jErrorCode.CONFIG_INVALID, "writeMode should be INSERT or UPDATE"));
+    }
+
     private static String checkCypher(Configuration config) {
         String cypher = config.getString(CYPHER.getKey());
         if (StringUtils.isBlank(cypher)) {
@@ -177,6 +190,10 @@ public class Neo4jClient {
     }

+    public boolean supportFailOver() {
+        return WriteMode.UPDATE.equals(this.retryConfig.writeMode);
+    }
+
     private String toUnwindStr(List<MapValue> values) {
         StringJoiner joiner = new StringJoiner(",");
         for (MapValue value : values) {
@@ -222,9 +239,14 @@ public class Neo4jClient {
         int retryTimes;
         long retrySleepMills;
-        RetryConfig(int retryTimes, long retrySleepMills) {
+        // either INSERT or UPDATE
+        WriteMode writeMode;
+
+        RetryConfig(int retryTimes, long retrySleepMills, WriteMode writeMode) {
             this.retryTimes = retryTimes;
             this.retrySleepMills = retrySleepMills;
+            this.writeMode = writeMode;
         }
     }
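
The net effect of checkWriteMode is small but worth spelling out: the configured string is resolved case-insensitively against the WriteMode enum (added in WriteMode.java below), and anything else fails the job configuration. The following self-contained sketch reproduces that behaviour outside the DataX classes, with IllegalArgumentException standing in for DataXException and the enum inlined so it compiles on its own; it is illustrative, not part of the commit.

```java
import java.util.Arrays;
import java.util.Optional;

public class WriteModeValidationSketch {
    // Inlined copy of the enum added in WriteMode.java, so the sketch compiles on its own.
    enum WriteMode {
        INSERT, UPDATE;

        static Optional<WriteMode> from(String name) {
            return Arrays.stream(values())
                    .filter(e -> e.name().equalsIgnoreCase(name))
                    .findFirst();
        }
    }

    // Stand-in for Neo4jClient.checkWriteMode: resolve the configured string or fail fast.
    static WriteMode checkWriteMode(String writeMode) {
        return WriteMode.from(writeMode)
                .orElseThrow(() -> new IllegalArgumentException("writeMode should be INSERT or UPDATE"));
    }

    public static void main(String[] args) {
        System.out.println(checkWriteMode("update")); // UPDATE (case-insensitive)
        System.out.println(checkWriteMode("INSERT")); // INSERT
        try {
            checkWriteMode("upsert");
        } catch (IllegalArgumentException expected) {
            System.out.println(expected.getMessage()); // writeMode should be INSERT or UPDATE
        }
    }
}
```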

Neo4jWriter.java

@@ -60,5 +60,10 @@ public class Neo4jWriter extends Writer {
                 this.neo4jClient.tryWrite(record);
             }
         }
+
+        @Override
+        public boolean supportFailOver() {
+            return this.neo4jClient.supportFailOver();
+        }
     }
 }
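
supportFailOver is the hook DataX uses to ask a writer task whether it may be re-dispatched after a failure; with this change the task answers yes only when writeMode is UPDATE. The stand-alone sketch below mirrors that decision chain; the remark about idempotency is my reading of the option description ("if it is update, a retry is performed"), not something stated in the diff.

```java
public class FailOverSketch {
    enum WriteMode { INSERT, UPDATE }

    static class Neo4jClientStub {
        private final WriteMode writeMode;

        Neo4jClientStub(WriteMode writeMode) {
            this.writeMode = writeMode;
        }

        // Mirrors Neo4jClient.supportFailOver(): only UPDATE opts in to failover.
        // Assumption: a failed-over task may resend rows it already wrote, so an
        // UPDATE job should use idempotent cypher (e.g. MERGE-style statements).
        boolean supportFailOver() {
            return writeMode == WriteMode.UPDATE;
        }
    }

    public static void main(String[] args) {
        System.out.println(new Neo4jClientStub(WriteMode.INSERT).supportFailOver()); // false
        System.out.println(new Neo4jClientStub(WriteMode.UPDATE).supportFailOver()); // true
    }
}
```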

ConfigConstants.java

@@ -13,7 +13,6 @@ public final class ConfigConstants {
     public static final Long DEFAULT_MAX_CONNECTION_SECONDS = 30L;
     public static final Option<Integer> RETRY_TIMES =
             Option.<Integer>builder()
                     .key("retryTimes")
@@ -113,4 +112,11 @@ public final class ConfigConstants {
                     .defaultValue(1000)
                     .desc("max batch size")
                     .build();
+
+    public static final Option<String> WRITE_MODE =
+            Option.<String>builder()
+                    .key("writeMode")
+                    .defaultValue(WriteMode.INSERT.name())
+                    .desc("The flag is either insert or update, and if it is update, a retry is performed")
+                    .build();
 }
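
Because WRITE_MODE defaults to INSERT, jobs that never set writeMode keep the previous behaviour and report no failover support. A tiny sketch of the default lookup; a plain Map stands in for DataX's Configuration, whose getString(key, default) is what Neo4jClient actually calls.

```java
import java.util.Map;

public class WriteModeDefaultSketch {
    public static void main(String[] args) {
        Map<String, String> writerParameters = Map.of(); // writeMode not configured
        String writeMode = writerParameters.getOrDefault("writeMode", "INSERT");
        System.out.println(writeMode); // INSERT -> supportFailOver() stays false
    }
}
```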

WriteMode.java (new file)

@@ -0,0 +1,20 @@
+package com.alibaba.datax.plugin.writer.neo4jwriter.config;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+/**
+ * @author fuyouj
+ */
+public enum WriteMode {
+    INSERT,
+    UPDATE;
+
+    public static Optional<WriteMode> from(String name) {
+        return Arrays.stream(WriteMode.values())
+                .filter(e -> e.name().equalsIgnoreCase(name))
+                .findFirst();
+    }
+}