From 83fc1be6911efec86b5a793f69abb14c0635aea3 Mon Sep 17 00:00:00 2001 From: zhangjihu Date: Fri, 6 Sep 2024 17:31:11 +0800 Subject: [PATCH 1/2] =?UTF-8?q?zh:=201.=E5=A2=9E=E5=8A=A0=E6=9C=AC?= =?UTF-8?q?=E5=9C=B0=E8=B0=83=E8=AF=95=E6=97=B6=E5=AF=B9=E7=AC=AC=E4=B8=89?= =?UTF-8?q?=E6=96=B9=20Transformer=20=E8=87=AA=E5=8A=A8=E6=89=AB=E6=8F=8F?= =?UTF-8?q?=E5=B9=B6=E6=B3=A8=E5=86=8C=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../example/util/ExampleConfigParser.java | 110 ++++++++++++++++++ .../example/util/TransformerErrorCode.java | 41 +++++++ 2 files changed, 151 insertions(+) create mode 100644 datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/TransformerErrorCode.java diff --git a/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java b/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java index 6bbb4a23..13057fc1 100644 --- a/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java +++ b/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java @@ -2,9 +2,14 @@ package com.alibaba.datax.example.util; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.core.transport.transformer.ComplexTransformerProxy; +import com.alibaba.datax.core.transport.transformer.TransformerInfo; +import com.alibaba.datax.core.transport.transformer.TransformerRegistry; import com.alibaba.datax.core.util.ConfigParser; import com.alibaba.datax.core.util.FrameworkErrorCode; import com.alibaba.datax.core.util.container.CoreConstant; +import com.alibaba.datax.transformer.ComplexTransformer; +import com.alibaba.datax.transformer.Transformer; import java.io.File; import java.io.IOException; @@ -20,6 +25,9 @@ public class ExampleConfigParser { private static final String PLUGIN_DESC_FILE = "plugin.json"; + private static final String TRANSFORMER_DESC_FILE = "transformer.json"; + + /** * 指定Job配置路径,ConfigParser会解析Job、Plugin、Core全部信息,并以Configuration返回 * 不同于Core的ConfigParser,这里的core,plugin 不依赖于编译后的datax.home,而是扫描程序编译后的target目录 @@ -37,6 +45,10 @@ public class ExampleConfigParser { pluginTypeMap.put(writerName, "writer"); Configuration pluginsDescConfig = parsePluginsConfig(pluginTypeMap); configuration.merge(pluginsDescConfig, false); + // 扫描并注册自定义Transformer + List listConfiguration = configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER); + registerTransformerConfig(listConfiguration); + return configuration; } @@ -151,4 +163,102 @@ public class ExampleConfigParser { "Please check whether /example/conf/core.json exists!"); } } + + /** + * 注册第三方 transformer + * 判断是否使用TransFormer + * 如果使用则将涉及的Transformer项目全部注册 + * + * @param transformers + */ + private static void registerTransformerConfig(List transformers) { + + //Configuration from = Configuration.from(transformer); + if (transformers == null || transformers.size() == 0) { + return; + } + Set transformerSet = new HashSet<>(); + for (Configuration transformer : transformers) { + String name = transformer.getString("name"); + if (!name.startsWith("dx_")) { // 只检测自定义Transformer + transformerSet.add(name); + } + } + for (File basePackage : runtimeBasePackages()) { + scanTransFormerByPackage(basePackage, basePackage.listFiles(), transformerSet); + } + if (!transformerSet.isEmpty()) { + String failedTransformer = transformerSet.toString(); + String message = "\ntransformer %s load failed :ry to analyze the reasons from the following aspects.。\n" + + "1: Check if the name of the transformer is spelled correctly, and verify whether DataX supports this transformer\n" + + "2:Verify if the tag has been added under section in the pom file of the relevant transformer.\n" + + " src/main/resources\n" + + " \n" + + " **/*.*\n" + + " \n" + + " true\n" + + " \n [Refer to the streamreader pom file] \n" + + "3: Check that the datax-yourtransformer-example module imported your test transformer"; + message = String.format(message, failedTransformer); + throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_INIT_ERROR, message); + } + } + + /** + * @param packageFile 编译出来的target/classes根目录 便于找到TRANSFORMER时设置插件的TRANSFORMER 目录,设置根目录是最保险的方式 + * @param files 待扫描文件 + */ + private static void scanTransFormerByPackage(File packageFile, File[] files, Set transformerSet) { + if (files == null) { + return; + } + for (File file : files) { + if (file.isFile() && TRANSFORMER_DESC_FILE.equals(file.getName())) { + Configuration transfomerDesc = Configuration.from(file); + String descTransformerName = transfomerDesc.getString("name"); + String descTransformerClass = transfomerDesc.getString("class"); + transformerSet.remove(descTransformerName); + + if (verfiyExist(descTransformerName,descTransformerClass)) { + break; + } + try { + Class transformerClass = Class.forName(descTransformerClass); + Object transformer = transformerClass.newInstance(); + if (ComplexTransformer.class.isAssignableFrom(transformer.getClass())) { + ((ComplexTransformer) transformer).setTransformerName(descTransformerName); + TransformerRegistry.registComplexTransformer((ComplexTransformer) transformer, null, false); + } else if (Transformer.class.isAssignableFrom(transformer.getClass())) { + ((Transformer) transformer).setTransformerName(descTransformerName); + TransformerRegistry.registTransformer((Transformer) transformer, null, false); + } else { + throw DataXException.asDataXException(String.format("load Transformer class(%s) error, path = %s", descTransformerClass, file.getPath())); + } + } catch (Exception e) { + //错误funciton跳过 + throw DataXException.asDataXException(String.format("skip transformer(%s),load Transformer class error, path = %s ", descTransformerName, file.getPath())); + } + } else { + scanTransFormerByPackage(packageFile, file.listFiles(), transformerSet); + } + } + } + + private static Boolean verfiyExist(String transformerName,String transFormerClass){ + TransformerInfo transformer = TransformerRegistry.getTransformer(transformerName); + if(transformer==null){ + return false; + } + ComplexTransformerProxy proxy = (ComplexTransformerProxy)transformer.getTransformer(); + if(proxy==null){ + return false; + } + String className = proxy.getRealTransformer().getClass().getName(); + if (transFormerClass.equals(className)){ + return true; + }else{ + throw DataXException.asDataXException(String.format("skip transformer(%s),load Transformer class error,There are two Transformer with the same name and different class path, class1 = %s;class2 = %s ", transformerName,transFormerClass,className)); + } + } + } diff --git a/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/TransformerErrorCode.java b/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/TransformerErrorCode.java new file mode 100644 index 00000000..bf368e64 --- /dev/null +++ b/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/TransformerErrorCode.java @@ -0,0 +1,41 @@ +package com.alibaba.datax.example.util; + +import com.alibaba.datax.common.spi.ErrorCode; + +/** + * TODO: 根据现有日志数据分析各类错误,进行细化。 + * + *

请不要格式化本类代码

+ */ +public enum TransformerErrorCode implements ErrorCode { + + TRANSFORMER_INIT_ERROR("Transformer-00", "DataX Transformer 注册失败, 请联系您的运维解决 ."); + + + private final String code; + + private final String description; + + private TransformerErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s]. ", this.code, + this.description); + } + + +} From a7ef73257da22bc46f94443361afea2dcdeffdbf Mon Sep 17 00:00:00 2001 From: zhangjihu Date: Fri, 6 Sep 2024 17:32:42 +0800 Subject: [PATCH 2/2] =?UTF-8?q?zh:=201.=E5=A2=9E=E5=8A=A0=E6=9C=AC?= =?UTF-8?q?=E5=9C=B0=E8=B0=83=E8=AF=95=E6=97=B6=E5=AF=B9=E7=AC=AC=E4=B8=89?= =?UTF-8?q?=E6=96=B9=20Transformer=20=E8=87=AA=E5=8A=A8=E6=89=AB=E6=8F=8F?= =?UTF-8?q?=E5=B9=B6=E6=B3=A8=E5=86=8C=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../alibaba/datax/example/util/ExampleConfigParser.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java b/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java index 13057fc1..854c155d 100644 --- a/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java +++ b/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java @@ -173,7 +173,6 @@ public class ExampleConfigParser { */ private static void registerTransformerConfig(List transformers) { - //Configuration from = Configuration.from(transformer); if (transformers == null || transformers.size() == 0) { return; } @@ -244,6 +243,12 @@ public class ExampleConfigParser { } } + /** + * 验证Transformer 是否已经加载过了 + * @param transformerName + * @param transFormerClass + * @return + */ private static Boolean verfiyExist(String transformerName,String transFormerClass){ TransformerInfo transformer = TransformerRegistry.getTransformer(transformerName); if(transformer==null){