提交 44f696c3 编写于 作者: D dongeforever

Add deleting test for mixed commitlog

上级 927385be
......@@ -30,6 +30,7 @@ import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
......@@ -70,6 +71,7 @@ public class DLedgerCommitLog extends CommitLog {
//The old commitlog should be deleted before the dledger commitlog
private final boolean originalDledgerEnableForceClean;
private final AtomicBoolean hasSetOriginalDledgerEnableForceClean = new AtomicBoolean(false);
private boolean isInrecoveringOldCommitlog = false;
......@@ -171,12 +173,28 @@ public class DLedgerCommitLog extends CommitLog {
final long intervalForcibly,
final boolean cleanImmediately
) {
int count = super.deleteExpiredFile(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
if (mappedFileQueue.getMappedFiles().isEmpty()) {
dLedgerConfig.setEnableDiskForceClean(originalDledgerEnableForceClean);
if (hasSetOriginalDledgerEnableForceClean.compareAndSet(false, true)) {
dLedgerConfig.setEnableDiskForceClean(originalDledgerEnableForceClean);
}
//To prevent too much log in defaultMessageStore
return Integer.MAX_VALUE;
}
int count = super.deleteExpiredFile(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
if (count > 0 || mappedFileQueue.getMappedFiles().size() != 1) {
return count;
}
//the old logic will keep the last file, here to delete it
MappedFile mappedFile = mappedFileQueue.getLastMappedFile();
log.info("Try to delete the last old commitlog file {}", mappedFile.getFileName());
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
while (!mappedFile.destroy(10 * 1000)) {
io.openmessaging.storage.dledger.utils.UtilAll.sleep(1000);
}
mappedFileQueue.getMappedFiles().remove(mappedFile);
}
//return 1 to prevent too much log in defaultMessageStore
return count > 0 ? count : 1;
return 1;
}
......@@ -238,6 +256,8 @@ public class DLedgerCommitLog extends CommitLog {
if (mappedFile != null) {
dLedgerConfig.setEnableDiskForceClean(false);
dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize();
} else {
hasSetOriginalDledgerEnableForceClean.set(true);
}
return;
}
......@@ -251,6 +271,7 @@ public class DLedgerCommitLog extends CommitLog {
isInrecoveringOldCommitlog = false;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
if (mappedFile == null) {
hasSetOriginalDledgerEnableForceClean.set(true);
return;
}
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
......
......@@ -19,33 +19,40 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
@Test
public void testReputOffset() throws Exception {
public void testRecover() throws Exception {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
String topic = UUID.randomUUID().toString();
{
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false);
Thread.sleep(1000);
String topic = UUID.randomUUID().toString();
doPutMessages(messageStore, topic, 0, 1000, 0);
Thread.sleep(100);
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
doGetMessages(messageStore, topic, 0, 1000, 0);
messageStore.shutdown();
}
{
//normal recover
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false);
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
doGetMessages(messageStore, topic, 0, 1000, 0);
messageStore.shutdown();
}
{
//abnormal recover
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, true);
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
doGetMessages(messageStore, topic, 0, 1000, 0);
messageStore.shutdown();
}
}
......
......@@ -99,6 +99,7 @@ public class MessageStoreTestBase extends StoreTestBase {
Assert.assertTrue(!getMessageResult.getMessageBufferList().isEmpty());
MessageExt messageExt = MessageDecoder.decode(getMessageResult.getMessageBufferList().get(0));
Assert.assertEquals(beginLogicsOffset + i, messageExt.getQueueOffset());
getMessageResult.release();
}
}
......
......@@ -65,6 +65,64 @@ public class MixCommitlogTest extends MessageStoreTestBase {
doGetMessages(recoverDledgerStore, topic, 0, 3000, 0);
recoverDledgerStore.shutdown();
}
}
@Test
public void testDeleteExpiredFiles() throws Exception {
String base = createBaseDir();
String topic = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
long dividedOffset;
{
DefaultMessageStore originalStore = createMessageStore(base, false);
doPutMessages(originalStore, topic, 0, 1000, 0);
Thread.sleep(500);
Assert.assertEquals(0, originalStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, originalStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, originalStore.dispatchBehindBytes());
dividedOffset = originalStore.getCommitLog().getMaxOffset();
dividedOffset = dividedOffset - dividedOffset % originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog() + originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
originalStore.shutdown();
}
long maxPhysicalOffset;
{
DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) dledgerStore.getCommitLog();
Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean());
Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset());
Thread.sleep(2000);
doPutMessages(dledgerStore, topic, 0, 1000, 1000);
Thread.sleep(500);
Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(2000, dledgerStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, dledgerStore.dispatchBehindBytes());
Assert.assertEquals(0, dledgerStore.getMinPhyOffset());
maxPhysicalOffset = dledgerStore.getMaxPhyOffset();
doGetMessages(dledgerStore, topic, 0, 2000, 0);
for (int i = 0; i < 100; i++) {
dledgerStore.getCommitLog().deleteExpiredFile(System.currentTimeMillis(), 0, 0, true);
}
Assert.assertEquals(dividedOffset, dledgerStore.getMinPhyOffset());
Assert.assertEquals(maxPhysicalOffset, dledgerStore.getMaxPhyOffset());
for (int i = 0; i < 100; i++) {
Assert.assertEquals(Integer.MAX_VALUE, dledgerStore.getCommitLog().deleteExpiredFile(System.currentTimeMillis(), 0, 0, true));
}
Assert.assertEquals(dividedOffset, dledgerStore.getMinPhyOffset());
Assert.assertEquals(maxPhysicalOffset, dledgerStore.getMaxPhyOffset());
Assert.assertTrue(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean());
dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().setEnableDiskForceClean(false);
for (int i = 0; i < 100; i++) {
Assert.assertEquals(Integer.MAX_VALUE, dledgerStore.getCommitLog().deleteExpiredFile(System.currentTimeMillis(), 0, 0, true));
}
//should not change the value
Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean());
doGetMessages(dledgerStore, topic, 0, 1000, 1000);
dledgerStore.shutdown();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册