From 71af9c75cfdfc3a0a026b6524c45daea4b68cd65 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Wed, 10 Nov 2021 14:23:12 +0800 Subject: [PATCH] Finish the logic for SendProcessor --- .../processor/SendMessageProcessor.java | 75 ++++++++++++++----- .../impl/producer/TopicPublishInfo.java | 5 -- .../common/LogicQueueMappingItem.java | 4 + .../common/TopicQueueMappingDetail.java | 18 +++++ .../common/protocol/ResponseCode.java | 2 + 5 files changed, 82 insertions(+), 22 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 91463a49..9daebf43 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; +import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.help.FAQUrl; @@ -96,6 +97,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement if (requestHeader == null) { return CompletableFuture.completedFuture(null); } + RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader); + if (rewriteResult != null) { + return CompletableFuture.completedFuture(rewriteResult); + } mqtraceContext = buildMsgContext(ctx, requestHeader); this.executeSendMessageHookBefore(ctx, request, mqtraceContext); if (requestHeader.isBatch()) { @@ -106,6 +111,54 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } } + private RemotingCommand buildErrorResponse(int code, String remark) { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(code); + response.setRemark(remark); + return response; + } + /** + * If the response is not null, it meets some errors + * @param requestHeader + * @return + */ + private RemotingCommand rewriteRequestForStaticTopic(SendMessageRequestHeader requestHeader) { + try { + TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic()); + if (mappingDetail == null) { + return null; + } + if (!mappingDetail.getCurrIdMap().containsKey(requestHeader.getQueueId())) { + return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName())); + } + requestHeader.setQueueId(mappingDetail.getCurrIdMap().get(requestHeader.getQueueId())); + } catch (Throwable t) { + return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); + } + return null; + } + + private RemotingCommand rewriteResponseForStaticTopic(String topic, SendMessageResponseHeader responseHeader) { + try { + TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic); + if (mappingDetail == null) { + return null; + } + if (!mappingDetail.getCurrIdMap().containsKey(responseHeader.getQueueId())) { + return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in current broker %s", topic, responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName())); + } + long staticLogicOffset = mappingDetail.convertToLogicOffset(responseHeader.getQueueId(), responseHeader.getQueueOffset()); + if (staticLogicOffset < 0) { + return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", topic, responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName())); + + } + responseHeader.setQueueOffset(staticLogicOffset); + } catch (Throwable t) { + return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); + } + return null; + } + @Override public boolean rejectRequest() { return this.brokerController.getMessageStore().isOSPageCacheBusy() || @@ -310,12 +363,6 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); } - LogicalQueueContext logicalQueueContext = super.buildLogicalQueueContext(msgInner.getTopic(), msgInner.getQueueId(), response); - CompletableFuture future = logicalQueueContext.hookBeforePut(ctx, requestHeader, request, response); - if (future != null) { - return future; - } - CompletableFuture putMessageResult = null; String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (transFlag != null && Boolean.parseBoolean(transFlag)) { @@ -324,14 +371,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); - logicalQueueContext.hookAfterPut(null); return CompletableFuture.completedFuture(response); } putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner); } else { putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner); } - logicalQueueContext.hookAfterPut(putMessageResult); return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt); } @@ -549,6 +594,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement responseHeader.setQueueId(queueIdInt); responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); + RemotingCommand rewriteResult = rewriteResponseForStaticTopic(msg.getTopic(), responseHeader); + if (rewriteResult != null) { + return rewriteResult; + } + doResponse(ctx, request, response); if (hasSendMessageHook()) { @@ -623,16 +673,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName(); MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName); - LogicalQueueContext logicalQueueContext = super.buildLogicalQueueContext(messageExtBatch.getTopic(), messageExtBatch.getQueueId(), response); - CompletableFuture future = logicalQueueContext.hookBeforePut(ctx, requestHeader, request, response); - if (future != null) { - return future; - } - CompletableFuture putMessageResult = this.brokerController.getMessageStore().asyncPutMessages(messageExtBatch); - - logicalQueueContext.hookAfterPut(putMessageResult); - return handlePutMessageResultFuture(putMessageResult, response, request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java index 60974cca..2f8337ed 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java @@ -17,13 +17,9 @@ package org.apache.rocketmq.client.impl.producer; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; - import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; @@ -120,5 +116,4 @@ public class TopicPublishInfo { public void setTopicRouteData(final TopicRouteData topicRouteData) { this.topicRouteData = topicRouteData; } - } diff --git a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java index 50d88aeb..78a27dc5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java @@ -18,6 +18,10 @@ public class LogicQueueMappingItem { this.timeOfStart = timeOfStart; } + public long convertToStaticLogicOffset(long physicalLogicOffset) { + return logicOffset + (physicalLogicOffset - startOffset); + } + public int getGen() { return gen; } diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java index d3e3d92a..a181130f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java @@ -81,6 +81,24 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { } + public long convertToLogicOffset(Integer globalId, long physicalLogicOffset) { + List mappingItems = getMappingInfo(globalId); + if (mappingItems == null + || mappingItems.isEmpty()) { + return -1; + } + if (bname.equals(mappingItems.get(mappingItems.size() - 1).getBname())) { + return mappingItems.get(mappingItems.size() - 1).convertToStaticLogicOffset(physicalLogicOffset); + } + //Consider the "switch" process, reduce the error + if (mappingItems.size() >= 2 + && bname.equals(mappingItems.get(mappingItems.size() - 2).getBname())) { + return mappingItems.get(mappingItems.size() - 2).convertToStaticLogicOffset(physicalLogicOffset); + } + return -1; + } + + public TopicQueueMappingInfo cloneAsMappingInfo() { TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname); topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java index 42b9c4fa..3944c180 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java @@ -84,4 +84,6 @@ public class ResponseCode extends RemotingSysResponseCode { public static final int POLLING_FULL = 209; public static final int POLLING_TIMEOUT = 210; + + public static final int NOT_LEADER_FOR_QUEUE = 501; } -- GitLab