diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index bab484d97a8c49731b0ef57656369e4f032ec237..9048584c42de1ed29cc9b2da23f4543a99f13d97 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -343,7 +343,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); response.setCode(ResponseCode.SUCCESS); } catch (Exception e) { - log.error("Update static failed for [{}]", request, e); + log.error("Update static topic failed for [{}]", request, e); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(e.getMessage()); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index e2b46fed9d20ff1a5bcf3489d06c7b4cf0693534..d5b76b87e5e49ed99797a7cdd4a13afbd29c7e71 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -36,6 +36,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -57,9 +58,15 @@ public class TopicQueueMappingManager extends ConfigManager { this.brokerController = brokerController; } - public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) { - lock.lock(); + public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) throws Exception { + boolean locked = false; + boolean updated = false; try { + if (lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + locked = true; + } else { + return; + } if (newDetail == null) { return; } @@ -70,6 +77,7 @@ public class TopicQueueMappingManager extends ConfigManager { TopicQueueMappingDetail oldDetail = topicQueueMappingTable.get(newDetail.getTopic()); if (oldDetail == null) { topicQueueMappingTable.put(newDetail.getTopic(), newDetail); + updated = true; return; } if (force) { @@ -77,6 +85,7 @@ public class TopicQueueMappingManager extends ConfigManager { newDetail.getHostedQueues().putIfAbsent(queueId, items); }); topicQueueMappingTable.put(newDetail.getTopic(), newDetail); + updated = true; return; } //do more check @@ -94,8 +103,14 @@ public class TopicQueueMappingManager extends ConfigManager { } } topicQueueMappingTable.put(newDetail.getTopic(), newDetail); - } finally { - lock.unlock(); + updated = true; + } finally { + if (locked) { + this.lock.unlock(); + } + if (updated) { + this.persist(); + } } }