From 866e3090393fe4a091364f4889af69ed0643b527 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Tue, 20 Nov 2018 15:06:46 +0800 Subject: [PATCH] Make role change handler asynchronous --- .../dleger/DLegerRoleChangeHandler.java | 74 +++++++++++++------ 1 file changed, 50 insertions(+), 24 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java index 67498964..e1d8e7e9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java @@ -16,9 +16,13 @@ */ package org.apache.rocketmq.broker.dleger; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.dleger.DLegerLeaderElector; +import org.apache.rocketmq.dleger.DLegerServer; import org.apache.rocketmq.dleger.MemberState; import org.apache.rocketmq.dleger.utils.UtilAll; import org.apache.rocketmq.logging.InternalLogger; @@ -30,41 +34,63 @@ import org.apache.rocketmq.store.dleger.DLegerCommitLog; public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHandler { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLegerRoleChangeHandler_")); private BrokerController brokerController; private DefaultMessageStore messageStore; private DLegerCommitLog dLegerCommitLog; + private DLegerServer dLegerServer; public DLegerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) { this.brokerController = brokerController; this.messageStore = messageStore; this.dLegerCommitLog = (DLegerCommitLog) messageStore.getCommitLog(); + this.dLegerServer = dLegerCommitLog.getdLegerServer(); } @Override public void handle(long term, MemberState.Role role) { - long start = System.currentTimeMillis(); - try { - log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole()); - switch (role) { - case CANDIDATE: - if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) { - brokerController.changeToSlave(dLegerCommitLog.getId()); + Runnable runnable = new Runnable() { + @Override public void run() { + long start = System.currentTimeMillis(); + try { + log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole()); + switch (role) { + case CANDIDATE: + if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) { + brokerController.changeToSlave(dLegerCommitLog.getId()); + } + break; + case FOLLOWER: + brokerController.changeToSlave(dLegerCommitLog.getId()); + break; + case LEADER: + while (dLegerServer.getMemberState().isLeader() + && (dLegerServer.getdLegerStore().getLegerEndIndex() != dLegerServer.getdLegerStore().getCommittedIndex() || messageStore.dispatchBehindBytes() != 0)) { + UtilAll.sleep(100); + } + boolean succ = dLegerServer.getMemberState().isLeader() + && dLegerServer.getdLegerStore().getLegerEndIndex() == dLegerServer.getdLegerStore().getCommittedIndex() + && messageStore.dispatchBehindBytes() == 0; + if (succ) { + messageStore.recoverTopicQueueTable(); + brokerController.changeToMaster(BrokerRole.SYNC_MASTER); + } + log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), UtilAll.elapsed(start)); + break; + default: + break; } - break; - case FOLLOWER: - brokerController.changeToSlave(dLegerCommitLog.getId()); - break; - case LEADER: - while (messageStore.dispatchBehindBytes() != 0) { - Thread.sleep(100); - } - messageStore.recoverTopicQueueTable(); - brokerController.changeToMaster(BrokerRole.SYNC_MASTER); - break; - default: - break; + } catch (Throwable t) { + log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), UtilAll.elapsed(start), t); + } } - log.info("Finish handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), UtilAll.elapsed(start)); - } catch (Throwable t) { - log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), UtilAll.elapsed(start), t); - } + }; + executorService.submit(runnable); + } + + @Override public void startup() { + + } + + @Override public void shutdown() { + executorService.shutdown(); } } -- GitLab