提交 1ca218f1 编写于 作者: D dongeforever

Add TopicQueueMappingCleanService

上级 c06564f6
...@@ -56,6 +56,7 @@ import org.apache.rocketmq.broker.processor.SendMessageProcessor; ...@@ -56,6 +56,7 @@ import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.slave.SlaveSynchronize; import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
import org.apache.rocketmq.broker.topic.TopicQueueMappingManager; import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener; import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService; import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
...@@ -64,11 +65,9 @@ import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageC ...@@ -64,11 +65,9 @@ import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageC
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
import org.apache.rocketmq.broker.util.ServiceProvider; import org.apache.rocketmq.broker.util.ServiceProvider;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Configuration; import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
...@@ -76,7 +75,6 @@ import org.apache.rocketmq.common.constant.LoggerName; ...@@ -76,7 +75,6 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
...@@ -87,9 +85,6 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; ...@@ -87,9 +85,6 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer; import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.TlsMode; import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer; import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
...@@ -163,9 +158,6 @@ public class BrokerController { ...@@ -163,9 +158,6 @@ public class BrokerController {
private final BrokerOuterAPI brokerOuterAPI; private final BrokerOuterAPI brokerOuterAPI;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"BrokerControllerScheduledThread")); "BrokerControllerScheduledThread"));
//the topic queue mapping is costly, so use an independent executor
private final ScheduledExecutorService scheduledForTopicQueueMapping = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"BrokerControllerScheduledThread-TopicQueueMapping"));
private final SlaveSynchronize slaveSynchronize; private final SlaveSynchronize slaveSynchronize;
private final BlockingQueue<Runnable> sendThreadPoolQueue; private final BlockingQueue<Runnable> sendThreadPoolQueue;
private final BlockingQueue<Runnable> ackThreadPoolQueue; private final BlockingQueue<Runnable> ackThreadPoolQueue;
...@@ -202,6 +194,7 @@ public class BrokerController { ...@@ -202,6 +194,7 @@ public class BrokerController {
private InetSocketAddress storeHost; private InetSocketAddress storeHost;
private BrokerFastFailure brokerFastFailure; private BrokerFastFailure brokerFastFailure;
private Configuration configuration; private Configuration configuration;
private TopicQueueMappingCleanService topicQueueMappingCleanService;
private FileWatchService fileWatchService; private FileWatchService fileWatchService;
private TransactionalMessageCheckService transactionalMessageCheckService; private TransactionalMessageCheckService transactionalMessageCheckService;
private TransactionalMessageService transactionalMessageService; private TransactionalMessageService transactionalMessageService;
...@@ -501,21 +494,6 @@ public class BrokerController { ...@@ -501,21 +494,6 @@ public class BrokerController {
} }
}, 1, 5, TimeUnit.SECONDS); }, 1, 5, TimeUnit.SECONDS);
this.scheduledForTopicQueueMapping.scheduleAtFixedRate( () -> {
try {
this.topicQueueMappingManager.cleanItemListMoreThanSecondGen();
} catch (Throwable t) {
log.error("ScheduledTask cleanItemListMoreThanSecondGen failed", t);
}
}, 1, 5, TimeUnit.MINUTES);
this.scheduledForTopicQueueMapping.scheduleAtFixedRate( () -> {
try {
this.topicQueueMappingManager.cleanItemExpired();
} catch (Throwable t) {
log.error("ScheduledTask cleanItemExpired failed", t);
}
}, 1, 5, TimeUnit.MINUTES);
if (!messageStoreConfig.isEnableDLegerCommitLog()) { if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
...@@ -539,6 +517,8 @@ public class BrokerController { ...@@ -539,6 +517,8 @@ public class BrokerController {
} }
} }
this.topicQueueMappingCleanService = new TopicQueueMappingCleanService(this);
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext // Register a listener to reload SslContext
try { try {
...@@ -897,6 +877,10 @@ public class BrokerController { ...@@ -897,6 +877,10 @@ public class BrokerController {
this.fastRemotingServer.shutdown(); this.fastRemotingServer.shutdown();
} }
if (this.topicQueueMappingCleanService != null) {
this.topicQueueMappingCleanService.shutdown();
}
if (this.fileWatchService != null) { if (this.fileWatchService != null) {
this.fileWatchService.shutdown(); this.fileWatchService.shutdown();
} }
...@@ -911,12 +895,6 @@ public class BrokerController { ...@@ -911,12 +895,6 @@ public class BrokerController {
} catch (InterruptedException e) { } catch (InterruptedException e) {
} }
this.scheduledForTopicQueueMapping.shutdown();
try {
this.scheduledForTopicQueueMapping.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (Throwable ignored) {
}
this.unregisterBrokerAll(); this.unregisterBrokerAll();
if (this.sendMessageExecutor != null) { if (this.sendMessageExecutor != null) {
...@@ -1020,6 +998,10 @@ public class BrokerController { ...@@ -1020,6 +998,10 @@ public class BrokerController {
this.fastRemotingServer.start(); this.fastRemotingServer.start();
} }
if (this.topicQueueMappingCleanService != null) {
this.topicQueueMappingCleanService.start();
}
if (this.fileWatchService != null) { if (this.fileWatchService != null) {
this.fileWatchService.start(); this.fileWatchService.start();
} }
......
...@@ -323,7 +323,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -323,7 +323,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
try { try {
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingDetail, force); this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingDetail, force, false, true);
this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
response.setCode(ResponseCode.SUCCESS); response.setCode(ResponseCode.SUCCESS);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.topic;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.rpc.RpcClient;
import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class TopicQueueMappingCleanService extends ServiceThread {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private TopicQueueMappingManager topicQueueMappingManager;
private BrokerOuterAPI brokerOuterAPI;
private RpcClient rpcClient;
private MessageStoreConfig messageStoreConfig;
private BrokerConfig brokerConfig;
public TopicQueueMappingCleanService(BrokerController brokerController) {
this.topicQueueMappingManager = brokerController.getTopicQueueMappingManager();
this.rpcClient = brokerController.getBrokerOuterAPI().getRpcClient();
this.messageStoreConfig = brokerController.getMessageStoreConfig();
this.brokerConfig = brokerController.getBrokerConfig();
this.brokerOuterAPI = brokerController.getBrokerOuterAPI();
}
@Override
public String getServiceName() {
return TopicQueueMappingCleanService.class.getSimpleName();
}
@Override
public void run() {
log.info("Start topic queue mapping clean service thread!");
while (!this.isStopped()) {
try {
cleanItemExpired();
} catch (Throwable t) {
log.error("topic queue mapping cleanItemExpired failed", t);
}
try {
cleanItemListMoreThanSecondGen();
} catch (Throwable t) {
log.error("topic queue mapping cleanItemListMoreThanSecondGen failed", t);
}
try {
this.waitForRunning(5L * 60 * 1000);
} catch (Throwable ignore) {
}
}
log.info("End topic queue mapping clean service thread!");
}
public void cleanItemExpired() {
String when = messageStoreConfig.getDeleteWhen();
if (!UtilAll.isItTimeToDo(when)) {
return;
}
boolean changed = false;
long start = System.currentTimeMillis();
try {
for(String topic : this.topicQueueMappingManager.getTopicQueueMappingTable().keySet()) {
try {
if (isStopped()) {
break;
}
TopicQueueMappingDetail mappingDetail = this.topicQueueMappingManager.getTopicQueueMappingTable().get(topic);
if (mappingDetail == null
|| mappingDetail.getHostedQueues().isEmpty()) {
continue;
}
if (!mappingDetail.getBname().equals(brokerConfig.getBrokerName())) {
log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
continue;
}
Set<String> brokers = new HashSet<>();
for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
if (items.size() < 2) {
continue;
}
LogicQueueMappingItem earlistItem = items.get(0);
brokers.add(earlistItem.getBname());
}
Map<String, TopicStatsTable> statsTable = new HashMap<>();
for (String broker: brokers) {
GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
header.setTopic(topic);
header.setBname(broker);
try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
} catch (Throwable rt) {
log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
}
}
for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
if (items.size() < 2) {
continue;
}
LogicQueueMappingItem earlistItem = items.get(0);
TopicStatsTable topicStats = statsTable.get(earlistItem.getBname());
if (topicStats == null) {
continue;
}
TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
if (topicOffset == null) {
//this may should not happen
log.warn("Get null topicOffset for {}", earlistItem);
continue;
}
//ignore the maxOffset < 0, which may in case of some error
if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()
|| topicOffset.getMaxOffset() == 0) {
List<LogicQueueMappingItem> newItems = new ArrayList<>(items);
boolean result = newItems.remove(earlistItem);
this.topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false, true, false);
changed = changed || result;
log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset);
}
}
} catch (Throwable tt) {
log.error("Try CleanItemExpired failed for {}", topic, tt);
} finally {
UtilAll.sleep(10);
}
}
} catch (Throwable t) {
log.error("Try cleanItemExpired failed", t);
} finally {
if (changed) {
this.topicQueueMappingManager.getDataVersion().nextVersion();
this.topicQueueMappingManager.persist();
log.info("CleanItemExpired changed");
}
log.info("cleanItemExpired cost {} ms", System.currentTimeMillis() - start);
}
}
public void cleanItemListMoreThanSecondGen() {
String when = messageStoreConfig.getDeleteWhen();
if (!UtilAll.isItTimeToDo(when)) {
return;
}
boolean changed = false;
long start = System.currentTimeMillis();
try {
ClientMetadata clientMetadata = new ClientMetadata();
for (String topic : this.topicQueueMappingManager.getTopicQueueMappingTable().keySet()) {
try {
if (isStopped()) {
break;
}
TopicQueueMappingDetail mappingDetail = this.topicQueueMappingManager.getTopicQueueMappingTable().get(topic);
if (mappingDetail == null
|| mappingDetail.getHostedQueues().isEmpty()) {
continue;
}
if (!mappingDetail.getBname().equals(brokerConfig.getBrokerName())) {
log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
continue;
}
Map<Integer, String> qid2CurrLeaderBroker = new HashMap<>();
for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
Integer qId = entry.getKey();
List<LogicQueueMappingItem> items = entry.getValue();
if (items.isEmpty()) {
continue;
}
LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
if (!leaderItem.getBname().equals(mappingDetail.getBname())) {
qid2CurrLeaderBroker.put(qId, leaderItem.getBname());
}
}
if (qid2CurrLeaderBroker.isEmpty()) {
continue;
}
//find the topic route
TopicRouteData topicRouteData = brokerOuterAPI.getTopicRouteInfoFromNameServer(topic, brokerConfig.getForwardTimeout());
clientMetadata.freshTopicRoute(topic, topicRouteData);
Map<Integer, String> qid2RealLeaderBroker = new HashMap<>();
//fine the real leader
for (Map.Entry<Integer, String> entry : qid2CurrLeaderBroker.entrySet()) {
qid2RealLeaderBroker.put(entry.getKey(), clientMetadata.getBrokerNameFromMessageQueue(new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, entry.getKey())));
}
//find the mapping detail of real leader
Map<String, TopicQueueMappingDetail> mappingDetailMap = new HashMap<>();
for (Map.Entry<Integer, String> entry : qid2RealLeaderBroker.entrySet()) {
if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(entry.getValue())) {
continue;
}
String broker = entry.getValue();
GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
header.setTopic(topic);
header.setBname(broker);
try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null);
RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
TopicQueueMappingDetail mappingDetailRemote = ((TopicConfigAndQueueMapping) rpcResponse.getBody()).getMappingDetail();
if (broker.equals(mappingDetailRemote.getBname())) {
mappingDetailMap.put(broker, mappingDetailRemote);
}
} catch (Throwable rt) {
log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
}
}
//check all the info
Set<Integer> ids2delete = new HashSet<>();
for (Map.Entry<Integer, String> entry : qid2CurrLeaderBroker.entrySet()) {
Integer qId = entry.getKey();
String currLeaderBroker = entry.getValue();
String realLeaderBroker = qid2RealLeaderBroker.get(qId);
TopicQueueMappingDetail remoteMappingDetail = mappingDetailMap.get(realLeaderBroker);
if (remoteMappingDetail == null
|| remoteMappingDetail.getTotalQueues() != mappingDetail.getTotalQueues()
|| remoteMappingDetail.getEpoch() != mappingDetail.getEpoch()) {
continue;
}
List<LogicQueueMappingItem> items = remoteMappingDetail.getHostedQueues().get(qId);
if (items.isEmpty()) {
continue;
}
LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
if (!realLeaderBroker.equals(leaderItem.getBname())) {
continue;
}
//all the check is ok
if (!realLeaderBroker.equals(currLeaderBroker)) {
ids2delete.add(qId);
}
}
for (Integer qid : ids2delete) {
List<LogicQueueMappingItem> items = mappingDetail.getHostedQueues().remove(qid);
changed = true;
if (items != null) {
log.info("Remove the ItemListMoreThanSecondGen topic {} qid {} items {}", topic, qid, items);
}
}
} catch (Throwable tt) {
log.error("Try cleanItemListMoreThanSecondGen failed for topic {}", topic, tt);
} finally {
UtilAll.sleep(10);
}
}
} catch (Throwable t) {
log.error("Try cleanItemListMoreThanSecondGen failed", t);
} finally {
if (changed) {
this.topicQueueMappingManager.getDataVersion().nextVersion();
this.topicQueueMappingManager.persist();
}
log.info("Try cleanItemListMoreThanSecondGen cost {} ms", System.currentTimeMillis() - start);
}
}
}
...@@ -22,27 +22,22 @@ import org.apache.rocketmq.broker.BrokerPathConfigHelper; ...@@ -22,27 +22,22 @@ import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
import org.apache.rocketmq.common.rpc.RpcRequest; import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse; import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.DefaultMessageStore;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
...@@ -74,7 +69,7 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -74,7 +69,7 @@ public class TopicQueueMappingManager extends ConfigManager {
this.brokerController = brokerController; this.brokerController = brokerController;
} }
public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) throws Exception { public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force, boolean isClean, boolean flush) throws Exception {
boolean locked = false; boolean locked = false;
boolean updated = false; boolean updated = false;
TopicQueueMappingDetail oldDetail = null; TopicQueueMappingDetail oldDetail = null;
...@@ -124,7 +119,7 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -124,7 +119,7 @@ public class TopicQueueMappingManager extends ConfigManager {
newDetail.getHostedQueues().put(globalId, oldItems); newDetail.getHostedQueues().put(globalId, oldItems);
} }
} else { } else {
TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems, newItems, epochEqual); TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems, newItems, epochEqual, isClean);
} }
} }
topicQueueMappingTable.put(newDetail.getTopic(), newDetail); topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
...@@ -133,7 +128,8 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -133,7 +128,8 @@ public class TopicQueueMappingManager extends ConfigManager {
if (locked) { if (locked) {
this.lock.unlock(); this.lock.unlock();
} }
if (updated) { if (updated && flush) {
this.dataVersion.nextVersion();
this.persist(); this.persist();
log.info("Update topic queue mapping from [{}] to [{}], force {}", oldDetail, newDetail, force); log.info("Update topic queue mapping from [{}] to [{}], force {}", oldDetail, newDetail, force);
} }
...@@ -258,162 +254,4 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -258,162 +254,4 @@ public class TopicQueueMappingManager extends ConfigManager {
} }
} }
public void cleanItemListMoreThanSecondGen() {
String when = this.brokerController.getMessageStoreConfig().getDeleteWhen();
if (!UtilAll.isItTimeToDo(when)) {
return;
}
for(String topic : topicQueueMappingTable.keySet()) {
try {
TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
if (mappingDetail == null
|| mappingDetail.getHostedQueues().isEmpty()) {
continue;
}
if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
continue;
}
Set<String> brokers = new HashSet<>();
for (List<LogicQueueMappingItem> items : mappingDetail.getHostedQueues().values()) {
if (items.size() < 2) {
continue;
}
LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
if (!leaderItem.equals(mappingDetail.getBname())) {
brokers.add(leaderItem.getBname());
}
}
if (brokers.isEmpty()) {
continue;
}
Map<String, TopicConfigAndQueueMapping> configAndQueueMappingMap = new HashMap<>();
for (String broker: brokers) {
GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
header.setTopic(topic);
header.setBname(broker);
try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
configAndQueueMappingMap.put(broker, (TopicConfigAndQueueMapping) rpcResponse.getBody());
} catch (Throwable rt) {
log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
}
}
Iterator<Map.Entry<Integer, List<LogicQueueMappingItem>>> it = mappingDetail.getHostedQueues().entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Integer, List<LogicQueueMappingItem>> entry = it.next();
Integer queueId = entry.getKey();
List<LogicQueueMappingItem> items = entry.getValue();
if (items.size() < 2) {
continue;
}
LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
TopicConfigAndQueueMapping configAndQueueMapping = configAndQueueMappingMap.get(leaderItem.getBname());
if (configAndQueueMapping == null) {
continue;
}
List<LogicQueueMappingItem> itemsRemote = configAndQueueMapping.getMappingDetail().getHostedQueues().get(queueId);
//TODO
}
} catch (Throwable tt) {
log.error("Try cleanItemListMoreThanSecondGen failed for topic {}", topic, tt);
} finally {
UtilAll.sleep(10);
}
}
}
public void cleanItemExpired() {
String when = this.brokerController.getMessageStoreConfig().getDeleteWhen();
if (!UtilAll.isItTimeToDo(when)) {
return;
}
boolean changed = false;
long start = System.currentTimeMillis();
try {
for(String topic : topicQueueMappingTable.keySet()) {
try {
TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
if (mappingDetail == null
|| mappingDetail.getHostedQueues().isEmpty()) {
continue;
}
if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
continue;
}
Set<String> brokers = new HashSet<>();
for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
if (items.size() < 2) {
continue;
}
LogicQueueMappingItem earlistItem = items.get(0);
brokers.add(earlistItem.getBname());
}
Map<String, TopicStatsTable> statsTable = new HashMap<>();
for (String broker: brokers) {
GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
header.setTopic(topic);
header.setBname(broker);
try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
} catch (Throwable rt) {
log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
}
}
for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
if (items.size() < 2) {
continue;
}
LogicQueueMappingItem earlistItem = items.get(0);
TopicStatsTable topicStats = statsTable.get(earlistItem.getBname());
if (topicStats == null) {
continue;
}
TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
if (topicOffset == null) {
//this may should not happen
log.warn("Get null topicOffset for {}", earlistItem);
continue;
}
if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()) {
//TODO be careful of the concurrent problem
//Should use the lock
boolean result = items.remove(earlistItem);
changed = changed || result;
log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset);
}
}
} catch (Throwable tt) {
log.error("Try CleanItemExpired failed for {}", topic, tt);
} finally {
UtilAll.sleep(10);
}
}
} catch (Throwable t) {
log.error("Try cleanItemExpired failed", t);
} finally {
if (changed) {
this.dataVersion.nextVersion();
this.persist();
log.info("CleanItemExpired changed");
}
log.info("cleanItemExpired cost {} ms", System.currentTimeMillis() - start);
}
}
} }
...@@ -93,7 +93,7 @@ public class TopicQueueMappingManagerTest { ...@@ -93,7 +93,7 @@ public class TopicQueueMappingManagerTest {
Assert.assertEquals(0, topicQueueMappingManager.getTopicQueueMappingTable().size()); Assert.assertEquals(0, topicQueueMappingManager.getTopicQueueMappingTable().size());
for (TopicQueueMappingDetail mappingDetail : mappingDetailMap.values()) { for (TopicQueueMappingDetail mappingDetail : mappingDetailMap.values()) {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false); topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false, false, true);
} }
} }
topicQueueMappingManager.persist(); topicQueueMappingManager.persist();
......
...@@ -85,8 +85,8 @@ public class MixAll { ...@@ -85,8 +85,8 @@ public class MixAll {
public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml"; public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
public static final String REPLY_MESSAGE_FLAG = "reply"; public static final String REPLY_MESSAGE_FLAG = "reply";
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME = "__logical_queue_broker__"; public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME = "__logic_broker__";
public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST = "__logical_queue_broker_not_exist__"; public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST = "__logic_broker_none__";
public static String getWSAddr() { public static String getWSAddr() {
String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP); String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
......
...@@ -193,7 +193,7 @@ public class TopicQueueMappingUtils { ...@@ -193,7 +193,7 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum); return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
} }
public static void makeSureLogicQueueMappingItemImmutable(List<LogicQueueMappingItem> oldItems, List<LogicQueueMappingItem> newItems, boolean epochEqual) { public static void makeSureLogicQueueMappingItemImmutable(List<LogicQueueMappingItem> oldItems, List<LogicQueueMappingItem> newItems, boolean epochEqual, boolean isCLean) {
if (oldItems == null || oldItems.isEmpty()) { if (oldItems == null || oldItems.isEmpty()) {
return; return;
} }
...@@ -205,10 +205,15 @@ public class TopicQueueMappingUtils { ...@@ -205,10 +205,15 @@ public class TopicQueueMappingUtils {
LogicQueueMappingItem newItem = newItems.get(inew); LogicQueueMappingItem newItem = newItems.get(inew);
LogicQueueMappingItem oldItem = oldItems.get(iold); LogicQueueMappingItem oldItem = oldItems.get(iold);
if (newItem.getGen() < oldItem.getGen()) { if (newItem.getGen() < oldItem.getGen()) {
//the old one may have been deleted
inew++; inew++;
continue;
} else if (oldItem.getGen() < newItem.getGen()){ } else if (oldItem.getGen() < newItem.getGen()){
throw new RuntimeException("The gen is not correct for old item"); //the new one may be the "delete one from "
if (isCLean) {
iold++;
} else {
throw new RuntimeException("The new item-list has less items than old item-list");
}
} else { } else {
assert oldItem.getBname().equals(newItem.getBname()); assert oldItem.getBname().equals(newItem.getBname());
assert oldItem.getQueueId() == newItem.getQueueId(); assert oldItem.getQueueId() == newItem.getQueueId();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册