提交 a63f9913 编写于 作者: L lipenghui 提交者: Sijie Guo

Fix read 0 entries cause message stop dispatch (#5894)

### Motivation

Fix topic stop dispatch messages to consumers after read or replay 0 entries from bookie
fter check the broker log, i find following logs:
```
23:18:27.125 [bookkeeper-ml-workers-OrderedExecutor-0-0] DEBUG org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://message-platform/client-metrics/monitor-metrics / metrics-report] Distributing 0 messages to 4 consumers
```
This will cause do not continue reading more entries

https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L460

Because readMoreEntries is called at the end of sendMessagesToConsumers, if the consumer no longer sends a permit request because it has not received any messages, dispatcher will stop dispatch any messages.

### Modifications

If read 0 entries from bookie, trigger readMoreEntries
上级 616d4590
......@@ -456,14 +456,18 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
if (entries == null || entries.size() == 0) {
return;
}
if (needTrimAckedMessages()) {
cursor.trimDeletedEntries(entries);
}
int start = 0;
int entriesToDispatch = entries.size();
// Trigger read more messages
if (entriesToDispatch == 0) {
readMoreEntries();
return;
}
int start = 0;
long totalMessagesSent = 0;
long totalBytesSent = 0;
......
......@@ -66,86 +66,89 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
long totalMessagesSent = 0;
long totalBytesSent = 0;
if (entries.size() > 0) {
final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
for (Entry entry : entries) {
int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
groupedEntries.putIfAbsent(key, new ArrayList<>());
groupedEntries.get(key).add(entry);
// Trigger read more messages
if (entries.size() == 0) {
readMoreEntries();
return;
}
final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
for (Entry entry : entries) {
int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
groupedEntries.putIfAbsent(key, new ArrayList<>());
groupedEntries.get(key).add(entry);
}
final Iterator<Map.Entry<Integer, List<Entry>>> iterator = groupedEntries.entrySet().iterator();
AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
while (iterator.hasNext() && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
//TODO: None key policy
Consumer consumer = selector.select(entriesWithSameKey.getKey());
if (consumer == null) {
// Do nothing, cursor will be rewind at reconnection
log.info("[{}] rewind because no available consumer found for key {} from total {}", name,
entriesWithSameKey.getKey(), consumerList.size());
entriesWithSameKey.getValue().forEach(Entry::release);
cursor.rewind();
return;
}
final Iterator<Map.Entry<Integer, List<Entry>>> iterator = groupedEntries.entrySet().iterator();
AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
while (iterator.hasNext() && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
//TODO: None key policy
Consumer consumer = selector.select(entriesWithSameKey.getKey());
if (consumer == null) {
// Do nothing, cursor will be rewind at reconnection
log.info("[{}] rewind because no available consumer found for key {} from total {}", name,
entriesWithSameKey.getKey(), consumerList.size());
entriesWithSameKey.getValue().forEach(Entry::release);
cursor.rewind();
return;
}
int messagesForC = Math.min(entriesWithSameKey.getValue().size(), consumer.getAvailablePermits());
if (log.isDebugEnabled()) {
log.debug("[{}] select consumer {} for key {} with messages num {}, read type is {}",
name, consumer.consumerName(), entriesWithSameKey.getKey(), messagesForC, readType);
int messagesForC = Math.min(entriesWithSameKey.getValue().size(), consumer.getAvailablePermits());
if (log.isDebugEnabled()) {
log.debug("[{}] select consumer {} for key {} with messages num {}, read type is {}",
name, consumer.consumerName(), entriesWithSameKey.getKey(), messagesForC, readType);
}
if (messagesForC > 0) {
// remove positions first from replay list first : sendMessages recycles entries
List<Entry> subList = new ArrayList<>(entriesWithSameKey.getValue().subList(0, messagesForC));
if (readType == ReadType.Replay) {
subList.forEach(entry -> messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId()));
}
if (messagesForC > 0) {
// remove positions first from replay list first : sendMessages recycles entries
List<Entry> subList = new ArrayList<>(entriesWithSameKey.getValue().subList(0, messagesForC));
if (readType == ReadType.Replay) {
subList.forEach(entry -> messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId()));
}
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(subList.size());
filterEntriesForConsumer(subList, batchSizes, sendMessageInfo);
consumer.sendMessages(subList, batchSizes, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), getRedeliveryTracker()).addListener(future -> {
if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
readMoreEntries();
}
});
entriesWithSameKey.getValue().removeAll(subList);
totalAvailablePermits -= sendMessageInfo.getTotalMessages();
totalMessagesSent += sendMessageInfo.getTotalMessages();
totalBytesSent += sendMessageInfo.getTotalBytes();
if (entriesWithSameKey.getValue().size() == 0) {
iterator.remove();
}
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(subList.size());
filterEntriesForConsumer(subList, batchSizes, sendMessageInfo);
consumer.sendMessages(subList, batchSizes, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), getRedeliveryTracker()).addListener(future -> {
if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
readMoreEntries();
}
});
entriesWithSameKey.getValue().removeAll(subList);
totalAvailablePermits -= sendMessageInfo.getTotalMessages();
totalMessagesSent += sendMessageInfo.getTotalMessages();
totalBytesSent += sendMessageInfo.getTotalBytes();
if (entriesWithSameKey.getValue().size() == 0) {
iterator.remove();
}
}
}
// acquire message-dispatch permits for already delivered messages
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
}
// acquire message-dispatch permits for already delivered messages
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
}
if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
}
if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
}
}
if (groupedEntries.size() > 0) {
int laterReplay = 0;
for (List<Entry> entryList : groupedEntries.values()) {
laterReplay += entryList.size();
entryList.forEach(entry -> {
messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId());
entry.release();
});
}
if (log.isDebugEnabled()) {
log.debug("[{}] No consumers found with available permits, storing {} positions for later replay", name,
laterReplay);
}
if (groupedEntries.size() > 0) {
int laterReplay = 0;
for (List<Entry> entryList : groupedEntries.values()) {
laterReplay += entryList.size();
entryList.forEach(entry -> {
messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId());
entry.release();
});
}
if (log.isDebugEnabled()) {
log.debug("[{}] No consumers found with available permits, storing {} positions for later replay", name,
laterReplay);
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册