未验证 提交 ea36854b 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #2733 from ayanamist/pop_consumer

[RIP-19] Pop Consuming (submodule "store")
...@@ -672,7 +672,7 @@ public class DefaultMessageStore implements MessageStore { ...@@ -672,7 +672,7 @@ public class DefaultMessageStore implements MessageStore {
} }
this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet(); this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
getResult.addMessage(selectResult); getResult.addMessage(selectResult, offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE));
status = GetMessageStatus.FOUND; status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE; nextPhyFileStartOffset = Long.MIN_VALUE;
} }
...@@ -1496,6 +1496,7 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1496,6 +1496,7 @@ public class DefaultMessageStore implements MessageStore {
return haService; return haService;
} }
@Override
public ScheduleMessageService getScheduleMessageService() { public ScheduleMessageService getScheduleMessageService() {
return scheduleMessageService; return scheduleMessageService;
} }
......
...@@ -27,6 +27,7 @@ public class GetMessageResult { ...@@ -27,6 +27,7 @@ public class GetMessageResult {
new ArrayList<SelectMappedBufferResult>(100); new ArrayList<SelectMappedBufferResult>(100);
private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100); private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);
private final List<Long> messageQueueOffset = new ArrayList<>(100);
private GetMessageStatus status; private GetMessageStatus status;
private long nextBeginOffset; private long nextBeginOffset;
...@@ -90,6 +91,11 @@ public class GetMessageResult { ...@@ -90,6 +91,11 @@ public class GetMessageResult {
mapedBuffer.getSize() / BrokerStatsManager.SIZE_PER_COUNT); mapedBuffer.getSize() / BrokerStatsManager.SIZE_PER_COUNT);
} }
public void addMessage(final SelectMappedBufferResult mapedBuffer, final long queueOffset) {
addMessage(mapedBuffer);
this.messageQueueOffset.add(queueOffset);
}
public void release() { public void release() {
for (SelectMappedBufferResult select : this.messageMapedList) { for (SelectMappedBufferResult select : this.messageMapedList) {
select.release(); select.release();
...@@ -124,6 +130,10 @@ public class GetMessageResult { ...@@ -124,6 +130,10 @@ public class GetMessageResult {
this.msgCount4Commercial = msgCount4Commercial; this.msgCount4Commercial = msgCount4Commercial;
} }
public List<Long> getMessageQueueOffset() {
return messageQueueOffset;
}
@Override @Override
public String toString() { public String toString() {
return "GetMessageResult [status=" + status + ", nextBeginOffset=" + nextBeginOffset + ", minOffset=" return "GetMessageResult [status=" + status + ", nextBeginOffset=" + nextBeginOffset + ", minOffset="
......
...@@ -20,10 +20,10 @@ import java.util.HashMap; ...@@ -20,10 +20,10 @@ import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.schedule.ScheduleMessageService;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
/** /**
...@@ -383,6 +383,8 @@ public interface MessageStore { ...@@ -383,6 +383,8 @@ public interface MessageStore {
*/ */
ConsumeQueue getConsumeQueue(String topic, int queueId); ConsumeQueue getConsumeQueue(String topic, int queueId);
ScheduleMessageService getScheduleMessageService();
/** /**
* Get BrokerStatsManager of the messageStore. * Get BrokerStatsManager of the messageStore.
* *
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.pop;
public class AckMsg {
private long ackOffset;
private long startOffset;
private String consumerGroup;
private String topic;
private int queueId;
private long popTime;
public long getPopTime() {
return popTime;
}
public void setPopTime(long popTime) {
this.popTime = popTime;
}
public void setQueueId(int queueId) {
this.queueId = queueId;
}
public int getQueueId() {
return queueId;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getTopic() {
return topic;
}
public long getAckOffset() {
return ackOffset;
}
public String getConsumerGroup() {
return consumerGroup;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
public void setAckOffset(long ackOffset) {
this.ackOffset = ackOffset;
}
public long getStartOffset() {
return startOffset;
}
public void setStartOffset(long startOffset) {
this.startOffset = startOffset;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("AckMsg{");
sb.append("ackOffset=").append(ackOffset);
sb.append(", startOffset=").append(startOffset);
sb.append(", consumerGroup='").append(consumerGroup).append('\'');
sb.append(", topic='").append(topic).append('\'');
sb.append(", queueId=").append(queueId);
sb.append(", popTime=").append(popTime);
sb.append('}');
return sb.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.pop;
import java.util.ArrayList;
import java.util.List;
public class PopCheckPoint {
private long startOffset;
private long popTime;
private long invisibleTime;
private int bitMap;
private byte num;
private byte queueId;
private String topic;
private String cid;
private long reviveOffset;
private List<Integer> queueOffsetDiff;
public long getReviveOffset() {
return reviveOffset;
}
public void setReviveOffset(long reviveOffset) {
this.reviveOffset = reviveOffset;
}
public long getStartOffset() {
return startOffset;
}
public void setStartOffset(long startOffset) {
this.startOffset = startOffset;
}
public void getStartOffset(long startOffset) {
this.startOffset = startOffset;
}
public void setPopTime(long popTime) {
this.popTime = popTime;
}
public void setInvisibleTime(long invisibleTime) {
this.invisibleTime = invisibleTime;
}
public long getPopTime() {
return popTime;
}
public long getInvisibleTime() {
return invisibleTime;
}
public long getReviveTime() {
return popTime + invisibleTime;
}
public int getBitMap() {
return bitMap;
}
public void setBitMap(int bitMap) {
this.bitMap = bitMap;
}
public byte getNum() {
return num;
}
public void setNum(byte num) {
this.num = num;
}
public byte getQueueId() {
return queueId;
}
public void setQueueId(byte queueId) {
this.queueId = queueId;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getCId() {
return cid;
}
public void setCId(String cid) {
this.cid = cid;
}
public List<Integer> getQueueOffsetDiff() {
return queueOffsetDiff;
}
public void setQueueOffsetDiff(List<Integer> queueOffsetDiff) {
this.queueOffsetDiff = queueOffsetDiff;
}
public void addDiff(int diff) {
if (this.queueOffsetDiff == null) {
this.queueOffsetDiff = new ArrayList<>(8);
}
this.queueOffsetDiff.add(diff);
}
public int indexOfAck(long ackOffset) {
if (ackOffset < startOffset) {
return -1;
}
// old version of checkpoint
if (queueOffsetDiff == null || queueOffsetDiff.isEmpty()) {
if (ackOffset - startOffset < num) {
return (int) (ackOffset - startOffset);
}
return -1;
}
// new version of checkpoint
return queueOffsetDiff.indexOf((int) (ackOffset - startOffset));
}
public long ackOffsetByIndex(byte index) {
// old version of checkpoint
if (queueOffsetDiff == null || queueOffsetDiff.isEmpty()) {
return startOffset + index;
}
return startOffset + queueOffsetDiff.get(index);
}
@Override
public String toString() {
return "PopCheckPoint [topic=" + topic + ", cid=" + cid + ", queueId=" + queueId + ", startOffset=" + startOffset + ", bitMap=" + bitMap + ", num=" + num + ", reviveTime=" + getReviveTime()
+ ", reviveOffset=" + reviveOffset + ", diff=" + queueOffsetDiff + "]";
}
}
...@@ -16,25 +16,28 @@ ...@@ -16,25 +16,28 @@
*/ */
package org.apache.rocketmq.store.schedule; package org.apache.rocketmq.store.schedule;
import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.running.RunningStats; import org.apache.rocketmq.common.running.RunningStats;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.ConsumeQueueExt; import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DefaultMessageStore;
...@@ -221,6 +224,17 @@ public class ScheduleMessageService extends ConfigManager { ...@@ -221,6 +224,17 @@ public class ScheduleMessageService extends ConfigManager {
return true; return true;
} }
public int computeDelayLevel(long timeMillis) {
long intervalMillis = timeMillis - System.currentTimeMillis();
List<Map.Entry<Integer, Long>> sortedLevels = delayLevelTable.entrySet().stream().sorted(Comparator.comparingLong(Map.Entry::getValue)).collect(Collectors.toList());
for (Map.Entry<Integer, Long> entry : sortedLevels) {
if (entry.getValue() > intervalMillis) {
return entry.getKey();
}
}
return sortedLevels.get(sortedLevels.size() - 1).getKey();
}
class DeliverDelayedMessageTimerTask extends TimerTask { class DeliverDelayedMessageTimerTask extends TimerTask {
private final int delayLevel; private final int delayLevel;
private final long offset; private final long offset;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册