From f18dc03809a36bf14f6d911a3813cf6d4f606ae2 Mon Sep 17 00:00:00 2001 From: yukon Date: Thu, 2 Nov 2017 11:38:38 +0800 Subject: [PATCH] [ROCKETMQ-270] Move flush position forward to first MappedFile whose start offset is non-zero. --- .../rocketmq/store/MappedFileQueue.java | 4 +-- .../rocketmq/store/MappedFileQueueTest.java | 26 ++++++++++++++++--- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index edf4c918..0ee1b9f9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -421,7 +421,7 @@ public class MappedFileQueue { public boolean flush(final int flushLeastPages) { boolean result = true; - MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, false); + MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); if (mappedFile != null) { long tmpTimeStamp = mappedFile.getStoreTimestamp(); int offset = mappedFile.flush(flushLeastPages); @@ -438,7 +438,7 @@ public class MappedFileQueue { public boolean commit(final int commitLeastPages) { boolean result = true; - MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, false); + MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0); if (mappedFile != null) { int offset = mappedFile.commit(commitLeastPages); long where = mappedFile.getFileFromOffset() + offset; diff --git a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java index 203dfcd5..dd7b229d 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java @@ -20,9 +20,7 @@ package org.apache.rocketmq.store; import java.io.File; import java.nio.ByteBuffer; import java.util.Arrays; - import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; import org.junit.Test; @@ -47,7 +45,7 @@ public class MappedFileQueueTest { } @Test - public void test_findMappedFileByOffset() { + public void testFindMappedFileByOffset() { // four-byte string. final String fixedMsg = "abcd"; @@ -97,6 +95,28 @@ public class MappedFileQueueTest { mappedFileQueue.destroy(); } + @Test + public void testFindMappedFileByOffset_StartOffsetIsNonZero() { + MappedFileQueue mappedFileQueue = + new MappedFileQueue("target/unit_test_store/b/", 1024, null); + + //Start from a non-zero offset + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(1024); + assertThat(mappedFile).isNotNull(); + + assertThat(mappedFileQueue.findMappedFileByOffset(1025)).isEqualTo(mappedFile); + + assertThat(mappedFileQueue.findMappedFileByOffset(0)).isNull(); + assertThat(mappedFileQueue.findMappedFileByOffset(123, false)).isNull(); + assertThat(mappedFileQueue.findMappedFileByOffset(123, true)).isEqualTo(mappedFile); + + assertThat(mappedFileQueue.findMappedFileByOffset(0, false)).isNull(); + assertThat(mappedFileQueue.findMappedFileByOffset(0, true)).isEqualTo(mappedFile); + + mappedFileQueue.shutdown(1000); + mappedFileQueue.destroy(); + } + @Test public void testAppendMessage() { final String fixedMsg = "0123456789abcdef"; -- GitLab