From 18332962926b63ac3f328d7e5d4e2760265b1762 Mon Sep 17 00:00:00 2001 From: Rajan Date: Mon, 17 Oct 2016 13:04:13 -0700 Subject: [PATCH] avoid synchronization for method that blocks while fetching zero-queue message (#61) --- .../pulsar/client/impl/ConsumerImpl.java | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java index 88c993a0070..6e3d8c2afcb 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java @@ -23,8 +23,6 @@ import java.io.IOException; import java.util.BitSet; import java.util.NavigableMap; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -60,7 +58,6 @@ import io.netty.util.concurrent.GenericFutureListener; import static com.yahoo.pulsar.common.api.Commands.readChecksum; import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum; import static com.yahoo.pulsar.common.api.Commands.hasChecksum; -import static com.yahoo.pulsar.common.api.Commands.readChecksum; public class ConsumerImpl extends ConsumerBase { @@ -79,6 +76,8 @@ public class ConsumerImpl extends ConsumerBase { private volatile boolean waitingOnReceiveForZeroQueueSize = false; private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + private final ReadWriteLock zeroQueueLock; private final ConcurrentSkipListMap batchMessageAckTracker; @@ -104,6 +103,11 @@ public class ConsumerImpl extends ConsumerBase { } else { stats = ConsumerStats.CONSUMER_STATS_DISABLED; } + if (conf.getReceiverQueueSize() <= 1) { + zeroQueueLock = new ReentrantReadWriteLock(); + } else { + zeroQueueLock = null; + } grabCnx(); } @@ -142,7 +146,13 @@ public class ConsumerImpl extends ConsumerBase { @Override protected Message internalReceive() throws PulsarClientException { if (conf.getReceiverQueueSize() == 0) { - return fetchSingleMessageFromBroker(); + checkArgument(zeroQueueLock != null, "Receiver queue size can't be modified"); + zeroQueueLock.writeLock().lock(); + try { + return fetchSingleMessageFromBroker(); + } finally { + zeroQueueLock.writeLock().unlock(); + } } Message message; try { @@ -190,7 +200,7 @@ public class ConsumerImpl extends ConsumerBase { return result; } - private synchronized Message fetchSingleMessageFromBroker() throws PulsarClientException { + private Message fetchSingleMessageFromBroker() throws PulsarClientException { checkArgument(conf.getReceiverQueueSize() == 0); // Just being cautious @@ -203,8 +213,10 @@ public class ConsumerImpl extends ConsumerBase { try { // is cnx is null or if the connection breaks the connectionOpened function will send the flow again waitingOnReceiveForZeroQueueSize = true; - if (isConnected()) { - receiveMessages(cnx(), 1); + synchronized (this) { + if (isConnected()) { + receiveMessages(cnx(), 1); + } } do { message = incomingMessages.take(); -- GitLab