提交 ed31e74f 编写于 作者: D dongeforever

Add tests for mixed commitlog test

上级 8f788c3d
......@@ -456,7 +456,7 @@ public class CommitLog {
}
}
} else {
log.info("recover physics file end, " + mappedFile.getFileName());
log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
break;
}
}
......
......@@ -71,6 +71,9 @@ public class DLedgerCommitLog extends CommitLog {
//The old commitlog should be deleted before the dledger commitlog
private final boolean originalDledgerEnableForceClean;
private boolean isInrecoveringOldCommitlog = false;
public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) {
super(defaultMessageStore);
dLedgerConfig = new DLedgerConfig();
......@@ -101,41 +104,7 @@ public class DLedgerCommitLog extends CommitLog {
if (!result) {
return false;
}
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
if (mappedFile == null) {
return true;
}
dLedgerConfig.setEnableDiskForceClean(false);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
int currentPos = 0;
boolean needWriteMagicCode = true;
while (true) {
byteBuffer.position(currentPos);
// 1 TOTAL SIZE
int totalSize = byteBuffer.getInt();
int magicCode = byteBuffer.getInt();
if (magicCode == BLANK_MAGIC_CODE) {
needWriteMagicCode = false;
break;
}
if (magicCode != MESSAGE_MAGIC_CODE) {
log.info("Recover old commitlog found a illegal magic code={}", magicCode);
break;
}
currentPos = currentPos + totalSize;
}
log.info("Recover old commitlog needWriteMagicCode={} pos={} file={}", needWriteMagicCode, currentPos, mappedFile.getFileName());
if (needWriteMagicCode) {
byteBuffer.position(currentPos);
byteBuffer.putInt(mappedFile.getFileSize() - currentPos);
byteBuffer.putInt(BLANK_MAGIC_CODE);
mappedFile.flush(0);
}
dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize();
if (dLedgerFileList.getMappedFiles().isEmpty()) {
log.info("Recover to set the initial offset the dledger commitlog dividedCommitlogOffset={}", dividedCommitlogOffset);
dLedgerFileList.getLastMappedFile(dividedCommitlogOffset);
}
return true;
}
......@@ -261,22 +230,59 @@ public class DLedgerCommitLog extends CommitLog {
return null;
}
@Override
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
private void recover(long maxPhyOffsetOfConsumeQueue, boolean lastOk) {
dLedgerFileStore.load();
dLedgerFileStore.recover();
if (dLedgerFileList.getMappedFiles().isEmpty()) {
if (dLedgerFileList.getMappedFiles().size() > 0) {
dLedgerFileStore.recover();
return;
}
//Indicate that, it is the first time to load mixed commitlog, need to recover the old commitlog
isInrecoveringOldCommitlog = true;
if (lastOk) {
super.recoverNormally(maxPhyOffsetOfConsumeQueue);
} else {
super.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
}
isInrecoveringOldCommitlog = false;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
if (mappedFile == null) {
return;
}
dLedgerConfig.setEnableDiskForceClean(false);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
byteBuffer.position(mappedFile.getWrotePosition());
boolean needWriteMagicCode = true;
// 1 TOTAL SIZE
byteBuffer.getInt(); //size
int magicCode = byteBuffer.getInt();
if (magicCode == CommitLog.BLANK_MAGIC_CODE) {
needWriteMagicCode = false;
} else {
log.info("Recover old commitlog found a illegal magic code={}", magicCode);
}
dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize();
log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} dividedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), dividedCommitlogOffset);
if (needWriteMagicCode) {
byteBuffer.position(mappedFile.getWrotePosition());
byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition());
byteBuffer.putInt(BLANK_MAGIC_CODE);
mappedFile.flush(0);
mappedFile.setWrotePosition(mappedFile.getFileSize());
mappedFile.setCommittedPosition(mappedFile.getFileSize());
mappedFile.setFlushedPosition(mappedFile.getFileSize());
}
dLedgerFileList.getLastMappedFile(dividedCommitlogOffset);
log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset);
}
@Override
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
dLedgerFileStore.load();
dLedgerFileStore.recover();
if (dLedgerFileList.getMappedFiles().isEmpty()) {
super.recoverNormally(maxPhyOffsetOfConsumeQueue);
}
recover(maxPhyOffsetOfConsumeQueue, true);
}
@Override
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
recover(maxPhyOffsetOfConsumeQueue, false);
}
@Override
......@@ -287,23 +293,31 @@ public class DLedgerCommitLog extends CommitLog {
@Override
public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC,
final boolean readBody) {
if (isInrecoveringOldCommitlog) {
return super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody);
}
try {
int bodyOffset = DLedgerEntry.BODY_OFFSET;
int pos = byteBuffer.position();
int magic = byteBuffer.getInt();
//In dledger, this field is size, it must be gt 0, so it could prevent collision
int magicOld = byteBuffer.getInt();
if (magicOld == CommitLog.BLANK_MAGIC_CODE || magicOld == CommitLog.MESSAGE_MAGIC_CODE) {
byteBuffer.position(pos);
return super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody);
}
if (magic == MmapFileList.BLANK_MAGIC_CODE) {
return new DispatchRequest(0, true);
} else {
byteBuffer.position(pos + bodyOffset);
DispatchRequest dispatchRequest = super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody);
if (dispatchRequest.isSuccess()) {
dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset);
} else if (dispatchRequest.getMsgSize() > 0) {
dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset);
}
return dispatchRequest;
}
} catch (Exception e) {
byteBuffer.position(pos + bodyOffset);
DispatchRequest dispatchRequest = super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody);
if (dispatchRequest.isSuccess()) {
dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset);
} else if (dispatchRequest.getMsgSize() > 0) {
dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset);
}
return dispatchRequest;
} catch (Throwable ignored) {
}
return new DispatchRequest(-1, false /* success */);
......
......@@ -9,6 +9,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.junit.After;
public class StoreTestBase {
......@@ -51,6 +52,13 @@ public class StoreTestBase {
return baseDir;
}
public static boolean makeSureFileExists(String fileName) throws Exception {
File file = new File(fileName);
MappedFile.ensureDirOK(file.getParent());
return file.createNewFile();
}
public static void deleteFile(String fileName) {
deleteFile(new File(fileName));
}
......
package org.apache.rocketmq.store.dledger;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import io.openmessaging.storage.dledger.DLedgerServer;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MappedFile;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.StoreTestBase;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Assert;
import org.junit.Test;
public class DLedgerCommitlogTest extends StoreTestBase {
private DefaultMessageStore createMessageStore(String base, String group, String selfId, String peers, String leaderId, boolean createAbort) throws Exception {
baseDirs.add(base);
MessageStoreConfig storeConfig = new MessageStoreConfig();
storeConfig.setMapedFileSizeCommitLog(1024 * 100);
storeConfig.setMapedFileSizeConsumeQueue(1024);
storeConfig.setMaxHashSlotNum(100);
storeConfig.setMaxIndexNum(100 * 10);
storeConfig.setStorePathRootDir(base);
storeConfig.setStorePathCommitLog(base + File.separator + "commitlog");
storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
storeConfig.setEnableDLegerCommitLog(true);
storeConfig.setdLegerGroup(group);
storeConfig.setdLegerPeers(peers);
storeConfig.setdLegerSelfId(selfId);
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLedgerCommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {
}, new BrokerConfig());
if (leaderId != null) {
DLedgerServer dLegerServer = ((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLedgerServer();
dLegerServer.getdLedgerConfig().setEnableLeaderElector(false);
if (selfId.equals(leaderId)) {
dLegerServer.getMemberState().changeToLeader(-1);
} else {
dLegerServer.getMemberState().changeToFollower(-1, leaderId);
}
}
if (createAbort) {
String fileName = StorePathConfigHelper.getAbortFile(storeConfig.getStorePathRootDir());
File file = new File(fileName);
MappedFile.ensureDirOK(file.getParent());
file.createNewFile();
}
defaultMessageStore.load();
defaultMessageStore.start();
return defaultMessageStore;
}
public class DLedgerCommitlogTest extends MessageStoreTestBase {
@Test
public void testReputOffset() throws Exception {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
{
DefaultMessageStore messageStore = createMessageStore(base, group, "n0", peers, null, false);
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false);
Thread.sleep(1000);
String topic = UUID.randomUUID().toString();
for (int i = 0; i < 1000; i++) {
MessageExtBrokerInner msgInner = buildMessage();
msgInner.setTopic(topic);
msgInner.setQueueId(0);
PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
Assert.assertEquals(i, putMessageResult.getAppendMessageResult().getLogicsOffset());
}
doPutMessages(messageStore, topic, 0, 1000, 0);
Thread.sleep(100);
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, messageStore.getMaxOffsetInQueue(topic, 0));
......@@ -95,13 +38,14 @@ public class DLedgerCommitlogTest extends StoreTestBase {
{
//normal recover
DefaultMessageStore messageStore = createMessageStore(base, group, "n0", peers, null, false);
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false);
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
messageStore.shutdown();
}
{
//normal recover
DefaultMessageStore messageStore = createMessageStore(base, group, "n0", peers, null, true);
//abnormal recover
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, true);
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
messageStore.shutdown();
}
......@@ -114,7 +58,7 @@ public class DLedgerCommitlogTest extends StoreTestBase {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
DefaultMessageStore messageStore = createMessageStore(base, group, "n0", peers, null, false);
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false);
Thread.sleep(1000);
String topic = UUID.randomUUID().toString();
......@@ -154,7 +98,7 @@ public class DLedgerCommitlogTest extends StoreTestBase {
public void testCommittedPos() throws Exception {
String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort());
String group = UUID.randomUUID().toString();
DefaultMessageStore leaderStore = createMessageStore(createBaseDir(), group,"n0", peers, "n0", false);
DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group,"n0", peers, "n0", false);
String topic = UUID.randomUUID().toString();
MessageExtBrokerInner msgInner = buildMessage();
......@@ -169,7 +113,7 @@ public class DLedgerCommitlogTest extends StoreTestBase {
Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0));
DefaultMessageStore followerStore = createMessageStore(createBaseDir(), group,"n1", peers, "n0", false);
DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group,"n1", peers, "n0", false);
Thread.sleep(2000);
Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0));
......
package org.apache.rocketmq.store.dledger;
import io.openmessaging.storage.dledger.DLedgerServer;
import java.io.File;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.StoreTestBase;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Assert;
public class MessageStoreTestBase extends StoreTestBase {
protected DefaultMessageStore createDledgerMessageStore(String base, String group, String selfId, String peers, String leaderId, boolean createAbort) throws Exception {
baseDirs.add(base);
MessageStoreConfig storeConfig = new MessageStoreConfig();
storeConfig.setMapedFileSizeCommitLog(1024 * 100);
storeConfig.setMapedFileSizeConsumeQueue(1024);
storeConfig.setMaxHashSlotNum(100);
storeConfig.setMaxIndexNum(100 * 10);
storeConfig.setStorePathRootDir(base);
storeConfig.setStorePathCommitLog(base + File.separator + "commitlog");
storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
storeConfig.setEnableDLegerCommitLog(true);
storeConfig.setdLegerGroup(group);
storeConfig.setdLegerPeers(peers);
storeConfig.setdLegerSelfId(selfId);
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLedgerCommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {
}, new BrokerConfig());
if (leaderId != null) {
DLedgerServer dLegerServer = ((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLedgerServer();
dLegerServer.getdLedgerConfig().setEnableLeaderElector(false);
if (selfId.equals(leaderId)) {
dLegerServer.getMemberState().changeToLeader(-1);
} else {
dLegerServer.getMemberState().changeToFollower(-1, leaderId);
}
}
if (createAbort) {
String fileName = StorePathConfigHelper.getAbortFile(storeConfig.getStorePathRootDir());
makeSureFileExists(fileName);
}
Assert.assertTrue(defaultMessageStore.load());
defaultMessageStore.start();
return defaultMessageStore;
}
protected DefaultMessageStore createMessageStore(String base, boolean createAbort) throws Exception {
baseDirs.add(base);
MessageStoreConfig storeConfig = new MessageStoreConfig();
storeConfig.setMapedFileSizeCommitLog(1024 * 100);
storeConfig.setMapedFileSizeConsumeQueue(1024);
storeConfig.setMaxHashSlotNum(100);
storeConfig.setMaxIndexNum(100 * 10);
storeConfig.setStorePathRootDir(base);
storeConfig.setStorePathCommitLog(base + File.separator + "commitlog");
storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("CommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {
}, new BrokerConfig());
if (createAbort) {
String fileName = StorePathConfigHelper.getAbortFile(storeConfig.getStorePathRootDir());
makeSureFileExists(fileName);
}
Assert.assertTrue(defaultMessageStore.load());
defaultMessageStore.start();
return defaultMessageStore;
}
protected void doPutMessages(MessageStore messageStore, String topic, int queueId, int num, long beginLogicsOffset) {
for (int i = 0; i < num; i++) {
MessageExtBrokerInner msgInner = buildMessage();
msgInner.setTopic(topic);
msgInner.setQueueId(queueId);
PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
Assert.assertEquals(beginLogicsOffset + i, putMessageResult.getAppendMessageResult().getLogicsOffset());
}
}
}
package org.apache.rocketmq.store.dledger;
import java.util.UUID;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.junit.Assert;
import org.junit.Test;
public class MixCommitlogTest extends DLedgerCommitlogTest {
@Test
public void testPutAndGet() throws Exception {
String base = createBaseDir();
String topic = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
long dividedOffset;
{
DefaultMessageStore originalStore = createMessageStore(base, false);
doPutMessages(originalStore, topic, 0, 1000, 0);
Thread.sleep(500);
Assert.assertEquals(0, originalStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, originalStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, originalStore.dispatchBehindBytes());
dividedOffset = originalStore.getCommitLog().getMaxOffset();
dividedOffset = dividedOffset - dividedOffset % originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog() + originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
originalStore.shutdown();
}
{
DefaultMessageStore recoverOriginalStore = createMessageStore(base, true);
Thread.sleep(500);
Assert.assertEquals(0, recoverOriginalStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, recoverOriginalStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, recoverOriginalStore.dispatchBehindBytes());
recoverOriginalStore.shutdown();
}
{
DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) dledgerStore.getCommitLog();
Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset());
Thread.sleep(2000);
doPutMessages(dledgerStore, topic, 0, 1000, 1000);
Thread.sleep(500);
Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(2000, dledgerStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, dledgerStore.dispatchBehindBytes());
dledgerStore.shutdown();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册