diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java index 9b1026639784c36bf4cfb538566348bbec34e5f7..2c42ad1c72f5f2b0226e1ee71067a063f07f64ff 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java @@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.util.Rate; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,19 +117,22 @@ public class Consumer { * * @return a promise that can be use to track when all the data has been written into the socket */ - public ChannelPromise sendMessages(final List entries) { + public Pair sendMessages(final List entries) { final ChannelHandlerContext ctx = cnx.ctx(); + final MutablePair sentMessages = new MutablePair(); final ChannelPromise writePromise = ctx.newPromise(); + sentMessages.setLeft(writePromise); if (entries.isEmpty()) { if (log.isDebugEnabled()) { log.debug("[{}] List of messages is empty, triggering write future immediately for consumerId {}", subscription, consumerId); } writePromise.setSuccess(); - return writePromise; + sentMessages.setRight(0); + return sentMessages; } - updatePermitsAndPendingAcks(entries); + sentMessages.setRight(updatePermitsAndPendingAcks(entries)); ctx.channel().eventLoop().execute(() -> { for (int i = 0; i < entries.size(); i++) { @@ -165,7 +170,7 @@ public class Consumer { ctx.flush(); }); - return writePromise; + return sentMessages; } private void incrementUnackedMessages(int ackedMessages) { @@ -192,7 +197,7 @@ public class Consumer { return -1; } - void updatePermitsAndPendingAcks(final List entries) { + int updatePermitsAndPendingAcks(final List entries) { int permitsToReduce = 0; Iterator iter = entries.iterator(); while (iter.hasNext()) { @@ -221,6 +226,7 @@ public class Consumer { log.debug("[{}] [{}] message permits dropped below 0 - {}", subscription, consumerId, permits); } } + return permitsToReduce; } public boolean isWritable() { diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 4a49796be7bcbeb80da708008ce9c3374d149475..57383ed9b30636f6539db4c3438f443062f52333 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -149,7 +149,7 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn } private void readMoreEntries() { - if (totalAvailablePermits > 0 && isUnblockedConsumerAvailable()) { + if (totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) { int messagesToRead = Math.min(totalAvailablePermits, readBatchSize); if (!messagesToReplay.isEmpty()) { @@ -258,7 +258,7 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size()); } - while (entriesToDispatch > 0 && totalAvailablePermits > 0 && isUnblockedConsumerAvailable()) { + while (entriesToDispatch > 0 && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) { Consumer c = getNextConsumer(); if (c == null) { // Do nothing, cursor will be rewind at reconnection @@ -271,7 +271,7 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn int messagesForC = Math.min(Math.min(entriesToDispatch, c.getAvailablePermits()), MaxRoundRobinBatchSize); if (messagesForC > 0) { - c.sendMessages(entries.subList(start, start + messagesForC)); + int msgSent = c.sendMessages(entries.subList(start, start + messagesForC)).getRight(); if (readType == ReadType.Replay) { entries.subList(start, start + messagesForC).forEach(entry -> { @@ -280,7 +280,7 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn } start += messagesForC; entriesToDispatch -= messagesForC; - totalAvailablePermits -= messagesForC; + totalAvailablePermits -= msgSent; } } @@ -357,7 +357,7 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn // find next available unblocked consumer int unblockedConsumerIndex = consumerIndex; do { - if (!consumerList.get(unblockedConsumerIndex).isBlocked()) { + if (isConsumerAvailable(consumerList.get(unblockedConsumerIndex))) { consumerIndex = unblockedConsumerIndex; return consumerList.get(consumerIndex++); } @@ -371,23 +371,26 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn } /** - * returns true only if {@link consumerList} has atleast one unblocked consumer + * returns true only if {@link consumerList} has atleast one unblocked consumer and have available permits * * @return */ - private boolean isUnblockedConsumerAvailable() { + private boolean isAtleastOneConsumerAvailable() { if (consumerList.isEmpty() || closeFuture != null) { // abort read if no consumers are connected or if disconnect is initiated return false; } - Iterator consumerIterator = consumerList.iterator(); - while (consumerIterator.hasNext()) { - if (!consumerIterator.next().isBlocked()) { + for(Consumer consumer : consumerList) { + if (isConsumerAvailable(consumer)) { return true; } } return false; } + + private boolean isConsumerAvailable(Consumer consumer) { + return consumer != null && !consumer.isBlocked() && consumer.getAvailablePermits() > 0; + } @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 7b05a6015dda6cdcc749eaf50e176e74677336eb..47b9acb75df84647f498870ddd1865978d22672d 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -199,7 +199,7 @@ public final class PersistentDispatcherSingleActiveConsumer implements Dispatche readMoreEntries(currentConsumer); } } else { - currentConsumer.sendMessages(entries).addListener(future -> { + currentConsumer.sendMessages(entries).getLeft().addListener(future -> { if (future.isSuccess()) { // Schedule a new read batch operation only after the previous batch has been written to the socket synchronized (PersistentDispatcherSingleActiveConsumer.this) {