不依赖于user.dir环境变量,所以插件加载工厂便没有意义了

不依赖于user.dir环境变量,所以插件加载工厂便没有意义了
This commit is contained in:
FuYouJ 2023-08-05 23:11:43 +08:00
parent 03cfbfa2f7
commit af631a0a00
11 changed files with 271 additions and 123 deletions

View File

@ -49,7 +49,7 @@ public class LoadUtil {
/** /**
* jarLoader的缓冲 * jarLoader的缓冲
*/ */
private static Map<String, ClassLoader> jarLoaderCenter = new HashMap(); private static Map<String, JarLoader> jarLoaderCenter = new HashMap();
/** /**
* 设置pluginConfigs方便后面插件来获取 * 设置pluginConfigs方便后面插件来获取
@ -167,7 +167,7 @@ public class LoadUtil {
PluginType pluginType, String pluginName, PluginType pluginType, String pluginName,
ContainerType pluginRunType) { ContainerType pluginRunType) {
Configuration pluginConf = getPluginConf(pluginType, pluginName); Configuration pluginConf = getPluginConf(pluginType, pluginName);
ClassLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName); JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName);
try { try {
return (Class<? extends AbstractPlugin>) jarLoader return (Class<? extends AbstractPlugin>) jarLoader
.loadClass(pluginConf.getString("class") + "$" .loadClass(pluginConf.getString("class") + "$"
@ -177,14 +177,22 @@ public class LoadUtil {
} }
} }
public static synchronized ClassLoader getJarLoader(PluginType pluginType, public static synchronized JarLoader getJarLoader(PluginType pluginType,
String pluginName) { String pluginName) {
Configuration pluginConf = getPluginConf(pluginType, pluginName); Configuration pluginConf = getPluginConf(pluginType, pluginName);
ClassLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType, JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,
pluginName)); pluginName));
if (null == jarLoader) { if (null == jarLoader) {
jarLoader = PluginLoaderFactory.create(pluginConf,pluginType,pluginName); String pluginPath = pluginConf.getString("path");
if (StringUtils.isBlank(pluginPath)) {
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR,
String.format(
"%s插件[%s]路径非法!",
pluginType, pluginName));
}
jarLoader = new JarLoader(new String[]{pluginPath});
jarLoaderCenter.put(generatePluginKey(pluginType, pluginName), jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),
jarLoader); jarLoader);
} }

View File

@ -21,28 +21,82 @@
### 实现原理 ### 实现原理
- 不修改原有的ConfigParer,使用新的ExampleConfigParser,仅用于example模块。 - 不修改原有的ConfigParer,使用新的ExampleConfigParser,仅用于example模块。他不依赖datax.home,而是依赖ide编译后的target目录
- 提供新的PluginLoader 插件加载器可以从程序运行目录获取插件与JarLoader各司其职 - 将ide的target目录作为每个插件的目录类加载目录
![img](img/img02.png) ![img](img/img02.png)
### 如何使用 ### 如何使用
1.修改插件的pom文件做如下改动。以streamreader为例。<br/>
改动前
```xml
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
</plugins>
</build>
```
改动后
```xml
<build>
<resources>
<!--将resource目录也输出到target-->
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.*</include>
</includes>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
</plugins>
</build>
```
2.在datax-example模块引入你需要的插件默认只引入了streamreader、writer
3.打开datax-example的Main class
```java ```java
public class Main { public class Main {
public static void main(String[] args) {
//1. 在 datax-example pom文件中加入测试插件模块的依赖默认导入了streamreader/writer
//2. 在此处指定你的测试文件路径
String path = "/job/stream2stream.json";
Configuration configuration = ExampleConfigParser.parse( /**
PathUtil.getAbsolutePathFromClassPath(path) * 注意!
); * 1.在example模块pom文件添加你依赖的的调试插件
* 你可以直接打开本模块的pom文件,参考是如何引入streamreaderstreamwriter
* 2. 在此处指定你的job文件
*/
public static void main(String[] args) {
String classPathJobPath = "/job/stream2stream.json";
String absJobPath = PathUtil.getAbsolutePathFromClassPath(classPathJobPath);
startExample(absJobPath);
}
public static void startExample(String jobPath) {
Configuration configuration = ExampleConfigParser.parse(jobPath);
Engine engine = new Engine(); Engine engine = new Engine();
engine.start(configuration); engine.start(configuration);
} }
} }
``` ```
### 注意
此模块不参与打包

Binary file not shown.

Before

Width:  |  Height:  |  Size: 61 KiB

After

Width:  |  Height:  |  Size: 66 KiB

View File

@ -28,6 +28,7 @@
<artifactId>datax-core</artifactId> <artifactId>datax-core</artifactId>
<version>0.0.1-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
</dependency> </dependency>
<!-- 默认引入streamwriter,streamreader-->
<dependency> <dependency>
<groupId>com.alibaba.datax</groupId> <groupId>com.alibaba.datax</groupId>
<artifactId>streamwriter</artifactId> <artifactId>streamwriter</artifactId>
@ -49,12 +50,31 @@
<build> <build>
<resources> <resources>
<resource> <resource>
<directory>src/main/java</directory> <directory>src/main/resources</directory>
<includes> <includes>
<include>**/*.properties</include> <include>**/*.*</include>
</includes> </includes>
<filtering>true</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.*</include>
</includes>
<filtering>true</filtering>
</resource> </resource>
</resources> </resources>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
</plugins>
</build> </build>
</project> </project>

View File

@ -10,15 +10,25 @@ import com.alibaba.datax.example.util.PathUtil;
* @author fuyouj * @author fuyouj
*/ */
public class Main { public class Main {
public static void main(String[] args) {
//在此处指定你的测试文件路径
String path = "/job/stream2stream.json";
Configuration configuration = ExampleConfigParser.parse( /**
PathUtil.getAbsolutePathFromClassPath(path) * 1.在example模块pom文件添加你依赖的的调试插件
); * 你可以直接打开本模块的pom文件,参考是如何引入streamreaderstreamwriter
* 2. 在此处指定你的job文件
*/
public static void main(String[] args) {
String classPathJobPath = "/job/stream2stream.json";
String absJobPath = PathUtil.getAbsolutePathFromClassPath(classPathJobPath);
startExample(absJobPath);
}
public static void startExample(String jobPath) {
Configuration configuration = ExampleConfigParser.parse(jobPath);
Engine engine = new Engine(); Engine engine = new Engine();
engine.start(configuration); engine.start(configuration);
} }
} }

View File

@ -7,10 +7,10 @@ import com.alibaba.datax.core.util.FrameworkErrorCode;
import com.alibaba.datax.core.util.container.CoreConstant; import com.alibaba.datax.core.util.container.CoreConstant;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.net.URL; import java.net.URL;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.HashMap; import java.util.*;
import java.util.Map;
/** /**
* @author fuyouj * @author fuyouj
@ -44,9 +44,15 @@ public class ExampleConfigParser {
Configuration configuration = Configuration.newDefault(); Configuration configuration = Configuration.newDefault();
String workingDirectory = System.getProperty("user.dir"); //最初打算通过user.dir获取工作目录来扫描插件
File file = new File(workingDirectory); //但是user.dir在不同有一些不确定性所以废弃了这个选择
scanPlugin(configuration, file.listFiles(), pluginTypeMap);
for (File basePackage : runtimeBasePackages()) {
if (pluginTypeMap.isEmpty()) {
break;
}
scanPluginByPackage(basePackage, configuration, basePackage.listFiles(), pluginTypeMap);
}
if (!pluginTypeMap.isEmpty()) { if (!pluginTypeMap.isEmpty()) {
throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR, throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR,
"load plugin failed未完成指定插件加载:" "load plugin failed未完成指定插件加载:"
@ -55,7 +61,42 @@ public class ExampleConfigParser {
return configuration; return configuration;
} }
private static void scanPlugin(Configuration configuration, File[] files, Map<String, String> pluginTypeMap) { /**
* 通过classLoader获取程序编译的输出目录
* @return File[/datax-example/target/classes,xxReader/target/classes,xxWriter/target/classes]
*/
private static File[] runtimeBasePackages() {
List<File> basePackages = new ArrayList<>();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Enumeration<URL> resources = null;
try {
resources = classLoader.getResources("");
} catch (IOException e) {
throw DataXException.asDataXException(e.getMessage());
}
while (resources.hasMoreElements()) {
URL resource = resources.nextElement();
File file = new File(resource.getFile());
if (file.isDirectory()) {
basePackages.add(file);
}
}
return basePackages.toArray(new File[0]);
}
/**
*
* @param packageFile 编译出来的target/classes根目录 便于找到插件时设置插件的URL目录设置根目录是最保险的方式
* @param configuration pluginConfig
* @param files 待扫描文件
* @param needPluginTypeMap 需要的插件
*/
private static void scanPluginByPackage(File packageFile,
Configuration configuration,
File[] files,
Map<String, String> needPluginTypeMap) {
if (files == null) { if (files == null) {
return; return;
} }
@ -64,22 +105,26 @@ public class ExampleConfigParser {
Configuration pluginDesc = Configuration.from(file); Configuration pluginDesc = Configuration.from(file);
String descPluginName = pluginDesc.getString("name", ""); String descPluginName = pluginDesc.getString("name", "");
if (pluginTypeMap.containsKey(descPluginName)) { if (needPluginTypeMap.containsKey(descPluginName)) {
String type = pluginTypeMap.get(descPluginName); String type = needPluginTypeMap.get(descPluginName);
configuration.merge(parseOnePlugin(type, descPluginName, pluginDesc), false); configuration.merge(parseOnePlugin(packageFile.getAbsolutePath(),type, descPluginName, pluginDesc), false);
pluginTypeMap.remove(descPluginName); needPluginTypeMap.remove(descPluginName);
} }
} else { } else {
scanPlugin(configuration, file.listFiles(), pluginTypeMap); scanPluginByPackage(packageFile,configuration, file.listFiles(), needPluginTypeMap);
} }
} }
} }
private static Configuration parseOnePlugin(String pluginType, String pluginName, Configuration pluginDesc) {
pluginDesc.set("loadType","pluginLoader"); private static Configuration parseOnePlugin(String packagePath,
String pluginType,
String pluginName,
Configuration pluginDesc) {
//设置path 兼容jarLoader的加载方式URLClassLoader
pluginDesc.set("path", packagePath);
Configuration pluginConfInJob = Configuration.newDefault(); Configuration pluginConfInJob = Configuration.newDefault();
pluginConfInJob.set( pluginConfInJob.set(
String.format("plugin.%s.%s", pluginType, pluginName), String.format("plugin.%s.%s", pluginType, pluginName),

View File

@ -1,24 +1,16 @@
package com.alibaba.datax.example; package com.alibaba.datax.example;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.Engine;
import com.alibaba.datax.example.util.ExampleConfigParser;
import com.alibaba.datax.example.util.PathUtil; import com.alibaba.datax.example.util.PathUtil;
import org.junit.Test; import org.junit.Test;
public class DataXExampleTest extends ExampleTestTemplate { public class DataXExampleTest {
@Test @Test
public void testStreamReader2StreamWriter() { public void testStreamReader2StreamWriter() {
String path = "/job/stream2stream.json"; String path = "/job/stream2stream.json";
Configuration testConfiguration = ExampleConfigParser.parse( String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
PathUtil.getAbsolutePathFromClassPath(path) Main.startExample(jobPath);
);
Engine engine = new Engine();
engine.start(testConfiguration);
} }
} }

View File

@ -1,7 +1,6 @@
package com.alibaba.datax.example.util.com.alibaba.datax.example.util; package com.alibaba.datax.example.util.com.alibaba.datax.example.util;
import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.example.ExampleTestTemplate;
import com.alibaba.datax.example.util.ExampleConfigParser; import com.alibaba.datax.example.util.ExampleConfigParser;
import com.alibaba.datax.example.util.PathUtil; import com.alibaba.datax.example.util.PathUtil;
import org.junit.Assert; import org.junit.Assert;
@ -10,7 +9,7 @@ import org.junit.Test;
import java.io.File; import java.io.File;
public class ExampleConfigParserTest extends ExampleTestTemplate { public class ExampleConfigParserTest {
@Test @Test

142
pom.xml
View File

@ -47,88 +47,88 @@
<module>transformer</module> <module>transformer</module>
<!-- reader --> <!-- reader -->
<module>mysqlreader</module> <!-- <module>mysqlreader</module>-->
<module>drdsreader</module> <!-- <module>drdsreader</module>-->
<module>sqlserverreader</module> <!-- <module>sqlserverreader</module>-->
<module>postgresqlreader</module> <!-- <module>postgresqlreader</module>-->
<module>kingbaseesreader</module> <!-- <module>kingbaseesreader</module>-->
<module>oraclereader</module> <!-- <module>oraclereader</module>-->
<module>cassandrareader</module> <!-- <module>cassandrareader</module>-->
<module>oceanbasev10reader</module> <!-- <module>oceanbasev10reader</module>-->
<module>rdbmsreader</module> <!-- <module>rdbmsreader</module>-->
<module>odpsreader</module> <!-- <module>odpsreader</module>-->
<module>otsreader</module> <!-- <module>otsreader</module>-->
<module>otsstreamreader</module> <!-- <module>otsstreamreader</module>-->
<module>hbase11xreader</module> <!-- <module>hbase11xreader</module>-->
<module>hbase094xreader</module> <!-- <module>hbase094xreader</module>-->
<module>hbase11xsqlreader</module> <!-- <module>hbase11xsqlreader</module>-->
<module>hbase20xsqlreader</module> <!-- <module>hbase20xsqlreader</module>-->
<module>ossreader</module> <!-- <module>ossreader</module>-->
<module>hdfsreader</module> <!-- <module>hdfsreader</module>-->
<module>ftpreader</module> <!-- <module>ftpreader</module>-->
<module>txtfilereader</module> <!-- <module>txtfilereader</module>-->
<module>streamreader</module> <module>streamreader</module>
<module>clickhousereader</module> <!-- <module>clickhousereader</module>-->
<module>mongodbreader</module> <!-- <module>mongodbreader</module>-->
<module>tdenginereader</module> <!-- <module>tdenginereader</module>-->
<module>gdbreader</module> <!-- <module>gdbreader</module>-->
<module>tsdbreader</module> <!-- <module>tsdbreader</module>-->
<module>opentsdbreader</module> <!-- <module>opentsdbreader</module>-->
<module>loghubreader</module> <!-- <module>loghubreader</module>-->
<module>datahubreader</module> <!-- <module>datahubreader</module>-->
<module>starrocksreader</module> <!-- <module>starrocksreader</module>-->
<!-- writer --> <!-- &lt;!&ndash; writer &ndash;&gt;-->
<module>mysqlwriter</module> <!-- <module>mysqlwriter</module>-->
<module>starrockswriter</module> <!-- <module>starrockswriter</module>-->
<module>drdswriter</module> <!-- <module>drdswriter</module>-->
<module>databendwriter</module> <!-- <module>databendwriter</module>-->
<module>oraclewriter</module> <!-- <module>oraclewriter</module>-->
<module>sqlserverwriter</module> <!-- <module>sqlserverwriter</module>-->
<module>postgresqlwriter</module> <!-- <module>postgresqlwriter</module>-->
<module>kingbaseeswriter</module> <!-- <module>kingbaseeswriter</module>-->
<module>adswriter</module> <!-- <module>adswriter</module>-->
<module>oceanbasev10writer</module> <!-- <module>oceanbasev10writer</module>-->
<module>adbpgwriter</module> <!-- <module>adbpgwriter</module>-->
<module>hologresjdbcwriter</module> <!-- <module>hologresjdbcwriter</module>-->
<module>rdbmswriter</module> <!-- <module>rdbmswriter</module>-->
<module>odpswriter</module> <!-- <module>odpswriter</module>-->
<module>osswriter</module> <!-- <module>osswriter</module>-->
<module>otswriter</module> <!-- <module>otswriter</module>-->
<module>hbase11xwriter</module> <!-- <module>hbase11xwriter</module>-->
<module>hbase094xwriter</module> <!-- <module>hbase094xwriter</module>-->
<module>hbase11xsqlwriter</module> <!-- <module>hbase11xsqlwriter</module>-->
<module>hbase20xsqlwriter</module> <!-- <module>hbase20xsqlwriter</module>-->
<module>kuduwriter</module> <!-- <module>kuduwriter</module>-->
<module>ftpwriter</module> <!-- <module>ftpwriter</module>-->
<module>hdfswriter</module> <!-- <module>hdfswriter</module>-->
<module>txtfilewriter</module> <!-- <module>txtfilewriter</module>-->
<module>streamwriter</module> <module>streamwriter</module>
<module>elasticsearchwriter</module> <!-- <module>elasticsearchwriter</module>-->
<module>mongodbwriter</module> <!-- <module>mongodbwriter</module>-->
<module>tdenginewriter</module> <!-- <module>tdenginewriter</module>-->
<module>ocswriter</module> <!-- <module>ocswriter</module>-->
<module>tsdbwriter</module> <!-- <module>tsdbwriter</module>-->
<module>gdbwriter</module> <!-- <module>gdbwriter</module>-->
<module>oscarwriter</module> <!-- <module>oscarwriter</module>-->
<module>loghubwriter</module> <!-- <module>loghubwriter</module>-->
<module>datahubwriter</module> <!-- <module>datahubwriter</module>-->
<module>cassandrawriter</module> <!-- <module>cassandrawriter</module>-->
<module>clickhousewriter</module> <!-- <module>clickhousewriter</module>-->
<module>doriswriter</module> <!-- <module>doriswriter</module>-->
<module>selectdbwriter</module> <!-- <module>selectdbwriter</module>-->
<module>adbmysqlwriter</module> <!-- <module>adbmysqlwriter</module>-->
<module>neo4jwriter</module> <!-- <module>neo4jwriter</module>-->
<!-- common support module --> <!-- common support module -->
<module>plugin-rdbms-util</module> <!-- <module>plugin-rdbms-util</module>-->
<module>plugin-unstructured-storage-util</module> <!-- <module>plugin-unstructured-storage-util</module>-->
<module>datax-example</module> <module>datax-example</module>
</modules> </modules>

View File

@ -39,6 +39,16 @@
</dependencies> </dependencies>
<build> <build>
<resources>
<!--将resource目录也输出到target-->
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.*</include>
</includes>
<filtering>true</filtering>
</resource>
</resources>
<plugins> <plugins>
<!-- compiler plugin --> <!-- compiler plugin -->
<plugin> <plugin>

View File

@ -34,6 +34,16 @@
</dependencies> </dependencies>
<build> <build>
<resources>
<!-- 将resource目录也输出到target-->
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.*</include>
</includes>
<filtering>true</filtering>
</resource>
</resources>
<plugins> <plugins>
<!-- compiler plugin --> <!-- compiler plugin -->
<plugin> <plugin>