diff --git a/datax-example/doc/README.md b/datax-example/doc/README.md index 467c2d37..d44e10d2 100644 --- a/datax-example/doc/README.md +++ b/datax-example/doc/README.md @@ -70,9 +70,10 @@ ``` -2.在datax-example模块引入你需要的插件,默认只引入了streamreader、writer +#### 在example模块使用 +1.在datax-example模块引入你需要的插件,默认只引入了streamreader、writer -3.打开datax-example的Main class +2.打开datax-example的Main class ```java public class Main { @@ -99,4 +100,51 @@ public class Main { } } +``` +#### 在reader/writer模块使用 +参考neo4jwriter的StreamReader2Neo4jWriterTest +```java +public class StreamReader2Neo4jWriterTest extends Neo4jWriterTest { + private static final int CHANNEL = 5; + private static final int READER_NUM = 10; + + //在neo4jWriter模块使用Example测试整个job,方便发现整个流程的代码问题 + @Test + public void streamReader2Neo4j() { + + deleteHistoryIfExist(); + + String path = "/streamreader2neo4j.json"; + String jobPath = PathUtil.getAbsolutePathFromClassPath(path); + + ExampleContainer.start(jobPath); + + //根据channel和reader的mock数据,校验结果集是否符合预期 + verifyWriteResult(); + } + + private void deleteHistoryIfExist() { + String query = "match (n:StreamReader) return n limit 1"; + String delete = "match (n:StreamReader) delete n"; + if (super.neo4jSession.run(query).hasNext()) { + neo4jSession.run(delete); + } + } + + private void verifyWriteResult() { + int total = CHANNEL * READER_NUM; + String query = "match (n:StreamReader) return n"; + Result run = neo4jSession.run(query); + int count = 0; + while (run.hasNext()) { + Record record = run.next(); + Node node = record.get("n").asNode(); + if (node.hasLabel("StreamReader")) { + count++; + } + } + Assert.assertEquals(count, total); + } +} + ``` \ No newline at end of file diff --git a/datax-example/pom.xml b/datax-example/pom.xml index 9f077a28..17bb9e18 100644 --- a/datax-example/pom.xml +++ b/datax-example/pom.xml @@ -56,13 +56,6 @@ true - - src/main/resources - - **/*.* - - true - diff --git a/datax-example/src/main/java/com/alibaba/datax/example/ExampleContainer.java b/datax-example/src/main/java/com/alibaba/datax/example/ExampleContainer.java new file mode 100644 index 00000000..a4229fd1 --- /dev/null +++ b/datax-example/src/main/java/com/alibaba/datax/example/ExampleContainer.java @@ -0,0 +1,26 @@ +package com.alibaba.datax.example; + +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.core.Engine; +import com.alibaba.datax.example.util.ExampleConfigParser; + +/** + * {@code Date} 2023/8/6 11:22 + * + * @author fuyouj + */ + +public class ExampleContainer { + /** + * example对外暴露的启动入口 + * 使用前最好看下 datax-example/doc/README.MD + * @param jobPath 任务json绝对路径 + */ + public static void start(String jobPath) { + + Configuration configuration = ExampleConfigParser.parse(jobPath); + + Engine engine = new Engine(); + engine.start(configuration); + } +} diff --git a/datax-example/src/main/java/com/alibaba/datax/example/Main.java b/datax-example/src/main/java/com/alibaba/datax/example/Main.java index afb6829c..56bf9f0b 100644 --- a/datax-example/src/main/java/com/alibaba/datax/example/Main.java +++ b/datax-example/src/main/java/com/alibaba/datax/example/Main.java @@ -1,9 +1,6 @@ package com.alibaba.datax.example; -import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.core.Engine; -import com.alibaba.datax.example.util.ExampleConfigParser; import com.alibaba.datax.example.util.PathUtil; /** @@ -13,22 +10,14 @@ public class Main { /** * 1.在example模块pom文件添加你依赖的的调试插件, - * 你可以直接打开本模块的pom文件,参考是如何引入streamreader,streamwriter + * 你可以直接打开本模块的pom文件,参考是如何引入streamreader,streamwriter * 2. 在此处指定你的job文件 */ public static void main(String[] args) { String classPathJobPath = "/job/stream2stream.json"; String absJobPath = PathUtil.getAbsolutePathFromClassPath(classPathJobPath); - startExample(absJobPath); - } - - public static void startExample(String jobPath) { - - Configuration configuration = ExampleConfigParser.parse(jobPath); - - Engine engine = new Engine(); - engine.start(configuration); + ExampleContainer.start(absJobPath); } } diff --git a/datax-example/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java b/datax-example/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java index cf13c9ba..aaf4155c 100644 --- a/datax-example/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java +++ b/datax-example/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java @@ -22,7 +22,7 @@ public class ExampleConfigParser { /** * 指定Job配置路径,ConfigParser会解析Job、Plugin、Core全部信息,并以Configuration返回 - * 不同于Core的ConfigParser,这里的core,plugin 不依赖于编译后的datax.home,而是扫描程序目录 + * 不同于Core的ConfigParser,这里的core,plugin 不依赖于编译后的datax.home,而是扫描程序编译后的target目录 */ public static Configuration parse(final String jobPath) { @@ -54,15 +54,26 @@ public class ExampleConfigParser { scanPluginByPackage(basePackage, configuration, basePackage.listFiles(), pluginTypeMap); } if (!pluginTypeMap.isEmpty()) { - throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR, - "load plugin failed,未完成指定插件加载:" - + pluginTypeMap.keySet()); + String failedPlugin = pluginTypeMap.keySet().toString(); + String message = "\n插件%s加载失败:尝试从以下方面分析原因。\n" + + "1: 检查插件的名字是否书写正确\n" + + "2:相关插件的pom文件的下是否已经添加了 \n" + + " src/main/resources\n" + + " \n" + + " **/*.*\n" + + " \n" + + " true\n" + + " \n [可参阅streamreader pom文件] \n" + + "3:如果你是以datax-example为启动模块,example模块是否导入了插件的依赖。参开example的pom文件"; + message = String.format(message, failedPlugin); + throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR, message); } return configuration; } /** * 通过classLoader获取程序编译的输出目录 + * * @return File[/datax-example/target/classes,xxReader/target/classes,xxWriter/target/classes] */ private static File[] runtimeBasePackages() { @@ -87,10 +98,9 @@ public class ExampleConfigParser { } /** - * - * @param packageFile 编译出来的target/classes根目录 便于找到插件时设置插件的URL目录,设置根目录是最保险的方式 - * @param configuration pluginConfig - * @param files 待扫描文件 + * @param packageFile 编译出来的target/classes根目录 便于找到插件时设置插件的URL目录,设置根目录是最保险的方式 + * @param configuration pluginConfig + * @param files 待扫描文件 * @param needPluginTypeMap 需要的插件 */ private static void scanPluginByPackage(File packageFile, @@ -108,12 +118,12 @@ public class ExampleConfigParser { if (needPluginTypeMap.containsKey(descPluginName)) { String type = needPluginTypeMap.get(descPluginName); - configuration.merge(parseOnePlugin(packageFile.getAbsolutePath(),type, descPluginName, pluginDesc), false); + configuration.merge(parseOnePlugin(packageFile.getAbsolutePath(), type, descPluginName, pluginDesc), false); needPluginTypeMap.remove(descPluginName); } } else { - scanPluginByPackage(packageFile,configuration, file.listFiles(), needPluginTypeMap); + scanPluginByPackage(packageFile, configuration, file.listFiles(), needPluginTypeMap); } } } diff --git a/datax-example/src/test/java/com/alibaba/datax/example/DataXExampleTest.java b/datax-example/src/test/java/com/alibaba/datax/example/DataXExampleTest.java index 06af835b..c2f600cd 100644 --- a/datax-example/src/test/java/com/alibaba/datax/example/DataXExampleTest.java +++ b/datax-example/src/test/java/com/alibaba/datax/example/DataXExampleTest.java @@ -11,6 +11,6 @@ public class DataXExampleTest { public void testStreamReader2StreamWriter() { String path = "/job/stream2stream.json"; String jobPath = PathUtil.getAbsolutePathFromClassPath(path); - Main.startExample(jobPath); + ExampleContainer.start(jobPath); } } diff --git a/neo4jwriter/pom.xml b/neo4jwriter/pom.xml index a9ae43e9..0e65505e 100644 --- a/neo4jwriter/pom.xml +++ b/neo4jwriter/pom.xml @@ -53,9 +53,24 @@ ${junit4.version} test + + com.alibaba.datax + datax-example + 0.0.1-SNAPSHOT + + + + src/main/resources + + **/*.* + + true + + + diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java index a89f4674..6a589c1d 100644 --- a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java @@ -18,6 +18,7 @@ public class Neo4jWriter extends Writer { @Override public void init() { LOGGER.info("Neo4jWriter Job init success"); + this.jobConf = getPluginJobConf(); } @Override diff --git a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java index 53c9235e..67f6e95e 100644 --- a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java +++ b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java @@ -52,8 +52,8 @@ public class Neo4jWriterTest { protected static final Network NETWORK = Network.newNetwork(); private GenericContainer container; - private Driver neo4jDriver; - private Session neo4jSession; + protected Driver neo4jDriver; + protected Session neo4jSession; @Before public void init() { diff --git a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/StreamReader2Neo4jWriterTest.java b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/StreamReader2Neo4jWriterTest.java new file mode 100644 index 00000000..3335f5e7 --- /dev/null +++ b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/StreamReader2Neo4jWriterTest.java @@ -0,0 +1,59 @@ +package com.alibaba.datax.plugin.writer.mock; + +import com.alibaba.datax.example.ExampleContainer; +import com.alibaba.datax.example.util.PathUtil; +import com.alibaba.datax.plugin.writer.Neo4jWriterTest; +import org.junit.Assert; +import org.junit.Test; +import org.neo4j.driver.Record; +import org.neo4j.driver.Result; +import org.neo4j.driver.types.Node; + +/** + * 展示如何使用ExampleContainer运行测试用例 + * {@code Author} FuYouJ + * {@code Date} 2023/8/6 11:36 + */ + +public class StreamReader2Neo4jWriterTest extends Neo4jWriterTest { + private static final int CHANNEL = 5; + private static final int READER_NUM = 10; + + //在neo4jWriter模块使用Example测试整个job,方便发现整个流程的代码问题 + @Test + public void streamReader2Neo4j() { + + deleteHistoryIfExist(); + + String path = "/streamreader2neo4j.json"; + String jobPath = PathUtil.getAbsolutePathFromClassPath(path); + + ExampleContainer.start(jobPath); + + //根据channel和reader的mock数据,校验结果集是否符合预期 + verifyWriteResult(); + } + + private void deleteHistoryIfExist() { + String query = "match (n:StreamReader) return n limit 1"; + String delete = "match (n:StreamReader) delete n"; + if (super.neo4jSession.run(query).hasNext()) { + neo4jSession.run(delete); + } + } + + private void verifyWriteResult() { + int total = CHANNEL * READER_NUM; + String query = "match (n:StreamReader) return n"; + Result run = neo4jSession.run(query); + int count = 0; + while (run.hasNext()) { + Record record = run.next(); + Node node = record.get("n").asNode(); + if (node.hasLabel("StreamReader")) { + count++; + } + } + Assert.assertEquals(count, total); + } +} diff --git a/neo4jwriter/src/test/resources/streamreader2neo4j.json b/neo4jwriter/src/test/resources/streamreader2neo4j.json new file mode 100644 index 00000000..3d543ce3 --- /dev/null +++ b/neo4jwriter/src/test/resources/streamreader2neo4j.json @@ -0,0 +1,51 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "streamreader", + "parameter": { + "sliceRecordCount": 10, + "column": [ + { + "type": "string", + "value": "StreamReader" + }, + { + "type": "string", + "value": "1997" + } + ] + } + }, + "writer": { + "name": "neo4jWriter", + "parameter": { + "uri": "bolt://localhost:7687", + "username":"neo4j", + "password":"Test@12343", + "database":"neo4j", + "cypher": "unwind $batch as row CALL apoc.cypher.doIt( 'create (n:`' + row.Label + '`{id:$id})' ,{id: row.id} ) YIELD value RETURN 1 ", + "batchDataVariableName": "batch", + "batchSize": "3", + "properties": [ + { + "name": "Label", + "type": "string" + }, + { + "name": "id", + "type": "STRING" + } + ] + } + } + } + ], + "setting": { + "speed": { + "channel": 5 + } + } + } +} \ No newline at end of file