提交 18332962 编写于 作者: R Rajan 提交者: Matteo Merli

avoid synchronization for method that blocks while fetching zero-queue message (#61)

上级 5b7b9109
...@@ -23,8 +23,6 @@ import java.io.IOException; ...@@ -23,8 +23,6 @@ import java.io.IOException;
import java.util.BitSet; import java.util.BitSet;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -60,7 +58,6 @@ import io.netty.util.concurrent.GenericFutureListener; ...@@ -60,7 +58,6 @@ import io.netty.util.concurrent.GenericFutureListener;
import static com.yahoo.pulsar.common.api.Commands.readChecksum; import static com.yahoo.pulsar.common.api.Commands.readChecksum;
import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum; import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
import static com.yahoo.pulsar.common.api.Commands.hasChecksum; import static com.yahoo.pulsar.common.api.Commands.hasChecksum;
import static com.yahoo.pulsar.common.api.Commands.readChecksum;
public class ConsumerImpl extends ConsumerBase { public class ConsumerImpl extends ConsumerBase {
...@@ -79,6 +76,8 @@ public class ConsumerImpl extends ConsumerBase { ...@@ -79,6 +76,8 @@ public class ConsumerImpl extends ConsumerBase {
private volatile boolean waitingOnReceiveForZeroQueueSize = false; private volatile boolean waitingOnReceiveForZeroQueueSize = false;
private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final ReadWriteLock zeroQueueLock;
private final ConcurrentSkipListMap<MessageIdImpl, BitSet> batchMessageAckTracker; private final ConcurrentSkipListMap<MessageIdImpl, BitSet> batchMessageAckTracker;
...@@ -104,6 +103,11 @@ public class ConsumerImpl extends ConsumerBase { ...@@ -104,6 +103,11 @@ public class ConsumerImpl extends ConsumerBase {
} else { } else {
stats = ConsumerStats.CONSUMER_STATS_DISABLED; stats = ConsumerStats.CONSUMER_STATS_DISABLED;
} }
if (conf.getReceiverQueueSize() <= 1) {
zeroQueueLock = new ReentrantReadWriteLock();
} else {
zeroQueueLock = null;
}
grabCnx(); grabCnx();
} }
...@@ -142,7 +146,13 @@ public class ConsumerImpl extends ConsumerBase { ...@@ -142,7 +146,13 @@ public class ConsumerImpl extends ConsumerBase {
@Override @Override
protected Message internalReceive() throws PulsarClientException { protected Message internalReceive() throws PulsarClientException {
if (conf.getReceiverQueueSize() == 0) { if (conf.getReceiverQueueSize() == 0) {
return fetchSingleMessageFromBroker(); checkArgument(zeroQueueLock != null, "Receiver queue size can't be modified");
zeroQueueLock.writeLock().lock();
try {
return fetchSingleMessageFromBroker();
} finally {
zeroQueueLock.writeLock().unlock();
}
} }
Message message; Message message;
try { try {
...@@ -190,7 +200,7 @@ public class ConsumerImpl extends ConsumerBase { ...@@ -190,7 +200,7 @@ public class ConsumerImpl extends ConsumerBase {
return result; return result;
} }
private synchronized Message fetchSingleMessageFromBroker() throws PulsarClientException { private Message fetchSingleMessageFromBroker() throws PulsarClientException {
checkArgument(conf.getReceiverQueueSize() == 0); checkArgument(conf.getReceiverQueueSize() == 0);
// Just being cautious // Just being cautious
...@@ -203,8 +213,10 @@ public class ConsumerImpl extends ConsumerBase { ...@@ -203,8 +213,10 @@ public class ConsumerImpl extends ConsumerBase {
try { try {
// is cnx is null or if the connection breaks the connectionOpened function will send the flow again // is cnx is null or if the connection breaks the connectionOpened function will send the flow again
waitingOnReceiveForZeroQueueSize = true; waitingOnReceiveForZeroQueueSize = true;
if (isConnected()) { synchronized (this) {
receiveMessages(cnx(), 1); if (isConnected()) {
receiveMessages(cnx(), 1);
}
} }
do { do {
message = incomingMessages.take(); message = incomingMessages.take();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册