提交 9c8fa5e1 编写于 作者: D dongeforever

Catch service thread exception of scheduledMessage and transactionalMessage when changing role

上级 30028a11
......@@ -1067,10 +1067,18 @@ public class BrokerController {
messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);
//handle the scheduled service
this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);
try {
this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);
} catch (Throwable t) {
log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t);
}
//handle the transactional service
this.shutdownProcessorByHa();
try {
this.shutdownProcessorByHa();
} catch (Throwable t) {
log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t);
}
//handle the slave synchronise
handleSlaveSynchronize(BrokerRole.SLAVE);
......@@ -1095,10 +1103,18 @@ public class BrokerController {
handleSlaveSynchronize(role);
//handle the scheduled service
this.messageStore.handleScheduleMessageService(role);
try {
this.messageStore.handleScheduleMessageService(role);
} catch (Throwable t) {
log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t);
}
//handle the transactional service
this.startProcessorByHa(BrokerRole.SYNC_MASTER);
try {
this.startProcessorByHa(BrokerRole.SYNC_MASTER);
} catch (Throwable t) {
log.error("[MONITOR] startProcessorByHa failed when changing to master", t);
}
//if the operations above are totally successful, we change to master
brokerConfig.setBrokerId(0); //TO DO check
......
......@@ -51,33 +51,36 @@ public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChange
@Override public void run() {
long start = System.currentTimeMillis();
try {
boolean succ = false;
log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
switch (role) {
case CANDIDATE:
if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
brokerController.changeToSlave(dLedgerCommitLog.getId());
}
succ = true;
break;
case FOLLOWER:
brokerController.changeToSlave(dLedgerCommitLog.getId());
succ = true;
break;
case LEADER:
while (dLegerServer.getMemberState().isLeader()
&& (dLegerServer.getdLedgerStore().getLedgerEndIndex() != dLegerServer.getdLedgerStore().getCommittedIndex() || messageStore.dispatchBehindBytes() != 0)) {
UtilAll.sleep(100);
}
boolean succ = dLegerServer.getMemberState().isLeader()
succ = dLegerServer.getMemberState().isLeader()
&& dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex()
&& messageStore.dispatchBehindBytes() == 0;
if (succ) {
messageStore.recoverTopicQueueTable();
brokerController.changeToMaster(BrokerRole.SYNC_MASTER);
}
log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), UtilAll.elapsed(start));
break;
default:
break;
}
log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), UtilAll.elapsed(start));
} catch (Throwable t) {
log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), UtilAll.elapsed(start), t);
}
......
......@@ -152,7 +152,7 @@ public class BrokerOuterAPI {
registerBrokerResultList.add(result);
}
log.info("register broker to name server {} OK", namesrvAddr);
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
......
......@@ -29,31 +29,10 @@ public class TransactionalMessageCheckService extends ServiceThread {
private BrokerController brokerController;
private final AtomicBoolean started = new AtomicBoolean(false);
public TransactionalMessageCheckService(BrokerController brokerController) {
this.brokerController = brokerController;
}
@Override
public void start() {
if (started.compareAndSet(false, true)) {
super.start();
//no need to do this
//this.brokerController.getTransactionalMessageService().open();
}
}
@Override
public void shutdown(boolean interrupt) {
if (started.compareAndSet(true, false)) {
super.shutdown(interrupt);
//no need to do this
//this.brokerController.getTransactionalMessageService().close();
//this.brokerController.getTransactionalMessageCheckListener().shutDown();
}
}
@Override
public String getServiceName() {
return TransactionalMessageCheckService.class.getSimpleName();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册