提交 1276539e 编写于 作者: D dongeforever

Add test for trucating consumequeue if the dledger commitlog is deleted

上级 f8dd3f70
......@@ -421,6 +421,7 @@ public class ConsumeQueue {
final long cqOffset) {
if (offset + size <= this.maxPhysicOffset) {
log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
return true;
}
......
......@@ -154,8 +154,8 @@ public class MappedFileQueue {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, ignore it");
return true;
+ " length not matched message store config value, please check it manually");
return false;
}
try {
......
......@@ -253,13 +253,19 @@ public class DLedgerCommitLog extends CommitLog {
dLedgerFileStore.load();
if (dLedgerFileList.getMappedFiles().size() > 0) {
dLedgerFileStore.recover();
dividedCommitlogOffset = dLedgerFileList.getFirstMappedFile().getFileFromOffset();
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
if (mappedFile != null) {
dLedgerConfig.setEnableDiskForceClean(false);
dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize();
} else {
hasSetOriginalDledgerEnableForceClean.set(true);
}
long maxPhyOffset = dLedgerFileList.getMaxWrotePosition();
// Clear ConsumeQueue redundant data
if (maxPhyOffsetOfConsumeQueue >= maxPhyOffset) {
log.warn("[TruncateCQ]maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, maxPhyOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(maxPhyOffset);
}
return;
}
//Indicate that, it is the first time to load mixed commitlog, need to recover the old commitlog
......
package org.apache.rocketmq.store.dledger;
import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
import io.openmessaging.storage.dledger.store.file.MmapFileList;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
......@@ -18,6 +21,63 @@ import org.junit.Test;
public class DLedgerCommitlogTest extends MessageStoreTestBase {
@Test
public void testTruncateCQ() throws Exception {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
String topic = UUID.randomUUID().toString();
{
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
DLedgerServer dLedgerServer = dLedgerCommitLog.getdLedgerServer();
DLedgerMmapFileStore dLedgerMmapFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
MmapFileList mmapFileList = dLedgerMmapFileStore.getDataFileList();
Thread.sleep(2000);
doPutMessages(messageStore, topic, 0, 2000, 0);
Thread.sleep(100);
Assert.assertEquals(24, mmapFileList.getMappedFiles().size());
System.out.println(mmapFileList.getMappedFiles().size());
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(2000, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
doGetMessages(messageStore, topic, 0, 2000, 0);
messageStore.shutdown();
}
{
//Abnormal recover, left some commitlogs
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 4);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
DLedgerServer dLedgerServer = dLedgerCommitLog.getdLedgerServer();
DLedgerMmapFileStore dLedgerMmapFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
MmapFileList mmapFileList = dLedgerMmapFileStore.getDataFileList();
Thread.sleep(1000);
Assert.assertEquals(20, mmapFileList.getMappedFiles().size());
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1700, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
doGetMessages(messageStore, topic, 0, 1700, 0);
messageStore.shutdown();
}
{
//Abnormal recover, left none commitlogs
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 20);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
DLedgerServer dLedgerServer = dLedgerCommitLog.getdLedgerServer();
DLedgerMmapFileStore dLedgerMmapFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
MmapFileList mmapFileList = dLedgerMmapFileStore.getDataFileList();
Thread.sleep(1000);
Assert.assertEquals(0, mmapFileList.getMappedFiles().size());
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
messageStore.shutdown();
}
}
@Test
public void testRecover() throws Exception {
String base = createBaseDir();
......@@ -25,7 +85,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
String group = UUID.randomUUID().toString();
String topic = UUID.randomUUID().toString();
{
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false);
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
Thread.sleep(1000);
doPutMessages(messageStore, topic, 0, 1000, 0);
Thread.sleep(100);
......@@ -38,7 +98,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
{
//normal recover
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false);
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
......@@ -48,7 +108,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
{
//abnormal recover
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, true);
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0);
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
......@@ -64,7 +124,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false);
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
Thread.sleep(1000);
String topic = UUID.randomUUID().toString();
......@@ -104,7 +164,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
public void testCommittedPos() throws Exception {
String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort());
String group = UUID.randomUUID().toString();
DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group,"n0", peers, "n0", false);
DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group,"n0", peers, "n0", false, 0);
String topic = UUID.randomUUID().toString();
MessageExtBrokerInner msgInner = buildMessage();
......@@ -119,7 +179,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0));
DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group,"n1", peers, "n0", false);
DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group,"n1", peers, "n0", false, 0);
Thread.sleep(2000);
Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0));
......
package org.apache.rocketmq.store.dledger;
import io.openmessaging.storage.dledger.DLedgerConfig;
import io.openmessaging.storage.dledger.DLedgerServer;
import java.io.File;
import java.util.Arrays;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
......@@ -20,7 +22,7 @@ import org.junit.Assert;
public class MessageStoreTestBase extends StoreTestBase {
protected DefaultMessageStore createDledgerMessageStore(String base, String group, String selfId, String peers, String leaderId, boolean createAbort) throws Exception {
protected DefaultMessageStore createDledgerMessageStore(String base, String group, String selfId, String peers, String leaderId, boolean createAbort, int deleteFileNum) throws Exception {
System.setProperty("dledger.disk.ratio.check", "0.95");
System.setProperty("dledger.disk.ratio.clean", "0.95");
baseDirs.add(base);
......@@ -40,8 +42,8 @@ public class MessageStoreTestBase extends StoreTestBase {
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLedgerCommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {
}, new BrokerConfig());
DLedgerServer dLegerServer = ((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLedgerServer();
if (leaderId != null) {
DLedgerServer dLegerServer = ((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLedgerServer();
dLegerServer.getdLedgerConfig().setEnableLeaderElector(false);
if (selfId.equals(leaderId)) {
dLegerServer.getMemberState().changeToLeader(-1);
......@@ -54,6 +56,23 @@ public class MessageStoreTestBase extends StoreTestBase {
String fileName = StorePathConfigHelper.getAbortFile(storeConfig.getStorePathRootDir());
makeSureFileExists(fileName);
}
if (deleteFileNum > 0) {
DLedgerConfig config = dLegerServer.getdLedgerConfig();
if (deleteFileNum > 0) {
File dir = new File(config.getDataStorePath());
File[] files = dir.listFiles();
if (files != null) {
Arrays.sort(files);
for (int i = files.length - 1; i >= 0; i--) {
File file = files[i];
file.delete();
if (files.length - i >= deleteFileNum) {
break;
}
}
}
}
}
Assert.assertTrue(defaultMessageStore.load());
defaultMessageStore.start();
return defaultMessageStore;
......
......@@ -38,7 +38,7 @@ public class MixCommitlogTest extends MessageStoreTestBase {
recoverOriginalStore.shutdown();
}
{
DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true);
DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) dledgerStore.getCommitLog();
Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean());
Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset());
......@@ -52,7 +52,7 @@ public class MixCommitlogTest extends MessageStoreTestBase {
dledgerStore.shutdown();
}
{
DefaultMessageStore recoverDledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true);
DefaultMessageStore recoverDledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) recoverDledgerStore.getCommitLog();
Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean());
Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset());
......@@ -88,7 +88,7 @@ public class MixCommitlogTest extends MessageStoreTestBase {
}
long maxPhysicalOffset;
{
DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true);
DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) dledgerStore.getCommitLog();
Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean());
Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册