提交 ae864d5f 编写于 作者: L lipenghui 提交者: xiaolong.ran

Trim messages which less than mark delete position for message redelivery (#5378)

* Trim messages which less than mark delete position for message redelivery.


(cherry picked from commit b11211f4)
上级 d92da4e6
......@@ -967,19 +967,24 @@ public class PersistentSubscription implements Subscription {
(PositionImpl) this.pendingCumulativeAckMessage;
positions.forEach(position -> {
if ((pendingAckMessages == null || (pendingAckMessages != null &&
!this.pendingAckMessages.contains(position))) &&
(null == cumulativeAckPosition ||
(null != cumulativeAckPosition && position.compareTo(cumulativeAckPosition) > 0))) {
if ((pendingAckMessages == null || !this.pendingAckMessages.contains(position))
&& (null == cumulativeAckPosition || position.compareTo(cumulativeAckPosition) > 0)) {
pendingPositions.add(position);
}
});
trimByMarkDeletePosition(pendingPositions);
dispatcher.redeliverUnacknowledgedMessages(consumer, pendingPositions);
} else {
trimByMarkDeletePosition(positions);
dispatcher.redeliverUnacknowledgedMessages(consumer, positions);
}
}
private void trimByMarkDeletePosition(List<PositionImpl> positions) {
positions.removeIf(position -> cursor.getMarkDeletedPosition() != null
&& position.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) <= 0);
}
@Override
public void addUnAckedMessages(int unAckMessages) {
dispatcher.addUnAckedMessages(unAckMessages);
......
......@@ -20,6 +20,8 @@ package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
......@@ -32,7 +34,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
......@@ -216,4 +221,40 @@ public class MessageRedeliveryTest extends ProducerConsumerBase {
}
@Test
public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException, PulsarAdminException {
final String topic = "testDoNotRedeliveryMarkDeleteMessages";
final String subName = "my-sub";
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();
producer.send("Pulsar".getBytes());
for (int i = 0; i < 2; i++) {
Message message = consumer.receive();
assertNotNull(message);
}
admin.topics().skipAllMessages(topic, subName);
Message message = null;
try {
message = consumer.receive(2, TimeUnit.SECONDS);
} catch (Exception ignore) {
}
assertNull(message);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册