FuYouJ 2025-04-10 16:20:20 +08:00 committed by GitHub
commit 4cd588f690
5 changed files with 197 additions and 13 deletions


@ -16,8 +16,8 @@
### Configuration options
| Option | Description | Required | Default | Example |
|:-------------------------------|-----------------------------------------------|----------|--------|------------------------------------------------------|
| database | database name | yes | - | neo4j |
| uri | database connection URI | yes | - | bolt://localhost:7687 |
| username | username for access | yes | - | neo4j |
@ -25,13 +25,14 @@
| bearerToken | authentication-related | no | - | - |
| kerberosTicket | authentication-related | no | - | - |
| cypher | write statement | yes | - | unwind $batch as row create(p) set p.name = row.name |
| batchDataVariableName | name of the variable carried by unwind | no | batch | batch |
| properties | property names and types of the data in neo4j | yes | - | see the examples below |
| batchSize | number of records written per batch | no | 1000 | |
| maxTransactionRetryTimeSeconds | max transaction run time | no | 30s | 30 |
| maxConnectionTimeoutSeconds | max driver connection timeout | no | 30s | 30 |
| retryTimes | number of retries on error | no | 3 | 3 |
| retrySleepMills | wait time after a failed attempt | no | 3s | 3 |
| writeMode | write mode | no | INSERT | INSERT or UPDATE |
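For orientation, a minimal writer block combining these options might look like the following sketch (the uri, credentials, and cypher statement are placeholders, not defaults):

```json
"writer": {
  "name": "neo4jWriter",
  "parameter": {
    "uri": "bolt://localhost:7687",
    "username": "yourUserName",
    "password": "yourPassword",
    "database": "neo4j",
    "cypher": "unwind $batch as row create(p:Person) set p.name = row.name",
    "batchDataVariableName": "batch",
    "batchSize": "1000",
    "writeMode": "INSERT",
    "properties": [
      { "name": "name", "type": "STRING" }
    ]
  }
}
```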
### Supported data types
> Case-insensitive in configuration
@ -124,7 +125,7 @@ Object_ARRAY
"database": "neo4j", "database": "neo4j",
"cypher": "unwind $batch as row match(p1:Person) where p1.id = row.startNodeId match(p2:Person) where p2.id = row.endNodeId create (p1)-[:LINK]->(p2)", "cypher": "unwind $batch as row match(p1:Person) where p1.id = row.startNodeId match(p2:Person) where p2.id = row.endNodeId create (p1)-[:LINK]->(p2)",
"batchDataVariableName": "batch", "batchDataVariableName": "batch",
"batch_size": "33", "batchSize": "33",
"properties": [ "properties": [
{ {
"name": "startNodeId", "name": "startNodeId",
@ -153,7 +154,7 @@ Object_ARRAY
"database": "yourDataBase", "database": "yourDataBase",
"cypher": "unwind $batch as row CALL apoc.cypher.doIt( 'create (n:`' + row.Label + '`{id:$id})' ,{id: row.id} ) YIELD value RETURN 1 ", "cypher": "unwind $batch as row CALL apoc.cypher.doIt( 'create (n:`' + row.Label + '`{id:$id})' ,{id: row.id} ) YIELD value RETURN 1 ",
"batchDataVariableName": "batch", "batchDataVariableName": "batch",
"batch_size": "1", "batchSize": "1",
"properties": [ "properties": [
{ {
"name": "Label", "name": "Label",
@ -167,6 +168,136 @@ Object_ARRAY
}
}
```
> When every record carries a different Label, writing to neo4j is somewhat awkward: Cypher does not allow a Label to be referenced through a variable, so the node or relationship Label has to be spliced into the statement as a string.

Suppose we sync the nodes first, then the relationships.
**Node source table**
| Type (TYPE) | Name property (NAME) | uid property (UID) |
| ----------- | -------------------- | ------------------ |
| Boy | 小付 | 1 |
| Girl | 小杰 | 2 |
Suppose these two records are node data whose Labels are Boy and Girl respectively.
The writer is then configured as follows.
```json
"writer": {
"name": "neo4jWriter",
"parameter": {
"uri": "bolt://localhost:7687",
"username": "yourUserName",
"password": "yourPassword",
"database": "yourDataBase",
"cypher": "unwind $batch as row CALL apoc.cypher.doIt( 'create (n:`' + row.type + '`{uid:$uid}) set n.name = name' ,{uid: row.uid,name:row.name,type:row.type} ) YIELD value RETURN 1",
"batchDataVariableName": "batch",
"batchSize": "1",
"properties": [
{
"name": "type",
"type": "STRING"
},
{
"name": "name",
"type": "STRING"
},
{
"name":"uid",
"type":"STRING"
}
]
}
}
```
> Note the string-concatenation rule: the statement before the splice, then `'+ value to splice +'`, then the statement after.

We pass every property of each row to the apoc procedure as parameters, and use string concatenation only where the Label is needed. In neo4j, the statement above is ultimately resolved as:
```cypher
unwind [{type:'Boy',uid:'1',name:'小付'},{type:'Girl',uid:'2',name:'小杰'}] as row
CALL apoc.cypher.doIt( 'create (n:`' + row.type + '`{uid:$uid}) set n.name = $name' ,{uid: row.uid,name:row.name,type:row.type} ) YIELD value RETURN 1
```
Once the nodes are synced successfully, we sync the relationships.
**Relationship source table**
| Start node id | End node id | Relationship id | Start node type | End node type | Relationship type | Relationship property name |
| ------------- | ----------- | --------------- | --------------- | ------------- | ----------------- | -------------------------- |
| 1 | 2 | 3 | Boy | Girl | Link | link |
We build the relationship between the start node and the end node.
```json
"writer": {
"name": "neo4jWriter",
"parameter": {
"uri": "bolt://localhost:7687",
"username": "yourUserName",
"password": "yourPassword",
"database": "yourDataBase",
"cypher": "unwind $batch as row CALL apoc.cypher.doIt(
'match(start:`'+row.startType+'`) where start.uid = $startId
match(end:`'+row.endType+'`{uid:$endId}) create (start)-[r:`'+row.rType+'`]->
(end) set r.rid = $rid,r.name=name' ,
{rType:row.rType,startType:row.startType,endType:row.endType,startId:row.startId
,endId:row.endId,name:row.name,rid:row.rid} ) YIELD value RETURN 1",
"batchDataVariableName": "batch",
"batchSize": "1000",
"properties": [
{
"name": "rType",
"type": "STRING"
},
{
"name": "startType",
"type": "STRING"
},
{
"name":"endType",
"type":"STRING"
},
{
"name":"startId",
"type":"STRING"
},
{
"name":"endId",
"type":"STRING"
},
{
"name":"name",
"type":"STRING"
}
]
}
}
```
> Note the string-concatenation rule: the statement before the splice, then `'+ value to splice +'`, then the statement after.

In this configuration, each row is parsed, the start and end nodes are located by their type and id, and the two are linked. The actual cypher resolves to:
```cypher
unwind
[{rType:'Link',startType:'Boy',endType:'Girl',startId:'1',endId:'2',
name:'link',rid:'3'}] as row
CALL apoc.cypher.doIt( 'match(start:`'+row.startType+'`) where
start.uid = $startId match(end:`'+row.endType+'`{uid:$endId}) create (start)-
[r:`'+row.rType+'`]->(end) set r.rid = $rid,r.name=$name' ,
{rType:row.rType,startType:row.startType,endType:row.endType,startId:row.startId
,endId:row.endId,name:row.name,rid:row.rid} ) YIELD value RETURN 1
```
* The syntax for writing dynamic Labels is admittedly complex; copying the examples above into a test environment makes it easier to see why string concatenation is necessary.
* If this style proves too cumbersome, other approaches may be introduced later.
## Notes


@ -10,6 +10,7 @@ import com.alibaba.datax.common.util.RetryUtil;
import com.alibaba.datax.plugin.writer.neo4jwriter.adapter.DateAdapter;
import com.alibaba.datax.plugin.writer.neo4jwriter.adapter.ValueAdapter;
import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jProperty;
import com.alibaba.datax.plugin.writer.neo4jwriter.config.WriteMode;
import com.alibaba.datax.plugin.writer.neo4jwriter.exception.Neo4jErrorCode;
import com.alibaba.fastjson2.JSON;
import org.apache.commons.lang3.StringUtils;
@ -67,16 +68,28 @@ public class Neo4jClient {
String batchVariableName = config.getString(BATCH_DATA_VARIABLE_NAME.getKey(),
BATCH_DATA_VARIABLE_NAME.getDefaultValue());
List<Neo4jProperty> neo4jProperties = JSON.parseArray(config.getString(NEO4J_PROPERTIES.getKey()), Neo4jProperty.class);
int batchSize = config.getInt(BATCH_SIZE.getKey(), BATCH_SIZE.getDefaultValue());
int retryTimes = config.getInt(RETRY_TIMES.getKey(), RETRY_TIMES.getDefaultValue());
long retrySleepMills = RETRY_SLEEP_MILLS.getDefaultValue();
String writeMode = config.getString(WRITE_MODE.getKey(), WRITE_MODE.getDefaultValue());
return new Neo4jClient(driver,
new WriteConfig(cypher, database, batchVariableName, neo4jProperties, batchSize),
new RetryConfig(retryTimes,
config.getLong(RETRY_SLEEP_MILLS.getKey(), retrySleepMills),
checkWriteMode(writeMode)),
taskPluginCollector
);
}
private static WriteMode checkWriteMode(String writeMode) {
Optional<WriteMode> mode = WriteMode.from(writeMode);
return mode.orElseThrow(() ->
DataXException.asDataXException(Neo4jErrorCode.CONFIG_INVALID, "writeMode should be INSERT or UPDATE"));
}
private static String checkCypher(Configuration config) {
String cypher = config.getString(CYPHER.getKey());
if (StringUtils.isBlank(cypher)) {
@ -177,6 +190,10 @@ public class Neo4jClient {
}
public boolean supportFailOver() {
return WriteMode.UPDATE.equals(this.retryConfig.writeMode);
}
private String toUnwindStr(List<MapValue> values) {
StringJoiner joiner = new StringJoiner(",");
for (MapValue value : values) {
@ -222,9 +239,14 @@ public class Neo4jClient {
int retryTimes;
long retrySleepMills;
// Write mode: INSERT or UPDATE; UPDATE enables failover retry.
WriteMode writeMode;
RetryConfig(int retryTimes, long retrySleepMills, WriteMode writeMode) {
this.retryTimes = retryTimes;
this.retrySleepMills = retrySleepMills;
this.writeMode = writeMode;
}
}
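The interplay between the new writeMode field and failover can be sketched as a standalone snippet (a simplified sketch: the RetryConfig here mirrors the plugin's inner class, with unrelated details omitted):

```java
// Standalone sketch of how writeMode gates failover support.
public class FailOverDemo {
    enum WriteMode { INSERT, UPDATE }

    static class RetryConfig {
        final int retryTimes;
        final long retrySleepMills;
        final WriteMode writeMode; // INSERT or UPDATE

        RetryConfig(int retryTimes, long retrySleepMills, WriteMode writeMode) {
            this.retryTimes = retryTimes;
            this.retrySleepMills = retrySleepMills;
            this.writeMode = writeMode;
        }
    }

    // Failover is only safe for UPDATE: a retried INSERT could duplicate data,
    // while an UPDATE-style cypher statement is idempotent.
    static boolean supportFailOver(RetryConfig config) {
        return WriteMode.UPDATE.equals(config.writeMode);
    }

    public static void main(String[] args) {
        System.out.println(supportFailOver(new RetryConfig(3, 3000L, WriteMode.UPDATE))); // true
        System.out.println(supportFailOver(new RetryConfig(3, 3000L, WriteMode.INSERT))); // false
    }
}
```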


@ -60,5 +60,10 @@ public class Neo4jWriter extends Writer {
this.neo4jClient.tryWrite(record);
}
}
@Override
public boolean supportFailOver() {
return this.neo4jClient.supportFailOver();
}
}
}


@ -13,7 +13,6 @@ public final class ConfigConstants {
public static final Long DEFAULT_MAX_CONNECTION_SECONDS = 30L;
public static final Option<Integer> RETRY_TIMES =
Option.<Integer>builder()
.key("retryTimes")
@ -113,4 +112,11 @@ public final class ConfigConstants {
.defaultValue(1000)
.desc("max batch size")
.build();
public static final Option<String> WRITE_MODE =
Option.<String>builder()
.key("writeMode")
.defaultValue(WriteMode.INSERT.name())
.desc("The flag is either insert or update, and if it is update, a retry is performed")
.build();
}


@ -0,0 +1,20 @@
package com.alibaba.datax.plugin.writer.neo4jwriter.config;
import java.util.Arrays;
import java.util.Optional;
/**
* @author fuyouj
*/
public enum WriteMode {
INSERT,
UPDATE;
public static Optional<WriteMode> from(String name){
return Arrays.stream(WriteMode.values())
.filter(e -> e.name().equalsIgnoreCase(name))
.findFirst();
}
}
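The case-insensitive lookup above can be exercised in isolation (the surrounding demo class is illustrative, not part of the plugin):

```java
import java.util.Arrays;
import java.util.Optional;

public class WriteModeDemo {
    enum WriteMode {
        INSERT,
        UPDATE;

        // Case-insensitive lookup, mirroring WriteMode.from in the plugin.
        static Optional<WriteMode> from(String name) {
            return Arrays.stream(WriteMode.values())
                    .filter(e -> e.name().equalsIgnoreCase(name))
                    .findFirst();
        }
    }

    public static void main(String[] args) {
        System.out.println(WriteMode.from("update"));  // Optional[UPDATE]
        System.out.println(WriteMode.from("Insert"));  // Optional[INSERT]
        System.out.println(WriteMode.from("merge"));   // Optional.empty
    }
}
```

Any spelling of "insert" or "update" is accepted; anything else yields an empty Optional, which checkWriteMode turns into a CONFIG_INVALID error.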