提交 25d4c555 编写于 作者: S sschepens 提交者: Matteo Merli

unacked messages should always be changed even if BlockConsumerOnUnackMsgs is disabled (#96)

上级 5e55d799
......@@ -181,7 +181,7 @@ public class Consumer {
}
private void incrementUnackedMessages(int ackedMessages) {
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages.addAndGet(ackedMessages) >= maxUnackedMessages) {
if (unackedMessages.addAndGet(ackedMessages) >= maxUnackedMessages && shouldBlockConsumerOnUnackMsgs()) {
blockedConsumerOnUnackedMsgs = true;
}
}
......@@ -432,9 +432,9 @@ public class Consumer {
int totalAckedMsgs = ackOwnedConsumer.getPendingAcks().remove(position);
// unblock consumer-throttling when receives half of maxUnackedMessages => consumer can start again
// consuming messages
if (ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs()
&& ((ackOwnedConsumer.unackedMessages.addAndGet(-totalAckedMsgs) <= (maxUnackedMessages / 2))
&& ackOwnedConsumer.blockedConsumerOnUnackedMsgs)) {
if (((ackOwnedConsumer.unackedMessages.addAndGet(-totalAckedMsgs) <= (maxUnackedMessages / 2))
&& ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
&& ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs()) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册