/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper; import static org.mockito.Mockito.any; import static org.mockito.Mockito.matches; import static org.mockito.Mockito.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertSame; import static org.testng.AssertJUnit.assertTrue; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand; import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.apache.pulsar.zookeeper.ZooKeeperCache; import org.apache.pulsar.zookeeper.ZooKeeperDataCache; import org.apache.zookeeper.ZooKeeper; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class PersistentDispatcherFailoverConsumerTest { private BrokerService brokerService; private ManagedLedgerFactory mlFactoryMock; private ServerCnx serverCnx; private ServerCnx serverCnxWithOldVersion; private ManagedLedger ledgerMock; private ManagedCursor cursorMock; private ConfigurationCacheService configCacheService; private ChannelHandlerContext channelCtx; private LinkedBlockingQueue consumerChanges; final String successTopicName = "persistent://part-perf/global/perf.t1/ptopic"; final String failTopicName = "persistent://part-perf/global/perf.t1/pfailTopic"; @BeforeMethod public void setup() throws Exception { ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); PulsarService pulsar = spy(new PulsarService(svcConfig)); doReturn(svcConfig).when(pulsar).getConfiguration(); mlFactoryMock = mock(ManagedLedgerFactory.class); doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory(); ZooKeeper mockZk = createMockZooKeeper(); doReturn(mockZk).when(pulsar).getZkClient(); doReturn(createMockBookKeeper(mockZk, pulsar.getOrderedExecutor().chooseThread(0))) .when(pulsar).getBookKeeperClient(); ZooKeeperCache cache = mock(ZooKeeperCache.class); doReturn(30).when(cache).getZkOperationTimeoutSeconds(); doReturn(cache).when(pulsar).getLocalZkCache(); configCacheService = mock(ConfigurationCacheService.class); @SuppressWarnings("unchecked") ZooKeeperDataCache zkDataCache = mock(ZooKeeperDataCache.class); LocalZooKeeperCacheService zkCache = mock(LocalZooKeeperCacheService.class); doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zkDataCache).getAsync(any()); doReturn(zkDataCache).when(zkCache).policiesCache(); doReturn(zkDataCache).when(configCacheService).policiesCache(); doReturn(configCacheService).when(pulsar).getConfigurationCache(); doReturn(zkCache).when(pulsar).getLocalZkCacheService(); brokerService = spy(new BrokerService(pulsar)); doReturn(brokerService).when(pulsar).getBrokerService(); consumerChanges = new LinkedBlockingQueue<>(); this.channelCtx = mock(ChannelHandlerContext.class); doAnswer(invocationOnMock -> { ByteBuf buf = invocationOnMock.getArgument(0); ByteBuf cmdBuf = buf.retainedSlice(4, buf.writerIndex() - 4); try { int cmdSize = (int) cmdBuf.readUnsignedInt(); int writerIndex = cmdBuf.writerIndex(); cmdBuf.writerIndex(cmdBuf.readerIndex() + cmdSize); ByteBufCodedInputStream cmdInputStream = ByteBufCodedInputStream.get(cmdBuf); BaseCommand.Builder cmdBuilder = BaseCommand.newBuilder(); BaseCommand cmd = cmdBuilder.mergeFrom(cmdInputStream, null).build(); cmdBuilder.recycle(); cmdBuf.writerIndex(writerIndex); cmdInputStream.recycle(); if (cmd.hasActiveConsumerChange()) { consumerChanges.put(cmd.getActiveConsumerChange()); } cmd.recycle(); } finally { cmdBuf.release(); } return null; }).when(channelCtx).writeAndFlush(any(), any()); serverCnx = spy(new ServerCnx(pulsar)); doReturn(true).when(serverCnx).isActive(); doReturn(true).when(serverCnx).isWritable(); doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress(); when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getNumber()); when(serverCnx.ctx()).thenReturn(channelCtx); serverCnxWithOldVersion = spy(new ServerCnx(pulsar)); doReturn(true).when(serverCnxWithOldVersion).isActive(); doReturn(true).when(serverCnxWithOldVersion).isWritable(); doReturn(new InetSocketAddress("localhost", 1234)) .when(serverCnxWithOldVersion).clientAddress(); when(serverCnxWithOldVersion.getRemoteEndpointProtocolVersion()) .thenReturn(ProtocolVersion.v11.getNumber()); when(serverCnxWithOldVersion.ctx()).thenReturn(channelCtx); NamespaceService nsSvc = mock(NamespaceService.class); doReturn(nsSvc).when(pulsar).getNamespaceService(); doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class)); doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class)); setupMLAsyncCallbackMocks(); } void setupMLAsyncCallbackMocks() { ledgerMock = mock(ManagedLedger.class); cursorMock = mock(ManagedCursor.class); doReturn(new ArrayList()).when(ledgerMock).getCursors(); doReturn("mockCursor").when(cursorMock).getName(); // call openLedgerComplete with ledgerMock on ML factory asyncOpen doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null); return null; } }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class), any()); // call openLedgerFailed on ML factory asyncOpen doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { ((OpenLedgerCallback) invocationOnMock.getArguments()[2]) .openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null); return null; } }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class), any()); // call addComplete on ledger asyncAddEntry doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { ((AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(new PositionImpl(1, 1), null); return null; } }).when(ledgerMock).asyncAddEntry(any(byte[].class), any(AddEntryCallback.class), any()); // call openCursorComplete on cursor asyncOpen doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { ((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null); return null; } }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any()); // call deleteLedgerComplete on ledger asyncDelete doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { ((DeleteLedgerCallback) invocationOnMock.getArguments()[0]).deleteLedgerComplete(null); return null; } }).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), any()); doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { ((DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete(null); return null; } }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), any()); } private void verifyActiveConsumerChange(CommandActiveConsumerChange change, long consumerId, boolean isActive) { assertEquals(consumerId, change.getConsumerId()); assertEquals(isActive, change.getIsActive()); change.recycle(); } @Test public void testConsumerGroupChangesWithOldNewConsumers() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); int partitionIndex = 0; PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock, SubType.Failover, partitionIndex, topic, sub); // 1. Verify no consumers connected assertFalse(pdfc.isConsumerConnected()); // 2. Add old consumer Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest); pdfc.addConsumer(consumer1); List consumers = pdfc.getConsumers(); assertSame(consumers.get(0).consumerName(), consumer1.consumerName()); assertEquals(1, consumers.size()); assertNull(consumerChanges.poll()); verify(channelCtx, times(0)).write(any()); // 3. Add new consumer Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest); pdfc.addConsumer(consumer2); consumers = pdfc.getConsumers(); assertSame(consumers.get(0).consumerName(), consumer1.consumerName()); assertEquals(2, consumers.size()); CommandActiveConsumerChange change = consumerChanges.take(); verifyActiveConsumerChange(change, 2, false); verify(channelCtx, times(1)).writeAndFlush(any(), any()); } @Test public void testAddRemoveConsumer() throws Exception { log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddConsumer ---"); PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); int partitionIndex = 4; PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock, SubType.Failover, partitionIndex, topic, sub); // 1. Verify no consumers connected assertFalse(pdfc.isConsumerConnected()); // 2. Add consumer Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest)); pdfc.addConsumer(consumer1); List consumers = pdfc.getConsumers(); assertSame(consumers.get(0).consumerName(), consumer1.consumerName()); assertEquals(1, consumers.size()); CommandActiveConsumerChange change = consumerChanges.take(); verifyActiveConsumerChange(change, 1, true); verify(consumer1, times(1)).notifyActiveConsumerChange(same(consumer1)); // 3. Add again, duplicate allowed pdfc.addConsumer(consumer1); consumers = pdfc.getConsumers(); assertSame(consumers.get(0).consumerName(), consumer1.consumerName()); assertEquals(2, consumers.size()); // 4. Verify active consumer assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName()); // get the notified with who is the leader change = consumerChanges.take(); verifyActiveConsumerChange(change, 1, true); verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1)); // 5. Add another consumer which does not change active consumer Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest)); pdfc.addConsumer(consumer2); consumers = pdfc.getConsumers(); assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName()); assertEquals(3, consumers.size()); // get notified with who is the leader change = consumerChanges.take(); verifyActiveConsumerChange(change, 2, false); verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1)); verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1)); // 6. Add a consumer which changes active consumer Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0, "Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest)); pdfc.addConsumer(consumer0); consumers = pdfc.getConsumers(); assertSame(pdfc.getActiveConsumer().consumerName(), consumer0.consumerName()); assertEquals(4, consumers.size()); // all consumers will receive notifications change = consumerChanges.take(); verifyActiveConsumerChange(change, 0, true); change = consumerChanges.take(); verifyActiveConsumerChange(change, 1, false); change = consumerChanges.take(); verifyActiveConsumerChange(change, 1, false); change = consumerChanges.take(); verifyActiveConsumerChange(change, 2, false); verify(consumer0, times(1)).notifyActiveConsumerChange(same(consumer0)); verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1)); verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer0)); verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1)); verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer0)); // 7. Remove last consumer pdfc.removeConsumer(consumer2); consumers = pdfc.getConsumers(); assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName()); assertEquals(3, consumers.size()); // not consumer group changes assertNull(consumerChanges.poll()); // 8. Verify if we cannot unsubscribe when more than one consumer is connected assertFalse(pdfc.canUnsubscribe(consumer0)); // 9. Remove active consumer pdfc.removeConsumer(consumer0); consumers = pdfc.getConsumers(); assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName()); assertEquals(2, consumers.size()); // the remaining consumers will receive notifications change = consumerChanges.take(); verifyActiveConsumerChange(change, 1, true); change = consumerChanges.take(); verifyActiveConsumerChange(change, 1, true); // 10. Attempt to remove already removed consumer String cause = ""; try { pdfc.removeConsumer(consumer0); } catch (Exception e) { cause = e.getMessage(); } assertEquals(cause, "Consumer was not connected"); // 11. Remove active consumer pdfc.removeConsumer(consumer1); consumers = pdfc.getConsumers(); assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName()); assertEquals(1, consumers.size()); // not consumer group changes assertNull(consumerChanges.poll()); // 11. With only one consumer, unsubscribe is allowed assertTrue(pdfc.canUnsubscribe(consumer1)); } @Test public void testAddRemoveConsumerNonPartitionedTopic() throws Exception { log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddConsumer ---"); PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); // Non partitioned topic. int partitionIndex = -1; PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock, SubType.Failover, partitionIndex, topic, sub); // 1. Verify no consumers connected assertFalse(pdfc.isConsumerConnected()); // 2. Add a consumer Consumer consumer1 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 1, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest)); pdfc.addConsumer(consumer1); List consumers = pdfc.getConsumers(); assertEquals(1, consumers.size()); assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName()); // 3. Add a consumer with same priority level and consumer name is smaller in lexicographic order. Consumer consumer2 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 1, "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest)); pdfc.addConsumer(consumer2); // 4. Verify active consumer doesn't change consumers = pdfc.getConsumers(); assertEquals(2, consumers.size()); CommandActiveConsumerChange change = consumerChanges.take(); verifyActiveConsumerChange(change, 2, false); assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName()); verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1)); // 5. Add another consumer which has higher priority level Consumer consumer3 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest)); pdfc.addConsumer(consumer3); consumers = pdfc.getConsumers(); assertEquals(3, consumers.size()); change = consumerChanges.take(); verifyActiveConsumerChange(change, 3, false); assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName()); verify(consumer3, times(1)).notifyActiveConsumerChange(same(consumer1)); // 7. Remove first consumer and active consumer should change to consumer2 since it's added before consumer3 // though consumer 3 has higher priority level pdfc.removeConsumer(consumer1); consumers = pdfc.getConsumers(); assertEquals(2, consumers.size()); change = consumerChanges.take(); verifyActiveConsumerChange(change, 2, true); assertSame(pdfc.getActiveConsumer().consumerName(), consumer2.consumerName()); verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer2)); verify(consumer3, times(1)).notifyActiveConsumerChange(same(consumer2)); } @Test public void testMultipleDispatcherGetNextConsumerWithDifferentPriorityLevel() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, null); Consumer consumer1 = createConsumer(0, 2, false, 1); Consumer consumer2 = createConsumer(0, 2, false, 2); Consumer consumer3 = createConsumer(0, 2, false, 3); Consumer consumer4 = createConsumer(1, 2, false, 4); Consumer consumer5 = createConsumer(1, 1, false, 5); Consumer consumer6 = createConsumer(1, 2, false, 6); Consumer consumer7 = createConsumer(2, 1, false, 7); Consumer consumer8 = createConsumer(2, 1, false, 8); Consumer consumer9 = createConsumer(2, 1, false, 9); dispatcher.addConsumer(consumer1); dispatcher.addConsumer(consumer2); dispatcher.addConsumer(consumer3); dispatcher.addConsumer(consumer4); dispatcher.addConsumer(consumer5); dispatcher.addConsumer(consumer6); dispatcher.addConsumer(consumer7); dispatcher.addConsumer(consumer8); dispatcher.addConsumer(consumer9); Assert.assertEquals(getNextConsumer(dispatcher), consumer1); Assert.assertEquals(getNextConsumer(dispatcher), consumer2); Assert.assertEquals(getNextConsumer(dispatcher), consumer3); Assert.assertEquals(getNextConsumer(dispatcher), consumer1); Assert.assertEquals(getNextConsumer(dispatcher), consumer2); Assert.assertEquals(getNextConsumer(dispatcher), consumer3); Assert.assertEquals(getNextConsumer(dispatcher), consumer4); Assert.assertEquals(getNextConsumer(dispatcher), consumer5); Assert.assertEquals(getNextConsumer(dispatcher), consumer6); Assert.assertEquals(getNextConsumer(dispatcher), consumer4); Assert.assertEquals(getNextConsumer(dispatcher), consumer6); Assert.assertEquals(getNextConsumer(dispatcher), consumer7); Assert.assertEquals(getNextConsumer(dispatcher), consumer8); // in between add upper priority consumer with more permits Consumer consumer10 = createConsumer(0, 2, false, 10); dispatcher.addConsumer(consumer10); Assert.assertEquals(getNextConsumer(dispatcher), consumer10); Assert.assertEquals(getNextConsumer(dispatcher), consumer10); Assert.assertEquals(getNextConsumer(dispatcher), consumer9); } @Test public void testFewBlockedConsumerSamePriority() throws Exception{ PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, null); Consumer consumer1 = createConsumer(0, 2, false, 1); Consumer consumer2 = createConsumer(0, 2, false, 2); Consumer consumer3 = createConsumer(0, 2, false, 3); Consumer consumer4 = createConsumer(0, 2, false, 4); Consumer consumer5 = createConsumer(0, 1, true, 5); Consumer consumer6 = createConsumer(0, 2, true, 6); dispatcher.addConsumer(consumer1); dispatcher.addConsumer(consumer2); dispatcher.addConsumer(consumer3); dispatcher.addConsumer(consumer4); dispatcher.addConsumer(consumer5); dispatcher.addConsumer(consumer6); Assert.assertEquals(getNextConsumer(dispatcher), consumer1); Assert.assertEquals(getNextConsumer(dispatcher), consumer2); Assert.assertEquals(getNextConsumer(dispatcher), consumer3); Assert.assertEquals(getNextConsumer(dispatcher), consumer4); Assert.assertEquals(getNextConsumer(dispatcher), consumer1); Assert.assertEquals(getNextConsumer(dispatcher), consumer2); Assert.assertEquals(getNextConsumer(dispatcher), consumer3); Assert.assertEquals(getNextConsumer(dispatcher), consumer4); assertNull(getNextConsumer(dispatcher)); } @Test public void testFewBlockedConsumerDifferentPriority() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, null); Consumer consumer1 = createConsumer(0, 2, false, 1); Consumer consumer2 = createConsumer(0, 2, false, 2); Consumer consumer3 = createConsumer(0, 2, false, 3); Consumer consumer4 = createConsumer(0, 2, false, 4); Consumer consumer5 = createConsumer(0, 1, true, 5); Consumer consumer6 = createConsumer(0, 2, true, 6); Consumer consumer7 = createConsumer(1, 2, false, 7); Consumer consumer8 = createConsumer(1, 10, true, 8); Consumer consumer9 = createConsumer(1, 2, false, 9); Consumer consumer10 = createConsumer(2, 2, false, 10); Consumer consumer11 = createConsumer(2, 10, true, 11); Consumer consumer12 = createConsumer(2, 2, false, 12); dispatcher.addConsumer(consumer1); dispatcher.addConsumer(consumer2); dispatcher.addConsumer(consumer3); dispatcher.addConsumer(consumer4); dispatcher.addConsumer(consumer5); dispatcher.addConsumer(consumer6); dispatcher.addConsumer(consumer7); dispatcher.addConsumer(consumer8); dispatcher.addConsumer(consumer9); dispatcher.addConsumer(consumer10); dispatcher.addConsumer(consumer11); dispatcher.addConsumer(consumer12); Assert.assertEquals(getNextConsumer(dispatcher), consumer1); Assert.assertEquals(getNextConsumer(dispatcher), consumer2); Assert.assertEquals(getNextConsumer(dispatcher), consumer3); Assert.assertEquals(getNextConsumer(dispatcher), consumer4); Assert.assertEquals(getNextConsumer(dispatcher), consumer1); Assert.assertEquals(getNextConsumer(dispatcher), consumer2); Assert.assertEquals(getNextConsumer(dispatcher), consumer3); Assert.assertEquals(getNextConsumer(dispatcher), consumer4); Assert.assertEquals(getNextConsumer(dispatcher), consumer7); Assert.assertEquals(getNextConsumer(dispatcher), consumer9); Assert.assertEquals(getNextConsumer(dispatcher), consumer7); Assert.assertEquals(getNextConsumer(dispatcher), consumer9); Assert.assertEquals(getNextConsumer(dispatcher), consumer10); Assert.assertEquals(getNextConsumer(dispatcher), consumer12); // add consumer with lower priority again Consumer consumer13 = createConsumer(0, 2, false, 13); Consumer consumer14 = createConsumer(0, 2, true, 14); dispatcher.addConsumer(consumer13); dispatcher.addConsumer(consumer14); Assert.assertEquals(getNextConsumer(dispatcher), consumer13); Assert.assertEquals(getNextConsumer(dispatcher), consumer13); Assert.assertEquals(getNextConsumer(dispatcher), consumer10); Assert.assertEquals(getNextConsumer(dispatcher), consumer12); assertNull(getNextConsumer(dispatcher)); } @Test public void testFewBlockedConsumerDifferentPriority2() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, null); Consumer consumer1 = createConsumer(0, 2, true, 1); Consumer consumer2 = createConsumer(0, 2, true, 2); Consumer consumer3 = createConsumer(0, 2, true, 3); Consumer consumer4 = createConsumer(1, 2, false, 4); Consumer consumer5 = createConsumer(1, 1, false, 5); Consumer consumer6 = createConsumer(2, 1, false, 6); Consumer consumer7 = createConsumer(2, 2, true, 7); dispatcher.addConsumer(consumer1); dispatcher.addConsumer(consumer2); dispatcher.addConsumer(consumer3); dispatcher.addConsumer(consumer4); dispatcher.addConsumer(consumer5); dispatcher.addConsumer(consumer6); dispatcher.addConsumer(consumer7); Assert.assertEquals(getNextConsumer(dispatcher), consumer4); Assert.assertEquals(getNextConsumer(dispatcher), consumer5); Assert.assertEquals(getNextConsumer(dispatcher), consumer4); Assert.assertEquals(getNextConsumer(dispatcher), consumer6); assertNull(getNextConsumer(dispatcher)); } @SuppressWarnings("unchecked") private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers dispatcher) throws Exception { Consumer consumer = dispatcher.getNextConsumer(); if (consumer != null) { Field field = Consumer.class.getDeclaredField("MESSAGE_PERMITS_UPDATER"); field.setAccessible(true); AtomicIntegerFieldUpdater messagePermits = (AtomicIntegerFieldUpdater) field.get(consumer); messagePermits.decrementAndGet(consumer); return consumer; } return null; } private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception { Consumer consumer = new Consumer(null, SubType.Shared, "test-topic", id, priority, ""+id, 5000, serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest); try { consumer.flowPermits(permit); } catch (Exception e) { } // set consumer blocked flag Field blockField = Consumer.class.getDeclaredField("blockedConsumerOnUnackedMsgs"); blockField.setAccessible(true); blockField.set(consumer, blocked); return consumer; } private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherFailoverConsumerTest.class); }