提交 5e55d799 编写于 作者: S sschepens 提交者: Matteo Merli

Add redelivery rates to consumer and subscription stats (#95)

上级 340dc8d1
......@@ -62,6 +62,7 @@ public class Consumer {
private final long consumerId;
private final String consumerName;
private final Rate msgOut;
private final Rate msgRedeliver;
// Represents how many messages we can safely send to the consumer without
// overflowing its receiving queue. The consumer will use Flow commands to
......@@ -90,6 +91,7 @@ public class Consumer {
this.maxUnackedMessages = maxUnackedMessages;
this.cnx = cnx;
this.msgOut = new Rate();
this.msgRedeliver = new Rate();
this.appId = appId;
stats = new ConsumerStats();
......@@ -357,8 +359,10 @@ public class Consumer {
public void updateRates() {
msgOut.calculateRate();
msgRedeliver.calculateRate();
stats.msgRateOut = msgOut.getRate();
stats.msgThroughputOut = msgOut.getValueRate();
stats.msgRateRedeliver = msgRedeliver.getRate();
}
public ConsumerStats getStats() {
......@@ -452,6 +456,11 @@ public class Consumer {
subscription.redeliverUnacknowledgedMessages(this);
flowConsumerBlockedPermits(this);
if (pendingAcks != null) {
int totalRedeliveryMessages = 0;
for (Integer batchSize : pendingAcks.values()) {
totalRedeliveryMessages += batchSize;
}
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages);
pendingAcks.clear();
}
......@@ -474,6 +483,7 @@ public class Consumer {
blockedConsumerOnUnackedMsgs = false;
subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages);
int numberOfBlockedPermits = Math.min(totalRedeliveryMessages,
permitsReceivedWhileConsumerBlocked.get());
......
......@@ -575,6 +575,8 @@ public class PersistentSubscription implements Subscription {
subStats.consumers.add(consumerStats);
subStats.msgRateOut += consumerStats.msgRateOut;
subStats.msgThroughputOut += consumerStats.msgThroughputOut;
subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
subStats.unackedMessages += consumerStats.unackedMessages;
});
}
......
......@@ -817,6 +817,8 @@ public class PersistentTopic implements Topic, AddEntryCallback {
subscriptions.forEach((subscriptionName, subscription) -> {
double subMsgRateOut = 0;
double subMsgThroughputOut = 0;
double subMsgRateRedeliver = 0;
long subUnackedMessages = 0;
// Start subscription name & consumers
try {
......@@ -834,6 +836,8 @@ public class PersistentTopic implements Topic, AddEntryCallback {
ConsumerStats consumerStats = consumer.getStats();
subMsgRateOut += consumerStats.msgRateOut;
subMsgThroughputOut += consumerStats.msgThroughputOut;
subMsgRateRedeliver += consumerStats.msgRateRedeliver;
subUnackedMessages += consumerStats.unackedMessages;
// Populate consumer specific stats here
destStatsStream.startObject();
......@@ -844,6 +848,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
destStatsStream.writePair("connectedSince", consumerStats.connectedSince);
destStatsStream.writePair("msgRateOut", consumerStats.msgRateOut);
destStatsStream.writePair("msgThroughputOut", consumerStats.msgThroughputOut);
destStatsStream.writePair("msgRateRedeliver", consumerStats.msgRateRedeliver);
destStatsStream.endObject();
}
......@@ -855,6 +860,8 @@ public class PersistentTopic implements Topic, AddEntryCallback {
destStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
destStatsStream.writePair("msgRateOut", subMsgRateOut);
destStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
destStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
destStatsStream.writePair("unackedMessages", subUnackedMessages);
destStatsStream.writePair("type", subscription.getTypeString());
// Close consumers
......
......@@ -185,6 +185,96 @@ public class BrokerServiceTest extends BrokerTestBase {
assertEquals(subStats.msgBacklog, 0);
}
@Test
public void testBrokerServicePersistentRedeliverTopicStats() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/successSharedTopic";
final String subName = "successSharedSub";
PersistentTopicStats stats;
PersistentSubscriptionStats subStats;
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Shared);
Consumer consumer = pulsarClient.subscribe(topicName, subName, conf);
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName);
assertNotNull(topicRef);
rolloverPerIntervalStats();
stats = topicRef.getStats();
subStats = stats.subscriptions.values().iterator().next();
// subscription stats
assertEquals(stats.subscriptions.keySet().size(), 1);
assertEquals(subStats.msgBacklog, 0);
assertEquals(subStats.consumers.size(), 1);
Producer producer = pulsarClient.createProducer(topicName);
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
rolloverPerIntervalStats();
stats = topicRef.getStats();
subStats = stats.subscriptions.values().iterator().next();
// publisher stats
assertEquals(subStats.msgBacklog, 10);
assertEquals(stats.publishers.size(), 1);
assertTrue(stats.publishers.get(0).msgRateIn > 0.0);
assertTrue(stats.publishers.get(0).msgThroughputIn > 0.0);
assertTrue(stats.publishers.get(0).averageMsgSize > 0.0);
// aggregated publish stats
assertEquals(stats.msgRateIn, stats.publishers.get(0).msgRateIn);
assertEquals(stats.msgThroughputIn, stats.publishers.get(0).msgThroughputIn);
double diff = stats.averageMsgSize - stats.publishers.get(0).averageMsgSize;
assertTrue(Math.abs(diff) < 0.000001);
// consumer stats
assertTrue(subStats.consumers.get(0).msgRateOut > 0.0);
assertTrue(subStats.consumers.get(0).msgThroughputOut > 0.0);
assertEquals(subStats.msgRateRedeliver, 0.0);
assertEquals(subStats.consumers.get(0).unackedMessages, 10);
// aggregated consumer stats
assertEquals(subStats.msgRateOut, subStats.consumers.get(0).msgRateOut);
assertEquals(subStats.msgThroughputOut, subStats.consumers.get(0).msgThroughputOut);
assertEquals(subStats.msgRateRedeliver, subStats.consumers.get(0).msgRateRedeliver);
assertEquals(stats.msgRateOut, subStats.consumers.get(0).msgRateOut);
assertEquals(stats.msgThroughputOut, subStats.consumers.get(0).msgThroughputOut);
assertEquals(subStats.msgRateRedeliver, subStats.consumers.get(0).msgRateRedeliver);
assertEquals(subStats.unackedMessages, subStats.consumers.get(0).unackedMessages);
consumer.redeliverUnacknowledgedMessages();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
rolloverPerIntervalStats();
stats = topicRef.getStats();
subStats = stats.subscriptions.values().iterator().next();
assertTrue(subStats.msgRateRedeliver > 0.0);
assertEquals(subStats.msgRateRedeliver, subStats.consumers.get(0).msgRateRedeliver);
Message msg;
for (int i = 0; i < 10; i++) {
msg = consumer.receive();
consumer.acknowledge(msg);
}
consumer.close();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
rolloverPerIntervalStats();
stats = topicRef.getStats();
subStats = stats.subscriptions.values().iterator().next();
assertEquals(subStats.msgBacklog, 0);
}
@Test
public void testBrokerStatsMetrics() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/newTopic";
......
......@@ -26,6 +26,9 @@ public class ConsumerStats {
/** Total throughput delivered to the consumer. bytes/s */
public double msgThroughputOut;
/** Total rate of messages redelivered by this consumer. msg/s */
public double msgRateRedeliver;
/** Name of the consumer */
public String consumerName;
......@@ -45,6 +48,7 @@ public class ConsumerStats {
checkNotNull(stats);
this.msgRateOut += stats.msgRateOut;
this.msgThroughputOut += stats.msgThroughputOut;
this.msgRateRedeliver += stats.msgRateRedeliver;
this.availablePermits += stats.availablePermits;
this.unackedMessages += stats.unackedMessages;
return this;
......
......@@ -31,9 +31,15 @@ public class PersistentSubscriptionStats {
/** Total throughput delivered on this subscription. bytes/s */
public double msgThroughputOut;
/** Total rate of messages redelivered on this subscription. msg/s */
public double msgRateRedeliver;
/** Number of messages in the subscription backlog */
public long msgBacklog;
/** Number of unacknowledged messages for the subscription */
public long unackedMessages;
/** whether this subscription is Exclusive or Shared or Failover */
public SubType type;
......@@ -50,7 +56,9 @@ public class PersistentSubscriptionStats {
public void reset() {
msgRateOut = 0;
msgThroughputOut = 0;
msgRateRedeliver = 0;
msgBacklog = 0;
unackedMessages = 0;
msgRateExpired = 0;
consumers.clear();
}
......@@ -61,7 +69,9 @@ public class PersistentSubscriptionStats {
checkNotNull(stats);
this.msgRateOut += stats.msgRateOut;
this.msgThroughputOut += stats.msgThroughputOut;
this.msgRateRedeliver += stats.msgRateRedeliver;
this.msgBacklog += stats.msgBacklog;
this.unackedMessages += stats.unackedMessages;
this.msgRateExpired += stats.msgRateExpired;
if (this.consumers.size() != stats.consumers.size()) {
for (int i = 0; i < stats.consumers.size(); i++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册