diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java index 77317a6ffe7cea3d3e9c4e1d27a13f15cbec04ab..59ea6c664230b9d62258cb71e03400356cc93246 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java @@ -20,6 +20,7 @@ import io.netty.channel.ChannelHandlerContext; import java.util.List; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; +import org.apache.rocketmq.common.TopicQueueMappingContext; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.protocol.RequestCode; @@ -109,6 +110,11 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen final UpdateConsumerOffsetRequestHeader requestHeader = (UpdateConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); + TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader); + RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext); + if (rewriteResult != null) { + return rewriteResult; + } this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); response.setCode(ResponseCode.SUCCESS); @@ -126,6 +132,12 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen (QueryConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class); + TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader); + RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext); + if (rewriteResult != null) { + return rewriteResult; + } + long offset = this.brokerController.getConsumerOffsetManager().queryOffset( requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); @@ -140,7 +152,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen requestHeader.getQueueId()); if (minOffset <= 0 && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset( - requestHeader.getTopic(), requestHeader.getQueueId(), 0)) { + requestHeader.getTopic(), requestHeader.getQueueId(), 0) + && mappingContext.checkIfAsPhysical()) { responseHeader.setOffset(0L); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 8d7758a778c143b602a607e2403d1df05c96abae..79869b982472cb6d4736afa550482cc585dd4e34 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -80,6 +80,8 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.stats.BrokerStatsManager; +import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; + public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; @@ -101,50 +103,26 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements } - private RemotingCommand buildErrorResponse(int code, String remark) { - final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class); - response.setCode(code); - response.setRemark(remark); - return response; - } - - private TopicQueueMappingContext buildTopicQueueMappingContext(PullMessageRequestHeader requestHeader) { - if (requestHeader.getPhysical() != null - && Boolean.TRUE.equals(requestHeader.getPhysical())) { - return null; - } - TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic()); - if (mappingDetail == null) { - //it is not static topic - return null; - } - String topic = requestHeader.getTopic(); - Integer globalId = requestHeader.getQueueId(); - Long globalOffset = requestHeader.getQueueOffset(); - - LogicQueueMappingItem mappingItem = mappingDetail.findLogicQueueMappingItem(globalId, globalOffset); - return new TopicQueueMappingContext(topic, globalId, globalOffset, mappingDetail, mappingItem); - } private RemotingCommand rewriteRequestForStaticTopic(PullMessageRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { try { - if (mappingContext == null) { + if (mappingContext.getMappingDetail() == null) { return null; } + TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); String topic = mappingContext.getTopic(); Integer globalId = mappingContext.getGlobalId(); Long globalOffset = mappingContext.getGlobalOffset(); LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); if (mappingItem == null) { - return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d cannot find mapping item in request process of current broker %s", - topic, globalId, this.brokerController.getBrokerConfig().getBrokerName())); + return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d cannot find mapping item in request process of current broker %s", topic, globalId, mappingDetail.getBname())); } if (globalOffset < mappingItem.getStartOffset()) { log.warn("{}-{} fetch offset {} smaller than the min mapping offset {}", topic, globalId, globalOffset, mappingItem.getStartOffset()); return buildErrorResponse(ResponseCode.PULL_OFFSET_MOVED, String.format("%s-%d fetch offset {} smaller than the min mapping offset {} in broker %s", - topic, globalId, globalOffset, mappingItem.getStartOffset(), this.brokerController.getBrokerConfig().getBrokerName())); + topic, globalId, globalOffset, mappingItem.getStartOffset(), mappingDetail.getBname())); } //below are physical info String bname = mappingItem.getBname(); @@ -157,7 +135,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums())); } - if (this.brokerController.getBrokerConfig().getBrokerName().equals(bname)) { + if (mappingDetail.getBname().equals(bname)) { //just let it go return null; } @@ -278,7 +256,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements return response; } - TopicQueueMappingContext mappingContext = buildTopicQueueMappingContext(requestHeader); + TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false, requestHeader.getQueueOffset()); { RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 52508a4c45290b49f99c1d592c1b8b9d1741ac9b..baa1024fea0d12197b0e5b6415319a7948d93ea0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; +import org.apache.rocketmq.common.LogicQueueMappingItem; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; @@ -53,6 +54,7 @@ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.remoting.TopicQueueRequestHeader; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.RemotingResponseCallback; @@ -63,6 +65,8 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.stats.BrokerStatsManager; +import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; + public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { private List consumeMessageHookList; @@ -99,8 +103,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement if (requestHeader == null) { return CompletableFuture.completedFuture(null); } - TopicQueueMappingContext mappingContext = buildTopicQueueMappingContext(requestHeader); - RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); + TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true, Long.MAX_VALUE); + RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext); if (rewriteResult != null) { return CompletableFuture.completedFuture(rewriteResult); } @@ -115,68 +119,27 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } - - private RemotingCommand buildErrorResponse(int code, String remark) { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - response.setCode(code); - response.setRemark(remark); - return response; - } - - private TopicQueueMappingContext buildTopicQueueMappingContext(SendMessageRequestHeader requestHeader) { - if (requestHeader.getPhysical() != null - && Boolean.TRUE.equals(requestHeader.getPhysical())) { - return null; - } - TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic()); - if (mappingDetail == null) { - //it is not static topic - return null; - } - return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, mappingDetail, null); - } /** * If the response is not null, it meets some errors - * @param requestHeader * @return */ - private RemotingCommand rewriteRequestForStaticTopic(SendMessageRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { - try { - if (mappingContext == null) { - return null; - } - TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); - Integer phyQueueId = null; - //compatible with the old logic, but it fact, this should not happen - if (requestHeader.getQueueId() < 0) { - Iterator> it = mappingDetail.getCurrIdMap().entrySet().iterator(); - if (it.hasNext()) { - phyQueueId = it.next().getValue(); - } - } else { - phyQueueId = mappingDetail.getCurrIdMap().get(requestHeader.getQueueId()); - } - if (phyQueueId == null) { - return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName())); - } else { - requestHeader.setQueueId(phyQueueId); - return null; - } - } catch (Throwable t) { - return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); - } - } - private RemotingCommand rewriteResponseForStaticTopic(SendMessageRequestHeader requestHeader, SendMessageResponseHeader responseHeader, TopicQueueMappingContext mappingContext) { + + private RemotingCommand rewriteResponseForStaticTopic(SendMessageResponseHeader responseHeader, TopicQueueMappingContext mappingContext) { try { - if (mappingContext == null) { + if (mappingContext.getMappingDetail() == null) { return null; } TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); - long staticLogicOffset = mappingDetail.computeStaticQueueOffset(mappingContext.getGlobalId(), responseHeader.getQueueOffset()); + LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); + if (mappingItem == null) { + return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); + } + //no need to care the broker name + long staticLogicOffset = mappingItem.computeStaticQueueOffset(responseHeader.getQueueOffset()); if (staticLogicOffset < 0) { - return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName())); + return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); } responseHeader.setQueueId(mappingContext.getGlobalId()); responseHeader.setQueueOffset(staticLogicOffset); @@ -626,7 +589,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement responseHeader.setQueueId(queueIdInt); responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); - RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext); + RemotingCommand rewriteResult = rewriteResponseForStaticTopic(responseHeader, mappingContext); if (rewriteResult != null) { return rewriteResult; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index 2f5d558bac6172199f203ffdf543938715c78792..ae2e75d27b060810beeedf910c0bcc911b38c38f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -17,21 +17,32 @@ package org.apache.rocketmq.broker.topic; import com.alibaba.fastjson.JSON; +import com.google.common.collect.ImmutableList; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.DataVersion; +import org.apache.rocketmq.common.LogicQueueMappingItem; +import org.apache.rocketmq.common.TopicQueueMappingContext; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper; import org.apache.rocketmq.common.TopicQueueMappingDetail; +import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.TopicQueueRequestHeader; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; + public class TopicQueueMappingManager extends ConfigManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final long LOCK_TIMEOUT_MILLIS = 3000; @@ -94,5 +105,75 @@ public class TopicQueueMappingManager extends ConfigManager { return dataVersion; } + public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader) { + return buildTopicQueueMappingContext(requestHeader, false, Long.MAX_VALUE); + } + + //Do not return a null context + public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader, boolean selectOneWhenMiss, Long globalOffset) { + if (requestHeader.getPhysical() != null + && Boolean.TRUE.equals(requestHeader.getPhysical())) { + return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null, null); + } + TopicQueueMappingDetail mappingDetail = getTopicQueueMapping(requestHeader.getTopic()); + if (mappingDetail == null) { + //it is not static topic + return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null, null); + } + //If not find mappingItem, it encounters some errors + Integer globalId = requestHeader.getQueueId(); + if (globalId < 0 && !selectOneWhenMiss) { + return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, null, null); + } + + if (globalId < 0) { + try { + if (!mappingDetail.getHostedQueues().isEmpty()) { + //do not check + globalId = mappingDetail.getHostedQueues().keySet().iterator().next(); + } + } catch (Throwable ignored) { + } + } + if (globalId < 0) { + return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, null, null); + } + + ImmutableList mappingItemList = null; + LogicQueueMappingItem mappingItem = null; + + if (globalOffset == null + || Long.MAX_VALUE == globalOffset) { + mappingItemList = mappingDetail.getMappingInfo(globalId); + if (mappingItemList != null + && mappingItemList.size() > 0) { + mappingItem = mappingItemList.get(mappingItemList.size() - 1); + } + } else { + mappingItemList = mappingDetail.getMappingInfo(globalId); + mappingItem = TopicQueueMappingDetail.findLogicQueueMappingItem(mappingItemList, globalOffset); + } + return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, mappingItemList, mappingItem); + } + + + public RemotingCommand rewriteRequestForStaticTopic(TopicQueueRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { + try { + if (mappingContext.getMappingDetail() == null) { + return null; + } + TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); + LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); + if (mappingItem == null + || !mappingDetail.getBname().equals(mappingItem.getBname())) { + return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname())); + } + requestHeader.setQueueId(mappingItem.getQueueId()); + return null; + } catch (Throwable t) { + return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); + } + } + } diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java index 50ac43ec8ff9573dc1731cc7f19325ae3359ef18..ca759a1992e8a4c97e93fcdb5e104027f3fd363f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java @@ -16,21 +16,31 @@ */ package org.apache.rocketmq.common; +import com.google.common.collect.ImmutableList; + public class TopicQueueMappingContext { private String topic; private Integer globalId; private Long globalOffset; private TopicQueueMappingDetail mappingDetail; + private ImmutableList mappingItemList; private LogicQueueMappingItem mappingItem; - public TopicQueueMappingContext(String topic, Integer globalId, Long globalOffset, TopicQueueMappingDetail mappingDetail, LogicQueueMappingItem mappingItem) { + public TopicQueueMappingContext(String topic, Integer globalId, Long globalOffset, TopicQueueMappingDetail mappingDetail, ImmutableList mappingItemList, LogicQueueMappingItem mappingItem) { this.topic = topic; this.globalId = globalId; this.globalOffset = globalOffset; this.mappingDetail = mappingDetail; + this.mappingItemList = mappingItemList; this.mappingItem = mappingItem; } + public boolean checkIfAsPhysical() { + return mappingDetail == null + || mappingItemList == null + || (mappingItemList.size() == 1 && mappingItemList.get(0).getLogicOffset() == 0); + } + public String getTopic() { return topic; } @@ -63,6 +73,14 @@ public class TopicQueueMappingContext { this.mappingDetail = mappingDetail; } + public ImmutableList getMappingItemList() { + return mappingItemList; + } + + public void setMappingItemList(ImmutableList mappingItemList) { + this.mappingItemList = mappingItemList; + } + public LogicQueueMappingItem getMappingItem() { return mappingItem; } diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java index 8d5c8e8e9169f272e22bb59cb864fe3233c91c73..75f2c5218bfaf93cd55a5104e8479d62d0b6fa8d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap; public class TopicQueueMappingDetail extends TopicQueueMappingInfo { // the mapping info in current broker, do not register to nameserver + // make sure this value is not null private ConcurrentMap> hostedQueues = new ConcurrentHashMap>(); public TopicQueueMappingDetail(String topic, int totalQueues, String bname) { @@ -77,30 +78,14 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { return tmpIdMap; } - public List getMappingInfo(Integer globalId) { + public ImmutableList getMappingInfo(Integer globalId) { return hostedQueues.get(globalId); } - public long computeStaticQueueOffset(Integer globalId, long physicalLogicOffset) { - List mappingItems = getMappingInfo(globalId); - if (mappingItems == null - || mappingItems.isEmpty()) { - return -1; - } - if (bname.equals(mappingItems.get(mappingItems.size() - 1).getBname())) { - return mappingItems.get(mappingItems.size() - 1).computeStaticQueueOffset(physicalLogicOffset); - } - //Consider the "switch" process, reduce the error - if (mappingItems.size() >= 2 - && bname.equals(mappingItems.get(mappingItems.size() - 2).getBname())) { - return mappingItems.get(mappingItems.size() - 2).computeStaticQueueOffset(physicalLogicOffset); - } - return -1; - } - public LogicQueueMappingItem findLogicQueueMappingItem(Integer globalId, long logicOffset) { - List mappingItems = getMappingInfo(globalId); + + public static LogicQueueMappingItem findLogicQueueMappingItem(ImmutableList mappingItems, long logicOffset) { if (mappingItems == null || mappingItems.isEmpty()) { return null; @@ -143,4 +128,11 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { public ConcurrentMap> getHostedQueues() { return hostedQueues; } + + public boolean checkIfAsPhysical(Integer globalId) { + List mappingItems = getMappingInfo(globalId); + return mappingItems == null + || (mappingItems.size() == 1 + && mappingItems.get(0).getLogicOffset() == 0); + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java index 1bce01f5b02cd03490c734919bf3b8ef7658d161..5407964ad55d04f17a19c6831e66348250cf9517 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java @@ -20,12 +20,12 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.RequestHeader; +import org.apache.rocketmq.remoting.TopicQueueRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class PullMessageRequestHeader extends RequestHeader { +public class PullMessageRequestHeader extends TopicQueueRequestHeader { @CFNotNull private String consumerGroup; @CFNotNull @@ -60,18 +60,22 @@ public class PullMessageRequestHeader extends RequestHeader { this.consumerGroup = consumerGroup; } + @Override public String getTopic() { return topic; } + @Override public void setTopic(String topic) { this.topic = topic; } + @Override public Integer getQueueId() { return queueId; } + @Override public void setQueueId(Integer queueId) { this.queueId = queueId; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java index 3b7f627c35abaa744dd7a786360e6c7799848c05..e4e132eaab6af1906e07dbff5588df35e5e40b05 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.TopicQueueRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class QueryConsumerOffsetRequestHeader implements CommandCustomHeader { +public class QueryConsumerOffsetRequestHeader extends TopicQueueRequestHeader { @CFNotNull private String consumerGroup; @CFNotNull @@ -44,18 +44,22 @@ public class QueryConsumerOffsetRequestHeader implements CommandCustomHeader { this.consumerGroup = consumerGroup; } + @Override public String getTopic() { return topic; } + @Override public void setTopic(String topic) { this.topic = topic; } + @Override public Integer getQueueId() { return queueId; } + @Override public void setQueueId(Integer queueId) { this.queueId = queueId; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java index f9dcbff2e39295c894efcdeae767c62674826da5..0ec9795afb08d9897a3c4818ed4dd1003cc0dd5f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java @@ -20,12 +20,12 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.RequestHeader; +import org.apache.rocketmq.remoting.TopicQueueRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class SendMessageRequestHeader extends RequestHeader { +public class SendMessageRequestHeader extends TopicQueueRequestHeader { @CFNotNull private String producerGroup; @CFNotNull @@ -64,10 +64,12 @@ public class SendMessageRequestHeader extends RequestHeader { this.producerGroup = producerGroup; } + @Override public String getTopic() { return topic; } + @Override public void setTopic(String topic) { this.topic = topic; } @@ -88,10 +90,12 @@ public class SendMessageRequestHeader extends RequestHeader { this.defaultTopicQueueNums = defaultTopicQueueNums; } + @Override public Integer getQueueId() { return queueId; } + @Override public void setQueueId(Integer queueId) { this.queueId = queueId; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java index 3f44db645c5dd6d8e858a1796163167cf2e4cfa1..e17fe3611b8e3b9593206eaa1b5c1f6b3e19e0f9 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.TopicQueueRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class UpdateConsumerOffsetRequestHeader implements CommandCustomHeader { +public class UpdateConsumerOffsetRequestHeader extends TopicQueueRequestHeader { @CFNotNull private String consumerGroup; @CFNotNull @@ -46,18 +46,22 @@ public class UpdateConsumerOffsetRequestHeader implements CommandCustomHeader { this.consumerGroup = consumerGroup; } + @Override public String getTopic() { return topic; } + @Override public void setTopic(String topic) { this.topic = topic; } + @Override public Integer getQueueId() { return queueId; } + @Override public void setQueueId(Integer queueId) { this.queueId = queueId; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/TopicQueueRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/TopicQueueRequestHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..08c3fefbc21e6a8344c858b67bb8862f1bb113a2 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/TopicQueueRequestHeader.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting; + +public abstract class TopicQueueRequestHeader extends RequestHeader { + public abstract String getTopic(); + public abstract void setTopic(String topic); + public abstract Integer getQueueId(); + public abstract void setQueueId(Integer queueId); + +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index 51b619491b5f0e853428253fd50f713dac0d2780..34a1790ffcd72f6107c5cde6da7c762f33427f31 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -110,6 +110,13 @@ public class RemotingCommand { return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader); } + public static RemotingCommand buildErrorResponse(int code, String remark) { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(code); + response.setRemark(remark); + return response; + } + public static RemotingCommand createResponseCommand(int code, String remark, Class classHeader) { RemotingCommand cmd = new RemotingCommand();