提交 cf8b8465 编写于 作者: D dongeforever

Polish the lock and persist for updateTopicQueueMapping

上级 d18d787f
...@@ -343,7 +343,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -343,7 +343,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
response.setCode(ResponseCode.SUCCESS); response.setCode(ResponseCode.SUCCESS);
} catch (Exception e) { } catch (Exception e) {
log.error("Update static failed for [{}]", request, e); log.error("Update static topic failed for [{}]", request, e);
response.setCode(ResponseCode.SYSTEM_ERROR); response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(e.getMessage()); response.setRemark(e.getMessage());
} }
......
...@@ -36,6 +36,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; ...@@ -36,6 +36,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
...@@ -57,9 +58,15 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -57,9 +58,15 @@ public class TopicQueueMappingManager extends ConfigManager {
this.brokerController = brokerController; this.brokerController = brokerController;
} }
public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) { public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) throws Exception {
lock.lock(); boolean locked = false;
boolean updated = false;
try { try {
if (lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
locked = true;
} else {
return;
}
if (newDetail == null) { if (newDetail == null) {
return; return;
} }
...@@ -70,6 +77,7 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -70,6 +77,7 @@ public class TopicQueueMappingManager extends ConfigManager {
TopicQueueMappingDetail oldDetail = topicQueueMappingTable.get(newDetail.getTopic()); TopicQueueMappingDetail oldDetail = topicQueueMappingTable.get(newDetail.getTopic());
if (oldDetail == null) { if (oldDetail == null) {
topicQueueMappingTable.put(newDetail.getTopic(), newDetail); topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
updated = true;
return; return;
} }
if (force) { if (force) {
...@@ -77,6 +85,7 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -77,6 +85,7 @@ public class TopicQueueMappingManager extends ConfigManager {
newDetail.getHostedQueues().putIfAbsent(queueId, items); newDetail.getHostedQueues().putIfAbsent(queueId, items);
}); });
topicQueueMappingTable.put(newDetail.getTopic(), newDetail); topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
updated = true;
return; return;
} }
//do more check //do more check
...@@ -94,8 +103,14 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -94,8 +103,14 @@ public class TopicQueueMappingManager extends ConfigManager {
} }
} }
topicQueueMappingTable.put(newDetail.getTopic(), newDetail); topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
} finally { updated = true;
lock.unlock(); } finally {
if (locked) {
this.lock.unlock();
}
if (updated) {
this.persist();
}
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册