提交 59cf1677 编写于 作者: M Marvin Cai 提交者: xiaolong.ran

Update logic for picking active consumer for failover subscription on...

Update logic for picking active consumer for failover subscription on non-partitioned topic. (#4604)

Instead of sorting the consumers based on priority level and consumer name then pick a active consumer, which could cause subscription getting into a flaky state, where the "active" consumer joins and leaves, no consumer is actually elected as "active" and consuming the messages.
Fix logic to always pick the first consumer in the consumer list without sorting consumers. So consumers will be picked as acive consumer based on the order of their subscription.

(cherry picked from commit 22f1d7d1)
上级 fed52520
......@@ -80,34 +80,45 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
}
/**
* @return the previous active consumer if the consumer is changed, otherwise null.
* Pick active consumer for a topic for {@link SubType#Failover} subscription.
* If it's a non-partitioned topic then it'll pick consumer based on order they subscribe to the topic.
* If is's a partitioned topic, first sort consumers based on their priority level and consumer name then
* distributed partitions evenly across consumers with highest priority level.
*
* @return the true consumer if the consumer is changed, otherwise false.
*/
protected boolean pickAndScheduleActiveConsumer() {
checkArgument(!consumers.isEmpty());
AtomicBoolean hasPriorityConsumer = new AtomicBoolean(false);
consumers.sort((c1, c2) -> {
int priority = c1.getPriorityLevel() - c2.getPriorityLevel();
if (priority != 0) {
hasPriorityConsumer.set(true);
return priority;
}
return c1.consumerName().compareTo(c2.consumerName());
});
int consumersSize = consumers.size();
// find number of consumers which are having the highest priorities. so partitioned-topic assignment happens
// evenly across highest priority consumers
if (hasPriorityConsumer.get()) {
int highestPriorityLevel = consumers.get(0).getPriorityLevel();
for (int i = 0; i < consumers.size(); i++) {
if (highestPriorityLevel != consumers.get(i).getPriorityLevel()) {
consumersSize = i;
break;
// By default always pick the first connected consumer for non partitioned topic.
int index = 0;
// If it's a partitioned topic, sort consumers based on priority level then consumer name.
if (partitionIndex >= 0) {
AtomicBoolean hasPriorityConsumer = new AtomicBoolean(false);
consumers.sort((c1, c2) -> {
int priority = c1.getPriorityLevel() - c2.getPriorityLevel();
if (priority != 0) {
hasPriorityConsumer.set(true);
return priority;
}
return c1.consumerName().compareTo(c2.consumerName());
});
int consumersSize = consumers.size();
// find number of consumers which are having the highest priorities. so partitioned-topic assignment happens
// evenly across highest priority consumers
if (hasPriorityConsumer.get()) {
int highestPriorityLevel = consumers.get(0).getPriorityLevel();
for (int i = 0; i < consumers.size(); i++) {
if (highestPriorityLevel != consumers.get(i).getPriorityLevel()) {
consumersSize = i;
break;
}
}
}
index = partitionIndex % consumersSize;
}
int index = partitionIndex % consumersSize;
Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index));
Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
......
......@@ -199,8 +199,9 @@ public class PersistentSubscription implements Subscription {
case Failover:
int partitionIndex = TopicName.getPartitionIndex(topicName);
if (partitionIndex < 0) {
// For non partition topics, assume index 0 to pick a predictable consumer
partitionIndex = 0;
// For non partition topics, use a negative index so dispatcher won't sort consumers before picking
// an active consumer for the topic.
partitionIndex = -1;
}
if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
......
......@@ -304,7 +304,7 @@ public class PersistentDispatcherFailoverConsumerTest {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
int partitionIndex = 0;
int partitionIndex = 4;
PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock,
SubType.Failover, partitionIndex, topic, sub);
......
......@@ -232,67 +232,6 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
assertEquals(subRef.getNumberOfEntriesInBacklog(), 0);
for (int i = 0; i < numMsgs; i++) {
String message = "my-message-" + i;
futures.add(producer.sendAsync(message.getBytes()));
}
FutureUtil.waitForAll(futures).get();
futures.clear();
// 6. consumer subscription should send messages to the new consumer if its name is highest in the list
for (int i = 0; i < 5; i++) {
msg = consumer2.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
consumer2.acknowledge(msg);
}
consumer1 = consumerBulder1.subscribe();
Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
for (int i = 5; i < numMsgs; i++) {
msg = consumer1.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
consumer1.acknowledge(msg);
}
Assert.assertNull(consumer1.receive(100, TimeUnit.MILLISECONDS));
rolloverPerIntervalStats();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
assertEquals(subRef.getNumberOfEntriesInBacklog(), 0);
for (int i = 0; i < numMsgs; i++) {
String message = "my-message-" + i;
futures.add(producer.sendAsync(message.getBytes()));
}
FutureUtil.waitForAll(futures).get();
futures.clear();
// 7. consumer subscription should not send messages to the new consumer if its name is not highest in the list
for (int i = 0; i < 5; i++) {
msg = consumer1.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
consumer1.acknowledge(msg);
}
TestConsumerStateEventListener listener3 = new TestConsumerStateEventListener();
Consumer<byte[]> consumer3 = consumerBuilder.clone().consumerName("3").consumerEventListener(listener3)
.subscribe();
Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
verifyConsumerInactive(listener3, -1);
Assert.assertNull(consumer3.receive(100, TimeUnit.MILLISECONDS));
for (int i = 5; i < numMsgs; i++) {
msg = consumer1.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
consumer1.acknowledge(msg);
}
rolloverPerIntervalStats();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
assertEquals(subRef.getNumberOfEntriesInBacklog(), 0);
// 8. unsubscribe not allowed if multiple consumers connected
try {
consumer1.unsubscribe();
......@@ -303,10 +242,9 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
// 9. unsubscribe allowed if there is a lone consumer
consumer1.close();
consumer2.close();
Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
try {
consumer3.unsubscribe();
consumer2.unsubscribe();
} catch (PulsarClientException e) {
fail("Should not fail", e);
}
......@@ -316,7 +254,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
assertNull(subRef);
producer.close();
consumer3.close();
consumer2.close();
admin.topics().delete(topicName);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册