提交 482def9e 编写于 作者: Z Zhanhui Li 提交者: dongeforever

Add javadoc to message store.

上级 06416b00
......@@ -149,7 +149,7 @@ public class Broker2Client {
long timeStampOffset;
if (timeStamp == -1) {
timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
} else {
timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
}
......
......@@ -98,7 +98,7 @@ public class PullRequestHoldService extends ServiceThread {
if (2 == kArray.length) {
String topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {
this.notifyMessageArriving(topic, queueId, offset);
} catch (Throwable e) {
......@@ -124,7 +124,7 @@ public class PullRequestHoldService extends ServiceThread {
for (PullRequest request : requestList) {
long newestOffset = maxOffset;
if (newestOffset <= request.getPullFromThisOffset()) {
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
}
if (newestOffset > request.getPullFromThisOffset()) {
......
......@@ -73,7 +73,7 @@ public class ConsumerOffsetManager extends ConfigManager {
while (it.hasNext() && result) {
Entry<Integer, Long> next = it.next();
long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, next.getKey());
long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, next.getKey());
long offsetInPersist = next.getValue();
result = offsetInPersist <= minOffsetInStore;
}
......@@ -201,7 +201,7 @@ public class ConsumerOffsetManager extends ConfigManager {
String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR);
if (topic.equals(topicGroupArr[0])) {
for (Entry<Integer, Long> entry : offSetEntry.getValue().entrySet()) {
long minOffset = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, entry.getKey());
long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, entry.getKey());
if (entry.getValue() >= minOffset) {
Long offset = queueMinOffset.get(entry.getKey());
if (offset == null) {
......
......@@ -92,18 +92,18 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
}
@Override
public long getMaxOffsetInQuque(String topic, int queueId) {
return next.getMaxOffsetInQuque(topic, queueId);
public long getMaxOffsetInQueue(String topic, int queueId) {
return next.getMaxOffsetInQueue(topic, queueId);
}
@Override
public long getMinOffsetInQuque(String topic, int queueId) {
return next.getMinOffsetInQuque(topic, queueId);
public long getMinOffsetInQueue(String topic, int queueId) {
return next.getMinOffsetInQueue(topic, queueId);
}
@Override
public long getCommitLogOffsetInQueue(String topic, int queueId, long cqOffset) {
return next.getCommitLogOffsetInQueue(topic, queueId, cqOffset);
public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) {
return next.getCommitLogOffsetInQueue(topic, queueId, consumeQueueOffset);
}
@Override
......@@ -152,8 +152,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
}
@Override
public long getMessageStoreTimeStamp(String topic, int queueId, long offset) {
return next.getMessageStoreTimeStamp(topic, queueId, offset);
public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) {
return next.getMessageStoreTimeStamp(topic, queueId, consumeQueueOffset);
}
@Override
......@@ -172,8 +172,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
}
@Override
public void excuteDeleteFilesManualy() {
next.excuteDeleteFilesManualy();
public void executeDeleteFilesManually() {
next.executeDeleteFilesManually();
}
@Override
......
......@@ -376,7 +376,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
final GetMaxOffsetRequestHeader requestHeader =
(GetMaxOffsetRequestHeader) request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(requestHeader.getTopic(), requestHeader.getQueueId());
long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());
responseHeader.setOffset(offset);
......@@ -391,7 +391,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
final GetMinOffsetRequestHeader requestHeader =
(GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
long offset = this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(), requestHeader.getQueueId());
long offset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
......@@ -537,11 +537,11 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
mq.setQueueId(i);
TopicOffset topicOffset = new TopicOffset();
long min = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, i);
long min = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, i);
if (min < 0)
min = 0;
long max = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
long max = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
if (max < 0)
max = 0;
......@@ -679,7 +679,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
OffsetWrapper offsetWrapper = new OffsetWrapper();
long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
if (brokerOffset < 0)
brokerOffset = 0;
......@@ -862,7 +862,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
long minTime = this.brokerController.getMessageStore().getEarliestMessageTime(topic, i);
timeSpan.setMinTimeStamp(minTime);
long max = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
long max = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
long maxTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, max - 1);
timeSpan.setMaxTimeStamp(maxTime);
......@@ -876,7 +876,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
timeSpan.setConsumeTimeStamp(consumeTime);
long maxBrokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(requestHeader.getTopic(), i);
long maxBrokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), i);
if (consumerOffset < maxBrokerOffset) {
long nextTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset);
timeSpan.setDelayTime(System.currentTimeMillis() - nextTime);
......@@ -1126,7 +1126,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
mq.setQueueId(i);
OffsetWrapper offsetWrapper = new OffsetWrapper();
long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
if (brokerOffset < 0)
brokerOffset = 0;
long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(//
......
......@@ -135,7 +135,7 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
response.setRemark(null);
} else {
long minOffset =
this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(),
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
if (minOffset <= 0
&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
......
......@@ -580,7 +580,7 @@ public class DefaultMessageStore implements MessageStore {
/**
*/
public long getMaxOffsetInQuque(String topic, int queueId) {
public long getMaxOffsetInQueue(String topic, int queueId) {
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
long offset = logic.getMaxOffsetInQueue();
......@@ -593,7 +593,7 @@ public class DefaultMessageStore implements MessageStore {
/**
*/
public long getMinOffsetInQuque(String topic, int queueId) {
public long getMinOffsetInQueue(String topic, int queueId) {
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
return logic.getMinOffsetInQueue();
......@@ -603,10 +603,10 @@ public class DefaultMessageStore implements MessageStore {
}
@Override
public long getCommitLogOffsetInQueue(String topic, int queueId, long cqOffset) {
public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) {
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(cqOffset);
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(consumeQueueOffset);
if (bufferConsumeQueue != null) {
try {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
......@@ -740,10 +740,10 @@ public class DefaultMessageStore implements MessageStore {
}
@Override
public long getMessageStoreTimeStamp(String topic, int queueId, long offset) {
public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) {
ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId);
if (logicQueue != null) {
SelectMappedBufferResult result = logicQueue.getIndexBuffer(offset);
SelectMappedBufferResult result = logicQueue.getIndexBuffer(consumeQueueOffset);
if (result != null) {
try {
final long phyOffset = result.getByteBuffer().getLong();
......@@ -798,7 +798,7 @@ public class DefaultMessageStore implements MessageStore {
}
@Override
public void excuteDeleteFilesManualy() {
public void executeDeleteFilesManually() {
this.cleanCommitLogService.excuteDeleteFilesManualy();
}
......@@ -1434,7 +1434,7 @@ public class DefaultMessageStore implements MessageStore {
public void excuteDeleteFilesManualy() {
this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
DefaultMessageStore.log.info("excuteDeleteFilesManualy was invoked");
DefaultMessageStore.log.info("executeDeleteFilesManually was invoked");
}
public void run() {
......
......@@ -22,91 +22,304 @@ import java.util.Set;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
/**
* This class defines contracting interfaces to implement, allowing third-party vendor to use customized message store.
*/
public interface MessageStore {
/**
* Load previously stored messages.
* @return true if success; false otherwise.
*/
boolean load();
/**
* Launch this message store.
* @throws Exception if there is any error.
*/
void start() throws Exception;
/**
* Shutdown this message store.
*/
void shutdown();
/**
* Destroy this message store. Generally, all persistent files should be removed after invocation.
*/
void destroy();
/**
* Store a message into store.
* @param msg Message instance to store
* @return result of store operation.
*/
PutMessageResult putMessage(final MessageExtBrokerInner msg);
/**
* Store a batch of messages.
* @param messageExtBatch Message batch.
* @return result of storing batch messages.
*/
PutMessageResult putMessages(final MessageExtBatch messageExtBatch);
/**
* Query at most <code>maxMsgNums</code> messages belonging to <code>topic</code> at <code>queueId</code> starting
* from given <code>offset</code>. Resulting messages will further be screened using provided message filter.
*
* @param group Consumer group that launches this query.
* @param topic Topic to query.
* @param queueId Queue ID to query.
* @param offset Logical offset to start from.
* @param maxMsgNums Maximum count of messages to query.
* @param messageFilter Message filter used to screen desired messages.
* @return Matched messages.
*/
GetMessageResult getMessage(final String group, final String topic, final int queueId,
final long offset, final int maxMsgNums, final MessageFilter messageFilter);
long getMaxOffsetInQuque(final String topic, final int queueId);
long getMinOffsetInQuque(final String topic, final int queueId);
long getCommitLogOffsetInQueue(final String topic, final int queueId, final long cqOffset);
/**
* Get maximum offset of the topic queue.
* @param topic Topic name.
* @param queueId Queue ID.
* @return Maximum offset at present.
*/
long getMaxOffsetInQueue(final String topic, final int queueId);
/**
* Get the minimum offset of the topic queue.
* @param topic Topic name.
* @param queueId Queue ID.
* @return Minimum offset at present.
*/
long getMinOffsetInQueue(final String topic, final int queueId);
/**
* Get the offset of the message in the commit log, which is also known as physical offset.
* @param topic Topic of the message to lookup.
* @param queueId Queue ID.
* @param consumeQueueOffset offset of consume queue.
* @return physical offset.
*/
long getCommitLogOffsetInQueue(final String topic, final int queueId, final long consumeQueueOffset);
/**
* Look up the physical offset of the message whose store timestamp is as specified.
* @param topic Topic of the message.
* @param queueId Queue ID.
* @param timestamp Timestamp to look up.
* @return physical offset which matches.
*/
long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp);
/**
* Look up the message by given commit log offset.
* @param commitLogOffset physical offset.
* @return Message whose physical offset is as specified.
*/
MessageExt lookMessageByOffset(final long commitLogOffset);
/**
* Get one message from the specified commit log offset.
* @param commitLogOffset commit log offset.
* @return wrapped result of the message.
*/
SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset);
/**
* Get one message from the specified commit log offset.
* @param commitLogOffset commit log offset.
* @param msgSize message size.
* @return wrapped result of the message.
*/
SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset, final int msgSize);
/**
* Get the running information of this store.
* @return message store running info.
*/
String getRunningDataInfo();
/**
* Message store runtime information, which should generally contains various statistical information.
* @return runtime information of the message store in format of key-value pairs.
*/
HashMap<String, String> getRuntimeInfo();
/**
* Get the maximum commit log offset.
* @return maximum commit log offset.
*/
long getMaxPhyOffset();
/**
* Get the minimum commit log offset.
* @return minimum commit log offset.
*/
long getMinPhyOffset();
/**
* Get the store time of the earliest message in the given queue.
* @param topic Topic of the messages to query.
* @param queueId Queue ID to find.
* @return store time of the earliest message.
*/
long getEarliestMessageTime(final String topic, final int queueId);
/**
* Get the store time of the earliest message in this store.
* @return timestamp of the earliest message in this store.
*/
long getEarliestMessageTime();
long getMessageStoreTimeStamp(final String topic, final int queueId, final long offset);
/**
* Get the store time of the message specified.
* @param topic message topic.
* @param queueId queue ID.
* @param consumeQueueOffset consume queue offset.
* @return store timestamp of the message.
*/
long getMessageStoreTimeStamp(final String topic, final int queueId, final long consumeQueueOffset);
/**
* Get the total number of the messages in the specified queue.
* @param topic Topic
* @param queueId Queue ID.
* @return total number.
*/
long getMessageTotalInQueue(final String topic, final int queueId);
/**
* Get the raw commit log data starting from the given offset, which should used for replication purpose.
* @param offset starting offset.
* @return commit log data.
*/
SelectMappedBufferResult getCommitLogData(final long offset);
/**
* Append data to commit log.
* @param startOffset starting offset.
* @param data data to append.
* @return true if success; false otherwise.
*/
boolean appendToCommitLog(final long startOffset, final byte[] data);
void excuteDeleteFilesManualy();
QueryMessageResult queryMessage(final String topic, final String key, final int maxNum,
final long begin, final long end);
/**
* Execute file deletion manually.
*/
void executeDeleteFilesManually();
/**
* Query messages by given key.
* @param topic topic of the message.
* @param key message key.
* @param maxNum maximum number of the messages possible.
* @param begin begin timestamp.
* @param end end timestamp.
* @return
*/
QueryMessageResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
final long end);
/**
* Update HA master address.
* @param newAddr new address.
*/
void updateHaMasterAddress(final String newAddr);
/**
* Return how much the slave falls behind.
* @return number of bytes that slave falls behind.
*/
long slaveFallBehindMuch();
/**
* Return the current timestamp of the store.
* @return current time in milliseconds since 1970-01-01.
*/
long now();
/**
* Clean unused topics.
* @param topics all valid topics.
* @return number of the topics deleted.
*/
int cleanUnusedTopic(final Set<String> topics);
/**
* Clean expired consume queues.
*/
void cleanExpiredConsumerQueue();
/**
* Check if the given message has been swapped out of the memory.
* @param topic topic.
* @param queueId queue ID.
* @param consumeOffset consume queue offset.
* @return true if the message is no longer in memory; false otherwise.
*/
boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset);
/**
* Get number of the bytes that have been stored in commit log and not yet dispatched to consume queue.
* @return number of the bytes to dispatch.
*/
long dispatchBehindBytes();
/**
* Flush the message store to persist all data.
* @return maximum offset flushed to persistent storage device.
*/
long flush();
/**
* Reset written offset.
* @param phyOffset new offset.
* @return true if success; false otherwise.
*/
boolean resetWriteOffset(long phyOffset);
/**
* Get confirm offset.
* @return confirm offset.
*/
long getConfirmOffset();
/**
* Set confirm offset.
* @param phyOffset confirm offset to set.
*/
void setConfirmOffset(long phyOffset);
/**
* Check if the operation system page cache is busy or not.
* @return true if the OS page cache is busy; false otherwise.
*/
boolean isOSPageCacheBusy();
/**
* Get lock time in milliseconds of the store by far.
* @return lock time in milliseconds.
*/
long lockTimeMills();
/**
* Check if the transient store pool is deficient.
* @return true if the transient store pool is running out; false otherwise.
*/
boolean isTransientStorePoolDeficient();
/**
* Get the dispatcher list.
* @return list of the dispatcher.
*/
LinkedList<CommitLogDispatcher> getDispatcherList();
/**
* Get consume queue of the topic/queue.
* @param topic Topic.
* @param queueId Queue ID.
* @return Consume queue.
*/
ConsumeQueue getConsumeQueue(String topic, int queueId);
}
......@@ -79,7 +79,7 @@ public class ScheduleMessageService extends ConfigManager {
Entry<Integer, Long> next = it.next();
int queueId = delayLevel2QueueId(next.getKey());
long delayOffset = next.getValue();
long maxOffset = this.defaultMessageStore.getMaxOffsetInQuque(SCHEDULE_TOPIC, queueId);
long maxOffset = this.defaultMessageStore.getMaxOffsetInQueue(SCHEDULE_TOPIC, queueId);
String value = String.format("%d,%d", delayOffset, maxOffset);
String key = String.format("%s_%d", RunningStats.scheduleMessageOffset.name(), next.getKey());
stats.put(key, value);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册