From 6457689854d80091f23646d4f77d3e5112e5b853 Mon Sep 17 00:00:00 2001 From: jimmyc2006 Date: Wed, 24 Jun 2020 17:56:25 +0800 Subject: [PATCH] =?UTF-8?q?=E7=94=A8=E6=88=B7=E5=9C=A8=E4=BD=BF=E7=94=A8?= =?UTF-8?q?=E4=BA=86transformer=E7=9A=84=E6=83=85=E5=86=B5=E4=B8=8B?= =?UTF-8?q?=E4=BC=9A=E5=87=BA=E7=8E=B0=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 注释的地方,如果用户配置了transformer会造成对同一个记录重复操作的问题,除非用户在不使用随机函数的情况下配置了"hasMixupFunction" : true recordSender.sendToWriter(oneRecord)这个方法可能对oneRecord的数据进行修改,所以使用前需要先复制一下,或者每次都重新生成,我的配置如下: { "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "sliceRecordCount": 10, "column": [ { "type": "long", "value": "10" }, { "type": "string", "value": "hello,你好,世界-DataX" } ] } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "UTF-8", "print": true } }, "transformer": [ { "name": "dx_substr", "parameter": { "columnIndex":1, "paras":["1","3"] } } ] } ], "setting": { "speed": { "channel": 1 } } }, "core" : { "container" : { "job" : { "sleepInterval" : 100 } } } } --- .../datax/plugin/reader/streamreader/StreamReader.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/streamreader/src/main/java/com/alibaba/datax/plugin/reader/streamreader/StreamReader.java b/streamreader/src/main/java/com/alibaba/datax/plugin/reader/streamreader/StreamReader.java index e3b86659..3ab142e8 100755 --- a/streamreader/src/main/java/com/alibaba/datax/plugin/reader/streamreader/StreamReader.java +++ b/streamreader/src/main/java/com/alibaba/datax/plugin/reader/streamreader/StreamReader.java @@ -222,9 +222,11 @@ public class StreamReader extends Reader { public void startRead(RecordSender recordSender) { Record oneRecord = buildOneRecord(recordSender, this.columns); while (this.sliceRecordCount > 0) { - if (this.haveMixupFunction) { + // 如果用户配置了transformer会造成对同一个记录重复操作的问题,除非用户在不使用随机函数的情况下配置了"hasMixupFunction" : true + // recordSender.sendToWriter(oneRecord)这个方法可能对oneRecord的数据进行修改,所以使用前需要先复制一下,或者每次都重新生成 + // if (this.haveMixupFunction) { oneRecord = buildOneRecord(recordSender, this.columns); - } + // } recordSender.sendToWriter(oneRecord); this.sliceRecordCount--; }