This commit is contained in:
FuYouJ 2023-07-23 18:39:26 +08:00
parent 82680c4c63
commit f0128e5853
13 changed files with 362 additions and 14 deletions

View File

@ -653,6 +653,7 @@ public class JobContainer extends AbstractContainer {
*/
private Reader.Job initJobReader(
JobPluginCollector jobPluginCollector) {
//TODO loadUtil加载或者是 pluginUtil加载
this.readerPluginName = this.configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(

View File

@ -15,7 +15,7 @@ import java.util.List;
/**
* 提供Jar隔离的加载机制会把传入的路径及其子路径以及路径中的jar文件加入到class path
*/
public class JarLoader extends URLClassLoader {
public class JarLoader extends URLClassLoader implements PluginLoader{
public JarLoader(String[] paths) {
this(paths, JarLoader.class.getClassLoader());
}

View File

@ -49,7 +49,7 @@ public class LoadUtil {
/**
* jarLoader的缓冲
*/
private static Map<String, JarLoader> jarLoaderCenter = new HashMap<String, JarLoader>();
private static Map<String, ClassLoader> jarLoaderCenter = new HashMap();
/**
* 设置pluginConfigs方便后面插件来获取
@ -167,7 +167,7 @@ public class LoadUtil {
PluginType pluginType, String pluginName,
ContainerType pluginRunType) {
Configuration pluginConf = getPluginConf(pluginType, pluginName);
JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName);
ClassLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName);
try {
return (Class<? extends AbstractPlugin>) jarLoader
.loadClass(pluginConf.getString("class") + "$"
@ -177,22 +177,23 @@ public class LoadUtil {
}
}
public static synchronized JarLoader getJarLoader(PluginType pluginType,
public static synchronized ClassLoader getJarLoader(PluginType pluginType,
String pluginName) {
Configuration pluginConf = getPluginConf(pluginType, pluginName);
JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,
ClassLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,
pluginName));
if (null == jarLoader) {
String pluginPath = pluginConf.getString("path");
if (StringUtils.isBlank(pluginPath)) {
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR,
String.format(
"%s插件[%s]路径非法!",
pluginType, pluginName));
}
jarLoader = new JarLoader(new String[]{pluginPath});
// jarLoader = new JarLoader(new String[]{pluginPath});
jarLoader = PluginLoaderFactory.create(pluginConf);
// if (StringUtils.isBlank(pluginPath)) {
// throw DataXException.asDataXException(
// FrameworkErrorCode.RUNTIME_ERROR,
// String.format(
// "%s插件[%s]路径非法!",
// pluginType, pluginName));
// }
jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),
jarLoader);
}

View File

@ -0,0 +1,10 @@
package com.alibaba.datax.core.util.container;
/**
* {@code Author} FuYouJ
* {@code Date} 2023/7/23 16:52
* @author fuyouj
*/
public class PluginClassLoader extends ClassLoader implements PluginLoader{
}

View File

@ -0,0 +1,16 @@
package com.alibaba.datax.core.util.container;
/**
* @author fuyouj
*/
public interface PluginLoader {
/**
* 加载插件对象
*
* @param name 类全限定名
* @return class对象
* @throws ClassNotFoundException
*/
Class<?> loadClass(String name) throws ClassNotFoundException;
}

View File

@ -0,0 +1,22 @@
package com.alibaba.datax.core.util.container;
import com.alibaba.datax.common.util.Configuration;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
/**
* @author fuyouj
*/
public class PluginLoaderFactory {
public static ClassLoader create(Configuration configuration) {
String path = configuration.getString("path");
if (StringUtils.isNotBlank(path) && path.contains(File.separator)) {
return new JarLoader(new String[]{path});
} else {
return new PluginClassLoader();
}
}
}

42
datax-example/pom.xml Normal file
View File

@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>datax-example</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>streamwriter</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>streamreader</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,25 @@
package com.alibaba.datax.example;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.Engine;
import com.alibaba.datax.example.util.ExampleConfigParser;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
/**
* @author fuyouj
*/
public class Main {
public static void main(String[] args) throws URISyntaxException {
URL resource = Main.class.getResource("/job/stream2stream.json");
URI uri = resource.toURI();
String path = Paths.get(uri).toString();
Configuration configuration = ExampleConfigParser.parse(path);
Engine engine = new Engine();
engine.start(configuration);
}
}

View File

@ -0,0 +1,98 @@
package com.alibaba.datax.example.util;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.util.ConfigParser;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import com.alibaba.datax.core.util.container.CoreConstant;
import java.io.File;
import java.net.URL;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
/**
* @author fuyouj
*/
public class ExampleConfigParser {
private static final String CORE_CONF = "/example/conf/core.json";
private static final String PLUGIN_DESC_FILE = "plugin.json";
/**
* 指定Job配置路径ConfigParser会解析JobPluginCore全部信息并以Configuration返回
* 不同于Core的ConfigParser,这里的core,plugin 不依赖于编译后的datax.home,而是扫描程序目录
*/
public static Configuration parse(final String jobPath) {
Configuration configuration = ConfigParser.parseJobConfig(jobPath);
configuration.merge(coreConfig(),
false);
Map<String, String> pluginTypeMap = new HashMap<>();
String readerName = configuration.getString(CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
String writerName = configuration.getString(CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
pluginTypeMap.put(readerName, "reader");
pluginTypeMap.put(writerName, "writer");
Configuration pluginsDescConfig = parsePluginsConfig(pluginTypeMap);
configuration.merge(pluginsDescConfig, false);
return configuration;
}
public static Configuration parsePluginsConfig(Map<String, String> pluginTypeMap) {
Configuration configuration = Configuration.newDefault();
String workingDirectory = System.getProperty("user.dir");
File file = new File(workingDirectory);
scanPlugin(configuration, file.listFiles(), pluginTypeMap);
if (!pluginTypeMap.isEmpty()) {
throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR,
"load plugin failed未完成指定插件加载:"
+ pluginTypeMap.keySet());
}
return configuration;
}
private static void scanPlugin(Configuration configuration, File[] files, Map<String, String> pluginTypeMap) {
if (files == null) {
return;
}
for (File file : files) {
if (file.isFile() && PLUGIN_DESC_FILE.equals(file.getName())) {
Configuration pluginDesc = Configuration.from(file);
String descPluginName = pluginDesc.getString("name", "");
if (pluginTypeMap.containsKey(descPluginName)) {
String type = pluginTypeMap.get(descPluginName);
configuration.merge(parseOnePlugin(type, descPluginName, pluginDesc), false);
pluginTypeMap.remove(descPluginName);
}
} else {
scanPlugin(configuration, file.listFiles(), pluginTypeMap);
}
}
}
private static Configuration parseOnePlugin(String pluginType, String pluginName, Configuration pluginDesc) {
Configuration pluginConfInJob = Configuration.newDefault();
pluginConfInJob.set(
String.format("plugin.%s.%s", pluginType, pluginName),
pluginDesc.getInternal());
return pluginConfInJob;
}
private static Configuration coreConfig() {
try {
URL resource = ExampleConfigParser.class.getResource(CORE_CONF);
return Configuration.from(Paths.get(resource.toURI()).toFile());
} catch (Exception ignore) {
throw DataXException.asDataXException("Failed to load the configuration file core.json. " +
"Please check whether /example/conf/core.json exists!");
}
}
}

View File

@ -0,0 +1,60 @@
{
"entry": {
"jvm": "-Xms1G -Xmx1G",
"environment": {}
},
"common": {
"column": {
"datetimeFormat": "yyyy-MM-dd HH:mm:ss",
"timeFormat": "HH:mm:ss",
"dateFormat": "yyyy-MM-dd",
"extraFormats":["yyyyMMdd"],
"timeZone": "GMT+8",
"encoding": "utf-8"
}
},
"core": {
"dataXServer": {
"address": "http://localhost:7001/api",
"timeout": 10000,
"reportDataxLog": false,
"reportPerfLog": false
},
"transport": {
"channel": {
"class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
"speed": {
"byte": -1,
"record": -1
},
"flowControlInterval": 20,
"capacity": 512,
"byteCapacity": 67108864
},
"exchanger": {
"class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",
"bufferSize": 32
}
},
"container": {
"job": {
"reportInterval": 10000
},
"taskGroup": {
"channel": 5
},
"trace": {
"enable": "false"
}
},
"statistics": {
"collector": {
"plugin": {
"taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector",
"maxDirtyNumber": 10
}
}
}
}
}

View File

@ -0,0 +1,36 @@
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello你好世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}

View File

@ -0,0 +1,36 @@
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello你好世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}

View File

@ -129,6 +129,7 @@
<!-- common support module -->
<module>plugin-rdbms-util</module>
<module>plugin-unstructured-storage-util</module>
<module>datax-example</module>
</modules>