提交 5862644e 编写于 作者: R Rajan 提交者: GitHub

Refactor dispatcher's getting next consumer with consumer priority (#367)

上级 766e987e
......@@ -94,6 +94,7 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher is already closed. Closing consumer ", name, consumer);
consumer.disconnect();
return;
}
if (consumerList.isEmpty()) {
if (havePendingRead || havePendingReplayRead) {
......@@ -399,33 +400,19 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
*
* <pre>
* <b>Algorithm:</b>
* 1. sorted-list: consumers stored in sorted-list: max-priority stored first
* 1. consumerList: it stores consumers in sorted-list: max-priority stored first
* 2. currentConsumerRoundRobinIndex: it always stores last served consumer-index
* 3. resultingAvailableConsumerIndex: traversal index. we always start searching availableConsumer from the
* beginning of sorted-list and update resultingAvailableConsumerIndex according searching-traversal
*
* Each time getNextConsumer() is called:<p>
* 1. It always starts to traverse from the max-priority consumer (first element) from sorted-list
* 2. Consumers on same priority-level will be treated equally and it tries to pick one of them in round-robin manner
* 3. If consumer is not available on given priority-level then it will go to the next lower priority-level consumers
* 4. Optimization: <p>
* A. Consumers on same priority-level must be treated equally => dispatch message round-robin to them:
* [if Consumer of resultingAvailableConsumerIndex(current-traversal-index) has the same
* priority-level as consumer of currentConsumerRoundRobinIndex(last-Served-Consumer-Index)]
* <b>Dispatching in Round-Robin:</b> then it means we should do round-robin and skip all the consumers before
* currentConsumerRoundRobinIndex (as they are already served previously)
* a. if found: if we found availableConsumer on the same priority-level after currentConsumerRoundRobinIndex
* then return that consumer and update currentConsumerRoundRobinIndex with that consumer-index
* b. else not_found: if we don't find any consumer on that same-priority level after currentConsumerRoundRobinIndex
* - a. check skipped consumers: check skipped consumer (4.A.a) which are on index before than currentConsumerRoundRobinIndex
* - b. next priority-level: if not found in previous step: then it means no consumer available in prior level. So, move to
* next lower priority level and try to find next-available consumer as per 4.A
*
* 3. If consumer is not available on given priority-level then only it will go to the next lower priority-level consumers
* 4. Returns null in case it doesn't find any available consumer
* </pre>
*
* @return nextAvailableConsumer
*/
public Consumer getNextConsumer() {
private Consumer getNextConsumer() {
if (consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == TRUE) {
// abort read if no consumers are connected or if disconnect is initiated
return null;
......@@ -435,57 +422,102 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
currentConsumerRoundRobinIndex = 0;
}
// index of resulting consumer which will be returned
int resultingAvailableConsumerIndex = 0;
boolean scanFromBeginningIfCurrentConsumerNotAvailable = true;
int firstConsumerIndexOfCurrentPriorityLevel = -1;
do {
int priorityLevel = consumerList.get(currentConsumerRoundRobinIndex).getPriorityLevel()
- consumerList.get(resultingAvailableConsumerIndex).getPriorityLevel();
int currentRoundRobinConsumerPriority = consumerList.get(currentConsumerRoundRobinIndex).getPriorityLevel();
boolean isSamePriorityLevel = priorityLevel == 0;
// store first-consumer index with same-priority as currentConsumerRoundRobinIndex
if (isSamePriorityLevel && firstConsumerIndexOfCurrentPriorityLevel == -1) {
firstConsumerIndexOfCurrentPriorityLevel = resultingAvailableConsumerIndex;
// first find available-consumer on higher level unless currentIndex is not on highest level which is 0
if (currentRoundRobinConsumerPriority != 0) {
int higherPriorityConsumerIndex = getConsumerFromHigherPriority(currentRoundRobinConsumerPriority);
if (higherPriorityConsumerIndex != -1) {
currentConsumerRoundRobinIndex = higherPriorityConsumerIndex + 1;
return consumerList.get(higherPriorityConsumerIndex);
}
}
// currentIndex is already on highest level or couldn't find consumer on higher level so, find consumer on same or lower
// level
int availableConsumerIndex = getNextConsumerFromSameOrLowerLevel(currentConsumerRoundRobinIndex);
if (availableConsumerIndex != -1) {
currentConsumerRoundRobinIndex = availableConsumerIndex + 1;
return consumerList.get(availableConsumerIndex);
}
// skip already served same-priority-consumer to select consumer in round-robin manner
resultingAvailableConsumerIndex = (isSamePriorityLevel
&& currentConsumerRoundRobinIndex > resultingAvailableConsumerIndex)
? currentConsumerRoundRobinIndex : resultingAvailableConsumerIndex;
// if resultingAvailableConsumerIndex moved ahead of currentConsumerRoundRobinIndex: then we should
// check skipped consumer which had same priority as currentConsumerRoundRobinIndex consumer
boolean isLastConsumerBlocked = (currentConsumerRoundRobinIndex == (consumerList.size() - 1)
&& !isConsumerAvailable(consumerList.get(currentConsumerRoundRobinIndex)));
boolean shouldScanCurrentLevel = priorityLevel < 0
/* means moved to next lower-priority-level */ || isLastConsumerBlocked;
if (shouldScanCurrentLevel && scanFromBeginningIfCurrentConsumerNotAvailable) {
for (int i = firstConsumerIndexOfCurrentPriorityLevel; i < currentConsumerRoundRobinIndex; i++) {
Consumer nextConsumer = consumerList.get(i);
if (isConsumerAvailable(nextConsumer)) {
currentConsumerRoundRobinIndex = i + 1;
return nextConsumer;
}
// couldn't find available consumer
return null;
}
/**
* Finds index of first available consumer which has higher priority then given targetPriority
* @param targetPriority
* @return -1 if couldn't find any available consumer
*/
private int getConsumerFromHigherPriority(int targetPriority) {
for (int i = 0; i < currentConsumerRoundRobinIndex; i++) {
Consumer consumer = consumerList.get(i);
if (consumer.getPriorityLevel() < targetPriority) {
if (isConsumerAvailable(consumerList.get(i))) {
return i;
}
// now, we have scanned from the beginning: flip the flag to avoid scan again
scanFromBeginningIfCurrentConsumerNotAvailable = false;
} else {
break;
}
}
return -1;
}
Consumer nextConsumer = consumerList.get(resultingAvailableConsumerIndex);
if (isConsumerAvailable(nextConsumer)) {
currentConsumerRoundRobinIndex = resultingAvailableConsumerIndex + 1;
return nextConsumer;
/**
* Finds index of round-robin available consumer that present on same level as consumer on currentRoundRobinIndex if doesn't
* find consumer on same level then it finds first available consumer on lower priority level else returns index=-1
* if couldn't find any available consumer in the list
*
* @param currentRoundRobinIndex
* @return
*/
private int getNextConsumerFromSameOrLowerLevel(int currentRoundRobinIndex) {
int targetPriority = consumerList.get(currentRoundRobinIndex).getPriorityLevel();
// use to do round-robin if can't find consumer from currentRR to last-consumer in list
int scanIndex = currentRoundRobinIndex;
int endPriorityLevelIndex = currentRoundRobinIndex;
do {
Consumer scanConsumer = scanIndex < consumerList.size() ? consumerList.get(scanIndex)
: null /* reached to last consumer of list */;
// if reached to last consumer of list then check from beginning to currentRRIndex of the list
if (scanConsumer == null || scanConsumer.getPriorityLevel() != targetPriority) {
endPriorityLevelIndex = scanIndex; // last consumer on this level
scanIndex = getFirstConsumerIndexOfPriority(targetPriority);
} else {
if (isConsumerAvailable(scanConsumer)) {
return scanIndex;
}
scanIndex++;
}
if (++resultingAvailableConsumerIndex >= consumerList.size()) {
break;
} while (scanIndex != currentRoundRobinIndex);
// it means: didn't find consumer in the same priority-level so, check available consumer lower than this level
for (int i = endPriorityLevelIndex; i < consumerList.size(); i++) {
if (isConsumerAvailable(consumerList.get(i))) {
return i;
}
} while (true);
}
// not found unblocked consumer
return null;
return -1;
}
/**
* Finds index of first consumer in list which has same priority as given targetPriority
* @param targetPriority
* @return
*/
private int getFirstConsumerIndexOfPriority(int targetPriority) {
for (int i = 0; i < consumerList.size(); i++) {
if (consumerList.get(i).getPriorityLevel() == targetPriority) {
return i;
}
}
return -1;
}
/**
* returns true only if {@link consumerList} has atleast one unblocked consumer and have available permits
*
......
......@@ -28,6 +28,7 @@ import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
......@@ -269,15 +270,15 @@ public class PersistentDispatcherFailoverConsumerTest {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock);
Consumer consumer1 = createConsumer(0, 2, 1);
Consumer consumer2 = createConsumer(0, 2, 2);
Consumer consumer3 = createConsumer(0, 2, 3);
Consumer consumer4 = createConsumer(1, 2, 4);
Consumer consumer5 = createConsumer(1, 1, 5);
Consumer consumer6 = createConsumer(1, 2, 6);
Consumer consumer7 = createConsumer(2, 1, 7);
Consumer consumer8 = createConsumer(2, 1, 8);
Consumer consumer9 = createConsumer(2, 1, 9);
Consumer consumer1 = createConsumer(0, 2, false, 1);
Consumer consumer2 = createConsumer(0, 2, false, 2);
Consumer consumer3 = createConsumer(0, 2, false, 3);
Consumer consumer4 = createConsumer(1, 2, false, 4);
Consumer consumer5 = createConsumer(1, 1, false, 5);
Consumer consumer6 = createConsumer(1, 2, false, 6);
Consumer consumer7 = createConsumer(2, 1, false, 7);
Consumer consumer8 = createConsumer(2, 1, false, 8);
Consumer consumer9 = createConsumer(2, 1, false, 9);
dispatcher.addConsumer(consumer1);
dispatcher.addConsumer(consumer2);
dispatcher.addConsumer(consumer3);
......@@ -301,7 +302,7 @@ public class PersistentDispatcherFailoverConsumerTest {
Assert.assertEquals(getNextConsumer(dispatcher), consumer7);
Assert.assertEquals(getNextConsumer(dispatcher), consumer8);
// in between add upper priority consumer with more permits
Consumer consumer10 = createConsumer(0, 2, 10);
Consumer consumer10 = createConsumer(0, 2, false, 10);
dispatcher.addConsumer(consumer10);
Assert.assertEquals(getNextConsumer(dispatcher), consumer10);
Assert.assertEquals(getNextConsumer(dispatcher), consumer10);
......@@ -309,21 +310,138 @@ public class PersistentDispatcherFailoverConsumerTest {
}
@Test
public void testFewBlockedConsumerSamePriority() throws Exception{
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock);
Consumer consumer1 = createConsumer(0, 2, false, 1);
Consumer consumer2 = createConsumer(0, 2, false, 2);
Consumer consumer3 = createConsumer(0, 2, false, 3);
Consumer consumer4 = createConsumer(0, 2, false, 4);
Consumer consumer5 = createConsumer(0, 1, true, 5);
Consumer consumer6 = createConsumer(0, 2, true, 6);
dispatcher.addConsumer(consumer1);
dispatcher.addConsumer(consumer2);
dispatcher.addConsumer(consumer3);
dispatcher.addConsumer(consumer4);
dispatcher.addConsumer(consumer5);
dispatcher.addConsumer(consumer6);
Assert.assertEquals(getNextConsumer(dispatcher), consumer1);
Assert.assertEquals(getNextConsumer(dispatcher), consumer2);
Assert.assertEquals(getNextConsumer(dispatcher), consumer3);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer1);
Assert.assertEquals(getNextConsumer(dispatcher), consumer2);
Assert.assertEquals(getNextConsumer(dispatcher), consumer3);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), null);
}
@Test
public void testFewBlockedConsumerDifferentPriority() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock);
Consumer consumer1 = createConsumer(0, 2, false, 1);
Consumer consumer2 = createConsumer(0, 2, false, 2);
Consumer consumer3 = createConsumer(0, 2, false, 3);
Consumer consumer4 = createConsumer(0, 2, false, 4);
Consumer consumer5 = createConsumer(0, 1, true, 5);
Consumer consumer6 = createConsumer(0, 2, true, 6);
Consumer consumer7 = createConsumer(1, 2, false, 7);
Consumer consumer8 = createConsumer(1, 10, true, 8);
Consumer consumer9 = createConsumer(1, 2, false, 9);
Consumer consumer10 = createConsumer(2, 2, false, 10);
Consumer consumer11 = createConsumer(2, 10, true, 11);
Consumer consumer12 = createConsumer(2, 2, false, 12);
dispatcher.addConsumer(consumer1);
dispatcher.addConsumer(consumer2);
dispatcher.addConsumer(consumer3);
dispatcher.addConsumer(consumer4);
dispatcher.addConsumer(consumer5);
dispatcher.addConsumer(consumer6);
dispatcher.addConsumer(consumer7);
dispatcher.addConsumer(consumer8);
dispatcher.addConsumer(consumer9);
dispatcher.addConsumer(consumer10);
dispatcher.addConsumer(consumer11);
dispatcher.addConsumer(consumer12);
Assert.assertEquals(getNextConsumer(dispatcher), consumer1);
Assert.assertEquals(getNextConsumer(dispatcher), consumer2);
Assert.assertEquals(getNextConsumer(dispatcher), consumer3);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer1);
Assert.assertEquals(getNextConsumer(dispatcher), consumer2);
Assert.assertEquals(getNextConsumer(dispatcher), consumer3);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer7);
Assert.assertEquals(getNextConsumer(dispatcher), consumer9);
Assert.assertEquals(getNextConsumer(dispatcher), consumer7);
Assert.assertEquals(getNextConsumer(dispatcher), consumer9);
Assert.assertEquals(getNextConsumer(dispatcher), consumer10);
Assert.assertEquals(getNextConsumer(dispatcher), consumer12);
// add consumer with lower priority again
Consumer consumer13 = createConsumer(0, 2, false, 13);
Consumer consumer14 = createConsumer(0, 2, true, 14);
dispatcher.addConsumer(consumer13);
dispatcher.addConsumer(consumer14);
Assert.assertEquals(getNextConsumer(dispatcher), consumer13);
Assert.assertEquals(getNextConsumer(dispatcher), consumer13);
Assert.assertEquals(getNextConsumer(dispatcher), consumer10);
Assert.assertEquals(getNextConsumer(dispatcher), consumer12);
Assert.assertEquals(getNextConsumer(dispatcher), null);
}
@Test
public void testFewBlockedConsumerDifferentPriority2() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock);
Consumer consumer1 = createConsumer(0, 2, true, 1);
Consumer consumer2 = createConsumer(0, 2, true, 2);
Consumer consumer3 = createConsumer(0, 2, true, 3);
Consumer consumer4 = createConsumer(1, 2, false, 4);
Consumer consumer5 = createConsumer(1, 1, false, 5);
Consumer consumer6 = createConsumer(2, 1, false, 6);
Consumer consumer7 = createConsumer(2, 2, true, 7);
dispatcher.addConsumer(consumer1);
dispatcher.addConsumer(consumer2);
dispatcher.addConsumer(consumer3);
dispatcher.addConsumer(consumer4);
dispatcher.addConsumer(consumer5);
dispatcher.addConsumer(consumer6);
dispatcher.addConsumer(consumer7);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer5);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer6);
Assert.assertEquals(getNextConsumer(dispatcher), null);
}
private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers dispatcher) throws Exception {
Consumer consumer = dispatcher.getNextConsumer();
Field field = Consumer.class.getDeclaredField("MESSAGE_PERMITS_UPDATER");
field.setAccessible(true);
AtomicIntegerFieldUpdater<Consumer> messagePermits = (AtomicIntegerFieldUpdater) field.get(consumer);
messagePermits.decrementAndGet(consumer);
return consumer;
Method getNextConsumerMethod = PersistentDispatcherMultipleConsumers.class.getDeclaredMethod("getNextConsumer");
getNextConsumerMethod.setAccessible(true);
Consumer consumer = (Consumer) getNextConsumerMethod.invoke(dispatcher);
if (consumer != null) {
Field field = Consumer.class.getDeclaredField("MESSAGE_PERMITS_UPDATER");
field.setAccessible(true);
AtomicIntegerFieldUpdater<Consumer> messagePermits = (AtomicIntegerFieldUpdater) field.get(consumer);
messagePermits.decrementAndGet(consumer);
return consumer;
}
return null;
}
private Consumer createConsumer(int priority, int permit, int id) throws BrokerServiceException {
private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception {
Consumer consumer = new Consumer(null, SubType.Shared, id, priority, ""+id, 5000, serverCnx, "appId");
try {
consumer.flowPermits(permit);
} catch (Exception e) {
}
// set consumer blocked flag
Field blockField = Consumer.class.getDeclaredField("blockedConsumerOnUnackedMsgs");
blockField.setAccessible(true);
blockField.set(consumer, blocked);
return consumer;
}
......
......@@ -1268,7 +1268,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
}
// (2) wait for consumer to receive messages
Thread.sleep(200);
Thread.sleep(1000);
assertEquals(consumer.numMessagesInQueue(), receiverQueueSize);
// (3) wait for messages to expire, we should've received more
......@@ -1836,6 +1836,122 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
/**
* <pre>
* Verifies Dispatcher dispatches messages properly with shared-subscription consumers with combination of blocked
* and unblocked consumers.
*
* 1. Dispatcher will have 5 consumers : c1, c2, c3, c4, c5.
* Out of which : c1,c2,c4,c5 will be blocked due to MaxUnackedMessages limit.
* 2. So, dispatcher should moves round-robin and make sure it delivers unblocked consumer : c3
* </pre>
*
* @throws Exception
*/
@Test(timeOut=5000)
public void testSharedSamePriorityConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);
ConsumerConfiguration conf1 = new ConsumerConfiguration();
conf1.setSubscriptionType(SubscriptionType.Shared);
final int queueSize = 5;
conf1.setReceiverQueueSize(queueSize);
int maxUnAckMsgs = pulsar.getConfiguration().getMaxConcurrentLookupRequest();
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(queueSize);
Consumer c1 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
conf1);
Consumer c2 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
conf1);
ProducerConfiguration producerConf = new ProducerConfiguration();
Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic2", producerConf);
List<Future<MessageId>> futures = Lists.newArrayList();
// Asynchronously produce messages
final int totalPublishMessages = 500;
for (int i = 0; i < totalPublishMessages; i++) {
final String message = "my-message-" + i;
Future<MessageId> future = producer.sendAsync(message.getBytes());
futures.add(future);
}
log.info("Waiting for async publish to complete");
for (Future<MessageId> future : futures) {
future.get();
}
List<Message> messages = Lists.newArrayList();
// let consumer1 and consumer2 cosume messages up to the queue will be full
for (int i = 0; i < totalPublishMessages; i++) {
Message msg = c1.receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
messages.add(msg);
} else {
break;
}
}
for (int i = 0; i < totalPublishMessages; i++) {
Message msg = c2.receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
messages.add(msg);
} else {
break;
}
}
Assert.assertEquals(queueSize * 2, messages.size());
// create new consumers with the same priority
Consumer c3 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
conf1);
Consumer c4 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
conf1);
Consumer c5 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
conf1);
// c1 and c2 are blocked: so, let c3, c4 and c5 consume rest of the messages
for (int i = 0; i < totalPublishMessages; i++) {
Message msg = c4.receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
messages.add(msg);
} else {
break;
}
}
for (int i = 0; i < totalPublishMessages; i++) {
Message msg = c5.receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
messages.add(msg);
} else {
break;
}
}
for (int i = 0; i < totalPublishMessages; i++) {
Message msg = c3.receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
messages.add(msg);
c3.acknowledge(msg);
} else {
break;
}
}
// total messages must be consumed by all consumers
Assert.assertEquals(messages.size(), totalPublishMessages);
// Asynchronously acknowledge upto and including the last message
producer.close();
c1.close();
c2.close();
c3.close();
c4.close();
c5.close();
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnAckMsgs);
log.info("-- Exiting {} test --", methodName);
}
@Test
public void testRedeliveryFailOverConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册