提交 5020f325 编写于 作者: D dongeforever

Add id for dleger commitlog

上级 aed6f0ed
...@@ -784,7 +784,7 @@ public class BrokerController { ...@@ -784,7 +784,7 @@ public class BrokerController {
} }
if (messageStoreConfig.isEnableDLegerCommitLog()) { if (messageStoreConfig.isEnableDLegerCommitLog()) {
changeToSlave(); changeToSlave(((DLegerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getId());
} else { } else {
startProcessorByHa(); startProcessorByHa();
handleSlaveSynchronize(messageStoreConfig.getBrokerRole()); handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
...@@ -1059,9 +1059,9 @@ public class BrokerController { ...@@ -1059,9 +1059,9 @@ public class BrokerController {
} }
} }
public void changeToSlave() { public void changeToSlave(int brokerId) {
//change the role //change the role
brokerConfig.setBrokerId(1); //TO DO check brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check
messageStoreConfig.setBrokerRole(BrokerRole.SLAVE); messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);
//handle the scheduled service //handle the scheduled service
......
...@@ -25,15 +25,18 @@ import org.apache.rocketmq.logging.InternalLogger; ...@@ -25,15 +25,18 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.dleger.DLegerCommitLog;
public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHandler { public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private BrokerController brokerController; private BrokerController brokerController;
private DefaultMessageStore messageStore; private DefaultMessageStore messageStore;
private DLegerCommitLog dLegerCommitLog;
public DLegerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) { public DLegerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) {
this.brokerController = brokerController; this.brokerController = brokerController;
this.messageStore = messageStore; this.messageStore = messageStore;
this.dLegerCommitLog = (DLegerCommitLog) messageStore.getCommitLog();
} }
@Override public void handle(long term, MemberState.Role role) { @Override public void handle(long term, MemberState.Role role) {
...@@ -43,11 +46,11 @@ public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHa ...@@ -43,11 +46,11 @@ public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHa
switch (role) { switch (role) {
case CANDIDATE: case CANDIDATE:
if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) { if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
brokerController.changeToSlave(); brokerController.changeToSlave(dLegerCommitLog.getId());
} }
break; break;
case FOLLOWER: case FOLLOWER:
brokerController.changeToSlave(); brokerController.changeToSlave(dLegerCommitLog.getId());
break; break;
case LEADER: case LEADER:
while (messageStore.dispatchBehindBytes() != 0) { while (messageStore.dispatchBehindBytes() != 0) {
......
...@@ -58,6 +58,8 @@ public class DLegerCommitLog extends CommitLog { ...@@ -58,6 +58,8 @@ public class DLegerCommitLog extends CommitLog {
private final DLegerMmapFileStore dLegerFileStore; private final DLegerMmapFileStore dLegerFileStore;
private final MmapFileList dLegerFileList; private final MmapFileList dLegerFileList;
private final int id;
private final MessageSerializer messageSerializer; private final MessageSerializer messageSerializer;
...@@ -71,6 +73,7 @@ public class DLegerCommitLog extends CommitLog { ...@@ -71,6 +73,7 @@ public class DLegerCommitLog extends CommitLog {
dLegerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup()); dLegerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup());
dLegerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers()); dLegerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers());
dLegerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()); dLegerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
id = Integer.valueOf(dLegerConfig.getSelfId().substring(1)) + 1;
dLegerServer = new DLegerServer(dLegerConfig); dLegerServer = new DLegerServer(dLegerConfig);
dLegerFileStore = (DLegerMmapFileStore) dLegerServer.getdLegerStore(); dLegerFileStore = (DLegerMmapFileStore) dLegerServer.getdLegerStore();
DLegerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> { DLegerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> {
...@@ -654,4 +657,8 @@ public class DLegerCommitLog extends CommitLog { ...@@ -654,4 +657,8 @@ public class DLegerCommitLog extends CommitLog {
public DLegerServer getdLegerServer() { public DLegerServer getdLegerServer() {
return dLegerServer; return dLegerServer;
} }
public int getId() {
return id;
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册