提交 e190534c 编写于 作者: E Ezequiel Lovelle 提交者: Matteo Merli

Feature - implement reference count for ConsumerImpl (#3795)

* Feature - implement reference count for ConsumerImpl

Add reference count for ConsumerImpl in order to track reused instances of a
consumer instance returned by `subscribe()` method call.
Having the reference of subscribed consumer instances offers the ability to not
close a consumer until the last corresponding `close()` is being called.

Modifications:

  - Add field on ConsumerBase to track references of consumer instances
    subscribed by the user.
  - Add checks on ConsumerImpl to know whether close() action should be
    performed regarding of reference count being zero value.
  - Increment reference count when a previous built consumer instance is being
    used by caller.

Future considerations:

When optimization #3312 is going to be made for other consumers implementation
aside from ConsumerImpl it should add refCount checks on close() method call.

* Add tests for reference count on ConsumerImpl

  - Add test to verify ConsumerImpl reference count on close() method.
  - Fix test from dup consumers feature with refcount.
上级 449f2f5b
......@@ -2983,4 +2983,29 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
Assert.assertTrue(consumerC.isConnected());
consumerC.close();
}
@Test
public void testRefCount_OnCloseConsumer() throws Exception {
final String topic = "persistent://my-property/my-ns/my-topic";
final String subName = "my-subscription";
Consumer<byte[]> consumerA = pulsarClient.newConsumer().topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.subscribe();
Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.subscribe();
Assert.assertEquals(consumerA, consumerB);
consumerA.close();
Assert.assertTrue(consumerA.isConnected());
Assert.assertTrue(consumerB.isConnected());
consumerB.close();
Assert.assertFalse(consumerA.isConnected());
Assert.assertFalse(consumerB.isConnected());
}
}
......@@ -59,8 +59,9 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
final BlockingQueue<Message<T>> incomingMessages;
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
protected int maxReceiverQueueSize;
protected Schema<T> schema;
protected final Schema<T> schema;
protected final ConsumerInterceptors<T> interceptors;
private int refCount = 0;
protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
int receiverQueueSize, ExecutorService listenerExecutor,
......@@ -380,4 +381,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
}
}
protected synchronized void incrRefCount() {
++refCount;
}
protected synchronized boolean shouldTearDown() {
return refCount > 0 ? refCount-- == 0 : refCount == 0;
}
}
......@@ -679,6 +679,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
@Override
public CompletableFuture<Void> closeAsync() {
if (!shouldTearDown()) {
return CompletableFuture.completedFuture(null);
}
if (getState() == State.Closing || getState() == State.Closed) {
unAckedMessageTracker.close();
if (possibleSendToDeadLetterTopicMessages != null) {
......
......@@ -683,6 +683,7 @@ public class PulsarClientImpl implements PulsarClient {
.filter(c -> c.getSubscription().equals(conf.getSubscriptionName()))
.filter(Consumer::isConnected)
.findFirst();
subscriber.ifPresent(ConsumerBase::incrRefCount);
return subscriber.map(ConsumerBase.class::cast);
}
}
......
......@@ -305,6 +305,11 @@ public class PulsarSpoutTest extends ProducerConsumerBase {
otherSpout.close();
topicStats = admin.topics().getStats(topic);
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
otherSpout.close();
topicStats = admin.topics().getStats(topic);
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 0);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册