diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index 28036e4320f6c8cdb4d5434b1e0d9902a7e62339..f15ea5fe7813a4e44ec6a8ec21f3aeec56aa1af3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -80,34 +80,45 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas } /** - * @return the previous active consumer if the consumer is changed, otherwise null. + * Pick active consumer for a topic for {@link SubType#Failover} subscription. + * If it's a non-partitioned topic then it'll pick consumer based on order they subscribe to the topic. + * If is's a partitioned topic, first sort consumers based on their priority level and consumer name then + * distributed partitions evenly across consumers with highest priority level. + * + * @return the true consumer if the consumer is changed, otherwise false. */ protected boolean pickAndScheduleActiveConsumer() { checkArgument(!consumers.isEmpty()); - - AtomicBoolean hasPriorityConsumer = new AtomicBoolean(false); - consumers.sort((c1, c2) -> { - int priority = c1.getPriorityLevel() - c2.getPriorityLevel(); - if (priority != 0) { - hasPriorityConsumer.set(true); - return priority; - } - return c1.consumerName().compareTo(c2.consumerName()); - }); - - int consumersSize = consumers.size(); - // find number of consumers which are having the highest priorities. so partitioned-topic assignment happens - // evenly across highest priority consumers - if (hasPriorityConsumer.get()) { - int highestPriorityLevel = consumers.get(0).getPriorityLevel(); - for (int i = 0; i < consumers.size(); i++) { - if (highestPriorityLevel != consumers.get(i).getPriorityLevel()) { - consumersSize = i; - break; + // By default always pick the first connected consumer for non partitioned topic. + int index = 0; + + // If it's a partitioned topic, sort consumers based on priority level then consumer name. + if (partitionIndex >= 0) { + AtomicBoolean hasPriorityConsumer = new AtomicBoolean(false); + consumers.sort((c1, c2) -> { + int priority = c1.getPriorityLevel() - c2.getPriorityLevel(); + if (priority != 0) { + hasPriorityConsumer.set(true); + return priority; + } + return c1.consumerName().compareTo(c2.consumerName()); + }); + + int consumersSize = consumers.size(); + // find number of consumers which are having the highest priorities. so partitioned-topic assignment happens + // evenly across highest priority consumers + if (hasPriorityConsumer.get()) { + int highestPriorityLevel = consumers.get(0).getPriorityLevel(); + for (int i = 0; i < consumers.size(); i++) { + if (highestPriorityLevel != consumers.get(i).getPriorityLevel()) { + consumersSize = i; + break; + } } } + index = partitionIndex % consumersSize; } - int index = partitionIndex % consumersSize; + Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index)); Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index ec8ab8dd03cf1f71c4022f99cd7478cf8d90dcfb..e0550122808aa7cd8d30760d0b2398bf84129047 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -199,8 +199,9 @@ public class PersistentSubscription implements Subscription { case Failover: int partitionIndex = TopicName.getPartitionIndex(topicName); if (partitionIndex < 0) { - // For non partition topics, assume index 0 to pick a predictable consumer - partitionIndex = 0; + // For non partition topics, use a negative index so dispatcher won't sort consumers before picking + // an active consumer for the topic. + partitionIndex = -1; } if (dispatcher == null || dispatcher.getType() != SubType.Failover) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index a2838913fe1afd1bdde15f1a1e67ced0d203d053..886018bc11875186b8e88a213879c79ab238b0be 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -304,7 +304,7 @@ public class PersistentDispatcherFailoverConsumerTest { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); - int partitionIndex = 0; + int partitionIndex = 4; PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock, SubType.Failover, partitionIndex, topic, sub); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java index 99566fdda279b7e82b45c99035309aa72d0460a2..9389c1a88211c9b749f0007cb096a637a34a744b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java @@ -232,67 +232,6 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); assertEquals(subRef.getNumberOfEntriesInBacklog(), 0); - for (int i = 0; i < numMsgs; i++) { - String message = "my-message-" + i; - futures.add(producer.sendAsync(message.getBytes())); - } - FutureUtil.waitForAll(futures).get(); - futures.clear(); - - // 6. consumer subscription should send messages to the new consumer if its name is highest in the list - for (int i = 0; i < 5; i++) { - msg = consumer2.receive(1, TimeUnit.SECONDS); - Assert.assertNotNull(msg); - Assert.assertEquals(new String(msg.getData()), "my-message-" + i); - consumer2.acknowledge(msg); - } - consumer1 = consumerBulder1.subscribe(); - Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME); - for (int i = 5; i < numMsgs; i++) { - msg = consumer1.receive(1, TimeUnit.SECONDS); - Assert.assertNotNull(msg); - Assert.assertEquals(new String(msg.getData()), "my-message-" + i); - consumer1.acknowledge(msg); - } - Assert.assertNull(consumer1.receive(100, TimeUnit.MILLISECONDS)); - - rolloverPerIntervalStats(); - Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); - assertEquals(subRef.getNumberOfEntriesInBacklog(), 0); - - for (int i = 0; i < numMsgs; i++) { - String message = "my-message-" + i; - futures.add(producer.sendAsync(message.getBytes())); - } - FutureUtil.waitForAll(futures).get(); - futures.clear(); - - // 7. consumer subscription should not send messages to the new consumer if its name is not highest in the list - for (int i = 0; i < 5; i++) { - msg = consumer1.receive(1, TimeUnit.SECONDS); - Assert.assertNotNull(msg); - Assert.assertEquals(new String(msg.getData()), "my-message-" + i); - consumer1.acknowledge(msg); - } - TestConsumerStateEventListener listener3 = new TestConsumerStateEventListener(); - Consumer consumer3 = consumerBuilder.clone().consumerName("3").consumerEventListener(listener3) - .subscribe(); - Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME); - - verifyConsumerInactive(listener3, -1); - - Assert.assertNull(consumer3.receive(100, TimeUnit.MILLISECONDS)); - for (int i = 5; i < numMsgs; i++) { - msg = consumer1.receive(1, TimeUnit.SECONDS); - Assert.assertNotNull(msg); - Assert.assertEquals(new String(msg.getData()), "my-message-" + i); - consumer1.acknowledge(msg); - } - - rolloverPerIntervalStats(); - Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); - assertEquals(subRef.getNumberOfEntriesInBacklog(), 0); - // 8. unsubscribe not allowed if multiple consumers connected try { consumer1.unsubscribe(); @@ -303,10 +242,9 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { // 9. unsubscribe allowed if there is a lone consumer consumer1.close(); - consumer2.close(); Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME); try { - consumer3.unsubscribe(); + consumer2.unsubscribe(); } catch (PulsarClientException e) { fail("Should not fail", e); } @@ -316,7 +254,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { assertNull(subRef); producer.close(); - consumer3.close(); + consumer2.close(); admin.topics().delete(topicName); }