提交 a50b733d 编写于 作者: D dongeforever

Add tests for consume queue fall behind

上级 1e358ae3
......@@ -212,6 +212,12 @@ public class CommitLog {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
} else {
// Commitlog case files are deleted
log.warn("The commitlog files are deleted, and delete the consume queue files");
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
......@@ -396,6 +402,7 @@ public class CommitLog {
this.confirmOffset = phyOffset;
}
@Deprecated
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
......
......@@ -236,7 +236,6 @@ public class DefaultMessageStore implements MessageStore {
}
}
}
log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMaxOffset={} clConfirmedOffset={}",
maxPhysicalPosInLogicQueue, this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
long reputOffset;
......@@ -256,6 +255,13 @@ public class DefaultMessageStore implements MessageStore {
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
//Finish dispatching the messages fall behind
while (this.dispatchBehindBytes() != 0) {
Thread.sleep(1000);
log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
}
this.recoverTopicQueueTable();
this.createTempFile();
this.addScheduleTask();
this.shutdown = false;
......@@ -1064,6 +1070,7 @@ public class DefaultMessageStore implements MessageStore {
return false;
}
@Override
public long dispatchBehindBytes() {
return this.reputMessageService.behind();
}
......
......@@ -270,13 +270,8 @@ public class DLedgerCommitLog extends CommitLog {
}
//Indicate that, it is the first time to load mixed commitlog, need to recover the old commitlog
isInrecoveringOldCommitlog = true;
//TODO only do the normal recover
//No need the abnormal recover
if (lastOk) {
super.recoverNormally(maxPhyOffsetOfConsumeQueue);
} else {
super.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
}
super.recoverNormally(maxPhyOffsetOfConsumeQueue);
isInrecoveringOldCommitlog = false;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
if (mappedFile == null) {
......@@ -302,11 +297,10 @@ public class DLedgerCommitLog extends CommitLog {
byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition());
byteBuffer.putInt(BLANK_MAGIC_CODE);
mappedFile.flush(0);
//TODO already set
mappedFile.setWrotePosition(mappedFile.getFileSize());
mappedFile.setCommittedPosition(mappedFile.getFileSize());
mappedFile.setFlushedPosition(mappedFile.getFileSize());
}
mappedFile.setWrotePosition(mappedFile.getFileSize());
mappedFile.setCommittedPosition(mappedFile.getFileSize());
mappedFile.setFlushedPosition(mappedFile.getFileSize());
dLedgerFileList.getLastMappedFile(dividedCommitlogOffset);
log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset);
}
......
......@@ -37,7 +37,6 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
doPutMessages(messageStore, topic, 0, 2000, 0);
Thread.sleep(100);
Assert.assertEquals(24, mmapFileList.getMappedFiles().size());
System.out.println(mmapFileList.getMappedFiles().size());
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(2000, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
......
......@@ -2,12 +2,55 @@ package org.apache.rocketmq.store.dledger;
import java.util.UUID;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.StoreTestBase;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.junit.Assert;
import org.junit.Test;
public class MixCommitlogTest extends MessageStoreTestBase {
@Test
public void testFallBehindCQ() throws Exception {
String base = createBaseDir();
String topic = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
{
DefaultMessageStore originalStore = createMessageStore(base, false);
doPutMessages(originalStore, topic, 0, 1000, 0);
Assert.assertEquals(11, originalStore.getMaxPhyOffset()/originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog());
Thread.sleep(500);
Assert.assertEquals(0, originalStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, originalStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, originalStore.dispatchBehindBytes());
doGetMessages(originalStore, topic, 0, 1000, 0);
originalStore.shutdown();
}
//delete the cq files
{
StoreTestBase.deleteFile(StorePathConfigHelper.getStorePathConsumeQueue(base));
}
{
DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0);
Thread.sleep(2000);
Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, dledgerStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, dledgerStore.dispatchBehindBytes());
doGetMessages(dledgerStore, topic, 0, 1000, 0);
doPutMessages(dledgerStore, topic, 0, 1000, 1000);
Thread.sleep(500);
Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(2000, dledgerStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, dledgerStore.dispatchBehindBytes());
doGetMessages(dledgerStore, topic, 0, 2000, 0);
dledgerStore.shutdown();
}
}
@Test
public void testPutAndGet() throws Exception {
String base = createBaseDir();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册