diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 7f704cc370f8c0e39ece3ec0977f9a7697732419..6388c06aec30623e652787c03cd34f49f2f90e3d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -16,18 +16,13 @@ */ package org.apache.rocketmq.broker.processor; -import com.google.common.collect.ImmutableList; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.FileRegion; -import java.nio.ByteBuffer; -import java.util.List; - import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; -import org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker; import org.apache.rocketmq.broker.filter.ConsumerFilterData; import org.apache.rocketmq.broker.filter.ConsumerFilterManager; import org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter; @@ -36,18 +31,14 @@ import org.apache.rocketmq.broker.longpolling.PullRequest; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook; import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer; -import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; -import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; -import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.help.FAQUrl; -import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.RequestCode; @@ -56,18 +47,19 @@ import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; -import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData; -import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState; import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent; import org.apache.rocketmq.common.rpc.RpcClientUtils; +import org.apache.rocketmq.common.rpc.RpcRequest; +import org.apache.rocketmq.common.rpc.RpcResponse; +import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; -import org.apache.rocketmq.common.rpc.RpcRequest; -import org.apache.rocketmq.common.rpc.RpcResponse; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.exception.RemotingCommandException; @@ -75,7 +67,6 @@ import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageFilter; @@ -83,6 +74,9 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.stats.BrokerStatsManager; +import java.nio.ByteBuffer; +import java.util.List; + import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { @@ -122,10 +116,13 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d cannot find mapping item in request process of current broker %s", topic, globalId, mappingDetail.getBname())); } - if (globalOffset < mappingItem.getStartOffset()) { - log.warn("{}-{} fetch offset {} smaller than the min mapping offset {}", topic, globalId, globalOffset, mappingItem.getStartOffset()); - return buildErrorResponse(ResponseCode.PULL_OFFSET_MOVED, String.format("%s-%d fetch offset {} smaller than the min mapping offset {} in broker %s", - topic, globalId, globalOffset, mappingItem.getStartOffset(), mappingDetail.getBname())); + //TODO Check if the leader? consider the order consumer, which will lock the mq + // + + if (globalOffset < mappingItem.getLogicOffset()) { + //handleOffsetMoved + //If the physical queue is reused, we should handle the PULL_OFFSET_MOVED independently + //Otherwise, we could just transfer it to the physical process } //below are physical info String bname = mappingItem.getBname(); @@ -139,7 +136,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements } if (mappingDetail.getBname().equals(bname)) { - //just let it go + //just let it go, do the local pull process return null; } @@ -185,13 +182,8 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements //handle min offset responseHeader.setMinOffset(mappingItem.computeStaticQueueOffsetUpToEnd(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset()))); //handle max offset - { - if (mappingItem.checkIfEndOffsetDecided()) { - responseHeader.setMaxOffset(Math.max(mappingItem.computeMaxStaticQueueOffset(), TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId()))); - } else { - responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getMaxOffset())); - } - } + responseHeader.setMaxOffset(Math.max(mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getMaxOffset()), + TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId()))); //set the offsetDelta responseHeader.setOffsetDelta(mappingItem.computeOffsetDelta()); } catch (Throwable t) { @@ -238,18 +230,17 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0; - String topic = requestHeader.getTopic(); - TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (null == topicConfig) { - log.error("the topic {} not exist, consumer: {}", topic, RemotingHelper.parseChannelRemoteAddr(channel)); + log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel)); response.setCode(ResponseCode.TOPIC_NOT_EXIST); - response.setRemark(String.format("topic[%s] not exist, apply first please! %s", topic, FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL))); + response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL))); return response; } if (!PermName.isReadable(topicConfig.getPerm())) { response.setCode(ResponseCode.NO_PERMISSION); - response.setRemark("the topic[" + topic + "] pulling message is forbidden"); + response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden"); return response; } @@ -262,10 +253,9 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements } } - int queueId = requestHeader.getQueueId(); - if (queueId < 0 || queueId >= topicConfig.getReadQueueNums()) { + if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) { String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", - queueId, topic, topicConfig.getReadQueueNums(), channel.remoteAddress()); + requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress()); log.warn(errorInfo); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(errorInfo); @@ -277,11 +267,11 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements if (hasSubscriptionFlag) { try { subscriptionData = FilterAPI.build( - topic, requestHeader.getSubscription(), requestHeader.getExpressionType() + requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType() ); if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) { consumerFilterData = ConsumerFilterManager.build( - topic, requestHeader.getConsumerGroup(), requestHeader.getSubscription(), + requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(), requestHeader.getExpressionType(), requestHeader.getSubVersion() ); assert consumerFilterData != null; @@ -310,9 +300,9 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements return response; } - subscriptionData = consumerGroupInfo.findSubscriptionData(topic); + subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic()); if (null == subscriptionData) { - log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), topic); + log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic()); response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST); response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC)); return response; @@ -326,7 +316,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements return response; } if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) { - consumerFilterData = this.brokerController.getConsumerFilterManager().get(topic, + consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(), requestHeader.getConsumerGroup()); if (consumerFilterData == null) { response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST); @@ -335,7 +325,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements } if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) { log.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}", - requestHeader.getConsumerGroup(), topic, consumerFilterData.getClientVersion(), requestHeader.getSubVersion()); + requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion()); response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST); response.setRemark("the consumer's consumer filter data not latest"); return response; @@ -359,72 +349,12 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements this.brokerController.getConsumerFilterManager()); } - long offset = requestHeader.getQueueOffset(); - int maxMsgNums = requestHeader.getMaxMsgNums(); - - LogicalQueuesInfoInBroker logicalQueuesInfo = this.brokerController.getTopicConfigManager().selectLogicalQueuesInfo(topic); - LogicalQueueRouteData queueRouteData = null; - if (logicalQueuesInfo != null) { - int responseErrorCode = ResponseCode.SUCCESS; - queueRouteData = logicalQueuesInfo.queryQueueRouteDataByQueueId(queueId, offset); - if (queueRouteData != null) { - if (queueRouteData.isWriteOnly()) { - responseErrorCode = ResponseCode.PULL_NOT_FOUND; - response.setRemark("logical queue write only"); - } else if (queueRouteData.isExpired()) { - responseErrorCode = ResponseCode.PULL_RETRY_IMMEDIATELY; - response.setRemark("logical queue expired"); - prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData); - } else if (MessageQueueRouteState.ReadOnly.equals(queueRouteData.getState()) && queueRouteData.getOffsetMax() >= 0) { - if (offset >= queueRouteData.getOffsetMax()) { - responseErrorCode = ResponseCode.PULL_RETRY_IMMEDIATELY; - response.setRemark("queue offset exceed offsetMax"); - prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData); - } else if (offset + maxMsgNums > queueRouteData.getOffsetMax()) { - if ((queueRouteData.getOffsetMax() - 1 <= this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId)) && - (this.brokerController.getMessageStore().getCommitLogOffsetInQueue(topic, queueId, queueRouteData.getOffsetMax() - 1) < this.brokerController.getMessageStore().getMinPhyOffset())) { - responseErrorCode = ResponseCode.PULL_RETRY_IMMEDIATELY; - response.setRemark("queue offset removed"); - prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData); - } else { - maxMsgNums = (int) (queueRouteData.getOffsetMax() - offset); - if (maxMsgNums <= 0) { - responseErrorCode = ResponseCode.PULL_RETRY_IMMEDIATELY; - response.setRemark("queue offset out of range"); - prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData); - } - } - } - } - } else { - responseErrorCode = ResponseCode.PULL_RETRY_IMMEDIATELY; - response.setRemark("no suitable queue"); - response.addExtField(MessageConst.PROPERTY_REDIRECT, "1"); - // instruct client to refresh all - response.setBody(null); - queueRouteData = logicalQueuesInfo.queryQueueRouteDataByQueueId(queueId, 0L); - } - if (responseErrorCode != ResponseCode.SUCCESS) { - response.setCode(responseErrorCode); - responseHeader.setMinOffset(offset); - responseHeader.setMaxOffset(queueRouteData != null ? queueRouteData.getOffsetMax() : offset); - responseHeader.setNextBeginOffset(queueRouteData != null ? queueRouteData.getOffsetMax() : offset); - responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); - return response; - } - } - final GetMessageResult getMessageResult = - this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), topic, - queueId, offset, maxMsgNums, messageFilter); + this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), + requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter); if (getMessageResult != null) { response.setRemark(getMessageResult.getStatus().name()); - long nextBeginOffset = getMessageResult.getNextBeginOffset(); - if (queueRouteData != null && queueRouteData.getOffsetMax() >= 0 && nextBeginOffset > queueRouteData.getOffsetMax()) { - // prevent from pulling messages from next logical queue route data - nextBeginOffset = queueRouteData.getOffsetMax(); - } - responseHeader.setNextBeginOffset(nextBeginOffset); + responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset()); responseHeader.setMinOffset(getMessageResult.getMinOffset()); // this does not need to be modified since it's not an accurate value under logical queue. responseHeader.setMaxOffset(getMessageResult.getMaxOffset()); @@ -475,9 +405,9 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements // XXX: warn and notify me log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", requestHeader.getQueueOffset(), - nextBeginOffset, - topic, - queueId, + getMessageResult.getNextBeginOffset(), + requestHeader.getTopic(), + requestHeader.getQueueId(), requestHeader.getConsumerGroup() ); } else { @@ -502,7 +432,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements case OFFSET_TOO_SMALL: response.setCode(ResponseCode.PULL_OFFSET_MOVED); log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}", - requestHeader.getConsumerGroup(), topic, requestHeader.getQueueOffset(), + requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(), getMessageResult.getMinOffset(), channel.remoteAddress()); break; default: @@ -522,8 +452,8 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements if (this.hasConsumeMessageHook()) { ConsumeMessageContext context = new ConsumeMessageContext(); context.setConsumerGroup(requestHeader.getConsumerGroup()); - context.setTopic(topic); - context.setQueueId(queueId); + context.setTopic(requestHeader.getTopic()); + context.setQueueId(requestHeader.getQueueId()); String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER); @@ -607,6 +537,9 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills(); } + String topic = requestHeader.getTopic(); + long offset = requestHeader.getQueueOffset(); + int queueId = requestHeader.getQueueId(); PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter); this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest); @@ -614,24 +547,34 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements break; } - if (queueRouteData != null) { - logicalQueuesInfo.readLock().lock(); - try { - List queueRouteDataList = logicalQueuesInfo.get(queueRouteData.getLogicalQueueIndex()); - MessageQueue latestMessageQueue = queueRouteDataList.get(queueRouteDataList.size() - 1).getMessageQueue(); - if (!latestMessageQueue.getBrokerName().equals(brokerController.getBrokerConfig().getBrokerName()) || latestMessageQueue.getQueueId() != queueId) { - // There are other newer message queue, instruct client to refresh meta-data to access these - prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData); - } - } finally { - logicalQueuesInfo.readLock().unlock(); - } - } - case ResponseCode.PULL_RETRY_IMMEDIATELY: break; case ResponseCode.PULL_OFFSET_MOVED: - handleOffsetMoved(requestHeader, responseHeader, response, nextBeginOffset, subscriptionGroupConfig); + if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE + || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) { + MessageQueue mq = new MessageQueue(); + mq.setTopic(requestHeader.getTopic()); + mq.setQueueId(requestHeader.getQueueId()); + mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); + + OffsetMovedEvent event = new OffsetMovedEvent(); + event.setConsumerGroup(requestHeader.getConsumerGroup()); + event.setMessageQueue(mq); + event.setOffsetRequest(requestHeader.getQueueOffset()); + event.setOffsetNew(getMessageResult.getNextBeginOffset()); + this.generateOffsetMovedEvent(event); + log.warn( + "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}", + requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(), + responseHeader.getSuggestWhichBrokerId()); + } else { + responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId()); + response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); + log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}", + requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(), + responseHeader.getSuggestWhichBrokerId()); + } + break; default: assert false; @@ -647,49 +590,11 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE; if (storeOffsetEnable) { this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), - requestHeader.getConsumerGroup(), topic, queueId, requestHeader.getCommitOffset()); + requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); } return response; } - public void handleOffsetMoved(PullMessageRequestHeader requestHeader, PullMessageResponseHeader responseHeader, RemotingCommand response, - long nextBeginOffset, - SubscriptionGroupConfig subscriptionGroupConfig) { - if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE - || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) { - MessageQueue mq = new MessageQueue(); - mq.setTopic(requestHeader.getTopic()); - mq.setQueueId(requestHeader.getQueueId()); - mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); - - OffsetMovedEvent event = new OffsetMovedEvent(); - event.setConsumerGroup(requestHeader.getConsumerGroup()); - event.setMessageQueue(mq); - event.setOffsetRequest(requestHeader.getQueueOffset()); - event.setOffsetNew(nextBeginOffset); - this.generateOffsetMovedEvent(event); - log.warn( - "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}", - requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(), - responseHeader.getSuggestWhichBrokerId()); - } else { - responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId()); - response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); - log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}", - requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(), - responseHeader.getSuggestWhichBrokerId()); - } - } - - private void prepareRedirectResponse(RemotingCommand response, LogicalQueuesInfoInBroker logicalQueuesInfo, - LogicalQueueRouteData queueRouteData) { - LogicalQueueRouteData nextReadableLogicalQueueRouteData = logicalQueuesInfo.nextAvailableLogicalRouteData(queueRouteData, LogicalQueueRouteData::isReadable); - if (nextReadableLogicalQueueRouteData != null) { - response.addExtField(MessageConst.PROPERTY_REDIRECT, "1"); - response.setBody(RemotingSerializable.encode(ImmutableList.of(queueRouteData, nextReadableLogicalQueueRouteData))); - } - } - public boolean hasConsumeMessageHook() { return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty(); } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java index 30d995270c94632bf32b86c07fa823bef7462b3b..d8875421b1dbc2926f60dac059eb9935b3d6b2c3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java @@ -26,6 +26,7 @@ public class PullResult { private final long maxOffset; private List msgFoundList; + public PullResult(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset, List msgFoundList) { super(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index d57532c573d83bf880868ee4014599e067634873..6f605f0de02702fe7407028fd627f00af6fec365 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -973,7 +973,7 @@ public class MQClientAPIImpl { (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class); return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(), - responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody()); + responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody(), responseHeader.getOffsetDelta()); } private PopResult processPopResponse(final String brokerName, final RemotingCommand response, String topic, diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java index 8cd4bab00536f504fb08d513121d6730ccdca76b..da9c8b30edafbe2e9e0dd50a48fac8d33a900651 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -107,6 +107,10 @@ public class PullAPIWrapper { MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET, Long.toString(pullResult.getMaxOffset())); msg.setBrokerName(mq.getBrokerName()); + msg.setQueueId(mq.getQueueId()); + if (pullResultExt.getOffsetDelta() != null) { + msg.setQueueOffset(pullResultExt.getOffsetDelta() + msg.getQueueOffset()); + } } pullResultExt.setMsgFoundList(msgListFilterAgain); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java index c34a68f9ab3c2df4286e7bafb34fc5865dfb4009..45538eda8d3ccabef1db0a5e5f66873a97ecec0c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java @@ -25,11 +25,23 @@ public class PullResultExt extends PullResult { private final long suggestWhichBrokerId; private byte[] messageBinary; + private final Long offsetDelta; + public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset, List msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) { + this(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList, suggestWhichBrokerId, messageBinary, 0L); + } + + public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset, + List msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary, final Long offsetDelta) { super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList); this.suggestWhichBrokerId = suggestWhichBrokerId; this.messageBinary = messageBinary; + this.offsetDelta = offsetDelta; + } + + public Long getOffsetDelta() { + return offsetDelta; } public byte[] getMessageBinary() { diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java index 7c3f0e5ae475b2313509cf6082c03f5de76e2255..feb8730f6b6eb11c971f3287a77f422de48107a6 100644 --- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java @@ -2,6 +2,9 @@ package org.apache.rocketmq.test.smoke; import org.apache.log4j.Logger; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageClientExt; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.rpc.ClientMetadata; @@ -9,8 +12,11 @@ import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.MQRandomUtils; +import org.apache.rocketmq.test.util.VerifyUtils; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.junit.After; import org.junit.Assert; @@ -19,11 +25,15 @@ import org.junit.FixMethodOrder; import org.junit.Test; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import static com.google.common.truth.Truth.assertThat; import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.getMappingDetailFromConfig; @FixMethodOrder @@ -63,11 +73,16 @@ public class StaticTopicIT extends BaseConf { } @Test - public void testCreateAndRemappingStaticTopic() throws Exception { + public void testCreateProduceConsumeStaticTopic() throws Exception { String topic = "static" + MQRandomUtils.getRandomTopic(); RMQNormalProducer producer = getProducer(nsAddr, topic); + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); + int queueNum = 10; + int msgEachQueue = 100; + //create static topic Map localBrokerConfigMap = createStaticTopic(topic, queueNum, getBrokers()); + //check the static topic config { Map remoteBrokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); Assert.assertEquals(2, remoteBrokerConfigMap.size()); @@ -82,6 +97,7 @@ public class StaticTopicIT extends BaseConf { Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true); Assert.assertEquals(queueNum, globalIdMap.size()); } + //check the route data List messageQueueList = producer.getMessageQueue(); Assert.assertEquals(queueNum, messageQueueList.size()); producer.setDebug(true); @@ -91,23 +107,45 @@ public class StaticTopicIT extends BaseConf { Assert.assertEquals(i, messageQueue.getQueueId()); Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName()); } + //send and consume the msg for(MessageQueue messageQueue: messageQueueList) { - producer.send(100, messageQueue); + producer.send(msgEachQueue, messageQueue); } //leave the time to build the cq Thread.sleep(500); for(MessageQueue messageQueue: messageQueueList) { Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue)); - Assert.assertEquals(100, defaultMQAdminExt.maxOffset(messageQueue)); + Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue)); } - Assert.assertEquals(100 * queueNum, producer.getAllOriginMsg().size()); + Assert.assertEquals(msgEachQueue * queueNum, producer.getAllOriginMsg().size()); Assert.assertEquals(0, producer.getSendErrorMsg().size()); - /*{ - Set targetBrokers = Collections.singleton(broker1Name); - Map brokerConfigMapFromRemote = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); - TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMapFromRemote, targetBrokers); - }*/ + consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListener().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + Map> messagesByQueue = new HashMap<>(); + for (Object object : consumer.getListener().getAllOriginMsg()) { + MessageExt messageExt = (MessageExt) object; + if (!messagesByQueue.containsKey(messageExt.getQueueId())) { + messagesByQueue.put(messageExt.getQueueId(), new ArrayList<>()); + } + messagesByQueue.get(messageExt.getQueueId()).add(messageExt); + } + Assert.assertEquals(queueNum, messagesByQueue.size()); + for (int i = 0; i < queueNum; i++) { + List messageExts = messagesByQueue.get(i); + Assert.assertEquals(msgEachQueue, messageExts.size()); + Collections.sort(messageExts, new Comparator() { + @Override + public int compare(MessageExt o1, MessageExt o2) { + return (int) (o1.getQueueOffset() - o2.getQueueOffset()); + } + }); + for (int j = 0; j < msgEachQueue; j++) { + Assert.assertEquals(j, messageExts.get(j).getQueueOffset()); + } + } }