提交 15683fd0 编写于 作者: R Rajan 提交者: GitHub

Complete Message replay immediately if there is no entry to replay which will...

Complete Message replay immediately if there is no entry to replay which will not block reads from other consumer (#103)
上级 1f053681
......@@ -803,6 +803,14 @@ public class ManagedCursorImpl implements ManagedCursor {
return result.entries;
}
/**
* Async replays given positions:
* a. before reading it filters out already-acked messages
* b. reads remaining entries async and gives it to given ReadEntriesCallback
* c. returns all already-acked messages which are not replayed so, those messages can be removed by
* caller(Dispatcher)'s replay-list and it won't try to replay it again
*
*/
@Override
public Set<? extends Position> asyncReplayEntries(final Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx) {
List<Entry> entries = Lists.newArrayListWithExpectedSize(positions.size());
......
......@@ -173,6 +173,12 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
ReadType.Replay);
// clear already acked positions from replay bucket
messagesToReplay.removeAll(deletedMessages);
// if all the entries are acked-entries and cleared up from messagesToReplay, try to read
// next entries as readCompletedEntries-callback was never called
if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
havePendingReplayRead = false;
readMoreEntries();
}
} else if (!havePendingRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
......
......@@ -1129,4 +1129,85 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
producer.close();
}
/**
* Verify:
* 1. Broker should not replay already acknowledged messages
* 2. Dispatcher should not stuck while dispatching new messages due to previous-replay
* of invalid/already-acked messages
*
* @throws Exception
*/
@Test
public void testMessageReplay() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/topic2";
final String subName = "sub2";
Message msg;
int totalMessages = 10;
int replayIndex = totalMessages / 2;
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Shared);
conf.setReceiverQueueSize(1);
Consumer consumer = pulsarClient.subscribe(topicName, subName, conf);
Producer producer = pulsarClient.createProducer(topicName);
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName);
assertNotNull(topicRef);
PersistentSubscription subRef = topicRef.getPersistentSubscription(subName);
PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) subRef
.getDispatcher();
Field replayMap = PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToReplay");
replayMap.setAccessible(true);
TreeSet<PositionImpl> messagesToReplay = Sets.newTreeSet();
assertNotNull(subRef);
// (1) Produce messages
for (int i = 0; i < totalMessages; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
MessageIdImpl firstAckedMsg = null;
// (2) Consume and ack messages except first message
for (int i = 0; i < totalMessages; i++) {
msg = consumer.receive();
consumer.acknowledge(msg);
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
if (i == 0) {
firstAckedMsg = msgId;
}
if (i < replayIndex) {
// (3) accumulate acked messages for replay
messagesToReplay.add(new PositionImpl(msgId.getLedgerId(), msgId.getEntryId()));
}
}
// (4) redelivery : should redeliver only unacked messages
Thread.sleep(1000);
replayMap.set(dispatcher, messagesToReplay);
// (a) redelivery with all acked-message should clear messageReply bucket
dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0));
assertEquals(messagesToReplay.size(), 0);
// (b) fill messageReplyBucket with already acked entry again: and try to publish new msg and read it
messagesToReplay.add(new PositionImpl(firstAckedMsg.getLedgerId(), firstAckedMsg.getEntryId()));
replayMap.set(dispatcher, messagesToReplay);
// send new message
final String testMsg = "testMsg";
producer.send(testMsg.getBytes());
// consumer should be able to receive only new message and not the
dispatcher.consumerFlow(dispatcher.getConsumers().get(0), 1);
msg = consumer.receive(1, TimeUnit.SECONDS);
assertNotNull(msg);
assertEquals(msg.getData(), testMsg.getBytes());
consumer.close();
producer.close();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册