From 81202e147f9460e3ab60d82fe0ad55510a258ed6 Mon Sep 17 00:00:00 2001 From: ran Date: Tue, 1 Sep 2020 11:54:21 +0800 Subject: [PATCH] support use `BitSet` generate the `BatchMessageAcker` (#7909) Motivation Currently, we have to know the batchSize to generate BatchMessageAcker. If we could get the batch index ack bitSet from Broker we could generate the BatchMessageAcker by the bitSet, this is useful for consuming transaction messages, we don't need to change the protocol to get the total message number of one transaction. Modifications Add a new static method to generate the BatchMessageAcker by BitSet. --- .../pulsar/client/impl/BatchMessageAcker.java | 5 ++++ .../pulsar/client/impl/ConsumerImpl.java | 9 ++++++-- ...sistentAcknowledgmentsGroupingTracker.java | 18 +++++++++++---- .../client/impl/BatchMessageAckerTest.java | 12 ++++++++++ .../ConcurrentBitSetRecyclable.java | 7 ++++++ .../ConcurrentBitSetRecyclableTest.java | 23 +++++++++++++++++++ 6 files changed, 68 insertions(+), 6 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java index d46d3b38bf7..e34d1a1b4d3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java @@ -34,6 +34,11 @@ class BatchMessageAcker { return new BatchMessageAcker(bitSet, batchSize); } + // Use the param bitSet as the BatchMessageAcker's bitSet, don't care about the batchSize. + static BatchMessageAcker newAcker(BitSet bitSet) { + return new BatchMessageAcker(bitSet, -1); + } + // bitset shared across messages in the same batch. private final int batchSize; private final BitSet bitSet; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index b1df6f8c82c..cfaaa89e76f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1356,16 +1356,21 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle // create ack tracker for entry aka batch MessageIdImpl batchMessage = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex()); - BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize); List> possibleToDeadLetter = null; if (deadLetterPolicy != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) { possibleToDeadLetter = new ArrayList<>(); } - int skippedMessages = 0; + + BatchMessageAcker acker; BitSetRecyclable ackBitSet = null; if (ackSet != null && ackSet.size() > 0) { ackBitSet = BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet)); + acker = BatchMessageAcker.newAcker(BitSet.valueOf(SafeCollectionUtils.longListToArray(ackSet))); + } else { + acker = BatchMessageAcker.newAcker(batchSize); } + + int skippedMessages = 0; try { int startBatchIndex = Math.max(messageId.getBatchIndex(), 0); for (int i = startBatchIndex; i < batchSize; ++i) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 6a4deef46cc..fd61c42ba61 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -168,8 +168,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments } else if (ackType == AckType.Individual) { ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), msgId.getPartitionIndex()), (v) -> { - ConcurrentBitSetRecyclable value = ConcurrentBitSetRecyclable.create(); - value.set(0, batchSize); + ConcurrentBitSetRecyclable value; + if (msgId.getAcker() != null && !(msgId.getAcker() instanceof BatchMessageAckerDisabled)) { + value = ConcurrentBitSetRecyclable.create(msgId.getAcker().getBitSet()); + } else { + value = ConcurrentBitSetRecyclable.create(); + value.set(0, batchSize); + } return value; }); bitSet.clear(batchIndex); @@ -221,8 +226,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments if (cnx == null) { return false; } - BitSetRecyclable bitSet = BitSetRecyclable.create(); - bitSet.set(0, batchSize); + BitSetRecyclable bitSet; + if (msgId.getAcker() != null && !(msgId.getAcker() instanceof BatchMessageAckerDisabled)) { + bitSet = BitSetRecyclable.valueOf(msgId.getAcker().getBitSet().toLongArray()); + } else { + bitSet = BitSetRecyclable.create(); + bitSet.set(0, batchSize); + } if (ackType == AckType.Cumulative) { bitSet.clear(0, batchIndex + 1); } else { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java index 2bfa620d43a..8c1565eb2cb 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java @@ -22,9 +22,12 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.BitSet; + public class BatchMessageAckerTest { private static final int BATCH_SIZE = 10; @@ -68,4 +71,13 @@ public class BatchMessageAckerTest { assertEquals(0, acker.getOutstandingAcks()); } + @Test + public void testBitSetAcker() { + BitSet bitSet = BitSet.valueOf(acker.getBitSet().toLongArray()); + BatchMessageAcker bitSetAcker = BatchMessageAcker.newAcker(bitSet); + + Assert.assertEquals(acker.getBitSet(), bitSetAcker.getBitSet()); + Assert.assertEquals(acker.getOutstandingAcks(), bitSetAcker.getOutstandingAcks()); + } + } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java index 8e787c1ea6e..21ee42bb9c0 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java @@ -20,6 +20,7 @@ package org.apache.pulsar.common.util.collections; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; +import java.util.BitSet; /** * Safe multithreaded version of {@code BitSet} and leverage netty recycler. @@ -43,6 +44,12 @@ public class ConcurrentBitSetRecyclable extends ConcurrentBitSet { return RECYCLER.get(); } + public static ConcurrentBitSetRecyclable create(BitSet bitSet) { + ConcurrentBitSetRecyclable recyclable = RECYCLER.get(); + recyclable.or(bitSet); + return recyclable; + } + public void recycle() { this.clear(); recyclerHandle.recycle(this); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java index b037c705fae..e9371767f2a 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.common.util.collections; +import java.util.BitSet; import org.testng.Assert; import org.testng.annotations.Test; @@ -34,4 +35,26 @@ public class ConcurrentBitSetRecyclableTest { Assert.assertFalse(bitset2.get(3)); Assert.assertNotSame(bitset3, bitset1); } + + @Test + public void testGenerateByBitSet() { + BitSet bitSet = new BitSet(); + ConcurrentBitSetRecyclable bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet); + Assert.assertEquals(bitSet, bitSetRecyclable); + + bitSet.set(0, 10); + bitSetRecyclable.recycle(); + bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet); + Assert.assertEquals(bitSet, bitSetRecyclable); + + bitSet.clear(5); + bitSetRecyclable.recycle(); + bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet); + Assert.assertEquals(bitSet, bitSetRecyclable); + + bitSet.clear(); + bitSetRecyclable.recycle(); + bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet); + Assert.assertEquals(bitSet, bitSetRecyclable); + } } -- GitLab