提交 71af9c75 编写于 作者: D dongeforever

Finish the logic for SendProcessor

上级 1f897336
......@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.help.FAQUrl;
......@@ -96,6 +97,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
if (requestHeader == null) {
return CompletableFuture.completedFuture(null);
}
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader);
if (rewriteResult != null) {
return CompletableFuture.completedFuture(rewriteResult);
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
if (requestHeader.isBatch()) {
......@@ -106,6 +111,54 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
}
private RemotingCommand buildErrorResponse(int code, String remark) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(code);
response.setRemark(remark);
return response;
}
/**
* If the response is not null, it meets some errors
* @param requestHeader
* @return
*/
private RemotingCommand rewriteRequestForStaticTopic(SendMessageRequestHeader requestHeader) {
try {
TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
if (mappingDetail == null) {
return null;
}
if (!mappingDetail.getCurrIdMap().containsKey(requestHeader.getQueueId())) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
}
requestHeader.setQueueId(mappingDetail.getCurrIdMap().get(requestHeader.getQueueId()));
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
return null;
}
private RemotingCommand rewriteResponseForStaticTopic(String topic, SendMessageResponseHeader responseHeader) {
try {
TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
if (mappingDetail == null) {
return null;
}
if (!mappingDetail.getCurrIdMap().containsKey(responseHeader.getQueueId())) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in current broker %s", topic, responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
}
long staticLogicOffset = mappingDetail.convertToLogicOffset(responseHeader.getQueueId(), responseHeader.getQueueOffset());
if (staticLogicOffset < 0) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", topic, responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
}
responseHeader.setQueueOffset(staticLogicOffset);
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
return null;
}
@Override
public boolean rejectRequest() {
return this.brokerController.getMessageStore().isOSPageCacheBusy() ||
......@@ -310,12 +363,6 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
}
LogicalQueueContext logicalQueueContext = super.buildLogicalQueueContext(msgInner.getTopic(), msgInner.getQueueId(), response);
CompletableFuture<RemotingCommand> future = logicalQueueContext.hookBeforePut(ctx, requestHeader, request, response);
if (future != null) {
return future;
}
CompletableFuture<PutMessageResult> putMessageResult = null;
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
......@@ -324,14 +371,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
logicalQueueContext.hookAfterPut(null);
return CompletableFuture.completedFuture(response);
}
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
logicalQueueContext.hookAfterPut(putMessageResult);
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
......@@ -549,6 +594,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
RemotingCommand rewriteResult = rewriteResponseForStaticTopic(msg.getTopic(), responseHeader);
if (rewriteResult != null) {
return rewriteResult;
}
doResponse(ctx, request, response);
if (hasSendMessageHook()) {
......@@ -623,16 +673,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName);
LogicalQueueContext logicalQueueContext = super.buildLogicalQueueContext(messageExtBatch.getTopic(), messageExtBatch.getQueueId(), response);
CompletableFuture<RemotingCommand> future = logicalQueueContext.hookBeforePut(ctx, requestHeader, request, response);
if (future != null) {
return future;
}
CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessages(messageExtBatch);
logicalQueueContext.hookAfterPut(putMessageResult);
return handlePutMessageResultFuture(putMessageResult, response, request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt);
}
......
......@@ -17,13 +17,9 @@
package org.apache.rocketmq.client.impl.producer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
......@@ -120,5 +116,4 @@ public class TopicPublishInfo {
public void setTopicRouteData(final TopicRouteData topicRouteData) {
this.topicRouteData = topicRouteData;
}
}
......@@ -18,6 +18,10 @@ public class LogicQueueMappingItem {
this.timeOfStart = timeOfStart;
}
public long convertToStaticLogicOffset(long physicalLogicOffset) {
return logicOffset + (physicalLogicOffset - startOffset);
}
public int getGen() {
return gen;
}
......
......@@ -81,6 +81,24 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
}
public long convertToLogicOffset(Integer globalId, long physicalLogicOffset) {
List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
if (mappingItems == null
|| mappingItems.isEmpty()) {
return -1;
}
if (bname.equals(mappingItems.get(mappingItems.size() - 1).getBname())) {
return mappingItems.get(mappingItems.size() - 1).convertToStaticLogicOffset(physicalLogicOffset);
}
//Consider the "switch" process, reduce the error
if (mappingItems.size() >= 2
&& bname.equals(mappingItems.get(mappingItems.size() - 2).getBname())) {
return mappingItems.get(mappingItems.size() - 2).convertToStaticLogicOffset(physicalLogicOffset);
}
return -1;
}
public TopicQueueMappingInfo cloneAsMappingInfo() {
TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname);
topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
......
......@@ -84,4 +84,6 @@ public class ResponseCode extends RemotingSysResponseCode {
public static final int POLLING_FULL = 209;
public static final int POLLING_TIMEOUT = 210;
public static final int NOT_LEADER_FOR_QUEUE = 501;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册