提交 5ce20e8a 编写于 作者: D dongeforever

Make sure the fallen behind node not to be leader

上级 a90342e9
...@@ -51,27 +51,32 @@ public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChange ...@@ -51,27 +51,32 @@ public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChange
@Override public void run() { @Override public void run() {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
try { try {
boolean succ = false; boolean succ = true;
log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole()); log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
switch (role) { switch (role) {
case CANDIDATE: case CANDIDATE:
if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) { if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
brokerController.changeToSlave(dLedgerCommitLog.getId()); brokerController.changeToSlave(dLedgerCommitLog.getId());
} }
succ = true;
break; break;
case FOLLOWER: case FOLLOWER:
brokerController.changeToSlave(dLedgerCommitLog.getId()); brokerController.changeToSlave(dLedgerCommitLog.getId());
succ = true;
break; break;
case LEADER: case LEADER:
while (dLegerServer.getMemberState().isLeader() while (true) {
&& (dLegerServer.getdLedgerStore().getLedgerEndIndex() != dLegerServer.getdLedgerStore().getCommittedIndex() || messageStore.dispatchBehindBytes() != 0)) { if (!dLegerServer.getMemberState().isLeader()) {
DLedgerUtils.sleep(100); succ = false;
break;
}
if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == -1) {
break;
}
if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex()
&& messageStore.dispatchBehindBytes() == 0) {
break;
}
Thread.sleep(100);
} }
succ = dLegerServer.getMemberState().isLeader()
&& dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex()
&& messageStore.dispatchBehindBytes() == 0;
if (succ) { if (succ) {
messageStore.recoverTopicQueueTable(); messageStore.recoverTopicQueueTable();
brokerController.changeToMaster(BrokerRole.SYNC_MASTER); brokerController.changeToMaster(BrokerRole.SYNC_MASTER);
......
...@@ -222,12 +222,11 @@ public class DefaultMessageStore implements MessageStore { ...@@ -222,12 +222,11 @@ public class DefaultMessageStore implements MessageStore {
lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes())); lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
lockFile.getChannel().force(true); lockFile.getChannel().force(true);
{
this.flushConsumeQueueService.start(); /**
this.commitLog.start(); * 1. calculate the reput offset according to the consume queue;
this.storeStatsService.start(); * 2. make sure the lagged messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
*/
//calculate the reput offset from the consume queue itself
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset(); long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) { for (ConsumeQueue logic : maps.values()) {
...@@ -250,17 +249,23 @@ public class DefaultMessageStore implements MessageStore { ...@@ -250,17 +249,23 @@ public class DefaultMessageStore implements MessageStore {
this.reputMessageService.setReputFromOffset(reputOffset); this.reputMessageService.setReputFromOffset(reputOffset);
this.reputMessageService.start(); this.reputMessageService.start();
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService.start();
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
//Finish dispatching the messages fall behind //Finish dispatching the messages fall behind
while (this.dispatchBehindBytes() != 0) { //Note that, if dledger is enabled, the maxOffset maybe -1, so here only require the dispatchBehindBytes > 0
while (this.dispatchBehindBytes() > 0) {
Thread.sleep(1000); Thread.sleep(1000);
log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes()); log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
} }
this.recoverTopicQueueTable(); this.recoverTopicQueueTable();
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService.start();
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
this.flushConsumeQueueService.start();
this.commitLog.start();
this.storeStatsService.start();
this.createTempFile(); this.createTempFile();
this.addScheduleTask(); this.addScheduleTask();
......
...@@ -86,6 +86,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -86,6 +86,7 @@ public class DLedgerCommitLog extends CommitLog {
dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers()); dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers());
dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()); dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog()); dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog());
dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen());
originalDledgerEnableForceClean = dLedgerConfig.isEnableDiskForceClean(); originalDledgerEnableForceClean = dLedgerConfig.isEnableDiskForceClean();
id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1; id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1;
dLedgerServer = new DLedgerServer(dLedgerConfig); dLedgerServer = new DLedgerServer(dLedgerConfig);
...@@ -129,25 +130,26 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -129,25 +130,26 @@ public class DLedgerCommitLog extends CommitLog {
@Override @Override
public long getMaxOffset() { public long getMaxOffset() {
if (this.dLedgerFileStore.getCommittedPos() != -1) { if (dividedCommitlogOffset > 0 && dLedgerFileStore.getCommittedPos() < 0) {
return dLedgerFileStore.getCommittedPos(); return dividedCommitlogOffset;
} else {
return this.dLedgerFileList.getMaxWrotePosition();
} }
if (dLedgerFileStore.getCommittedPos() == -1) {
return 0;
}
return dLedgerFileStore.getCommittedPos();
} }
@Override @Override
public long getMinOffset() { public long getMinOffset() {
if (mappedFileQueue.getMappedFiles().isEmpty()) { if (!mappedFileQueue.getMappedFiles().isEmpty()) {
return dLedgerFileList.getMinOffset();
}
return mappedFileQueue.getMinOffset(); return mappedFileQueue.getMinOffset();
} }
return dLedgerFileList.getMinOffset();
}
@Override @Override
public long getConfirmOffset() { public long getConfirmOffset() {
return this.dLedgerFileStore.getCommittedPos() == -1 ? getMaxOffset() return this.getMaxOffset();
: this.dLedgerFileStore.getCommittedPos();
} }
@Override @Override
......
...@@ -174,7 +174,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { ...@@ -174,7 +174,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
Thread.sleep(1000); Thread.sleep(1000);
Assert.assertTrue(leaderStore.getCommitLog().getMaxOffset() > 0); Assert.assertEquals(0, leaderStore.getCommitLog().getMaxOffset());
Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0)); Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0));
...@@ -183,6 +183,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { ...@@ -183,6 +183,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0)); Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(1, followerStore.getMaxOffsetInQueue(topic, 0)); Assert.assertEquals(1, followerStore.getMaxOffsetInQueue(topic, 0));
Assert.assertTrue(leaderStore.getCommitLog().getMaxOffset() > 0);
leaderStore.destroy(); leaderStore.destroy();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册