提交 5a19e6ee 编写于 作者: D dongeforever

Fix reput problem

上级 84bd6409
......@@ -1817,12 +1817,10 @@ public class DefaultMessageStore implements MessageStore {
this.reputFromOffset += size;
} else {
doNext = false;
if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
this.reputFromOffset += result.getSize() - readSize;
}
this.reputFromOffset += result.getSize() - readSize;
}
}
}
......
......@@ -227,14 +227,20 @@ public class DLegerCommitLog extends CommitLog {
final boolean readBody) {
try {
int bodyOffset = DLegerEntry.BODY_OFFSET;
byteBuffer.position(byteBuffer.position() + bodyOffset);
DispatchRequest dispatchRequest = super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody);
if (dispatchRequest.isSuccess()) {
dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset);
} else if (dispatchRequest.getMsgSize() > 0) {
dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset);
int pos = byteBuffer.position();
int magic = byteBuffer.getInt();
if (magic == MmapFileList.BLANK_MAGIC_CODE) {
return new DispatchRequest(0, true);
} else {
byteBuffer.position(pos + bodyOffset);
DispatchRequest dispatchRequest = super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody);
if (dispatchRequest.isSuccess()) {
dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset);
} else if (dispatchRequest.getMsgSize() > 0) {
dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset);
}
return dispatchRequest;
}
return dispatchRequest;
} catch (Exception e) {
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册