提交 927385be 编写于 作者: D dongeforever

Enhance tests for mixed commitlog

上级 ed31e74f
...@@ -234,6 +234,11 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -234,6 +234,11 @@ public class DLedgerCommitLog extends CommitLog {
dLedgerFileStore.load(); dLedgerFileStore.load();
if (dLedgerFileList.getMappedFiles().size() > 0) { if (dLedgerFileList.getMappedFiles().size() > 0) {
dLedgerFileStore.recover(); dLedgerFileStore.recover();
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
if (mappedFile != null) {
dLedgerConfig.setEnableDiskForceClean(false);
dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize();
}
return; return;
} }
//Indicate that, it is the first time to load mixed commitlog, need to recover the old commitlog //Indicate that, it is the first time to load mixed commitlog, need to recover the old commitlog
...@@ -248,7 +253,6 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -248,7 +253,6 @@ public class DLedgerCommitLog extends CommitLog {
if (mappedFile == null) { if (mappedFile == null) {
return; return;
} }
dLedgerConfig.setEnableDiskForceClean(false);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
byteBuffer.position(mappedFile.getWrotePosition()); byteBuffer.position(mappedFile.getWrotePosition());
boolean needWriteMagicCode = true; boolean needWriteMagicCode = true;
...@@ -260,6 +264,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -260,6 +264,7 @@ public class DLedgerCommitLog extends CommitLog {
} else { } else {
log.info("Recover old commitlog found a illegal magic code={}", magicCode); log.info("Recover old commitlog found a illegal magic code={}", magicCode);
} }
dLedgerConfig.setEnableDiskForceClean(false);
dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize(); dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize();
log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} dividedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), dividedCommitlogOffset); log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} dividedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), dividedCommitlogOffset);
if (needWriteMagicCode) { if (needWriteMagicCode) {
...@@ -465,7 +470,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -465,7 +470,7 @@ public class DLedgerCommitLog extends CommitLog {
@Override @Override
public SelectMappedBufferResult getMessage(final long offset, final int size) { public SelectMappedBufferResult getMessage(final long offset, final int size) {
if (offset < dividedCommitlogOffset) { if (offset < dividedCommitlogOffset) {
return getMessage(offset, size); return super.getMessage(offset, size);
} }
int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData(); int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData();
MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0); MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0);
......
...@@ -23,7 +23,6 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { ...@@ -23,7 +23,6 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
String base = createBaseDir(); String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort()); String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString(); String group = UUID.randomUUID().toString();
{ {
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false); DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false);
Thread.sleep(1000); Thread.sleep(1000);
......
...@@ -3,7 +3,10 @@ package org.apache.rocketmq.store.dledger; ...@@ -3,7 +3,10 @@ package org.apache.rocketmq.store.dledger;
import io.openmessaging.storage.dledger.DLedgerServer; import io.openmessaging.storage.dledger.DLedgerServer;
import java.io.File; import java.io.File;
import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageResult;
...@@ -89,4 +92,14 @@ public class MessageStoreTestBase extends StoreTestBase { ...@@ -89,4 +92,14 @@ public class MessageStoreTestBase extends StoreTestBase {
} }
} }
protected void doGetMessages(MessageStore messageStore, String topic, int queueId, int num, long beginLogicsOffset) {
for (int i = 0; i < num; i++) {
GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, beginLogicsOffset + i, 3, null);
Assert.assertNotNull(getMessageResult);
Assert.assertTrue(!getMessageResult.getMessageBufferList().isEmpty());
MessageExt messageExt = MessageDecoder.decode(getMessageResult.getMessageBufferList().get(0));
Assert.assertEquals(beginLogicsOffset + i, messageExt.getQueueOffset());
}
}
} }
...@@ -5,7 +5,7 @@ import org.apache.rocketmq.store.DefaultMessageStore; ...@@ -5,7 +5,7 @@ import org.apache.rocketmq.store.DefaultMessageStore;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class MixCommitlogTest extends DLedgerCommitlogTest { public class MixCommitlogTest extends MessageStoreTestBase {
@Test @Test
...@@ -25,6 +25,7 @@ public class MixCommitlogTest extends DLedgerCommitlogTest { ...@@ -25,6 +25,7 @@ public class MixCommitlogTest extends DLedgerCommitlogTest {
Assert.assertEquals(0, originalStore.dispatchBehindBytes()); Assert.assertEquals(0, originalStore.dispatchBehindBytes());
dividedOffset = originalStore.getCommitLog().getMaxOffset(); dividedOffset = originalStore.getCommitLog().getMaxOffset();
dividedOffset = dividedOffset - dividedOffset % originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog() + originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); dividedOffset = dividedOffset - dividedOffset % originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog() + originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
doGetMessages(originalStore, topic, 0, 1000, 0);
originalStore.shutdown(); originalStore.shutdown();
} }
{ {
...@@ -33,11 +34,13 @@ public class MixCommitlogTest extends DLedgerCommitlogTest { ...@@ -33,11 +34,13 @@ public class MixCommitlogTest extends DLedgerCommitlogTest {
Assert.assertEquals(0, recoverOriginalStore.getMinOffsetInQueue(topic, 0)); Assert.assertEquals(0, recoverOriginalStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, recoverOriginalStore.getMaxOffsetInQueue(topic, 0)); Assert.assertEquals(1000, recoverOriginalStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, recoverOriginalStore.dispatchBehindBytes()); Assert.assertEquals(0, recoverOriginalStore.dispatchBehindBytes());
doGetMessages(recoverOriginalStore, topic, 0, 1000, 0);
recoverOriginalStore.shutdown(); recoverOriginalStore.shutdown();
} }
{ {
DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true); DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) dledgerStore.getCommitLog(); DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) dledgerStore.getCommitLog();
Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean());
Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset()); Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset());
Thread.sleep(2000); Thread.sleep(2000);
doPutMessages(dledgerStore, topic, 0, 1000, 1000); doPutMessages(dledgerStore, topic, 0, 1000, 1000);
...@@ -45,8 +48,23 @@ public class MixCommitlogTest extends DLedgerCommitlogTest { ...@@ -45,8 +48,23 @@ public class MixCommitlogTest extends DLedgerCommitlogTest {
Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0)); Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(2000, dledgerStore.getMaxOffsetInQueue(topic, 0)); Assert.assertEquals(2000, dledgerStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, dledgerStore.dispatchBehindBytes()); Assert.assertEquals(0, dledgerStore.dispatchBehindBytes());
doGetMessages(dledgerStore, topic, 0, 2000, 0);
dledgerStore.shutdown(); dledgerStore.shutdown();
} }
{
DefaultMessageStore recoverDledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) recoverDledgerStore.getCommitLog();
Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean());
Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset());
Thread.sleep(2000);
doPutMessages(recoverDledgerStore, topic, 0, 1000, 2000);
Thread.sleep(500);
Assert.assertEquals(0, recoverDledgerStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(3000, recoverDledgerStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, recoverDledgerStore.dispatchBehindBytes());
doGetMessages(recoverDledgerStore, topic, 0, 3000, 0);
recoverDledgerStore.shutdown();
}
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册