diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 4ef34f932b78995fb06ecf8928a971faf1319332..338f31e241564f4eecb2146b6e3c7591e344676f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1438,4 +1438,8 @@ public class BrokerController { public QueryAssignmentProcessor getQueryAssignmentProcessor() { return queryAssignmentProcessor; } + + public TopicQueueMappingCleanService getTopicQueueMappingCleanService() { + return topicQueueMappingCleanService; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java index 91fd60d16ccf559c8dddfacb12350574100288ca..05c5003037a4f56f22851680649b083058fc73df 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java @@ -37,6 +37,7 @@ import org.apache.rocketmq.common.rpc.RpcResponse; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.config.MessageStoreConfig; @@ -74,6 +75,10 @@ public class TopicQueueMappingCleanService extends ServiceThread { public void run() { log.info("Start topic queue mapping clean service thread!"); while (!this.isStopped()) { + try { + this.waitForRunning(5L * 60 * 1000); + } catch (Throwable ignored) { + } try { cleanItemExpired(); } catch (Throwable t) { @@ -84,11 +89,7 @@ public class TopicQueueMappingCleanService extends ServiceThread { } catch (Throwable t) { log.error("topic queue mapping cleanItemListMoreThanSecondGen failed", t); } - try { - this.waitForRunning(5L * 60 * 1000); - } catch (Throwable ignore) { - } } log.info("End topic queue mapping clean service thread!"); } @@ -119,7 +120,10 @@ public class TopicQueueMappingCleanService extends ServiceThread { } Set brokers = new HashSet<>(); for (List items: mappingDetail.getHostedQueues().values()) { - if (items.size() < 2) { + if (items.size() <= 1) { + continue; + } + if(!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) { continue; } LogicQueueMappingItem earlistItem = items.get(0); @@ -138,11 +142,18 @@ public class TopicQueueMappingCleanService extends ServiceThread { } statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody()); } catch (Throwable rt) { - log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt); + log.error("Get remote topic {} state info failed from broker {}", topic, broker, rt); } } - for (List items: mappingDetail.getHostedQueues().values()) { - if (items.size() < 2) { + Map> newHostedQueues = new HashMap<>(); + boolean changedForTopic = false; + for (Map.Entry> entry : mappingDetail.getHostedQueues().entrySet()) { + Integer qid = entry.getKey(); + List items = entry.getValue(); + if (items.size() <= 1) { + continue; + } + if(!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) { continue; } LogicQueueMappingItem earlistItem = items.get(0); @@ -153,7 +164,7 @@ public class TopicQueueMappingCleanService extends ServiceThread { TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId())); if (topicOffset == null) { //this may should not happen - log.warn("Get null topicOffset for {}", earlistItem); + log.error("Get null topicOffset for {} {}",topic, earlistItem); continue; } //ignore the maxOffset < 0, which may in case of some error @@ -161,11 +172,20 @@ public class TopicQueueMappingCleanService extends ServiceThread { || topicOffset.getMaxOffset() == 0) { List newItems = new ArrayList<>(items); boolean result = newItems.remove(earlistItem); - this.topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false, true, false); - changed = changed || result; - log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset); + if (result) { + changedForTopic = true; + newHostedQueues.put(qid, newItems); + } + log.info("The logic queue item {} {} is removed {} because of {}", topic, earlistItem, result, topicOffset); } } + if (changedForTopic) { + TopicQueueMappingDetail newMappingDetail = new TopicQueueMappingDetail(mappingDetail.getTopic(), mappingDetail.getTotalQueues(), mappingDetail.getBname(), mappingDetail.getEpoch()); + newMappingDetail.getHostedQueues().putAll(mappingDetail.getHostedQueues()); + newMappingDetail.getHostedQueues().putAll(newHostedQueues); + this.topicQueueMappingManager.updateTopicQueueMapping(newMappingDetail, false, true, false); + changed = true; + } } catch (Throwable tt) { log.error("Try CleanItemExpired failed for {}", topic, tt); } finally { @@ -241,6 +261,7 @@ public class TopicQueueMappingCleanService extends ServiceThread { GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader(); header.setTopic(topic); header.setBname(broker); + header.setWithMapping(true); try { RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null); RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get(); @@ -252,7 +273,7 @@ public class TopicQueueMappingCleanService extends ServiceThread { mappingDetailMap.put(broker, mappingDetailRemote); } } catch (Throwable rt) { - log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt); + log.error("Get remote topic {} state info failed from broker {}", topic, broker, rt); } } //check all the info diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java index 6d75df9147a9024dcd9981e2f212e692ee9c4169..eaa22be758d0cf5028d31982a9f4618603242efe 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java @@ -229,7 +229,7 @@ public class RpcClientImpl implements RpcClient { assert responseCommand != null; switch (responseCommand.getCode()) { case ResponseCode.SUCCESS: { - rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, RemotingSerializable.decode(requestCommand.getBody(), bodyClass))); + rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, RemotingSerializable.decode(responseCommand.getBody(), bodyClass))); break; } default:{ diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index 1b5290406000a5a0934550da59a28c7a11849f9c..5c564a3a5f50ee86878d4a799b4be960ec23c022 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -197,8 +197,8 @@ public class TopicQueueMappingUtils { if (oldItems == null || oldItems.isEmpty()) { return; } - if (newItems == null || newItems.isEmpty() || newItems.size() < oldItems.size()) { - throw new RuntimeException("The new item list is smaller than old ones"); + if (newItems == null || newItems.isEmpty()) { + throw new RuntimeException("The new item list is null or empty"); } int iold = 0, inew = 0; while (iold < oldItems.size() && inew < newItems.size()) { @@ -665,4 +665,13 @@ public class TopicQueueMappingUtils { return null; } + + public static boolean checkIfLeader(List items, TopicQueueMappingDetail mappingDetail) { + if (items == null + || mappingDetail == null + || items.isEmpty()) { + return false; + } + return items.get(items.size() - 1).getBname().equals(mappingDetail.getBname()); + } } diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java index 06f079c4d5806a87feecc37470adc66e70d65954..56ee320dbad6abc49a5697b57e5698ac23bc7d2a 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java @@ -129,7 +129,7 @@ public class IntegrationTestBase { String baseDir = createBaseDir(); BrokerConfig brokerConfig = new BrokerConfig(); MessageStoreConfig storeConfig = new MessageStoreConfig(); - brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement()); + brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.incrementAndGet()); brokerConfig.setBrokerIP1("127.0.0.1"); brokerConfig.setNamesrvAddr(nsAddr); brokerConfig.setEnablePropertyFilter(true); @@ -139,6 +139,7 @@ public class IntegrationTestBase { storeConfig.setMappedFileSizeCommitLog(COMMIT_LOG_SIZE); storeConfig.setMaxIndexNum(INDEX_NUM); storeConfig.setMaxHashSlotNum(INDEX_NUM * 4); + storeConfig.setDeleteWhen("01;02;03;04;05;06;07;08;09;10;11;12;13;14;15;16;17;18;19;20;21;22;23;00"); return createAndStartBroker(storeConfig, brokerConfig); } diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java index 14bb96783b8b58c8513411cd51798bdb7f5f09af..fdefb06c0363fd21062559c5441825c85a1f27d8 100644 --- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java @@ -1,10 +1,13 @@ package org.apache.rocketmq.test.statictopic; +import com.alibaba.fastjson.JSON; import org.apache.log4j.Logger; +import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.rpc.ClientMetadata; +import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; @@ -357,6 +360,121 @@ public class StaticTopicIT extends BaseConf { } + public void sendMessagesAndCheck(RMQNormalProducer producer, String broker, String topic, int queueNum, int msgEachQueue, long baseOffset) throws Exception { + ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt); + List messageQueueList = producer.getMessageQueue(); + Assert.assertEquals(queueNum, messageQueueList.size()); + for (int i = 0; i < queueNum; i++) { + MessageQueue messageQueue = messageQueueList.get(i); + Assert.assertEquals(i, messageQueue.getQueueId()); + Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName()); + String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue); + Assert.assertEquals(destBrokerName, broker); + } + + for(MessageQueue messageQueue: messageQueueList) { + producer.send(msgEachQueue, messageQueue); + } + Assert.assertEquals(0, producer.getSendErrorMsg().size()); + //leave the time to build the cq + Thread.sleep(100); + for(MessageQueue messageQueue: messageQueueList) { + Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue)); + Assert.assertEquals(msgEachQueue + baseOffset, defaultMQAdminExt.maxOffset(messageQueue)); + } + } + + + @Test + public void testRemappingAndClear() throws Exception { + String topic = "static" + MQRandomUtils.getRandomTopic(); + RMQNormalProducer producer = getProducer(nsAddr, topic); + int queueNum = 10; + int msgEachQueue = 100; + //create to broker1Name + { + Set targetBrokers = new HashSet<>(); + targetBrokers.add(broker1Name); + MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt); + //leave the time to refresh the metadata + Thread.sleep(500); + sendMessagesAndCheck(producer, broker1Name, topic, queueNum, msgEachQueue, 0L); + } + + //remapping to broker2Name + { + Set targetBrokers = new HashSet<>(); + targetBrokers.add(broker2Name); + MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt); + //leave the time to refresh the metadata + Thread.sleep(500); + sendMessagesAndCheck(producer, broker2Name, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE); + } + + //remapping to broker3Name + { + Set targetBrokers = new HashSet<>(); + targetBrokers.add(broker3Name); + MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt); + //leave the time to refresh the metadata + Thread.sleep(500); + sendMessagesAndCheck(producer, broker3Name, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE * 2); + } + + // 1 -> 2 -> 3, currently 1 should not has any mappings + + { + for (int i = 0; i < 10; i++) { + for (BrokerController brokerController: brokerControllerList) { + brokerController.getTopicQueueMappingCleanService().wakeup(); + } + Thread.sleep(100); + } + Map brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); + Assert.assertEquals(brokerNum, brokerConfigMap.size()); + TopicConfigAndQueueMapping config1 = brokerConfigMap.get(broker1Name); + TopicConfigAndQueueMapping config2 = brokerConfigMap.get(broker2Name); + TopicConfigAndQueueMapping config3 = brokerConfigMap.get(broker3Name); + Assert.assertEquals(0, config1.getMappingDetail().getHostedQueues().size()); + Assert.assertEquals(queueNum, config2.getMappingDetail().getHostedQueues().size()); + + Assert.assertEquals(queueNum, config3.getMappingDetail().getHostedQueues().size()); + + } + { + Set topics = new HashSet<>(brokerController1.getTopicConfigManager().getTopicConfigTable().keySet()); + topics.remove(topic); + brokerController1.getMessageStore().cleanUnusedTopic(topics); + brokerController2.getMessageStore().cleanUnusedTopic(topics); + for (int i = 0; i < 10; i++) { + for (BrokerController brokerController: brokerControllerList) { + brokerController.getTopicQueueMappingCleanService().wakeup(); + } + Thread.sleep(100); + } + + Map brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); + Assert.assertEquals(brokerNum, brokerConfigMap.size()); + TopicConfigAndQueueMapping config1 = brokerConfigMap.get(broker1Name); + TopicConfigAndQueueMapping config2 = brokerConfigMap.get(broker2Name); + TopicConfigAndQueueMapping config3 = brokerConfigMap.get(broker3Name); + Assert.assertEquals(0, config1.getMappingDetail().getHostedQueues().size()); + Assert.assertEquals(queueNum, config2.getMappingDetail().getHostedQueues().size()); + Assert.assertEquals(queueNum, config3.getMappingDetail().getHostedQueues().size()); + //The first leader will clear it + for (List items : config1.getMappingDetail().getHostedQueues().values()) { + Assert.assertEquals(3, items.size()); + } + //The second leader do nothing + for (List items : config3.getMappingDetail().getHostedQueues().values()) { + Assert.assertEquals(1, items.size()); + } + + } + + + } + @Test public void testRemappingWithNegativeLogicOffset() throws Exception { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java index eed6b767c25221615152ba2cf1cbd8472c2718a9..0fda471c8c12d4b494fb25855fc597fcc86cc10d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java @@ -190,61 +190,26 @@ public class MQAdminUtils { public static Map examineTopicConfigAll(String topic, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, InterruptedException, MQBrokerException { Map brokerConfigMap = new HashMap<>(); ClientMetadata clientMetadata = new ClientMetadata(); - boolean getFromBrokers = false; - TopicRouteData routeData = null; - try { - routeData = defaultMQAdminExt.examineTopicRouteInfo(topic); - } catch (MQClientException exception) { - if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { - throw new MQBrokerException(exception.getResponseCode(), exception.getErrorMessage()); - } else { - getFromBrokers = true; - } + //check all the brokers + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + if (clusterInfo != null + && clusterInfo.getBrokerAddrTable() != null) { + clientMetadata.refreshClusterInfo(clusterInfo); } - if (!getFromBrokers) { - if (routeData != null - && !routeData.getQueueDatas().isEmpty()) { - clientMetadata.freshTopicRoute(topic, routeData); - for (QueueData queueData: routeData.getQueueDatas()) { - String bname = queueData.getBrokerName(); - String addr = clientMetadata.findMasterBrokerAddr(bname); - try { - TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic); - //allow the config is null - if (mapping != null) { - brokerConfigMap.put(bname, mapping); - } - } catch (MQBrokerException exception) { - if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { - throw exception; - } + for (String broker : clientMetadata.getBrokerAddrTable().keySet()) { + String addr = clientMetadata.findMasterBrokerAddr(broker); + try { + TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic); + //allow the config is null + if (mapping != null) { + if (mapping.getMappingDetail() != null) { + assert mapping.getMappingDetail().getBname().equals(broker); } - + brokerConfigMap.put(broker, mapping); } - } - } else { - //if cannot get from nameserver, then check all the brokers - ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); - if (clusterInfo != null - && clusterInfo.getBrokerAddrTable() != null) { - clientMetadata.refreshClusterInfo(clusterInfo); - } - for (Map.Entry> entry : clientMetadata.getBrokerAddrTable().entrySet()) { - String bname = entry.getKey(); - HashMap map = entry.getValue(); - String addr = map.get(MixAll.MASTER_ID); - if (addr != null) { - try { - TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic); - //allow the config is null - if (mapping != null) { - brokerConfigMap.put(bname, mapping); - } - } catch (MQBrokerException exception1) { - if (exception1.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { - throw exception1; - } - } + } catch (MQBrokerException exception1) { + if (exception1.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { + throw exception1; } } }