diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 14a8848b20d4011db59fbeb8525cd0fbde69c648..bb92cc0cabdcb8fd8153f7ba6d9b66ed2b0d7ba5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -672,7 +672,7 @@ public class DefaultMessageStore implements MessageStore { } this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet(); - getResult.addMessage(selectResult); + getResult.addMessage(selectResult, offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE)); status = GetMessageStatus.FOUND; nextPhyFileStartOffset = Long.MIN_VALUE; } @@ -1496,6 +1496,7 @@ public class DefaultMessageStore implements MessageStore { return haService; } + @Override public ScheduleMessageService getScheduleMessageService() { return scheduleMessageService; } diff --git a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java index 996e24d8058cf9b88ec18c2c456e4af11e755fcd..6fcb3101a139ac8d5b1ef96866288b8f513801ca 100644 --- a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java @@ -27,6 +27,7 @@ public class GetMessageResult { new ArrayList(100); private final List messageBufferList = new ArrayList(100); + private final List messageQueueOffset = new ArrayList<>(100); private GetMessageStatus status; private long nextBeginOffset; @@ -90,6 +91,11 @@ public class GetMessageResult { mapedBuffer.getSize() / BrokerStatsManager.SIZE_PER_COUNT); } + public void addMessage(final SelectMappedBufferResult mapedBuffer, final long queueOffset) { + addMessage(mapedBuffer); + this.messageQueueOffset.add(queueOffset); + } + public void release() { for (SelectMappedBufferResult select : this.messageMapedList) { select.release(); @@ -124,6 +130,10 @@ public class GetMessageResult { this.msgCount4Commercial = msgCount4Commercial; } + public List getMessageQueueOffset() { + return messageQueueOffset; + } + @Override public String toString() { return "GetMessageResult [status=" + status + ", nextBeginOffset=" + nextBeginOffset + ", minOffset=" diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 64eb5250de6e59f7ce3ef19033cd40b5d5f0254f..0cea607677d4200e77885710168018357c7608f0 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -20,10 +20,10 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.Set; import java.util.concurrent.CompletableFuture; - import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.store.config.BrokerRole; +import org.apache.rocketmq.store.schedule.ScheduleMessageService; import org.apache.rocketmq.store.stats.BrokerStatsManager; /** @@ -383,6 +383,8 @@ public interface MessageStore { */ ConsumeQueue getConsumeQueue(String topic, int queueId); + ScheduleMessageService getScheduleMessageService(); + /** * Get BrokerStatsManager of the messageStore. * diff --git a/store/src/main/java/org/apache/rocketmq/store/pop/AckMsg.java b/store/src/main/java/org/apache/rocketmq/store/pop/AckMsg.java new file mode 100644 index 0000000000000000000000000000000000000000..ab017a951cd8871ded56035287c63cab72dd64de --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/pop/AckMsg.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.pop; + +public class AckMsg { + private long ackOffset; + private long startOffset; + private String consumerGroup; + private String topic; + private int queueId; + private long popTime; + + public long getPopTime() { + return popTime; + } + + public void setPopTime(long popTime) { + this.popTime = popTime; + } + + public void setQueueId(int queueId) { + this.queueId = queueId; + } + + public int getQueueId() { + return queueId; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getTopic() { + return topic; + } + + public long getAckOffset() { + return ackOffset; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public void setAckOffset(long ackOffset) { + this.ackOffset = ackOffset; + } + + public long getStartOffset() { + return startOffset; + } + + public void setStartOffset(long startOffset) { + this.startOffset = startOffset; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("AckMsg{"); + sb.append("ackOffset=").append(ackOffset); + sb.append(", startOffset=").append(startOffset); + sb.append(", consumerGroup='").append(consumerGroup).append('\''); + sb.append(", topic='").append(topic).append('\''); + sb.append(", queueId=").append(queueId); + sb.append(", popTime=").append(popTime); + sb.append('}'); + return sb.toString(); + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java new file mode 100644 index 0000000000000000000000000000000000000000..f5b7ac6c53890a93727673f3c029534a81ae6b18 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.pop; + +import java.util.ArrayList; +import java.util.List; + +public class PopCheckPoint { + private long startOffset; + private long popTime; + private long invisibleTime; + private int bitMap; + private byte num; + private byte queueId; + private String topic; + private String cid; + private long reviveOffset; + private List queueOffsetDiff; + + public long getReviveOffset() { + return reviveOffset; + } + + public void setReviveOffset(long reviveOffset) { + this.reviveOffset = reviveOffset; + } + + public long getStartOffset() { + return startOffset; + } + + public void setStartOffset(long startOffset) { + this.startOffset = startOffset; + } + + public void getStartOffset(long startOffset) { + this.startOffset = startOffset; + } + + public void setPopTime(long popTime) { + this.popTime = popTime; + } + + public void setInvisibleTime(long invisibleTime) { + this.invisibleTime = invisibleTime; + } + + public long getPopTime() { + return popTime; + } + + public long getInvisibleTime() { + return invisibleTime; + } + + public long getReviveTime() { + return popTime + invisibleTime; + } + + public int getBitMap() { + return bitMap; + } + + public void setBitMap(int bitMap) { + this.bitMap = bitMap; + } + + public byte getNum() { + return num; + } + + public void setNum(byte num) { + this.num = num; + } + + public byte getQueueId() { + return queueId; + } + + public void setQueueId(byte queueId) { + this.queueId = queueId; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getCId() { + return cid; + } + + public void setCId(String cid) { + this.cid = cid; + } + + public List getQueueOffsetDiff() { + return queueOffsetDiff; + } + + public void setQueueOffsetDiff(List queueOffsetDiff) { + this.queueOffsetDiff = queueOffsetDiff; + } + + public void addDiff(int diff) { + if (this.queueOffsetDiff == null) { + this.queueOffsetDiff = new ArrayList<>(8); + } + this.queueOffsetDiff.add(diff); + } + + public int indexOfAck(long ackOffset) { + if (ackOffset < startOffset) { + return -1; + } + + // old version of checkpoint + if (queueOffsetDiff == null || queueOffsetDiff.isEmpty()) { + + if (ackOffset - startOffset < num) { + return (int) (ackOffset - startOffset); + } + + return -1; + } + + // new version of checkpoint + return queueOffsetDiff.indexOf((int) (ackOffset - startOffset)); + } + + public long ackOffsetByIndex(byte index) { + // old version of checkpoint + if (queueOffsetDiff == null || queueOffsetDiff.isEmpty()) { + return startOffset + index; + } + + return startOffset + queueOffsetDiff.get(index); + } + + @Override + public String toString() { + return "PopCheckPoint [topic=" + topic + ", cid=" + cid + ", queueId=" + queueId + ", startOffset=" + startOffset + ", bitMap=" + bitMap + ", num=" + num + ", reviveTime=" + getReviveTime() + + ", reviveOffset=" + reviveOffset + ", diff=" + queueOffsetDiff + "]"; + } + +} diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index 3b19a16fd4b7fc238b9e5697a32c2672fd270e19..ee994c37a6da3185309fef1e9e5dbeb8f3bf8adc 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -16,25 +16,28 @@ */ package org.apache.rocketmq.store.schedule; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.topic.TopicValidator; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.running.RunningStats; +import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.ConsumeQueueExt; import org.apache.rocketmq.store.DefaultMessageStore; @@ -221,6 +224,17 @@ public class ScheduleMessageService extends ConfigManager { return true; } + public int computeDelayLevel(long timeMillis) { + long intervalMillis = timeMillis - System.currentTimeMillis(); + List> sortedLevels = delayLevelTable.entrySet().stream().sorted(Comparator.comparingLong(Map.Entry::getValue)).collect(Collectors.toList()); + for (Map.Entry entry : sortedLevels) { + if (entry.getValue() > intervalMillis) { + return entry.getKey(); + } + } + return sortedLevels.get(sortedLevels.size() - 1).getKey(); + } + class DeliverDelayedMessageTimerTask extends TimerTask { private final int delayLevel; private final long offset;