diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index fb7aeceda9b1682e1562230d398309470db17064..eb811832058e8343df8aafcbc0e78a96a3106618 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -299,7 +299,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic()); this.brokerController.getMessageStore() .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); - + if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) { + this.brokerController.getBrokerStatsManager().onTopicDeleted(requestHeader.getTopic()); + } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; @@ -715,6 +717,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements this.brokerController.getSubscriptionGroupManager().deleteSubscriptionGroupConfig(requestHeader.getGroupName()); + if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) { + this.brokerController.getBrokerStatsManager().onGroupDeleted(requestHeader.getGroupName()); + } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index a7568f0a2079a9f19ee666b362fd21f5a6db3c3f..bfe8a2104db0b5a1781f066791f5b61f9fa19d12 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -184,6 +184,8 @@ public class BrokerConfig { private boolean storeReplyMessageEnable = true; + private boolean autoDeleteUnusedStats = false; + public static String localHostName() { try { return InetAddress.getLocalHost().getHostName(); @@ -793,4 +795,12 @@ public class BrokerConfig { public void setStoreReplyMessageEnable(boolean storeReplyMessageEnable) { this.storeReplyMessageEnable = storeReplyMessageEnable; } + + public boolean isAutoDeleteUnusedStats() { + return autoDeleteUnusedStats; + } + + public void setAutoDeleteUnusedStats(boolean autoDeleteUnusedStats) { + this.autoDeleteUnusedStats = autoDeleteUnusedStats; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java index 7ff26cd3b8317e370379b9e77636ff0a3e32aa49..4d2ce0cfcdc41c2c167ebcfba6d161adc66c1ff0 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java @@ -74,6 +74,26 @@ public class MomentStatsItemSet { statsItem.getValue().set(value); } + public void delValueByInfixKey(final String statsKey, String separator) { + Iterator> it = this.statsItemTable.entrySet().iterator(); + while (it.hasNext()) { + Entry next = it.next(); + if (next.getKey().contains(separator + statsKey + separator)) { + it.remove(); + } + } + } + + public void delValueBySuffixKey(final String statsKey, String separator) { + Iterator> it = this.statsItemTable.entrySet().iterator(); + while (it.hasNext()) { + Entry next = it.next(); + if (next.getKey().endsWith(separator + statsKey)) { + it.remove(); + } + } + } + public MomentStatsItem getAndCreateStatsItem(final String statsKey) { MomentStatsItem statsItem = this.statsItemTable.get(statsKey); if (null == statsItem) { diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java index 1c2de330bccb303a1eb7447d208bb9f5db2f8c7c..bcf9665959e0830ccc07ba4efe462f6765946455 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java @@ -158,6 +158,43 @@ public class StatsItemSet { statsItem.getTimes().addAndGet(incTimes); } + public void delValue(final String statsKey) { + StatsItem statsItem = this.statsItemTable.get(statsKey); + if (null != statsItem) { + this.statsItemTable.remove(statsKey); + } + } + + public void delValueByPrefixKey(final String statsKey, String separator) { + Iterator> it = this.statsItemTable.entrySet().iterator(); + while (it.hasNext()) { + Entry next = it.next(); + if (next.getKey().startsWith(statsKey + separator)) { + it.remove(); + } + } + } + + public void delValueByInfixKey(final String statsKey, String separator) { + Iterator> it = this.statsItemTable.entrySet().iterator(); + while (it.hasNext()) { + Entry next = it.next(); + if (next.getKey().contains(separator + statsKey + separator)) { + it.remove(); + } + } + } + + public void delValueBySuffixKey(final String statsKey, String separator) { + Iterator> it = this.statsItemTable.entrySet().iterator(); + while (it.hasNext()) { + Entry next = it.next(); + if (next.getKey().endsWith(separator + statsKey)) { + it.remove(); + } + } + } + public StatsItem getAndCreateStatsItem(final String statsKey) { StatsItem statsItem = this.statsItemTable.get(statsKey); if (null == statsItem) { diff --git a/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java index aa8bcfa7252657954fd31119baee23ab713ed531..07e132ff8155f6c47154b6a8e511a7e5ed8f98f3 100644 --- a/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java @@ -37,11 +37,13 @@ public class BrokerConfigTest { brokerConfig.setBrokerId(0); brokerConfig.setBrokerClusterName("DefaultCluster"); brokerConfig.setMsgTraceTopicName("RMQ_SYS_TRACE_TOPIC4"); + brokerConfig.setAutoDeleteUnusedStats(true); assertThat(brokerConfig.getBrokerClusterName()).isEqualTo("DefaultCluster"); assertThat(brokerConfig.getNamesrvAddr()).isEqualTo("127.0.0.1:9876"); assertThat(brokerConfig.getMsgTraceTopicName()).isEqualTo("RMQ_SYS_TRACE_TOPIC4"); assertThat(brokerConfig.getBrokerId()).isEqualTo(0); assertThat(brokerConfig.getBrokerName()).isEqualTo("broker-a"); assertThat(brokerConfig.isAutoCreateTopicEnable()).isEqualTo(false); + assertThat(brokerConfig.isAutoDeleteUnusedStats()).isEqualTo(true); } } \ No newline at end of file diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index f59b5eb98ba59c4f026155265a53e001a08f6b14..14d24d66f3735bb9fe839bf6635256858849cc33 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -1029,6 +1029,10 @@ public class DefaultMessageStore implements MessageStore { } it.remove(); + if (this.brokerConfig.isAutoDeleteUnusedStats()) { + this.brokerStatsManager.onTopicDeleted(topic); + } + log.info("cleanUnusedTopic: {},topic destroyed", topic); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index 4adbed76abc7513afd3e4428872d5d3d490f23a9..e151844706601bad8a19a1e8784bc95ffa0a314e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -121,6 +121,26 @@ public class BrokerStatsManager { return null; } + public void onTopicDeleted(final String topic) { + this.statsTable.get(TOPIC_PUT_NUMS).delValue(topic); + this.statsTable.get(TOPIC_PUT_SIZE).delValue(topic); + this.statsTable.get(GROUP_GET_NUMS).delValueByPrefixKey(topic, "@"); + this.statsTable.get(GROUP_GET_SIZE).delValueByPrefixKey(topic, "@"); + this.statsTable.get(SNDBCK_PUT_NUMS).delValueByPrefixKey(topic, "@"); + this.statsTable.get(GROUP_GET_LATENCY).delValueByInfixKey(topic, "@"); + this.momentStatsItemSetFallSize.delValueByInfixKey(topic, "@"); + this.momentStatsItemSetFallTime.delValueByInfixKey(topic, "@"); + } + + public void onGroupDeleted(final String group) { + this.statsTable.get(GROUP_GET_NUMS).delValueBySuffixKey(group, "@"); + this.statsTable.get(GROUP_GET_SIZE).delValueBySuffixKey(group, "@"); + this.statsTable.get(SNDBCK_PUT_NUMS).delValueBySuffixKey(group, "@"); + this.statsTable.get(GROUP_GET_LATENCY).delValueBySuffixKey(group, "@"); + this.momentStatsItemSetFallSize.delValueBySuffixKey(group, "@"); + this.momentStatsItemSetFallTime.delValueBySuffixKey(group, "@"); + } + public void incTopicPutNums(final String topic) { this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1); } diff --git a/store/src/test/java/stats/BrokerStatsManagerTest.java b/store/src/test/java/stats/BrokerStatsManagerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..17020729a63f89b80656f8d92f7bcdaf7cc12fc9 --- /dev/null +++ b/store/src/test/java/stats/BrokerStatsManagerTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package stats; + +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.rocketmq.store.stats.BrokerStatsManager.BROKER_PUT_NUMS; +import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_FALL_SIZE; +import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_FALL_TIME; +import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_LATENCY; +import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_NUMS; +import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_SIZE; +import static org.apache.rocketmq.store.stats.BrokerStatsManager.SNDBCK_PUT_NUMS; +import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_NUMS; +import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_SIZE; +import static org.assertj.core.api.Assertions.assertThat; + +public class BrokerStatsManagerTest { + private BrokerStatsManager brokerStatsManager; + + private String TOPIC = "TOPIC_TEST"; + private String GROUP_NAME = "GROUP_TEST"; + + @Before + public void init() { + brokerStatsManager = new BrokerStatsManager("DefaultCluster"); + brokerStatsManager.start(); + } + + @After + public void destory() { + brokerStatsManager.shutdown(); + } + + @Test + public void testGetStatsItem() { + assertThat(brokerStatsManager.getStatsItem("TEST", "TEST")).isNull(); + } + + @Test + public void testIncTopicPutNums() { + brokerStatsManager.incTopicPutNums(TOPIC); + assertThat(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC).getTimes().doubleValue()).isEqualTo(1L); + brokerStatsManager.incTopicPutNums(TOPIC, 2, 2); + assertThat(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC).getValue().doubleValue()).isEqualTo(3L); + } + + @Test + public void testIncTopicPutSize() { + brokerStatsManager.incTopicPutSize(TOPIC, 2); + assertThat(brokerStatsManager.getStatsItem(TOPIC_PUT_SIZE, TOPIC).getValue().doubleValue()).isEqualTo(2L); + } + + @Test + public void testIncGroupGetNums() { + brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1); + String statsKey = brokerStatsManager.buildStatsKey(TOPIC, GROUP_NAME); + assertThat(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, statsKey).getValue().doubleValue()).isEqualTo(1L); + } + + @Test + public void testIncGroupGetSize() { + brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 1); + String statsKey = brokerStatsManager.buildStatsKey(TOPIC, GROUP_NAME); + assertThat(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, statsKey).getValue().doubleValue()).isEqualTo(1L); + } + + @Test + public void testIncGroupGetLatency() { + brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1); + String statsKey = String.format("%d@%s@%s", 1, TOPIC, GROUP_NAME); + assertThat(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, statsKey).getValue().doubleValue()).isEqualTo(1L); + } + + @Test + public void testIncBrokerPutNums() { + brokerStatsManager.incBrokerPutNums(); + assertThat(brokerStatsManager.getStatsItem(BROKER_PUT_NUMS, "DefaultCluster").getValue().doubleValue()).isEqualTo(1L); + } + + @Test + public void testOnTopicDeleted() { + brokerStatsManager.incTopicPutNums(TOPIC); + brokerStatsManager.incTopicPutSize(TOPIC, 100); + brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1); + brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100); + brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC); + brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1); + brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L); + brokerStatsManager.recordDiskFallBehindSize(GROUP_NAME, TOPIC, 1, 11L); + + brokerStatsManager.onTopicDeleted(TOPIC); + + Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC)); + Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_SIZE, TOPIC)); + Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, TOPIC + "@" + GROUP_NAME)); + Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, TOPIC + "@" + GROUP_NAME)); + Assert.assertNull(brokerStatsManager.getStatsItem(SNDBCK_PUT_NUMS, TOPIC + "@" + GROUP_NAME)); + Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME)); + Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME)); + Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME, "1@" + TOPIC + "@" + GROUP_NAME)); + } + + @Test + public void testOnGroupDeleted(){ + brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1); + brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100); + brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC); + brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1); + brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L); + brokerStatsManager.recordDiskFallBehindSize(GROUP_NAME, TOPIC, 1, 11L); + + brokerStatsManager.onGroupDeleted(GROUP_NAME); + + Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, TOPIC + "@" + GROUP_NAME)); + Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, TOPIC + "@" + GROUP_NAME)); + Assert.assertNull(brokerStatsManager.getStatsItem(SNDBCK_PUT_NUMS, TOPIC + "@" + GROUP_NAME)); + Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME)); + Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME)); + Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME, "1@" + TOPIC + "@" + GROUP_NAME)); + } +}