From 23d795c08ed54606e71ac8b1de1bc32420e2fbf1 Mon Sep 17 00:00:00 2001 From: ran Date: Tue, 18 Aug 2020 17:11:19 +0800 Subject: [PATCH] Fix pending batchIndexAcks bitSet batchSize in PersistentAcknowledgmentsGroupingTracker (#7828) ### Motivation The pending batchIndexAcks bitSet batchSize is not correct. ### Modifications Fix the bitSet batchSize. --- .../pulsar/client/impl/BatchMessageIndexAckTest.java | 12 ++++++++++-- .../PersistentAcknowledgmentsGroupingTracker.java | 3 +-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java index 8f765616cc6..ae10141e8e9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java @@ -28,6 +28,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.util.FutureUtil; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -58,13 +59,14 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase { } @Test - public void testBatchMessageIndexAckForSharedSubscription() throws PulsarClientException, ExecutionException, InterruptedException { + public void testBatchMessageIndexAckForSharedSubscription() throws Exception { final String topic = "testBatchMessageIndexAckForSharedSubscription"; + final String subscriptionName = "sub"; @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.INT32) .topic(topic) - .subscriptionName("sub") + .subscriptionName(subscriptionName) .receiverQueueSize(100) .subscriptionType(SubscriptionType.Shared) .enableBatchIndexAcknowledgment(true) @@ -115,6 +117,12 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase { Message moreMessage = consumer.receive(2, TimeUnit.SECONDS); Assert.assertNull(moreMessage); + // check the mark delete position was changed + BatchMessageIdImpl ackedMessageId = (BatchMessageIdImpl) received.get(0); + PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); + String markDeletePosition = stats.cursors.get(subscriptionName).markDeletePosition; + Assert.assertEquals(ackedMessageId.ledgerId + ":" + ackedMessageId.entryId, markDeletePosition); + futures.clear(); for (int i = 0; i < 50; i++) { futures.add(producer.sendAsync(i)); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 17e8c223ede..6a4deef46cc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -169,8 +169,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), msgId.getPartitionIndex()), (v) -> { ConcurrentBitSetRecyclable value = ConcurrentBitSetRecyclable.create(); - value.set(0, batchSize + 1); - value.clear(batchIndex); + value.set(0, batchSize); return value; }); bitSet.clear(batchIndex); -- GitLab