From 12915b807f54c7726fddae5cd0c3e177950850c8 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Mon, 6 Dec 2021 17:47:28 +0800 Subject: [PATCH] Add clean item logic for topic queue mapping --- .../rocketmq/broker/BrokerController.java | 25 ++++++ .../topic/TopicQueueMappingManager.java | 90 +++++++++++++++++++ .../rocketmq/common/admin/TopicOffset.java | 9 ++ .../GetTopicStatsInfoRequestHeader.java | 3 +- .../rocketmq/common/rpc/RpcClientImpl.java | 24 ++++- .../rocketmq/common/rpc/RpcResponse.java | 2 +- 6 files changed, 150 insertions(+), 3 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 9230d955..6ca46dca 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -163,6 +163,9 @@ public class BrokerController { private final BrokerOuterAPI brokerOuterAPI; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( "BrokerControllerScheduledThread")); + //the topic queue mapping is costly, so use an independent executor + private final ScheduledExecutorService scheduledForTopicQueueMapping = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( + "BrokerControllerScheduledThread-TopicQueueMapping")); private final SlaveSynchronize slaveSynchronize; private final BlockingQueue sendThreadPoolQueue; private final BlockingQueue ackThreadPoolQueue; @@ -498,6 +501,22 @@ public class BrokerController { } }, 1, 5, TimeUnit.SECONDS); + this.scheduledForTopicQueueMapping.scheduleAtFixedRate( () -> { + try { + this.topicQueueMappingManager.cleanItemListMoreThanSecondGen(); + } catch (Throwable t) { + log.error("ScheduledTask cleanItemListMoreThanSecondGen failed", t); + } + }, 1, 5, TimeUnit.MINUTES); + + this.scheduledForTopicQueueMapping.scheduleAtFixedRate( () -> { + try { + this.topicQueueMappingManager.cleanItemExpired(); + } catch (Throwable t) { + log.error("ScheduledTask cleanItemExpired failed", t); + } + }, 1, 5, TimeUnit.MINUTES); + if (!messageStoreConfig.isEnableDLegerCommitLog()) { if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) { @@ -892,6 +911,12 @@ public class BrokerController { } catch (InterruptedException e) { } + this.scheduledForTopicQueueMapping.shutdown(); + try { + this.scheduledForTopicQueueMapping.awaitTermination(5000, TimeUnit.MILLISECONDS); + } catch (Throwable ignored) { + } + this.unregisterBrokerAll(); if (this.sendMessageExecutor != null) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index c484bcf3..9be442ea 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -21,6 +21,14 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.DataVersion; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.admin.TopicOffset; +import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader; +import org.apache.rocketmq.common.rpc.RpcRequest; +import org.apache.rocketmq.common.rpc.RpcResponse; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; import org.apache.rocketmq.common.constant.LoggerName; @@ -32,10 +40,14 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.store.DefaultMessageStore; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -272,4 +284,82 @@ public class TopicQueueMappingManager extends ConfigManager { } } + + public void cleanItemExpired() { + String when = this.brokerController.getMessageStoreConfig().getDeleteWhen(); + if (!UtilAll.isItTimeToDo(when)) { + return; + } + boolean changed = false; + long start = System.currentTimeMillis(); + try { + for(String topic : topicQueueMappingTable.keySet()) { + TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic); + if (mappingDetail == null + || mappingDetail.getHostedQueues().isEmpty()) { + continue; + } + if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) { + log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail); + continue; + } + Set brokers = new HashSet<>(); + for (List items: mappingDetail.getHostedQueues().values()) { + if (items.size() < 2) { + continue; + } + LogicQueueMappingItem earlistItem = items.get(0); + brokers.add(earlistItem.getBname()); + } + Map statsTable = new HashMap<>(); + for (String broker: brokers) { + GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader(); + header.setTopic(topic); + header.setBname(broker); + try { + RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null); + RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); + if (rpcResponse.getException() != null) { + throw rpcResponse.getException(); + } + statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody()); + } catch (Throwable rt) { + log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt); + } + } + for (List items: mappingDetail.getHostedQueues().values()) { + if (items.size() < 2) { + continue; + } + LogicQueueMappingItem earlistItem = items.get(0); + TopicStatsTable topicStats = statsTable.get(earlistItem.getBname()); + if (topicStats == null) { + continue; + } + TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId())); + if (topicOffset == null) { + //this may should not happen + log.warn("Get null topicOffset for {}", earlistItem); + continue; + } + if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()) { + boolean result = items.remove(earlistItem); + changed = changed || result; + log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset); + } + } + UtilAll.sleep(10); + } + } catch (Throwable t) { + log.error("Try cleanItemExpired failed", t); + } finally { + if (changed) { + this.dataVersion.nextVersion(); + this.persist(); + log.info("CleanItemExpired changed"); + } + log.info("cleanItemExpired cost {} ms", System.currentTimeMillis() - start); + } + } + } diff --git a/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java b/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java index 7e667491..8b52a88a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java +++ b/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java @@ -44,4 +44,13 @@ public class TopicOffset { public void setLastUpdateTimestamp(long lastUpdateTimestamp) { this.lastUpdateTimestamp = lastUpdateTimestamp; } + + @Override + public String toString() { + return "TopicOffset{" + + "minOffset=" + minOffset + + ", maxOffset=" + maxOffset + + ", lastUpdateTimestamp=" + lastUpdateTimestamp + + '}'; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java index c4cf4dec..8e921b29 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java @@ -17,11 +17,12 @@ package org.apache.rocketmq.common.protocol.header; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class GetTopicStatsInfoRequestHeader implements CommandCustomHeader { +public class GetTopicStatsInfoRequestHeader extends RpcRequestHeader { @CFNotNull private String topic; diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java index 47ffcc2b..83f31e74 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java @@ -1,8 +1,8 @@ package org.apache.rocketmq.common.rpc; -import com.alibaba.fastjson.JSON; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; +import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; @@ -83,6 +83,9 @@ public class RpcClientImpl implements RpcClient { case RequestCode.QUERY_CONSUMER_OFFSET: rpcResponsePromise = handleQueryConsumerOffset(addr, request, timeoutMs); break; + case RequestCode.GET_TOPIC_STATS_INFO: + rpcResponsePromise = handleGetTopicStats(addr, request, timeoutMs); + break; default: throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode()); } @@ -209,6 +212,25 @@ public class RpcClientImpl implements RpcClient { return rpcResponsePromise; } + public Promise handleGetTopicStats(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception { + final Promise rpcResponsePromise = createResponseFuture(); + + RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest); + RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); + assert responseCommand != null; + switch (responseCommand.getCode()) { + case ResponseCode.SUCCESS: { + TopicStatsTable topicStatsTable = TopicStatsTable.decode(responseCommand.getBody(), TopicStatsTable.class); + rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, topicStatsTable)); + break; + } + default:{ + rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"))); + } + } + return rpcResponsePromise; + } + public Promise handleGetMinOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception { final Promise rpcResponsePromise = createResponseFuture(); diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java index 5fcde36a..2f61329c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java @@ -28,7 +28,7 @@ public class RpcResponse { } - public RpcResponse(int code, CommandCustomHeader header, byte[] body) { + public RpcResponse(int code, CommandCustomHeader header, Object body) { this.code = code; this.header = header; this.body = body; -- GitLab