From f0128e5853ff968c265874950f9a796046bd7199 Mon Sep 17 00:00:00 2001 From: FuYouJ <1247908487@qq.com> Date: Sun, 23 Jul 2023 18:39:26 +0800 Subject: [PATCH] example --- .../alibaba/datax/core/job/JobContainer.java | 1 + .../datax/core/util/container/JarLoader.java | 2 +- .../datax/core/util/container/LoadUtil.java | 25 ++--- .../util/container/PluginClassLoader.java | 10 ++ .../core/util/container/PluginLoader.java | 16 +++ .../util/container/PluginLoaderFactory.java | 22 +++++ datax-example/pom.xml | 42 ++++++++ .../java/com/alibaba/datax/example/Main.java | 25 +++++ .../example/util/ExampleConfigParser.java | 98 +++++++++++++++++++ .../src/main/resources/example/conf/core.json | 60 ++++++++++++ .../src/main/resources/job/stream2stream.json | 36 +++++++ .../src/main/resources/stream2stream.json | 36 +++++++ pom.xml | 3 +- 13 files changed, 362 insertions(+), 14 deletions(-) create mode 100644 core/src/main/java/com/alibaba/datax/core/util/container/PluginClassLoader.java create mode 100644 core/src/main/java/com/alibaba/datax/core/util/container/PluginLoader.java create mode 100644 core/src/main/java/com/alibaba/datax/core/util/container/PluginLoaderFactory.java create mode 100644 datax-example/pom.xml create mode 100644 datax-example/src/main/java/com/alibaba/datax/example/Main.java create mode 100644 datax-example/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java create mode 100755 datax-example/src/main/resources/example/conf/core.json create mode 100644 datax-example/src/main/resources/job/stream2stream.json create mode 100644 datax-example/src/main/resources/stream2stream.json diff --git a/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java b/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java index 49f5a0a1..03830100 100755 --- a/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java +++ b/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java @@ -653,6 +653,7 @@ public class JobContainer extends AbstractContainer { */ private Reader.Job initJobReader( JobPluginCollector jobPluginCollector) { + //TODO loadUtil加载或者是 pluginUtil加载 this.readerPluginName = this.configuration.getString( CoreConstant.DATAX_JOB_CONTENT_READER_NAME); classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( diff --git a/core/src/main/java/com/alibaba/datax/core/util/container/JarLoader.java b/core/src/main/java/com/alibaba/datax/core/util/container/JarLoader.java index 9fc113dc..513a090d 100755 --- a/core/src/main/java/com/alibaba/datax/core/util/container/JarLoader.java +++ b/core/src/main/java/com/alibaba/datax/core/util/container/JarLoader.java @@ -15,7 +15,7 @@ import java.util.List; /** * 提供Jar隔离的加载机制,会把传入的路径、及其子路径、以及路径中的jar文件加入到class path。 */ -public class JarLoader extends URLClassLoader { +public class JarLoader extends URLClassLoader implements PluginLoader{ public JarLoader(String[] paths) { this(paths, JarLoader.class.getClassLoader()); } diff --git a/core/src/main/java/com/alibaba/datax/core/util/container/LoadUtil.java b/core/src/main/java/com/alibaba/datax/core/util/container/LoadUtil.java index 30e926c3..dc5b22c6 100755 --- a/core/src/main/java/com/alibaba/datax/core/util/container/LoadUtil.java +++ b/core/src/main/java/com/alibaba/datax/core/util/container/LoadUtil.java @@ -49,7 +49,7 @@ public class LoadUtil { /** * jarLoader的缓冲 */ - private static Map jarLoaderCenter = new HashMap(); + private static Map jarLoaderCenter = new HashMap(); /** * 设置pluginConfigs,方便后面插件来获取 @@ -167,7 +167,7 @@ public class LoadUtil { PluginType pluginType, String pluginName, ContainerType pluginRunType) { Configuration pluginConf = getPluginConf(pluginType, pluginName); - JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName); + ClassLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName); try { return (Class) jarLoader .loadClass(pluginConf.getString("class") + "$" @@ -177,22 +177,23 @@ public class LoadUtil { } } - public static synchronized JarLoader getJarLoader(PluginType pluginType, + public static synchronized ClassLoader getJarLoader(PluginType pluginType, String pluginName) { Configuration pluginConf = getPluginConf(pluginType, pluginName); - JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType, + ClassLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType, pluginName)); if (null == jarLoader) { String pluginPath = pluginConf.getString("path"); - if (StringUtils.isBlank(pluginPath)) { - throw DataXException.asDataXException( - FrameworkErrorCode.RUNTIME_ERROR, - String.format( - "%s插件[%s]路径非法!", - pluginType, pluginName)); - } - jarLoader = new JarLoader(new String[]{pluginPath}); +// jarLoader = new JarLoader(new String[]{pluginPath}); + jarLoader = PluginLoaderFactory.create(pluginConf); +// if (StringUtils.isBlank(pluginPath)) { +// throw DataXException.asDataXException( +// FrameworkErrorCode.RUNTIME_ERROR, +// String.format( +// "%s插件[%s]路径非法!", +// pluginType, pluginName)); +// } jarLoaderCenter.put(generatePluginKey(pluginType, pluginName), jarLoader); } diff --git a/core/src/main/java/com/alibaba/datax/core/util/container/PluginClassLoader.java b/core/src/main/java/com/alibaba/datax/core/util/container/PluginClassLoader.java new file mode 100644 index 00000000..38ba952c --- /dev/null +++ b/core/src/main/java/com/alibaba/datax/core/util/container/PluginClassLoader.java @@ -0,0 +1,10 @@ +package com.alibaba.datax.core.util.container; + +/** + * {@code Author} FuYouJ + * {@code Date} 2023/7/23 16:52 + * @author fuyouj + */ + +public class PluginClassLoader extends ClassLoader implements PluginLoader{ +} diff --git a/core/src/main/java/com/alibaba/datax/core/util/container/PluginLoader.java b/core/src/main/java/com/alibaba/datax/core/util/container/PluginLoader.java new file mode 100644 index 00000000..5e79a548 --- /dev/null +++ b/core/src/main/java/com/alibaba/datax/core/util/container/PluginLoader.java @@ -0,0 +1,16 @@ +package com.alibaba.datax.core.util.container; + + +/** + * @author fuyouj + */ +public interface PluginLoader { + /** + * 加载插件对象 + * + * @param name 类全限定名 + * @return class对象 + * @throws ClassNotFoundException + */ + Class loadClass(String name) throws ClassNotFoundException; +} diff --git a/core/src/main/java/com/alibaba/datax/core/util/container/PluginLoaderFactory.java b/core/src/main/java/com/alibaba/datax/core/util/container/PluginLoaderFactory.java new file mode 100644 index 00000000..8fe6474e --- /dev/null +++ b/core/src/main/java/com/alibaba/datax/core/util/container/PluginLoaderFactory.java @@ -0,0 +1,22 @@ +package com.alibaba.datax.core.util.container; + + +import com.alibaba.datax.common.util.Configuration; +import org.apache.commons.lang3.StringUtils; + +import java.io.File; + +/** + * @author fuyouj + */ +public class PluginLoaderFactory { + + public static ClassLoader create(Configuration configuration) { + String path = configuration.getString("path"); + if (StringUtils.isNotBlank(path) && path.contains(File.separator)) { + return new JarLoader(new String[]{path}); + } else { + return new PluginClassLoader(); + } + } +} diff --git a/datax-example/pom.xml b/datax-example/pom.xml new file mode 100644 index 00000000..6d136313 --- /dev/null +++ b/datax-example/pom.xml @@ -0,0 +1,42 @@ + + + 4.0.0 + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + + datax-example + + + 8 + 8 + UTF-8 + + + + com.alibaba.datax + datax-common + 0.0.1-SNAPSHOT + + + com.alibaba.datax + datax-core + 0.0.1-SNAPSHOT + + + com.alibaba.datax + streamwriter + 0.0.1-SNAPSHOT + + + com.alibaba.datax + streamreader + 0.0.1-SNAPSHOT + + + + \ No newline at end of file diff --git a/datax-example/src/main/java/com/alibaba/datax/example/Main.java b/datax-example/src/main/java/com/alibaba/datax/example/Main.java new file mode 100644 index 00000000..cb6f55b6 --- /dev/null +++ b/datax-example/src/main/java/com/alibaba/datax/example/Main.java @@ -0,0 +1,25 @@ +package com.alibaba.datax.example; + + +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.core.Engine; +import com.alibaba.datax.example.util.ExampleConfigParser; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Paths; + +/** + * @author fuyouj + */ +public class Main { + public static void main(String[] args) throws URISyntaxException { + URL resource = Main.class.getResource("/job/stream2stream.json"); + URI uri = resource.toURI(); + String path = Paths.get(uri).toString(); + Configuration configuration = ExampleConfigParser.parse(path); + Engine engine = new Engine(); + engine.start(configuration); + } +} diff --git a/datax-example/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java b/datax-example/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java new file mode 100644 index 00000000..c10b8ce5 --- /dev/null +++ b/datax-example/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java @@ -0,0 +1,98 @@ +package com.alibaba.datax.example.util; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.core.util.ConfigParser; +import com.alibaba.datax.core.util.FrameworkErrorCode; +import com.alibaba.datax.core.util.container.CoreConstant; + +import java.io.File; +import java.net.URL; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; + +/** + * @author fuyouj + */ +public class ExampleConfigParser { + private static final String CORE_CONF = "/example/conf/core.json"; + + private static final String PLUGIN_DESC_FILE = "plugin.json"; + + /** + * 指定Job配置路径,ConfigParser会解析Job、Plugin、Core全部信息,并以Configuration返回 + * 不同于Core的ConfigParser,这里的core,plugin 不依赖于编译后的datax.home,而是扫描程序目录 + */ + public static Configuration parse(final String jobPath) { + + Configuration configuration = ConfigParser.parseJobConfig(jobPath); + configuration.merge(coreConfig(), + false); + + Map pluginTypeMap = new HashMap<>(); + String readerName = configuration.getString(CoreConstant.DATAX_JOB_CONTENT_READER_NAME); + String writerName = configuration.getString(CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME); + pluginTypeMap.put(readerName, "reader"); + pluginTypeMap.put(writerName, "writer"); + Configuration pluginsDescConfig = parsePluginsConfig(pluginTypeMap); + configuration.merge(pluginsDescConfig, false); + return configuration; + } + + public static Configuration parsePluginsConfig(Map pluginTypeMap) { + + Configuration configuration = Configuration.newDefault(); + + String workingDirectory = System.getProperty("user.dir"); + File file = new File(workingDirectory); + scanPlugin(configuration, file.listFiles(), pluginTypeMap); + if (!pluginTypeMap.isEmpty()) { + throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR, + "load plugin failed,未完成指定插件加载:" + + pluginTypeMap.keySet()); + } + return configuration; + } + + private static void scanPlugin(Configuration configuration, File[] files, Map pluginTypeMap) { + if (files == null) { + return; + } + for (File file : files) { + if (file.isFile() && PLUGIN_DESC_FILE.equals(file.getName())) { + Configuration pluginDesc = Configuration.from(file); + String descPluginName = pluginDesc.getString("name", ""); + + if (pluginTypeMap.containsKey(descPluginName)) { + + String type = pluginTypeMap.get(descPluginName); + configuration.merge(parseOnePlugin(type, descPluginName, pluginDesc), false); + pluginTypeMap.remove(descPluginName); + + } + } else { + scanPlugin(configuration, file.listFiles(), pluginTypeMap); + } + } + } + + private static Configuration parseOnePlugin(String pluginType, String pluginName, Configuration pluginDesc) { + + Configuration pluginConfInJob = Configuration.newDefault(); + pluginConfInJob.set( + String.format("plugin.%s.%s", pluginType, pluginName), + pluginDesc.getInternal()); + return pluginConfInJob; + } + + private static Configuration coreConfig() { + try { + URL resource = ExampleConfigParser.class.getResource(CORE_CONF); + return Configuration.from(Paths.get(resource.toURI()).toFile()); + } catch (Exception ignore) { + throw DataXException.asDataXException("Failed to load the configuration file core.json. " + + "Please check whether /example/conf/core.json exists!"); + } + } +} diff --git a/datax-example/src/main/resources/example/conf/core.json b/datax-example/src/main/resources/example/conf/core.json new file mode 100755 index 00000000..33281ac0 --- /dev/null +++ b/datax-example/src/main/resources/example/conf/core.json @@ -0,0 +1,60 @@ +{ + "entry": { + "jvm": "-Xms1G -Xmx1G", + "environment": {} + }, + "common": { + "column": { + "datetimeFormat": "yyyy-MM-dd HH:mm:ss", + "timeFormat": "HH:mm:ss", + "dateFormat": "yyyy-MM-dd", + "extraFormats":["yyyyMMdd"], + "timeZone": "GMT+8", + "encoding": "utf-8" + } + }, + "core": { + "dataXServer": { + "address": "http://localhost:7001/api", + "timeout": 10000, + "reportDataxLog": false, + "reportPerfLog": false + }, + "transport": { + "channel": { + "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel", + "speed": { + "byte": -1, + "record": -1 + }, + "flowControlInterval": 20, + "capacity": 512, + "byteCapacity": 67108864 + }, + "exchanger": { + "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger", + "bufferSize": 32 + } + }, + "container": { + "job": { + "reportInterval": 10000 + }, + "taskGroup": { + "channel": 5 + }, + "trace": { + "enable": "false" + } + + }, + "statistics": { + "collector": { + "plugin": { + "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector", + "maxDirtyNumber": 10 + } + } + } + } +} diff --git a/datax-example/src/main/resources/job/stream2stream.json b/datax-example/src/main/resources/job/stream2stream.json new file mode 100644 index 00000000..b2a57395 --- /dev/null +++ b/datax-example/src/main/resources/job/stream2stream.json @@ -0,0 +1,36 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "streamreader", + "parameter": { + "sliceRecordCount": 10, + "column": [ + { + "type": "long", + "value": "10" + }, + { + "type": "string", + "value": "hello,你好,世界-DataX" + } + ] + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "encoding": "UTF-8", + "print": true + } + } + } + ], + "setting": { + "speed": { + "channel": 5 + } + } + } +} \ No newline at end of file diff --git a/datax-example/src/main/resources/stream2stream.json b/datax-example/src/main/resources/stream2stream.json new file mode 100644 index 00000000..b2a57395 --- /dev/null +++ b/datax-example/src/main/resources/stream2stream.json @@ -0,0 +1,36 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "streamreader", + "parameter": { + "sliceRecordCount": 10, + "column": [ + { + "type": "long", + "value": "10" + }, + { + "type": "string", + "value": "hello,你好,世界-DataX" + } + ] + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "encoding": "UTF-8", + "print": true + } + } + } + ], + "setting": { + "speed": { + "channel": 5 + } + } + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index eafde57b..4af27ee3 100644 --- a/pom.xml +++ b/pom.xml @@ -129,7 +129,8 @@ plugin-rdbms-util plugin-unstructured-storage-util - + datax-example +