提交 cc638d3a 编写于 作者: E Ezequiel Lovelle 提交者: Matteo Merli

[java client] Bugfix prevent dup consumers for same topic subscribe (#3746)

Fixes #3743 issue.

Return previous instance of a consumer in the subscription processed should only
be considered with the scope of the same topic.

Modifications:

  - Remove optimization of duplicated consumers for multi topics subscribe and
    pattern topics subscribe, this should be handled with a different approach.
  - Filter consumers for the same topic name.
  - Filter consumers which are connected to broker, this is not necessary to fix
    this issue but is a good thing to do.
  - Add test that verifies that same subscription will allow different consumers
    instance for different topics.
上级 253381cc
......@@ -2901,7 +2901,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
// Pull 3312: https://github.com/apache/pulsar/pull/3312
// Bugfix preventing duplicated consumers on same client cnx with shared subscription mode
@Test()
public void testPreventDupConsumersOnClientCnx() throws Exception {
public void testPreventDupConsumersOnClientCnxForSingleSub() throws Exception {
final CompletableFuture<Void> future = new CompletableFuture<>();
final String topic = "persistent://my-property/my-ns/my-topic";
final String subName = "my-subscription";
......@@ -2931,7 +2931,56 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
});
future.get(5, TimeUnit.SECONDS);
Assert.assertEquals(consumer, consumerB);
Assert.assertTrue(future.isDone());
Assert.assertFalse(future.isCompletedExceptionally());
}
@Test()
public void testPreventDupConsumersOnClientCnxForSingleSub_AllowDifferentTopics() throws Exception {
final CompletableFuture<Void> future = new CompletableFuture<>();
final String topic = "persistent://my-property/my-ns/my-topic";
final String subName = "my-subscription";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.subscribe();
Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.subscribe();
// This consumer should be a newly subscription since is it from a different topic
// even though has the same subscription name.
Consumer<byte[]> consumerC = pulsarClient.newConsumer().topic(topic + "-different-topic")
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.subscribe();
consumer.unsubscribeAsync().whenComplete((aVoid1, t1) -> {
if (t1 != null) {
future.completeExceptionally(t1);
return;
}
consumer.closeAsync().whenComplete((aVoid2, t2) -> {
if (t2 != null) {
future.completeExceptionally(t2);
return;
}
future.complete(null);
});
});
future.get(5, TimeUnit.SECONDS);
Assert.assertEquals(consumer, consumerB);
Assert.assertTrue(future.isDone());
Assert.assertFalse(future.isCompletedExceptionally());
// consumerC is a newly created subscription.
Assert.assertNotEquals(consumer, consumerC);
Assert.assertTrue(consumerC.isConnected());
consumerC.close();
}
}
......@@ -349,11 +349,6 @@ public class PulsarClientImpl implements PulsarClient {
}
private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
Optional<ConsumerBase<T>> subscriber = subscriptionExist(conf);
if (subscriber.isPresent()) {
return CompletableFuture.completedFuture(subscriber.get());
}
CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
......@@ -376,10 +371,6 @@ public class PulsarClientImpl implements PulsarClient {
Mode subscriptionMode = convertRegexSubscriptionMode(conf.getRegexSubscriptionMode());
TopicName destination = TopicName.get(regex);
NamespaceName namespaceName = destination.getNamespaceObject();
Optional<ConsumerBase<T>> subscriber = subscriptionExist(conf);
if (subscriber.isPresent()) {
return CompletableFuture.completedFuture(subscriber.get());
}
CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode)
......@@ -687,8 +678,10 @@ public class PulsarClientImpl implements PulsarClient {
private <T> Optional<ConsumerBase<T>> subscriptionExist(ConsumerConfigurationData<?> conf) {
synchronized (consumers) {
Optional<ConsumerBase<?>> subscriber = consumers.keySet().stream()
.filter(consumerBase -> consumerBase.getSubType().equals(PulsarApi.CommandSubscribe.SubType.Shared))
.filter(c -> c.getSubType().equals(PulsarApi.CommandSubscribe.SubType.Shared))
.filter(c -> conf.getTopicNames().contains(c.getTopic()))
.filter(c -> c.getSubscription().equals(conf.getSubscriptionName()))
.filter(Consumer::isConnected)
.findFirst();
return subscriber.map(ConsumerBase.class::cast);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册