提交 368e7c86 编写于 作者: 傅冲 提交者: yukon

[ROCKETMQ-265] fix consume queue's data maybe repeat bug

Author: 傅冲 <yubao.fyb@alibaba-inc.com>
Author: fuyou001 <fuyou001@gmail.com>

Closes #146 from fuyou001/ROCKETMQ-265.
上级 6a97d288
......@@ -446,6 +446,13 @@ public class ConsumeQueue {
if (cqOffset != 0) {
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}
if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
......
......@@ -17,22 +17,21 @@
package org.apache.rocketmq.store;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Test;
import java.io.File;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Test;
public class ConsumeQueueTest {
......@@ -131,6 +130,65 @@ public class ConsumeQueueTest {
}
}
protected void deleteDirectory(String rootPath) {
File file = new File(rootPath);
deleteFile(file);
}
protected void deleteFile(File file) {
File[] subFiles = file.listFiles();
if (subFiles != null) {
for (File sub : subFiles) {
deleteFile(sub);
}
}
file.delete();
}
@Test
public void testPutMessagePositionInfo_buildCQRepeatedly() throws Exception {
DefaultMessageStore messageStore = null;
try {
messageStore = gen();
int totalMessages = 10;
for (int i = 0; i < totalMessages; i++) {
putMsg(messageStore);
}
Thread.sleep(5);
ConsumeQueue cq = messageStore.getConsumeQueueTable().get(topic).get(queueId);
Method method = cq.getClass().getDeclaredMethod("putMessagePositionInfo", long.class, int.class, long.class, long.class);
assertThat(method).isNotNull();
method.setAccessible(true);
SelectMappedBufferResult result = messageStore.getCommitLog().getData(0);
assertThat(result != null).isTrue();
DispatchRequest dispatchRequest = messageStore.getCommitLog().checkMessageAndReturnSize(result.getByteBuffer(), false, false);
assertThat(cq).isNotNull();
Object dispatchResult = method.invoke(cq, dispatchRequest.getCommitLogOffset(),
dispatchRequest.getMsgSize(), dispatchRequest.getTagsCode(), dispatchRequest.getConsumeQueueOffset());
assertThat(Boolean.parseBoolean(dispatchResult.toString())).isTrue();
} finally {
if (messageStore != null) {
messageStore.shutdown();
messageStore.destroy();
}
deleteDirectory(storePath);
}
}
@Test
public void testConsumeQueueWithExtendData() {
DefaultMessageStore master = null;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册