From 368e7c86a0b06099f336c81672112dcb5143cf9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=82=85=E5=86=B2?= Date: Tue, 5 Sep 2017 20:28:23 +0800 Subject: [PATCH] [ROCKETMQ-265] fix consume queue's data maybe repeat bug MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Author: 傅冲 Author: fuyou001 Closes #146 from fuyou001/ROCKETMQ-265. --- .../apache/rocketmq/store/ConsumeQueue.java | 7 ++ .../rocketmq/store/ConsumeQueueTest.java | 74 +++++++++++++++++-- 2 files changed, 73 insertions(+), 8 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 0bf0aa9a..4922e3d9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -446,6 +446,13 @@ public class ConsumeQueue { if (cqOffset != 0) { long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset(); + + if (expectLogicOffset < currentLogicOffset) { + log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", + expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset); + return true; + } + if (expectLogicOffset != currentLogicOffset) { LOG_ERROR.warn( "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java index b03f2fce..b7d38f8c 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java @@ -17,22 +17,21 @@ package org.apache.rocketmq.store; -import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.apache.rocketmq.store.stats.BrokerStatsManager; -import org.junit.Test; - import java.io.File; +import java.lang.reflect.Method; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.Map; - +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.stats.BrokerStatsManager; import static org.assertj.core.api.Assertions.assertThat; +import org.junit.Test; public class ConsumeQueueTest { @@ -131,6 +130,65 @@ public class ConsumeQueueTest { } } + protected void deleteDirectory(String rootPath) { + File file = new File(rootPath); + deleteFile(file); + } + + protected void deleteFile(File file) { + File[] subFiles = file.listFiles(); + if (subFiles != null) { + for (File sub : subFiles) { + deleteFile(sub); + } + } + + file.delete(); + } + + @Test + public void testPutMessagePositionInfo_buildCQRepeatedly() throws Exception { + DefaultMessageStore messageStore = null; + try { + + messageStore = gen(); + + int totalMessages = 10; + + for (int i = 0; i < totalMessages; i++) { + putMsg(messageStore); + } + Thread.sleep(5); + + ConsumeQueue cq = messageStore.getConsumeQueueTable().get(topic).get(queueId); + Method method = cq.getClass().getDeclaredMethod("putMessagePositionInfo", long.class, int.class, long.class, long.class); + + assertThat(method).isNotNull(); + + method.setAccessible(true); + + SelectMappedBufferResult result = messageStore.getCommitLog().getData(0); + assertThat(result != null).isTrue(); + + DispatchRequest dispatchRequest = messageStore.getCommitLog().checkMessageAndReturnSize(result.getByteBuffer(), false, false); + + assertThat(cq).isNotNull(); + + Object dispatchResult = method.invoke(cq, dispatchRequest.getCommitLogOffset(), + dispatchRequest.getMsgSize(), dispatchRequest.getTagsCode(), dispatchRequest.getConsumeQueueOffset()); + + assertThat(Boolean.parseBoolean(dispatchResult.toString())).isTrue(); + + } finally { + if (messageStore != null) { + messageStore.shutdown(); + messageStore.destroy(); + } + deleteDirectory(storePath); + } + + } + @Test public void testConsumeQueueWithExtendData() { DefaultMessageStore master = null; -- GitLab