From cf8b84659fbf5ca22c31fa336a615843f98fde01 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Sat, 20 Nov 2021 12:33:15 +0800 Subject: [PATCH] Polish the lock and persist for updateTopicQueueMapping --- .../processor/AdminBrokerProcessor.java | 2 +- .../topic/TopicQueueMappingManager.java | 23 +++++++++++++++---- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index bab484d9..9048584c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -343,7 +343,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); response.setCode(ResponseCode.SUCCESS); } catch (Exception e) { - log.error("Update static failed for [{}]", request, e); + log.error("Update static topic failed for [{}]", request, e); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(e.getMessage()); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index e2b46fed..d5b76b87 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -36,6 +36,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -57,9 +58,15 @@ public class TopicQueueMappingManager extends ConfigManager { this.brokerController = brokerController; } - public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) { - lock.lock(); + public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) throws Exception { + boolean locked = false; + boolean updated = false; try { + if (lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + locked = true; + } else { + return; + } if (newDetail == null) { return; } @@ -70,6 +77,7 @@ public class TopicQueueMappingManager extends ConfigManager { TopicQueueMappingDetail oldDetail = topicQueueMappingTable.get(newDetail.getTopic()); if (oldDetail == null) { topicQueueMappingTable.put(newDetail.getTopic(), newDetail); + updated = true; return; } if (force) { @@ -77,6 +85,7 @@ public class TopicQueueMappingManager extends ConfigManager { newDetail.getHostedQueues().putIfAbsent(queueId, items); }); topicQueueMappingTable.put(newDetail.getTopic(), newDetail); + updated = true; return; } //do more check @@ -94,8 +103,14 @@ public class TopicQueueMappingManager extends ConfigManager { } } topicQueueMappingTable.put(newDetail.getTopic(), newDetail); - } finally { - lock.unlock(); + updated = true; + } finally { + if (locked) { + this.lock.unlock(); + } + if (updated) { + this.persist(); + } } } -- GitLab