diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java index 582d46188cebb7a110e39063028d35f062ac9042..8f765616cc6dc5795f70ae66ac185bf8e47a8fd7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java @@ -84,22 +84,35 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase { } FutureUtil.waitForAll(futures).get(); + List acked = new ArrayList<>(50); for (int i = 0; i < messages; i++) { + Message msg = consumer.receive(); if (i % 2 == 0) { - consumer.acknowledge(consumer.receive()); + consumer.acknowledge(msg); + acked.add(msg.getMessageId()); } else { consumer.negativeAcknowledge(consumer.receive()); } } - List> received = new ArrayList<>(50); + List received = new ArrayList<>(50); for (int i = 0; i < 50; i++) { - received.add(consumer.receive()); + received.add(consumer.receive().getMessageId()); } Assert.assertEquals(received.size(), 50); + acked.retainAll(received); + Assert.assertEquals(acked.size(), 0); - Message moreMessage = consumer.receive(1, TimeUnit.SECONDS); + for (MessageId messageId : received) { + consumer.acknowledge(messageId); + } + + Thread.sleep(1000); + + consumer.redeliverUnacknowledgedMessages(); + + Message moreMessage = consumer.receive(2, TimeUnit.SECONDS); Assert.assertNull(moreMessage); futures.clear(); @@ -109,7 +122,7 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase { FutureUtil.waitForAll(futures).get(); for (int i = 0; i < 50; i++) { - received.add(consumer.receive()); + received.add(consumer.receive().getMessageId()); } // Ensure the flow permit is work well since the client skip the acked batch index, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 28995ad7949687ea32979dfaa186fd42badec498..f9546e99a395ad6e3602bcd70b76402aba4f4747 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -101,6 +101,7 @@ import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.SafeCollectionUtils; +import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.slf4j.Logger; @@ -1304,6 +1305,10 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle possibleToDeadLetter = new ArrayList<>(); } int skippedMessages = 0; + BitSetRecyclable ackBitSet = null; + if (ackSet != null && ackSet.size() > 0) { + ackBitSet = BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet)); + } try { for (int i = 0; i < batchSize; ++i) { if (log.isDebugEnabled()) { @@ -1337,7 +1342,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle continue; } - if (ackSet != null && BitSet.valueOf(SafeCollectionUtils.longListToArray(ackSet)).get(i)) { + if (ackBitSet != null && !ackBitSet.get(i)) { singleMessagePayload.release(); singleMessageMetadataBuilder.recycle(); ++skippedMessages; @@ -1367,6 +1372,9 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle singleMessagePayload.release(); singleMessageMetadataBuilder.recycle(); } + if (ackBitSet != null) { + ackBitSet.recycle(); + } } catch (IOException e) { log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName); discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);