提交 3cec7e46 编写于 作者: R Rajan 提交者: GitHub

Add blockedConsumerOnUnackedMsgs flag in consumer-stats (#124)

上级 e525da35
...@@ -368,6 +368,7 @@ public class Consumer { ...@@ -368,6 +368,7 @@ public class Consumer {
public ConsumerStats getStats() { public ConsumerStats getStats() {
stats.availablePermits = getAvailablePermits(); stats.availablePermits = getAvailablePermits();
stats.unackedMessages = unackedMessages.get(); stats.unackedMessages = unackedMessages.get();
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
return stats; return stats;
} }
......
...@@ -845,6 +845,7 @@ public class PersistentTopic implements Topic, AddEntryCallback { ...@@ -845,6 +845,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
destStatsStream.writePair("consumerName", consumerStats.consumerName); destStatsStream.writePair("consumerName", consumerStats.consumerName);
destStatsStream.writePair("availablePermits", consumerStats.availablePermits); destStatsStream.writePair("availablePermits", consumerStats.availablePermits);
destStatsStream.writePair("unackedMessages", consumerStats.unackedMessages); destStatsStream.writePair("unackedMessages", consumerStats.unackedMessages);
destStatsStream.writePair("blockedConsumerOnUnackedMsgs", consumerStats.blockedConsumerOnUnackedMsgs);
destStatsStream.writePair("connectedSince", consumerStats.connectedSince); destStatsStream.writePair("connectedSince", consumerStats.connectedSince);
destStatsStream.writePair("msgRateOut", consumerStats.msgRateOut); destStatsStream.writePair("msgRateOut", consumerStats.msgRateOut);
destStatsStream.writePair("msgThroughputOut", consumerStats.msgThroughputOut); destStatsStream.writePair("msgThroughputOut", consumerStats.msgThroughputOut);
......
...@@ -37,6 +37,9 @@ public class ConsumerStats { ...@@ -37,6 +37,9 @@ public class ConsumerStats {
/** Number of unacknowledged messages for the consumer */ /** Number of unacknowledged messages for the consumer */
public int unackedMessages; public int unackedMessages;
/** Flag to verify if consumer is blocked due to reaching threshold of unacked messages */
public boolean blockedConsumerOnUnackedMsgs;
/** Address of this consumer */ /** Address of this consumer */
public String address; public String address;
...@@ -51,6 +54,7 @@ public class ConsumerStats { ...@@ -51,6 +54,7 @@ public class ConsumerStats {
this.msgRateRedeliver += stats.msgRateRedeliver; this.msgRateRedeliver += stats.msgRateRedeliver;
this.availablePermits += stats.availablePermits; this.availablePermits += stats.availablePermits;
this.unackedMessages += stats.unackedMessages; this.unackedMessages += stats.unackedMessages;
this.blockedConsumerOnUnackedMsgs = stats.blockedConsumerOnUnackedMsgs;
return this; return this;
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册