mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 19:50:29 +08:00
neo4jwriter
This commit is contained in:
parent
f6d21f112d
commit
643c26ec0b
BIN
neo4jwriter/doc/benchmark.png
Normal file
BIN
neo4jwriter/doc/benchmark.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 466 KiB |
194
neo4jwriter/doc/neo4jwriter.md
Normal file
194
neo4jwriter/doc/neo4jwriter.md
Normal file
@ -0,0 +1,194 @@
|
|||||||
|
# DataX neo4jWriter 插件文档
|
||||||
|
|
||||||
|
## 功能简介
|
||||||
|
|
||||||
|
本目前市面上的neo4j 批量导入主要有Cypher Create,Load CSV,第三方或者官方提供的Batch Import。Load CSV支持节点10W级别一下,Batch Import 需要对数据库进行停机。要想实现不停机的数据写入,Cypher是最好的方式。
|
||||||
|
|
||||||
|
## 支持版本
|
||||||
|
|
||||||
|
支持Neo4j 4 和Neo4j 5,如果是Neo4j 3,需要自行将驱动降低至相对应的版本进行编译。
|
||||||
|
|
||||||
|
## 实现原理
|
||||||
|
|
||||||
|
将datax的数据转换成了neo4j驱动能识别的对象,利用 unwind 语法进行批量插入。
|
||||||
|
|
||||||
|
## 如何配置
|
||||||
|
|
||||||
|
### 配置项介绍
|
||||||
|
|
||||||
|
| 配置 | 说明 | 是否必须 | 默认值 | 示例 |
|
||||||
|
| :--------------------------------- | --------------------------- | -------- | ------ | ---------------------------------------------------- |
|
||||||
|
| database | 数据库名字 | 是 | - | neo4j |
|
||||||
|
| uri | 数据库访问链接 | 是 | - | bolt://localhost:7687 |
|
||||||
|
| username | 访问用户名 | 是 | - | neo4j |
|
||||||
|
| password | 访问密码 | 是 | - | neo4j |
|
||||||
|
| bearer_token | 权限相关 | 否 | - | - |
|
||||||
|
| kerberos_ticket | 权限相关 | 否 | - | - |
|
||||||
|
| cypher | 同步语句 | 是 | - | unwind $batch as row create(p) set p.name = row.name |
|
||||||
|
| batch_data_variable_name | unwind 携带的数据变量名 | | | batch |
|
||||||
|
| fields | 定义datax中数据的名字和类型 | 是 | - | 见后续案例 |
|
||||||
|
| batch_size | 一批写入数据量 | 否 | 1000 | |
|
||||||
|
| max_transaction_retry_time_seconds | 事务运行最长时间 | 否 | 30秒 | 30 |
|
||||||
|
| max_connection_timeout_seconds | 驱动最长链接时间 | 否 | 30秒 | 30 |
|
||||||
|
| retry_times | 发生错误的重试次数 | 否 | 3次 | 3 |
|
||||||
|
| retry_sleep_mills | 重试失败后的等待时间 | 否 | 3秒 | 3 |
|
||||||
|
|
||||||
|
### 支持的数据类型
|
||||||
|
|
||||||
|
```
|
||||||
|
BOOLEAN,
|
||||||
|
STRING,
|
||||||
|
LONG,
|
||||||
|
SHORT,
|
||||||
|
INTEGER,
|
||||||
|
DOUBLE,
|
||||||
|
FLOAT,
|
||||||
|
LOCAL_DATE,
|
||||||
|
LOCAL_TIME,
|
||||||
|
LOCAL_DATE_TIME,
|
||||||
|
LIST,
|
||||||
|
MAP,
|
||||||
|
CHAR_ARRAY,
|
||||||
|
BYTE_ARRAY,
|
||||||
|
BOOLEAN_ARRAY,
|
||||||
|
STRING_ARRAY,
|
||||||
|
LONG_ARRAY,
|
||||||
|
INT_ARRAY,
|
||||||
|
SHORT_ARRAY,
|
||||||
|
DOUBLE_ARRAY,
|
||||||
|
FLOAT_ARRAY,
|
||||||
|
Object_ARRAY
|
||||||
|
```
|
||||||
|
|
||||||
|
### 写节点
|
||||||
|
|
||||||
|
这里提供了一个写节点包含很多类型属性的例子。你可以在我的测试方法中运行。
|
||||||
|
|
||||||
|
```json
|
||||||
|
"writer": {
|
||||||
|
"name": "neo4jWriter",
|
||||||
|
"parameter": {
|
||||||
|
"uri": "neo4j://localhost:7687",
|
||||||
|
"username": "neo4j",
|
||||||
|
"password": "Test@12343",
|
||||||
|
"database": "neo4j",
|
||||||
|
"cypher": "unwind $batch as row create(p:Person) set p.pbool = row.pbool,p.pstring = row.pstring,p.plong = row.plong,p.pshort = row.pshort,p.pdouble=row.pdouble,p.pstringarr=row.pstringarr,p.plocaldate=row.plocaldate",
|
||||||
|
"batch_data_variable_name": "batch",
|
||||||
|
"batch_size": "33",
|
||||||
|
"fields": [
|
||||||
|
{
|
||||||
|
"fieldName": "pbool",
|
||||||
|
"fieldType": "BOOLEAN"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldName": "pstring",
|
||||||
|
"fieldType": "STRING"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldName": "plong",
|
||||||
|
"fieldType": "LONG"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldName": "pshort",
|
||||||
|
"fieldType": "SHORT"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldName": "pdouble",
|
||||||
|
"fieldType": "DOUBLE"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldName": "pstringarr",
|
||||||
|
"fieldType": "STRING_ARRAY",
|
||||||
|
"split": ",",
|
||||||
|
"arrayTrimChars": [
|
||||||
|
"[",
|
||||||
|
"]"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldName": "plocaldate",
|
||||||
|
"fieldType": "LOCAL_DATE",
|
||||||
|
"dateFormat": "yyyy-MM-dd"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 写关系
|
||||||
|
|
||||||
|
```json
|
||||||
|
"writer": {
|
||||||
|
"name": "neo4jWriter",
|
||||||
|
"parameter": {
|
||||||
|
"uri": "neo4j://localhost:7687",
|
||||||
|
"username": "neo4j",
|
||||||
|
"password": "Test@12343",
|
||||||
|
"database": "neo4j",
|
||||||
|
"cypher": "unwind $batch as row match(p1:Person) where p1.id = row.startNodeId match(p2:Person) where p2.id = row.endNodeId create (p1)-[:LINK]->(p2)",
|
||||||
|
"batch_data_variable_name": "batch",
|
||||||
|
"batch_size": "33",
|
||||||
|
"fields": [
|
||||||
|
{
|
||||||
|
"fieldName": "startNodeId",
|
||||||
|
"fieldType": "STRING"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldName": "endNodeId",
|
||||||
|
"fieldType": "STRING"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 节点/关系类型动态写
|
||||||
|
|
||||||
|
> 需要使用AOPC函数拓展,如果你的数据库没有,请安装APOC函数拓展
|
||||||
|
|
||||||
|
```json
|
||||||
|
"writer": {
|
||||||
|
"name": "neo4jWriter",
|
||||||
|
"parameter": {
|
||||||
|
"uri": "bolt://localhost:7687",
|
||||||
|
"username": "yourUserName",
|
||||||
|
"password": "yourPassword",
|
||||||
|
"database": "yourDataBase",
|
||||||
|
"cypher": "unwind $batch as row CALL apoc.cypher.doIt( 'create (n:`' + row.Label + '`{id:$id})' ,{id: row.id} ) YIELD value RETURN 1 ",
|
||||||
|
"batch_data_variable_name": "batch",
|
||||||
|
"batch_size": "1",
|
||||||
|
"fields": [
|
||||||
|
{
|
||||||
|
"fieldName": "Label",
|
||||||
|
"fieldType": "STRING"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldName": "id",
|
||||||
|
"fieldType": "STRING"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## 注意事项
|
||||||
|
|
||||||
|
* fields的定义需要与源端一一对应。如果源端的数据列少于neo4j字段怎么办?建议将源端数据加工成json格式,在neo4j端将数据类型设置成map。在cypher中,可以根据jsonpath语法一直取值。比如 unwind $batch as row create (p) set p.name = row.props.name,set p.age = row.props.age
|
||||||
|
* 如果提示事务超时,建议调大事务运行时间或者调小batch_size
|
||||||
|
* 如果用于更新场景,会遇到死锁问题,建议二开源码加入死锁异常检测,并进行重试,开源版本不提供此功能。
|
||||||
|
|
||||||
|
## 性能报告
|
||||||
|
|
||||||
|
**JVM参数**
|
||||||
|
|
||||||
|
16G G1垃圾收集器 8核心
|
||||||
|
|
||||||
|
**Neo4j数据库配置**
|
||||||
|
|
||||||
|
32核心,256G
|
||||||
|
|
||||||
|
**datax 配置**
|
||||||
|
|
||||||
|
Channel 20 batchsize = 1000
|
||||||
|
|
||||||
|

|
56
neo4jwriter/pom.xml
Normal file
56
neo4jwriter/pom.xml
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<parent>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>datax-all</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<artifactId>neo4jwriter</artifactId>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<maven.compiler.source>8</maven.compiler.source>
|
||||||
|
<maven.compiler.target>8</maven.compiler.target>
|
||||||
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
|
<neo4j-java-driver.version>4.4.9</neo4j-java-driver.version>
|
||||||
|
<junit4.version>4.13.2</junit4.version>
|
||||||
|
<test.container.version>1.17.6</test.container.version>
|
||||||
|
</properties>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.slf4j</groupId>
|
||||||
|
<artifactId>slf4j-api</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>ch.qos.logback</groupId>
|
||||||
|
<artifactId>logback-classic</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.neo4j.driver</groupId>
|
||||||
|
<artifactId>neo4j-java-driver</artifactId>
|
||||||
|
<version>${neo4j-java-driver.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>datax-common</artifactId>
|
||||||
|
<version>${datax-project-version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.testcontainers</groupId>
|
||||||
|
<artifactId>testcontainers</artifactId>
|
||||||
|
<version>${test.container.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<!-- Testcontainers 1.x is tightly coupled with the JUnit 4.x rule API-->
|
||||||
|
<dependency>
|
||||||
|
<groupId>junit</groupId>
|
||||||
|
<artifactId>junit</artifactId>
|
||||||
|
<version>${junit4.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
</project>
|
34
neo4jwriter/src/main/assembly/package.xml
Normal file
34
neo4jwriter/src/main/assembly/package.xml
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
<assembly
|
||||||
|
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
|
||||||
|
<id></id>
|
||||||
|
<formats>
|
||||||
|
<format>dir</format>
|
||||||
|
</formats>
|
||||||
|
<includeBaseDirectory>false</includeBaseDirectory>
|
||||||
|
<fileSets>
|
||||||
|
<fileSet>
|
||||||
|
<directory>src/main/resources</directory>
|
||||||
|
<includes>
|
||||||
|
<include>plugin.json</include>
|
||||||
|
</includes>
|
||||||
|
<outputDirectory>plugin/writer/neo4jwriter</outputDirectory>
|
||||||
|
</fileSet>
|
||||||
|
<fileSet>
|
||||||
|
<directory>target/</directory>
|
||||||
|
<includes>
|
||||||
|
<include>neo4jwriter-0.0.1-SNAPSHOT.jar</include>
|
||||||
|
</includes>
|
||||||
|
<outputDirectory>plugin/writer/neo4jwriter</outputDirectory>
|
||||||
|
</fileSet>
|
||||||
|
</fileSets>
|
||||||
|
|
||||||
|
<dependencySets>
|
||||||
|
<dependencySet>
|
||||||
|
<useProjectArtifact>false</useProjectArtifact>
|
||||||
|
<outputDirectory>plugin/writer/neo4jwriter/libs</outputDirectory>
|
||||||
|
<scope>runtime</scope>
|
||||||
|
</dependencySet>
|
||||||
|
</dependencySets>
|
||||||
|
</assembly>
|
@ -0,0 +1,274 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.neo4jwriter;
|
||||||
|
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.element.Column;
|
||||||
|
import com.alibaba.datax.common.element.Record;
|
||||||
|
import com.alibaba.datax.common.exception.DataXException;
|
||||||
|
import com.alibaba.datax.common.plugin.TaskPluginCollector;
|
||||||
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
|
import com.alibaba.datax.common.util.RetryUtil;
|
||||||
|
import com.alibaba.datax.plugin.writer.neo4jwriter.adapter.ValueAdapter;
|
||||||
|
import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jField;
|
||||||
|
import com.alibaba.datax.plugin.writer.neo4jwriter.exception.Neo4jErrorCode;
|
||||||
|
import com.alibaba.fastjson2.JSON;
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.neo4j.driver.*;
|
||||||
|
import org.neo4j.driver.exceptions.Neo4jException;
|
||||||
|
import org.neo4j.driver.internal.value.MapValue;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static com.alibaba.datax.plugin.writer.neo4jwriter.config.ConfigConstants.*;
|
||||||
|
import static com.alibaba.datax.plugin.writer.neo4jwriter.exception.Neo4jErrorCode.DATABASE_ERROR;
|
||||||
|
|
||||||
|
public class Neo4jClient {
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jClient.class);
|
||||||
|
private Driver driver;
|
||||||
|
|
||||||
|
private WriteConfig writeConfig;
|
||||||
|
private RetryConfig retryConfig;
|
||||||
|
private TaskPluginCollector taskPluginCollector;
|
||||||
|
|
||||||
|
private Session session;
|
||||||
|
|
||||||
|
private List<MapValue> writerBuffer;
|
||||||
|
|
||||||
|
|
||||||
|
public Neo4jClient(Driver driver,
|
||||||
|
WriteConfig writeConfig,
|
||||||
|
RetryConfig retryConfig,
|
||||||
|
TaskPluginCollector taskPluginCollector) {
|
||||||
|
this.driver = driver;
|
||||||
|
this.writeConfig = writeConfig;
|
||||||
|
this.retryConfig = retryConfig;
|
||||||
|
this.taskPluginCollector = taskPluginCollector;
|
||||||
|
this.writerBuffer = new ArrayList<>(writeConfig.batchSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void init() {
|
||||||
|
String database = writeConfig.database;
|
||||||
|
//neo4j 3.x 没有数据库
|
||||||
|
//neo4j 3.x no database
|
||||||
|
if (null != database && !"".equals(database)) {
|
||||||
|
this.session = driver.session(SessionConfig.forDatabase(database));
|
||||||
|
} else {
|
||||||
|
this.session = driver.session();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Neo4jClient build(Configuration config, TaskPluginCollector taskPluginCollector) {
|
||||||
|
|
||||||
|
Driver driver = buildNeo4jDriver(config);
|
||||||
|
String cypher = checkCypher(config);
|
||||||
|
String database = config.getString(DATABASE.getKey());
|
||||||
|
String batchVariableName = config.getString(BATCH_DATA_VARIABLE_NAME.getKey(),
|
||||||
|
BATCH_DATA_VARIABLE_NAME.getDefaultValue());
|
||||||
|
List<Neo4jField> neo4jFields = JSON.parseArray(config.getString(NEO4J_FIELDS.getKey()), Neo4jField.class);
|
||||||
|
int batchSize = config.getInt(BATCH_SIZE.getKey(), BATCH_SIZE.getDefaultValue());
|
||||||
|
int retryTimes = config.getInt(RETRY_TIMES.getKey(), RETRY_TIMES.getDefaultValue());
|
||||||
|
|
||||||
|
return new Neo4jClient(driver,
|
||||||
|
new WriteConfig(cypher, database, batchVariableName, neo4jFields, batchSize),
|
||||||
|
new RetryConfig(retryTimes, config.getLong(RETRY_SLEEP_MILLS.getKey(), RETRY_SLEEP_MILLS.getDefaultValue())),
|
||||||
|
taskPluginCollector
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String checkCypher(Configuration config) {
|
||||||
|
String cypher = config.getString(CYPHER.getKey());
|
||||||
|
if (StringUtils.isBlank(cypher)) {
|
||||||
|
throw DataXException.asDataXException(Neo4jErrorCode.CONFIG_INVALID, "cypher must not null or empty");
|
||||||
|
}
|
||||||
|
return cypher;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Driver buildNeo4jDriver(Configuration config) {
|
||||||
|
|
||||||
|
Config.ConfigBuilder configBuilder = Config.builder().withMaxConnectionPoolSize(1);
|
||||||
|
String uri = checkUriConfig(config);
|
||||||
|
|
||||||
|
//connection timeout
|
||||||
|
//连接超时时间
|
||||||
|
Long maxConnTime = config.getLong(MAX_CONNECTION_TIMEOUT_SECONDS.getKey());
|
||||||
|
if (maxConnTime != null && maxConnTime > 0) {
|
||||||
|
configBuilder
|
||||||
|
.withConnectionAcquisitionTimeout(
|
||||||
|
maxConnTime * 2, TimeUnit.SECONDS)
|
||||||
|
.withConnectionTimeout(maxConnTime, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
//transaction timeout
|
||||||
|
//事务运行超时时间
|
||||||
|
Long txRetryTime = config.getLong(MAX_TRANSACTION_RETRY_TIME.getKey());
|
||||||
|
if (txRetryTime != null && txRetryTime > 0) {
|
||||||
|
configBuilder.withMaxTransactionRetryTime(
|
||||||
|
txRetryTime, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
String username = config.getString(USERNAME.getKey());
|
||||||
|
String password = config.getString(PASSWORD.getKey());
|
||||||
|
String bearerToken = config.getString(BEARER_TOKEN.getKey());
|
||||||
|
String kerberosTicket = config.getString(KERBEROS_TICKET.getKey());
|
||||||
|
|
||||||
|
if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
|
||||||
|
|
||||||
|
return GraphDatabase.driver(uri, AuthTokens.basic(username, password), configBuilder.build());
|
||||||
|
|
||||||
|
} else if (StringUtils.isNotBlank(bearerToken)) {
|
||||||
|
|
||||||
|
return GraphDatabase.driver(uri, AuthTokens.bearer(bearerToken), configBuilder.build());
|
||||||
|
|
||||||
|
} else if (StringUtils.isNotBlank(kerberosTicket)) {
|
||||||
|
|
||||||
|
return GraphDatabase.driver(uri, AuthTokens.kerberos(kerberosTicket), configBuilder.build());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
throw DataXException.asDataXException(Neo4jErrorCode.CONFIG_INVALID, "Invalid Auth config.");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String checkUriConfig(Configuration config) {
|
||||||
|
String uri = config.getString(URI.getKey());
|
||||||
|
if (null == uri || uri.length() == 0) {
|
||||||
|
throw DataXException.asDataXException(Neo4jErrorCode.CONFIG_INVALID, "Invalid uri configuration");
|
||||||
|
}
|
||||||
|
return uri;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void destroy() {
|
||||||
|
tryFlushBuffer();
|
||||||
|
if (driver != null) {
|
||||||
|
driver.close();
|
||||||
|
}
|
||||||
|
if (session != null) {
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void tryFlushBuffer() {
|
||||||
|
if (!writerBuffer.isEmpty()) {
|
||||||
|
doWrite(writerBuffer);
|
||||||
|
writerBuffer.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void tryBatchWrite() {
|
||||||
|
if (!writerBuffer.isEmpty() && writerBuffer.size() >= writeConfig.batchSize) {
|
||||||
|
doWrite(writerBuffer);
|
||||||
|
writerBuffer.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doWrite(List<MapValue> values) {
|
||||||
|
Value batchValues = Values.parameters(this.writeConfig.batchVariableName, values);
|
||||||
|
Query query = new Query(this.writeConfig.cypher, batchValues);
|
||||||
|
LOGGER.debug("query:{}", query.text());
|
||||||
|
LOGGER.debug("batch:{}", toUnwindStr(values));
|
||||||
|
try {
|
||||||
|
RetryUtil.executeWithRetry(() -> {
|
||||||
|
session.writeTransaction(tx -> tx.run(query));
|
||||||
|
return null;
|
||||||
|
}, this.retryConfig.retryTimes, retryConfig.retrySleepMills, true,
|
||||||
|
Collections.singletonList(Neo4jException.class));
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOGGER.error("在写入数据库时发生了异常,原因是:{}", e.getMessage());
|
||||||
|
LOGGER.error("an exception occurred while writing to the database,message:{}", e.getMessage());
|
||||||
|
throw DataXException.asDataXException(DATABASE_ERROR, e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private String toUnwindStr(List<MapValue> values) {
|
||||||
|
StringJoiner joiner = new StringJoiner(",");
|
||||||
|
for (MapValue value : values) {
|
||||||
|
joiner.add(value.toString());
|
||||||
|
}
|
||||||
|
return "[" + joiner + "]";
|
||||||
|
}
|
||||||
|
|
||||||
|
public void tryWrite(Record record) {
|
||||||
|
MapValue neo4jValue = checkAndConvert(record);
|
||||||
|
writerBuffer.add(neo4jValue);
|
||||||
|
tryBatchWrite();
|
||||||
|
}
|
||||||
|
|
||||||
|
private MapValue checkAndConvert(Record record) {
|
||||||
|
int sourceColNum = record.getColumnNumber();
|
||||||
|
List<Neo4jField> neo4jFields = writeConfig.neo4jFields;
|
||||||
|
if (sourceColNum < neo4jFields.size()) {
|
||||||
|
LOGGER.warn("接收到的数据列少于neo4jWriter企图消费的数据列,请注意风险,这可能导致数据不匹配");
|
||||||
|
LOGGER.warn("Receive fewer data columns than neo4jWriter attempts to consume, " +
|
||||||
|
"be aware of the risk that this may result in data mismatch");
|
||||||
|
LOGGER.warn("接受到的数据是:" + record);
|
||||||
|
LOGGER.warn("received data is:" + record);
|
||||||
|
}
|
||||||
|
|
||||||
|
int len = Math.min(sourceColNum, neo4jFields.size());
|
||||||
|
Map<String, Value> data = new HashMap<>(len * 4 / 3);
|
||||||
|
for (int i = 0; i < len; i++) {
|
||||||
|
Column column = record.getColumn(i);
|
||||||
|
Neo4jField neo4jField = neo4jFields.get(i);
|
||||||
|
try {
|
||||||
|
|
||||||
|
Value value = ValueAdapter.column2Value(column, neo4jField);
|
||||||
|
data.put(neo4jField.getFieldName(), value);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOGGER.info("检测到一条脏数据:{},原因:{}", column, e.getMessage());
|
||||||
|
LOGGER.info("dirty record:{},message :{}", column, e.getMessage());
|
||||||
|
this.taskPluginCollector.collectDirtyRecord(record, e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return new MapValue(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
//todo 精细化处理数据库异常
|
||||||
|
private void processNeo4jException(Neo4jException e) {
|
||||||
|
//Neo.ClientError.Statement.SyntaxError cypher语句错误
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Neo4jField> getNeo4jFields() {
|
||||||
|
return this.writeConfig.neo4jFields;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static class RetryConfig {
|
||||||
|
int retryTimes;
|
||||||
|
long retrySleepMills;
|
||||||
|
|
||||||
|
RetryConfig(int retryTimes, long retrySleepMills) {
|
||||||
|
this.retryTimes = retryTimes;
|
||||||
|
this.retrySleepMills = retrySleepMills;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class WriteConfig {
|
||||||
|
String cypher;
|
||||||
|
|
||||||
|
String database;
|
||||||
|
|
||||||
|
String batchVariableName;
|
||||||
|
|
||||||
|
List<Neo4jField> neo4jFields;
|
||||||
|
|
||||||
|
int batchSize;
|
||||||
|
|
||||||
|
public WriteConfig(String cypher,
|
||||||
|
String database,
|
||||||
|
String batchVariableName,
|
||||||
|
List<Neo4jField> neo4jFields,
|
||||||
|
int batchSize) {
|
||||||
|
this.cypher = cypher;
|
||||||
|
this.database = database;
|
||||||
|
this.batchVariableName = batchVariableName;
|
||||||
|
this.neo4jFields = neo4jFields;
|
||||||
|
this.batchSize = batchSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,63 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.neo4jwriter;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||||
|
import com.alibaba.datax.common.spi.Writer;
|
||||||
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
|
import com.alibaba.datax.common.element.Record;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class Neo4jWriter extends Writer {
|
||||||
|
public static class Job extends Writer.Job {
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(Job.class);
|
||||||
|
|
||||||
|
private Configuration jobConf = null;
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Configuration> split(int mandatoryNumber) {
|
||||||
|
List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
|
||||||
|
for (int i = 0; i < mandatoryNumber; i++) {
|
||||||
|
configurations.add(this.jobConf.clone());
|
||||||
|
}
|
||||||
|
return configurations;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Task extends Writer.Task {
|
||||||
|
private static final Logger TASK_LOGGER = LoggerFactory.getLogger(Task.class);
|
||||||
|
private Neo4jClient neo4jClient;
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
Configuration taskConf = super.getPluginJobConf();
|
||||||
|
this.neo4jClient = Neo4jClient.build(taskConf,getTaskPluginCollector());
|
||||||
|
this.neo4jClient.init();
|
||||||
|
TASK_LOGGER.info("neo4j writer task init success.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() {
|
||||||
|
this.neo4jClient.destroy();
|
||||||
|
TASK_LOGGER.info("neo4j writer task destroyed.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void startWrite(RecordReceiver receiver) {
|
||||||
|
Record record;
|
||||||
|
while ((record = receiver.getFromReader()) != null){
|
||||||
|
this.neo4jClient.tryWrite(record);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,70 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.neo4jwriter.adapter;
|
||||||
|
|
||||||
|
|
||||||
|
import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jField;
|
||||||
|
import org.testcontainers.shaded.com.google.common.base.Supplier;
|
||||||
|
|
||||||
|
import java.time.LocalDate;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.LocalTime;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author fuyouj
|
||||||
|
*/
|
||||||
|
public class DateAdapter {
|
||||||
|
private static final ThreadLocal<DateTimeFormatter> LOCAL_DATE_FORMATTER_MAP = new ThreadLocal<>();
|
||||||
|
private static final ThreadLocal<DateTimeFormatter> LOCAL_TIME_FORMATTER_MAP = new ThreadLocal<>();
|
||||||
|
private static final ThreadLocal<DateTimeFormatter> LOCAL_DATE_TIME_FORMATTER_MAP = new ThreadLocal<>();
|
||||||
|
private static final String DEFAULT_LOCAL_DATE_FORMATTER = "yyyy-MM-dd";
|
||||||
|
private static final String DEFAULT_LOCAL_TIME_FORMATTER = "HH:mm:ss";
|
||||||
|
private static final String DEFAULT_LOCAL_DATE_TIME_FORMATTER = "yyyy-MM-dd HH:mm:ss";
|
||||||
|
|
||||||
|
|
||||||
|
public static LocalDate localDate(String text, Neo4jField neo4jField) {
|
||||||
|
if (LOCAL_DATE_FORMATTER_MAP.get() != null) {
|
||||||
|
return LocalDate.parse(text, LOCAL_DATE_FORMATTER_MAP.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
String format = getOrDefault(neo4jField::getDateFormat, DEFAULT_LOCAL_DATE_FORMATTER);
|
||||||
|
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(format);
|
||||||
|
LOCAL_DATE_FORMATTER_MAP.set(dateTimeFormatter);
|
||||||
|
return LocalDate.parse(text, dateTimeFormatter);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getOrDefault(Supplier<String> dateFormat, String defaultFormat) {
|
||||||
|
String format = dateFormat.get();
|
||||||
|
if (null == format || "".equals(format)) {
|
||||||
|
return defaultFormat;
|
||||||
|
} else {
|
||||||
|
return format;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void destroy() {
|
||||||
|
LOCAL_DATE_FORMATTER_MAP.remove();
|
||||||
|
LOCAL_TIME_FORMATTER_MAP.remove();
|
||||||
|
LOCAL_DATE_TIME_FORMATTER_MAP.remove();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static LocalTime localTime(String text, Neo4jField neo4jField) {
|
||||||
|
if (LOCAL_TIME_FORMATTER_MAP.get() != null) {
|
||||||
|
return LocalTime.parse(text, LOCAL_TIME_FORMATTER_MAP.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
String format = getOrDefault(neo4jField::getDateFormat, DEFAULT_LOCAL_TIME_FORMATTER);
|
||||||
|
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(format);
|
||||||
|
LOCAL_TIME_FORMATTER_MAP.set(dateTimeFormatter);
|
||||||
|
return LocalTime.parse(text, dateTimeFormatter);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static LocalDateTime localDateTime(String text, Neo4jField neo4jField) {
|
||||||
|
if (LOCAL_DATE_TIME_FORMATTER_MAP.get() != null){
|
||||||
|
return LocalDateTime.parse(text,LOCAL_DATE_TIME_FORMATTER_MAP.get());
|
||||||
|
}
|
||||||
|
String format = getOrDefault(neo4jField::getDateFormat, DEFAULT_LOCAL_DATE_TIME_FORMATTER);
|
||||||
|
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(format);
|
||||||
|
LOCAL_DATE_TIME_FORMATTER_MAP.set(dateTimeFormatter);
|
||||||
|
return LocalDateTime.parse(text, dateTimeFormatter);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,114 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.neo4jwriter.adapter;
|
||||||
|
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.element.Column;
|
||||||
|
import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jField;
|
||||||
|
import com.alibaba.datax.plugin.writer.neo4jwriter.element.FieldType;
|
||||||
|
import com.alibaba.fastjson2.JSON;
|
||||||
|
import org.neo4j.driver.Value;
|
||||||
|
import org.neo4j.driver.Values;
|
||||||
|
import org.neo4j.driver.internal.value.NullValue;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author fuyouj
|
||||||
|
*/
|
||||||
|
public class ValueAdapter {
|
||||||
|
|
||||||
|
|
||||||
|
public static Value column2Value(final Column column, final Neo4jField neo4jField) {
|
||||||
|
FieldType type = neo4jField.getFieldType();
|
||||||
|
switch (type) {
|
||||||
|
case NULL:
|
||||||
|
return NullValue.NULL;
|
||||||
|
case MAP:
|
||||||
|
return Values.value(JSON.parseObject(column.asString(),Map.class));
|
||||||
|
case BOOLEAN:
|
||||||
|
return Values.value(column.asBoolean());
|
||||||
|
case STRING:
|
||||||
|
return Values.value(column.asString());
|
||||||
|
case INTEGER:
|
||||||
|
case LONG:
|
||||||
|
return Values.value(column.asLong());
|
||||||
|
case SHORT:
|
||||||
|
return Values.value(Short.valueOf(column.asString()));
|
||||||
|
case FLOAT:
|
||||||
|
case DOUBLE:
|
||||||
|
return Values.value(column.asDouble());
|
||||||
|
case BYTE_ARRAY:
|
||||||
|
return Values.value(parseArrayType(neo4jField, column.asString(), Byte::valueOf));
|
||||||
|
case CHAR_ARRAY:
|
||||||
|
return Values.value(parseArrayType(neo4jField, column.asString(), (s) -> s.charAt(0)));
|
||||||
|
case BOOLEAN_ARRAY:
|
||||||
|
return Values.value(parseArrayType(neo4jField, column.asString(), Boolean::valueOf));
|
||||||
|
case STRING_ARRAY:
|
||||||
|
case Object_ARRAY:
|
||||||
|
case LIST:
|
||||||
|
return Values.value(parseArrayType(neo4jField, column.asString(), Function.identity()));
|
||||||
|
case LONG_ARRAY:
|
||||||
|
return Values.value(parseArrayType(neo4jField, column.asString(), Long::valueOf));
|
||||||
|
case INT_ARRAY:
|
||||||
|
return Values.value(parseArrayType(neo4jField, column.asString(), Integer::valueOf));
|
||||||
|
case SHORT_ARRAY:
|
||||||
|
return Values.value(parseArrayType(neo4jField, column.asString(), Short::valueOf));
|
||||||
|
case DOUBLE_ARRAY:
|
||||||
|
case FLOAT_ARRAY:
|
||||||
|
return Values.value(parseArrayType(neo4jField, column.asString(), Double::valueOf));
|
||||||
|
case LOCAL_DATE:
|
||||||
|
return Values.value(DateAdapter.localDate(column.asString(), neo4jField));
|
||||||
|
case LOCAL_TIME:
|
||||||
|
return Values.value(DateAdapter.localTime(column.asString(),neo4jField));
|
||||||
|
case LOCAL_DATE_TIME:
|
||||||
|
return Values.value(DateAdapter.localDateTime(column.asString(),neo4jField));
|
||||||
|
default:
|
||||||
|
return Values.value(column.getRawData());
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static <R> List<R> parseArrayType(final Neo4jField neo4jField,
|
||||||
|
final String strValue,
|
||||||
|
final Function<String, R> convertFunc) {
|
||||||
|
if (null == strValue || "".equals(strValue)) {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
String trimStr = trimString(strValue, neo4jField.getArrayTrimOrDefault());
|
||||||
|
if ("".equals(trimStr)) {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
String[] strArr = trimStr.split(neo4jField.getSplitOrDefault());
|
||||||
|
List<R> ans = new ArrayList<>();
|
||||||
|
for (String s : strArr) {
|
||||||
|
ans.add(convertFunc.apply(s));
|
||||||
|
}
|
||||||
|
return ans;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String trimString(String strValue, List<Character> trimChars) {
|
||||||
|
|
||||||
|
Set<Character> characters = new HashSet<>(trimChars);
|
||||||
|
char[] chars = strValue.toCharArray();
|
||||||
|
int i = 0;
|
||||||
|
int j = chars.length - 1;
|
||||||
|
|
||||||
|
while (i <= chars.length - 1 && characters.contains(chars[i])) {
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
while (j >= i && characters.contains(chars[j])) {
|
||||||
|
j--;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i > j) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i == j) {
|
||||||
|
return String.valueOf(chars[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
return new String(Arrays.copyOfRange(chars, i, j + 1));
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,116 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.neo4jwriter.config;
|
||||||
|
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author fuyouj
|
||||||
|
*/
|
||||||
|
public final class ConfigConstants {
|
||||||
|
|
||||||
|
public static final Long DEFAULT_MAX_TRANSACTION_RETRY_SECONDS = 30L;
|
||||||
|
|
||||||
|
public static final Long DEFAULT_MAX_CONNECTION_SECONDS = 30L;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public static final Option<Integer> RETRY_TIMES =
|
||||||
|
Option.<Integer>builder()
|
||||||
|
.key("retry_times")
|
||||||
|
.defaultValue(3)
|
||||||
|
.desc("The number of overwrites when an error occurs")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Option<Long> RETRY_SLEEP_MILLS =
|
||||||
|
Option.<Long>builder()
|
||||||
|
.key("retry_sleep_mills")
|
||||||
|
.defaultValue(3000L)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* cluster mode please reference
|
||||||
|
* <a href="https://neo4j.com/docs/java-manual/current/client-applications/">how to connect cluster mode</a>
|
||||||
|
*/
|
||||||
|
public static final Option<String> URI =
|
||||||
|
Option.<String>builder()
|
||||||
|
.key("uri")
|
||||||
|
.noDefaultValue()
|
||||||
|
.desc("uir of neo4j database")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Option<String> USERNAME =
|
||||||
|
Option.<String>builder()
|
||||||
|
.key("username")
|
||||||
|
.noDefaultValue()
|
||||||
|
.desc("username for accessing the neo4j database")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Option<String> PASSWORD =
|
||||||
|
Option.<String>builder()
|
||||||
|
.key("password")
|
||||||
|
.noDefaultValue()
|
||||||
|
.desc("password for accessing the neo4j database")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Option<String> BEARER_TOKEN =
|
||||||
|
Option.<String>builder()
|
||||||
|
.key("bearer_token")
|
||||||
|
.noDefaultValue()
|
||||||
|
.desc("base64 encoded bearer token of the Neo4j. for Auth.")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Option<String> KERBEROS_TICKET =
|
||||||
|
Option.<String>builder()
|
||||||
|
.key("kerberos_ticket")
|
||||||
|
.noDefaultValue()
|
||||||
|
.desc("base64 encoded kerberos ticket of the Neo4j. for Auth.")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Option<String> DATABASE =
|
||||||
|
Option.<String>builder()
|
||||||
|
.key("database")
|
||||||
|
.noDefaultValue()
|
||||||
|
.desc("database name.")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Option<String> CYPHER =
|
||||||
|
Option.<String>builder()
|
||||||
|
.key("cypher")
|
||||||
|
.noDefaultValue()
|
||||||
|
.desc("cypher query.")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Option<Long> MAX_TRANSACTION_RETRY_TIME =
|
||||||
|
Option.<Long>builder()
|
||||||
|
.key("max_transaction_retry_time_seconds")
|
||||||
|
.defaultValue(DEFAULT_MAX_TRANSACTION_RETRY_SECONDS)
|
||||||
|
.desc("maximum transaction retry time(seconds). transaction fail if exceeded.")
|
||||||
|
.build();
|
||||||
|
public static final Option<Long> MAX_CONNECTION_TIMEOUT_SECONDS =
|
||||||
|
Option.<Long>builder()
|
||||||
|
.key("max_connection_timeout_seconds")
|
||||||
|
.defaultValue(DEFAULT_MAX_CONNECTION_SECONDS)
|
||||||
|
.desc("The maximum amount of time to wait for a TCP connection to be established (seconds).")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Option<String> BATCH_DATA_VARIABLE_NAME =
|
||||||
|
Option.<String>builder()
|
||||||
|
.key("batch_data_variable_name")
|
||||||
|
.defaultValue("batch")
|
||||||
|
.desc("In a cypher statement, a variable name that represents a batch of data")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Option<List<Neo4jField>> NEO4J_FIELDS =
|
||||||
|
Option.<List<Neo4jField>>builder()
|
||||||
|
.key("fields")
|
||||||
|
.noDefaultValue()
|
||||||
|
.desc("neo4j fields.")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Option<Integer> BATCH_SIZE =
|
||||||
|
Option.<Integer>builder().
|
||||||
|
key("batch_size")
|
||||||
|
.defaultValue(1000)
|
||||||
|
.desc("max batch size")
|
||||||
|
.build();
|
||||||
|
}
|
@ -0,0 +1,108 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.neo4jwriter.config;
|
||||||
|
|
||||||
|
import com.alibaba.datax.plugin.writer.neo4jwriter.element.FieldType;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 由于dataX并不能传输数据的元数据,所以只能在writer端定义每列数据的名字
|
||||||
|
* datax does not support data metadata,
|
||||||
|
* only the name of each column of data can be defined on neo4j writer
|
||||||
|
* @author fuyouj
|
||||||
|
*/
|
||||||
|
public class Neo4jField {
|
||||||
|
public static final String DEFAULT_SPLIT = ",";
|
||||||
|
public static final List<Character> DEFAULT_ARRAY_TRIM = Arrays.asList('[',']');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* name of neo4j field
|
||||||
|
*/
|
||||||
|
private String fieldName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* neo4j type
|
||||||
|
* reference by org.neo4j.driver.Values
|
||||||
|
*/
|
||||||
|
private FieldType fieldType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* for date
|
||||||
|
*/
|
||||||
|
private String dateFormat;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* for array type
|
||||||
|
*/
|
||||||
|
private String split;
|
||||||
|
/**
|
||||||
|
* such as [1,2,3,4,5]
|
||||||
|
* split is ,
|
||||||
|
* arrayTrimChar is [ ]
|
||||||
|
*/
|
||||||
|
private List<Character> arrayTrimChars;
|
||||||
|
|
||||||
|
public Neo4jField(){}
|
||||||
|
|
||||||
|
public Neo4jField(String fieldName, FieldType fieldType, String format, String split, List<Character> arrayTrimChars) {
|
||||||
|
this.fieldName = fieldName;
|
||||||
|
this.fieldType = fieldType;
|
||||||
|
this.dateFormat = format;
|
||||||
|
this.split = split;
|
||||||
|
this.arrayTrimChars = arrayTrimChars;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getFieldName() {
|
||||||
|
return fieldName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setFieldName(String fieldName) {
|
||||||
|
this.fieldName = fieldName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FieldType getFieldType() {
|
||||||
|
return fieldType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setFieldType(FieldType fieldType) {
|
||||||
|
this.fieldType = fieldType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDateFormat() {
|
||||||
|
return dateFormat;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDateFormat(String dateFormat) {
|
||||||
|
this.dateFormat = dateFormat;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSplit() {
|
||||||
|
return getSplitOrDefault();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSplitOrDefault(){
|
||||||
|
if (split == null || "".equals(split)){
|
||||||
|
return DEFAULT_SPLIT;
|
||||||
|
}
|
||||||
|
return split;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSplit(String split) {
|
||||||
|
this.split = split;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Character> getArrayTrimChars() {
|
||||||
|
return getArrayTrimOrDefault();
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Character> getArrayTrimOrDefault(){
|
||||||
|
if (arrayTrimChars == null || arrayTrimChars.isEmpty()){
|
||||||
|
return DEFAULT_ARRAY_TRIM;
|
||||||
|
}
|
||||||
|
return arrayTrimChars;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setArrayTrimChars(List<Character> arrayTrimChars) {
|
||||||
|
this.arrayTrimChars = arrayTrimChars;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,65 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.neo4jwriter.config;
|
||||||
|
|
||||||
|
|
||||||
|
public class Option<T> {
|
||||||
|
|
||||||
|
public static class Builder<T> {
|
||||||
|
private String key;
|
||||||
|
private String desc;
|
||||||
|
|
||||||
|
private T defaultValue;
|
||||||
|
|
||||||
|
public Builder<T> key(String key) {
|
||||||
|
this.key = key;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder<T> desc(String desc) {
|
||||||
|
this.desc = desc;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder<T> defaultValue(T defaultValue) {
|
||||||
|
this.defaultValue = defaultValue;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder<T> noDefaultValue() {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Option<T> build() {
|
||||||
|
return new Option<>(this.key, this.desc, this.defaultValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final String key;
|
||||||
|
private final String desc;
|
||||||
|
|
||||||
|
private final T defaultValue;
|
||||||
|
|
||||||
|
public Option(String key, String desc, T defaultValue) {
|
||||||
|
this.key = key;
|
||||||
|
this.desc = desc;
|
||||||
|
this.defaultValue = defaultValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T> Builder<T> builder(){
|
||||||
|
return new Builder<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getKey() {
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDesc() {
|
||||||
|
return desc;
|
||||||
|
}
|
||||||
|
|
||||||
|
public T getDefaultValue() {
|
||||||
|
if (defaultValue == null){
|
||||||
|
throw new IllegalStateException(key + ":defaultValue is null");
|
||||||
|
}
|
||||||
|
return defaultValue;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,32 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.neo4jwriter.element;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see org.neo4j.driver.Values
|
||||||
|
* @author fuyouj
|
||||||
|
*/
|
||||||
|
public enum FieldType {
|
||||||
|
NULL,
|
||||||
|
BOOLEAN,
|
||||||
|
STRING,
|
||||||
|
LONG,
|
||||||
|
SHORT,
|
||||||
|
INTEGER,
|
||||||
|
DOUBLE,
|
||||||
|
FLOAT,
|
||||||
|
LOCAL_DATE,
|
||||||
|
LOCAL_TIME,
|
||||||
|
LOCAL_DATE_TIME,
|
||||||
|
LIST,
|
||||||
|
MAP,
|
||||||
|
CHAR_ARRAY,
|
||||||
|
BYTE_ARRAY,
|
||||||
|
BOOLEAN_ARRAY,
|
||||||
|
STRING_ARRAY,
|
||||||
|
LONG_ARRAY,
|
||||||
|
INT_ARRAY,
|
||||||
|
SHORT_ARRAY,
|
||||||
|
DOUBLE_ARRAY,
|
||||||
|
FLOAT_ARRAY,
|
||||||
|
Object_ARRAY
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,29 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.neo4jwriter.element;
|
||||||
|
|
||||||
|
import org.neo4j.driver.Record;
|
||||||
|
import org.neo4j.driver.Value;
|
||||||
|
import org.neo4j.driver.internal.AsValue;
|
||||||
|
import org.neo4j.driver.internal.value.MapValue;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 一般来说,我们会将一批对象转换成hashmap再传输给neo4j的驱动用作参数解析,驱动会将hashmap转换成org.neo4j.driver.Value
|
||||||
|
* 过程是:List[domain] -> List[map]->List[Value]
|
||||||
|
* 直接将Record实现AsValue接口,有1个好处:
|
||||||
|
* 减少了一次对象转换次数,List[domain] -> List[Value]
|
||||||
|
*/
|
||||||
|
public class Neo4jRecord implements AsValue {
|
||||||
|
|
||||||
|
private MapValue mapValue;
|
||||||
|
|
||||||
|
public Neo4jRecord(Record record, List<String> columnNames) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Value asValue() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,37 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.neo4jwriter.exception;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.spi.ErrorCode;
|
||||||
|
|
||||||
|
|
||||||
|
public enum Neo4jErrorCode implements ErrorCode {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invalid configuration
|
||||||
|
* 配置校验异常
|
||||||
|
*/
|
||||||
|
CONFIG_INVALID("NEO4J_ERROR_01","Invalid configuration"),
|
||||||
|
/**
|
||||||
|
* database error
|
||||||
|
* 在执行写入到数据库时抛出的异常,可能是权限异常,也可能是连接超时,或者是配置到了从节点。
|
||||||
|
* 如果是更新操作,还会有死锁异常。具体原因根据报错信息确定,但是这与dataX无关。
|
||||||
|
*/
|
||||||
|
DATABASE_ERROR("NEO4J_ERROR_02","database error");
|
||||||
|
|
||||||
|
private final String code;
|
||||||
|
private final String description;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getCode() {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDescription() {
|
||||||
|
return description;
|
||||||
|
}
|
||||||
|
|
||||||
|
Neo4jErrorCode(String code, String description) {
|
||||||
|
this.code = code;
|
||||||
|
this.description = description;
|
||||||
|
}
|
||||||
|
}
|
6
neo4jwriter/src/main/resources/plugin.json
Normal file
6
neo4jwriter/src/main/resources/plugin.json
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
{
|
||||||
|
"name": "neo4jWriter",
|
||||||
|
"class": "com.alibaba.datax.plugin.writer.neo4jwriter.Neo4jWriter",
|
||||||
|
"description": "dataX neo4j 写插件",
|
||||||
|
"developer": "付有杰"
|
||||||
|
}
|
@ -0,0 +1,81 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.element.Record;
|
||||||
|
import com.alibaba.datax.common.element.StringColumn;
|
||||||
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
|
import com.alibaba.datax.plugin.writer.mock.MockRecord;
|
||||||
|
import com.alibaba.datax.plugin.writer.neo4jwriter.Neo4jClient;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.neo4j.driver.*;
|
||||||
|
import org.neo4j.driver.types.Node;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 由于docker 镜像没有apoc函数,所以此测试只能本地搭建环境复现
|
||||||
|
*/
|
||||||
|
public class ApocTest {
|
||||||
|
/**
|
||||||
|
* neo4j中,Label和关系类型,想动态的写,需要借助于apoc函数
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void test_use_apoc_create_dynamic_label() {
|
||||||
|
try (Driver neo4jDriver = GraphDatabase.driver(
|
||||||
|
"bolt://localhost:7687",
|
||||||
|
AuthTokens.basic("yourUserName", "yourPassword"));
|
||||||
|
Session neo4jSession = neo4jDriver.session(SessionConfig.forDatabase("yourDataBase"))) {
|
||||||
|
List<String> dynamicLabel = new ArrayList<>();
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
dynamicLabel.add("Label" + i);
|
||||||
|
}
|
||||||
|
//删除原有数据
|
||||||
|
//remove test data if exist
|
||||||
|
//这种占位符的方式不支持批量动态写,当然可以使用union拼接,但是性能不好
|
||||||
|
String query = "match (p:%s) return p";
|
||||||
|
String delete = "match (p:%s) delete p";
|
||||||
|
for (String label : dynamicLabel) {
|
||||||
|
Result result = neo4jSession.run(String.format(query, label));
|
||||||
|
if (result.hasNext()) {
|
||||||
|
neo4jSession.run(String.format(delete, label));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Configuration configuration = Configuration.from(new File("src/test/resources/dynamicLabel.json"));
|
||||||
|
Neo4jClient neo4jClient = Neo4jClient.build(configuration, null);
|
||||||
|
|
||||||
|
neo4jClient.init();
|
||||||
|
for (int i = 0; i < dynamicLabel.size(); i++) {
|
||||||
|
Record record = new MockRecord();
|
||||||
|
record.addColumn(new StringColumn(dynamicLabel.get(i)));
|
||||||
|
record.addColumn(new StringColumn(String.valueOf(i)));
|
||||||
|
neo4jClient.tryWrite(record);
|
||||||
|
}
|
||||||
|
neo4jClient.destroy();
|
||||||
|
|
||||||
|
//校验脚本的批量写入是否正确
|
||||||
|
int cnt = 0;
|
||||||
|
for (int i = 0; i < dynamicLabel.size(); i++) {
|
||||||
|
String label = dynamicLabel.get(i);
|
||||||
|
Result result = neo4jSession.run(String.format(query, label));
|
||||||
|
while (result.hasNext()) {
|
||||||
|
org.neo4j.driver.Record record = result.next();
|
||||||
|
Node node = record.get("p").asNode();
|
||||||
|
assertTrue(node.hasLabel(label));
|
||||||
|
assertEquals(node.asMap().get("id"), i + "");
|
||||||
|
cnt++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertEquals(cnt, 100);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,252 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer;
|
||||||
|
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.element.Record;
|
||||||
|
import com.alibaba.datax.common.element.StringColumn;
|
||||||
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
|
import com.alibaba.datax.plugin.writer.mock.MockRecord;
|
||||||
|
import com.alibaba.datax.plugin.writer.mock.MockUtil;
|
||||||
|
import com.alibaba.datax.plugin.writer.neo4jwriter.Neo4jClient;
|
||||||
|
import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jField;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.neo4j.driver.*;
|
||||||
|
import org.neo4j.driver.types.Node;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.testcontainers.containers.GenericContainer;
|
||||||
|
import org.testcontainers.containers.Network;
|
||||||
|
import org.testcontainers.containers.output.Slf4jLogConsumer;
|
||||||
|
import org.testcontainers.lifecycle.Startables;
|
||||||
|
import org.testcontainers.shaded.org.awaitility.Awaitility;
|
||||||
|
import org.testcontainers.utility.DockerImageName;
|
||||||
|
import org.testcontainers.utility.DockerLoggerFactory;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
|
||||||
|
public class Neo4jWriterTest {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jWriterTest.class);
|
||||||
|
private static final int MOCK_NUM = 100;
|
||||||
|
private static final String CONTAINER_IMAGE = "neo4j:5.6.0";
|
||||||
|
|
||||||
|
private static final String CONTAINER_HOST = "neo4j-host";
|
||||||
|
private static final int HTTP_PORT = 7474;
|
||||||
|
private static final int BOLT_PORT = 7687;
|
||||||
|
private static final String CONTAINER_NEO4J_USERNAME = "neo4j";
|
||||||
|
private static final String CONTAINER_NEO4J_PASSWORD = "Test@12343";
|
||||||
|
private static final URI CONTAINER_URI = URI.create("neo4j://localhost:" + BOLT_PORT);
|
||||||
|
|
||||||
|
protected static final Network NETWORK = Network.newNetwork();
|
||||||
|
|
||||||
|
private GenericContainer<?> container;
|
||||||
|
private Driver neo4jDriver;
|
||||||
|
private Session neo4jSession;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void init() {
|
||||||
|
DockerImageName imageName = DockerImageName.parse(CONTAINER_IMAGE);
|
||||||
|
container =
|
||||||
|
new GenericContainer<>(imageName)
|
||||||
|
.withNetwork(NETWORK)
|
||||||
|
.withNetworkAliases(CONTAINER_HOST)
|
||||||
|
.withExposedPorts(HTTP_PORT, BOLT_PORT)
|
||||||
|
.withEnv(
|
||||||
|
"NEO4J_AUTH",
|
||||||
|
CONTAINER_NEO4J_USERNAME + "/" + CONTAINER_NEO4J_PASSWORD)
|
||||||
|
.withLogConsumer(
|
||||||
|
new Slf4jLogConsumer(
|
||||||
|
DockerLoggerFactory.getLogger(CONTAINER_IMAGE)));
|
||||||
|
container.setPortBindings(
|
||||||
|
Arrays.asList(
|
||||||
|
String.format("%s:%s", HTTP_PORT, HTTP_PORT),
|
||||||
|
String.format("%s:%s", BOLT_PORT, BOLT_PORT)));
|
||||||
|
Startables.deepStart(Stream.of(container)).join();
|
||||||
|
LOGGER.info("container started");
|
||||||
|
Awaitility.given()
|
||||||
|
.ignoreExceptions()
|
||||||
|
.await()
|
||||||
|
.atMost(30, TimeUnit.SECONDS)
|
||||||
|
.untilAsserted(this::initConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_create_node_all_type_field() {
|
||||||
|
final Result checkExists = neo4jSession.run("MATCH (p:Person) RETURN p limit 1");
|
||||||
|
if (checkExists.hasNext()) {
|
||||||
|
neo4jSession.run("MATCH (p:Person) delete p");
|
||||||
|
}
|
||||||
|
|
||||||
|
Configuration configuration = Configuration.from(new File("src/test/resources/allTypeFieldNode.json"));
|
||||||
|
Neo4jClient neo4jClient = Neo4jClient.build(configuration, null);
|
||||||
|
|
||||||
|
neo4jClient.init();
|
||||||
|
for (int i = 0; i < MOCK_NUM; i++) {
|
||||||
|
neo4jClient.tryWrite(mockAllTypeFieldTestNode(neo4jClient.getNeo4jFields()));
|
||||||
|
}
|
||||||
|
neo4jClient.destroy();
|
||||||
|
|
||||||
|
|
||||||
|
Result result = neo4jSession.run("MATCH (p:Person) return p");
|
||||||
|
// nodes
|
||||||
|
assertTrue(result.hasNext());
|
||||||
|
int cnt = 0;
|
||||||
|
while (result.hasNext()) {
|
||||||
|
org.neo4j.driver.Record record = result.next();
|
||||||
|
record.get("p").get("pbool").asBoolean();
|
||||||
|
record.get("p").get("pstring").asString();
|
||||||
|
record.get("p").get("plong").asLong();
|
||||||
|
record.get("p").get("pshort").asInt();
|
||||||
|
record.get("p").get("pdouble").asDouble();
|
||||||
|
List list = (List) record.get("p").get("pstringarr").asObject();
|
||||||
|
record.get("p").get("plocaldate").asLocalDate();
|
||||||
|
cnt++;
|
||||||
|
|
||||||
|
}
|
||||||
|
assertEquals(cnt, MOCK_NUM);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建关系 必须先有节点
|
||||||
|
* 所以先创建节点再模拟关系
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void test_create_relation() {
|
||||||
|
final Result checkExists = neo4jSession.run("MATCH (p1:Person)-[r:LINK]->(p1:Person) return r limit 1");
|
||||||
|
if (checkExists.hasNext()) {
|
||||||
|
neo4jSession.run("MATCH (p1:Person)-[r:LINK]->(p1:Person) delete r,p1,p2");
|
||||||
|
}
|
||||||
|
|
||||||
|
String createNodeCql = "create (p:Person) set p.id = '%s'";
|
||||||
|
Configuration configuration = Configuration.from(new File("src/test/resources/relationship.json"));
|
||||||
|
|
||||||
|
Neo4jClient neo4jClient = Neo4jClient.build(configuration, null);
|
||||||
|
neo4jClient.init();
|
||||||
|
//创建节点为后续写关系做准备
|
||||||
|
//Create nodes to prepare for subsequent write relationships
|
||||||
|
for (int i = 0; i < MOCK_NUM; i++) {
|
||||||
|
neo4jSession.run(String.format(createNodeCql, i + "start"));
|
||||||
|
neo4jSession.run(String.format(createNodeCql, i + "end"));
|
||||||
|
Record record = new MockRecord();
|
||||||
|
record.addColumn(new StringColumn(i + "start"));
|
||||||
|
record.addColumn(new StringColumn(i + "end"));
|
||||||
|
neo4jClient.tryWrite(record);
|
||||||
|
|
||||||
|
}
|
||||||
|
neo4jClient.destroy();
|
||||||
|
|
||||||
|
Result result = neo4jSession.run("MATCH (start:Person)-[r:LINK]->(end:Person) return r,start,end");
|
||||||
|
// relationships
|
||||||
|
assertTrue(result.hasNext());
|
||||||
|
int cnt = 0;
|
||||||
|
while (result.hasNext()) {
|
||||||
|
org.neo4j.driver.Record record = result.next();
|
||||||
|
|
||||||
|
Node startNode = record.get("start").asNode();
|
||||||
|
assertTrue(startNode.hasLabel("Person"));
|
||||||
|
assertTrue(startNode.asMap().containsKey("id"));
|
||||||
|
|
||||||
|
Node endNode = record.get("end").asNode();
|
||||||
|
assertTrue(startNode.hasLabel("Person"));
|
||||||
|
assertTrue(endNode.asMap().containsKey("id"));
|
||||||
|
|
||||||
|
|
||||||
|
String name = record.get("r").type().name();
|
||||||
|
assertEquals("RELATIONSHIP", name);
|
||||||
|
cnt++;
|
||||||
|
}
|
||||||
|
assertEquals(cnt, MOCK_NUM);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* neo4j中,Label和关系类型,想动态的写,需要借助于apoc函数
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void test_use_apoc_create_dynamic_label() {
|
||||||
|
List<String> dynamicLabel = new ArrayList<>();
|
||||||
|
for (int i = 0; i < MOCK_NUM; i++) {
|
||||||
|
dynamicLabel.add("Label" + i);
|
||||||
|
}
|
||||||
|
//删除原有数据
|
||||||
|
//remove test data if exist
|
||||||
|
//这种占位符的方式不支持批量动态写,当然可以使用union拼接,但是性能不好
|
||||||
|
String query = "match (p:%s) return p";
|
||||||
|
String delete = "match (p:%s) delete p";
|
||||||
|
for (String label : dynamicLabel) {
|
||||||
|
Result result = neo4jSession.run(String.format(query, label));
|
||||||
|
if (result.hasNext()) {
|
||||||
|
neo4jSession.run(String.format(delete, label));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Configuration configuration = Configuration.from(new File("src/test/resources/dynamicLabel.json"));
|
||||||
|
Neo4jClient neo4jClient = Neo4jClient.build(configuration, null);
|
||||||
|
|
||||||
|
neo4jClient.init();
|
||||||
|
for (int i = 0; i < dynamicLabel.size(); i++) {
|
||||||
|
Record record = new MockRecord();
|
||||||
|
record.addColumn(new StringColumn(dynamicLabel.get(i)));
|
||||||
|
record.addColumn(new StringColumn(String.valueOf(i)));
|
||||||
|
neo4jClient.tryWrite(record);
|
||||||
|
}
|
||||||
|
neo4jClient.destroy();
|
||||||
|
|
||||||
|
//校验脚本的批量写入是否正确
|
||||||
|
int cnt = 0;
|
||||||
|
for (int i = 0; i < dynamicLabel.size(); i++) {
|
||||||
|
String label = dynamicLabel.get(i);
|
||||||
|
Result result = neo4jSession.run(String.format(query, label));
|
||||||
|
while (result.hasNext()) {
|
||||||
|
org.neo4j.driver.Record record = result.next();
|
||||||
|
Node node = record.get("p").asNode();
|
||||||
|
assertTrue(node.hasLabel(label));
|
||||||
|
assertEquals(node.asMap().get("id"), i + "");
|
||||||
|
cnt++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertEquals(cnt, MOCK_NUM);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private Record mockAllTypeFieldTestNode(List<Neo4jField> neo4jFields) {
|
||||||
|
Record mock = new MockRecord();
|
||||||
|
for (Neo4jField field : neo4jFields) {
|
||||||
|
mock.addColumn(MockUtil.mockColumnByType(field.getFieldType()));
|
||||||
|
}
|
||||||
|
return mock;
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void destroy() {
|
||||||
|
if (neo4jSession != null) {
|
||||||
|
neo4jSession.close();
|
||||||
|
}
|
||||||
|
if (neo4jDriver != null) {
|
||||||
|
neo4jDriver.close();
|
||||||
|
}
|
||||||
|
if (container != null) {
|
||||||
|
container.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initConnection() {
|
||||||
|
neo4jDriver =
|
||||||
|
GraphDatabase.driver(
|
||||||
|
CONTAINER_URI,
|
||||||
|
AuthTokens.basic(CONTAINER_NEO4J_USERNAME, CONTAINER_NEO4J_PASSWORD));
|
||||||
|
neo4jSession = neo4jDriver.session(SessionConfig.forDatabase("neo4j"));
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,104 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.mock;
|
||||||
|
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.element.Column;
|
||||||
|
import com.alibaba.datax.common.element.Record;
|
||||||
|
import com.alibaba.fastjson2.JSON;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class MockRecord implements Record {
|
||||||
|
private static final int RECORD_AVERGAE_COLUMN_NUMBER = 16;
|
||||||
|
|
||||||
|
private List<Column> columns;
|
||||||
|
|
||||||
|
private int byteSize;
|
||||||
|
|
||||||
|
|
||||||
|
private Map<String, Object> meta;
|
||||||
|
|
||||||
|
public MockRecord() {
|
||||||
|
this.columns = new ArrayList<>(RECORD_AVERGAE_COLUMN_NUMBER);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addColumn(Column column) {
|
||||||
|
columns.add(column);
|
||||||
|
incrByteSize(column);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Column getColumn(int i) {
|
||||||
|
if (i < 0 || i >= columns.size()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return columns.get(i);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setColumn(int i, final Column column) {
|
||||||
|
if (i < 0) {
|
||||||
|
throw new IllegalArgumentException("不能给index小于0的column设置值");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i >= columns.size()) {
|
||||||
|
expandCapacity(i + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
decrByteSize(getColumn(i));
|
||||||
|
this.columns.set(i, column);
|
||||||
|
incrByteSize(getColumn(i));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
Map<String, Object> json = new HashMap<String, Object>();
|
||||||
|
json.put("size", this.getColumnNumber());
|
||||||
|
json.put("data", this.columns);
|
||||||
|
return JSON.toJSONString(json);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getColumnNumber() {
|
||||||
|
return this.columns.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getByteSize() {
|
||||||
|
return byteSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMemorySize() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setMeta(Map<String, String> meta) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, String> getMeta() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void decrByteSize(final Column column) {
|
||||||
|
}
|
||||||
|
|
||||||
|
private void incrByteSize(final Column column) {
|
||||||
|
}
|
||||||
|
|
||||||
|
private void expandCapacity(int totalSize) {
|
||||||
|
if (totalSize <= 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int needToExpand = totalSize - columns.size();
|
||||||
|
while (needToExpand-- > 0) {
|
||||||
|
this.columns.add(null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,50 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.mock;
|
||||||
|
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.element.*;
|
||||||
|
import com.alibaba.datax.plugin.writer.neo4jwriter.element.FieldType;
|
||||||
|
import com.alibaba.fastjson2.JSON;
|
||||||
|
|
||||||
|
import java.time.LocalDate;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
public class MockUtil {
|
||||||
|
|
||||||
|
public static Column mockColumnByType(FieldType type) {
|
||||||
|
Random random = new Random();
|
||||||
|
switch (type) {
|
||||||
|
case SHORT:
|
||||||
|
return new StringColumn("1");
|
||||||
|
case BOOLEAN:
|
||||||
|
return new BoolColumn(random.nextInt() % 2 == 0);
|
||||||
|
case INTEGER:
|
||||||
|
case LONG:
|
||||||
|
return new LongColumn(random.nextInt(Integer.MAX_VALUE));
|
||||||
|
case FLOAT:
|
||||||
|
case DOUBLE:
|
||||||
|
return new DoubleColumn(random.nextDouble());
|
||||||
|
case NULL:
|
||||||
|
return null;
|
||||||
|
case BYTE_ARRAY:
|
||||||
|
return new BytesColumn(new byte[]{(byte) (random.nextInt() % 2)});
|
||||||
|
case LOCAL_DATE:
|
||||||
|
return new StringColumn(LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
|
||||||
|
case MAP:
|
||||||
|
return new StringColumn(JSON.toJSONString(propmap()));
|
||||||
|
case STRING_ARRAY:
|
||||||
|
return new StringColumn("[1,1,1,1,1,1,1]");
|
||||||
|
default:
|
||||||
|
return new StringColumn("randomStr" + random.nextInt(Integer.MAX_VALUE));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Map<String, Object> propmap() {
|
||||||
|
Map<String, Object> prop = new HashMap<>();
|
||||||
|
prop.put("name", "neo4jWriter");
|
||||||
|
prop.put("age", "1");
|
||||||
|
return prop;
|
||||||
|
}
|
||||||
|
}
|
42
neo4jwriter/src/test/resources/allTypeFieldNode.json
Normal file
42
neo4jwriter/src/test/resources/allTypeFieldNode.json
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
{
|
||||||
|
"uri": "neo4j://localhost:7687",
|
||||||
|
"username":"neo4j",
|
||||||
|
"password":"Test@12343",
|
||||||
|
"database":"neo4j",
|
||||||
|
"cypher": "unwind $ batch as row create(p:Person) set p.pbool = row.pbool,p.pstring = row.pstring,p.plong = row.plong,p.pshort = row.pshort,p.pdouble=row.pdouble,p.pstringarr=row.pstringarr,p.plocaldate=row.plocaldate",
|
||||||
|
"batch_data_variable_name": "batch",
|
||||||
|
"batch_size": "33",
|
||||||
|
"fields": [
|
||||||
|
{
|
||||||
|
"fieldName": "pbool",
|
||||||
|
"fieldType": "BOOLEAN"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldName": "pstring",
|
||||||
|
"fieldType": "STRING"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldName": "plong",
|
||||||
|
"fieldType": "LONG"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldName": "pshort",
|
||||||
|
"fieldType": "SHORT"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldName": "pdouble",
|
||||||
|
"fieldType": "DOUBLE"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldName": "pstringarr",
|
||||||
|
"fieldType": "STRING_ARRAY",
|
||||||
|
"split": ",",
|
||||||
|
"arrayTrimChars": ["[","]"]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldName": "plocaldate",
|
||||||
|
"fieldType": "LOCAL_DATE",
|
||||||
|
"dateFormat": "yyyy-MM-dd"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
19
neo4jwriter/src/test/resources/dynamicLabel.json
Normal file
19
neo4jwriter/src/test/resources/dynamicLabel.json
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
{
|
||||||
|
"uri": "bolt://localhost:7687",
|
||||||
|
"username":"yourUserName",
|
||||||
|
"password":"yourPassword",
|
||||||
|
"database":"yourDataBase",
|
||||||
|
"cypher": "unwind $batch as row CALL apoc.cypher.doIt( 'create (n:`' + row.Label + '`{id:$id})' ,{id: row.id} ) YIELD value RETURN 1 ",
|
||||||
|
"batch_data_variable_name": "batch",
|
||||||
|
"batch_size": "33",
|
||||||
|
"fields": [
|
||||||
|
{
|
||||||
|
"fieldName": "Label",
|
||||||
|
"fieldType": "STRING"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldName": "id",
|
||||||
|
"fieldType": "STRING"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
19
neo4jwriter/src/test/resources/relationship.json
Normal file
19
neo4jwriter/src/test/resources/relationship.json
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
{
|
||||||
|
"uri": "neo4j://localhost:7687",
|
||||||
|
"username":"neo4j",
|
||||||
|
"password":"Test@12343",
|
||||||
|
"database":"neo4j",
|
||||||
|
"cypher": "unwind $batch as row match(p1:Person) where p1.id = row.startNodeId match(p2:Person) where p2.id = row.endNodeId create (p1)-[:LINK]->(p2)",
|
||||||
|
"batch_data_variable_name": "batch",
|
||||||
|
"batch_size": "33",
|
||||||
|
"fields": [
|
||||||
|
{
|
||||||
|
"fieldName": "startNodeId",
|
||||||
|
"fieldType": "STRING"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldName": "endNodeId",
|
||||||
|
"fieldType": "STRING"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
1
pom.xml
1
pom.xml
@ -123,6 +123,7 @@
|
|||||||
<module>doriswriter</module>
|
<module>doriswriter</module>
|
||||||
<module>selectdbwriter</module>
|
<module>selectdbwriter</module>
|
||||||
<module>adbmysqlwriter</module>
|
<module>adbmysqlwriter</module>
|
||||||
|
<module>neo4jwriter</module>
|
||||||
|
|
||||||
<!-- common support module -->
|
<!-- common support module -->
|
||||||
<module>plugin-rdbms-util</module>
|
<module>plugin-rdbms-util</module>
|
||||||
|
Loading…
Reference in New Issue
Block a user