diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java index c00898c3c4e0741f09369a4f51063affe5f4448c..863da627e7db64f8bc9321574a1af5f9e4737dab 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java @@ -149,7 +149,7 @@ public class Broker2Client { long timeStampOffset; if (timeStamp == -1) { - timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i); + timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i); } else { timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java index 1a53db198bb08ae7cfffb3ceb59f4af89a2d63c5..71f56a4bd36b68af5c3805f3cac8faf2f4c1543a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java @@ -98,7 +98,7 @@ public class PullRequestHoldService extends ServiceThread { if (2 == kArray.length) { String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]); - final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId); + final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); try { this.notifyMessageArriving(topic, queueId, offset); } catch (Throwable e) { @@ -124,7 +124,7 @@ public class PullRequestHoldService extends ServiceThread { for (PullRequest request : requestList) { long newestOffset = maxOffset; if (newestOffset <= request.getPullFromThisOffset()) { - newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId); + newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); } if (newestOffset > request.getPullFromThisOffset()) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index bdcf30c40ec1dc7b8420959b6fd70cf7fd5c02e0..769c4ad0fb2dbecfb0965156e64343052a2fe48a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -73,7 +73,7 @@ public class ConsumerOffsetManager extends ConfigManager { while (it.hasNext() && result) { Entry next = it.next(); - long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, next.getKey()); + long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, next.getKey()); long offsetInPersist = next.getValue(); result = offsetInPersist <= minOffsetInStore; } @@ -201,7 +201,7 @@ public class ConsumerOffsetManager extends ConfigManager { String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR); if (topic.equals(topicGroupArr[0])) { for (Entry entry : offSetEntry.getValue().entrySet()) { - long minOffset = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, entry.getKey()); + long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, entry.getKey()); if (entry.getValue() >= minOffset) { Long offset = queueMinOffset.get(entry.getKey()); if (offset == null) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java index 8ded973e0eb14ced3971524f34086c3d2d834a00..690f70bfc88d4333252390c99976d186376075cb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java @@ -92,18 +92,18 @@ public abstract class AbstractPluginMessageStore implements MessageStore { } @Override - public long getMaxOffsetInQuque(String topic, int queueId) { - return next.getMaxOffsetInQuque(topic, queueId); + public long getMaxOffsetInQueue(String topic, int queueId) { + return next.getMaxOffsetInQueue(topic, queueId); } @Override - public long getMinOffsetInQuque(String topic, int queueId) { - return next.getMinOffsetInQuque(topic, queueId); + public long getMinOffsetInQueue(String topic, int queueId) { + return next.getMinOffsetInQueue(topic, queueId); } @Override - public long getCommitLogOffsetInQueue(String topic, int queueId, long cqOffset) { - return next.getCommitLogOffsetInQueue(topic, queueId, cqOffset); + public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) { + return next.getCommitLogOffsetInQueue(topic, queueId, consumeQueueOffset); } @Override @@ -152,8 +152,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore { } @Override - public long getMessageStoreTimeStamp(String topic, int queueId, long offset) { - return next.getMessageStoreTimeStamp(topic, queueId, offset); + public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) { + return next.getMessageStoreTimeStamp(topic, queueId, consumeQueueOffset); } @Override @@ -172,8 +172,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore { } @Override - public void excuteDeleteFilesManualy() { - next.excuteDeleteFilesManualy(); + public void executeDeleteFilesManually() { + next.executeDeleteFilesManually(); } @Override diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index daea53c7de89fb05dc2d0ec4e96f841b655bc3da..f59d2952e1b1468ad2d1d588d3d1ab86347d2b56 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -376,7 +376,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { final GetMaxOffsetRequestHeader requestHeader = (GetMaxOffsetRequestHeader) request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class); - long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(requestHeader.getTopic(), requestHeader.getQueueId()); + long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); responseHeader.setOffset(offset); @@ -391,7 +391,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { final GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class); - long offset = this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(), requestHeader.getQueueId()); + long offset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); responseHeader.setOffset(offset); response.setCode(ResponseCode.SUCCESS); @@ -537,11 +537,11 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { mq.setQueueId(i); TopicOffset topicOffset = new TopicOffset(); - long min = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, i); + long min = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, i); if (min < 0) min = 0; - long max = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i); + long max = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i); if (max < 0) max = 0; @@ -679,7 +679,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { OffsetWrapper offsetWrapper = new OffsetWrapper(); - long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i); + long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i); if (brokerOffset < 0) brokerOffset = 0; @@ -862,7 +862,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { long minTime = this.brokerController.getMessageStore().getEarliestMessageTime(topic, i); timeSpan.setMinTimeStamp(minTime); - long max = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i); + long max = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i); long maxTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, max - 1); timeSpan.setMaxTimeStamp(maxTime); @@ -876,7 +876,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } timeSpan.setConsumeTimeStamp(consumeTime); - long maxBrokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(requestHeader.getTopic(), i); + long maxBrokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), i); if (consumerOffset < maxBrokerOffset) { long nextTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset); timeSpan.setDelayTime(System.currentTimeMillis() - nextTime); @@ -1126,7 +1126,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); mq.setQueueId(i); OffsetWrapper offsetWrapper = new OffsetWrapper(); - long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i); + long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i); if (brokerOffset < 0) brokerOffset = 0; long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(// diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java index 2c1029c55584a29035c54a2629e6107f991e0b66..bb427050d9c5c82b735818ff1873ef945dd3e094 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java @@ -135,7 +135,7 @@ public class ConsumerManageProcessor implements NettyRequestProcessor { response.setRemark(null); } else { long minOffset = - this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(), + this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); if (minOffset <= 0 && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset( diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 7bed62c7c56b21336581e9209396d54703b11377..931edc7656767e993e01ee03c3bb3b97bef2dba5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -580,7 +580,7 @@ public class DefaultMessageStore implements MessageStore { /** */ - public long getMaxOffsetInQuque(String topic, int queueId) { + public long getMaxOffsetInQueue(String topic, int queueId) { ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { long offset = logic.getMaxOffsetInQueue(); @@ -593,7 +593,7 @@ public class DefaultMessageStore implements MessageStore { /** */ - public long getMinOffsetInQuque(String topic, int queueId) { + public long getMinOffsetInQueue(String topic, int queueId) { ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { return logic.getMinOffsetInQueue(); @@ -603,10 +603,10 @@ public class DefaultMessageStore implements MessageStore { } @Override - public long getCommitLogOffsetInQueue(String topic, int queueId, long cqOffset) { + public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) { ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) { - SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(cqOffset); + SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(consumeQueueOffset); if (bufferConsumeQueue != null) { try { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); @@ -740,10 +740,10 @@ public class DefaultMessageStore implements MessageStore { } @Override - public long getMessageStoreTimeStamp(String topic, int queueId, long offset) { + public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) { ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId); if (logicQueue != null) { - SelectMappedBufferResult result = logicQueue.getIndexBuffer(offset); + SelectMappedBufferResult result = logicQueue.getIndexBuffer(consumeQueueOffset); if (result != null) { try { final long phyOffset = result.getByteBuffer().getLong(); @@ -798,7 +798,7 @@ public class DefaultMessageStore implements MessageStore { } @Override - public void excuteDeleteFilesManualy() { + public void executeDeleteFilesManually() { this.cleanCommitLogService.excuteDeleteFilesManualy(); } @@ -1434,7 +1434,7 @@ public class DefaultMessageStore implements MessageStore { public void excuteDeleteFilesManualy() { this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES; - DefaultMessageStore.log.info("excuteDeleteFilesManualy was invoked"); + DefaultMessageStore.log.info("executeDeleteFilesManually was invoked"); } public void run() { diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index e841c083b59deb9c47d854d270be9530949c9003..55572ce10f4624553741be9b8ed2510f110d1f5d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -22,91 +22,304 @@ import java.util.Set; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; +/** + * This class defines contracting interfaces to implement, allowing third-party vendor to use customized message store. + */ public interface MessageStore { + /** + * Load previously stored messages. + * @return true if success; false otherwise. + */ boolean load(); + /** + * Launch this message store. + * @throws Exception if there is any error. + */ void start() throws Exception; + /** + * Shutdown this message store. + */ void shutdown(); + /** + * Destroy this message store. Generally, all persistent files should be removed after invocation. + */ void destroy(); + /** + * Store a message into store. + * @param msg Message instance to store + * @return result of store operation. + */ PutMessageResult putMessage(final MessageExtBrokerInner msg); + /** + * Store a batch of messages. + * @param messageExtBatch Message batch. + * @return result of storing batch messages. + */ PutMessageResult putMessages(final MessageExtBatch messageExtBatch); + /** + * Query at most maxMsgNums messages belonging to topic at queueId starting + * from given offset. Resulting messages will further be screened using provided message filter. + * + * @param group Consumer group that launches this query. + * @param topic Topic to query. + * @param queueId Queue ID to query. + * @param offset Logical offset to start from. + * @param maxMsgNums Maximum count of messages to query. + * @param messageFilter Message filter used to screen desired messages. + * @return Matched messages. + */ GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, final MessageFilter messageFilter); - long getMaxOffsetInQuque(final String topic, final int queueId); - - long getMinOffsetInQuque(final String topic, final int queueId); - - long getCommitLogOffsetInQueue(final String topic, final int queueId, final long cqOffset); - + /** + * Get maximum offset of the topic queue. + * @param topic Topic name. + * @param queueId Queue ID. + * @return Maximum offset at present. + */ + long getMaxOffsetInQueue(final String topic, final int queueId); + + /** + * Get the minimum offset of the topic queue. + * @param topic Topic name. + * @param queueId Queue ID. + * @return Minimum offset at present. + */ + long getMinOffsetInQueue(final String topic, final int queueId); + + /** + * Get the offset of the message in the commit log, which is also known as physical offset. + * @param topic Topic of the message to lookup. + * @param queueId Queue ID. + * @param consumeQueueOffset offset of consume queue. + * @return physical offset. + */ + long getCommitLogOffsetInQueue(final String topic, final int queueId, final long consumeQueueOffset); + + /** + * Look up the physical offset of the message whose store timestamp is as specified. + * @param topic Topic of the message. + * @param queueId Queue ID. + * @param timestamp Timestamp to look up. + * @return physical offset which matches. + */ long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp); + /** + * Look up the message by given commit log offset. + * @param commitLogOffset physical offset. + * @return Message whose physical offset is as specified. + */ MessageExt lookMessageByOffset(final long commitLogOffset); + /** + * Get one message from the specified commit log offset. + * @param commitLogOffset commit log offset. + * @return wrapped result of the message. + */ SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset); + /** + * Get one message from the specified commit log offset. + * @param commitLogOffset commit log offset. + * @param msgSize message size. + * @return wrapped result of the message. + */ SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset, final int msgSize); + /** + * Get the running information of this store. + * @return message store running info. + */ String getRunningDataInfo(); + /** + * Message store runtime information, which should generally contains various statistical information. + * @return runtime information of the message store in format of key-value pairs. + */ HashMap getRuntimeInfo(); + /** + * Get the maximum commit log offset. + * @return maximum commit log offset. + */ long getMaxPhyOffset(); + /** + * Get the minimum commit log offset. + * @return minimum commit log offset. + */ long getMinPhyOffset(); + /** + * Get the store time of the earliest message in the given queue. + * @param topic Topic of the messages to query. + * @param queueId Queue ID to find. + * @return store time of the earliest message. + */ long getEarliestMessageTime(final String topic, final int queueId); + /** + * Get the store time of the earliest message in this store. + * @return timestamp of the earliest message in this store. + */ long getEarliestMessageTime(); - long getMessageStoreTimeStamp(final String topic, final int queueId, final long offset); - + /** + * Get the store time of the message specified. + * @param topic message topic. + * @param queueId queue ID. + * @param consumeQueueOffset consume queue offset. + * @return store timestamp of the message. + */ + long getMessageStoreTimeStamp(final String topic, final int queueId, final long consumeQueueOffset); + + /** + * Get the total number of the messages in the specified queue. + * @param topic Topic + * @param queueId Queue ID. + * @return total number. + */ long getMessageTotalInQueue(final String topic, final int queueId); + /** + * Get the raw commit log data starting from the given offset, which should used for replication purpose. + * @param offset starting offset. + * @return commit log data. + */ SelectMappedBufferResult getCommitLogData(final long offset); + /** + * Append data to commit log. + * @param startOffset starting offset. + * @param data data to append. + * @return true if success; false otherwise. + */ boolean appendToCommitLog(final long startOffset, final byte[] data); - void excuteDeleteFilesManualy(); - - QueryMessageResult queryMessage(final String topic, final String key, final int maxNum, - final long begin, final long end); - + /** + * Execute file deletion manually. + */ + void executeDeleteFilesManually(); + + /** + * Query messages by given key. + * @param topic topic of the message. + * @param key message key. + * @param maxNum maximum number of the messages possible. + * @param begin begin timestamp. + * @param end end timestamp. + * @return + */ + QueryMessageResult queryMessage(final String topic, final String key, final int maxNum, final long begin, + final long end); + + /** + * Update HA master address. + * @param newAddr new address. + */ void updateHaMasterAddress(final String newAddr); + /** + * Return how much the slave falls behind. + * @return number of bytes that slave falls behind. + */ long slaveFallBehindMuch(); + /** + * Return the current timestamp of the store. + * @return current time in milliseconds since 1970-01-01. + */ long now(); + /** + * Clean unused topics. + * @param topics all valid topics. + * @return number of the topics deleted. + */ int cleanUnusedTopic(final Set topics); + /** + * Clean expired consume queues. + */ void cleanExpiredConsumerQueue(); + /** + * Check if the given message has been swapped out of the memory. + * @param topic topic. + * @param queueId queue ID. + * @param consumeOffset consume queue offset. + * @return true if the message is no longer in memory; false otherwise. + */ boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset); + /** + * Get number of the bytes that have been stored in commit log and not yet dispatched to consume queue. + * @return number of the bytes to dispatch. + */ long dispatchBehindBytes(); + /** + * Flush the message store to persist all data. + * @return maximum offset flushed to persistent storage device. + */ long flush(); + /** + * Reset written offset. + * @param phyOffset new offset. + * @return true if success; false otherwise. + */ boolean resetWriteOffset(long phyOffset); + /** + * Get confirm offset. + * @return confirm offset. + */ long getConfirmOffset(); + /** + * Set confirm offset. + * @param phyOffset confirm offset to set. + */ void setConfirmOffset(long phyOffset); + /** + * Check if the operation system page cache is busy or not. + * @return true if the OS page cache is busy; false otherwise. + */ boolean isOSPageCacheBusy(); + /** + * Get lock time in milliseconds of the store by far. + * @return lock time in milliseconds. + */ long lockTimeMills(); + /** + * Check if the transient store pool is deficient. + * @return true if the transient store pool is running out; false otherwise. + */ boolean isTransientStorePoolDeficient(); + /** + * Get the dispatcher list. + * @return list of the dispatcher. + */ LinkedList getDispatcherList(); + /** + * Get consume queue of the topic/queue. + * @param topic Topic. + * @param queueId Queue ID. + * @return Consume queue. + */ ConsumeQueue getConsumeQueue(String topic, int queueId); } diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index d45b994a3b0a594420e33c17ec1c77bf6070e916..501876ed296b5128b25693ae4c7bfedd6deca116 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -79,7 +79,7 @@ public class ScheduleMessageService extends ConfigManager { Entry next = it.next(); int queueId = delayLevel2QueueId(next.getKey()); long delayOffset = next.getValue(); - long maxOffset = this.defaultMessageStore.getMaxOffsetInQuque(SCHEDULE_TOPIC, queueId); + long maxOffset = this.defaultMessageStore.getMaxOffsetInQueue(SCHEDULE_TOPIC, queueId); String value = String.format("%d,%d", delayOffset, maxOffset); String key = String.format("%s_%d", RunningStats.scheduleMessageOffset.name(), next.getKey()); stats.put(key, value);