diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index 1c11fde163d22df8b3643ae960ed6c3c7ff35944..c442040cef1e5d28866243f74f5bef2ae433b4e7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -26,10 +26,12 @@ import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader; import org.apache.rocketmq.common.rpc.RpcRequest; import org.apache.rocketmq.common.rpc.RpcResponse; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; +import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.ResponseCode; @@ -258,45 +260,13 @@ public class TopicQueueMappingManager extends ConfigManager { public void cleanItemListMoreThanSecondGen() { - for(String topic : topicQueueMappingTable.keySet()) { - TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic); - if (mappingDetail == null - || mappingDetail.getHostedQueues().isEmpty()) { - continue; - } - if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) { - log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail); - continue; - } - Iterator>> it = mappingDetail.getHostedQueues().entrySet().iterator(); - while (it.hasNext()) { - Map.Entry> entry = it.next(); - Integer queueId = entry.getKey(); - List items = entry.getValue(); - if (items.size() <= 2) { - continue; - } - LogicQueueMappingItem leaderItem = items.get(items.size() - 1); - LogicQueueMappingItem secLeaderItem = items.get(items.size() - 2); - if (!leaderItem.getBname().equals(mappingDetail.getBname()) - && !secLeaderItem.getBname().equals(mappingDetail.getBname())) { - it.remove(); - log.info("The topic queue {} {} is expired with items {}", mappingDetail.getTopic(), queueId, items); - } - } - } - } - - - public void cleanItemExpired() { String when = this.brokerController.getMessageStoreConfig().getDeleteWhen(); if (!UtilAll.isItTimeToDo(when)) { return; } - boolean changed = false; - long start = System.currentTimeMillis(); - try { - for(String topic : topicQueueMappingTable.keySet()) { + + for(String topic : topicQueueMappingTable.keySet()) { + try { TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic); if (mappingDetail == null || mappingDetail.getHostedQueues().isEmpty()) { @@ -307,51 +277,132 @@ public class TopicQueueMappingManager extends ConfigManager { continue; } Set brokers = new HashSet<>(); - for (List items: mappingDetail.getHostedQueues().values()) { + for (List items : mappingDetail.getHostedQueues().values()) { if (items.size() < 2) { continue; } - LogicQueueMappingItem earlistItem = items.get(0); - brokers.add(earlistItem.getBname()); + LogicQueueMappingItem leaderItem = items.get(items.size() - 1); + if (!leaderItem.equals(mappingDetail.getBname())) { + brokers.add(leaderItem.getBname()); + } } - Map statsTable = new HashMap<>(); + if (brokers.isEmpty()) { + continue; + } + Map configAndQueueMappingMap = new HashMap<>(); for (String broker: brokers) { - GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader(); + GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader(); header.setTopic(topic); header.setBname(broker); try { - RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null); + RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { throw rpcResponse.getException(); } - statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody()); + configAndQueueMappingMap.put(broker, (TopicConfigAndQueueMapping) rpcResponse.getBody()); } catch (Throwable rt) { log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt); } } - for (List items: mappingDetail.getHostedQueues().values()) { + + Iterator>> it = mappingDetail.getHostedQueues().entrySet().iterator(); + while (it.hasNext()) { + Map.Entry> entry = it.next(); + Integer queueId = entry.getKey(); + List items = entry.getValue(); if (items.size() < 2) { continue; } - LogicQueueMappingItem earlistItem = items.get(0); - TopicStatsTable topicStats = statsTable.get(earlistItem.getBname()); - if (topicStats == null) { + LogicQueueMappingItem leaderItem = items.get(items.size() - 1); + + TopicConfigAndQueueMapping configAndQueueMapping = configAndQueueMappingMap.get(leaderItem.getBname()); + if (configAndQueueMapping == null) { continue; } - TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId())); - if (topicOffset == null) { - //this may should not happen - log.warn("Get null topicOffset for {}", earlistItem); + List itemsRemote = configAndQueueMapping.getMappingDetail().getHostedQueues().get(queueId); + //TODO + } + } catch (Throwable tt) { + log.error("Try cleanItemListMoreThanSecondGen failed for topic {}", topic, tt); + } finally { + UtilAll.sleep(10); + } + } + } + + + public void cleanItemExpired() { + String when = this.brokerController.getMessageStoreConfig().getDeleteWhen(); + if (!UtilAll.isItTimeToDo(when)) { + return; + } + boolean changed = false; + long start = System.currentTimeMillis(); + try { + for(String topic : topicQueueMappingTable.keySet()) { + try { + TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic); + if (mappingDetail == null + || mappingDetail.getHostedQueues().isEmpty()) { + continue; + } + if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) { + log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail); continue; } - if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()) { - boolean result = items.remove(earlistItem); - changed = changed || result; - log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset); + Set brokers = new HashSet<>(); + for (List items: mappingDetail.getHostedQueues().values()) { + if (items.size() < 2) { + continue; + } + LogicQueueMappingItem earlistItem = items.get(0); + brokers.add(earlistItem.getBname()); + } + Map statsTable = new HashMap<>(); + for (String broker: brokers) { + GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader(); + header.setTopic(topic); + header.setBname(broker); + try { + RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null); + RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); + if (rpcResponse.getException() != null) { + throw rpcResponse.getException(); + } + statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody()); + } catch (Throwable rt) { + log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt); + } + } + for (List items: mappingDetail.getHostedQueues().values()) { + if (items.size() < 2) { + continue; + } + LogicQueueMappingItem earlistItem = items.get(0); + TopicStatsTable topicStats = statsTable.get(earlistItem.getBname()); + if (topicStats == null) { + continue; + } + TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId())); + if (topicOffset == null) { + //this may should not happen + log.warn("Get null topicOffset for {}", earlistItem); + continue; + } + if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()) { + //TODO be careful of the concurrent problem + //Should use the lock + boolean result = items.remove(earlistItem); + changed = changed || result; + log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset); + } } + } catch (Throwable tt) { + log.error("Try CleanItemExpired failed for {}", topic, tt); + } finally { + UtilAll.sleep(10); } - UtilAll.sleep(10); } } catch (Throwable t) { log.error("Try cleanItemExpired failed", t); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java index 2b5d040900e4c7cda292ddd889d080a291430a6b..b282efa500c97272383dff42dcddd2dfd5397dbe 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java @@ -17,11 +17,12 @@ package org.apache.rocketmq.common.protocol.header; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class GetTopicConfigRequestHeader implements CommandCustomHeader { +public class GetTopicConfigRequestHeader extends RpcRequestHeader { @Override public void checkFields() throws RemotingCommandException { } diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java index 83f31e74283f0eea1cf88a860c19c6f5684d929a..6d75df9147a9024dcd9981e2f212e692ee9c4169 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java @@ -2,6 +2,7 @@ package org.apache.rocketmq.common.rpc; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.RequestCode; @@ -9,13 +10,19 @@ import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader; import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; +import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import java.util.ArrayList; import java.util.List; @@ -84,7 +91,10 @@ public class RpcClientImpl implements RpcClient { rpcResponsePromise = handleQueryConsumerOffset(addr, request, timeoutMs); break; case RequestCode.GET_TOPIC_STATS_INFO: - rpcResponsePromise = handleGetTopicStats(addr, request, timeoutMs); + rpcResponsePromise = handleCommonBodyRequest(addr, request, timeoutMs, TopicStatsTable.class); + break; + case RequestCode.GET_TOPIC_CONFIG: + rpcResponsePromise = handleCommonBodyRequest(addr, request, timeoutMs, TopicConfigAndQueueMapping.class); break; default: throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode()); @@ -212,16 +222,14 @@ public class RpcClientImpl implements RpcClient { return rpcResponsePromise; } - public Promise handleGetTopicStats(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception { + public Promise handleCommonBodyRequest(final String addr, RpcRequest rpcRequest, long timeoutMillis, Class bodyClass) throws Exception { final Promise rpcResponsePromise = createResponseFuture(); - RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest); RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); assert responseCommand != null; switch (responseCommand.getCode()) { case ResponseCode.SUCCESS: { - TopicStatsTable topicStatsTable = TopicStatsTable.decode(responseCommand.getBody(), TopicStatsTable.class); - rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, topicStatsTable)); + rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, RemotingSerializable.decode(requestCommand.getBody(), bodyClass))); break; } default:{