未验证 提交 523b60cf 编写于 作者: V von gosling 提交者: GitHub

fix(replica): resolve abnormal slow recover

Add PhysicMsgTimestamp update logic when replicas recovering from the abnormal case.

Closes #1456
...@@ -22,6 +22,7 @@ import java.util.List; ...@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.config.StorePathConfigHelper;
public class ConsumeQueue { public class ConsumeQueue {
...@@ -397,6 +398,10 @@ public class ConsumeQueue { ...@@ -397,6 +398,10 @@ public class ConsumeQueue {
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(), boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
request.getMsgSize(), tagsCode, request.getConsumeQueueOffset()); request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
if (result) { if (result) {
if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
}
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp()); this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
return; return;
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册