From 5c337d6b2282942d06a29f656100d63224e2d677 Mon Sep 17 00:00:00 2001 From: FuYouJ <1247908487@qq.com> Date: Thu, 29 Jun 2023 22:25:05 +0800 Subject: [PATCH] =?UTF-8?q?apoc=E5=AE=89=E8=A3=85=E5=9C=A8=E5=AE=B9?= =?UTF-8?q?=E5=99=A8=E4=B9=8B=E5=86=85=E4=BE=BF=E4=BA=8E=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../writer/neo4jwriter/Neo4jClient.java | 6 +- .../writer/neo4jwriter/Neo4jWriter.java | 4 +- .../neo4jwriter/element/Neo4jRecord.java | 29 ------- .../alibaba/datax/plugin/writer/ApocTest.java | 81 ------------------- .../datax/plugin/writer/Neo4jWriterTest.java | 6 +- .../src/test/resources/dynamicLabel.json | 6 +- 6 files changed, 13 insertions(+), 119 deletions(-) delete mode 100644 neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/element/Neo4jRecord.java delete mode 100644 neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/ApocTest.java diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java index f8b9ee4a..c9cbb060 100644 --- a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java @@ -167,8 +167,8 @@ public class Neo4jClient { private void doWrite(List values) { Value batchValues = Values.parameters(this.writeConfig.batchVariableName, values); Query query = new Query(this.writeConfig.cypher, batchValues); - LOGGER.debug("query:{}", query.text()); - LOGGER.debug("batch:{}", toUnwindStr(values)); +// LOGGER.debug("query:{}", query.text()); +// LOGGER.debug("batch:{}", toUnwindStr(values)); try { RetryUtil.executeWithRetry(() -> { session.writeTransaction(tx -> tx.run(query)); @@ -205,7 +205,7 @@ public class Neo4jClient { LOGGER.warn("接收到的数据列少于neo4jWriter企图消费的数据列,请注意风险,这可能导致数据不匹配"); LOGGER.warn("Receive fewer data columns than neo4jWriter attempts to consume, " + "be aware of the risk that this may result in data mismatch"); - LOGGER.warn("接受到的数据是:" + record); + LOGGER.warn("接收到的数据是:" + record); LOGGER.warn("received data is:" + record); } diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java index a851a363..9a7b62ee 100644 --- a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java @@ -17,12 +17,12 @@ public class Neo4jWriter extends Writer { private Configuration jobConf = null; @Override public void init() { - + LOGGER.info("Neo4jWriter Job init Success"); } @Override public void destroy() { - + LOGGER.info("Neo4jWriter Job destroyed"); } @Override diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/element/Neo4jRecord.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/element/Neo4jRecord.java deleted file mode 100644 index 7240f19e..00000000 --- a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/element/Neo4jRecord.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.alibaba.datax.plugin.writer.neo4jwriter.element; - -import org.neo4j.driver.Record; -import org.neo4j.driver.Value; -import org.neo4j.driver.internal.AsValue; -import org.neo4j.driver.internal.value.MapValue; - -import java.util.List; -import java.util.Map; - -/** - * 一般来说,我们会将一批对象转换成hashmap再传输给neo4j的驱动用作参数解析,驱动会将hashmap转换成org.neo4j.driver.Value - * 过程是:List[domain] -> List[map]->List[Value] - * 直接将Record实现AsValue接口,有1个好处: - * 减少了一次对象转换次数,List[domain] -> List[Value] - */ -public class Neo4jRecord implements AsValue { - - private MapValue mapValue; - - public Neo4jRecord(Record record, List columnNames) { - - } - - @Override - public Value asValue() { - return null; - } -} diff --git a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/ApocTest.java b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/ApocTest.java deleted file mode 100644 index 6faa6e77..00000000 --- a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/ApocTest.java +++ /dev/null @@ -1,81 +0,0 @@ -package com.alibaba.datax.plugin.writer; - -import com.alibaba.datax.common.element.Record; -import com.alibaba.datax.common.element.StringColumn; -import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.plugin.writer.mock.MockRecord; -import com.alibaba.datax.plugin.writer.neo4jwriter.Neo4jClient; -import org.junit.Test; -import org.neo4j.driver.*; -import org.neo4j.driver.types.Node; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * 由于docker 镜像没有apoc函数,所以此测试只能本地搭建环境复现 - */ -public class ApocTest { - /** - * neo4j中,Label和关系类型,想动态的写,需要借助于apoc函数 - */ - @Test - public void test_use_apoc_create_dynamic_label() { - try (Driver neo4jDriver = GraphDatabase.driver( - "bolt://localhost:7687", - AuthTokens.basic("yourUserName", "yourPassword")); - Session neo4jSession = neo4jDriver.session(SessionConfig.forDatabase("yourDataBase"))) { - List dynamicLabel = new ArrayList<>(); - for (int i = 0; i < 100; i++) { - dynamicLabel.add("Label" + i); - } - //删除原有数据 - //remove test data if exist - //这种占位符的方式不支持批量动态写,当然可以使用union拼接,但是性能不好 - String query = "match (p:%s) return p"; - String delete = "match (p:%s) delete p"; - for (String label : dynamicLabel) { - Result result = neo4jSession.run(String.format(query, label)); - if (result.hasNext()) { - neo4jSession.run(String.format(delete, label)); - } - } - - Configuration configuration = Configuration.from(new File("src/test/resources/dynamicLabel.json")); - Neo4jClient neo4jClient = Neo4jClient.build(configuration, null); - - neo4jClient.init(); - for (int i = 0; i < dynamicLabel.size(); i++) { - Record record = new MockRecord(); - record.addColumn(new StringColumn(dynamicLabel.get(i))); - record.addColumn(new StringColumn(String.valueOf(i))); - neo4jClient.tryWrite(record); - } - neo4jClient.destroy(); - - //校验脚本的批量写入是否正确 - int cnt = 0; - for (int i = 0; i < dynamicLabel.size(); i++) { - String label = dynamicLabel.get(i); - Result result = neo4jSession.run(String.format(query, label)); - while (result.hasNext()) { - org.neo4j.driver.Record record = result.next(); - Node node = record.get("p").asNode(); - assertTrue(node.hasLabel(label)); - assertEquals(node.asMap().get("id"), i + ""); - cnt++; - } - } - assertEquals(cnt, 100); - } - - - - - - } -} diff --git a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java index 6df7053a..a132d4fd 100644 --- a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java +++ b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java @@ -39,7 +39,7 @@ public class Neo4jWriterTest { private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jWriterTest.class); private static final int MOCK_NUM = 100; - private static final String CONTAINER_IMAGE = "neo4j:5.6.0"; + private static final String CONTAINER_IMAGE = "neo4j:5.9.0"; private static final String CONTAINER_HOST = "neo4j-host"; private static final int HTTP_PORT = 7474; @@ -65,6 +65,10 @@ public class Neo4jWriterTest { .withEnv( "NEO4J_AUTH", CONTAINER_NEO4J_USERNAME + "/" + CONTAINER_NEO4J_PASSWORD) + .withEnv("apoc.export.file.enabled", "true") + .withEnv("apoc.import.file.enabled", "true") + .withEnv("apoc.import.file.use_neo4j_config", "true") + .withEnv("NEO4J_PLUGINS", "[\"apoc\"]") .withLogConsumer( new Slf4jLogConsumer( DockerLoggerFactory.getLogger(CONTAINER_IMAGE))); diff --git a/neo4jwriter/src/test/resources/dynamicLabel.json b/neo4jwriter/src/test/resources/dynamicLabel.json index 61f5de75..5874b694 100644 --- a/neo4jwriter/src/test/resources/dynamicLabel.json +++ b/neo4jwriter/src/test/resources/dynamicLabel.json @@ -1,8 +1,8 @@ { "uri": "bolt://localhost:7687", - "username":"yourUserName", - "password":"yourPassword", - "database":"yourDataBase", + "username":"neo4j", + "password":"Test@12343", + "database":"neo4j", "cypher": "unwind $batch as row CALL apoc.cypher.doIt( 'create (n:`' + row.Label + '`{id:$id})' ,{id: row.id} ) YIELD value RETURN 1 ", "batch_data_variable_name": "batch", "batch_size": "33",