diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java index 88c993a0070c5f07d6550c4870d165974c49013f..6e3d8c2afcb7c8fb124ff88a3a07a58a678a306b 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java @@ -23,8 +23,6 @@ import java.io.IOException; import java.util.BitSet; import java.util.NavigableMap; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -60,7 +58,6 @@ import io.netty.util.concurrent.GenericFutureListener; import static com.yahoo.pulsar.common.api.Commands.readChecksum; import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum; import static com.yahoo.pulsar.common.api.Commands.hasChecksum; -import static com.yahoo.pulsar.common.api.Commands.readChecksum; public class ConsumerImpl extends ConsumerBase { @@ -79,6 +76,8 @@ public class ConsumerImpl extends ConsumerBase { private volatile boolean waitingOnReceiveForZeroQueueSize = false; private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + private final ReadWriteLock zeroQueueLock; private final ConcurrentSkipListMap batchMessageAckTracker; @@ -104,6 +103,11 @@ public class ConsumerImpl extends ConsumerBase { } else { stats = ConsumerStats.CONSUMER_STATS_DISABLED; } + if (conf.getReceiverQueueSize() <= 1) { + zeroQueueLock = new ReentrantReadWriteLock(); + } else { + zeroQueueLock = null; + } grabCnx(); } @@ -142,7 +146,13 @@ public class ConsumerImpl extends ConsumerBase { @Override protected Message internalReceive() throws PulsarClientException { if (conf.getReceiverQueueSize() == 0) { - return fetchSingleMessageFromBroker(); + checkArgument(zeroQueueLock != null, "Receiver queue size can't be modified"); + zeroQueueLock.writeLock().lock(); + try { + return fetchSingleMessageFromBroker(); + } finally { + zeroQueueLock.writeLock().unlock(); + } } Message message; try { @@ -190,7 +200,7 @@ public class ConsumerImpl extends ConsumerBase { return result; } - private synchronized Message fetchSingleMessageFromBroker() throws PulsarClientException { + private Message fetchSingleMessageFromBroker() throws PulsarClientException { checkArgument(conf.getReceiverQueueSize() == 0); // Just being cautious @@ -203,8 +213,10 @@ public class ConsumerImpl extends ConsumerBase { try { // is cnx is null or if the connection breaks the connectionOpened function will send the flow again waitingOnReceiveForZeroQueueSize = true; - if (isConnected()) { - receiveMessages(cnx(), 1); + synchronized (this) { + if (isConnected()) { + receiveMessages(cnx(), 1); + } } do { message = incomingMessages.take();