diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 685b86db170a22950019e25e0d5e63c21d2ac6df..644ae347897e9357968cd9e524720bbe8b6d4fd9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -985,8 +985,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (ledgerEntries <= 0) { return 0; } - long averageSize = ledgerSize / ledgerEntries; - return consumedEntries >= 0 ? (consumedEntries + 1) * averageSize : 0; + if (ledgerEntries == (consumedEntries + 1)) { + return ledgerSize; + } else { + long averageSize = ledgerSize / ledgerEntries; + return consumedEntries >= 0 ? (consumedEntries + 1) * averageSize : 0; + } } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index a73b65f1bf6a6290732e369e5fa259d4f23d148a..1598be75873faf252e914b72a2d85a286eddf8b5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -2249,4 +2249,31 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { Assert.assertTrue(admin.topics().getStats(topic).subscriptions.values().iterator().next().lastExpireTimestamp > 0L); } + + @Test + public void testBacklogSizeShouldBeZeroWhenConsumerAckedAllMessages() throws Exception { + final String topic = "persistent://prop-xyz/ns1/testBacklogSizeShouldBeZeroWhenConsumerAckedAllMessages"; + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub-1") + .subscribe(); + Producer producer = pulsarClient.newProducer() + .topic(topic) + .create(); + + final int messages = 33; + for (int i = 0; i < messages; i++) { + producer.send(new byte[1024 * i * 5]); + } + + for (int i = 0; i < messages; i++) { + consumer.acknowledgeCumulative(consumer.receive()); + } + + // Wait ack send + Thread.sleep(1000); + + TopicStats topicStats = admin.topics().getStats(topic); + assertEquals(topicStats.backlogSize, 0); + } }