提交 1d7807bb 编写于 作者: D dongeforever

Finish the test for topic queue mapping clean serice

上级 ad993e30
...@@ -1438,4 +1438,8 @@ public class BrokerController { ...@@ -1438,4 +1438,8 @@ public class BrokerController {
public QueryAssignmentProcessor getQueryAssignmentProcessor() { public QueryAssignmentProcessor getQueryAssignmentProcessor() {
return queryAssignmentProcessor; return queryAssignmentProcessor;
} }
public TopicQueueMappingCleanService getTopicQueueMappingCleanService() {
return topicQueueMappingCleanService;
}
} }
...@@ -37,6 +37,7 @@ import org.apache.rocketmq.common.rpc.RpcResponse; ...@@ -37,6 +37,7 @@ import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
...@@ -74,6 +75,10 @@ public class TopicQueueMappingCleanService extends ServiceThread { ...@@ -74,6 +75,10 @@ public class TopicQueueMappingCleanService extends ServiceThread {
public void run() { public void run() {
log.info("Start topic queue mapping clean service thread!"); log.info("Start topic queue mapping clean service thread!");
while (!this.isStopped()) { while (!this.isStopped()) {
try {
this.waitForRunning(5L * 60 * 1000);
} catch (Throwable ignored) {
}
try { try {
cleanItemExpired(); cleanItemExpired();
} catch (Throwable t) { } catch (Throwable t) {
...@@ -84,11 +89,7 @@ public class TopicQueueMappingCleanService extends ServiceThread { ...@@ -84,11 +89,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
} catch (Throwable t) { } catch (Throwable t) {
log.error("topic queue mapping cleanItemListMoreThanSecondGen failed", t); log.error("topic queue mapping cleanItemListMoreThanSecondGen failed", t);
} }
try {
this.waitForRunning(5L * 60 * 1000);
} catch (Throwable ignore) {
}
} }
log.info("End topic queue mapping clean service thread!"); log.info("End topic queue mapping clean service thread!");
} }
...@@ -119,7 +120,10 @@ public class TopicQueueMappingCleanService extends ServiceThread { ...@@ -119,7 +120,10 @@ public class TopicQueueMappingCleanService extends ServiceThread {
} }
Set<String> brokers = new HashSet<>(); Set<String> brokers = new HashSet<>();
for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) { for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
if (items.size() < 2) { if (items.size() <= 1) {
continue;
}
if(!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
continue; continue;
} }
LogicQueueMappingItem earlistItem = items.get(0); LogicQueueMappingItem earlistItem = items.get(0);
...@@ -138,11 +142,18 @@ public class TopicQueueMappingCleanService extends ServiceThread { ...@@ -138,11 +142,18 @@ public class TopicQueueMappingCleanService extends ServiceThread {
} }
statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody()); statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
} catch (Throwable rt) { } catch (Throwable rt) {
log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt); log.error("Get remote topic {} state info failed from broker {}", topic, broker, rt);
} }
} }
for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) { Map<Integer, List<LogicQueueMappingItem>> newHostedQueues = new HashMap<>();
if (items.size() < 2) { boolean changedForTopic = false;
for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
Integer qid = entry.getKey();
List<LogicQueueMappingItem> items = entry.getValue();
if (items.size() <= 1) {
continue;
}
if(!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
continue; continue;
} }
LogicQueueMappingItem earlistItem = items.get(0); LogicQueueMappingItem earlistItem = items.get(0);
...@@ -153,7 +164,7 @@ public class TopicQueueMappingCleanService extends ServiceThread { ...@@ -153,7 +164,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId())); TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
if (topicOffset == null) { if (topicOffset == null) {
//this may should not happen //this may should not happen
log.warn("Get null topicOffset for {}", earlistItem); log.error("Get null topicOffset for {} {}",topic, earlistItem);
continue; continue;
} }
//ignore the maxOffset < 0, which may in case of some error //ignore the maxOffset < 0, which may in case of some error
...@@ -161,11 +172,20 @@ public class TopicQueueMappingCleanService extends ServiceThread { ...@@ -161,11 +172,20 @@ public class TopicQueueMappingCleanService extends ServiceThread {
|| topicOffset.getMaxOffset() == 0) { || topicOffset.getMaxOffset() == 0) {
List<LogicQueueMappingItem> newItems = new ArrayList<>(items); List<LogicQueueMappingItem> newItems = new ArrayList<>(items);
boolean result = newItems.remove(earlistItem); boolean result = newItems.remove(earlistItem);
this.topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false, true, false); if (result) {
changed = changed || result; changedForTopic = true;
log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset); newHostedQueues.put(qid, newItems);
}
log.info("The logic queue item {} {} is removed {} because of {}", topic, earlistItem, result, topicOffset);
} }
} }
if (changedForTopic) {
TopicQueueMappingDetail newMappingDetail = new TopicQueueMappingDetail(mappingDetail.getTopic(), mappingDetail.getTotalQueues(), mappingDetail.getBname(), mappingDetail.getEpoch());
newMappingDetail.getHostedQueues().putAll(mappingDetail.getHostedQueues());
newMappingDetail.getHostedQueues().putAll(newHostedQueues);
this.topicQueueMappingManager.updateTopicQueueMapping(newMappingDetail, false, true, false);
changed = true;
}
} catch (Throwable tt) { } catch (Throwable tt) {
log.error("Try CleanItemExpired failed for {}", topic, tt); log.error("Try CleanItemExpired failed for {}", topic, tt);
} finally { } finally {
...@@ -241,6 +261,7 @@ public class TopicQueueMappingCleanService extends ServiceThread { ...@@ -241,6 +261,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader(); GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
header.setTopic(topic); header.setTopic(topic);
header.setBname(broker); header.setBname(broker);
header.setWithMapping(true);
try { try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null); RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null);
RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get(); RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
...@@ -252,7 +273,7 @@ public class TopicQueueMappingCleanService extends ServiceThread { ...@@ -252,7 +273,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
mappingDetailMap.put(broker, mappingDetailRemote); mappingDetailMap.put(broker, mappingDetailRemote);
} }
} catch (Throwable rt) { } catch (Throwable rt) {
log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt); log.error("Get remote topic {} state info failed from broker {}", topic, broker, rt);
} }
} }
//check all the info //check all the info
......
...@@ -229,7 +229,7 @@ public class RpcClientImpl implements RpcClient { ...@@ -229,7 +229,7 @@ public class RpcClientImpl implements RpcClient {
assert responseCommand != null; assert responseCommand != null;
switch (responseCommand.getCode()) { switch (responseCommand.getCode()) {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, RemotingSerializable.decode(requestCommand.getBody(), bodyClass))); rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, RemotingSerializable.decode(responseCommand.getBody(), bodyClass)));
break; break;
} }
default:{ default:{
......
...@@ -197,8 +197,8 @@ public class TopicQueueMappingUtils { ...@@ -197,8 +197,8 @@ public class TopicQueueMappingUtils {
if (oldItems == null || oldItems.isEmpty()) { if (oldItems == null || oldItems.isEmpty()) {
return; return;
} }
if (newItems == null || newItems.isEmpty() || newItems.size() < oldItems.size()) { if (newItems == null || newItems.isEmpty()) {
throw new RuntimeException("The new item list is smaller than old ones"); throw new RuntimeException("The new item list is null or empty");
} }
int iold = 0, inew = 0; int iold = 0, inew = 0;
while (iold < oldItems.size() && inew < newItems.size()) { while (iold < oldItems.size() && inew < newItems.size()) {
...@@ -665,4 +665,13 @@ public class TopicQueueMappingUtils { ...@@ -665,4 +665,13 @@ public class TopicQueueMappingUtils {
return null; return null;
} }
public static boolean checkIfLeader(List<LogicQueueMappingItem> items, TopicQueueMappingDetail mappingDetail) {
if (items == null
|| mappingDetail == null
|| items.isEmpty()) {
return false;
}
return items.get(items.size() - 1).getBname().equals(mappingDetail.getBname());
}
} }
...@@ -129,7 +129,7 @@ public class IntegrationTestBase { ...@@ -129,7 +129,7 @@ public class IntegrationTestBase {
String baseDir = createBaseDir(); String baseDir = createBaseDir();
BrokerConfig brokerConfig = new BrokerConfig(); BrokerConfig brokerConfig = new BrokerConfig();
MessageStoreConfig storeConfig = new MessageStoreConfig(); MessageStoreConfig storeConfig = new MessageStoreConfig();
brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement()); brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.incrementAndGet());
brokerConfig.setBrokerIP1("127.0.0.1"); brokerConfig.setBrokerIP1("127.0.0.1");
brokerConfig.setNamesrvAddr(nsAddr); brokerConfig.setNamesrvAddr(nsAddr);
brokerConfig.setEnablePropertyFilter(true); brokerConfig.setEnablePropertyFilter(true);
...@@ -139,6 +139,7 @@ public class IntegrationTestBase { ...@@ -139,6 +139,7 @@ public class IntegrationTestBase {
storeConfig.setMappedFileSizeCommitLog(COMMIT_LOG_SIZE); storeConfig.setMappedFileSizeCommitLog(COMMIT_LOG_SIZE);
storeConfig.setMaxIndexNum(INDEX_NUM); storeConfig.setMaxIndexNum(INDEX_NUM);
storeConfig.setMaxHashSlotNum(INDEX_NUM * 4); storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
storeConfig.setDeleteWhen("01;02;03;04;05;06;07;08;09;10;11;12;13;14;15;16;17;18;19;20;21;22;23;00");
return createAndStartBroker(storeConfig, brokerConfig); return createAndStartBroker(storeConfig, brokerConfig);
} }
......
package org.apache.rocketmq.test.statictopic; package org.apache.rocketmq.test.statictopic;
import com.alibaba.fastjson.JSON;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.rpc.ClientMetadata; import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
...@@ -357,6 +360,121 @@ public class StaticTopicIT extends BaseConf { ...@@ -357,6 +360,121 @@ public class StaticTopicIT extends BaseConf {
} }
public void sendMessagesAndCheck(RMQNormalProducer producer, String broker, String topic, int queueNum, int msgEachQueue, long baseOffset) throws Exception {
ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
List<MessageQueue> messageQueueList = producer.getMessageQueue();
Assert.assertEquals(queueNum, messageQueueList.size());
for (int i = 0; i < queueNum; i++) {
MessageQueue messageQueue = messageQueueList.get(i);
Assert.assertEquals(i, messageQueue.getQueueId());
Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
Assert.assertEquals(destBrokerName, broker);
}
for(MessageQueue messageQueue: messageQueueList) {
producer.send(msgEachQueue, messageQueue);
}
Assert.assertEquals(0, producer.getSendErrorMsg().size());
//leave the time to build the cq
Thread.sleep(100);
for(MessageQueue messageQueue: messageQueueList) {
Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
Assert.assertEquals(msgEachQueue + baseOffset, defaultMQAdminExt.maxOffset(messageQueue));
}
}
@Test
public void testRemappingAndClear() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic();
RMQNormalProducer producer = getProducer(nsAddr, topic);
int queueNum = 10;
int msgEachQueue = 100;
//create to broker1Name
{
Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker1Name);
MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
//leave the time to refresh the metadata
Thread.sleep(500);
sendMessagesAndCheck(producer, broker1Name, topic, queueNum, msgEachQueue, 0L);
}
//remapping to broker2Name
{
Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker2Name);
MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
//leave the time to refresh the metadata
Thread.sleep(500);
sendMessagesAndCheck(producer, broker2Name, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
}
//remapping to broker3Name
{
Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker3Name);
MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
//leave the time to refresh the metadata
Thread.sleep(500);
sendMessagesAndCheck(producer, broker3Name, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE * 2);
}
// 1 -> 2 -> 3, currently 1 should not has any mappings
{
for (int i = 0; i < 10; i++) {
for (BrokerController brokerController: brokerControllerList) {
brokerController.getTopicQueueMappingCleanService().wakeup();
}
Thread.sleep(100);
}
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
Assert.assertEquals(brokerNum, brokerConfigMap.size());
TopicConfigAndQueueMapping config1 = brokerConfigMap.get(broker1Name);
TopicConfigAndQueueMapping config2 = brokerConfigMap.get(broker2Name);
TopicConfigAndQueueMapping config3 = brokerConfigMap.get(broker3Name);
Assert.assertEquals(0, config1.getMappingDetail().getHostedQueues().size());
Assert.assertEquals(queueNum, config2.getMappingDetail().getHostedQueues().size());
Assert.assertEquals(queueNum, config3.getMappingDetail().getHostedQueues().size());
}
{
Set<String> topics = new HashSet<>(brokerController1.getTopicConfigManager().getTopicConfigTable().keySet());
topics.remove(topic);
brokerController1.getMessageStore().cleanUnusedTopic(topics);
brokerController2.getMessageStore().cleanUnusedTopic(topics);
for (int i = 0; i < 10; i++) {
for (BrokerController brokerController: brokerControllerList) {
brokerController.getTopicQueueMappingCleanService().wakeup();
}
Thread.sleep(100);
}
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
Assert.assertEquals(brokerNum, brokerConfigMap.size());
TopicConfigAndQueueMapping config1 = brokerConfigMap.get(broker1Name);
TopicConfigAndQueueMapping config2 = brokerConfigMap.get(broker2Name);
TopicConfigAndQueueMapping config3 = brokerConfigMap.get(broker3Name);
Assert.assertEquals(0, config1.getMappingDetail().getHostedQueues().size());
Assert.assertEquals(queueNum, config2.getMappingDetail().getHostedQueues().size());
Assert.assertEquals(queueNum, config3.getMappingDetail().getHostedQueues().size());
//The first leader will clear it
for (List<LogicQueueMappingItem> items : config1.getMappingDetail().getHostedQueues().values()) {
Assert.assertEquals(3, items.size());
}
//The second leader do nothing
for (List<LogicQueueMappingItem> items : config3.getMappingDetail().getHostedQueues().values()) {
Assert.assertEquals(1, items.size());
}
}
}
@Test @Test
public void testRemappingWithNegativeLogicOffset() throws Exception { public void testRemappingWithNegativeLogicOffset() throws Exception {
......
...@@ -190,61 +190,26 @@ public class MQAdminUtils { ...@@ -190,61 +190,26 @@ public class MQAdminUtils {
public static Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(String topic, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, InterruptedException, MQBrokerException { public static Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(String topic, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, InterruptedException, MQBrokerException {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>(); Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
ClientMetadata clientMetadata = new ClientMetadata(); ClientMetadata clientMetadata = new ClientMetadata();
boolean getFromBrokers = false; //check all the brokers
TopicRouteData routeData = null; ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
try { if (clusterInfo != null
routeData = defaultMQAdminExt.examineTopicRouteInfo(topic); && clusterInfo.getBrokerAddrTable() != null) {
} catch (MQClientException exception) { clientMetadata.refreshClusterInfo(clusterInfo);
if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw new MQBrokerException(exception.getResponseCode(), exception.getErrorMessage());
} else {
getFromBrokers = true;
}
} }
if (!getFromBrokers) { for (String broker : clientMetadata.getBrokerAddrTable().keySet()) {
if (routeData != null String addr = clientMetadata.findMasterBrokerAddr(broker);
&& !routeData.getQueueDatas().isEmpty()) { try {
clientMetadata.freshTopicRoute(topic, routeData); TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
for (QueueData queueData: routeData.getQueueDatas()) { //allow the config is null
String bname = queueData.getBrokerName(); if (mapping != null) {
String addr = clientMetadata.findMasterBrokerAddr(bname); if (mapping.getMappingDetail() != null) {
try { assert mapping.getMappingDetail().getBname().equals(broker);
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
} catch (MQBrokerException exception) {
if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw exception;
}
} }
brokerConfigMap.put(broker, mapping);
} }
} } catch (MQBrokerException exception1) {
} else { if (exception1.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
//if cannot get from nameserver, then check all the brokers throw exception1;
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo != null
&& clusterInfo.getBrokerAddrTable() != null) {
clientMetadata.refreshClusterInfo(clusterInfo);
}
for (Map.Entry<String, HashMap<Long, String>> entry : clientMetadata.getBrokerAddrTable().entrySet()) {
String bname = entry.getKey();
HashMap<Long, String> map = entry.getValue();
String addr = map.get(MixAll.MASTER_ID);
if (addr != null) {
try {
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
} catch (MQBrokerException exception1) {
if (exception1.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw exception1;
}
}
} }
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册