mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 06:50:35 +08:00
添加测试,添加文档,修改PluginLoaderFactory工程类
添加测试,添加文档,修改PluginLoaderFactory工程类
This commit is contained in:
parent
f0128e5853
commit
d747e1d292
@ -168,6 +168,7 @@ public final class ConfigParser {
|
||||
boolean isDefaultPath = StringUtils.isBlank(pluginPath);
|
||||
if (isDefaultPath) {
|
||||
configuration.set("path", path);
|
||||
configuration.set("loadType","jarLoader");
|
||||
}
|
||||
|
||||
Configuration result = Configuration.newDefault();
|
||||
|
@ -184,16 +184,7 @@ public class LoadUtil {
|
||||
ClassLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,
|
||||
pluginName));
|
||||
if (null == jarLoader) {
|
||||
String pluginPath = pluginConf.getString("path");
|
||||
// jarLoader = new JarLoader(new String[]{pluginPath});
|
||||
jarLoader = PluginLoaderFactory.create(pluginConf);
|
||||
// if (StringUtils.isBlank(pluginPath)) {
|
||||
// throw DataXException.asDataXException(
|
||||
// FrameworkErrorCode.RUNTIME_ERROR,
|
||||
// String.format(
|
||||
// "%s插件[%s]路径非法!",
|
||||
// pluginType, pluginName));
|
||||
// }
|
||||
jarLoader = PluginLoaderFactory.create(pluginConf,pluginType,pluginName);
|
||||
jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),
|
||||
jarLoader);
|
||||
}
|
||||
|
@ -1,22 +1,60 @@
|
||||
package com.alibaba.datax.core.util.container;
|
||||
|
||||
|
||||
import com.alibaba.datax.common.constant.PluginType;
|
||||
import com.alibaba.datax.common.exception.DataXException;
|
||||
import com.alibaba.datax.common.util.Configuration;
|
||||
import com.alibaba.datax.core.util.FrameworkErrorCode;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
/**
|
||||
* PluginLoader简单工厂
|
||||
* @author fuyouj
|
||||
*/
|
||||
public class PluginLoaderFactory {
|
||||
|
||||
public static ClassLoader create(Configuration configuration) {
|
||||
String path = configuration.getString("path");
|
||||
if (StringUtils.isNotBlank(path) && path.contains(File.separator)) {
|
||||
private static final String JAR_LOADER = "jarLoader";
|
||||
private static final String PLUGIN_LOADER = "pluginLoader";
|
||||
|
||||
public static ClassLoader create(Configuration pluginDescConf, PluginType pluginType, String pluginName) {
|
||||
|
||||
check(pluginDescConf, pluginType, pluginName);
|
||||
|
||||
String loadType = pluginDescConf.getString("loadType");
|
||||
|
||||
if (JAR_LOADER.equalsIgnoreCase(loadType)) {
|
||||
String path = pluginDescConf.getString("path");
|
||||
return new JarLoader(new String[]{path});
|
||||
} else {
|
||||
}
|
||||
|
||||
if (PLUGIN_LOADER.equalsIgnoreCase(loadType)){
|
||||
return new PluginClassLoader();
|
||||
}
|
||||
throw DataXException.asDataXException(
|
||||
FrameworkErrorCode.RUNTIME_ERROR,
|
||||
String.format(
|
||||
"%s插件[%s]无法加载,不支持的loadType,请检查程序代码",
|
||||
pluginType, pluginName));
|
||||
}
|
||||
|
||||
private static void check(Configuration pluginDescConf, PluginType pluginType, String pluginName) {
|
||||
|
||||
String loadType = pluginDescConf.getString("loadType");
|
||||
if (StringUtils.isBlank(loadType)){
|
||||
throw DataXException.asDataXException(
|
||||
FrameworkErrorCode.RUNTIME_ERROR,
|
||||
String.format(
|
||||
"%s插件[%s]无法加载,没有指定的loadType!",
|
||||
pluginType, pluginName));
|
||||
}
|
||||
|
||||
String pluginPath = pluginDescConf.getString("path");
|
||||
if (StringUtils.isBlank(pluginPath) && JAR_LOADER.equalsIgnoreCase(loadType)) {
|
||||
throw DataXException.asDataXException(
|
||||
FrameworkErrorCode.RUNTIME_ERROR,
|
||||
String.format(
|
||||
"%s插件[%s]路径非法!",
|
||||
pluginType, pluginName));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
48
datax-example/doc/README.md
Normal file
48
datax-example/doc/README.md
Normal file
@ -0,0 +1,48 @@
|
||||
## [DataX-Example]调试datax插件的模块
|
||||
|
||||
### 为什么要开发这个模块
|
||||
|
||||
一般使用DataX启动数据同步任务是从datax.py 脚本开始,获取程序datax包目录设置到系统变量datax.home里,此后系统核心插件的加载,配置初始化均依赖于变量datax.home,这带来了一些麻烦,以一次本地 DeBug streamreader 插件为例。
|
||||
|
||||
- maven 打包 datax 生成 datax 目录
|
||||
- 在 IDE 中 设置系统环境变量 datax.home,或者在Engine启动类中硬编码设置datax.home。
|
||||
- 修改插件 streamreader 代码
|
||||
- 再次 maven 打包,使JarLoader 能够加载到最新的 streamreader 代码。
|
||||
- 调试代码
|
||||
|
||||
在以上步骤中,打包完全不必要且最耗时,等待打包也最煎熬。
|
||||
|
||||
所以我编写一个新的模块(datax-example),此模块特用于本地调试和复现 BUG。如果模块顺利编写完成,那么以上流程将被简化至两步。
|
||||
|
||||
- 修改插件 streamreader 代码。
|
||||
- 调试代码
|
||||
|
||||
<img src="img/img01.png" alt="img" style="zoom:40%;" />
|
||||
|
||||
### 实现原理
|
||||
|
||||
- 不修改原有的ConfigParer,使用新的ExampleConfigParser,仅用于example模块。
|
||||
- 提供新的PluginLoader 插件加载器,可以从程序运行目录获取插件,与JarLoader各司其职。
|
||||
|
||||

|
||||
|
||||
### 如何使用
|
||||
|
||||
```java
|
||||
public class Main {
|
||||
public static void main(String[] args) {
|
||||
//1. 在 datax-example pom文件中加入测试插件模块的依赖,默认导入了streamreader/writer
|
||||
//2. 在此处指定你的测试文件路径
|
||||
String path = "/job/stream2stream.json";
|
||||
|
||||
Configuration configuration = ExampleConfigParser.parse(
|
||||
PathUtil.getAbsolutePathFromClassPath(path)
|
||||
);
|
||||
|
||||
Engine engine = new Engine();
|
||||
engine.start(configuration);
|
||||
}
|
||||
}
|
||||
```
|
||||
### 注意
|
||||
此模块不参与打包
|
BIN
datax-example/doc/img/img01.png
Normal file
BIN
datax-example/doc/img/img01.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 71 KiB |
BIN
datax-example/doc/img/img02.png
Normal file
BIN
datax-example/doc/img/img02.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 61 KiB |
@ -15,6 +15,7 @@
|
||||
<maven.compiler.source>8</maven.compiler.source>
|
||||
<maven.compiler.target>8</maven.compiler.target>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<junit4.version>4.13.2</junit4.version>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
@ -37,6 +38,23 @@
|
||||
<artifactId>streamreader</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>${junit4.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<resources>
|
||||
<resource>
|
||||
<directory>src/main/java</directory>
|
||||
<includes>
|
||||
<include>**/*.properties</include>
|
||||
</includes>
|
||||
</resource>
|
||||
</resources>
|
||||
</build>
|
||||
|
||||
</project>
|
@ -4,21 +4,20 @@ package com.alibaba.datax.example;
|
||||
import com.alibaba.datax.common.util.Configuration;
|
||||
import com.alibaba.datax.core.Engine;
|
||||
import com.alibaba.datax.example.util.ExampleConfigParser;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.nio.file.Paths;
|
||||
import com.alibaba.datax.example.util.PathUtil;
|
||||
|
||||
/**
|
||||
* @author fuyouj
|
||||
*/
|
||||
public class Main {
|
||||
public static void main(String[] args) throws URISyntaxException {
|
||||
URL resource = Main.class.getResource("/job/stream2stream.json");
|
||||
URI uri = resource.toURI();
|
||||
String path = Paths.get(uri).toString();
|
||||
Configuration configuration = ExampleConfigParser.parse(path);
|
||||
public static void main(String[] args) {
|
||||
//在此处指定你的测试文件路径
|
||||
String path = "/job/stream2stream.json";
|
||||
|
||||
Configuration configuration = ExampleConfigParser.parse(
|
||||
PathUtil.getAbsolutePathFromClassPath(path)
|
||||
);
|
||||
|
||||
Engine engine = new Engine();
|
||||
engine.start(configuration);
|
||||
}
|
||||
|
@ -40,7 +40,7 @@ public class ExampleConfigParser {
|
||||
return configuration;
|
||||
}
|
||||
|
||||
public static Configuration parsePluginsConfig(Map<String, String> pluginTypeMap) {
|
||||
private static Configuration parsePluginsConfig(Map<String, String> pluginTypeMap) {
|
||||
|
||||
Configuration configuration = Configuration.newDefault();
|
||||
|
||||
@ -79,6 +79,7 @@ public class ExampleConfigParser {
|
||||
|
||||
private static Configuration parseOnePlugin(String pluginType, String pluginName, Configuration pluginDesc) {
|
||||
|
||||
pluginDesc.set("loadType","pluginLoader");
|
||||
Configuration pluginConfInJob = Configuration.newDefault();
|
||||
pluginConfInJob.set(
|
||||
String.format("plugin.%s.%s", pluginType, pluginName),
|
||||
|
@ -0,0 +1,25 @@
|
||||
package com.alibaba.datax.example.util;
|
||||
|
||||
|
||||
import com.alibaba.datax.common.exception.DataXException;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
/**
|
||||
* @author fuyouj
|
||||
*/
|
||||
public class PathUtil {
|
||||
public static String getAbsolutePathFromClassPath(String path) {
|
||||
URL resource = PathUtil.class.getResource(path);
|
||||
try {
|
||||
URI uri = resource.toURI();
|
||||
return Paths.get(uri).toString();
|
||||
} catch (NullPointerException | URISyntaxException e) {
|
||||
throw DataXException.asDataXException("path 路径错误");
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,24 @@
|
||||
package com.alibaba.datax.example;
|
||||
|
||||
|
||||
import com.alibaba.datax.common.util.Configuration;
|
||||
import com.alibaba.datax.core.Engine;
|
||||
import com.alibaba.datax.example.util.ExampleConfigParser;
|
||||
import com.alibaba.datax.example.util.PathUtil;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
public class DataXExampleTest extends ExampleTestTemplate {
|
||||
|
||||
@Test
|
||||
public void testStreamReader2StreamWriter() {
|
||||
|
||||
String path = "/job/stream2stream.json";
|
||||
Configuration testConfiguration = ExampleConfigParser.parse(
|
||||
PathUtil.getAbsolutePathFromClassPath(path)
|
||||
);
|
||||
Engine engine = new Engine();
|
||||
engine.start(testConfiguration);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
package com.alibaba.datax.example;
|
||||
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
/**
|
||||
* {@code Author} FuYouJ
|
||||
* {@code Date} 2023/7/29 18:23
|
||||
*/
|
||||
|
||||
public abstract class ExampleTestTemplate {
|
||||
|
||||
@Before
|
||||
public void fixWorkingDirectory(){
|
||||
String property = System.getProperty("user.dir");
|
||||
File file = new File(property);
|
||||
File parentFile = file.getParentFile();
|
||||
System.setProperty("user.dir",parentFile.getAbsolutePath());
|
||||
}
|
||||
}
|
@ -0,0 +1,34 @@
|
||||
package com.alibaba.datax.example.util.com.alibaba.datax.example.util;
|
||||
|
||||
import com.alibaba.datax.common.util.Configuration;
|
||||
import com.alibaba.datax.example.ExampleTestTemplate;
|
||||
import com.alibaba.datax.example.util.ExampleConfigParser;
|
||||
import com.alibaba.datax.example.util.PathUtil;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
|
||||
public class ExampleConfigParserTest extends ExampleTestTemplate {
|
||||
|
||||
|
||||
@Test
|
||||
public void testExampleConfigParserShouldLoadDefaultConf() {
|
||||
|
||||
String path = "/job/stream2stream.json";
|
||||
Configuration testConfiguration = ExampleConfigParser.parse(
|
||||
PathUtil.getAbsolutePathFromClassPath(path)
|
||||
);
|
||||
Configuration defaultConf = loadDefaultConf();
|
||||
Assert.assertEquals(testConfiguration.get("core"), defaultConf.get("core"));
|
||||
Assert.assertEquals(testConfiguration.get("common"), defaultConf.get("common"));
|
||||
}
|
||||
|
||||
private Configuration loadDefaultConf() {
|
||||
return Configuration.from(
|
||||
new File(PathUtil.getAbsolutePathFromClassPath("/example/conf/core.json")
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
60
datax-example/src/test/resources/example/conf/core.json
Executable file
60
datax-example/src/test/resources/example/conf/core.json
Executable file
@ -0,0 +1,60 @@
|
||||
{
|
||||
"entry": {
|
||||
"jvm": "-Xms1G -Xmx1G",
|
||||
"environment": {}
|
||||
},
|
||||
"common": {
|
||||
"column": {
|
||||
"datetimeFormat": "yyyy-MM-dd HH:mm:ss",
|
||||
"timeFormat": "HH:mm:ss",
|
||||
"dateFormat": "yyyy-MM-dd",
|
||||
"extraFormats":["yyyyMMdd"],
|
||||
"timeZone": "GMT+8",
|
||||
"encoding": "utf-8"
|
||||
}
|
||||
},
|
||||
"core": {
|
||||
"dataXServer": {
|
||||
"address": "http://localhost:7001/api",
|
||||
"timeout": 10000,
|
||||
"reportDataxLog": false,
|
||||
"reportPerfLog": false
|
||||
},
|
||||
"transport": {
|
||||
"channel": {
|
||||
"class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
|
||||
"speed": {
|
||||
"byte": -1,
|
||||
"record": -1
|
||||
},
|
||||
"flowControlInterval": 20,
|
||||
"capacity": 512,
|
||||
"byteCapacity": 67108864
|
||||
},
|
||||
"exchanger": {
|
||||
"class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",
|
||||
"bufferSize": 32
|
||||
}
|
||||
},
|
||||
"container": {
|
||||
"job": {
|
||||
"reportInterval": 10000
|
||||
},
|
||||
"taskGroup": {
|
||||
"channel": 5
|
||||
},
|
||||
"trace": {
|
||||
"enable": "false"
|
||||
}
|
||||
|
||||
},
|
||||
"statistics": {
|
||||
"collector": {
|
||||
"plugin": {
|
||||
"taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector",
|
||||
"maxDirtyNumber": 10
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
36
datax-example/src/test/resources/job/notExistPluginTest.json
Normal file
36
datax-example/src/test/resources/job/notExistPluginTest.json
Normal file
@ -0,0 +1,36 @@
|
||||
{
|
||||
"job": {
|
||||
"content": [
|
||||
{
|
||||
"reader": {
|
||||
"name": "notExistReaderPlugin",
|
||||
"parameter": {
|
||||
"sliceRecordCount": 10,
|
||||
"column": [
|
||||
{
|
||||
"type": "long",
|
||||
"value": "10"
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"value": "hello,你好,世界-DataX"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"writer": {
|
||||
"name": "streamwriter",
|
||||
"parameter": {
|
||||
"encoding": "UTF-8",
|
||||
"print": true
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"setting": {
|
||||
"speed": {
|
||||
"channel": 5
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user