提交 d126b1d2 编写于 作者: R Rajan Dhabalia 提交者: xiaolong.ran

[pulsar-broker] Fix: race condition : Failed to read-more entries on dispatcher (#5391)

* [pulsar-broker] Fix: race condition : Failed to read-more entries on dispatcher

* clean up non-used method

(cherry picked from commit 13ea25a3)
上级 ae864d5f
...@@ -24,6 +24,7 @@ import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAG ...@@ -24,6 +24,7 @@ import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAG
import com.google.common.collect.ComparisonChain; import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
...@@ -296,14 +297,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul ...@@ -296,14 +297,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
} }
if (hasMessagesToReplay()) { Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
if (!messagesToReplayNow.isEmpty()) {
if (havePendingReplayRead) { if (havePendingReplayRead) {
log.debug("[{}] Skipping replay while awaiting previous read to complete", name); log.debug("[{}] Skipping replay while awaiting previous read to complete", name);
return; return;
} }
Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(), log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(),
consumerList.size()); consumerList.size());
...@@ -710,29 +711,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul ...@@ -710,29 +711,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime()); return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime());
} }
/**
* Returns whether we have any message that could be immediately replayed.
* This could be a message that was requested to be re-delivered or a delayed
* delivery.
*/
private boolean hasMessagesToReplay() {
if (!messagesToRedeliver.isEmpty()) {
return true;
}
if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
return true;
}
return false;
}
private synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) { private synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
if (!messagesToRedeliver.isEmpty()) { if (!messagesToRedeliver.isEmpty()) {
return messagesToRedeliver.items(maxMessagesToRead, return messagesToRedeliver.items(maxMessagesToRead,
(ledgerId, entryId) -> new PositionImpl(ledgerId, entryId)); (ledgerId, entryId) -> new PositionImpl(ledgerId, entryId));
} else { } else if (delayedDeliveryTracker.isPresent()) {
return delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead); return delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
} else {
return Collections.emptySet();
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册