This commit is contained in:
Galthen 2025-04-10 16:22:37 +08:00 committed by GitHub
commit 52d16c93d5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -141,14 +141,21 @@ public abstract class Channel {
public Record pull() { public Record pull() {
Record record = this.doPull(); Record record = this.doPull();
// record为终止对象时该记录不算入统计数量
if (!(record instanceof TerminateRecord)) {
this.statPull(1L, record.getByteSize()); this.statPull(1L, record.getByteSize());
}
return record; return record;
} }
public void pullAll(final Collection<Record> rs) { public void pullAll(final Collection<Record> rs) {
Validate.notNull(rs); Validate.notNull(rs);
this.doPullAll(rs); this.doPullAll(rs);
this.statPull(rs.size(), this.getByteSize(rs));
// record为终止对象时该记录不算入统计数量
long recordSize = rs.stream().filter(record -> !(record instanceof TerminateRecord)).count();
this.statPull(recordSize, this.getByteSize(rs));
} }
protected abstract void doPush(Record r); protected abstract void doPush(Record r);