提交 0be16ea5 编写于 作者: L lipenghui 提交者: xiaolong.ran

Fix batch index filter issue in Consumer. (#7654)

### Motivation

Fix batch index filter issue in Consumer. The previous logic is wrong at https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1344, this should be opposite.

(cherry picked from commit e9a0fd1e)
上级 304924cd
......@@ -84,22 +84,35 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase {
}
FutureUtil.waitForAll(futures).get();
List<MessageId> acked = new ArrayList<>(50);
for (int i = 0; i < messages; i++) {
Message<Integer> msg = consumer.receive();
if (i % 2 == 0) {
consumer.acknowledge(consumer.receive());
consumer.acknowledge(msg);
acked.add(msg.getMessageId());
} else {
consumer.negativeAcknowledge(consumer.receive());
}
}
List<Message<Integer>> received = new ArrayList<>(50);
List<MessageId> received = new ArrayList<>(50);
for (int i = 0; i < 50; i++) {
received.add(consumer.receive());
received.add(consumer.receive().getMessageId());
}
Assert.assertEquals(received.size(), 50);
acked.retainAll(received);
Assert.assertEquals(acked.size(), 0);
Message<Integer> moreMessage = consumer.receive(1, TimeUnit.SECONDS);
for (MessageId messageId : received) {
consumer.acknowledge(messageId);
}
Thread.sleep(1000);
consumer.redeliverUnacknowledgedMessages();
Message<Integer> moreMessage = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNull(moreMessage);
futures.clear();
......@@ -109,7 +122,7 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase {
FutureUtil.waitForAll(futures).get();
for (int i = 0; i < 50; i++) {
received.add(consumer.receive());
received.add(consumer.receive().getMessageId());
}
// Ensure the flow permit is work well since the client skip the acked batch index,
......
......@@ -101,6 +101,7 @@ import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
......@@ -1304,6 +1305,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
possibleToDeadLetter = new ArrayList<>();
}
int skippedMessages = 0;
BitSetRecyclable ackBitSet = null;
if (ackSet != null && ackSet.size() > 0) {
ackBitSet = BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet));
}
try {
for (int i = 0; i < batchSize; ++i) {
if (log.isDebugEnabled()) {
......@@ -1337,7 +1342,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
continue;
}
if (ackSet != null && BitSet.valueOf(SafeCollectionUtils.longListToArray(ackSet)).get(i)) {
if (ackBitSet != null && !ackBitSet.get(i)) {
singleMessagePayload.release();
singleMessageMetadataBuilder.recycle();
++skippedMessages;
......@@ -1367,6 +1372,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
singleMessagePayload.release();
singleMessageMetadataBuilder.recycle();
}
if (ackBitSet != null) {
ackBitSet.recycle();
}
} catch (IOException e) {
log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName);
discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册