diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 24696e933466bb44d2a402e541c47ec351d293e1..5476b8dea7a9403dbc897c9243dc5a2ee1c6b57d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -361,7 +361,9 @@ public class PersistentSubscription implements Subscription { cursor.asyncDelete(positions, deleteCallback, positions); } - dispatcher.getRedeliveryTracker().removeBatch(positions); + if(dispatcher != null){ + dispatcher.getRedeliveryTracker().removeBatch(positions); + } } if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) { @@ -379,11 +381,15 @@ public class PersistentSubscription implements Subscription { if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) { // Notify all consumer that the end of topic was reached - dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic); + if(dispatcher != null){ + dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic); + } } // Signal the dispatchers to give chance to take extra actions - dispatcher.acknowledgementWasProcessed(); + if(dispatcher != null){ + dispatcher.acknowledgementWasProcessed(); + } } /**