提交 f6ae797f 编写于 作者: M massakam 提交者: Matteo Merli

Prevent message duplication when active consumer is changed (#807)

上级 34ae881a
...@@ -70,6 +70,9 @@ brokerDeleteInactiveTopicsFrequencySeconds=60 ...@@ -70,6 +70,9 @@ brokerDeleteInactiveTopicsFrequencySeconds=60
# How frequently to proactively check and purge expired messages # How frequently to proactively check and purge expired messages
messageExpiryCheckIntervalInMinutes=5 messageExpiryCheckIntervalInMinutes=5
# How long (in milliseconds) to delay rewinding the cursor and dispatching messages when the active consumer
# of a Failover subscription changes. A value of 0 or less disables the delay.
activeConsumerFailoverDelayTimeMillis=1000
# Set the default behavior for message deduplication in the broker # Set the default behavior for message deduplication in the broker
# This can be overridden per-namespace. If enabled, broker will reject # This can be overridden per-namespace. If enabled, broker will reject
# messages that were already stored in the topic # messages that were already stored in the topic
......
...@@ -63,6 +63,9 @@ brokerDeleteInactiveTopicsFrequencySeconds=60 ...@@ -63,6 +63,9 @@ brokerDeleteInactiveTopicsFrequencySeconds=60
# How frequently to proactively check and purge expired messages # How frequently to proactively check and purge expired messages
messageExpiryCheckIntervalInMinutes=5 messageExpiryCheckIntervalInMinutes=5
# How long (in milliseconds) to delay rewinding the cursor and dispatching messages when the active consumer
# of a Failover subscription changes. A value of 0 or less disables the delay.
activeConsumerFailoverDelayTimeMillis=1000
# Set the default behavior for message deduplication in the broker # Set the default behavior for message deduplication in the broker
# This can be overridden per-namespace. If enabled, broker will reject # This can be overridden per-namespace. If enabled, broker will reject
# messages that were already stored in the topic # messages that were already stored in the topic
......
...@@ -78,6 +78,8 @@ public class ServiceConfiguration implements PulsarConfiguration { ...@@ -78,6 +78,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
private long brokerDeleteInactiveTopicsFrequencySeconds = 60; private long brokerDeleteInactiveTopicsFrequencySeconds = 60;
// How frequently to proactively check and purge expired messages // How frequently to proactively check and purge expired messages
private int messageExpiryCheckIntervalInMinutes = 5; private int messageExpiryCheckIntervalInMinutes = 5;
// How long to delay rewinding cursor and dispatching messages when active consumer is changed
private int activeConsumerFailoverDelayTimeMillis = 1000;
// Set the default behavior for message deduplication in the broker // Set the default behavior for message deduplication in the broker
// This can be overridden per-namespace. If enabled, broker will reject // This can be overridden per-namespace. If enabled, broker will reject
...@@ -521,6 +523,14 @@ public class ServiceConfiguration implements PulsarConfiguration { ...@@ -521,6 +523,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
this.brokerDeduplicationEnabled = brokerDeduplicationEnabled; this.brokerDeduplicationEnabled = brokerDeduplicationEnabled;
} }
/**
 * Returns how long, in milliseconds, rewinding the cursor and dispatching messages is delayed
 * when the active consumer is changed.
 *
 * @return the configured failover delay in milliseconds
 */
public int getActiveConsumerFailoverDelayTimeMillis() {
    return this.activeConsumerFailoverDelayTimeMillis;
}
/**
 * Sets how long, in milliseconds, rewinding the cursor and dispatching messages is delayed
 * when the active consumer is changed.
 *
 * <p>No validation is performed here. The single-active-consumer dispatcher rewinds and reads
 * immediately when the configured value is zero or negative.
 *
 * @param activeConsumerFailoverDelayTimeMillis the failover delay in milliseconds
 */
public void setActiveConsumerFailoverDelayTimeMillis(int activeConsumerFailoverDelayTimeMillis) {
this.activeConsumerFailoverDelayTimeMillis = activeConsumerFailoverDelayTimeMillis;
}
public boolean isClientLibraryVersionCheckEnabled() { public boolean isClientLibraryVersionCheckEnabled() {
return clientLibraryVersionCheckEnabled; return clientLibraryVersionCheckEnabled;
} }
......
...@@ -23,6 +23,7 @@ import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAG ...@@ -23,6 +23,7 @@ import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAG
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledFuture;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Entry;
...@@ -54,6 +55,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp ...@@ -54,6 +55,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
private int readBatchSize; private int readBatchSize;
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS); private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
private final ServiceConfiguration serviceConfig; private final ServiceConfiguration serviceConfig;
private ScheduledFuture<?> readOnActiveConsumerTask = null;
public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex, public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex,
PersistentTopic topic) { PersistentTopic topic) {
...@@ -71,12 +73,38 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp ...@@ -71,12 +73,38 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
havePendingRead = false; havePendingRead = false;
} }
// When a new consumer is chosen, start delivery from unacked message. If there is any pending read operation, if (havePendingRead) {
// let it finish and then rewind return;
if (!havePendingRead) { }
// When a new consumer is chosen, start delivery from unacked message.
// If there is any pending read operation, let it finish and then rewind
if (subscriptionType != SubType.Failover || serviceConfig.getActiveConsumerFailoverDelayTimeMillis() <= 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] Rewind cursor and read more entries without delay", name);
}
cursor.rewind(); cursor.rewind();
readMoreEntries(ACTIVE_CONSUMER_UPDATER.get(this)); readMoreEntries(ACTIVE_CONSUMER_UPDATER.get(this));
return;
} }
// If subscription type is Failover, delay rewinding cursor and
// reading more entries in order to prevent message duplication
if (readOnActiveConsumerTask != null) {
return;
}
readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
}
cursor.rewind();
readMoreEntries(ACTIVE_CONSUMER_UPDATER.get(this));
readOnActiveConsumerTask = null;
}, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
} }
protected void cancelPendingRead() { protected void cancelPendingRead() {
...@@ -148,35 +176,47 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp ...@@ -148,35 +176,47 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
@Override @Override
public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
if (!havePendingRead) { if (havePendingRead) {
if (ACTIVE_CONSUMER_UPDATER.get(this) == consumer) { if (log.isDebugEnabled()) {
if (log.isDebugEnabled()) { log.debug("[{}-{}] Ignoring flow control message since we already have a pending read req", name,
log.debug("[{}-{}] Trigger new read after receiving flow control message", name, consumer); consumer);
} }
readMoreEntries(consumer); } else if (ACTIVE_CONSUMER_UPDATER.get(this) != consumer) {
} else { if (log.isDebugEnabled()) {
if (log.isDebugEnabled()) { log.debug("[{}-{}] Ignoring flow control message since consumer is not active partition consumer", name,
log.debug("[{}-{}] Ignoring flow control message since consumer is not active partition consumer", consumer);
name, consumer); }
} } else if (readOnActiveConsumerTask != null) {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Ignoring flow control message since consumer is waiting for cursor to be rewinded",
name, consumer);
} }
} else { } else {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("[{}-{}] Ignoring flow control message since we already have a pending read req", name, consumer); log.debug("[{}-{}] Trigger new read after receiving flow control message", name, consumer);
} }
readMoreEntries(consumer);
} }
} }
@Override @Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
if (consumer != ACTIVE_CONSUMER_UPDATER.get(this)) { if (consumer != ACTIVE_CONSUMER_UPDATER.get(this)) {
log.info("[{}] Ignoring reDeliverUnAcknowledgedMessages: Only the active consumer can call resend", log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: Only the active consumer can call resend",
consumer); name, consumer);
return;
}
if (readOnActiveConsumerTask != null) {
log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: consumer is waiting for cursor to be rewinded",
name, consumer);
return; return;
} }
if (havePendingRead && cursor.cancelPendingReadRequest()) { if (havePendingRead && cursor.cancelPendingReadRequest()) {
havePendingRead = false; havePendingRead = false;
} }
if (!havePendingRead) { if (!havePendingRead) {
cursor.rewind(); cursor.rewind();
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
......
...@@ -90,6 +90,7 @@ public abstract class MockedPulsarServiceBaseTest { ...@@ -90,6 +90,7 @@ public abstract class MockedPulsarServiceBaseTest {
this.conf.setClusterName("test"); this.conf.setClusterName("test");
this.conf.setAdvertisedAddress("localhost"); // there are TLS tests in here, they need to use localhost because of the certificate this.conf.setAdvertisedAddress("localhost"); // there are TLS tests in here, they need to use localhost because of the certificate
this.conf.setManagedLedgerCacheSizeMB(8); this.conf.setManagedLedgerCacheSizeMB(8);
this.conf.setActiveConsumerFailoverDelayTimeMillis(0);
} }
protected final void internalSetup() throws Exception { protected final void internalSetup() throws Exception {
......
...@@ -433,4 +433,88 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { ...@@ -433,4 +433,88 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
admin.persistentTopics().deletePartitionedTopic(topicName); admin.persistentTopics().deletePartitionedTopic(topicName);
} }
/**
 * Checks that no message is delivered twice when two Failover consumers subscribe at almost the
 * same time while the broker is configured with a 500 ms active-consumer failover delay.
 *
 * <p>The test creates the subscription, publishes {@code numMsgs} messages, subscribes two
 * consumers asynchronously and then expects exactly {@code numMsgs} messages, in publish order,
 * with an empty backlog. The broker configuration is always restored afterwards, even when an
 * assertion fails, so that the non-default delay does not leak into other tests.
 *
 * @throws Exception if broker restart, publishing, subscribing or cleanup fails
 */
@Test
public void testActiveConsumerFailoverWithDelay() throws Exception {
    final String topicName = "persistent://prop/use/ns-abc/failover-topic3";
    final String subName = "sub1";
    final int numMsgs = 100;
    // Filled from the client's listener callbacks; every access below goes through the list's monitor.
    final List<Message> receivedMessages = Lists.newArrayList();

    ConsumerConfiguration consumerConf1 = new ConsumerConfiguration();
    consumerConf1.setSubscriptionType(SubscriptionType.Failover);
    consumerConf1.setConsumerName("1");
    consumerConf1.setMessageListener((consumer, msg) -> {
        try {
            synchronized (receivedMessages) {
                receivedMessages.add(msg);
            }
            consumer.acknowledge(msg);
        } catch (Exception e) {
            fail("Should not fail");
        }
    });

    ConsumerConfiguration consumerConf2 = new ConsumerConfiguration();
    consumerConf2.setSubscriptionType(SubscriptionType.Failover);
    consumerConf2.setConsumerName("2");
    consumerConf2.setMessageListener((consumer, msg) -> {
        try {
            synchronized (receivedMessages) {
                receivedMessages.add(msg);
            }
            consumer.acknowledge(msg);
        } catch (Exception e) {
            fail("Should not fail");
        }
    });

    try {
        conf.setActiveConsumerFailoverDelayTimeMillis(500);
        restartBroker();

        // create subscription
        Consumer consumer = pulsarClient.subscribe(topicName, subName, consumerConf1);
        consumer.close();

        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName);
        PersistentSubscription subRef = topicRef.getSubscription(subName);

        // enqueue messages
        List<CompletableFuture<MessageId>> futures = Lists.newArrayListWithCapacity(numMsgs);
        Producer producer = pulsarClient.createProducer(topicName);
        for (int i = 0; i < numMsgs; i++) {
            String message = "my-message-" + i;
            futures.add(producer.sendAsync(message.getBytes()));
        }
        FutureUtil.waitForAll(futures).get();
        futures.clear();
        producer.close();

        // two consumers subscribe at almost the same time
        CompletableFuture<Consumer> subscribeFuture2 = pulsarClient.subscribeAsync(topicName, subName, consumerConf2);
        CompletableFuture<Consumer> subscribeFuture1 = pulsarClient.subscribeAsync(topicName, subName, consumerConf1);

        // wait for all messages to be dequeued
        int retry = 20;
        for (int i = 0; i < retry; i++) {
            int receivedCount;
            // Read the size under the same lock the listeners use when appending.
            synchronized (receivedMessages) {
                receivedCount = receivedMessages.size();
            }
            if (receivedCount >= numMsgs && subRef.getNumberOfEntriesInBacklog() == 0) {
                break;
            } else if (i != retry - 1) {
                Thread.sleep(100);
            }
        }

        // Take a stable copy so the assertions cannot race with a late (duplicate) delivery.
        List<Message> snapshot;
        synchronized (receivedMessages) {
            snapshot = Lists.newArrayList(receivedMessages);
        }

        // check if message duplication has occurred
        assertEquals(snapshot.size(), numMsgs);
        assertEquals(subRef.getNumberOfEntriesInBacklog(), 0);
        for (int i = 0; i < snapshot.size(); i++) {
            Assert.assertNotNull(snapshot.get(i));
            Assert.assertEquals(new String(snapshot.get(i).getData()), "my-message-" + i);
        }

        subscribeFuture1.get().close();
        subscribeFuture2.get().unsubscribe();
        admin.persistentTopics().delete(topicName);
    } finally {
        // Restore the default broker configuration regardless of the test outcome.
        resetConfig();
        restartBroker();
    }
}
} }
...@@ -739,6 +739,13 @@ public class ReplicatorTest extends ReplicatorTestBase { ...@@ -739,6 +739,13 @@ public class ReplicatorTest extends ReplicatorTestBase {
// Wait until the 2nd message got delivered to consumer // Wait until the 2nd message got delivered to consumer
consumer2.receive(1); consumer2.receive(1);
int retry = 10;
for (int i = 0; i < retry && replicator.getStats().replicationBacklog > 0; i++) {
if (i != retry - 1) {
Thread.sleep(100);
}
}
assertEquals(replicator.getStats().replicationBacklog, 0); assertEquals(replicator.getStats().replicationBacklog, 0);
producer1.close(); producer1.close();
......
...@@ -633,7 +633,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase { ...@@ -633,7 +633,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
Set<Integer> unackMessages = Sets.newHashSet(5, 10, 20, 21, 22, 23, 25, 26, 30, 32, 40, 80, 160, 320); Set<Integer> unackMessages = Sets.newHashSet(5, 10, 20, 21, 22, 23, 25, 26, 30, 32, 40, 80, 160, 320);
int receivedMsgCount = 0; int receivedMsgCount = 0;
for (int i = 0; i < totalProducedMsgs; i++) { for (int i = 0; i < totalProducedMsgs; i++) {
Message msg = consumer.receive(100, TimeUnit.MILLISECONDS); Message msg = consumer.receive(500, TimeUnit.MILLISECONDS);
if (!unackMessages.contains(i)) { if (!unackMessages.contains(i)) {
consumer.acknowledge(msg); consumer.acknowledge(msg);
} }
...@@ -1033,4 +1033,4 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase { ...@@ -1033,4 +1033,4 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
log.error("Stats executor error", e); log.error("Stats executor error", e);
} }
} }
} }
\ No newline at end of file
...@@ -2455,4 +2455,4 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { ...@@ -2455,4 +2455,4 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName); log.info("-- Exiting {} test --", methodName);
} }
} }
\ No newline at end of file
...@@ -394,6 +394,10 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase { ...@@ -394,6 +394,10 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
final NavigableMap<Long, TimestampEntryCount> publishTimeIdMap = new ConcurrentSkipListMap<>(); final NavigableMap<Long, TimestampEntryCount> publishTimeIdMap = new ConcurrentSkipListMap<>();
// set delay time to start dispatching messages to active consumer in order to avoid message duplication
conf.setActiveConsumerFailoverDelayTimeMillis(500);
restartBroker();
consConfig.setSubscriptionType(subType); consConfig.setSubscriptionType(subType);
consConfig.setMessageListener((MessageListener) (Consumer consumer, Message msg) -> { consConfig.setMessageListener((MessageListener) (Consumer consumer, Message msg) -> {
try { try {
...@@ -413,7 +417,8 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase { ...@@ -413,7 +417,8 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
admin.namespaces().setRetention(destName.getNamespace(), policy); admin.namespaces().setRetention(destName.getNamespace(), policy);
Consumer consumer = pulsarClient.subscribe(destName.toString(), subsId, consConfig); Consumer consumer1 = pulsarClient.subscribe(destName.toString(), subsId, consConfig);
Consumer consumer2 = pulsarClient.subscribe(destName.toString(), subsId, consConfig);
final Producer producer = pulsarClient.createProducer(destName.toString()); final Producer producer = pulsarClient.createProducer(destName.toString());
log.info("warm up started for " + destName.toString()); log.info("warm up started for " + destName.toString());
...@@ -426,7 +431,7 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase { ...@@ -426,7 +431,7 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
// sleep to ensure receiving of msgs // sleep to ensure receiving of msgs
for (int n = 0; n < 10 && received.size() < warmup; n++) { for (int n = 0; n < 10 && received.size() < warmup; n++) {
Thread.sleep(100); Thread.sleep(200);
} }
// validate received msgs // validate received msgs
...@@ -465,7 +470,6 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase { ...@@ -465,7 +470,6 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
Assert.assertTrue(subList.contains(subsId)); Assert.assertTrue(subList.contains(subsId));
admin.persistentTopics().resetCursor(destName.toString(), subsId, timestamp); admin.persistentTopics().resetCursor(destName.toString(), subsId, timestamp);
consumer = pulsarClient.subscribe(destName.toString(), subsId, consConfig);
Thread.sleep(3000); Thread.sleep(3000);
int totalExpected = 0; int totalExpected = 0;
for (TimestampEntryCount tec : expectedMessages.values()) { for (TimestampEntryCount tec : expectedMessages.values()) {
...@@ -473,7 +477,8 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase { ...@@ -473,7 +477,8 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
} }
// validate that replay happens after the timestamp // validate that replay happens after the timestamp
Assert.assertTrue(publishTimeIdMap.firstEntry().getKey() >= timestamp); Assert.assertTrue(publishTimeIdMap.firstEntry().getKey() >= timestamp);
consumer.close(); consumer1.close();
consumer2.close();
producer.close(); producer.close();
// validate that expected and received counts match // validate that expected and received counts match
int totalReceived = 0; int totalReceived = 0;
...@@ -481,6 +486,9 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase { ...@@ -481,6 +486,9 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
totalReceived += tec.numMessages; totalReceived += tec.numMessages;
} }
Assert.assertEquals(totalReceived, totalExpected, "did not receive all messages on replay after reset"); Assert.assertEquals(totalReceived, totalExpected, "did not receive all messages on replay after reset");
resetConfig();
restartBroker();
} }
/** /**
......
...@@ -72,6 +72,9 @@ configs: ...@@ -72,6 +72,9 @@ configs:
- name: messageExpiryCheckIntervalInMinutes - name: messageExpiryCheckIntervalInMinutes
default: '5' default: '5'
description: How frequently to proactively check and purge expired messages description: How frequently to proactively check and purge expired messages
- name: activeConsumerFailoverDelayTimeMillis
default: '1000'
description: How long to delay rewinding cursor and dispatching messages when active consumer is changed.
- name: clientLibraryVersionCheckEnabled - name: clientLibraryVersionCheckEnabled
default: 'false' default: 'false'
description: Enable check for minimum allowed client library version description: Enable check for minimum allowed client library version
......
...@@ -63,6 +63,9 @@ configs: ...@@ -63,6 +63,9 @@ configs:
- name: messageExpiryCheckIntervalInMinutes - name: messageExpiryCheckIntervalInMinutes
default: '5' default: '5'
description: How often to proactively check and purge expired messages. description: How often to proactively check and purge expired messages.
- name: activeConsumerFailoverDelayTimeMillis
default: '1000'
description: How long to delay rewinding cursor and dispatching messages when active consumer is changed.
- name: clientLibraryVersionCheckEnabled - name: clientLibraryVersionCheckEnabled
default: 'false' default: 'false'
description: Enable checks for minimum allowed client library version. description: Enable checks for minimum allowed client library version.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册