diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index e08dbbef64fcab4d6086d6f439a87456f57e6335..03fd66121b3937410457b65878bbb831b8f68e14 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -236,7 +236,7 @@ public class BrokerController { this.clientHousekeepingService = new ClientHousekeepingService(this); this.broker2Client = new Broker2Client(this); this.subscriptionGroupManager = new SubscriptionGroupManager(this); - this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig); + this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig, this); this.filterServerManager = new FilterServerManager(this); this.assignmentManager = new AssignmentManager(this); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index a77586467f956f276da85d3385652ae97ca68ada..7f3ce81d2c37c173404f21b0f22c53a8b6aa67d7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -23,8 +23,12 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; + +import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor; +import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; @@ -41,6 +45,8 @@ import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader; @@ -66,18 +72,20 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class BrokerOuterAPI { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final RemotingClient remotingClient; + private final BrokerController brokerController; private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr()); private String nameSrvAddr = null; private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue(32), new ThreadFactoryImpl("brokerOutApi_thread_", true)); - public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) { - this(nettyClientConfig, null); + public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, final BrokerController brokerController) { + this(nettyClientConfig, null, brokerController); } - public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) { + public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook, final BrokerController brokerController) { this.remotingClient = new NettyRemotingClient(nettyClientConfig); this.remotingClient.registerRPCHook(rpcHook); + this.brokerController = brokerController; } public void start() { @@ -454,4 +462,20 @@ public class BrokerOuterAPI { public void forwardRequest(String brokerAddr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException, RemotingTooMuchRequestException, RemotingConnectException { this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, invokeCallback); } + + public RemotingCommand pullMessage(String brokerName, PullMessageRequestHeader requestHeader, long timeoutMillis) throws Exception { + + String addr = this.brokerController.getBrokerAddrByName(brokerName); + if (addr == null) { + final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(String.format("%s-%d cannot find addr when forward to broker %s in broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), brokerName, this.brokerController.getBrokerConfig().getBrokerName())); + return response; + } + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); + RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); + assert response != null; + return response; + + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index e61ef11309e93ddb701bbe0adfc6e293cf354e9f..5ab3c01a10335c122759b94c5c35dc0ef98d68af 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -24,6 +24,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.FileRegion; import java.nio.ByteBuffer; import java.util.List; + import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker; @@ -35,9 +36,15 @@ import org.apache.rocketmq.broker.longpolling.PullRequest; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook; import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.impl.consumer.PullResultExt; +import org.apache.rocketmq.common.LogicQueueMappingItem; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; +import org.apache.rocketmq.common.TopicQueueMappingContext; +import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.filter.ExpressionType; @@ -49,6 +56,8 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData; @@ -95,6 +104,132 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements return false; } + + private RemotingCommand buildErrorResponse(int code, String remark) { + final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class); + response.setCode(code); + response.setRemark(remark); + return response; + } + + private TopicQueueMappingContext buildTopicQueueMappingContext(PullMessageRequestHeader requestHeader) { + if (requestHeader.getPhysical() != null + && Boolean.TRUE.equals(requestHeader.getPhysical())) { + return null; + } + TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic()); + if (mappingDetail == null) { + //it is not static topic + return null; + } + String topic = requestHeader.getTopic(); + Integer globalId = requestHeader.getQueueId(); + Long globalOffset = requestHeader.getQueueOffset(); + + LogicQueueMappingItem mappingItem = mappingDetail.getLogicQueueMappingItem(globalId, globalOffset); + return new TopicQueueMappingContext(topic, globalId, globalOffset, mappingDetail, mappingItem); + } + + private RemotingCommand rewriteRequestForStaticTopic(PullMessageRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { + try { + if (mappingContext == null) { + return null; + } + String topic = mappingContext.getTopic(); + Integer globalId = mappingContext.getGlobalId(); + Long globalOffset = mappingContext.getGlobalOffset(); + + LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); + if (mappingItem == null) { + return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d cannot find mapping item in request process of current broker %s", + topic, globalId, this.brokerController.getBrokerConfig().getBrokerName())); + } + + if (globalOffset < mappingItem.getStartOffset()) { + log.warn("{}-{} fetch offset {} smaller than the min mapping offset {}", topic, globalId, globalOffset, mappingItem.getStartOffset()); + return buildErrorResponse(ResponseCode.PULL_OFFSET_MOVED, String.format("%s-%d fetch offset {} smaller than the min mapping offset {} in broker %s", + topic, globalId, globalOffset, mappingItem.getStartOffset(), this.brokerController.getBrokerConfig().getBrokerName())); + } + //below are physical info + String bname = mappingItem.getBname(); + Integer phyQueueId = mappingItem.getQueueId(); + Long phyQueueOffset = mappingItem.convertToPhysicalQueueOffset(globalOffset); + requestHeader.setQueueId(phyQueueId); + requestHeader.setQueueOffset(phyQueueOffset); + if (mappingItem.isEndOffsetDecided() + && requestHeader.getMaxMsgNums() != null) { + requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums())); + } + + if (this.brokerController.getBrokerConfig().getBrokerName().equals(bname)) { + //just let it go + return null; + } + + requestHeader.setPhysical(true); + RemotingCommand response = this.brokerController.getBrokerOuterAPI().pullMessage(bname, requestHeader, this.brokerController.getBrokerConfig().getForwardTimeout()); + switch (response.getCode()) { + case ResponseCode.SYSTEM_ERROR: + return response; + case ResponseCode.SUCCESS: + case ResponseCode.PULL_NOT_FOUND: + case ResponseCode.PULL_RETRY_IMMEDIATELY: + case ResponseCode.PULL_OFFSET_MOVED: + break; + default: + throw new MQBrokerException(response.getCode(), response.getRemark(), mappingItem.getBname()); + } + PullMessageResponseHeader responseHeader = + (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class); + { + RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext); + if (rewriteResult != null) { + return rewriteResult; + } + } + return response; + } catch (Throwable t) { + return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); + } + } + + private RemotingCommand rewriteResponseForStaticTopic(PullMessageRequestHeader requestHeader, PullMessageResponseHeader responseHeader, TopicQueueMappingContext mappingContext) { + try { + if (mappingContext == null) { + return null; + } + TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); + LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); + //handle nextBeginOffset + { + long nextBeginOffset = responseHeader.getNextBeginOffset(); + assert nextBeginOffset >= requestHeader.getQueueOffset(); + //the next begin offset should no more than the end offset + if (mappingItem.isEndOffsetDecided() + && nextBeginOffset >= mappingItem.getEndOffset()) { + nextBeginOffset = mappingItem.getEndOffset(); + } + responseHeader.setNextBeginOffset(mappingItem.convertToStaticQueueOffset(nextBeginOffset)); + } + //handle min offset + responseHeader.setMinOffset(mappingItem.convertToStaticQueueOffset(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset()))); + //handle max offset + { + if (mappingItem.isEndOffsetDecided()) { + responseHeader.setMaxOffset(Math.max(mappingItem.convertToMaxStaticQueueOffset(), mappingDetail.getMaxOffsetFromMapping(mappingContext.getGlobalId()))); + } else { + responseHeader.setMaxOffset(mappingItem.convertToStaticQueueOffset(responseHeader.getMaxOffset())); + } + } + //set the offsetDelta + responseHeader.setOffsetDelta(mappingItem.convertOffsetDelta()); + } catch (Throwable t) { + return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); + } + return null; + } + + private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) throws RemotingCommandException { RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class); @@ -147,6 +282,15 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements return response; } + TopicQueueMappingContext mappingContext = buildTopicQueueMappingContext(requestHeader); + + { + RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); + if (rewriteResult != null) { + return rewriteResult; + } + } + int queueId = requestHeader.getQueueId(); if (queueId < 0 || queueId >= topicConfig.getReadQueueNums()) { String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", @@ -395,6 +539,15 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements break; } + //rewrite the response for the + { + RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext); + if (rewriteResult != null) { + return rewriteResult; + } + } + + if (this.hasConsumeMessageHook()) { ConsumeMessageContext context = new ConsumeMessageContext(); context.setConsumerGroup(requestHeader.getConsumerGroup()); @@ -507,31 +660,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements case ResponseCode.PULL_RETRY_IMMEDIATELY: break; case ResponseCode.PULL_OFFSET_MOVED: - if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE - || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) { - MessageQueue mq = new MessageQueue(); - mq.setTopic(requestHeader.getTopic()); - mq.setQueueId(requestHeader.getQueueId()); - mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); - - OffsetMovedEvent event = new OffsetMovedEvent(); - event.setConsumerGroup(requestHeader.getConsumerGroup()); - event.setMessageQueue(mq); - event.setOffsetRequest(requestHeader.getQueueOffset()); - event.setOffsetNew(nextBeginOffset); - this.generateOffsetMovedEvent(event); - log.warn( - "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}", - requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(), - responseHeader.getSuggestWhichBrokerId()); - } else { - responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId()); - response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); - log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}", - requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(), - responseHeader.getSuggestWhichBrokerId()); - } - + handleOffsetMoved(requestHeader, responseHeader, response, nextBeginOffset, subscriptionGroupConfig); break; default: assert false; @@ -552,6 +681,35 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements return response; } + public void handleOffsetMoved(PullMessageRequestHeader requestHeader, PullMessageResponseHeader responseHeader, RemotingCommand response, + long nextBeginOffset, + SubscriptionGroupConfig subscriptionGroupConfig) { + if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE + || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) { + MessageQueue mq = new MessageQueue(); + mq.setTopic(requestHeader.getTopic()); + mq.setQueueId(requestHeader.getQueueId()); + mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); + + OffsetMovedEvent event = new OffsetMovedEvent(); + event.setConsumerGroup(requestHeader.getConsumerGroup()); + event.setMessageQueue(mq); + event.setOffsetRequest(requestHeader.getQueueOffset()); + event.setOffsetNew(nextBeginOffset); + this.generateOffsetMovedEvent(event); + log.warn( + "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}", + requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(), + responseHeader.getSuggestWhichBrokerId()); + } else { + responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId()); + response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); + log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}", + requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(), + responseHeader.getSuggestWhichBrokerId()); + } + } + private void prepareRedirectResponse(RemotingCommand response, LogicalQueuesInfoInBroker logicalQueuesInfo, LogicalQueueRouteData queueRouteData) { LogicalQueueRouteData nextReadableLogicalQueueRouteData = logicalQueuesInfo.nextAvailableLogicalRouteData(queueRouteData, LogicalQueueRouteData::isReadable); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 7b88c7528ed400ad1308a30e3e48b055363a4d61..62e282807b6b56b0ced0d32816c50b6c780a0c9b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -33,6 +33,7 @@ import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; +import org.apache.rocketmq.common.TopicQueueMappingContext; import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.PermName; @@ -98,38 +99,53 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement if (requestHeader == null) { return CompletableFuture.completedFuture(null); } - RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader); + TopicQueueMappingContext mappingContext = buildTopicQueueMappingContext(requestHeader); + RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); if (rewriteResult != null) { return CompletableFuture.completedFuture(rewriteResult); } mqtraceContext = buildMsgContext(ctx, requestHeader); this.executeSendMessageHookBefore(ctx, request, mqtraceContext); if (requestHeader.isBatch()) { - return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader); + return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader, mappingContext); } else { - return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader); + return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader, mappingContext); } } } + + private RemotingCommand buildErrorResponse(int code, String remark) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); response.setCode(code); response.setRemark(remark); return response; } + + private TopicQueueMappingContext buildTopicQueueMappingContext(SendMessageRequestHeader requestHeader) { + if (requestHeader.getPhysical() != null + && Boolean.TRUE.equals(requestHeader.getPhysical())) { + return null; + } + TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic()); + if (mappingDetail == null) { + //it is not static topic + return null; + } + return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, mappingDetail, null); + } /** * If the response is not null, it meets some errors * @param requestHeader * @return */ - private RemotingCommand rewriteRequestForStaticTopic(SendMessageRequestHeader requestHeader) { + private RemotingCommand rewriteRequestForStaticTopic(SendMessageRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { try { - TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic()); - if (mappingDetail == null) { - //it is not static topic + if (mappingContext == null) { return null; } + TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); Integer phyQueueId = null; //compatible with the old logic, but it fact, this should not happen if (requestHeader.getQueueId() < 0) { @@ -151,22 +167,18 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } } - private RemotingCommand rewriteResponseForStaticTopic(String topic, SendMessageResponseHeader responseHeader) { + private RemotingCommand rewriteResponseForStaticTopic(SendMessageRequestHeader requestHeader, SendMessageResponseHeader responseHeader, TopicQueueMappingContext mappingContext) { try { - TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic); - if (mappingDetail == null) { + if (mappingContext == null) { return null; } - Integer globalId = mappingDetail.getCurrIdMapRevert().get(responseHeader.getQueueId()); - if (globalId == null) { - return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exist in response process of current broker %s", topic, responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName())); - } - long staticLogicOffset = mappingDetail.convertToLogicOffset(globalId, responseHeader.getQueueOffset()); - if (staticLogicOffset < 0) { - return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", topic, responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName())); + TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); + long staticLogicOffset = mappingDetail.convertToLogicOffset(mappingContext.getGlobalId(), responseHeader.getQueueOffset()); + if (staticLogicOffset < 0) { + return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName())); } - responseHeader.setQueueId(globalId); + responseHeader.setQueueId(mappingContext.getGlobalId()); responseHeader.setQueueOffset(staticLogicOffset); } catch (Throwable t) { return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); @@ -332,7 +344,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement private CompletableFuture asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request, SendMessageContext mqtraceContext, - SendMessageRequestHeader requestHeader) { + SendMessageRequestHeader requestHeader, + TopicQueueMappingContext mappingContext) { final RemotingCommand response = preSend(ctx, request, requestHeader); final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); @@ -392,7 +405,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } else { putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner); } - return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt); + return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt, requestHeader, mappingContext); } private CompletableFuture handlePutMessageResultFuture(CompletableFuture putMessageResult, @@ -402,9 +415,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx, - int queueIdInt) { + int queueIdInt, + SendMessageRequestHeader requestHeader, + TopicQueueMappingContext mappingContext) { return putMessageResult.thenApply((r) -> - handlePutMessageResult(r, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt) + handlePutMessageResult(r, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, requestHeader, mappingContext) ); } @@ -456,7 +471,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement private RemotingCommand sendMessage(final ChannelHandlerContext ctx, final RemotingCommand request, final SendMessageContext sendMessageContext, - final SendMessageRequestHeader requestHeader) throws RemotingCommandException { + final SendMessageRequestHeader requestHeader, + final TopicQueueMappingContext mappingContext) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); @@ -525,14 +541,15 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); } - return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt); + return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, requestHeader, mappingContext); } private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response, RemotingCommand request, MessageExt msg, SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx, - int queueIdInt) { + int queueIdInt, SendMessageRequestHeader requestHeader, + TopicQueueMappingContext mappingContext) { if (putMessageResult == null) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("store putMessage return null"); @@ -609,7 +626,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement responseHeader.setQueueId(queueIdInt); responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); - RemotingCommand rewriteResult = rewriteResponseForStaticTopic(msg.getTopic(), responseHeader); + RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext); if (rewriteResult != null) { return rewriteResult; } @@ -647,7 +664,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement private CompletableFuture asyncSendBatchMessage(ChannelHandlerContext ctx, RemotingCommand request, SendMessageContext mqtraceContext, - SendMessageRequestHeader requestHeader) { + SendMessageRequestHeader requestHeader, + TopicQueueMappingContext mappingContext) { final RemotingCommand response = preSend(ctx, request, requestHeader); final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); @@ -689,7 +707,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName); CompletableFuture putMessageResult = this.brokerController.getMessageStore().asyncPutMessages(messageExtBatch); - return handlePutMessageResultFuture(putMessageResult, response, request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt); + return handlePutMessageResultFuture(putMessageResult, response, request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt, requestHeader, mappingContext); } diff --git a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java index 78a27dc57cf99d17d69fbcaa7c7d3dcef5949a76..0a9ee96fe01fc588dcbb20c5240fd103bff01032 100644 --- a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java @@ -7,6 +7,7 @@ public class LogicQueueMappingItem { private String bname; private long logicOffset; // the start of the logic offset private long startOffset; // the start of the physical offset + private long endOffset; // the end of the physical offset private long timeOfStart = -1; //mutable public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long timeOfStart) { @@ -18,8 +19,32 @@ public class LogicQueueMappingItem { this.timeOfStart = timeOfStart; } - public long convertToStaticLogicOffset(long physicalLogicOffset) { - return logicOffset + (physicalLogicOffset - startOffset); + public long convertToStaticQueueOffset(long physicalQueueOffset) { + return logicOffset + (physicalQueueOffset - startOffset); + } + + public long convertToPhysicalQueueOffset(long staticQueueOffset) { + return (staticQueueOffset - logicOffset) + startOffset; + } + + public long convertToMaxStaticQueueOffset() { + if (endOffset >= startOffset) { + return logicOffset + endOffset - startOffset; + } else { + return logicOffset; + } + } + public boolean isShouldDeleted() { + return endOffset == startOffset; + } + + public boolean isEndOffsetDecided() { + //if the endOffset == startOffset, then the item should be deleted + return endOffset > startOffset; + } + + public long convertOffsetDelta() { + return logicOffset - startOffset; } public int getGen() { @@ -55,4 +80,13 @@ public class LogicQueueMappingItem { public long getStartOffset() { return startOffset; } + + public long getEndOffset() { + return endOffset; + } + + + public void setEndOffset(long endOffset) { + this.endOffset = endOffset; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java new file mode 100644 index 0000000000000000000000000000000000000000..50ac43ec8ff9573dc1731cc7f19325ae3359ef18 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +public class TopicQueueMappingContext { + private String topic; + private Integer globalId; + private Long globalOffset; + private TopicQueueMappingDetail mappingDetail; + private LogicQueueMappingItem mappingItem; + + public TopicQueueMappingContext(String topic, Integer globalId, Long globalOffset, TopicQueueMappingDetail mappingDetail, LogicQueueMappingItem mappingItem) { + this.topic = topic; + this.globalId = globalId; + this.globalOffset = globalOffset; + this.mappingDetail = mappingDetail; + this.mappingItem = mappingItem; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public Integer getGlobalId() { + return globalId; + } + + public void setGlobalId(Integer globalId) { + this.globalId = globalId; + } + + public Long getGlobalOffset() { + return globalOffset; + } + + public void setGlobalOffset(Long globalOffset) { + this.globalOffset = globalOffset; + } + + public TopicQueueMappingDetail getMappingDetail() { + return mappingDetail; + } + + public void setMappingDetail(TopicQueueMappingDetail mappingDetail) { + this.mappingDetail = mappingDetail; + } + + public LogicQueueMappingItem getMappingItem() { + return mappingItem; + } + + public void setMappingItem(LogicQueueMappingItem mappingItem) { + this.mappingItem = mappingItem; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java index 0021310cd5abe82eeaab119224feb1377252aa1c..8c2aad9f659e6acea1452ce3f0d38ba599138949 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java @@ -27,9 +27,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { // the mapping info in current broker, do not register to nameserver ConcurrentMap> hostedQueues = new ConcurrentHashMap>(); - transient ConcurrentMap currIdMapRevert = new ConcurrentHashMap(); - - public TopicQueueMappingDetail(String topic, int totalQueues, String bname) { super(topic, totalQueues, bname); @@ -48,7 +45,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { public void buildIdMap() { this.currIdMap = buildIdMap(LEVEL_0); this.prevIdMap = buildIdMap(LEVEL_1); - this.currIdMapRevert = revert(this.currIdMap); } public ConcurrentMap revert(ConcurrentMap original) { @@ -103,16 +99,48 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { return -1; } if (bname.equals(mappingItems.get(mappingItems.size() - 1).getBname())) { - return mappingItems.get(mappingItems.size() - 1).convertToStaticLogicOffset(physicalLogicOffset); + return mappingItems.get(mappingItems.size() - 1).convertToStaticQueueOffset(physicalLogicOffset); } //Consider the "switch" process, reduce the error if (mappingItems.size() >= 2 && bname.equals(mappingItems.get(mappingItems.size() - 2).getBname())) { - return mappingItems.get(mappingItems.size() - 2).convertToStaticLogicOffset(physicalLogicOffset); + return mappingItems.get(mappingItems.size() - 2).convertToStaticQueueOffset(physicalLogicOffset); } return -1; } + public LogicQueueMappingItem getLogicQueueMappingItem(Integer globalId, long logicOffset) { + List mappingItems = getMappingInfo(globalId); + if (mappingItems == null + || mappingItems.isEmpty()) { + return null; + } + //Could use bi-search to polish performance + for (int i = mappingItems.size() - 1; i >= 0; i--) { + LogicQueueMappingItem item = mappingItems.get(i); + if (logicOffset >= item.getLogicOffset()) { + return item; + } + } + //if not found, maybe out of range, return the first one + for (int i = 0; i < mappingItems.size(); i++) { + if (!mappingItems.get(i).isShouldDeleted()) { + return mappingItems.get(i); + } + } + return null; + } + + public long getMaxOffsetFromMapping(Integer globalId) { + List mappingItems = getMappingInfo(globalId); + if (mappingItems == null + || mappingItems.isEmpty()) { + return -1; + } + LogicQueueMappingItem item = mappingItems.get(mappingItems.size() - 1); + return item.convertToMaxStaticQueueOffset(); + } + public TopicQueueMappingInfo cloneAsMappingInfo() { TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname); @@ -122,13 +150,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { return topicQueueMappingInfo; } - public ConcurrentMap getCurrIdMapRevert() { - return currIdMapRevert; - } - - public void setCurrIdMapRevert(ConcurrentMap currIdMapRevert) { - this.currIdMapRevert = currIdMapRevert; - } public int getTotalQueues() { return totalQueues; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java index 106e89e511c159e2b3a4aa5aa6d1bd2fe4e85487..1bce01f5b02cd03490c734919bf3b8ef7658d161 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java @@ -20,12 +20,12 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.RequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class PullMessageRequestHeader implements CommandCustomHeader { +public class PullMessageRequestHeader extends RequestHeader { @CFNotNull private String consumerGroup; @CFNotNull diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java index 0112f7da8d68ecc826f42046aa9dd4f7ca0f893a..88af984df816947e2d7fc77226da29887581480a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java @@ -22,6 +22,7 @@ package org.apache.rocketmq.common.protocol.header; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; public class PullMessageResponseHeader implements CommandCustomHeader { @@ -33,6 +34,8 @@ public class PullMessageResponseHeader implements CommandCustomHeader { private Long minOffset; @CFNotNull private Long maxOffset; + @CFNullable + private Long offsetDelta; @Override public void checkFields() throws RemotingCommandException { @@ -69,4 +72,12 @@ public class PullMessageResponseHeader implements CommandCustomHeader { public void setSuggestWhichBrokerId(Long suggestWhichBrokerId) { this.suggestWhichBrokerId = suggestWhichBrokerId; } + + public Long getOffsetDelta() { + return offsetDelta; + } + + public void setOffsetDelta(Long offsetDelta) { + this.offsetDelta = offsetDelta; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java index 2df31e6bb2a12b2791a8969f8b7ea8f2425aca41..f9dcbff2e39295c894efcdeae767c62674826da5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java @@ -20,12 +20,12 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.RequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class SendMessageRequestHeader implements CommandCustomHeader { +public class SendMessageRequestHeader extends RequestHeader { @CFNotNull private String producerGroup; @CFNotNull diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RequestHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..109fb9e03e0c17a3802571be41969d960f3666ca --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RequestHeader.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting; + +public abstract class RequestHeader implements CommandCustomHeader { + protected Boolean physical; + + public Boolean getPhysical() { + return physical; + } + + public void setPhysical(Boolean physical) { + this.physical = physical; + } +}