Unverified commit c7ac08b1 authored by Yuri Mizushima, committed by GitHub

[pulsar-broker] Stop dispatching when skipping messages temporarily because a Key_Shared consumer is stuck on delivery (#7553)

### Motivation
In some cases with a Key_Shared consumer, message ordering was broken.
Here is one way to reproduce the issue (a minimal client-side sketch follows the list below).

1. Connect Consumer1 to the Key_Shared subscription `sub` and do not receive yet
   - receiverQueueSize: 500
2. Connect a Producer and publish 500 messages with key `(i % 10)`
3. Connect Consumer2 to the same subscription and start receiving
   - receiverQueueSize: 1
   - since https://github.com/apache/pulsar/pull/7106, Consumer2 cannot receive yet (expected)
4. Publish 500 more messages from the Producer with the same key generation algorithm
5. After that, start receiving on Consumer1
6. Check the message ordering observed by Consumer2
   - sometimes message ordering is broken within the same key
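
For reference, here is a minimal sketch of the reproduction clients. The topic name `key-shared-test` and subscription `sub` come from the logs below; the service URL and class name are placeholder assumptions, not part of the report:

```java
import org.apache.pulsar.client.api.*;

public class KeySharedReproSketch {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // assumed local broker
                .build();

        // Step 1: Consumer1 with receiverQueueSize 500; do not call receive() yet
        Consumer<byte[]> consumer1 = client.newConsumer()
                .topic("key-shared-test")
                .subscriptionName("sub")
                .subscriptionType(SubscriptionType.Key_Shared)
                .receiverQueueSize(500)
                .subscribe();

        // Step 2: publish 500 messages keyed by (i % 10)
        Producer<byte[]> producer = client.newProducer()
                .topic("key-shared-test")
                .create();
        for (int i = 0; i < 500; i++) {
            producer.newMessage()
                    .key(String.valueOf(i % 10))
                    .value(("my-message-" + i).getBytes())
                    .send();
        }

        // Step 3: Consumer2 on the same subscription with receiverQueueSize 1
        Consumer<byte[]> consumer2 = client.newConsumer()
                .topic("key-shared-test")
                .subscriptionName("sub")
                .subscriptionType(SubscriptionType.Key_Shared)
                .receiverQueueSize(1)
                .subscribe();

        // Steps 4-6: publish 500 more messages with the same keys,
        // then start receiving on Consumer1 and check ordering on Consumer2.
    }
}
```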

Consumer1:
```
Connected: Tue Jul 14 09:36:39 JST 2020
[pulsar-client-io-1-1] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
[pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [persistent://public/default/key-shared-test] [sub0] [820f0] Prefetched messages: 499 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
Received: my-message-0 PublishTime: 1594687006203 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-1 PublishTime: 1594687006243 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-2 PublishTime: 1594687006247 Date: Tue Jul 14 09:37:46 JST 2020
...
Received: my-message-498 PublishTime: 1594687008727 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-499 PublishTime: 1594687008731 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-500 PublishTime: 1594687038742 Date: Tue Jul 14 09:37:46 JST 2020
...
Received: my-message-990 PublishTime: 1594687040094 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-994 PublishTime: 1594687040103 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-995 PublishTime: 1594687040105 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-997 PublishTime: 1594687040113 Date: Tue Jul 14 09:37:46 JST 2020
```

Consumer2:
```
Connected: Tue Jul 14 09:37:03 JST 2020
[pulsar-client-io-1-1] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
Received: my-message-501 MessageId: 4:1501:-1 PublishTime: 1594687038753 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-502 MessageId: 4:1502:-1 PublishTime: 1594687038755 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-503 MessageId: 4:1503:-1 PublishTime: 1594687038759 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-506 MessageId: 4:1506:-1 PublishTime: 1594687038785 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-508 MessageId: 4:1508:-1 PublishTime: 1594687038812 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-901 MessageId: 4:1901:-1 PublishTime: 1594687039871 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-509 MessageId: 4:1509:-1 PublishTime: 1594687038815 Date: Tue Jul 14 09:37:46 JST 2020
ordering was broken, key: 1 oldNum: 901 newNum: 511
Received: my-message-511 MessageId: 4:1511:-1 PublishTime: 1594687038826 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-512 MessageId: 4:1512:-1 PublishTime: 1594687038830 Date: Tue Jul 14 09:37:46 JST 2020
...
```
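
The `ordering was broken` line in the Consumer2 output above comes from a per-key check on the receiving side. Below is a minimal sketch of such a check; the class name and the parsing of the message number out of `my-message-<n>` are illustrative assumptions, not part of this patch:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.api.Message;

class PerKeyOrderingChecker {
    // Highest message number seen so far for each key.
    private final Map<String, Integer> lastSeenPerKey = new HashMap<>();

    // Parses the trailing number of "my-message-<n>" and reports when it goes backwards for a key.
    void check(Message<byte[]> msg) {
        String key = msg.getKey();
        int num = Integer.parseInt(new String(msg.getValue()).replace("my-message-", ""));
        Integer last = lastSeenPerKey.get(key);
        if (last != null && num < last) {
            System.out.println("ordering was broken, key: " + key + " oldNum: " + last + " newNum: " + num);
        }
        lastSeenPerKey.put(key, num);
    }
}
```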

I think this issue is caused by https://github.com/apache/pulsar/pull/7105.
Here is an example of the broken sequence:
1. Dispatch messages
2. Consumer2 is stuck and `totalMessagesSent=0`
   - Consumer2's availablePermits is 0
3. Skip the redelivered messages temporarily
   - Consumer2's availablePermits goes back to 1
4. Dispatch new messages
   - a new message is dispatched to Consumer2
5. Go back to the redelivered messages
6. Dispatch those messages
   - ordering is broken

### Modifications
Stop dispatching to a Key_Shared consumer when its messages are skipped temporarily because the consumer is stuck on delivery.
Parent 3ac98d8d
......@@ -25,6 +25,7 @@ import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -64,12 +65,17 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
*/
private final Map<Consumer, PositionImpl> recentlyJoinedConsumers;
private final Set<Consumer> stuckConsumers;
private final Set<Consumer> nextStuckConsumers;
PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) {
super(topic, cursor, subscription);
this.allowOutOfOrderDelivery = ksm.getAllowOutOfOrderDelivery();
this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? Collections.emptyMap() : new HashMap<>();
this.stuckConsumers = new HashSet<>();
this.nextStuckConsumers = new HashSet<>();
switch (ksm.getKeySharedMode()) {
case AUTO_SPLIT:
......@@ -143,6 +149,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
return;
}
nextStuckConsumers.clear();
final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();
groupedEntries.clear();
......@@ -217,11 +225,14 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
}
stuckConsumers.clear();
if (totalMessagesSent == 0 && recentlyJoinedConsumers.isEmpty()) {
// This means that all the messages we've just read cannot be dispatched right now.
// This condition can only happen when:
// 1. We have consumers ready to accept messages (otherwise this would not have been triggered)
// 2. All keys in the current set of messages are routing to consumers that are currently busy
//    and the stuck state is not caused by stuckConsumers
//
// The solution here is to move on and read next batch of messages which might hopefully contain
// also keys meant for other consumers.
......@@ -230,18 +241,31 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
// ahead in the stream while the new consumers are not ready to accept the new messages,
// therefore would be most likely only increase the distance between read-position and mark-delete
// position.
isDispatcherStuckOnReplays = true;
if (!nextStuckConsumers.isEmpty()) {
isDispatcherStuckOnReplays = true;
stuckConsumers.addAll(nextStuckConsumers);
}
// readMoreEntries should run regardless of whether the stuck state is caused by stuckConsumers, to avoid stopping dispatch.
readMoreEntries();
}
}
private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages) {
if (maxMessages == 0) {
// the consumer was stuck
nextStuckConsumers.add(consumer);
return 0;
}
PositionImpl maxReadPosition = recentlyJoinedConsumers.get(consumer);
if (maxReadPosition == null) {
// stop dispatching to consumers that are in stuckConsumers
if (stuckConsumers.contains(consumer)) {
if (log.isDebugEnabled()) {
log.debug("[{}] stop to dispatch by stuckConsumers, consumer: {}", name, consumer);
}
return 0;
}
// The consumer has not recently joined, so we can send all messages
return maxMessages;
}
......
......@@ -41,6 +41,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
......@@ -49,6 +50,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
@PrepareForTest({ DispatchRateLimiter.class })
......@@ -79,6 +81,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
configMock = mock(ServiceConfiguration.class);
doReturn(true).when(configMock).isSubscriptionRedeliveryTrackerEnabled();
doReturn(100).when(configMock).getDispatcherMaxReadBatchSize();
doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
pulsarMock = mock(PulsarService.class);
doReturn(configMock).when(pulsarMock).getConfiguration();
......@@ -96,6 +100,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
consumerMock = mock(Consumer.class);
channelMock = mock(ChannelPromise.class);
doReturn("consumer1").when(consumerMock).consumerName();
doReturn(1000).when(consumerMock).getAvailablePermits();
doReturn(true).when(consumerMock).isWritable();
doReturn(channelMock).when(consumerMock).sendMessages(
......@@ -120,12 +125,17 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock, KeySharedMeta.getDefaultInstance());
persistentDispatcher.addConsumer(consumerMock);
persistentDispatcher.consumerFlow(consumerMock, 1000);
}
@Test
public void testSendMarkerMessage() {
try {
persistentDispatcher.addConsumer(consumerMock);
persistentDispatcher.consumerFlow(consumerMock, 1000);
} catch (Exception e) {
fail("Failed to add mock consumer", e);
}
List<Entry> entries = new ArrayList<>();
ByteBuf markerMessage = Markers.newReplicatedSubscriptionsSnapshotRequest("testSnapshotId", "testSourceCluster");
entries.add(EntryImpl.create(1, 1, markerMessage));
......@@ -156,11 +166,100 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
}
@Test
public void testSkipRedeliverTemporally() {
final Consumer slowConsumerMock = mock(Consumer.class);
final ChannelPromise slowChannelMock = mock(ChannelPromise.class);
// add entries to redeliver and read target
final List<Entry> redeliverEntries = new ArrayList<>();
redeliverEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key1")));
final List<Entry> readEntries = new ArrayList<>();
readEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, "key1")));
readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key2")));
try {
Field totalAvailablePermitsField = PersistentDispatcherMultipleConsumers.class.getDeclaredField("totalAvailablePermits");
totalAvailablePermitsField.setAccessible(true);
totalAvailablePermitsField.set(persistentDispatcher, 1000);
doAnswer(invocationOnMock -> {
((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2))
.readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
return null;
}).when(cursorMock).asyncReadEntriesOrWait(
anyInt(), anyLong(), any(PersistentStickyKeyDispatcherMultipleConsumers.class),
eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal));
} catch (Exception e) {
fail("Failed to set to field", e);
}
// Create two consumers
try {
doReturn("consumer2").when(slowConsumerMock).consumerName();
// Change slowConsumer availablePermits to 0 and back to normal
when(slowConsumerMock.getAvailablePermits())
.thenReturn(0)
.thenReturn(1);
doReturn(true).when(slowConsumerMock).isWritable();
doReturn(slowChannelMock).when(slowConsumerMock).sendMessages(
anyList(),
any(EntryBatchSizes.class),
any(EntryBatchIndexesAcks.class),
anyInt(),
anyLong(),
anyLong(),
any(RedeliveryTracker.class)
);
persistentDispatcher.addConsumer(consumerMock);
persistentDispatcher.addConsumer(slowConsumerMock);
} catch (Exception e) {
fail("Failed to add mock consumer", e);
}
// Run PersistentStickyKeyDispatcherMultipleConsumers#sendMessagesToConsumers,
// which calls readMoreEntries internally (and skips the stuck consumer temporarily).
// slowConsumer's availablePermits then changes back to 1,
// sendMessagesToConsumers runs again internally,
// and dispatch to slowConsumer is stopped.
persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal, redeliverEntries);
verify(consumerMock, times(1)).sendMessages(
argThat(arg -> {
assertEquals(arg.size(), 1);
Entry entry = arg.get(0);
assertEquals(entry.getLedgerId(), 1);
assertEquals(entry.getEntryId(), 3);
return true;
}),
any(EntryBatchSizes.class),
any(EntryBatchIndexesAcks.class),
anyInt(),
anyLong(),
anyLong(),
any(RedeliveryTracker.class)
);
verify(slowConsumerMock, times(0)).sendMessages(
anyList(),
any(EntryBatchSizes.class),
any(EntryBatchIndexesAcks.class),
anyInt(),
anyLong(),
anyLong(),
any(RedeliveryTracker.class)
);
}
private ByteBuf createMessage(String message, int sequenceId) {
return createMessage(message, sequenceId, "testKey");
}
private ByteBuf createMessage(String message, int sequenceId, String key) {
PulsarApi.MessageMetadata.Builder messageMetadata = PulsarApi.MessageMetadata.newBuilder();
messageMetadata.setSequenceId(sequenceId);
messageMetadata.setProducerName("testProducer");
messageMetadata.setPartitionKey("testKey");
messageMetadata.setPartitionKey(key);
messageMetadata.setPartitionKeyB64Encoded(false);
messageMetadata.setPublishTime(System.currentTimeMillis());
return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata.build(), Unpooled.copiedBuffer(message.getBytes(UTF_8)));
}
......