mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 06:50:35 +08:00
rename notInsufficient to notSufficient
This commit is contained in:
parent
f6d21f112d
commit
d54c1ae199
@ -29,7 +29,7 @@ public class MemoryChannel extends Channel {
|
|||||||
|
|
||||||
private ReentrantLock lock;
|
private ReentrantLock lock;
|
||||||
|
|
||||||
private Condition notInsufficient, notEmpty;
|
private Condition notSufficient, notEmpty;
|
||||||
|
|
||||||
public MemoryChannel(final Configuration configuration) {
|
public MemoryChannel(final Configuration configuration) {
|
||||||
super(configuration);
|
super(configuration);
|
||||||
@ -37,7 +37,7 @@ public class MemoryChannel extends Channel {
|
|||||||
this.bufferSize = configuration.getInt(CoreConstant.DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE);
|
this.bufferSize = configuration.getInt(CoreConstant.DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE);
|
||||||
|
|
||||||
lock = new ReentrantLock();
|
lock = new ReentrantLock();
|
||||||
notInsufficient = lock.newCondition();
|
notSufficient = lock.newCondition();
|
||||||
notEmpty = lock.newCondition();
|
notEmpty = lock.newCondition();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,7 +75,7 @@ public class MemoryChannel extends Channel {
|
|||||||
lock.lockInterruptibly();
|
lock.lockInterruptibly();
|
||||||
int bytes = getRecordBytes(rs);
|
int bytes = getRecordBytes(rs);
|
||||||
while (memoryBytes.get() + bytes > this.byteCapacity || rs.size() > this.queue.remainingCapacity()) {
|
while (memoryBytes.get() + bytes > this.byteCapacity || rs.size() > this.queue.remainingCapacity()) {
|
||||||
notInsufficient.await(200L, TimeUnit.MILLISECONDS);
|
notSufficient.await(200L, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
this.queue.addAll(rs);
|
this.queue.addAll(rs);
|
||||||
waitWriterTime += System.nanoTime() - startTime;
|
waitWriterTime += System.nanoTime() - startTime;
|
||||||
@ -116,7 +116,7 @@ public class MemoryChannel extends Channel {
|
|||||||
waitReaderTime += System.nanoTime() - startTime;
|
waitReaderTime += System.nanoTime() - startTime;
|
||||||
int bytes = getRecordBytes(rs);
|
int bytes = getRecordBytes(rs);
|
||||||
memoryBytes.addAndGet(-bytes);
|
memoryBytes.addAndGet(-bytes);
|
||||||
notInsufficient.signalAll();
|
notSufficient.signalAll();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw DataXException.asDataXException(
|
throw DataXException.asDataXException(
|
||||||
FrameworkErrorCode.RUNTIME_ERROR, e);
|
FrameworkErrorCode.RUNTIME_ERROR, e);
|
||||||
|
Loading…
Reference in New Issue
Block a user