From ed31e74f6846f437415b0bb7f66182335203f06a Mon Sep 17 00:00:00 2001 From: dongeforever Date: Tue, 4 Dec 2018 17:58:40 +0800 Subject: [PATCH] Add tests for mixed commitlog test --- .../org/apache/rocketmq/store/CommitLog.java | 2 +- .../store/dledger/DLedgerCommitLog.java | 122 ++++++++++-------- .../apache/rocketmq/store/StoreTestBase.java | 8 ++ .../store/dledger/DLedgerCommitlogTest.java | 76 ++--------- .../store/dledger/MessageStoreTestBase.java | 92 +++++++++++++ .../store/dledger/MixCommitlogTest.java | 52 ++++++++ 6 files changed, 231 insertions(+), 121 deletions(-) create mode 100644 store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java create mode 100644 store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 8dec7408..64437565 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -456,7 +456,7 @@ public class CommitLog { } } } else { - log.info("recover physics file end, " + mappedFile.getFileName()); + log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position()); break; } } diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index e923f879..655c4de4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -71,6 +71,9 @@ public class DLedgerCommitLog extends CommitLog { //The old commitlog should be deleted before the dledger commitlog private final boolean originalDledgerEnableForceClean; + + private boolean isInrecoveringOldCommitlog = false; + public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) { super(defaultMessageStore); dLedgerConfig = new DLedgerConfig(); @@ -101,41 +104,7 @@ public class DLedgerCommitLog extends CommitLog { if (!result) { return false; } - MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); - if (mappedFile == null) { - return true; - } - dLedgerConfig.setEnableDiskForceClean(false); - ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); - int currentPos = 0; - boolean needWriteMagicCode = true; - while (true) { - byteBuffer.position(currentPos); - // 1 TOTAL SIZE - int totalSize = byteBuffer.getInt(); - int magicCode = byteBuffer.getInt(); - if (magicCode == BLANK_MAGIC_CODE) { - needWriteMagicCode = false; - break; - } - if (magicCode != MESSAGE_MAGIC_CODE) { - log.info("Recover old commitlog found a illegal magic code={}", magicCode); - break; - } - currentPos = currentPos + totalSize; - } - log.info("Recover old commitlog needWriteMagicCode={} pos={} file={}", needWriteMagicCode, currentPos, mappedFile.getFileName()); - if (needWriteMagicCode) { - byteBuffer.position(currentPos); - byteBuffer.putInt(mappedFile.getFileSize() - currentPos); - byteBuffer.putInt(BLANK_MAGIC_CODE); - mappedFile.flush(0); - } - dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize(); - if (dLedgerFileList.getMappedFiles().isEmpty()) { - log.info("Recover to set the initial offset the dledger commitlog dividedCommitlogOffset={}", dividedCommitlogOffset); - dLedgerFileList.getLastMappedFile(dividedCommitlogOffset); - } + return true; } @@ -261,22 +230,59 @@ public class DLedgerCommitLog extends CommitLog { return null; } - @Override - public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) { + private void recover(long maxPhyOffsetOfConsumeQueue, boolean lastOk) { dLedgerFileStore.load(); - dLedgerFileStore.recover(); - if (dLedgerFileList.getMappedFiles().isEmpty()) { + if (dLedgerFileList.getMappedFiles().size() > 0) { + dLedgerFileStore.recover(); + return; + } + //Indicate that, it is the first time to load mixed commitlog, need to recover the old commitlog + isInrecoveringOldCommitlog = true; + if (lastOk) { + super.recoverNormally(maxPhyOffsetOfConsumeQueue); + } else { super.recoverAbnormally(maxPhyOffsetOfConsumeQueue); } + isInrecoveringOldCommitlog = false; + MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); + if (mappedFile == null) { + return; + } + dLedgerConfig.setEnableDiskForceClean(false); + ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); + byteBuffer.position(mappedFile.getWrotePosition()); + boolean needWriteMagicCode = true; + // 1 TOTAL SIZE + byteBuffer.getInt(); //size + int magicCode = byteBuffer.getInt(); + if (magicCode == CommitLog.BLANK_MAGIC_CODE) { + needWriteMagicCode = false; + } else { + log.info("Recover old commitlog found a illegal magic code={}", magicCode); + } + dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize(); + log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} dividedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), dividedCommitlogOffset); + if (needWriteMagicCode) { + byteBuffer.position(mappedFile.getWrotePosition()); + byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition()); + byteBuffer.putInt(BLANK_MAGIC_CODE); + mappedFile.flush(0); + mappedFile.setWrotePosition(mappedFile.getFileSize()); + mappedFile.setCommittedPosition(mappedFile.getFileSize()); + mappedFile.setFlushedPosition(mappedFile.getFileSize()); + } + dLedgerFileList.getLastMappedFile(dividedCommitlogOffset); + log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset); } @Override public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { - dLedgerFileStore.load(); - dLedgerFileStore.recover(); - if (dLedgerFileList.getMappedFiles().isEmpty()) { - super.recoverNormally(maxPhyOffsetOfConsumeQueue); - } + recover(maxPhyOffsetOfConsumeQueue, true); + } + + @Override + public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) { + recover(maxPhyOffsetOfConsumeQueue, false); } @Override @@ -287,23 +293,31 @@ public class DLedgerCommitLog extends CommitLog { @Override public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) { + if (isInrecoveringOldCommitlog) { + return super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody); + } try { int bodyOffset = DLedgerEntry.BODY_OFFSET; int pos = byteBuffer.position(); int magic = byteBuffer.getInt(); + //In dledger, this field is size, it must be gt 0, so it could prevent collision + int magicOld = byteBuffer.getInt(); + if (magicOld == CommitLog.BLANK_MAGIC_CODE || magicOld == CommitLog.MESSAGE_MAGIC_CODE) { + byteBuffer.position(pos); + return super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody); + } if (magic == MmapFileList.BLANK_MAGIC_CODE) { return new DispatchRequest(0, true); - } else { - byteBuffer.position(pos + bodyOffset); - DispatchRequest dispatchRequest = super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody); - if (dispatchRequest.isSuccess()) { - dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset); - } else if (dispatchRequest.getMsgSize() > 0) { - dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset); - } - return dispatchRequest; } - } catch (Exception e) { + byteBuffer.position(pos + bodyOffset); + DispatchRequest dispatchRequest = super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody); + if (dispatchRequest.isSuccess()) { + dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset); + } else if (dispatchRequest.getMsgSize() > 0) { + dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset); + } + return dispatchRequest; + } catch (Throwable ignored) { } return new DispatchRequest(-1, false /* success */); diff --git a/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java index 9e09d813..aa2919d8 100644 --- a/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java +++ b/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java @@ -9,6 +9,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.junit.After; public class StoreTestBase { @@ -51,6 +52,13 @@ public class StoreTestBase { return baseDir; } + public static boolean makeSureFileExists(String fileName) throws Exception { + File file = new File(fileName); + MappedFile.ensureDirOK(file.getParent()); + return file.createNewFile(); + } + + public static void deleteFile(String fileName) { deleteFile(new File(fileName)); } diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java index 57f04ea6..28a1d8ea 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java @@ -1,91 +1,34 @@ package org.apache.rocketmq.store.dledger; -import java.io.File; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.UUID; -import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; -import io.openmessaging.storage.dledger.DLedgerServer; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; -import org.apache.rocketmq.store.MappedFile; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; -import org.apache.rocketmq.store.StoreTestBase; -import org.apache.rocketmq.store.config.FlushDiskType; -import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.apache.rocketmq.store.config.StorePathConfigHelper; -import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.Assert; import org.junit.Test; -public class DLedgerCommitlogTest extends StoreTestBase { - - private DefaultMessageStore createMessageStore(String base, String group, String selfId, String peers, String leaderId, boolean createAbort) throws Exception { - baseDirs.add(base); - MessageStoreConfig storeConfig = new MessageStoreConfig(); - storeConfig.setMapedFileSizeCommitLog(1024 * 100); - storeConfig.setMapedFileSizeConsumeQueue(1024); - storeConfig.setMaxHashSlotNum(100); - storeConfig.setMaxIndexNum(100 * 10); - storeConfig.setStorePathRootDir(base); - storeConfig.setStorePathCommitLog(base + File.separator + "commitlog"); - storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH); - - storeConfig.setEnableDLegerCommitLog(true); - storeConfig.setdLegerGroup(group); - storeConfig.setdLegerPeers(peers); - storeConfig.setdLegerSelfId(selfId); - DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLedgerCommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { - - }, new BrokerConfig()); - if (leaderId != null) { - DLedgerServer dLegerServer = ((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLedgerServer(); - dLegerServer.getdLedgerConfig().setEnableLeaderElector(false); - if (selfId.equals(leaderId)) { - dLegerServer.getMemberState().changeToLeader(-1); - } else { - dLegerServer.getMemberState().changeToFollower(-1, leaderId); - } - - } - if (createAbort) { - String fileName = StorePathConfigHelper.getAbortFile(storeConfig.getStorePathRootDir()); - File file = new File(fileName); - MappedFile.ensureDirOK(file.getParent()); - file.createNewFile(); - } - defaultMessageStore.load(); - defaultMessageStore.start(); - return defaultMessageStore; - } - +public class DLedgerCommitlogTest extends MessageStoreTestBase { @Test public void testReputOffset() throws Exception { - String base = createBaseDir(); String peers = String.format("n0-localhost:%d", nextPort()); String group = UUID.randomUUID().toString(); { - DefaultMessageStore messageStore = createMessageStore(base, group, "n0", peers, null, false); + DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false); Thread.sleep(1000); String topic = UUID.randomUUID().toString(); - for (int i = 0; i < 1000; i++) { - MessageExtBrokerInner msgInner = buildMessage(); - msgInner.setTopic(topic); - msgInner.setQueueId(0); - PutMessageResult putMessageResult = messageStore.putMessage(msgInner); - Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus()); - Assert.assertEquals(i, putMessageResult.getAppendMessageResult().getLogicsOffset()); - } + doPutMessages(messageStore, topic, 0, 1000, 0); Thread.sleep(100); Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0)); Assert.assertEquals(1000, messageStore.getMaxOffsetInQueue(topic, 0)); @@ -95,13 +38,14 @@ public class DLedgerCommitlogTest extends StoreTestBase { { //normal recover - DefaultMessageStore messageStore = createMessageStore(base, group, "n0", peers, null, false); + DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false); Assert.assertEquals(0, messageStore.dispatchBehindBytes()); messageStore.shutdown(); } + { - //normal recover - DefaultMessageStore messageStore = createMessageStore(base, group, "n0", peers, null, true); + //abnormal recover + DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, true); Assert.assertEquals(0, messageStore.dispatchBehindBytes()); messageStore.shutdown(); } @@ -114,7 +58,7 @@ public class DLedgerCommitlogTest extends StoreTestBase { String base = createBaseDir(); String peers = String.format("n0-localhost:%d", nextPort()); String group = UUID.randomUUID().toString(); - DefaultMessageStore messageStore = createMessageStore(base, group, "n0", peers, null, false); + DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false); Thread.sleep(1000); String topic = UUID.randomUUID().toString(); @@ -154,7 +98,7 @@ public class DLedgerCommitlogTest extends StoreTestBase { public void testCommittedPos() throws Exception { String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort()); String group = UUID.randomUUID().toString(); - DefaultMessageStore leaderStore = createMessageStore(createBaseDir(), group,"n0", peers, "n0", false); + DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group,"n0", peers, "n0", false); String topic = UUID.randomUUID().toString(); MessageExtBrokerInner msgInner = buildMessage(); @@ -169,7 +113,7 @@ public class DLedgerCommitlogTest extends StoreTestBase { Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0)); - DefaultMessageStore followerStore = createMessageStore(createBaseDir(), group,"n1", peers, "n0", false); + DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group,"n1", peers, "n0", false); Thread.sleep(2000); Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0)); diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java new file mode 100644 index 00000000..d671287d --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java @@ -0,0 +1,92 @@ +package org.apache.rocketmq.store.dledger; + +import io.openmessaging.storage.dledger.DLedgerServer; +import java.io.File; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageStatus; +import org.apache.rocketmq.store.StoreTestBase; +import org.apache.rocketmq.store.config.FlushDiskType; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.config.StorePathConfigHelper; +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.junit.Assert; + +public class MessageStoreTestBase extends StoreTestBase { + + protected DefaultMessageStore createDledgerMessageStore(String base, String group, String selfId, String peers, String leaderId, boolean createAbort) throws Exception { + baseDirs.add(base); + MessageStoreConfig storeConfig = new MessageStoreConfig(); + storeConfig.setMapedFileSizeCommitLog(1024 * 100); + storeConfig.setMapedFileSizeConsumeQueue(1024); + storeConfig.setMaxHashSlotNum(100); + storeConfig.setMaxIndexNum(100 * 10); + storeConfig.setStorePathRootDir(base); + storeConfig.setStorePathCommitLog(base + File.separator + "commitlog"); + storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH); + + storeConfig.setEnableDLegerCommitLog(true); + storeConfig.setdLegerGroup(group); + storeConfig.setdLegerPeers(peers); + storeConfig.setdLegerSelfId(selfId); + DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLedgerCommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { + + }, new BrokerConfig()); + if (leaderId != null) { + DLedgerServer dLegerServer = ((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLedgerServer(); + dLegerServer.getdLedgerConfig().setEnableLeaderElector(false); + if (selfId.equals(leaderId)) { + dLegerServer.getMemberState().changeToLeader(-1); + } else { + dLegerServer.getMemberState().changeToFollower(-1, leaderId); + } + + } + if (createAbort) { + String fileName = StorePathConfigHelper.getAbortFile(storeConfig.getStorePathRootDir()); + makeSureFileExists(fileName); + } + Assert.assertTrue(defaultMessageStore.load()); + defaultMessageStore.start(); + return defaultMessageStore; + } + + + protected DefaultMessageStore createMessageStore(String base, boolean createAbort) throws Exception { + baseDirs.add(base); + MessageStoreConfig storeConfig = new MessageStoreConfig(); + storeConfig.setMapedFileSizeCommitLog(1024 * 100); + storeConfig.setMapedFileSizeConsumeQueue(1024); + storeConfig.setMaxHashSlotNum(100); + storeConfig.setMaxIndexNum(100 * 10); + storeConfig.setStorePathRootDir(base); + storeConfig.setStorePathCommitLog(base + File.separator + "commitlog"); + storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH); + DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("CommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { + + }, new BrokerConfig()); + + if (createAbort) { + String fileName = StorePathConfigHelper.getAbortFile(storeConfig.getStorePathRootDir()); + makeSureFileExists(fileName); + } + Assert.assertTrue(defaultMessageStore.load()); + defaultMessageStore.start(); + return defaultMessageStore; + } + + protected void doPutMessages(MessageStore messageStore, String topic, int queueId, int num, long beginLogicsOffset) { + for (int i = 0; i < num; i++) { + MessageExtBrokerInner msgInner = buildMessage(); + msgInner.setTopic(topic); + msgInner.setQueueId(queueId); + PutMessageResult putMessageResult = messageStore.putMessage(msgInner); + Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus()); + Assert.assertEquals(beginLogicsOffset + i, putMessageResult.getAppendMessageResult().getLogicsOffset()); + } + } + +} diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java new file mode 100644 index 00000000..cba9fe9e --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java @@ -0,0 +1,52 @@ +package org.apache.rocketmq.store.dledger; + +import java.util.UUID; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.junit.Assert; +import org.junit.Test; + +public class MixCommitlogTest extends DLedgerCommitlogTest { + + + @Test + public void testPutAndGet() throws Exception { + String base = createBaseDir(); + String topic = UUID.randomUUID().toString(); + String peers = String.format("n0-localhost:%d", nextPort()); + String group = UUID.randomUUID().toString(); + + long dividedOffset; + { + DefaultMessageStore originalStore = createMessageStore(base, false); + doPutMessages(originalStore, topic, 0, 1000, 0); + Thread.sleep(500); + Assert.assertEquals(0, originalStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(1000, originalStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, originalStore.dispatchBehindBytes()); + dividedOffset = originalStore.getCommitLog().getMaxOffset(); + dividedOffset = dividedOffset - dividedOffset % originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog() + originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); + originalStore.shutdown(); + } + { + DefaultMessageStore recoverOriginalStore = createMessageStore(base, true); + Thread.sleep(500); + Assert.assertEquals(0, recoverOriginalStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(1000, recoverOriginalStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, recoverOriginalStore.dispatchBehindBytes()); + recoverOriginalStore.shutdown(); + } + { + DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true); + DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) dledgerStore.getCommitLog(); + Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset()); + Thread.sleep(2000); + doPutMessages(dledgerStore, topic, 0, 1000, 1000); + Thread.sleep(500); + Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(2000, dledgerStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, dledgerStore.dispatchBehindBytes()); + dledgerStore.shutdown(); + } + + } +} -- GitLab