提交 95b82399 编写于 作者: R Rajan 提交者: Matteo Merli

Add unack-message threshold to restrict consumer for receiving messages...

Add unack-message threshold to restrict consumer for receiving messages without acknowledging-msg up to the threshold (#48)
上级 cc7c8075
......@@ -97,6 +97,11 @@ tlsTrustCertsFilePath=
# Accept untrusted TLS certificate from client
tlsAllowInsecureConnection=false
# Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending
# messages to consumer once, this limit reaches until consumer starts acknowledging messages back.
# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
maxUnackedMessagesPerConsumer=50000
### --- Authentication --- ###
# Enable authentication
......
......@@ -70,6 +70,11 @@ clientLibraryVersionCheckAllowUnversioned=true
# to service discovery health checks
statusFilePath=/usr/local/apache/htdocs
# Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending
# messages to consumer once, this limit reaches until consumer starts acknowledging messages back
# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
maxUnackedMessagesPerConsumer=50000
### --- Authentication --- ###
# Enable authentication
......
......@@ -81,6 +81,10 @@ public class ServiceConfiguration {
// Path for the file used to determine the rotation status for the broker
// when responding to service discovery health checks
private String statusFilePath;
// Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending
// messages to consumer once, this limit reaches until consumer starts acknowledging messages back
// Using a value of 0, is disabling unackedMessage-limit check and consumer can receive messages without any restriction
private int maxUnackedMessagesPerConsumer = 50000;
/***** --- TLS --- ****/
// Enable TLS
......@@ -405,6 +409,14 @@ public class ServiceConfiguration {
this.statusFilePath = statusFilePath;
}
public int getMaxUnackedMessagesPerConsumer() {
return maxUnackedMessagesPerConsumer;
}
public void setMaxUnackedMessagesPerConsumer(int maxUnackedMessagesPerConsumer) {
this.maxUnackedMessagesPerConsumer = maxUnackedMessagesPerConsumer;
}
public boolean isTlsEnabled() {
return tlsEnabled;
}
......
......@@ -38,7 +38,7 @@ import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.ConsumerStats;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashSet;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
......@@ -63,17 +63,22 @@ public class Consumer {
// increase its availability
private final AtomicInteger messagePermits = new AtomicInteger(0);
private final ConcurrentOpenHashSet<PositionImpl> pendingAcks;
private final ConcurrentOpenHashMap<PositionImpl, Integer> pendingAcks;
private final ConsumerStats stats;
public Consumer(Subscription subscription, SubType subType, long consumerId, String consumerName, ServerCnx cnx,
String appId) throws BrokerServiceException {
private final int maxUnackedMessages;
private AtomicInteger unackedMessages = new AtomicInteger(0);
private volatile boolean blockedConsumerOnUnackedMsgs = false;
public Consumer(Subscription subscription, SubType subType, long consumerId, String consumerName,
int maxUnackedMessages, ServerCnx cnx, String appId) throws BrokerServiceException {
this.subscription = subscription;
this.subType = subType;
this.consumerId = consumerId;
this.consumerName = consumerName;
this.maxUnackedMessages = maxUnackedMessages;
this.cnx = cnx;
this.msgOut = new Rate();
this.appId = appId;
......@@ -84,7 +89,7 @@ public class Consumer {
stats.connectedSince = DATE_FORMAT.format(new Date(System.currentTimeMillis()));
if (subType == SubType.Shared) {
this.pendingAcks = new ConcurrentOpenHashSet<PositionImpl>(256, 2);
this.pendingAcks = new ConcurrentOpenHashMap<PositionImpl, Integer>(256, 2);
} else {
// We don't need to keep track of pending acks if the subscription is not shared
this.pendingAcks = null;
......@@ -156,6 +161,12 @@ public class Consumer {
return writePromise;
}
private void incrementUnackedMessages(int ackedMessages) {
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages.addAndGet(ackedMessages) >= maxUnackedMessages) {
blockedConsumerOnUnackedMsgs = true;
}
}
int getBatchSizeforEntry(ByteBuf metadataAndPayload) {
try {
// save the reader index and restore after parsing
......@@ -191,12 +202,13 @@ public class Consumer {
}
if (pendingAcks != null) {
PositionImpl pos = PositionImpl.get((PositionImpl) entry.getPosition());
pendingAcks.add(pos);
pendingAcks.put(pos, batchSize);
}
permitsToReduce += batchSize;
}
// reduce permit and increment unackedMsg count with total number of messages in batch-msgs
int permits = messagePermits.addAndGet(-permitsToReduce);
incrementUnackedMessages(permitsToReduce);
if (permits < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] message permits dropped below 0 - {}", subscription, consumerId, permits);
......@@ -266,23 +278,46 @@ public class Consumer {
} else {
subscription.acknowledgeMessage(position, ack.getAckType());
}
}
void flowPermits(int additionalNumberOfMessages) {
checkArgument(additionalNumberOfMessages > 0);
// block shared consumer when unacked-messages reaches limit
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages.get() >= maxUnackedMessages) {
blockedConsumerOnUnackedMsgs = true;
}
int oldPermits = messagePermits.getAndAdd(additionalNumberOfMessages);
if (!blockedConsumerOnUnackedMsgs) {
subscription.consumerFlow(this, additionalNumberOfMessages);
}
if (log.isDebugEnabled()) {
log.debug("[{}] Added more flow control message permits {} (old was: {})", this, additionalNumberOfMessages,
oldPermits);
log.debug("[{}] Added more flow control message permits {} (old was: {})", this,
additionalNumberOfMessages, oldPermits);
}
subscription.consumerFlow(this, additionalNumberOfMessages);
}
public int getAvailablePermits() {
return messagePermits.get();
}
public boolean isBlocked() {
return blockedConsumerOnUnackedMsgs;
}
/**
* Checks if consumer-blocking on unAckedMessages is allowed for below conditions:<br/>
* a. consumer must have Shared-subscription<br/>
* b. {@link maxUnackedMessages} value > 0
*
* @return
*/
private boolean shouldBlockConsumerOnUnackMsgs() {
return SubType.Shared.equals(subType) && maxUnackedMessages > 0;
}
public void updateRates() {
msgOut.calculateRate();
stats.msgRateOut = msgOut.getRate();
......@@ -291,6 +326,7 @@ public class Consumer {
public ConsumerStats getStats() {
stats.availablePermits = getAvailablePermits();
stats.unackedMessages = unackedMessages.get();
return stats;
}
......@@ -338,22 +374,44 @@ public class Consumer {
* @param position
*/
private void removePendingAcks(PositionImpl position) {
if (!pendingAcks.remove(position)) {
Consumer ackOwnedConsumer = null;
if (pendingAcks.get(position) == null) {
for (Consumer consumer : subscription.getConsumers()) {
if (!consumer.equals(this) && consumer.getPendingAcks().remove(position)) {
if (!consumer.equals(this) && consumer.getPendingAcks().get(position) != null) {
ackOwnedConsumer = consumer;
break;
}
}
} else {
ackOwnedConsumer = this;
}
// remove pending message from appropriate consumer and unblock unAckMsg-flow if requires
if (ackOwnedConsumer != null) {
int totalAckedMsgs = ackOwnedConsumer.getPendingAcks().remove(position);
// unblock consumer-throttling when receives half of maxUnackedMessages => consumer can start again
// consuming messages
if (ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs()
&& ((ackOwnedConsumer.unackedMessages.addAndGet(-totalAckedMsgs) == (maxUnackedMessages / 2))
&& ackOwnedConsumer.blockedConsumerOnUnackedMsgs)) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
subscription.consumerFlow(ackOwnedConsumer, ackOwnedConsumer.messagePermits.get());
}
}
}
public ConcurrentOpenHashSet<PositionImpl> getPendingAcks() {
public ConcurrentOpenHashMap<PositionImpl, Integer> getPendingAcks() {
return pendingAcks;
}
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public void redeliverUnacknowledgedMessages() {
// cleanup unackedMessage bucket and redeliver those unack-msgs again
unackedMessages.set(0);
blockedConsumerOnUnackedMsgs = false;
// redeliver unacked-msgs
subscription.redeliverUnacknowledgedMessages(this);
if (pendingAcks != null) {
pendingAcks.clear();
......
......@@ -15,6 +15,7 @@
*/
package com.yahoo.pulsar.broker.service.persistent;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
......@@ -118,7 +119,7 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer are left, reading more entries", name);
}
consumer.getPendingAcks().forEach(pendingMessages -> {
consumer.getPendingAcks().forEach((pendingMessages, totalMsg) -> {
messagesToReplay.add(pendingMessages);
});
totalAvailablePermits -= consumer.getAvailablePermits();
......@@ -148,7 +149,7 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
}
private void readMoreEntries() {
if (totalAvailablePermits > 0) {
if (totalAvailablePermits > 0 && isUnblockedConsumerAvailable()) {
int messagesToRead = Math.min(totalAvailablePermits, readBatchSize);
if (!messagesToReplay.isEmpty()) {
......@@ -257,7 +258,7 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
}
while (entriesToDispatch > 0 && totalAvailablePermits > 0) {
while (entriesToDispatch > 0 && totalAvailablePermits > 0 && isUnblockedConsumerAvailable()) {
Consumer c = getNextConsumer();
if (c == null) {
// Do nothing, cursor will be rewind at reconnection
......@@ -352,12 +353,45 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
if (consumerIndex >= consumerList.size()) {
consumerIndex = 0;
}
// find next available unblocked consumer
int unblockedConsumerIndex = consumerIndex;
do {
if (!consumerList.get(unblockedConsumerIndex).isBlocked()) {
consumerIndex = unblockedConsumerIndex;
return consumerList.get(consumerIndex++);
}
if (++unblockedConsumerIndex >= consumerList.size()) {
unblockedConsumerIndex = 0;
}
} while (unblockedConsumerIndex != consumerIndex);
// not found unblocked consumer
return null;
}
/**
* returns true only if {@link consumerList} has atleast one unblocked consumer
*
* @return
*/
private boolean isUnblockedConsumerAvailable() {
if (consumerList.isEmpty() || closeFuture != null) {
// abort read if no consumers are connected or if disconnect is initiated
return false;
}
Iterator<Consumer> consumerIterator = consumerList.iterator();
while (consumerIterator.hasNext()) {
if (!consumerIterator.next().isBlocked()) {
return true;
}
}
return false;
}
@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
consumer.getPendingAcks().forEach(pendingMessages -> {
consumer.getPendingAcks().forEach((pendingMessages, totalMsg) -> {
messagesToReplay.add(pendingMessages);
});
if (log.isDebugEnabled()) {
......
......@@ -329,7 +329,8 @@ public class PersistentTopic implements Topic, AddEntryCallback {
PersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
name -> new PersistentSubscription(PersistentTopic.this, cursor));
Consumer consumer = new Consumer(subscription, subType, consumerId, consumerName, cnx,
Consumer consumer = new Consumer(subscription, subType, consumerId, consumerName,
brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer(), cnx,
cnx.getRole());
subscription.addConsumer(consumer);
if (!cnx.isActive()) {
......@@ -835,6 +836,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
destStatsStream.writePair("address", consumerStats.address);
destStatsStream.writePair("consumerName", consumerStats.consumerName);
destStatsStream.writePair("availablePermits", consumerStats.availablePermits);
destStatsStream.writePair("unackedMessages", consumerStats.unackedMessages);
destStatsStream.writePair("connectedSince", consumerStats.connectedSince);
destStatsStream.writePair("msgRateOut", consumerStats.msgRateOut);
destStatsStream.writePair("msgThroughputOut", consumerStats.msgThroughputOut);
......
......@@ -192,7 +192,7 @@ public class PersistentDispatcherFailoverConsumerTest {
// 2. Add consumer
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 1 /* consumer id */, "Cons1"/* consumer name */,
serverCnx, "myrole-1");
50000, serverCnx, "myrole-1");
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertTrue(consumers.get(0).consumerName() == consumer1.consumerName());
......@@ -209,7 +209,7 @@ public class PersistentDispatcherFailoverConsumerTest {
// 5. Add another consumer which does not change active consumer
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, 2 /* consumer id */, "Cons2"/* consumer name */,
serverCnx, "myrole-1");
50000, serverCnx, "myrole-1");
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
......@@ -217,7 +217,7 @@ public class PersistentDispatcherFailoverConsumerTest {
// 6. Add a consumer which changes active consumer
Consumer consumer0 = new Consumer(sub, SubType.Exclusive, 0 /* consumer id */, "Cons0"/* consumer name */,
serverCnx, "myrole-1");
50000, serverCnx, "myrole-1");
pdfc.addConsumer(consumer0);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer0.consumerName());
......
......@@ -372,7 +372,7 @@ public class PersistentTopicTest {
// 1. simple add consumer
Consumer consumer = new Consumer(sub, SubType.Exclusive, 1 /* consumer id */, "Cons1"/* consumer name */,
serverCnx, "myrole-1");
50000, serverCnx, "myrole-1");
sub.addConsumer(consumer);
assertTrue(sub.getDispatcher().isConsumerConnected());
......@@ -402,7 +402,7 @@ public class PersistentTopicTest {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic, cursorMock);
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 1 /* consumer id */, "Cons1"/* consumer name */,
serverCnx, "myrole-1");
50000, serverCnx, "myrole-1");
sub.addConsumer(consumer1);
doAnswer(new Answer<Object>() {
......@@ -424,7 +424,7 @@ public class PersistentTopicTest {
try {
Thread.sleep(10); /* delay to ensure that the ubsubscribe gets executed first */
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, 2 /* consumer id */, "Cons2"/* consumer name */,
serverCnx, "myrole-1");
50000, serverCnx, "myrole-1");
} catch (BrokerServiceException e) {
assertTrue(e instanceof BrokerServiceException.SubscriptionFencedException);
}
......
......@@ -32,6 +32,9 @@ public class ConsumerStats {
/** Number of available message permits for the consumer */
public int availablePermits;
/** Number of unacknowledged messages for the consumer */
public int unackedMessages;
/** Address of this consumer */
public String address;
......@@ -43,6 +46,7 @@ public class ConsumerStats {
this.msgRateOut += stats.msgRateOut;
this.msgThroughputOut += stats.msgThroughputOut;
this.availablePermits += stats.availablePermits;
this.unackedMessages += stats.unackedMessages;
return this;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册