From 5a19e6eee3d82723acaaf7fb9198274fb2cf2cf7 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Wed, 14 Nov 2018 15:54:08 +0800 Subject: [PATCH] Fix reput problem --- .../rocketmq/store/DefaultMessageStore.java | 8 +++----- .../store/dleger/DLegerCommitLog.java | 20 ++++++++++++------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index b3c1e7ba..231bc011 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -1817,12 +1817,10 @@ public class DefaultMessageStore implements MessageStore { this.reputFromOffset += size; } else { doNext = false; - if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) { - log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}", - this.reputFromOffset); + log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}", + this.reputFromOffset); - this.reputFromOffset += result.getSize() - readSize; - } + this.reputFromOffset += result.getSize() - readSize; } } } diff --git a/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java index deb34f16..7a15c0a4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java @@ -227,14 +227,20 @@ public class DLegerCommitLog extends CommitLog { final boolean readBody) { try { int bodyOffset = DLegerEntry.BODY_OFFSET; - byteBuffer.position(byteBuffer.position() + bodyOffset); - DispatchRequest dispatchRequest = super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody); - if (dispatchRequest.isSuccess()) { - dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset); - } else if (dispatchRequest.getMsgSize() > 0) { - dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset); + int pos = byteBuffer.position(); + int magic = byteBuffer.getInt(); + if (magic == MmapFileList.BLANK_MAGIC_CODE) { + return new DispatchRequest(0, true); + } else { + byteBuffer.position(pos + bodyOffset); + DispatchRequest dispatchRequest = super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody); + if (dispatchRequest.isSuccess()) { + dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset); + } else if (dispatchRequest.getMsgSize() > 0) { + dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset); + } + return dispatchRequest; } - return dispatchRequest; } catch (Exception e) { } -- GitLab