This commit is contained in:
oms 2025-04-10 16:18:58 +08:00 committed by GitHub
commit e143ae2285
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 156 additions and 0 deletions

View File

@ -2,9 +2,14 @@ package com.alibaba.datax.example.util;
import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.transformer.ComplexTransformerProxy;
import com.alibaba.datax.core.transport.transformer.TransformerInfo;
import com.alibaba.datax.core.transport.transformer.TransformerRegistry;
import com.alibaba.datax.core.util.ConfigParser; import com.alibaba.datax.core.util.ConfigParser;
import com.alibaba.datax.core.util.FrameworkErrorCode; import com.alibaba.datax.core.util.FrameworkErrorCode;
import com.alibaba.datax.core.util.container.CoreConstant; import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.datax.transformer.ComplexTransformer;
import com.alibaba.datax.transformer.Transformer;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -20,6 +25,9 @@ public class ExampleConfigParser {
private static final String PLUGIN_DESC_FILE = "plugin.json"; private static final String PLUGIN_DESC_FILE = "plugin.json";
private static final String TRANSFORMER_DESC_FILE = "transformer.json";
/** /**
* 指定Job配置路径ConfigParser会解析JobPluginCore全部信息并以Configuration返回 * 指定Job配置路径ConfigParser会解析JobPluginCore全部信息并以Configuration返回
* 不同于Core的ConfigParser,这里的core,plugin 不依赖于编译后的datax.home,而是扫描程序编译后的target目录 * 不同于Core的ConfigParser,这里的core,plugin 不依赖于编译后的datax.home,而是扫描程序编译后的target目录
@ -37,6 +45,10 @@ public class ExampleConfigParser {
pluginTypeMap.put(writerName, "writer"); pluginTypeMap.put(writerName, "writer");
Configuration pluginsDescConfig = parsePluginsConfig(pluginTypeMap); Configuration pluginsDescConfig = parsePluginsConfig(pluginTypeMap);
configuration.merge(pluginsDescConfig, false); configuration.merge(pluginsDescConfig, false);
// 扫描并注册自定义Transformer
List<Configuration> listConfiguration = configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);
registerTransformerConfig(listConfiguration);
return configuration; return configuration;
} }
@ -151,4 +163,107 @@ public class ExampleConfigParser {
"Please check whether /example/conf/core.json exists!"); "Please check whether /example/conf/core.json exists!");
} }
} }
/**
* 注册第三方 transformer
* 判断是否使用TransFormer
* 如果使用则将涉及的Transformer项目全部注册
*
* @param transformers
*/
private static void registerTransformerConfig(List<Configuration> transformers) {
if (transformers == null || transformers.size() == 0) {
return;
}
Set<String> transformerSet = new HashSet<>();
for (Configuration transformer : transformers) {
String name = transformer.getString("name");
if (!name.startsWith("dx_")) { // 只检测自定义Transformer
transformerSet.add(name);
}
}
for (File basePackage : runtimeBasePackages()) {
scanTransFormerByPackage(basePackage, basePackage.listFiles(), transformerSet);
}
if (!transformerSet.isEmpty()) {
String failedTransformer = transformerSet.toString();
String message = "\ntransformer %s load failed ry to analyze the reasons from the following aspects.。\n" +
"1: Check if the name of the transformer is spelled correctly, and verify whether DataX supports this transformer\n" +
"2Verify if the <resource></resource> tag has been added under <build></build> section in the pom file of the relevant transformer.\n<resource>" +
" <directory>src/main/resources</directory>\n" +
" <includes>\n" +
" <include>**/*.*</include>\n" +
" </includes>\n" +
" <filtering>true</filtering>\n" +
" </resource>\n [Refer to the streamreader pom file] \n" +
"3: Check that the datax-yourtransformer-example module imported your test transformer";
message = String.format(message, failedTransformer);
throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_INIT_ERROR, message);
}
}
/**
* @param packageFile 编译出来的target/classes根目录 便于找到TRANSFORMER时设置插件的TRANSFORMER 目录设置根目录是最保险的方式
* @param files 待扫描文件
*/
private static void scanTransFormerByPackage(File packageFile, File[] files, Set<String> transformerSet) {
if (files == null) {
return;
}
for (File file : files) {
if (file.isFile() && TRANSFORMER_DESC_FILE.equals(file.getName())) {
Configuration transfomerDesc = Configuration.from(file);
String descTransformerName = transfomerDesc.getString("name");
String descTransformerClass = transfomerDesc.getString("class");
transformerSet.remove(descTransformerName);
if (verfiyExist(descTransformerName,descTransformerClass)) {
break;
}
try {
Class<?> transformerClass = Class.forName(descTransformerClass);
Object transformer = transformerClass.newInstance();
if (ComplexTransformer.class.isAssignableFrom(transformer.getClass())) {
((ComplexTransformer) transformer).setTransformerName(descTransformerName);
TransformerRegistry.registComplexTransformer((ComplexTransformer) transformer, null, false);
} else if (Transformer.class.isAssignableFrom(transformer.getClass())) {
((Transformer) transformer).setTransformerName(descTransformerName);
TransformerRegistry.registTransformer((Transformer) transformer, null, false);
} else {
throw DataXException.asDataXException(String.format("load Transformer class(%s) error, path = %s", descTransformerClass, file.getPath()));
}
} catch (Exception e) {
//错误funciton跳过
throw DataXException.asDataXException(String.format("skip transformer(%s),load Transformer class error, path = %s ", descTransformerName, file.getPath()));
}
} else {
scanTransFormerByPackage(packageFile, file.listFiles(), transformerSet);
}
}
}
/**
* 验证Transformer 是否已经加载过了
* @param transformerName
* @param transFormerClass
* @return
*/
private static Boolean verfiyExist(String transformerName,String transFormerClass){
TransformerInfo transformer = TransformerRegistry.getTransformer(transformerName);
if(transformer==null){
return false;
}
ComplexTransformerProxy proxy = (ComplexTransformerProxy)transformer.getTransformer();
if(proxy==null){
return false;
}
String className = proxy.getRealTransformer().getClass().getName();
if (transFormerClass.equals(className)){
return true;
}else{
throw DataXException.asDataXException(String.format("skip transformer(%s),load Transformer class error,There are two Transformer with the same name and different class path, class1 = %s;class2 = %s ", transformerName,transFormerClass,className));
}
}
} }

View File

@ -0,0 +1,41 @@
package com.alibaba.datax.example.util;
import com.alibaba.datax.common.spi.ErrorCode;
/**
* TODO: 根据现有日志数据分析各类错误进行细化
*
* <p>请不要格式化本类代码</p>
*/
public enum TransformerErrorCode implements ErrorCode {
TRANSFORMER_INIT_ERROR("Transformer-00", "DataX Transformer 注册失败, 请联系您的运维解决 .");
private final String code;
private final String description;
private TransformerErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s]. ", this.code,
this.description);
}
}