未验证 提交 5873c76d 编写于 作者: H Hao Zhang 提交者: GitHub

[ISSUE #1689]add interfaces to remove unused statsItems in BrokerStatsManager. (#2029)

上级 4a9f7a88
...@@ -299,7 +299,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -299,7 +299,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic()); this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic());
this.brokerController.getMessageStore() this.brokerController.getMessageStore()
.cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
this.brokerController.getBrokerStatsManager().onTopicDeleted(requestHeader.getTopic());
}
response.setCode(ResponseCode.SUCCESS); response.setCode(ResponseCode.SUCCESS);
response.setRemark(null); response.setRemark(null);
return response; return response;
...@@ -715,6 +717,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -715,6 +717,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
this.brokerController.getSubscriptionGroupManager().deleteSubscriptionGroupConfig(requestHeader.getGroupName()); this.brokerController.getSubscriptionGroupManager().deleteSubscriptionGroupConfig(requestHeader.getGroupName());
if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
this.brokerController.getBrokerStatsManager().onGroupDeleted(requestHeader.getGroupName());
}
response.setCode(ResponseCode.SUCCESS); response.setCode(ResponseCode.SUCCESS);
response.setRemark(null); response.setRemark(null);
return response; return response;
......
...@@ -184,6 +184,8 @@ public class BrokerConfig { ...@@ -184,6 +184,8 @@ public class BrokerConfig {
private boolean storeReplyMessageEnable = true; private boolean storeReplyMessageEnable = true;
private boolean autoDeleteUnusedStats = false;
public static String localHostName() { public static String localHostName() {
try { try {
return InetAddress.getLocalHost().getHostName(); return InetAddress.getLocalHost().getHostName();
...@@ -793,4 +795,12 @@ public class BrokerConfig { ...@@ -793,4 +795,12 @@ public class BrokerConfig {
public void setStoreReplyMessageEnable(boolean storeReplyMessageEnable) { public void setStoreReplyMessageEnable(boolean storeReplyMessageEnable) {
this.storeReplyMessageEnable = storeReplyMessageEnable; this.storeReplyMessageEnable = storeReplyMessageEnable;
} }
public boolean isAutoDeleteUnusedStats() {
return autoDeleteUnusedStats;
}
public void setAutoDeleteUnusedStats(boolean autoDeleteUnusedStats) {
this.autoDeleteUnusedStats = autoDeleteUnusedStats;
}
} }
...@@ -74,6 +74,26 @@ public class MomentStatsItemSet { ...@@ -74,6 +74,26 @@ public class MomentStatsItemSet {
statsItem.getValue().set(value); statsItem.getValue().set(value);
} }
public void delValueByInfixKey(final String statsKey, String separator) {
Iterator<Entry<String, MomentStatsItem>> it = this.statsItemTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MomentStatsItem> next = it.next();
if (next.getKey().contains(separator + statsKey + separator)) {
it.remove();
}
}
}
public void delValueBySuffixKey(final String statsKey, String separator) {
Iterator<Entry<String, MomentStatsItem>> it = this.statsItemTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MomentStatsItem> next = it.next();
if (next.getKey().endsWith(separator + statsKey)) {
it.remove();
}
}
}
public MomentStatsItem getAndCreateStatsItem(final String statsKey) { public MomentStatsItem getAndCreateStatsItem(final String statsKey) {
MomentStatsItem statsItem = this.statsItemTable.get(statsKey); MomentStatsItem statsItem = this.statsItemTable.get(statsKey);
if (null == statsItem) { if (null == statsItem) {
......
...@@ -158,6 +158,43 @@ public class StatsItemSet { ...@@ -158,6 +158,43 @@ public class StatsItemSet {
statsItem.getTimes().addAndGet(incTimes); statsItem.getTimes().addAndGet(incTimes);
} }
public void delValue(final String statsKey) {
StatsItem statsItem = this.statsItemTable.get(statsKey);
if (null != statsItem) {
this.statsItemTable.remove(statsKey);
}
}
public void delValueByPrefixKey(final String statsKey, String separator) {
Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, StatsItem> next = it.next();
if (next.getKey().startsWith(statsKey + separator)) {
it.remove();
}
}
}
public void delValueByInfixKey(final String statsKey, String separator) {
Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, StatsItem> next = it.next();
if (next.getKey().contains(separator + statsKey + separator)) {
it.remove();
}
}
}
public void delValueBySuffixKey(final String statsKey, String separator) {
Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, StatsItem> next = it.next();
if (next.getKey().endsWith(separator + statsKey)) {
it.remove();
}
}
}
public StatsItem getAndCreateStatsItem(final String statsKey) { public StatsItem getAndCreateStatsItem(final String statsKey) {
StatsItem statsItem = this.statsItemTable.get(statsKey); StatsItem statsItem = this.statsItemTable.get(statsKey);
if (null == statsItem) { if (null == statsItem) {
......
...@@ -37,11 +37,13 @@ public class BrokerConfigTest { ...@@ -37,11 +37,13 @@ public class BrokerConfigTest {
brokerConfig.setBrokerId(0); brokerConfig.setBrokerId(0);
brokerConfig.setBrokerClusterName("DefaultCluster"); brokerConfig.setBrokerClusterName("DefaultCluster");
brokerConfig.setMsgTraceTopicName("RMQ_SYS_TRACE_TOPIC4"); brokerConfig.setMsgTraceTopicName("RMQ_SYS_TRACE_TOPIC4");
brokerConfig.setAutoDeleteUnusedStats(true);
assertThat(brokerConfig.getBrokerClusterName()).isEqualTo("DefaultCluster"); assertThat(brokerConfig.getBrokerClusterName()).isEqualTo("DefaultCluster");
assertThat(brokerConfig.getNamesrvAddr()).isEqualTo("127.0.0.1:9876"); assertThat(brokerConfig.getNamesrvAddr()).isEqualTo("127.0.0.1:9876");
assertThat(brokerConfig.getMsgTraceTopicName()).isEqualTo("RMQ_SYS_TRACE_TOPIC4"); assertThat(brokerConfig.getMsgTraceTopicName()).isEqualTo("RMQ_SYS_TRACE_TOPIC4");
assertThat(brokerConfig.getBrokerId()).isEqualTo(0); assertThat(brokerConfig.getBrokerId()).isEqualTo(0);
assertThat(brokerConfig.getBrokerName()).isEqualTo("broker-a"); assertThat(brokerConfig.getBrokerName()).isEqualTo("broker-a");
assertThat(brokerConfig.isAutoCreateTopicEnable()).isEqualTo(false); assertThat(brokerConfig.isAutoCreateTopicEnable()).isEqualTo(false);
assertThat(brokerConfig.isAutoDeleteUnusedStats()).isEqualTo(true);
} }
} }
\ No newline at end of file
...@@ -1029,6 +1029,10 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1029,6 +1029,10 @@ public class DefaultMessageStore implements MessageStore {
} }
it.remove(); it.remove();
if (this.brokerConfig.isAutoDeleteUnusedStats()) {
this.brokerStatsManager.onTopicDeleted(topic);
}
log.info("cleanUnusedTopic: {},topic destroyed", topic); log.info("cleanUnusedTopic: {},topic destroyed", topic);
} }
} }
......
...@@ -121,6 +121,26 @@ public class BrokerStatsManager { ...@@ -121,6 +121,26 @@ public class BrokerStatsManager {
return null; return null;
} }
public void onTopicDeleted(final String topic) {
this.statsTable.get(TOPIC_PUT_NUMS).delValue(topic);
this.statsTable.get(TOPIC_PUT_SIZE).delValue(topic);
this.statsTable.get(GROUP_GET_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(GROUP_GET_SIZE).delValueByPrefixKey(topic, "@");
this.statsTable.get(SNDBCK_PUT_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(GROUP_GET_LATENCY).delValueByInfixKey(topic, "@");
this.momentStatsItemSetFallSize.delValueByInfixKey(topic, "@");
this.momentStatsItemSetFallTime.delValueByInfixKey(topic, "@");
}
public void onGroupDeleted(final String group) {
this.statsTable.get(GROUP_GET_NUMS).delValueBySuffixKey(group, "@");
this.statsTable.get(GROUP_GET_SIZE).delValueBySuffixKey(group, "@");
this.statsTable.get(SNDBCK_PUT_NUMS).delValueBySuffixKey(group, "@");
this.statsTable.get(GROUP_GET_LATENCY).delValueBySuffixKey(group, "@");
this.momentStatsItemSetFallSize.delValueBySuffixKey(group, "@");
this.momentStatsItemSetFallTime.delValueBySuffixKey(group, "@");
}
public void incTopicPutNums(final String topic) { public void incTopicPutNums(final String topic) {
this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1); this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1);
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package stats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.BROKER_PUT_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_FALL_SIZE;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_FALL_TIME;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_LATENCY;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_SIZE;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.SNDBCK_PUT_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_SIZE;
import static org.assertj.core.api.Assertions.assertThat;
public class BrokerStatsManagerTest {
private BrokerStatsManager brokerStatsManager;
private String TOPIC = "TOPIC_TEST";
private String GROUP_NAME = "GROUP_TEST";
@Before
public void init() {
brokerStatsManager = new BrokerStatsManager("DefaultCluster");
brokerStatsManager.start();
}
@After
public void destory() {
brokerStatsManager.shutdown();
}
@Test
public void testGetStatsItem() {
assertThat(brokerStatsManager.getStatsItem("TEST", "TEST")).isNull();
}
@Test
public void testIncTopicPutNums() {
brokerStatsManager.incTopicPutNums(TOPIC);
assertThat(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC).getTimes().doubleValue()).isEqualTo(1L);
brokerStatsManager.incTopicPutNums(TOPIC, 2, 2);
assertThat(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC).getValue().doubleValue()).isEqualTo(3L);
}
@Test
public void testIncTopicPutSize() {
brokerStatsManager.incTopicPutSize(TOPIC, 2);
assertThat(brokerStatsManager.getStatsItem(TOPIC_PUT_SIZE, TOPIC).getValue().doubleValue()).isEqualTo(2L);
}
@Test
public void testIncGroupGetNums() {
brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
String statsKey = brokerStatsManager.buildStatsKey(TOPIC, GROUP_NAME);
assertThat(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, statsKey).getValue().doubleValue()).isEqualTo(1L);
}
@Test
public void testIncGroupGetSize() {
brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 1);
String statsKey = brokerStatsManager.buildStatsKey(TOPIC, GROUP_NAME);
assertThat(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, statsKey).getValue().doubleValue()).isEqualTo(1L);
}
@Test
public void testIncGroupGetLatency() {
brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
String statsKey = String.format("%d@%s@%s", 1, TOPIC, GROUP_NAME);
assertThat(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, statsKey).getValue().doubleValue()).isEqualTo(1L);
}
@Test
public void testIncBrokerPutNums() {
brokerStatsManager.incBrokerPutNums();
assertThat(brokerStatsManager.getStatsItem(BROKER_PUT_NUMS, "DefaultCluster").getValue().doubleValue()).isEqualTo(1L);
}
@Test
public void testOnTopicDeleted() {
brokerStatsManager.incTopicPutNums(TOPIC);
brokerStatsManager.incTopicPutSize(TOPIC, 100);
brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100);
brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC);
brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L);
brokerStatsManager.recordDiskFallBehindSize(GROUP_NAME, TOPIC, 1, 11L);
brokerStatsManager.onTopicDeleted(TOPIC);
Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC));
Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_SIZE, TOPIC));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(SNDBCK_PUT_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME, "1@" + TOPIC + "@" + GROUP_NAME));
}
@Test
public void testOnGroupDeleted(){
brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100);
brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC);
brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L);
brokerStatsManager.recordDiskFallBehindSize(GROUP_NAME, TOPIC, 1, 11L);
brokerStatsManager.onGroupDeleted(GROUP_NAME);
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(SNDBCK_PUT_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME, "1@" + TOPIC + "@" + GROUP_NAME));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册