DataX/transformer/doc/transformer.md
2024-08-20 20:17:56 +08:00

259 lines
11 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# DataX Transformer
## Transformer定义
在数据同步、传输过程中存在用户对于数据传输进行特殊定制化的需求场景包括裁剪列、转换列等工作可以借助ETL的T过程实现(Transformer)。DataX包含了完整的E(Extract)、T(Transformer)、L(Load)支持。
## 运行模型
![image](http://git.cn-hangzhou.oss.aliyun-inc.com/uploads/datax/datax/b5652c0492c394684958272219ce327c/image.png)
## UDF手册
1. dx_substr
* 参数3个
* 第一个参数字段编号对应record中第几个字段。
* 第二个参数:字段值的开始位置。
* 第三个参数:目标字段长度。
* 返回: 从字符串的指定位置包含截取指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值直接返回即不参与本transformer
* 举例:
```
dx_substr(1,"2","5") column 1的value为“dataxTest”=>"taxTe"
dx_substr(1,"5","10") column 1的value为“dataxTest”=>"Test"
```
2. dx_pad
* 参数4个
* 第一个参数字段编号对应record中第几个字段。
* 第二个参数:"l","r", 指示是在头进行pad还是尾进行pad。
* 第三个参数:目标字段长度。
* 第四个参数需要pad的字符。
* 返回: 如果源字符串长度小于目标字段长度按照位置添加pad字符后返回。如果长于直接截断都截右边。如果字段为空值转换为空字符串进行pad即最后的字符串全是需要pad的字符
* 举例:
```
dx_pad(1,"l","4","A"), 如果column 1 的值为 xyz=> Axyz 值为 xyzzzzz => xyzz
dx_pad(1,"r","4","A"), 如果column 1 的值为 xyz=> xyzA 值为 xyzzzzz => xyzz
```
3. dx_replace
* 参数4个
* 第一个参数字段编号对应record中第几个字段。
* 第二个参数:字段值的开始位置。
* 第三个参数:需要替换的字段长度。
* 第四个参数:需要替换的字符串。
* 返回: 从字符串的指定位置包含替换指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值直接返回即不参与本transformer
* 举例:
```
dx_replace(1,"2","4","****") column 1的value为“dataxTest”=>"da****est"
dx_replace(1,"5","10","****") column 1的value为“dataxTest”=>"datax****"
```
4. dx_filter 关联filter暂不支持即多个字段的联合判断函参太过复杂用户难以使用。
* 参数:
* 第一个参数字段编号对应record中第几个字段。
* 第二个参数运算符支持以下运算符like, not like, >, =, <, >=, !=, <=
* 第三个参数正则表达式java正则表达式、值。
* 返回:
* 如果匹配正则表达式返回Null表示过滤该行。不匹配表达式时表示保留该行。注意是该行。对于>=<都是对字段直接compare的结果.
* like not like是将字段转换成String然后和目标正则表达式进行全匹配
* >, =, <, >=, !=, <= 对于DoubleColumn比较double值对于LongColumn和DateColumn比较long值其他StringColumnBooleanColumn以及ByteColumn均比较的是StringColumn值。
* 如果目标colunn为空null对于 = null的过滤条件将满足条件被过滤。=null的过滤条件null不满足过滤条件不被过滤。 like字段为null不满足条件不被过滤和not like字段为null满足条件被过滤。
* 举例:
```
dx_filter(1,"like","dataTest")
dx_filter(1,">=","10")
```
5. dx_digest
* 参数3个
* 第一个参数字段编号对应record中第几个字段。
* 第二个参数hash类型md5、sha1
* 第三个参数hash值大小写 toUpperCase大写、toLowerCase小写
* 返回: 返回指定类型的hashHex,如果字段为空则转为空字符串再返回对应hashHex
* 举例:
```
dx_digest(1,"md5","toUpperCase"), column 1的值为 xyzzzzz => 9CDFFC4FA4E45A99DB8BBCD762ACFFA2
```
6. dx_groovy
* 参数。
* 第一个参数: groovy code
* 第二个参数列表或者为空extraPackage
* 备注:
* dx_groovy只能调用一次。不能多次调用。
* groovy code中支持java.lang, java.util的包可直接引用的对象有record以及element下的各种columnBoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.class,StringColumn.class。不支持其他包如果用户有需要用到其他包可设置extraPackage注意extraPackage不支持第三方jar包。
* groovy code中返回更新过的Record比如record.setColumn(columnIndex, new StringColumn(newValue));或者null。返回null表示过滤此行。
* 用户可以直接调用静态的Util方式GroovyTransformerStaticUtil目前GroovyTransformerStaticUtil的方法列表
* md5(String):String
* sha1(String):String
* 举例:
```
groovy 实现的subStr:
String code = "Column column = record.getColumn(1);\n" +
" String oriValue = column.asString();\n" +
" String newValue = oriValue.substring(0, 3);\n" +
" record.setColumn(1, new StringColumn(newValue));\n" +
" return record;";
dx_groovy(record);
```
```
groovy 实现的Replace
String code2 = "Column column = record.getColumn(1);\n" +
" String oriValue = column.asString();\n" +
" String newValue = \"****\" + oriValue.substring(3, oriValue.length());\n" +
" record.setColumn(1, new StringColumn(newValue));\n" +
" return record;";
```
```
groovy 实现的Pad
String code3 = "Column column = record.getColumn(1);\n" +
" String oriValue = column.asString();\n" +
" String padString = \"12345\";\n" +
" String finalPad = \"\";\n" +
" int NeedLength = 8 - oriValue.length();\n" +
" while (NeedLength > 0) {\n" +
"\n" +
" if (NeedLength >= padString.length()) {\n" +
" finalPad += padString;\n" +
" NeedLength -= padString.length();\n" +
" } else {\n" +
" finalPad += padString.substring(0, NeedLength);\n" +
" NeedLength = 0;\n" +
" }\n" +
" }\n" +
" String newValue= finalPad + oriValue;\n" +
" record.setColumn(1, new StringColumn(newValue));\n" +
" return record;";
```
## Job定义
* 本例中配置4个UDF。
```
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"value": "DataX",
"type": "string"
},
{
"value": 1724154616370,
"type": "long"
},
{
"value": "2024-01-01 00:00:00",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "TestRawData",
"type": "bytes"
}
],
"sliceRecordCount": 100
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": false,
"encoding": "UTF-8"
}
},
"transformer": [
{
"name": "dx_substr",
"parameter": {
"columnIndex": 5,
"paras": [
"1",
"3"
]
}
},
{
"name": "dx_replace",
"parameter": {
"columnIndex": 4,
"paras": [
"3",
"4",
"****"
]
}
},
{
"name": "dx_digest",
"parameter": {
"columnIndex": 3,
"paras": [
"md5",
"toLowerCase"
]
}
},
{
"name": "dx_groovy",
"parameter": {
"code": "//groovy code//",
"extraPackage": [
"import somePackage1;",
"import somePackage2;"
]
}
}
]
}
]
}
}
```
## 计量和脏数据
Transform过程涉及到数据的转换可能造成数据的增加或减少因此更加需要精确度量包括
* Transform的入参Record条数、字节数。
* Transform的出参Record条数、字节数。
* Transform的脏数据Record条数、字节数。
* 如果是多个Transform某一个发生脏数据将不会再进行后面的transform直接统计为脏数据。
* 目前只提供了所有Transform的计量成功失败过滤的count以及transform的消耗时间
涉及到运行过程的计量数据展现定义如下:
```
Total 1000000 records, 22000000 bytes | Transform 100000 records(in), 10000 records(out) | Speed 2.10MB/s, 100000 records/s | Error 0 records, 0 bytes | Percentage 100.00%
```
**注意,这里主要记录转换的输入输出,需要检测数据输入输出的记录数量变化。**
涉及到最终作业的计量数据展现定义如下:
```
任务启动时刻 : 2015-03-10 17:34:21
任务结束时刻 : 2015-03-10 17:34:31
任务总计耗时 : 10s
任务平均流量 : 2.10MB/s
记录写入速度 : 100000rec/s
转换输入总数 : 1000000
转换输出总数 : 1000000
读出记录总数 : 1000000
同步失败总数 : 0
```
**注意,这里主要记录转换的输入输出,需要检测数据输入输出的记录数量变化。**