未验证 提交 be7193ea 编写于 作者: L lipenghui 提交者: guangning

Improve backlogSize stats in the topic. (#6700)

### Motivation

When all subscriptions have no backlogs, but the backlog size of the topic stats is not 0. So this PR improves the backlog size calculation of the managed ledger.

### Modifications

If all entries are consumed, return the ledger size as the consumed size.

### Verifying this change

A new unit test added.

(cherry picked from commit d72e383b)
上级 3a02f76c
......@@ -985,8 +985,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
if (ledgerEntries <= 0) {
return 0;
}
long averageSize = ledgerSize / ledgerEntries;
return consumedEntries >= 0 ? (consumedEntries + 1) * averageSize : 0;
if (ledgerEntries == (consumedEntries + 1)) {
return ledgerSize;
} else {
long averageSize = ledgerSize / ledgerEntries;
return consumedEntries >= 0 ? (consumedEntries + 1) * averageSize : 0;
}
}
@Override
......
......@@ -2249,4 +2249,31 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
Assert.assertTrue(admin.topics().getStats(topic).subscriptions.values().iterator().next().lastExpireTimestamp > 0L);
}
@Test
public void testBacklogSizeShouldBeZeroWhenConsumerAckedAllMessages() throws Exception {
final String topic = "persistent://prop-xyz/ns1/testBacklogSizeShouldBeZeroWhenConsumerAckedAllMessages";
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();
final int messages = 33;
for (int i = 0; i < messages; i++) {
producer.send(new byte[1024 * i * 5]);
}
for (int i = 0; i < messages; i++) {
consumer.acknowledgeCumulative(consumer.receive());
}
// Wait ack send
Thread.sleep(1000);
TopicStats topicStats = admin.topics().getStats(topic);
assertEquals(topicStats.backlogSize, 0);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册