diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index b1dfbbdb2ad7adcffa605e96f16ad392c19ba522..93d56b6ab0a587e9880f128a63d289237afb3fee 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -49,6 +49,7 @@ public class NamespaceStatsAggregator { public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, SimpleTextOutputStream stream) { String cluster = pulsar.getConfiguration().getClusterName(); AggregatedNamespaceStats namespaceStats = localNamespaceStats.get(); + TopicStats.resetTypes(); TopicStats topicStats = localTopicStats.get(); printDefaultBrokerStats(stream, cluster); @@ -260,6 +261,7 @@ public class NamespaceStatsAggregator { private static void metric(SimpleTextOutputStream stream, String cluster, String name, long value) { + TopicStats.metricType(stream, name); stream.write(name) .write("{cluster=\"").write(cluster).write("\"} ") .write(value).write(' ').write(System.currentTimeMillis()) @@ -268,18 +270,21 @@ public class NamespaceStatsAggregator { private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name, long value) { + TopicStats.metricType(stream, name); stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} "); stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); } private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name, double value) { + TopicStats.metricType(stream, name); stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} "); stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); } private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace, String name, String remoteCluster, double value) { + TopicStats.metricType(stream, name); stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace); stream.write("\",remote_cluster=\"").write(remoteCluster).write("\"} "); stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 17ab714e7dbf683687eb7d4b5fa80bd5b0d3165e..66bb70aa6396886da26c6334f540357a03652b03 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -51,6 +51,10 @@ class TopicStats { Map replicationStats = new HashMap<>(); Map subscriptionStats = new HashMap<>(); + // Used for tracking duplicate TYPE definitions + static Map metricWithTypeDefinition = new HashMap<>(); + + public void reset() { subscriptionsCount = 0; producersCount = 0; @@ -74,6 +78,10 @@ class TopicStats { entrySizeBuckets.reset(); } + static void resetTypes() { + metricWithTypeDefinition.clear(); + } + static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic, TopicStats stats) { metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount); @@ -156,8 +164,18 @@ class TopicStats { } } + static void metricType(SimpleTextOutputStream stream, String name) { + + if (!metricWithTypeDefinition.containsKey(name)) { + metricWithTypeDefinition.put(name, "gauge"); + stream.write("# TYPE ").write(name).write(" gauge\n"); + } + + } + private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String name, double value) { + metricType(stream, name); stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace) .write("\",topic=\"").write(topic).write("\"} "); stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); @@ -165,6 +183,7 @@ class TopicStats { private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription, String name, long value) { + metricType(stream, name); stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace) .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} "); stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); @@ -172,6 +191,7 @@ class TopicStats { private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription, String name, double value) { + metricType(stream, name); stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace) .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} "); stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); @@ -179,6 +199,7 @@ class TopicStats { private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription, String consumerName, long consumerId, String name, long value) { + metricType(stream, name); stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace) .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription) .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId).write("\"} "); @@ -187,6 +208,7 @@ class TopicStats { private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription, String consumerName, long consumerId, String name, double value) { + metricType(stream, name); stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace) .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription) .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId).write("\"} "); @@ -196,6 +218,7 @@ class TopicStats { private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String name, String remoteCluster, double value) { + metricType(stream, name); stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace); stream.write("\",topic=\"").write(topic).write("\",remote_cluster=\"").write(remoteCluster).write("\"} "); stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index ac8e8023e7a77ee90fb7b0e09971a4af16ee27ba..054450bff699232f63b4711cf95e2f8f24dcdbd3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -21,10 +21,12 @@ package org.apache.pulsar.broker.stats; import static com.google.common.base.Preconditions.checkArgument; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; +import static org.testng.Assert.fail; import java.io.ByteArrayOutputStream; import java.util.List; import java.util.Map; +import java.util.HashMap; import java.util.TreeMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -136,6 +138,101 @@ public class PrometheusMetricsTest extends BrokerTestBase { p2.close(); } + /** Checks for duplicate type definitions for a metric in the Prometheus metrics output. If the Prometheus parser + finds a TYPE definition for the same metric more than once, it errors out: + https://github.com/prometheus/prometheus/blob/f04b1b5559a80a4fd1745cf891ce392a056460c9/vendor/github.com/prometheus/common/expfmt/text_parse.go#L499-L502 + This can happen when including topic metrics, since the same metric is reported multiple times with different labels. For example: + + # TYPE pulsar_subscriptions_count gauge + pulsar_subscriptions_count{cluster="standalone"} 0 1556372982118 + pulsar_subscriptions_count{cluster="standalone",namespace="public/functions",topic="persistent://public/functions/metadata"} 1.0 1556372982118 + pulsar_subscriptions_count{cluster="standalone",namespace="public/functions",topic="persistent://public/functions/coordinate"} 1.0 1556372982118 + pulsar_subscriptions_count{cluster="standalone",namespace="public/functions",topic="persistent://public/functions/assignments"} 1.0 1556372982118 + + **/ + // Running the test twice to make sure types are present when generated multiple times + @Test(invocationCount = 2) + public void testDuplicateMetricTypeDefinitions() throws Exception { + Producer p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create(); + Producer p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create(); + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + p1.send(message.getBytes()); + p2.send(message.getBytes()); + } + + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut); + String metricsStr = new String(statsOut.toByteArray()); + + Map typeDefs = new HashMap(); + Map metricNames = new HashMap(); + + Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)"); + Pattern metricNamePattern = Pattern.compile("^(\\w+)\\{.+"); + + Splitter.on("\n").split(metricsStr).forEach(line -> { + if (line.isEmpty()) { + return; + } + if (line.startsWith("#")) { + // Check for duplicate type definitions + Matcher typeMatcher = typePattern.matcher(line); + checkArgument(typeMatcher.matches()); + String metricName = typeMatcher.group(1); + String type = typeMatcher.group(2); + + // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md + // "Only one TYPE line may exist for a given metric name." + if (!typeDefs.containsKey(metricName)) { + typeDefs.put(metricName, type); + } else { + fail("Duplicate type definition found for TYPE definition " + metricName); + System.out.println(metricsStr); + + } + // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md + // "The TYPE line for a metric name must appear before the first sample is reported for that metric name." + if (metricNames.containsKey(metricName)) { + System.out.println(metricsStr); + fail("TYPE definition for " + metricName + " appears after first sample"); + + } + } else { + Matcher metricMatcher = metricNamePattern.matcher(line); + checkArgument(metricMatcher.matches()); + String metricName = metricMatcher.group(1); + metricNames.put(metricName, metricName); + } + }); + + // Metrics with no type definition + for (String metricName : metricNames.keySet()) { + + if (!typeDefs.containsKey(metricName)) { + // This may be OK if this is a _sum or _count metric from a summary + if(metricName.endsWith("_sum")) { + String summaryMetricName = metricName.substring(0, metricName.indexOf("_sum")); + if (!typeDefs.containsKey(summaryMetricName)) { + fail("Metric " + metricName + " does not have a corresponding summary type definition"); + } + } else if (metricName.endsWith("_count")) { + String summaryMetricName = metricName.substring(0, metricName.indexOf("_count")); + if (!typeDefs.containsKey(summaryMetricName)) { + fail("Metric " + metricName + " does not have a corresponding summary type definition"); + } + } else { + fail("Metric " + metricName + " does not have a type definition"); + } + + } + } + + p1.close(); + p2.close(); + } + + /** * Hacky parsing of Prometheus text format. Sould be good enough for unit tests */