diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index a63d3746b30663e562778201f6c2ee4dd1365b17..87ff0a096cf57a7f0d1ea7a82a4f3e94a262e374 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.StorePathConfigHelper; public class ConsumeQueue { @@ -397,6 +398,10 @@ public class ConsumeQueue { boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(), request.getMsgSize(), tagsCode, request.getConsumeQueueOffset()); if (result) { + if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE || + this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) { + this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp()); + } this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp()); return; } else {