提交 e1494951 编写于 作者: D dongeforever

Fix the initial reput offset, according to the maxPhisicalPosInLogicQueue

上级 55eda184
......@@ -224,8 +224,10 @@ public class DefaultMessageStore implements MessageStore {
lockFile.getChannel().force(true);
{
/**
* 1. calculate the reput offset according to the consume queue;
* 2. make sure the lagged messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
* 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
* 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
* 3. Calculate the reput offset according to the consume queue;
* 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
*/
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
......@@ -235,23 +237,34 @@ public class DefaultMessageStore implements MessageStore {
}
}
}
log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMaxOffset={} clConfirmedOffset={}",
maxPhysicalPosInLogicQueue, this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
long reputOffset;
if (this.getMessageStoreConfig().isDuplicationEnable()) {
reputOffset = Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getConfirmOffset());
} else {
reputOffset = Math.min(maxPhysicalPosInLogicQueue, this.commitLog.getMaxOffset());
if (maxPhysicalPosInLogicQueue < 0) {
maxPhysicalPosInLogicQueue = 0;
}
if (reputOffset < 0) {
reputOffset = 0;
if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
/**
* This happens in following conditions:
* 1. If someone removes all the consumequeue files or the disk get damaged.
* 2. Launch a new broker, and copy the commitlog from other brokers.
*
* All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
* If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
*/
log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
}
this.reputMessageService.setReputFromOffset(reputOffset);
log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();
//Finish dispatching the messages fall behind
//Note that, if dledger is enabled, the maxOffset maybe -1, so here only require the dispatchBehindBytes > 0
while (this.dispatchBehindBytes() > 0) {
/**
* 1. Finish dispatching the messages fall behind, then to start other services.
* 2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0
*/
while (true) {
if (dispatchBehindBytes() <= 0) {
break;
}
Thread.sleep(1000);
log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
}
......
......@@ -135,13 +135,13 @@ public class DLedgerCommitLog extends CommitLog {
@Override
public long getMaxOffset() {
if (dividedCommitlogOffset > 0 && dLedgerFileStore.getCommittedPos() < 0) {
return dividedCommitlogOffset;
if (dLedgerFileStore.getCommittedPos() > 0) {
return dLedgerFileStore.getCommittedPos();
}
if (dLedgerFileStore.getCommittedPos() == -1) {
return 0;
if (dLedgerFileList.getMinOffset() > 0) {
return dLedgerFileList.getMinOffset();
}
return dLedgerFileStore.getCommittedPos();
return 0;
}
@Override
......
......@@ -85,6 +85,8 @@ public class MixCommitlogTest extends MessageStoreTestBase {
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) dledgerStore.getCommitLog();
Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean());
Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset());
Assert.assertEquals(0, dledgerStore.dispatchBehindBytes());
Assert.assertEquals(dividedOffset, dLedgerCommitLog.getMaxOffset());
Thread.sleep(2000);
doPutMessages(dledgerStore, topic, 0, 1000, 1000);
Thread.sleep(500);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册