用户在使用了transformer的情况下会出现错误

注释的地方,如果用户配置了transformer会造成对同一个记录重复操作的问题,除非用户在不使用随机函数的情况下配置了"hasMixupFunction" : true
recordSender.sendToWriter(oneRecord)这个方法可能对oneRecord的数据进行修改,所以使用前需要先复制一下,或者每次都重新生成,我的配置如下:
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "sliceRecordCount": 10,
            "column": [
              {
                "type": "long",
                "value": "10"
              },
              {
                "type": "string",
                "value": "hello,你好,世界-DataX"
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        },
		"transformer": [
                    {
                        "name": "dx_substr",
                        "parameter": 
                            {
								"columnIndex":1,
								"paras":["1","3"]
                            }  
                    }
                ]
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
       }
    }
  },
  "core" : {
	"container" : {
		"job" : {
			"sleepInterval" : 100
		}
	}
  }
}
This commit is contained in:
jimmyc2006 2020-06-24 17:56:25 +08:00 committed by GitHub
parent e2aed2e9e0
commit 6457689854
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -222,9 +222,11 @@ public class StreamReader extends Reader {
public void startRead(RecordSender recordSender) {
Record oneRecord = buildOneRecord(recordSender, this.columns);
while (this.sliceRecordCount > 0) {
if (this.haveMixupFunction) {
// 如果用户配置了transformer会造成对同一个记录重复操作的问题除非用户在不使用随机函数的情况下配置了"hasMixupFunction" : true
// recordSender.sendToWriter(oneRecord)这个方法可能对oneRecord的数据进行修改所以使用前需要先复制一下或者每次都重新生成
// if (this.haveMixupFunction) {
oneRecord = buildOneRecord(recordSender, this.columns);
}
// }
recordSender.sendToWriter(oneRecord);
this.sliceRecordCount--;
}