diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index a56f5660c9e843a6ccab21afe340351cb9793dea..095f9ba80422a15245c48e174a29972204913578 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -25,6 +25,7 @@ import io.netty.util.concurrent.FastThreadLocal; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -64,12 +65,17 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi */ private final Map recentlyJoinedConsumers; + private final Set stuckConsumers; + private final Set nextStuckConsumers; + PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) { super(topic, cursor, subscription); this.allowOutOfOrderDelivery = ksm.getAllowOutOfOrderDelivery(); this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? Collections.emptyMap() : new HashMap<>(); + this.stuckConsumers = new HashSet<>(); + this.nextStuckConsumers = new HashSet<>(); switch (ksm.getKeySharedMode()) { case AUTO_SPLIT: @@ -143,6 +149,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi return; } + nextStuckConsumers.clear(); + final Map> groupedEntries = localGroupedEntries.get(); groupedEntries.clear(); @@ -217,11 +225,14 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi } } + stuckConsumers.clear(); + if (totalMessagesSent == 0 && recentlyJoinedConsumers.isEmpty()) { // This means, that all the messages we've just read cannot be dispatched right now. // This condition can only happen when: // 1. We have consumers ready to accept messages (otherwise the would not haven been triggered) // 2. All keys in the current set of messages are routing to consumers that are currently busy + // and stuck is not caused by stuckConsumers // // The solution here is to move on and read next batch of messages which might hopefully contain // also keys meant for other consumers. @@ -230,18 +241,31 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi // ahead in the stream while the new consumers are not ready to accept the new messages, // therefore would be most likely only increase the distance between read-position and mark-delete // position. - isDispatcherStuckOnReplays = true; + if (!nextStuckConsumers.isEmpty()) { + isDispatcherStuckOnReplays = true; + stuckConsumers.addAll(nextStuckConsumers); + } + // readMoreEntries should run regardless whether or not stuck is caused by stuckConsumers for avoid stopping dispatch. readMoreEntries(); } } private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List entries, int maxMessages) { if (maxMessages == 0) { + // the consumer was stuck + nextStuckConsumers.add(consumer); return 0; } PositionImpl maxReadPosition = recentlyJoinedConsumers.get(consumer); if (maxReadPosition == null) { + // stop to dispatch by stuckConsumers + if (stuckConsumers.contains(consumer)) { + if (log.isDebugEnabled()) { + log.debug("[{}] stop to dispatch by stuckConsumers, consumer: {}", name, consumer); + } + return 0; + } // The consumer has not recently joined, so we can send all messages return maxMessages; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index 6325f1073903499abcbfccb75b24e3c6956087cd..c281400db0bc3c5f4bf1a68916b1bd6b03dfbfba 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -41,6 +41,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.ObjectFactory; import org.testng.annotations.Test; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -49,6 +50,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; @PrepareForTest({ DispatchRateLimiter.class }) @@ -79,6 +81,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { configMock = mock(ServiceConfiguration.class); doReturn(true).when(configMock).isSubscriptionRedeliveryTrackerEnabled(); doReturn(100).when(configMock).getDispatcherMaxReadBatchSize(); + doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing(); + doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints(); pulsarMock = mock(PulsarService.class); doReturn(configMock).when(pulsarMock).getConfiguration(); @@ -96,6 +100,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { consumerMock = mock(Consumer.class); channelMock = mock(ChannelPromise.class); + doReturn("consumer1").when(consumerMock).consumerName(); doReturn(1000).when(consumerMock).getAvailablePermits(); doReturn(true).when(consumerMock).isWritable(); doReturn(channelMock).when(consumerMock).sendMessages( @@ -120,12 +125,17 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( topicMock, cursorMock, subscriptionMock, configMock, KeySharedMeta.getDefaultInstance()); - persistentDispatcher.addConsumer(consumerMock); - persistentDispatcher.consumerFlow(consumerMock, 1000); } @Test public void testSendMarkerMessage() { + try { + persistentDispatcher.addConsumer(consumerMock); + persistentDispatcher.consumerFlow(consumerMock, 1000); + } catch (Exception e) { + fail("Failed to add mock consumer", e); + } + List entries = new ArrayList<>(); ByteBuf markerMessage = Markers.newReplicatedSubscriptionsSnapshotRequest("testSnapshotId", "testSourceCluster"); entries.add(EntryImpl.create(1, 1, markerMessage)); @@ -156,11 +166,100 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5); } + @Test + public void testSkipRedeliverTemporally() { + final Consumer slowConsumerMock = mock(Consumer.class); + final ChannelPromise slowChannelMock = mock(ChannelPromise.class); + // add entries to redeliver and read target + final List redeliverEntries = new ArrayList<>(); + redeliverEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key1"))); + final List readEntries = new ArrayList<>(); + readEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, "key1"))); + readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key2"))); + + try { + Field totalAvailablePermitsField = PersistentDispatcherMultipleConsumers.class.getDeclaredField("totalAvailablePermits"); + totalAvailablePermitsField.setAccessible(true); + totalAvailablePermitsField.set(persistentDispatcher, 1000); + + doAnswer(invocationOnMock -> { + ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2)) + .readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal); + return null; + }).when(cursorMock).asyncReadEntriesOrWait( + anyInt(), anyLong(), any(PersistentStickyKeyDispatcherMultipleConsumers.class), + eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal)); + } catch (Exception e) { + fail("Failed to set to field", e); + } + + // Create 2Consumers + try { + doReturn("consumer2").when(slowConsumerMock).consumerName(); + // Change slowConsumer availablePermits to 0 and back to normal + when(slowConsumerMock.getAvailablePermits()) + .thenReturn(0) + .thenReturn(1); + doReturn(true).when(slowConsumerMock).isWritable(); + doReturn(slowChannelMock).when(slowConsumerMock).sendMessages( + anyList(), + any(EntryBatchSizes.class), + any(EntryBatchIndexesAcks.class), + anyInt(), + anyLong(), + anyLong(), + any(RedeliveryTracker.class) + ); + + persistentDispatcher.addConsumer(consumerMock); + persistentDispatcher.addConsumer(slowConsumerMock); + } catch (Exception e) { + fail("Failed to add mock consumer", e); + } + + // run PersistentStickyKeyDispatcherMultipleConsumers#sendMessagesToConsumers + // run readMoreEntries internally (and skip internally) + // Change slowConsumer availablePermits to 1 + // run PersistentStickyKeyDispatcherMultipleConsumers#sendMessagesToConsumers internally + // and then stop to dispatch to slowConsumer + persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal, redeliverEntries); + + verify(consumerMock, times(1)).sendMessages( + argThat(arg -> { + assertEquals(arg.size(), 1); + Entry entry = arg.get(0); + assertEquals(entry.getLedgerId(), 1); + assertEquals(entry.getEntryId(), 3); + return true; + }), + any(EntryBatchSizes.class), + any(EntryBatchIndexesAcks.class), + anyInt(), + anyLong(), + anyLong(), + any(RedeliveryTracker.class) + ); + verify(slowConsumerMock, times(0)).sendMessages( + anyList(), + any(EntryBatchSizes.class), + any(EntryBatchIndexesAcks.class), + anyInt(), + anyLong(), + anyLong(), + any(RedeliveryTracker.class) + ); + } + private ByteBuf createMessage(String message, int sequenceId) { + return createMessage(message, sequenceId, "testKey"); + } + + private ByteBuf createMessage(String message, int sequenceId, String key) { PulsarApi.MessageMetadata.Builder messageMetadata = PulsarApi.MessageMetadata.newBuilder(); messageMetadata.setSequenceId(sequenceId); messageMetadata.setProducerName("testProducer"); - messageMetadata.setPartitionKey("testKey"); + messageMetadata.setPartitionKey(key); + messageMetadata.setPartitionKeyB64Encoded(false); messageMetadata.setPublishTime(System.currentTimeMillis()); return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata.build(), Unpooled.copiedBuffer(message.getBytes(UTF_8))); }