diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index e7ef46d0c61607e8d81eb478ecdcec373947e6f2..d5d6a53a367271d42dbaf2d10c0c5741e0c363e1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -38,6 +39,7 @@ import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener; import org.apache.rocketmq.broker.client.ProducerManager; import org.apache.rocketmq.broker.client.net.Broker2Client; import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager; +import org.apache.rocketmq.broker.dleger.DLegerRoleChangeHandler; import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap; import org.apache.rocketmq.broker.filter.ConsumerFilterManager; import org.apache.rocketmq.broker.filtersrv.FilterServerManager; @@ -97,6 +99,7 @@ import org.apache.rocketmq.store.MessageArrivingListener; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.dleger.DLegerCommitLog; import org.apache.rocketmq.store.stats.BrokerStats; import org.apache.rocketmq.store.stats.BrokerStatsManager; @@ -156,6 +159,7 @@ public class BrokerController { private TransactionalMessageCheckService transactionalMessageCheckService; private TransactionalMessageService transactionalMessageService; private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener; + private Future slaveSyncFuture; public BrokerController( final BrokerConfig brokerConfig, @@ -231,6 +235,10 @@ public class BrokerController { this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig); + if (messageStoreConfig.isEnableDLegerCommitLog()) { + DLegerRoleChangeHandler roleChangeHandler = new DLegerRoleChangeHandler(this, (DefaultMessageStore) messageStore); + ((DLegerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLegerServer().getdLegerLeaderElector().addRoleChangeHandler(roleChangeHandler); + } this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); //load plugin MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig); @@ -392,40 +400,6 @@ public class BrokerController { } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } - - if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { - if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) { - this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress()); - this.updateMasterHAServerAddrPeriodically = false; - } else { - this.updateMasterHAServerAddrPeriodically = true; - } - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - - @Override - public void run() { - try { - BrokerController.this.slaveSynchronize.syncAll(); - } catch (Throwable e) { - log.error("ScheduledTask syncAll slave exception", e); - } - } - }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); - } else { - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - - @Override - public void run() { - try { - BrokerController.this.printMasterAndSlaveDiff(); - } catch (Throwable e) { - log.error("schedule printMasterAndSlaveDiff error.", e); - } - } - }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); - } - if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { // Register a listener to reload SslContext try { @@ -831,11 +805,9 @@ public class BrokerController { this.brokerFastFailure.start(); } - if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) { - if (this.transactionalMessageCheckService != null) { - log.info("Start transaction service!"); - this.transactionalMessageCheckService.start(); - } + if (!messageStoreConfig.isEnableDLegerCommitLog()) { + startProcessorByHa(); + handleSlaveSynchronize(messageStoreConfig.getBrokerRole()); } } @@ -1052,4 +1024,95 @@ public class BrokerController { public BlockingQueue getEndTransactionThreadPoolQueue() { return endTransactionThreadPoolQueue; } + + + + private void handleSlaveSynchronize(BrokerRole role) { + if (role == BrokerRole.SLAVE) { + if (null != slaveSyncFuture){ + slaveSyncFuture.cancel(false); + } + this.slaveSynchronize.setMasterAddr(null); + slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + BrokerController.this.slaveSynchronize.syncAll(); + } + catch (Throwable e) { + log.error("ScheduledTask SlaveSynchronize syncAll error.", e); + } + } + }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); + } else { + //handle the slave synchronise + if (null != slaveSyncFuture){ + slaveSyncFuture.cancel(false); + } + this.slaveSynchronize.setMasterAddr(null); + } + } + + public void changeToSlave() { + //change the role + brokerConfig.setBrokerId(1); //TO DO check + messageStoreConfig.setBrokerRole(BrokerRole.SLAVE); + + //handle the scheduled service + this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE); + + //handle the transactional service + this.shutdownProcessorByHa(); + + //handle the slave synchronise + handleSlaveSynchronize(BrokerRole.SLAVE); + + try { + this.registerBrokerAll(true, true, brokerConfig.isForceRegister()); + } catch (Throwable ignored) { + + } + } + + + + public void changeToMaster(BrokerRole role) { + if (role == BrokerRole.SLAVE) { + return; + } + //handle the slave synchronise + handleSlaveSynchronize(role); + //change the role + brokerConfig.setBrokerId(0); //TO DO check + messageStoreConfig.setBrokerRole(role); + + //handle the scheduled service + this.messageStore.handleScheduleMessageService(role); + + //handle the transactional service + this.startProcessorByHa(); + + try { + this.registerBrokerAll(true, true, brokerConfig.isForceRegister()); + } catch (Throwable ignored) { + + } + } + + private void startProcessorByHa() { + if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) { + if (this.transactionalMessageCheckService != null) { + this.transactionalMessageCheckService.start(); + } + } + } + + private void shutdownProcessorByHa() { + if (this.transactionalMessageCheckService != null) { + this.transactionalMessageCheckService.shutdown(true); + } + } + + + } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..3d3b63a121fb7c39dc707dcd52c73b571faf4849 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java @@ -0,0 +1,49 @@ +package org.apache.rocketmq.broker.dleger; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.dleger.DLegerLeaderElector; +import org.apache.rocketmq.dleger.MemberState; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.config.BrokerRole; + +public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHandler { + + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private BrokerController brokerController; + private DefaultMessageStore messageStore; + public DLegerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) { + this.brokerController = brokerController; + this.messageStore = messageStore; + } + + @Override public void handle(long term, MemberState.Role role) { + try { + log.info("Begin handling lastRole change term={} lastRole={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole()); + switch (role) { + case CANDIDATE: + if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) { + brokerController.changeToSlave(); + } + break; + case FOLLOWER: + brokerController.changeToSlave(); + break; + case LEADER: + while (messageStore.dispatchBehindBytes() != 0) { + Thread.sleep(100); + } + messageStore.recoverTopicQueueTable(); + brokerController.changeToMaster(BrokerRole.SYNC_MASTER); + break; + default: + break; + } + log.info("Finish handling lastRole change term={} lastRole={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole()); + } catch (Throwable t) { + log.info("Failed handling lastRole change term={} lastRole={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), t); + } + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index 643a812fa02b8d64f62279b2c1cc814c1d6b6a7d..7b5e5645a0b0f383863116892efd27e7e22c0604 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -54,7 +54,7 @@ public class SlaveSynchronize { private void syncTopicConfig() { String masterAddrBak = this.masterAddr; - if (masterAddrBak != null) { + if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) { try { TopicConfigSerializeWrapper topicWrapper = this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak); @@ -78,7 +78,7 @@ public class SlaveSynchronize { private void syncConsumerOffset() { String masterAddrBak = this.masterAddr; - if (masterAddrBak != null) { + if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) { try { ConsumerOffsetSerializeWrapper offsetWrapper = this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak); @@ -94,7 +94,7 @@ public class SlaveSynchronize { private void syncDelayOffset() { String masterAddrBak = this.masterAddr; - if (masterAddrBak != null) { + if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) { try { String delayOffset = this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak); @@ -118,7 +118,7 @@ public class SlaveSynchronize { private void syncSubscriptionGroupConfig() { String masterAddrBak = this.masterAddr; - if (masterAddrBak != null) { + if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) { try { SubscriptionGroupWrapper subscriptionWrapper = this.brokerController.getBrokerOuterAPI() diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 76be492ba468a5efb595536cec5fdbfc4586412f..cd0c46def2795ec12a2bf9df12446426aef6abeb 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -226,10 +226,6 @@ public class DefaultMessageStore implements MessageStore { this.commitLog.start(); this.storeStatsService.start(); - if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) { - this.scheduleMessageService.start(); - } - if (this.getMessageStoreConfig().isDuplicationEnable()) { this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset()); } else { @@ -237,13 +233,18 @@ public class DefaultMessageStore implements MessageStore { } this.reputMessageService.start(); - this.haService.start(); + if (!messageStoreConfig.isEnableDLegerCommitLog()) { + this.haService.start(); + this.handleScheduleMessageService(messageStoreConfig.getBrokerRole()); + } this.createTempFile(); this.addScheduleTask(); this.shutdown = false; } + + public void shutdown() { if (!this.shutdown) { this.shutdown = true; @@ -260,8 +261,9 @@ public class DefaultMessageStore implements MessageStore { if (this.scheduleMessageService != null) { this.scheduleMessageService.shutdown(); } - - this.haService.shutdown(); + if (this.haService != null) { + this.haService.shutdown(); + } this.storeStatsService.shutdown(); this.indexService.shutdown(); @@ -1325,7 +1327,7 @@ public class DefaultMessageStore implements MessageStore { return maxPhysicOffset; } - private void recoverTopicQueueTable() { + public void recoverTopicQueueTable() { HashMap table = new HashMap(1024); long minPhyOffset = this.commitLog.getMinOffset(); for (ConcurrentMap maps : this.consumeQueueTable.values()) { @@ -1387,6 +1389,18 @@ public class DefaultMessageStore implements MessageStore { return brokerStatsManager; } + @Override + public void handleScheduleMessageService(final BrokerRole brokerRole) { + if (this.scheduleMessageService != null) { + if (brokerRole == BrokerRole.SLAVE) { + this.scheduleMessageService.shutdown(); + } else { + this.scheduleMessageService.start(); + } + } + + } + public int remainTransientStoreBufferNumbs() { return this.transientStorePool.remainBufferNumbs(); } diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 0f9b4f0ae6e35ef552c521855078b5638b705313..5a046ff181f747624f1bb7c427541dfba28509e3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -21,6 +21,7 @@ import java.util.LinkedList; import java.util.Set; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.stats.BrokerStatsManager; /** @@ -366,4 +367,10 @@ public interface MessageStore { * @return BrokerStatsManager. */ BrokerStatsManager getBrokerStatsManager(); + + /** + * handle + * @param brokerRole + */ + void handleScheduleMessageService(BrokerRole brokerRole); } diff --git a/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java index 272a1868ba02c461c67c608b44c2e078bde3b0f3..16a2f27486b46c8e6a5c2f58068fe53558d589fc 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java @@ -626,4 +626,8 @@ public class DLegerCommitLog extends CommitLog { } } + + public DLegerServer getdLegerServer() { + return dLegerServer; + } } diff --git a/store/src/test/java/org/apache/rocketmq/store/dleger/DLegerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dleger/DLegerCommitlogTest.java index 8ad58c8f403001bd49df8e4ef0213c2a6e00ad24..67c2ed9164973906bd8ef82bd4306f919673702e 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dleger/DLegerCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dleger/DLegerCommitlogTest.java @@ -87,6 +87,7 @@ public class DLegerCommitlogTest extends StoreTestBase { Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId(), messageExt.getMsgId()); Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(), messageExt.getCommitLogOffset()); } + messageStore.destroy(); } diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java index 9dcb2d8b755cb1e521276ea05b437034a6ebf846..48d9a266e090b7a95d4d040c22aa64fd8a7a50d4 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java @@ -179,15 +179,7 @@ public class IntegrationTestBase { if (!file.exists()) { return; } - if (file.isFile()) { - file.delete(); - } else if (file.isDirectory()) { - File[] files = file.listFiles(); - for (File file1 : files) { - deleteFile(file1); - } - file.delete(); - } + UtilAll.deleteFile(file); } }