未验证 提交 af3a7afd 编写于 作者: T Tboy 提交者: GitHub

Fix NPE when acknowledge messages at the broker side (#7878) (#7937)

上级 b4b77be5
...@@ -361,7 +361,9 @@ public class PersistentSubscription implements Subscription { ...@@ -361,7 +361,9 @@ public class PersistentSubscription implements Subscription {
cursor.asyncDelete(positions, deleteCallback, positions); cursor.asyncDelete(positions, deleteCallback, positions);
} }
dispatcher.getRedeliveryTracker().removeBatch(positions); if(dispatcher != null){
dispatcher.getRedeliveryTracker().removeBatch(positions);
}
} }
if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) { if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
...@@ -379,11 +381,15 @@ public class PersistentSubscription implements Subscription { ...@@ -379,11 +381,15 @@ public class PersistentSubscription implements Subscription {
if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) { if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) {
// Notify all consumer that the end of topic was reached // Notify all consumer that the end of topic was reached
dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic); if(dispatcher != null){
dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
}
} }
// Signal the dispatchers to give chance to take extra actions // Signal the dispatchers to give chance to take extra actions
dispatcher.acknowledgementWasProcessed(); if(dispatcher != null){
dispatcher.acknowledgementWasProcessed();
}
} }
/** /**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册