Merge pull request #1084 from XuDaojie/transformer

feature:新增DigestTransformer
This commit is contained in:
Trafalgar 2022-10-12 17:20:20 +08:00 committed by GitHub
commit 127a963e3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 119 additions and 3 deletions

View File

@ -0,0 +1,87 @@
package com.alibaba.datax.core.transport.transformer;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.transformer.Transformer;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import java.util.Arrays;
/**
 * Transformer registered under the name {@code dx_digest}. It replaces one column of a
 * record with the hexadecimal MD5 or SHA-1 digest of that column's string value.
 *
 * <p>Expected parameters, in order:
 * <ol>
 *   <li>{@code Integer} - index of the column to hash;</li>
 *   <li>{@code String} - digest type, {@code md5} or {@code sha1} (case-insensitive);</li>
 *   <li>{@code String} - case of the resulting hex string, {@code toUpperCase} or
 *       {@code toLowerCase} (case-insensitive).</li>
 * </ol>
 *
 * @author XuDaojie
 * @since 2021-08-16
 */
public class DigestTransformer extends Transformer {
    private static final String MD5 = "md5";
    private static final String SHA1 = "sha1";
    private static final String TO_UPPER_CASE = "toUpperCase";
    private static final String TO_LOWER_CASE = "toLowerCase";

    public DigestTransformer() {
        setTransformerName("dx_digest");
    }

    /**
     * Hashes the configured column of {@code record} and stores the hex digest back into the
     * same column as a {@link StringColumn}.
     *
     * @param record the record to transform; it is modified in place
     * @param paras  exactly three values: column index, digest type, output case
     * @return the same {@code record} instance, with the target column replaced
     * @throws DataXException with {@code TRANSFORMER_ILLEGAL_PARAMETER} when the parameters are
     *                        missing, of the wrong type or not one of the accepted values, and
     *                        with {@code TRANSFORMER_RUN_EXCEPTION} when hashing or updating
     *                        the column fails
     */
    @Override
    public Record evaluate(Record record, Object... paras) {
        int columnIndex;
        String type;
        String charType;
        try {
            if (paras == null || paras.length != 3) {
                throw new RuntimeException("dx_digest paras length must be 3");
            }
            columnIndex = (Integer) paras[0];
            type = (String) paras[1];
            charType = (String) paras[2];
            if (!StringUtils.equalsIgnoreCase(MD5, type) && !StringUtils.equalsIgnoreCase(SHA1, type)) {
                throw new RuntimeException("dx_digest paras index 1 must be md5 or sha1");
            }
            if (!StringUtils.equalsIgnoreCase(TO_UPPER_CASE, charType) && !StringUtils.equalsIgnoreCase(TO_LOWER_CASE, charType)) {
                throw new RuntimeException("dx_digest paras index 2 must be toUpperCase or toLowerCase");
            }
        } catch (Exception e) {
            // Arrays.toString renders a non-null array exactly like Arrays.asList(..).toString()
            // but also tolerates a null array instead of throwing a second exception here.
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER, "paras:" + Arrays.toString(paras) + " => " + e.getMessage());
        }

        // Validation above is case-insensitive, so the dispatch below must be too; otherwise a
        // value such as "MD5" would pass validation and then silently be hashed with SHA-1.
        boolean useMd5 = StringUtils.equalsIgnoreCase(MD5, type);
        boolean upperCase = StringUtils.equalsIgnoreCase(TO_UPPER_CASE, charType);

        Column column = record.getColumn(columnIndex);
        try {
            String oriValue = column.asString();
            // A null column value is hashed as the empty string.
            if (oriValue == null) {
                oriValue = "";
            }
            String newValue;
            if (useMd5) {
                newValue = DigestUtils.md5Hex(oriValue);
            } else {
                newValue = DigestUtils.sha1Hex(oriValue);
            }
            if (upperCase) {
                newValue = newValue.toUpperCase();
            } else {
                newValue = newValue.toLowerCase();
            }
            record.setColumn(columnIndex, new StringColumn(newValue));
        } catch (Exception e) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_RUN_EXCEPTION, e.getMessage(), e);
        }
        return record;
    }
}

View File

@ -1,10 +1,18 @@
package com.alibaba.datax.core.transport.transformer;
import org.apache.commons.codec.digest.DigestUtils;
/**
 * Helper class for GroovyTransformer, intended to be called from groovy code.
 * Every method in this class must be static.
 * Created by liqiang on 16/3/4.
 */
public class GroovyTransformerStaticUtil {

    /**
     * Computes the MD5 digest of the given text by delegating to {@code DigestUtils.md5Hex}.
     *
     * @param data the text to hash
     * @return the value returned by {@code DigestUtils.md5Hex(data)}
     */
    public static String md5(final String data) {
        final String md5Hex = DigestUtils.md5Hex(data);
        return md5Hex;
    }

    /**
     * Computes the SHA-1 digest of the given text by delegating to {@code DigestUtils.sha1Hex}.
     *
     * @param data the text to hash
     * @return the value returned by {@code DigestUtils.sha1Hex(data)}
     */
    public static String sha1(final String data) {
        final String sha1Hex = DigestUtils.sha1Hex(data);
        return sha1Hex;
    }
}

View File

@ -36,6 +36,7 @@ public class TransformerRegistry {
registTransformer(new ReplaceTransformer());
registTransformer(new FilterTransformer());
registTransformer(new GroovyTransformer());
registTransformer(new DigestTransformer());
}
public static void loadTransformerFromLocalStorage() {

View File

@ -59,7 +59,17 @@ dx_replace(1,"5","10","****") column 1的value为“dataxTest”=>"data****"
dx_filter(1,"like","dataTest")
dx_filter(1,">=","10")
```
5. dx_groovy
5. dx_digest
* 参数:3个
* 第一个参数:字段编号,对应record中第几个字段。
* 第二个参数:hash类型,可选md5、sha1。
* 第三个参数:hash值大小写,toUpperCase(大写)、toLowerCase(小写)。
* 返回: 返回指定类型的hashHex,如果字段为空,则转为空字符串再返回对应hashHex。
* 举例:
```
dx_digest(1,"md5","toUpperCase"), column 1的值为 xyzzzzz => 9CDFFC4FA4E45A99DB8BBCD762ACFFA2
```
6. dx_groovy
* 参数。
* 第一个参数: groovy code
* 第二个参数列表或者为空extraPackage
@ -67,7 +77,9 @@ dx_filter(1,">=","10")
* dx_groovy只能调用一次。不能多次调用。
* groovy code中支持java.lang, java.util的包可直接引用的对象有record以及element下的各种columnBoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.class,StringColumn.class。不支持其他包如果用户有需要用到其他包可设置extraPackage注意extraPackage不支持第三方jar包。
* groovy code中返回更新过的Record比如record.setColumn(columnIndex, new StringColumn(newValue));或者null。返回null表示过滤此行。
* 用户可以直接调用静态的Util方式GroovyTransformerStaticUtil目前GroovyTransformerStaticUtil的方法列表 (按需补充)
* 用户可以直接调用静态的Util方式:GroovyTransformerStaticUtil。目前GroovyTransformerStaticUtil的方法列表:
* md5(String):String
* sha1(String):String
* 举例:
```
groovy 实现的subStr:
@ -109,7 +121,7 @@ String code3 = "Column column = record.getColumn(1);\n" +
```
## Job定义
* 本例中,配置3个UDF。
* 本例中,配置4个UDF。
```
{
@ -176,6 +188,14 @@ String code3 = "Column column = record.getColumn(1);\n" +
"paras":["3","4","****"]
}
},
{
"name": "dx_digest",
"parameter":
{
"columnIndex":3,
"paras":["md5", "toLowerCase"]
}
},
{
"name": "dx_groovy",
"parameter":