From 4df71b8c2dd08c65293ebc58b2cb1bea1a994482 Mon Sep 17 00:00:00 2001 From: chenyang Date: Wed, 9 Dec 2020 11:38:13 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=BB=9F=E8=AE=A1?= =?UTF-8?q?=E6=95=B0=E9=87=8F=E4=B8=8D=E6=AD=A3=E7=A1=AE=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98=E3=80=82=20=E9=97=AE=E9=A2=98=E5=8E=9F=E5=9B=A0?= =?UTF-8?q?=EF=BC=9A=E7=BB=88=E6=AD=A2task=E7=9A=84=E6=97=B6=EF=BC=8C?= =?UTF-8?q?=E4=BC=9A=E8=B0=83=E7=94=A8this.channel.pushTerminate(Terminate?= =?UTF-8?q?Record.get())=EF=BC=8Cpush=E4=B8=80=E4=B8=AA=E6=B2=A1=E6=9C=89?= =?UTF-8?q?=E5=85=83=E7=B4=A0=E7=9A=84TerminateRecord=E5=88=B0queue?= =?UTF-8?q?=E4=B8=AD=EF=BC=8C=E6=89=80=E4=BB=A5pull=E6=97=B6=EF=BC=8C?= =?UTF-8?q?=E6=AF=8F=E6=9C=891=E4=B8=AAtask=E7=BB=9F=E8=AE=A1=E5=B0=B1?= =?UTF-8?q?=E4=BC=9A=E5=A4=9A1=E4=B8=AA=E3=80=82=20=E5=9C=A8=E5=85=A5?= =?UTF-8?q?=E5=BA=93=E6=97=B6=EF=BC=8CTerminateRecord=E4=BC=9A=E8=A2=AB?= =?UTF-8?q?=E6=8E=92=E9=99=A4=E6=8E=89=EF=BC=8C=E4=BD=86=E6=98=AF=E5=9C=A8?= =?UTF-8?q?=E7=BB=9F=E8=AE=A1=E6=97=B6=E6=B2=A1=E6=9C=89=E5=A4=84=E7=90=86?= =?UTF-8?q?=EF=BC=8C=E5=BC=95=E8=B5=B7=E4=BA=86=E8=AF=A5=E9=97=AE=E9=A2=98?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../alibaba/datax/core/transport/channel/Channel.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/alibaba/datax/core/transport/channel/Channel.java b/core/src/main/java/com/alibaba/datax/core/transport/channel/Channel.java index 8d4f1f67..ced7fe1d 100755 --- a/core/src/main/java/com/alibaba/datax/core/transport/channel/Channel.java +++ b/core/src/main/java/com/alibaba/datax/core/transport/channel/Channel.java @@ -141,13 +141,20 @@ public abstract class Channel { public Record pull() { Record record = this.doPull(); - this.statPull(1L, record.getByteSize()); + + // record为终止对象时,该记录不算入统计数量 + if (!(record instanceof TerminateRecord)) { + this.statPull(1L, record.getByteSize()); + } return record; } public void pullAll(final Collection rs) { Validate.notNull(rs); this.doPullAll(rs); + + // record为终止对象时,该记录不算入统计数量 + long recordSize = rs.stream().filter(record -> !(record instanceof TerminateRecord)).count(); this.statPull(rs.size(), this.getByteSize(rs)); } From 8fb0f8590647fd5c7893e8143f7e00bb1abc9120 Mon Sep 17 00:00:00 2001 From: chenyang Date: Wed, 21 Jul 2021 14:46:09 +0800 Subject: [PATCH 2/2] Update Channel.java --- .../java/com/alibaba/datax/core/transport/channel/Channel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/com/alibaba/datax/core/transport/channel/Channel.java b/core/src/main/java/com/alibaba/datax/core/transport/channel/Channel.java index ced7fe1d..119ef965 100755 --- a/core/src/main/java/com/alibaba/datax/core/transport/channel/Channel.java +++ b/core/src/main/java/com/alibaba/datax/core/transport/channel/Channel.java @@ -155,7 +155,7 @@ public abstract class Channel { // record为终止对象时,该记录不算入统计数量 long recordSize = rs.stream().filter(record -> !(record instanceof TerminateRecord)).count(); - this.statPull(rs.size(), this.getByteSize(rs)); + this.statPull(recordSize, this.getByteSize(rs)); } protected abstract void doPush(Record r);