提交 866e3090 编写于 作者: D dongeforever

Make role change handler asynchronous

上级 ffc733a4
......@@ -16,9 +16,13 @@
*/
package org.apache.rocketmq.broker.dleger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.dleger.DLegerLeaderElector;
import org.apache.rocketmq.dleger.DLegerServer;
import org.apache.rocketmq.dleger.MemberState;
import org.apache.rocketmq.dleger.utils.UtilAll;
import org.apache.rocketmq.logging.InternalLogger;
......@@ -30,41 +34,63 @@ import org.apache.rocketmq.store.dleger.DLegerCommitLog;
public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLegerRoleChangeHandler_"));
private BrokerController brokerController;
private DefaultMessageStore messageStore;
private DLegerCommitLog dLegerCommitLog;
private DLegerServer dLegerServer;
public DLegerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) {
this.brokerController = brokerController;
this.messageStore = messageStore;
this.dLegerCommitLog = (DLegerCommitLog) messageStore.getCommitLog();
this.dLegerServer = dLegerCommitLog.getdLegerServer();
}
@Override public void handle(long term, MemberState.Role role) {
long start = System.currentTimeMillis();
try {
log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
switch (role) {
case CANDIDATE:
if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
brokerController.changeToSlave(dLegerCommitLog.getId());
Runnable runnable = new Runnable() {
@Override public void run() {
long start = System.currentTimeMillis();
try {
log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
switch (role) {
case CANDIDATE:
if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
brokerController.changeToSlave(dLegerCommitLog.getId());
}
break;
case FOLLOWER:
brokerController.changeToSlave(dLegerCommitLog.getId());
break;
case LEADER:
while (dLegerServer.getMemberState().isLeader()
&& (dLegerServer.getdLegerStore().getLegerEndIndex() != dLegerServer.getdLegerStore().getCommittedIndex() || messageStore.dispatchBehindBytes() != 0)) {
UtilAll.sleep(100);
}
boolean succ = dLegerServer.getMemberState().isLeader()
&& dLegerServer.getdLegerStore().getLegerEndIndex() == dLegerServer.getdLegerStore().getCommittedIndex()
&& messageStore.dispatchBehindBytes() == 0;
if (succ) {
messageStore.recoverTopicQueueTable();
brokerController.changeToMaster(BrokerRole.SYNC_MASTER);
}
log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), UtilAll.elapsed(start));
break;
default:
break;
}
break;
case FOLLOWER:
brokerController.changeToSlave(dLegerCommitLog.getId());
break;
case LEADER:
while (messageStore.dispatchBehindBytes() != 0) {
Thread.sleep(100);
}
messageStore.recoverTopicQueueTable();
brokerController.changeToMaster(BrokerRole.SYNC_MASTER);
break;
default:
break;
} catch (Throwable t) {
log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), UtilAll.elapsed(start), t);
}
}
log.info("Finish handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), UtilAll.elapsed(start));
} catch (Throwable t) {
log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), UtilAll.elapsed(start), t);
}
};
executorService.submit(runnable);
}
@Override public void startup() {
}
@Override public void shutdown() {
executorService.shutdown();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册