From 5020f3252633fab98ecb9a37c8dca2c368d31abb Mon Sep 17 00:00:00 2001 From: dongeforever Date: Tue, 13 Nov 2018 17:08:06 +0800 Subject: [PATCH] Add id for dleger commitlog --- .../java/org/apache/rocketmq/broker/BrokerController.java | 6 +++--- .../rocketmq/broker/dleger/DLegerRoleChangeHandler.java | 7 +++++-- .../org/apache/rocketmq/store/dleger/DLegerCommitLog.java | 7 +++++++ 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 9f66ed8e..a8526006 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -784,7 +784,7 @@ public class BrokerController { } if (messageStoreConfig.isEnableDLegerCommitLog()) { - changeToSlave(); + changeToSlave(((DLegerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getId()); } else { startProcessorByHa(); handleSlaveSynchronize(messageStoreConfig.getBrokerRole()); @@ -1059,9 +1059,9 @@ public class BrokerController { } } - public void changeToSlave() { + public void changeToSlave(int brokerId) { //change the role - brokerConfig.setBrokerId(1); //TO DO check + brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check messageStoreConfig.setBrokerRole(BrokerRole.SLAVE); //handle the scheduled service diff --git a/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java index 9503fc8e..3ba377bc 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java @@ -25,15 +25,18 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.config.BrokerRole; +import org.apache.rocketmq.store.dleger.DLegerCommitLog; public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHandler { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private BrokerController brokerController; private DefaultMessageStore messageStore; + private DLegerCommitLog dLegerCommitLog; public DLegerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) { this.brokerController = brokerController; this.messageStore = messageStore; + this.dLegerCommitLog = (DLegerCommitLog) messageStore.getCommitLog(); } @Override public void handle(long term, MemberState.Role role) { @@ -43,11 +46,11 @@ public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHa switch (role) { case CANDIDATE: if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) { - brokerController.changeToSlave(); + brokerController.changeToSlave(dLegerCommitLog.getId()); } break; case FOLLOWER: - brokerController.changeToSlave(); + brokerController.changeToSlave(dLegerCommitLog.getId()); break; case LEADER: while (messageStore.dispatchBehindBytes() != 0) { diff --git a/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java index 7bef4f19..deb34f16 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java @@ -58,6 +58,8 @@ public class DLegerCommitLog extends CommitLog { private final DLegerMmapFileStore dLegerFileStore; private final MmapFileList dLegerFileList; + private final int id; + private final MessageSerializer messageSerializer; @@ -71,6 +73,7 @@ public class DLegerCommitLog extends CommitLog { dLegerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup()); dLegerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers()); dLegerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()); + id = Integer.valueOf(dLegerConfig.getSelfId().substring(1)) + 1; dLegerServer = new DLegerServer(dLegerConfig); dLegerFileStore = (DLegerMmapFileStore) dLegerServer.getdLegerStore(); DLegerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> { @@ -654,4 +657,8 @@ public class DLegerCommitLog extends CommitLog { public DLegerServer getdLegerServer() { return dLegerServer; } + + public int getId() { + return id; + } } -- GitLab