mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 04:59:51 +08:00
apoc安装在容器之内便于测试
This commit is contained in:
parent
b0c3d3e6d9
commit
5c337d6b22
@ -167,8 +167,8 @@ public class Neo4jClient {
|
||||
private void doWrite(List<MapValue> values) {
|
||||
Value batchValues = Values.parameters(this.writeConfig.batchVariableName, values);
|
||||
Query query = new Query(this.writeConfig.cypher, batchValues);
|
||||
LOGGER.debug("query:{}", query.text());
|
||||
LOGGER.debug("batch:{}", toUnwindStr(values));
|
||||
// LOGGER.debug("query:{}", query.text());
|
||||
// LOGGER.debug("batch:{}", toUnwindStr(values));
|
||||
try {
|
||||
RetryUtil.executeWithRetry(() -> {
|
||||
session.writeTransaction(tx -> tx.run(query));
|
||||
@ -205,7 +205,7 @@ public class Neo4jClient {
|
||||
LOGGER.warn("接收到的数据列少于neo4jWriter企图消费的数据列,请注意风险,这可能导致数据不匹配");
|
||||
LOGGER.warn("Receive fewer data columns than neo4jWriter attempts to consume, " +
|
||||
"be aware of the risk that this may result in data mismatch");
|
||||
LOGGER.warn("接受到的数据是:" + record);
|
||||
LOGGER.warn("接收到的数据是:" + record);
|
||||
LOGGER.warn("received data is:" + record);
|
||||
}
|
||||
|
||||
|
@ -17,12 +17,12 @@ public class Neo4jWriter extends Writer {
|
||||
private Configuration jobConf = null;
|
||||
@Override
|
||||
public void init() {
|
||||
|
||||
LOGGER.info("Neo4jWriter Job init Success");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
|
||||
LOGGER.info("Neo4jWriter Job destroyed");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1,29 +0,0 @@
|
||||
package com.alibaba.datax.plugin.writer.neo4jwriter.element;
|
||||
|
||||
import org.neo4j.driver.Record;
|
||||
import org.neo4j.driver.Value;
|
||||
import org.neo4j.driver.internal.AsValue;
|
||||
import org.neo4j.driver.internal.value.MapValue;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 一般来说,我们会将一批对象转换成hashmap再传输给neo4j的驱动用作参数解析,驱动会将hashmap转换成org.neo4j.driver.Value
|
||||
* 过程是:List[domain] -> List[map]->List[Value]
|
||||
* 直接将Record实现AsValue接口,有1个好处:
|
||||
* 减少了一次对象转换次数,List[domain] -> List[Value]
|
||||
*/
|
||||
public class Neo4jRecord implements AsValue {
|
||||
|
||||
private MapValue mapValue;
|
||||
|
||||
public Neo4jRecord(Record record, List<String> columnNames) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Value asValue() {
|
||||
return null;
|
||||
}
|
||||
}
|
@ -1,81 +0,0 @@
|
||||
package com.alibaba.datax.plugin.writer;
|
||||
|
||||
import com.alibaba.datax.common.element.Record;
|
||||
import com.alibaba.datax.common.element.StringColumn;
|
||||
import com.alibaba.datax.common.util.Configuration;
|
||||
import com.alibaba.datax.plugin.writer.mock.MockRecord;
|
||||
import com.alibaba.datax.plugin.writer.neo4jwriter.Neo4jClient;
|
||||
import org.junit.Test;
|
||||
import org.neo4j.driver.*;
|
||||
import org.neo4j.driver.types.Node;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* 由于docker 镜像没有apoc函数,所以此测试只能本地搭建环境复现
|
||||
*/
|
||||
public class ApocTest {
|
||||
/**
|
||||
* neo4j中,Label和关系类型,想动态的写,需要借助于apoc函数
|
||||
*/
|
||||
@Test
|
||||
public void test_use_apoc_create_dynamic_label() {
|
||||
try (Driver neo4jDriver = GraphDatabase.driver(
|
||||
"bolt://localhost:7687",
|
||||
AuthTokens.basic("yourUserName", "yourPassword"));
|
||||
Session neo4jSession = neo4jDriver.session(SessionConfig.forDatabase("yourDataBase"))) {
|
||||
List<String> dynamicLabel = new ArrayList<>();
|
||||
for (int i = 0; i < 100; i++) {
|
||||
dynamicLabel.add("Label" + i);
|
||||
}
|
||||
//删除原有数据
|
||||
//remove test data if exist
|
||||
//这种占位符的方式不支持批量动态写,当然可以使用union拼接,但是性能不好
|
||||
String query = "match (p:%s) return p";
|
||||
String delete = "match (p:%s) delete p";
|
||||
for (String label : dynamicLabel) {
|
||||
Result result = neo4jSession.run(String.format(query, label));
|
||||
if (result.hasNext()) {
|
||||
neo4jSession.run(String.format(delete, label));
|
||||
}
|
||||
}
|
||||
|
||||
Configuration configuration = Configuration.from(new File("src/test/resources/dynamicLabel.json"));
|
||||
Neo4jClient neo4jClient = Neo4jClient.build(configuration, null);
|
||||
|
||||
neo4jClient.init();
|
||||
for (int i = 0; i < dynamicLabel.size(); i++) {
|
||||
Record record = new MockRecord();
|
||||
record.addColumn(new StringColumn(dynamicLabel.get(i)));
|
||||
record.addColumn(new StringColumn(String.valueOf(i)));
|
||||
neo4jClient.tryWrite(record);
|
||||
}
|
||||
neo4jClient.destroy();
|
||||
|
||||
//校验脚本的批量写入是否正确
|
||||
int cnt = 0;
|
||||
for (int i = 0; i < dynamicLabel.size(); i++) {
|
||||
String label = dynamicLabel.get(i);
|
||||
Result result = neo4jSession.run(String.format(query, label));
|
||||
while (result.hasNext()) {
|
||||
org.neo4j.driver.Record record = result.next();
|
||||
Node node = record.get("p").asNode();
|
||||
assertTrue(node.hasLabel(label));
|
||||
assertEquals(node.asMap().get("id"), i + "");
|
||||
cnt++;
|
||||
}
|
||||
}
|
||||
assertEquals(cnt, 100);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
}
|
@ -39,7 +39,7 @@ public class Neo4jWriterTest {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jWriterTest.class);
|
||||
private static final int MOCK_NUM = 100;
|
||||
private static final String CONTAINER_IMAGE = "neo4j:5.6.0";
|
||||
private static final String CONTAINER_IMAGE = "neo4j:5.9.0";
|
||||
|
||||
private static final String CONTAINER_HOST = "neo4j-host";
|
||||
private static final int HTTP_PORT = 7474;
|
||||
@ -65,6 +65,10 @@ public class Neo4jWriterTest {
|
||||
.withEnv(
|
||||
"NEO4J_AUTH",
|
||||
CONTAINER_NEO4J_USERNAME + "/" + CONTAINER_NEO4J_PASSWORD)
|
||||
.withEnv("apoc.export.file.enabled", "true")
|
||||
.withEnv("apoc.import.file.enabled", "true")
|
||||
.withEnv("apoc.import.file.use_neo4j_config", "true")
|
||||
.withEnv("NEO4J_PLUGINS", "[\"apoc\"]")
|
||||
.withLogConsumer(
|
||||
new Slf4jLogConsumer(
|
||||
DockerLoggerFactory.getLogger(CONTAINER_IMAGE)));
|
||||
|
@ -1,8 +1,8 @@
|
||||
{
|
||||
"uri": "bolt://localhost:7687",
|
||||
"username":"yourUserName",
|
||||
"password":"yourPassword",
|
||||
"database":"yourDataBase",
|
||||
"username":"neo4j",
|
||||
"password":"Test@12343",
|
||||
"database":"neo4j",
|
||||
"cypher": "unwind $batch as row CALL apoc.cypher.doIt( 'create (n:`' + row.Label + '`{id:$id})' ,{id: row.id} ) YIELD value RETURN 1 ",
|
||||
"batch_data_variable_name": "batch",
|
||||
"batch_size": "33",
|
||||
|
Loading…
Reference in New Issue
Block a user