diff --git a/core/src/main/java/com/alibaba/datax/core/transport/channel/Channel.java b/core/src/main/java/com/alibaba/datax/core/transport/channel/Channel.java index 8d4f1f67..119ef965 100755 --- a/core/src/main/java/com/alibaba/datax/core/transport/channel/Channel.java +++ b/core/src/main/java/com/alibaba/datax/core/transport/channel/Channel.java @@ -141,14 +141,21 @@ public abstract class Channel { public Record pull() { Record record = this.doPull(); - this.statPull(1L, record.getByteSize()); + + // record为终止对象时,该记录不算入统计数量 + if (!(record instanceof TerminateRecord)) { + this.statPull(1L, record.getByteSize()); + } return record; } public void pullAll(final Collection rs) { Validate.notNull(rs); this.doPullAll(rs); - this.statPull(rs.size(), this.getByteSize(rs)); + + // record为终止对象时,该记录不算入统计数量 + long recordSize = rs.stream().filter(record -> !(record instanceof TerminateRecord)).count(); + this.statPull(recordSize, this.getByteSize(rs)); } protected abstract void doPush(Record r);