提交 3e4e1276 编写于 作者: R Rajan 提交者: Matteo Merli

Reduce Dispatcher-totalPermits by number of messages delivered in batch (#67)

上级 bd49d699
......@@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -115,19 +117,22 @@ public class Consumer {
*
* @return a promise that can be use to track when all the data has been written into the socket
*/
public ChannelPromise sendMessages(final List<Entry> entries) {
public Pair<ChannelPromise, Integer> sendMessages(final List<Entry> entries) {
final ChannelHandlerContext ctx = cnx.ctx();
final MutablePair<ChannelPromise, Integer> sentMessages = new MutablePair<ChannelPromise, Integer>();
final ChannelPromise writePromise = ctx.newPromise();
sentMessages.setLeft(writePromise);
if (entries.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("[{}] List of messages is empty, triggering write future immediately for consumerId {}",
subscription, consumerId);
}
writePromise.setSuccess();
return writePromise;
sentMessages.setRight(0);
return sentMessages;
}
updatePermitsAndPendingAcks(entries);
sentMessages.setRight(updatePermitsAndPendingAcks(entries));
ctx.channel().eventLoop().execute(() -> {
for (int i = 0; i < entries.size(); i++) {
......@@ -165,7 +170,7 @@ public class Consumer {
ctx.flush();
});
return writePromise;
return sentMessages;
}
private void incrementUnackedMessages(int ackedMessages) {
......@@ -192,7 +197,7 @@ public class Consumer {
return -1;
}
void updatePermitsAndPendingAcks(final List<Entry> entries) {
int updatePermitsAndPendingAcks(final List<Entry> entries) {
int permitsToReduce = 0;
Iterator<Entry> iter = entries.iterator();
while (iter.hasNext()) {
......@@ -221,6 +226,7 @@ public class Consumer {
log.debug("[{}] [{}] message permits dropped below 0 - {}", subscription, consumerId, permits);
}
}
return permitsToReduce;
}
public boolean isWritable() {
......
......@@ -149,7 +149,7 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
}
private void readMoreEntries() {
if (totalAvailablePermits > 0 && isUnblockedConsumerAvailable()) {
if (totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
int messagesToRead = Math.min(totalAvailablePermits, readBatchSize);
if (!messagesToReplay.isEmpty()) {
......@@ -258,7 +258,7 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
}
while (entriesToDispatch > 0 && totalAvailablePermits > 0 && isUnblockedConsumerAvailable()) {
while (entriesToDispatch > 0 && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
Consumer c = getNextConsumer();
if (c == null) {
// Do nothing, cursor will be rewind at reconnection
......@@ -271,7 +271,7 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
int messagesForC = Math.min(Math.min(entriesToDispatch, c.getAvailablePermits()), MaxRoundRobinBatchSize);
if (messagesForC > 0) {
c.sendMessages(entries.subList(start, start + messagesForC));
int msgSent = c.sendMessages(entries.subList(start, start + messagesForC)).getRight();
if (readType == ReadType.Replay) {
entries.subList(start, start + messagesForC).forEach(entry -> {
......@@ -280,7 +280,7 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
}
start += messagesForC;
entriesToDispatch -= messagesForC;
totalAvailablePermits -= messagesForC;
totalAvailablePermits -= msgSent;
}
}
......@@ -357,7 +357,7 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
// find next available unblocked consumer
int unblockedConsumerIndex = consumerIndex;
do {
if (!consumerList.get(unblockedConsumerIndex).isBlocked()) {
if (isConsumerAvailable(consumerList.get(unblockedConsumerIndex))) {
consumerIndex = unblockedConsumerIndex;
return consumerList.get(consumerIndex++);
}
......@@ -371,23 +371,26 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
}
/**
* returns true only if {@link consumerList} has atleast one unblocked consumer
* returns true only if {@link consumerList} has atleast one unblocked consumer and have available permits
*
* @return
*/
private boolean isUnblockedConsumerAvailable() {
private boolean isAtleastOneConsumerAvailable() {
if (consumerList.isEmpty() || closeFuture != null) {
// abort read if no consumers are connected or if disconnect is initiated
return false;
}
Iterator<Consumer> consumerIterator = consumerList.iterator();
while (consumerIterator.hasNext()) {
if (!consumerIterator.next().isBlocked()) {
for(Consumer consumer : consumerList) {
if (isConsumerAvailable(consumer)) {
return true;
}
}
return false;
}
private boolean isConsumerAvailable(Consumer consumer) {
return consumer != null && !consumer.isBlocked() && consumer.getAvailablePermits() > 0;
}
@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
......
......@@ -199,7 +199,7 @@ public final class PersistentDispatcherSingleActiveConsumer implements Dispatche
readMoreEntries(currentConsumer);
}
} else {
currentConsumer.sendMessages(entries).addListener(future -> {
currentConsumer.sendMessages(entries).getLeft().addListener(future -> {
if (future.isSuccess()) {
// Schedule a new read batch operation only after the previous batch has been written to the socket
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册