提交 527382e4 编写于 作者: D dongeforever

Clean the items more than second gen

上级 d677d85c
......@@ -353,13 +353,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
}
this.brokerController.getTopicConfigManager().deleteTopicConfig(topic);
this.brokerController.getTopicQueueMappingManager().delete(topic);
this.brokerController.getMessageStore()
.cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
this.brokerController.getBrokerStatsManager().onTopicDeleted(requestHeader.getTopic());
}
//TODO delete the topic route
//this.brokerController.getTopicQueueMappingManager()
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
......
......@@ -33,7 +33,9 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
......@@ -61,6 +63,7 @@ public class TopicQueueMappingManager extends ConfigManager {
public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) throws Exception {
boolean locked = false;
boolean updated = false;
TopicQueueMappingDetail oldDetail = null;
try {
if (lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
locked = true;
......@@ -74,7 +77,7 @@ public class TopicQueueMappingManager extends ConfigManager {
TopicQueueMappingUtils.checkLogicQueueMappingItemOffset(items);
});
TopicQueueMappingDetail oldDetail = topicQueueMappingTable.get(newDetail.getTopic());
oldDetail = topicQueueMappingTable.get(newDetail.getTopic());
if (oldDetail == null) {
topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
updated = true;
......@@ -115,11 +118,23 @@ public class TopicQueueMappingManager extends ConfigManager {
}
if (updated) {
this.persist();
log.info("Update topic queue mapping from [{}] to [{}], force {}", oldDetail, newDetail, force);
}
}
}
public void delete(final String topic) {
TopicQueueMappingDetail old = this.topicQueueMappingTable.remove(topic);
if (old != null) {
log.info("delete topic queue mapping OK, topic queue mapping: {}", old);
this.dataVersion.nextVersion();
this.persist();
} else {
log.warn("delete topic queue mapping failed, topic: {} not exists", topic);
}
}
public TopicQueueMappingDetail getTopicQueueMapping(String topic) {
return topicQueueMappingTable.get(topic);
}
......@@ -177,6 +192,9 @@ public class TopicQueueMappingManager extends ConfigManager {
//it is not static topic
return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null);
}
assert mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName());
//If not find mappingItem, it encounters some errors
Integer globalId = requestHeader.getQueueId();
if (globalId < 0 && !selectOneWhenMiss) {
......@@ -224,4 +242,34 @@ public class TopicQueueMappingManager extends ConfigManager {
}
public void cleanItemListMoreThanSecondGen() {
for(String topic : topicQueueMappingTable.keySet()) {
TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
if (mappingDetail == null
|| mappingDetail.getHostedQueues().isEmpty()) {
continue;
}
if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
continue;
}
Iterator<Map.Entry<Integer, List<LogicQueueMappingItem>>> it = mappingDetail.getHostedQueues().entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Integer, List<LogicQueueMappingItem>> entry = it.next();
Integer queueId = entry.getKey();
List<LogicQueueMappingItem> items = entry.getValue();
if (items.size() <= 2) {
continue;
}
LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
LogicQueueMappingItem secLeaderItem = items.get(items.size() - 2);
if (!leaderItem.getBname().equals(mappingDetail.getBname())
&& !secLeaderItem.getBname().equals(mappingDetail.getBname())) {
it.remove();
log.info("The topic queue {} {} is expired with items {}", mappingDetail.getTopic(), queueId, items);
}
}
}
}
}
......@@ -128,4 +128,17 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
.append(hostedQueues)
.toHashCode();
}
@Override
public String toString() {
return "TopicQueueMappingDetail{" +
"hostedQueues=" + hostedQueues +
", topic='" + topic + '\'' +
", totalQueues=" + totalQueues +
", bname='" + bname + '\'' +
", epoch=" + epoch +
", dirty=" + dirty +
", currIdMap=" + currIdMap +
'}';
}
}
......@@ -124,4 +124,16 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
.append(currIdMap)
.toHashCode();
}
@Override
public String toString() {
return "TopicQueueMappingInfo{" +
"topic='" + topic + '\'' +
", totalQueues=" + totalQueues +
", bname='" + bname + '\'' +
", epoch=" + epoch +
", dirty=" + dirty +
", currIdMap=" + currIdMap +
'}';
}
}
......@@ -169,6 +169,7 @@ public class RouteInfoManager {
topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<String, TopicQueueMappingInfo>());
}
//Note asset brokerName equal entry.getValue().getBname()
//here use the mappingDetail.bname
topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册