diff --git a/core/src/main/java/com/alibaba/datax/core/util/container/LoadUtil.java b/core/src/main/java/com/alibaba/datax/core/util/container/LoadUtil.java index 0891e4d6..9a6a8302 100755 --- a/core/src/main/java/com/alibaba/datax/core/util/container/LoadUtil.java +++ b/core/src/main/java/com/alibaba/datax/core/util/container/LoadUtil.java @@ -49,7 +49,7 @@ public class LoadUtil { /** * jarLoader的缓冲 */ - private static Map jarLoaderCenter = new HashMap(); + private static Map jarLoaderCenter = new HashMap(); /** * 设置pluginConfigs,方便后面插件来获取 @@ -167,7 +167,7 @@ public class LoadUtil { PluginType pluginType, String pluginName, ContainerType pluginRunType) { Configuration pluginConf = getPluginConf(pluginType, pluginName); - ClassLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName); + JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName); try { return (Class) jarLoader .loadClass(pluginConf.getString("class") + "$" @@ -177,14 +177,22 @@ public class LoadUtil { } } - public static synchronized ClassLoader getJarLoader(PluginType pluginType, + public static synchronized JarLoader getJarLoader(PluginType pluginType, String pluginName) { Configuration pluginConf = getPluginConf(pluginType, pluginName); - ClassLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType, + JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType, pluginName)); if (null == jarLoader) { - jarLoader = PluginLoaderFactory.create(pluginConf,pluginType,pluginName); + String pluginPath = pluginConf.getString("path"); + if (StringUtils.isBlank(pluginPath)) { + throw DataXException.asDataXException( + FrameworkErrorCode.RUNTIME_ERROR, + String.format( + "%s插件[%s]路径非法!", + pluginType, pluginName)); + } + jarLoader = new JarLoader(new String[]{pluginPath}); jarLoaderCenter.put(generatePluginKey(pluginType, pluginName), jarLoader); } diff --git a/datax-example/doc/README.md b/datax-example/doc/README.md index 94a5e46d..467c2d37 100644 --- a/datax-example/doc/README.md +++ b/datax-example/doc/README.md @@ -21,28 +21,82 @@ ### 实现原理 -- 不修改原有的ConfigParer,使用新的ExampleConfigParser,仅用于example模块。 -- 提供新的PluginLoader 插件加载器,可以从程序运行目录获取插件,与JarLoader各司其职。 +- 不修改原有的ConfigParer,使用新的ExampleConfigParser,仅用于example模块。他不依赖datax.home,而是依赖ide编译后的target目录 +- 将ide的target目录作为每个插件的目录类加载目录。 ![img](img/img02.png) -### 如何使用 +### 如何使用 +1.修改插件的pom文件,做如下改动。以streamreader为例。
+改动前 +```xml + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + +``` +改动后 +```xml + + + + + src/main/resources + + **/*.* + + true + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + +``` +2.在datax-example模块引入你需要的插件,默认只引入了streamreader、writer + +3.打开datax-example的Main class ```java public class Main { - public static void main(String[] args) { - //1. 在 datax-example pom文件中加入测试插件模块的依赖,默认导入了streamreader/writer - //2. 在此处指定你的测试文件路径 - String path = "/job/stream2stream.json"; - Configuration configuration = ExampleConfigParser.parse( - PathUtil.getAbsolutePathFromClassPath(path) - ); + /** + * 注意! + * 1.在example模块pom文件添加你依赖的的调试插件, + * 你可以直接打开本模块的pom文件,参考是如何引入streamreader,streamwriter + * 2. 在此处指定你的job文件 + */ + public static void main(String[] args) { + + String classPathJobPath = "/job/stream2stream.json"; + String absJobPath = PathUtil.getAbsolutePathFromClassPath(classPathJobPath); + startExample(absJobPath); + } + + public static void startExample(String jobPath) { + + Configuration configuration = ExampleConfigParser.parse(jobPath); Engine engine = new Engine(); engine.start(configuration); } + } -``` -### 注意 -此模块不参与打包 +``` \ No newline at end of file diff --git a/datax-example/doc/img/img02.png b/datax-example/doc/img/img02.png index 5e535a4c..eec860d4 100644 Binary files a/datax-example/doc/img/img02.png and b/datax-example/doc/img/img02.png differ diff --git a/datax-example/pom.xml b/datax-example/pom.xml index f8285ae8..9f077a28 100644 --- a/datax-example/pom.xml +++ b/datax-example/pom.xml @@ -28,6 +28,7 @@ datax-core 0.0.1-SNAPSHOT + com.alibaba.datax streamwriter @@ -49,12 +50,31 @@ - src/main/java + src/main/resources - **/*.properties + **/*.* + true + + + src/main/resources + + **/*.* + + true + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + \ No newline at end of file diff --git a/datax-example/src/main/java/com/alibaba/datax/example/Main.java b/datax-example/src/main/java/com/alibaba/datax/example/Main.java index 1e600753..afb6829c 100644 --- a/datax-example/src/main/java/com/alibaba/datax/example/Main.java +++ b/datax-example/src/main/java/com/alibaba/datax/example/Main.java @@ -10,15 +10,25 @@ import com.alibaba.datax.example.util.PathUtil; * @author fuyouj */ public class Main { - public static void main(String[] args) { - //在此处指定你的测试文件路径 - String path = "/job/stream2stream.json"; - Configuration configuration = ExampleConfigParser.parse( - PathUtil.getAbsolutePathFromClassPath(path) - ); + /** + * 1.在example模块pom文件添加你依赖的的调试插件, + * 你可以直接打开本模块的pom文件,参考是如何引入streamreader,streamwriter + * 2. 在此处指定你的job文件 + */ + public static void main(String[] args) { + + String classPathJobPath = "/job/stream2stream.json"; + String absJobPath = PathUtil.getAbsolutePathFromClassPath(classPathJobPath); + startExample(absJobPath); + } + + public static void startExample(String jobPath) { + + Configuration configuration = ExampleConfigParser.parse(jobPath); Engine engine = new Engine(); engine.start(configuration); } + } diff --git a/datax-example/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java b/datax-example/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java index af4f2c18..cf13c9ba 100644 --- a/datax-example/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java +++ b/datax-example/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java @@ -7,10 +7,10 @@ import com.alibaba.datax.core.util.FrameworkErrorCode; import com.alibaba.datax.core.util.container.CoreConstant; import java.io.File; +import java.io.IOException; import java.net.URL; import java.nio.file.Paths; -import java.util.HashMap; -import java.util.Map; +import java.util.*; /** * @author fuyouj @@ -44,9 +44,15 @@ public class ExampleConfigParser { Configuration configuration = Configuration.newDefault(); - String workingDirectory = System.getProperty("user.dir"); - File file = new File(workingDirectory); - scanPlugin(configuration, file.listFiles(), pluginTypeMap); + //最初打算通过user.dir获取工作目录来扫描插件, + //但是user.dir在不同有一些不确定性,所以废弃了这个选择 + + for (File basePackage : runtimeBasePackages()) { + if (pluginTypeMap.isEmpty()) { + break; + } + scanPluginByPackage(basePackage, configuration, basePackage.listFiles(), pluginTypeMap); + } if (!pluginTypeMap.isEmpty()) { throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR, "load plugin failed,未完成指定插件加载:" @@ -55,7 +61,42 @@ public class ExampleConfigParser { return configuration; } - private static void scanPlugin(Configuration configuration, File[] files, Map pluginTypeMap) { + /** + * 通过classLoader获取程序编译的输出目录 + * @return File[/datax-example/target/classes,xxReader/target/classes,xxWriter/target/classes] + */ + private static File[] runtimeBasePackages() { + List basePackages = new ArrayList<>(); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + Enumeration resources = null; + try { + resources = classLoader.getResources(""); + } catch (IOException e) { + throw DataXException.asDataXException(e.getMessage()); + } + + while (resources.hasMoreElements()) { + URL resource = resources.nextElement(); + File file = new File(resource.getFile()); + if (file.isDirectory()) { + basePackages.add(file); + } + } + + return basePackages.toArray(new File[0]); + } + + /** + * + * @param packageFile 编译出来的target/classes根目录 便于找到插件时设置插件的URL目录,设置根目录是最保险的方式 + * @param configuration pluginConfig + * @param files 待扫描文件 + * @param needPluginTypeMap 需要的插件 + */ + private static void scanPluginByPackage(File packageFile, + Configuration configuration, + File[] files, + Map needPluginTypeMap) { if (files == null) { return; } @@ -64,22 +105,26 @@ public class ExampleConfigParser { Configuration pluginDesc = Configuration.from(file); String descPluginName = pluginDesc.getString("name", ""); - if (pluginTypeMap.containsKey(descPluginName)) { + if (needPluginTypeMap.containsKey(descPluginName)) { - String type = pluginTypeMap.get(descPluginName); - configuration.merge(parseOnePlugin(type, descPluginName, pluginDesc), false); - pluginTypeMap.remove(descPluginName); + String type = needPluginTypeMap.get(descPluginName); + configuration.merge(parseOnePlugin(packageFile.getAbsolutePath(),type, descPluginName, pluginDesc), false); + needPluginTypeMap.remove(descPluginName); } } else { - scanPlugin(configuration, file.listFiles(), pluginTypeMap); + scanPluginByPackage(packageFile,configuration, file.listFiles(), needPluginTypeMap); } } } - private static Configuration parseOnePlugin(String pluginType, String pluginName, Configuration pluginDesc) { - pluginDesc.set("loadType","pluginLoader"); + private static Configuration parseOnePlugin(String packagePath, + String pluginType, + String pluginName, + Configuration pluginDesc) { + //设置path 兼容jarLoader的加载方式URLClassLoader + pluginDesc.set("path", packagePath); Configuration pluginConfInJob = Configuration.newDefault(); pluginConfInJob.set( String.format("plugin.%s.%s", pluginType, pluginName), diff --git a/datax-example/src/test/java/com/alibaba/datax/example/DataXExampleTest.java b/datax-example/src/test/java/com/alibaba/datax/example/DataXExampleTest.java index 12aac3fe..06af835b 100644 --- a/datax-example/src/test/java/com/alibaba/datax/example/DataXExampleTest.java +++ b/datax-example/src/test/java/com/alibaba/datax/example/DataXExampleTest.java @@ -1,24 +1,16 @@ package com.alibaba.datax.example; -import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.core.Engine; -import com.alibaba.datax.example.util.ExampleConfigParser; import com.alibaba.datax.example.util.PathUtil; import org.junit.Test; -public class DataXExampleTest extends ExampleTestTemplate { +public class DataXExampleTest { @Test public void testStreamReader2StreamWriter() { - String path = "/job/stream2stream.json"; - Configuration testConfiguration = ExampleConfigParser.parse( - PathUtil.getAbsolutePathFromClassPath(path) - ); - Engine engine = new Engine(); - engine.start(testConfiguration); + String jobPath = PathUtil.getAbsolutePathFromClassPath(path); + Main.startExample(jobPath); } - } diff --git a/datax-example/src/test/java/com/alibaba/datax/example/util/com/alibaba/datax/example/util/ExampleConfigParserTest.java b/datax-example/src/test/java/com/alibaba/datax/example/util/com/alibaba/datax/example/util/ExampleConfigParserTest.java index 64eab578..5a99e7b1 100644 --- a/datax-example/src/test/java/com/alibaba/datax/example/util/com/alibaba/datax/example/util/ExampleConfigParserTest.java +++ b/datax-example/src/test/java/com/alibaba/datax/example/util/com/alibaba/datax/example/util/ExampleConfigParserTest.java @@ -1,7 +1,6 @@ package com.alibaba.datax.example.util.com.alibaba.datax.example.util; import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.example.ExampleTestTemplate; import com.alibaba.datax.example.util.ExampleConfigParser; import com.alibaba.datax.example.util.PathUtil; import org.junit.Assert; @@ -10,7 +9,7 @@ import org.junit.Test; import java.io.File; -public class ExampleConfigParserTest extends ExampleTestTemplate { +public class ExampleConfigParserTest { @Test diff --git a/pom.xml b/pom.xml index 4af27ee3..c3d87ec9 100644 --- a/pom.xml +++ b/pom.xml @@ -47,88 +47,88 @@ transformer - mysqlreader - drdsreader - sqlserverreader - postgresqlreader - kingbaseesreader - oraclereader - cassandrareader - oceanbasev10reader - rdbmsreader + + + + + + + + + - odpsreader - otsreader - otsstreamreader - hbase11xreader - hbase094xreader - hbase11xsqlreader - hbase20xsqlreader + + + + + + + - ossreader - hdfsreader - ftpreader - txtfilereader + + + + streamreader - clickhousereader + - mongodbreader - tdenginereader - gdbreader - tsdbreader - opentsdbreader - loghubreader - datahubreader - starrocksreader + + + + + + + + - - mysqlwriter - starrockswriter - drdswriter - databendwriter - oraclewriter - sqlserverwriter - postgresqlwriter - kingbaseeswriter - adswriter - oceanbasev10writer - adbpgwriter - hologresjdbcwriter - rdbmswriter + + + + + + + + + + + + + + - odpswriter - osswriter - otswriter - hbase11xwriter - hbase094xwriter - hbase11xsqlwriter - hbase20xsqlwriter - kuduwriter - ftpwriter - hdfswriter - txtfilewriter + + + + + + + + + + + streamwriter - elasticsearchwriter - mongodbwriter - tdenginewriter - ocswriter - tsdbwriter - gdbwriter - oscarwriter - loghubwriter - datahubwriter - cassandrawriter - clickhousewriter - doriswriter - selectdbwriter - adbmysqlwriter - neo4jwriter + + + + + + + + + + + + + + + - plugin-rdbms-util - plugin-unstructured-storage-util + + datax-example diff --git a/streamreader/pom.xml b/streamreader/pom.xml index dc754d9a..7d186076 100755 --- a/streamreader/pom.xml +++ b/streamreader/pom.xml @@ -39,6 +39,16 @@ + + + + src/main/resources + + **/*.* + + true + + diff --git a/streamwriter/pom.xml b/streamwriter/pom.xml index 4a987fac..2fa95d7b 100755 --- a/streamwriter/pom.xml +++ b/streamwriter/pom.xml @@ -34,6 +34,16 @@ + + + + src/main/resources + + **/*.* + + true + +