提交 5f730028 编写于 作者: D dongeforever

Add role change handler for broker controller

上级 515bc353
......@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
......@@ -38,6 +39,7 @@ import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
import org.apache.rocketmq.broker.dleger.DLegerRoleChangeHandler;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
......@@ -97,6 +99,7 @@ import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.dleger.DLegerCommitLog;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
......@@ -156,6 +159,7 @@ public class BrokerController {
private TransactionalMessageCheckService transactionalMessageCheckService;
private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
private Future<?> slaveSyncFuture;
public BrokerController(
final BrokerConfig brokerConfig,
......@@ -231,6 +235,10 @@ public class BrokerController {
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLegerRoleChangeHandler roleChangeHandler = new DLegerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLegerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLegerServer().getdLegerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
......@@ -392,40 +400,6 @@ public class BrokerController {
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
} catch (Throwable e) {
log.error("ScheduledTask syncAll slave exception", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
} else {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
......@@ -831,11 +805,9 @@ public class BrokerController {
this.brokerFastFailure.start();
}
if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) {
if (this.transactionalMessageCheckService != null) {
log.info("Start transaction service!");
this.transactionalMessageCheckService.start();
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa();
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
}
}
......@@ -1052,4 +1024,95 @@ public class BrokerController {
public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
return endTransactionThreadPoolQueue;
}
private void handleSlaveSynchronize(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
if (null != slaveSyncFuture){
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
}
catch (Throwable e) {
log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
} else {
//handle the slave synchronise
if (null != slaveSyncFuture){
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
}
}
public void changeToSlave() {
//change the role
brokerConfig.setBrokerId(1); //TO DO check
messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);
//handle the scheduled service
this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);
//handle the transactional service
this.shutdownProcessorByHa();
//handle the slave synchronise
handleSlaveSynchronize(BrokerRole.SLAVE);
try {
this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
} catch (Throwable ignored) {
}
}
public void changeToMaster(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
return;
}
//handle the slave synchronise
handleSlaveSynchronize(role);
//change the role
brokerConfig.setBrokerId(0); //TO DO check
messageStoreConfig.setBrokerRole(role);
//handle the scheduled service
this.messageStore.handleScheduleMessageService(role);
//handle the transactional service
this.startProcessorByHa();
try {
this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
} catch (Throwable ignored) {
}
}
private void startProcessorByHa() {
if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) {
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.start();
}
}
}
private void shutdownProcessorByHa() {
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.shutdown(true);
}
}
}
package org.apache.rocketmq.broker.dleger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.dleger.DLegerLeaderElector;
import org.apache.rocketmq.dleger.MemberState;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private BrokerController brokerController;
private DefaultMessageStore messageStore;
public DLegerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) {
this.brokerController = brokerController;
this.messageStore = messageStore;
}
@Override public void handle(long term, MemberState.Role role) {
try {
log.info("Begin handling lastRole change term={} lastRole={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
switch (role) {
case CANDIDATE:
if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
brokerController.changeToSlave();
}
break;
case FOLLOWER:
brokerController.changeToSlave();
break;
case LEADER:
while (messageStore.dispatchBehindBytes() != 0) {
Thread.sleep(100);
}
messageStore.recoverTopicQueueTable();
brokerController.changeToMaster(BrokerRole.SYNC_MASTER);
break;
default:
break;
}
log.info("Finish handling lastRole change term={} lastRole={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
} catch (Throwable t) {
log.info("Failed handling lastRole change term={} lastRole={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), t);
}
}
}
......@@ -54,7 +54,7 @@ public class SlaveSynchronize {
private void syncTopicConfig() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
TopicConfigSerializeWrapper topicWrapper =
this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
......@@ -78,7 +78,7 @@ public class SlaveSynchronize {
private void syncConsumerOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
ConsumerOffsetSerializeWrapper offsetWrapper =
this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
......@@ -94,7 +94,7 @@ public class SlaveSynchronize {
private void syncDelayOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
String delayOffset =
this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
......@@ -118,7 +118,7 @@ public class SlaveSynchronize {
private void syncSubscriptionGroupConfig() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
SubscriptionGroupWrapper subscriptionWrapper =
this.brokerController.getBrokerOuterAPI()
......
......@@ -226,10 +226,6 @@ public class DefaultMessageStore implements MessageStore {
this.commitLog.start();
this.storeStatsService.start();
if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
this.scheduleMessageService.start();
}
if (this.getMessageStoreConfig().isDuplicationEnable()) {
this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
} else {
......@@ -237,13 +233,18 @@ public class DefaultMessageStore implements MessageStore {
}
this.reputMessageService.start();
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService.start();
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
this.createTempFile();
this.addScheduleTask();
this.shutdown = false;
}
public void shutdown() {
if (!this.shutdown) {
this.shutdown = true;
......@@ -260,8 +261,9 @@ public class DefaultMessageStore implements MessageStore {
if (this.scheduleMessageService != null) {
this.scheduleMessageService.shutdown();
}
if (this.haService != null) {
this.haService.shutdown();
}
this.storeStatsService.shutdown();
this.indexService.shutdown();
......@@ -1325,7 +1327,7 @@ public class DefaultMessageStore implements MessageStore {
return maxPhysicOffset;
}
private void recoverTopicQueueTable() {
public void recoverTopicQueueTable() {
HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
long minPhyOffset = this.commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
......@@ -1387,6 +1389,18 @@ public class DefaultMessageStore implements MessageStore {
return brokerStatsManager;
}
@Override
public void handleScheduleMessageService(final BrokerRole brokerRole) {
if (this.scheduleMessageService != null) {
if (brokerRole == BrokerRole.SLAVE) {
this.scheduleMessageService.shutdown();
} else {
this.scheduleMessageService.start();
}
}
}
public int remainTransientStoreBufferNumbs() {
return this.transientStorePool.remainBufferNumbs();
}
......
......@@ -21,6 +21,7 @@ import java.util.LinkedList;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
/**
......@@ -366,4 +367,10 @@ public interface MessageStore {
* @return BrokerStatsManager.
*/
BrokerStatsManager getBrokerStatsManager();
/**
* handle
* @param brokerRole
*/
void handleScheduleMessageService(BrokerRole brokerRole);
}
......@@ -626,4 +626,8 @@ public class DLegerCommitLog extends CommitLog {
}
}
public DLegerServer getdLegerServer() {
return dLegerServer;
}
}
......@@ -87,6 +87,7 @@ public class DLegerCommitlogTest extends StoreTestBase {
Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId(), messageExt.getMsgId());
Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(), messageExt.getCommitLogOffset());
}
messageStore.destroy();
}
......
......@@ -179,15 +179,7 @@ public class IntegrationTestBase {
if (!file.exists()) {
return;
}
if (file.isFile()) {
file.delete();
} else if (file.isDirectory()) {
File[] files = file.listFiles();
for (File file1 : files) {
deleteFile(file1);
}
file.delete();
}
UtilAll.deleteFile(file);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册