提交 f13606e8 编写于 作者: R Rajan 提交者: Matteo Merli

avoid incrementing unack-msg count for non-shared sub and not show it on stats (#376)

上级 55c49c8f
......@@ -208,7 +208,7 @@ public class Consumer {
}
private void incrementUnackedMessages(int ackedMessages) {
if (UNACKED_MESSAGES_UPDATER.addAndGet(this, ackedMessages) >= maxUnackedMessages && shouldBlockConsumerOnUnackMsgs()) {
if (shouldBlockConsumerOnUnackMsgs() && UNACKED_MESSAGES_UPDATER.addAndGet(this, ackedMessages) >= maxUnackedMessages) {
blockedConsumerOnUnackedMsgs = true;
}
}
......
......@@ -23,6 +23,7 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
public interface Subscription {
void addConsumer(Consumer consumer) throws BrokerServiceException;
......@@ -67,5 +68,7 @@ public interface Subscription {
double getExpiredMessageRate();
SubType getType();
String getTypeString();
}
......@@ -213,10 +213,12 @@ public class PersistentSubscription implements Subscription {
return this.topicName;
}
@Override
public SubType getType() {
return dispatcher != null ? dispatcher.getType() : null;
}
@Override
public String getTypeString() {
SubType type = getType();
if (type == null) {
......
......@@ -893,12 +893,16 @@ public class PersistentTopic implements Topic, AddEntryCallback {
destStatsStream.writePair("address", consumerStats.address);
destStatsStream.writePair("consumerName", consumerStats.consumerName);
destStatsStream.writePair("availablePermits", consumerStats.availablePermits);
destStatsStream.writePair("unackedMessages", consumerStats.unackedMessages);
destStatsStream.writePair("blockedConsumerOnUnackedMsgs", consumerStats.blockedConsumerOnUnackedMsgs);
destStatsStream.writePair("connectedSince", consumerStats.connectedSince);
destStatsStream.writePair("msgRateOut", consumerStats.msgRateOut);
destStatsStream.writePair("msgThroughputOut", consumerStats.msgThroughputOut);
destStatsStream.writePair("msgRateRedeliver", consumerStats.msgRateRedeliver);
if (SubType.Shared.equals(subscription.getType())) {
destStatsStream.writePair("unackedMessages", consumerStats.unackedMessages);
destStatsStream.writePair("blockedConsumerOnUnackedMsgs",
consumerStats.blockedConsumerOnUnackedMsgs);
}
destStatsStream.endObject();
}
......@@ -911,8 +915,11 @@ public class PersistentTopic implements Topic, AddEntryCallback {
destStatsStream.writePair("msgRateOut", subMsgRateOut);
destStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
destStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
destStatsStream.writePair("unackedMessages", subUnackedMessages);
destStatsStream.writePair("type", subscription.getTypeString());
if (SubType.Shared.equals(subscription.getType())) {
destStatsStream.writePair("unackedMessages", subUnackedMessages);
}
// Close consumers
destStatsStream.endObject();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册