From 1fa4508e0a744a4a5cf040740b4d4cf367db9ee1 Mon Sep 17 00:00:00 2001 From: Rajan Date: Fri, 10 Mar 2017 11:19:45 -0800 Subject: [PATCH] reduce collection instances while generating metrics (#284) reduce collection instances while generating metrics --- .../pulsar/broker/stats/NamespaceStats.java | 4 - .../broker/stats/metrics/AbstractMetrics.java | 14 +-- .../metrics/ManagedLedgerCacheMetrics.java | 8 +- .../stats/metrics/ManagedLedgerMetrics.java | 72 ++++++++------- .../stats/ManagedLedgerMetricsTest.java | 88 +++++++++++++++++++ 5 files changed, 137 insertions(+), 49 deletions(-) create mode 100644 pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/NamespaceStats.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/NamespaceStats.java index b9b34f1ae0f..d80d2df1caa 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/NamespaceStats.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/NamespaceStats.java @@ -15,13 +15,9 @@ */ package com.yahoo.pulsar.broker.stats; -import java.util.List; import java.util.Map; -import java.util.Map.Entry; import com.google.common.collect.Maps; -import com.yahoo.pulsar.common.policies.data.PersistentSubscriptionStats; -import com.yahoo.pulsar.common.policies.data.ReplicatorStats; public class NamespaceStats { diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/metrics/AbstractMetrics.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/metrics/AbstractMetrics.java index 397a9e9b0c2..546c53e05a6 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/metrics/AbstractMetrics.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/metrics/AbstractMetrics.java @@ -168,7 +168,7 @@ abstract class AbstractMetrics { return createMetrics(dimensionMap); } - protected void populateBucketEntries(Map> map, String mkey, double[] boundaries, + protected void populateBucketEntries(Map map, String mkey, double[] boundaries, long[] bucketValues) { // bucket values should be one more that the boundaries to have the last element as OVERFLOW @@ -191,11 +191,8 @@ abstract class AbstractMetrics { value = (bucketValues == null) ? 0.0D : (double) bucketValues[i]; - if (!map.containsKey(bucketKey)) { - map.put(bucketKey, Lists.newArrayList(value)); - } else { - map.get(bucketKey).add(value); - } + Double val = map.getOrDefault(bucketKey, 0.0); + map.put(bucketKey, val + value); } } @@ -207,6 +204,11 @@ abstract class AbstractMetrics { } } + protected void populateAggregationMapWithSum(Map map, String mkey, double value) { + Double val = map.getOrDefault(mkey, 0.0); + map.put(mkey, val + value); + } + protected void populateMaxMap(Map map, String mkey, long value) { Long existingValue = map.get(mkey); if (existingValue == null || value > existingValue) { diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java index d3aeacc85ba..cf3c00a0a90 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java @@ -31,12 +31,14 @@ import io.netty.buffer.PooledByteBufAllocator; public class ManagedLedgerCacheMetrics extends AbstractMetrics { + private List metrics; public ManagedLedgerCacheMetrics(PulsarService pulsar) { super(pulsar); + this.metrics = Lists.newArrayList(); } @Override - public List generate() { + public synchronized List generate() { // get the ML cache stats bean @@ -87,7 +89,9 @@ public class ManagedLedgerCacheMetrics extends AbstractMetrics { m.put("brk_ml_cache_pool_active_allocations_normal", activeAllocationsNormal); m.put("brk_ml_cache_pool_active_allocations_huge", activeAllocationsHuge); - return Lists.newArrayList(m); + metrics.clear(); + metrics.add(m); + return metrics; } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java index 2af764dc24d..f8808e94304 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java @@ -29,87 +29,87 @@ import com.yahoo.pulsar.broker.stats.Metrics; public class ManagedLedgerMetrics extends AbstractMetrics { + private List metricsCollection; + private Map> ledgersByDimensionMap; + // temp map to prepare aggregation metrics + private Map tempAggregatedMetricsMap; + public ManagedLedgerMetrics(PulsarService pulsar) { super(pulsar); + this.metricsCollection = Lists.newArrayList(); + this.ledgersByDimensionMap = Maps.newHashMap(); + this.tempAggregatedMetricsMap = Maps.newHashMap(); } @Override - public List generate() { + public synchronized List generate() { // get the current snapshot of ledgers by NS dimension - - Map> ledgersByDimension = groupLedgersByDimension(); - - return aggregate(ledgersByDimension); + return aggregate(groupLedgersByDimension()); } /** - * Aggregation by namespace + * Aggregation by namespace (not thread-safe) * * @param ledgersByDimension * @return */ private List aggregate(Map> ledgersByDimension) { - List metricsCollection = Lists.newArrayList(); - + metricsCollection.clear(); + for (Entry> e : ledgersByDimension.entrySet()) { Metrics metrics = e.getKey(); List ledgers = e.getValue(); // prepare aggregation map - - Map> aggregatedMetricsMap = Maps.newHashMap(); + tempAggregatedMetricsMap.clear(); // generate the collections by each metrics and then apply the aggregation for (ManagedLedgerImpl ledger : ledgers) { ManagedLedgerMXBean lStats = ledger.getStats(); - populateAggregationMap(aggregatedMetricsMap, "brk_ml_AddEntryBytesRate", lStats.getAddEntryBytesRate()); - populateAggregationMap(aggregatedMetricsMap, "brk_ml_AddEntryErrors", + populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_AddEntryBytesRate", lStats.getAddEntryBytesRate()); + populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_AddEntryErrors", (double) lStats.getAddEntryErrors()); - populateAggregationMap(aggregatedMetricsMap, "brk_ml_AddEntryMessagesRate", + populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_AddEntryMessagesRate", lStats.getAddEntryMessagesRate()); - populateAggregationMap(aggregatedMetricsMap, "brk_ml_AddEntrySucceed", + populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_AddEntrySucceed", (double) lStats.getAddEntrySucceed()); - populateAggregationMap(aggregatedMetricsMap, "brk_ml_NumberOfMessagesInBacklog", + populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_NumberOfMessagesInBacklog", (double) lStats.getNumberOfMessagesInBacklog()); - populateAggregationMap(aggregatedMetricsMap, "brk_ml_ReadEntriesBytesRate", + populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_ReadEntriesBytesRate", lStats.getReadEntriesBytesRate()); - populateAggregationMap(aggregatedMetricsMap, "brk_ml_ReadEntriesErrors", + populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_ReadEntriesErrors", (double) lStats.getReadEntriesErrors()); - populateAggregationMap(aggregatedMetricsMap, "brk_ml_ReadEntriesRate", lStats.getReadEntriesRate()); - populateAggregationMap(aggregatedMetricsMap, "brk_ml_ReadEntriesSucceeded", + populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_ReadEntriesRate", lStats.getReadEntriesRate()); + populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_ReadEntriesSucceeded", (double) lStats.getReadEntriesSucceeded()); - populateAggregationMap(aggregatedMetricsMap, "brk_ml_StoredMessagesSize", + populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_StoredMessagesSize", (double) lStats.getStoredMessagesSize()); // handle bucket entries initialization here - populateBucketEntries(aggregatedMetricsMap, "brk_ml_AddEntryLatencyBuckets", ENTRY_LATENCY_BUCKETS_MS, - lStats.getAddEntryLatencyBuckets()); + populateBucketEntries(tempAggregatedMetricsMap, "brk_ml_AddEntryLatencyBuckets", + ENTRY_LATENCY_BUCKETS_MS, lStats.getAddEntryLatencyBuckets()); - populateBucketEntries(aggregatedMetricsMap, "brk_ml_LedgerSwitchLatencyBuckets", + populateBucketEntries(tempAggregatedMetricsMap, "brk_ml_LedgerSwitchLatencyBuckets", ENTRY_LATENCY_BUCKETS_MS, lStats.getLedgerSwitchLatencyBuckets()); - populateBucketEntries(aggregatedMetricsMap, "brk_ml_EntrySizeBuckets", ENTRY_SIZE_BUCKETS_BYTES, + populateBucketEntries(tempAggregatedMetricsMap, "brk_ml_EntrySizeBuckets", ENTRY_SIZE_BUCKETS_BYTES, lStats.getEntrySizeBuckets()); - populateAggregationMap(aggregatedMetricsMap, "brk_ml_MarkDeleteRate", lStats.getMarkDeleteRate()); + populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_MarkDeleteRate", lStats.getMarkDeleteRate()); } // SUM up collections of each metrics - for (Entry> ma : aggregatedMetricsMap.entrySet()) { + for (Entry ma : tempAggregatedMetricsMap.entrySet()) { - // sum - String metricsName = ma.getKey(); - Double metricsValue = sum(ma.getValue()); - - metrics.put(metricsName, metricsValue); + metrics.put(ma.getKey(), ma.getValue()); } metricsCollection.add(metrics); @@ -119,21 +119,19 @@ public class ManagedLedgerMetrics extends AbstractMetrics { } /** - * Build a map of dimensions key to list of destination stats + * Build a map of dimensions key to list of destination stats (not thread-safe) *

* * @return */ private Map> groupLedgersByDimension() { - Map> ledgersByDimensionMap = Maps.newHashMap(); - + ledgersByDimensionMap.clear(); + // get the current destinations statistics from StatsBrokerFilter // Map : destination-name->dest-stat - Map ledgersMap = getManagedLedgers(); - - for (Entry e : ledgersMap.entrySet()) { + for (Entry e : getManagedLedgers().entrySet()) { String ledgerName = e.getKey(); ManagedLedgerImpl ledger = e.getValue(); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java new file mode 100644 index 00000000000..ba7d39be3b7 --- /dev/null +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java @@ -0,0 +1,88 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.broker.stats; + +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.yahoo.pulsar.broker.service.BrokerTestBase; +import com.yahoo.pulsar.broker.stats.metrics.ManagedLedgerMetrics; +import com.yahoo.pulsar.client.api.Producer; + +import junit.framework.Assert; + +/** + */ +public class ManagedLedgerMetricsTest extends BrokerTestBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.baseSetup(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testManagedLedgerMetrics() throws Exception { + ManagedLedgerMetrics metrics = new ManagedLedgerMetrics(pulsar); + + final String addEntryRateKey = "brk_ml_AddEntryMessagesRate"; + List list1 = metrics.generate(); + Assert.assertTrue(list1.isEmpty()); + + Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1"); + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + for (Entry ledger : ((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory()) + .getManagedLedgers().entrySet()) { + ManagedLedgerMBeanImpl stats = (ManagedLedgerMBeanImpl) ledger.getValue().getStats(); + stats.refreshStats(1, TimeUnit.SECONDS); + } + + List list2 = metrics.generate(); + Assert.assertEquals(list2.get(0).getMetrics().get(addEntryRateKey), 10.0D); + + for (int i = 0; i < 5; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + for (Entry ledger : ((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory()) + .getManagedLedgers().entrySet()) { + ManagedLedgerMBeanImpl stats = (ManagedLedgerMBeanImpl) ledger.getValue().getStats(); + stats.refreshStats(1, TimeUnit.SECONDS); + } + List list3 = metrics.generate(); + Assert.assertEquals(list3.get(0).getMetrics().get(addEntryRateKey), 5.0D); + + } + +} -- GitLab