From c7ac08b13e95cd7bbbcf551d2112e9131153bd83 Mon Sep 17 00:00:00 2001 From: Yuri Mizushima Date: Wed, 2 Sep 2020 23:36:29 +0900 Subject: [PATCH] [pulsar-broker] Stop to dispatch when skip message temporally since Key_Shared consumer stuck on delivery (#7553) ### Motivation In some cases with Key_Shared consumers, message ordering was broken. Here is how to reproduce it (I think this is one of the cases that reproduces the issue). 1. Connect Consumer1 to Key_Shared subscription `sub` and stop receiving - receiverQueueSize: 500 2. Connect a Producer and publish 500 messages with key `(i % 10)` 3. Connect Consumer2 to the same subscription and start receiving - receiverQueueSize: 1 - since https://github.com/apache/pulsar/pull/7106 , Consumer2 can't receive (expected) 4. The Producer publishes 500 more messages with the same key generation algorithm 5. After that, Consumer1 starts receiving 6. Check Consumer2 message ordering - sometimes message ordering was broken within the same key Consumer1: ``` Connected: Tue Jul 14 09:36:39 JST 2020 [pulsar-client-io-1-1] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [persistent://public/default/key-shared-test] [sub0] [820f0] Prefetched messages: 499 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0 Received: my-message-0 PublishTime: 1594687006203 Date: Tue Jul 14 09:37:46 JST 2020 Received: my-message-1 PublishTime: 1594687006243 Date: Tue Jul 14 09:37:46 JST 2020 Received: my-message-2 PublishTime: 1594687006247 Date: Tue Jul 14 09:37:46 JST 2020 ... 
Received: my-message-498 PublishTime: 1594687008727 Date: Tue Jul 14 09:37:46 JST 2020 Received: my-message-499 PublishTime: 1594687008731 Date: Tue Jul 14 09:37:46 JST 2020 Received: my-message-500 PublishTime: 1594687038742 Date: Tue Jul 14 09:37:46 JST 2020 ... Received: my-message-990 PublishTime: 1594687040094 Date: Tue Jul 14 09:37:46 JST 2020 Received: my-message-994 PublishTime: 1594687040103 Date: Tue Jul 14 09:37:46 JST 2020 Received: my-message-995 PublishTime: 1594687040105 Date: Tue Jul 14 09:37:46 JST 2020 Received: my-message-997 PublishTime: 1594687040113 Date: Tue Jul 14 09:37:46 JST 2020 ``` Consumer2: ``` Connected: Tue Jul 14 09:37:03 JST 2020 [pulsar-client-io-1-1] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider Received: my-message-501 MessageId: 4:1501:-1 PublishTime: 1594687038753 Date: Tue Jul 14 09:37:46 JST 2020 Received: my-message-502 MessageId: 4:1502:-1 PublishTime: 1594687038755 Date: Tue Jul 14 09:37:46 JST 2020 Received: my-message-503 MessageId: 4:1503:-1 PublishTime: 1594687038759 Date: Tue Jul 14 09:37:46 JST 2020 Received: my-message-506 MessageId: 4:1506:-1 PublishTime: 1594687038785 Date: Tue Jul 14 09:37:46 JST 2020 Received: my-message-508 MessageId: 4:1508:-1 PublishTime: 1594687038812 Date: Tue Jul 14 09:37:46 JST 2020 Received: my-message-901 MessageId: 4:1901:-1 PublishTime: 1594687039871 Date: Tue Jul 14 09:37:46 JST 2020 Received: my-message-509 MessageId: 4:1509:-1 PublishTime: 1594687038815 Date: Tue Jul 14 09:37:46 JST 2020 ordering was broken, key: 1 oldNum: 901 newNum: 511 Received: my-message-511 MessageId: 4:1511:-1 PublishTime: 1594687038826 Date: Tue Jul 14 09:37:46 JST 2020 Received: my-message-512 MessageId: 4:1512:-1 PublishTime: 1594687038830 Date: Tue Jul 14 09:37:46 JST 2020 ... ``` I think this issue is caused by https://github.com/apache/pulsar/pull/7105. Here is an example. 1. dispatch messages 2. 
Consumer2 was stuck and `totalMessagesSent=0` - Consumer2 availablePermits was 0 3. temporarily skip the messages to be redelivered - Consumer2 availablePermits went back to 1 4. dispatch new messages - a new message was dispatched to Consumer2 5. go back to the messages to be redelivered 6. dispatch messages - ordering was broken ### Modifications Stop dispatching when messages are temporarily skipped because a Key_Shared consumer is stuck on delivery. --- ...tStickyKeyDispatcherMultipleConsumers.java | 26 ++++- ...ckyKeyDispatcherMultipleConsumersTest.java | 105 +++++++++++++++++- 2 files changed, 127 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index a56f5660c9e..095f9ba8042 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -25,6 +25,7 @@ import io.netty.util.concurrent.FastThreadLocal; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -64,12 +65,17 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi */ private final Map recentlyJoinedConsumers; + private final Set stuckConsumers; + private final Set nextStuckConsumers; + PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) { super(topic, cursor, subscription); this.allowOutOfOrderDelivery = ksm.getAllowOutOfOrderDelivery(); this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? 
Collections.emptyMap() : new HashMap<>(); + this.stuckConsumers = new HashSet<>(); + this.nextStuckConsumers = new HashSet<>(); switch (ksm.getKeySharedMode()) { case AUTO_SPLIT: @@ -143,6 +149,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi return; } + nextStuckConsumers.clear(); + final Map> groupedEntries = localGroupedEntries.get(); groupedEntries.clear(); @@ -217,11 +225,14 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi } } + stuckConsumers.clear(); + if (totalMessagesSent == 0 && recentlyJoinedConsumers.isEmpty()) { // This means, that all the messages we've just read cannot be dispatched right now. // This condition can only happen when: // 1. We have consumers ready to accept messages (otherwise the would not haven been triggered) // 2. All keys in the current set of messages are routing to consumers that are currently busy + // and stuck is not caused by stuckConsumers // // The solution here is to move on and read next batch of messages which might hopefully contain // also keys meant for other consumers. @@ -230,18 +241,31 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi // ahead in the stream while the new consumers are not ready to accept the new messages, // therefore would be most likely only increase the distance between read-position and mark-delete // position. - isDispatcherStuckOnReplays = true; + if (!nextStuckConsumers.isEmpty()) { + isDispatcherStuckOnReplays = true; + stuckConsumers.addAll(nextStuckConsumers); + } + // readMoreEntries should run regardless whether or not stuck is caused by stuckConsumers for avoid stopping dispatch. 
readMoreEntries(); } } private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List entries, int maxMessages) { if (maxMessages == 0) { + // the consumer was stuck + nextStuckConsumers.add(consumer); return 0; } PositionImpl maxReadPosition = recentlyJoinedConsumers.get(consumer); if (maxReadPosition == null) { + // stop to dispatch by stuckConsumers + if (stuckConsumers.contains(consumer)) { + if (log.isDebugEnabled()) { + log.debug("[{}] stop to dispatch by stuckConsumers, consumer: {}", name, consumer); + } + return 0; + } // The consumer has not recently joined, so we can send all messages return maxMessages; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index 6325f107390..c281400db0b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -41,6 +41,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.ObjectFactory; import org.testng.annotations.Test; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -49,6 +50,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; @PrepareForTest({ DispatchRateLimiter.class }) @@ -79,6 +81,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { configMock = mock(ServiceConfiguration.class); 
doReturn(true).when(configMock).isSubscriptionRedeliveryTrackerEnabled(); doReturn(100).when(configMock).getDispatcherMaxReadBatchSize(); + doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing(); + doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints(); pulsarMock = mock(PulsarService.class); doReturn(configMock).when(pulsarMock).getConfiguration(); @@ -96,6 +100,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { consumerMock = mock(Consumer.class); channelMock = mock(ChannelPromise.class); + doReturn("consumer1").when(consumerMock).consumerName(); doReturn(1000).when(consumerMock).getAvailablePermits(); doReturn(true).when(consumerMock).isWritable(); doReturn(channelMock).when(consumerMock).sendMessages( @@ -120,12 +125,17 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( topicMock, cursorMock, subscriptionMock, configMock, KeySharedMeta.getDefaultInstance()); - persistentDispatcher.addConsumer(consumerMock); - persistentDispatcher.consumerFlow(consumerMock, 1000); } @Test public void testSendMarkerMessage() { + try { + persistentDispatcher.addConsumer(consumerMock); + persistentDispatcher.consumerFlow(consumerMock, 1000); + } catch (Exception e) { + fail("Failed to add mock consumer", e); + } + List entries = new ArrayList<>(); ByteBuf markerMessage = Markers.newReplicatedSubscriptionsSnapshotRequest("testSnapshotId", "testSourceCluster"); entries.add(EntryImpl.create(1, 1, markerMessage)); @@ -156,11 +166,100 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5); } + @Test + public void testSkipRedeliverTemporally() { + final Consumer slowConsumerMock = mock(Consumer.class); + final ChannelPromise slowChannelMock = mock(ChannelPromise.class); + // add entries to redeliver and read target + final List redeliverEntries = 
new ArrayList<>(); + redeliverEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key1"))); + final List readEntries = new ArrayList<>(); + readEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, "key1"))); + readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key2"))); + + try { + Field totalAvailablePermitsField = PersistentDispatcherMultipleConsumers.class.getDeclaredField("totalAvailablePermits"); + totalAvailablePermitsField.setAccessible(true); + totalAvailablePermitsField.set(persistentDispatcher, 1000); + + doAnswer(invocationOnMock -> { + ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2)) + .readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal); + return null; + }).when(cursorMock).asyncReadEntriesOrWait( + anyInt(), anyLong(), any(PersistentStickyKeyDispatcherMultipleConsumers.class), + eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal)); + } catch (Exception e) { + fail("Failed to set to field", e); + } + + // Create 2Consumers + try { + doReturn("consumer2").when(slowConsumerMock).consumerName(); + // Change slowConsumer availablePermits to 0 and back to normal + when(slowConsumerMock.getAvailablePermits()) + .thenReturn(0) + .thenReturn(1); + doReturn(true).when(slowConsumerMock).isWritable(); + doReturn(slowChannelMock).when(slowConsumerMock).sendMessages( + anyList(), + any(EntryBatchSizes.class), + any(EntryBatchIndexesAcks.class), + anyInt(), + anyLong(), + anyLong(), + any(RedeliveryTracker.class) + ); + + persistentDispatcher.addConsumer(consumerMock); + persistentDispatcher.addConsumer(slowConsumerMock); + } catch (Exception e) { + fail("Failed to add mock consumer", e); + } + + // run PersistentStickyKeyDispatcherMultipleConsumers#sendMessagesToConsumers + // run readMoreEntries internally (and skip internally) + // Change slowConsumer availablePermits to 1 + // run 
PersistentStickyKeyDispatcherMultipleConsumers#sendMessagesToConsumers internally + // and then stop to dispatch to slowConsumer + persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal, redeliverEntries); + + verify(consumerMock, times(1)).sendMessages( + argThat(arg -> { + assertEquals(arg.size(), 1); + Entry entry = arg.get(0); + assertEquals(entry.getLedgerId(), 1); + assertEquals(entry.getEntryId(), 3); + return true; + }), + any(EntryBatchSizes.class), + any(EntryBatchIndexesAcks.class), + anyInt(), + anyLong(), + anyLong(), + any(RedeliveryTracker.class) + ); + verify(slowConsumerMock, times(0)).sendMessages( + anyList(), + any(EntryBatchSizes.class), + any(EntryBatchIndexesAcks.class), + anyInt(), + anyLong(), + anyLong(), + any(RedeliveryTracker.class) + ); + } + private ByteBuf createMessage(String message, int sequenceId) { + return createMessage(message, sequenceId, "testKey"); + } + + private ByteBuf createMessage(String message, int sequenceId, String key) { PulsarApi.MessageMetadata.Builder messageMetadata = PulsarApi.MessageMetadata.newBuilder(); messageMetadata.setSequenceId(sequenceId); messageMetadata.setProducerName("testProducer"); - messageMetadata.setPartitionKey("testKey"); + messageMetadata.setPartitionKey(key); + messageMetadata.setPartitionKeyB64Encoded(false); messageMetadata.setPublishTime(System.currentTimeMillis()); return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata.build(), Unpooled.copiedBuffer(message.getBytes(UTF_8))); } -- GitLab