提交 63b09e70 编写于 作者: M Masahiro Sakamoto 提交者: xiaolong.ran

[pulsar-broker] Close previous dispatcher when subscription type changes (#5288)

* Close previous dispatcher when subscription type changes

* Use isClosed directly to check if dispatcher is closed

(cherry picked from commit 413ba030)
上级 8b33ff28
......@@ -63,6 +63,10 @@ public abstract class AbstractDispatcherMultipleConsumers extends AbstractBaseDi
return consumerList.size() == 1 && consumerSet.contains(consumer);
}
public boolean isClosed() {
return isClosed == TRUE;
}
public SubType getType() {
return SubType.Shared;
}
......
......@@ -197,6 +197,10 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
return disconnectAllConsumers();
}
public boolean isClosed() {
return isClosed == TRUE;
}
/**
* Disconnect all consumers on this dispatcher (server side close). This triggers channelInactive on the inbound
* handler which calls dispatcher.removeConsumer(), where the closeFuture is completed
......
......@@ -54,6 +54,8 @@ public interface Dispatcher {
*/
CompletableFuture<Void> close();
boolean isClosed();
/**
* disconnect all consumers
*
......
......@@ -88,14 +88,18 @@ public class NonPersistentSubscription implements Subscription {
}
if (dispatcher == null || !dispatcher.isConsumerConnected()) {
Dispatcher previousDispatcher = null;
switch (consumer.subType()) {
case Exclusive:
if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
previousDispatcher = dispatcher;
dispatcher = new NonPersistentDispatcherSingleActiveConsumer(SubType.Exclusive, 0, topic, this);
}
break;
case Shared:
if (dispatcher == null || dispatcher.getType() != SubType.Shared) {
previousDispatcher = dispatcher;
dispatcher = new NonPersistentDispatcherMultipleConsumers(topic, this);
}
break;
......@@ -107,18 +111,29 @@ public class NonPersistentSubscription implements Subscription {
}
if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
previousDispatcher = dispatcher;
dispatcher = new NonPersistentDispatcherSingleActiveConsumer(SubType.Failover, partitionIndex,
topic, this);
}
break;
case Key_Shared:
if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
previousDispatcher = dispatcher;
dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this);
}
break;
default:
throw new ServerMetadataException("Unsupported subscription type");
}
if (previousDispatcher != null) {
previousDispatcher.close().thenRun(() -> {
log.info("[{}][{}] Successfully closed previous dispatcher", topicName, subName);
}).exceptionally(ex -> {
log.error("[{}][{}] Failed to close previous dispatcher", topicName, subName, ex);
return null;
});
}
} else {
if (consumer.subType() != dispatcher.getType()) {
throw new SubscriptionBusyException("Subscription is of different type");
......
......@@ -181,14 +181,18 @@ public class PersistentSubscription implements Subscription {
}
if (dispatcher == null || !dispatcher.isConsumerConnected()) {
Dispatcher previousDispatcher = null;
switch (consumer.subType()) {
case Exclusive:
if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
previousDispatcher = dispatcher;
dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Exclusive, 0, topic, this);
}
break;
case Shared:
if (dispatcher == null || dispatcher.getType() != SubType.Shared) {
previousDispatcher = dispatcher;
dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, this);
}
break;
......@@ -200,18 +204,29 @@ public class PersistentSubscription implements Subscription {
}
if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
previousDispatcher = dispatcher;
dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover, partitionIndex,
topic, this);
}
break;
case Key_Shared:
if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
previousDispatcher = dispatcher;
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this);
}
break;
default:
throw new ServerMetadataException("Unsupported subscription type");
}
if (previousDispatcher != null) {
previousDispatcher.close().thenRun(() -> {
log.info("[{}][{}] Successfully closed previous dispatcher", topicName, subName);
}).exceptionally(ex -> {
log.error("[{}][{}] Failed to close previous dispatcher", topicName, subName, ex);
return null;
});
}
} else {
if (consumer.subType() != dispatcher.getType()) {
throw new SubscriptionBusyException("Subscription is of different type");
......
......@@ -475,6 +475,38 @@ public class PersistentTopicTest {
assertNull(topic.getSubscription(successSubName));
}
@Test
public void testChangeSubscriptionType() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic, "change-sub-type", cursorMock, false);
Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1, 0, "Cons1", 50000, serverCnx,
"myrole-1", Collections.emptyMap(), false, InitialPosition.Latest);
sub.addConsumer(consumer);
consumer.close();
SubType previousSubType = SubType.Exclusive;
for (SubType subType : Lists.newArrayList(SubType.Shared, SubType.Failover, SubType.Key_Shared,
SubType.Exclusive)) {
Dispatcher previousDispatcher = sub.getDispatcher();
consumer = new Consumer(sub, subType, topic.getName(), 1, 0, "Cons1", 50000, serverCnx, "myrole-1",
Collections.emptyMap(), false, InitialPosition.Latest);
sub.addConsumer(consumer);
assertTrue(sub.getDispatcher().isConsumerConnected());
assertFalse(sub.getDispatcher().isClosed());
assertEquals(sub.getDispatcher().getType(), subType);
assertFalse(previousDispatcher.isConsumerConnected());
assertTrue(previousDispatcher.isClosed());
assertEquals(previousDispatcher.getType(), previousSubType);
consumer.close();
previousSubType = subType;
}
}
@Test
public void testAddRemoveConsumer() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
......@@ -924,6 +956,7 @@ public class PersistentTopicTest {
doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
doReturn("mockCursor").when(cursorMock).getName();
doReturn(true).when(cursorMock).isDurable();
// doNothing().when(cursorMock).asyncClose(new CloseCallback() {
doAnswer(new Answer<Object>() {
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册