提交 8ab57c96 编写于 作者: R Rajan 提交者: GitHub

Merge pull request #102 from rdhabalia/stuck

Fix: complete MessageReplay immediately if there is no entry to replay which will not block further reads as current Replay is finished
......@@ -22,7 +22,7 @@
<parent>
<groupId>com.yahoo.pulsar</groupId>
<artifactId>pulsar</artifactId>
<version>1.15.1</version>
<version>1.15.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>com.yahoo.pulsar</groupId>
<artifactId>pulsar</artifactId>
<version>1.15.1</version>
<version>1.15.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
......
......@@ -803,6 +803,14 @@ public class ManagedCursorImpl implements ManagedCursor {
return result.entries;
}
/**
* Async replays given positions:
* a. before reading it filters out already-acked messages
* b. reads remaining entries async and gives it to given ReadEntriesCallback
* c. returns all already-acked messages which are not replayed so, those messages can be removed by
* caller(Dispatcher)'s replay-list and it won't try to replay it again
*
*/
@Override
public Set<? extends Position> asyncReplayEntries(final Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx) {
List<Entry> entries = Lists.newArrayListWithExpectedSize(positions.size());
......
......@@ -24,7 +24,7 @@
<groupId>com.yahoo.pulsar</groupId>
<artifactId>pulsar</artifactId>
<version>1.15.1</version>
<version>1.15.2-SNAPSHOT</version>
<name>Pulsar</name>
<description>Pulsar is a distributed pub-sub messaging platform with a very
......
......@@ -23,7 +23,7 @@
<parent>
<groupId>com.yahoo.pulsar</groupId>
<artifactId>pulsar</artifactId>
<version>1.15.1</version>
<version>1.15.2-SNAPSHOT</version>
</parent>
<artifactId>pulsar-broker-common</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>com.yahoo.pulsar</groupId>
<artifactId>pulsar</artifactId>
<version>1.15.1</version>
<version>1.15.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
......
......@@ -173,6 +173,12 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
ReadType.Replay);
// clear already acked positions from replay bucket
messagesToReplay.removeAll(deletedMessages);
// if all the entries are acked-entries and cleared up from messagesToReplay, try to read
// next entries as readCompletedEntries-callback was never called
if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
havePendingReplayRead = false;
readMoreEntries();
}
} else if (!havePendingRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
......
......@@ -1129,4 +1129,85 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
producer.close();
}
/**
* Verify:
* 1. Broker should not replay already acknowledged messages
* 2. Dispatcher should not stuck while dispatching new messages due to previous-replay
* of invalid/already-acked messages
*
* @throws Exception
*/
@Test
public void testMessageReplay() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/topic2";
final String subName = "sub2";
Message msg;
int totalMessages = 10;
int replayIndex = totalMessages / 2;
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Shared);
conf.setReceiverQueueSize(1);
Consumer consumer = pulsarClient.subscribe(topicName, subName, conf);
Producer producer = pulsarClient.createProducer(topicName);
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName);
assertNotNull(topicRef);
PersistentSubscription subRef = topicRef.getPersistentSubscription(subName);
PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) subRef
.getDispatcher();
Field replayMap = PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToReplay");
replayMap.setAccessible(true);
TreeSet<PositionImpl> messagesToReplay = Sets.newTreeSet();
assertNotNull(subRef);
// (1) Produce messages
for (int i = 0; i < totalMessages; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
MessageIdImpl firstAckedMsg = null;
// (2) Consume and ack messages except first message
for (int i = 0; i < totalMessages; i++) {
msg = consumer.receive();
consumer.acknowledge(msg);
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
if (i == 0) {
firstAckedMsg = msgId;
}
if (i < replayIndex) {
// (3) accumulate acked messages for replay
messagesToReplay.add(new PositionImpl(msgId.getLedgerId(), msgId.getEntryId()));
}
}
// (4) redelivery : should redeliver only unacked messages
Thread.sleep(1000);
replayMap.set(dispatcher, messagesToReplay);
// (a) redelivery with all acked-message should clear messageReply bucket
dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0));
assertEquals(messagesToReplay.size(), 0);
// (b) fill messageReplyBucket with already acked entry again: and try to publish new msg and read it
messagesToReplay.add(new PositionImpl(firstAckedMsg.getLedgerId(), firstAckedMsg.getEntryId()));
replayMap.set(dispatcher, messagesToReplay);
// send new message
final String testMsg = "testMsg";
producer.send(testMsg.getBytes());
// consumer should be able to receive only new message and not the
dispatcher.consumerFlow(dispatcher.getConsumers().get(0), 1);
msg = consumer.receive(1, TimeUnit.SECONDS);
assertNotNull(msg);
assertEquals(msg.getData(), testMsg.getBytes());
consumer.close();
producer.close();
}
}
......@@ -23,7 +23,7 @@
<parent>
<groupId>com.yahoo.pulsar</groupId>
<artifactId>pulsar</artifactId>
<version>1.15.1</version>
<version>1.15.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
......
......@@ -23,7 +23,7 @@
<parent>
<groupId>com.yahoo.pulsar</groupId>
<artifactId>pulsar</artifactId>
<version>1.15.1</version>
<version>1.15.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
......
......@@ -21,7 +21,7 @@
<parent>
<groupId>com.yahoo.pulsar</groupId>
<artifactId>pulsar</artifactId>
<version>1.15.1</version>
<version>1.15.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>com.yahoo.pulsar</groupId>
<artifactId>pulsar</artifactId>
<version>1.15.1</version>
<version>1.15.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
......
......@@ -23,7 +23,7 @@
<parent>
<groupId>com.yahoo.pulsar</groupId>
<artifactId>pulsar</artifactId>
<version>1.15.1</version>
<version>1.15.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>com.yahoo.pulsar</groupId>
<artifactId>pulsar</artifactId>
<version>1.15.1</version>
<version>1.15.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
......
......@@ -23,7 +23,7 @@
<parent>
<groupId>com.yahoo.pulsar</groupId>
<artifactId>pulsar</artifactId>
<version>1.15.1</version>
<version>1.15.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>com.yahoo.pulsar</groupId>
<artifactId>pulsar</artifactId>
<version>1.15.1</version>
<version>1.15.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>com.yahoo.pulsar</groupId>
<artifactId>pulsar</artifactId>
<version>1.15.1</version>
<version>1.15.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>com.yahoo.pulsar</groupId>
<artifactId>pulsar</artifactId>
<version>1.15.1</version>
<version>1.15.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册