diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 0bf0aa9a5b9ee8f23700b699dfd5d54a8bef714c..4922e3d97b5af4ba75a4b99577035b6a437710e3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -446,6 +446,13 @@ public class ConsumeQueue { if (cqOffset != 0) { long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset(); + + if (expectLogicOffset < currentLogicOffset) { + log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", + expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset); + return true; + } + if (expectLogicOffset != currentLogicOffset) { LOG_ERROR.warn( "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java index b03f2fce7a0d2dd5aacc35d1bcbb3430c718bfd2..b7d38f8c78f44cc308b370af515a1ce2a08cb337 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java @@ -17,22 +17,21 @@ package org.apache.rocketmq.store; -import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.apache.rocketmq.store.stats.BrokerStatsManager; -import org.junit.Test; - import java.io.File; +import java.lang.reflect.Method; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.Map; - +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.stats.BrokerStatsManager; import static org.assertj.core.api.Assertions.assertThat; +import org.junit.Test; public class ConsumeQueueTest { @@ -131,6 +130,65 @@ public class ConsumeQueueTest { } } + protected void deleteDirectory(String rootPath) { + File file = new File(rootPath); + deleteFile(file); + } + + protected void deleteFile(File file) { + File[] subFiles = file.listFiles(); + if (subFiles != null) { + for (File sub : subFiles) { + deleteFile(sub); + } + } + + file.delete(); + } + + @Test + public void testPutMessagePositionInfo_buildCQRepeatedly() throws Exception { + DefaultMessageStore messageStore = null; + try { + + messageStore = gen(); + + int totalMessages = 10; + + for (int i = 0; i < totalMessages; i++) { + putMsg(messageStore); + } + Thread.sleep(5); + + ConsumeQueue cq = messageStore.getConsumeQueueTable().get(topic).get(queueId); + Method method = cq.getClass().getDeclaredMethod("putMessagePositionInfo", long.class, int.class, long.class, long.class); + + assertThat(method).isNotNull(); + + method.setAccessible(true); + + SelectMappedBufferResult result = messageStore.getCommitLog().getData(0); + assertThat(result != null).isTrue(); + + DispatchRequest dispatchRequest = messageStore.getCommitLog().checkMessageAndReturnSize(result.getByteBuffer(), false, false); + + assertThat(cq).isNotNull(); + + Object dispatchResult = method.invoke(cq, dispatchRequest.getCommitLogOffset(), + dispatchRequest.getMsgSize(), dispatchRequest.getTagsCode(), dispatchRequest.getConsumeQueueOffset()); + + assertThat(Boolean.parseBoolean(dispatchResult.toString())).isTrue(); + + } finally { + if (messageStore != null) { + messageStore.shutdown(); + messageStore.destroy(); + } + deleteDirectory(storePath); + } + + } + @Test public void testConsumeQueueWithExtendData() { DefaultMessageStore master = null;