From 8c372b4596d64f90e41e38534820baf655b3f1a1 Mon Sep 17 00:00:00 2001 From: qqeasonchen Date: Mon, 23 Sep 2019 21:30:56 +0800 Subject: [PATCH] optimize ReplyMessageProcessor --- .../processor/ReplyMessageProcessor.java | 146 ++++++++++-------- 1 file changed, 85 insertions(+), 61 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java index 23e1e9ac..265c5c12 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java @@ -23,6 +23,7 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; @@ -34,6 +35,8 @@ import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; @@ -43,6 +46,7 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.stats.BrokerStatsManager; public class ReplyMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); public ReplyMessageProcessor(final BrokerController brokerController) { super(brokerController); @@ -136,22 +140,20 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); - boolean pushOk = this.pushReplyMessage(ctx, requestHeader, msgInner, response); + PushReplyResult pushReplyResult = this.pushReplyMessage(ctx, requestHeader, msgInner); + this.handlePushReplyResult(pushReplyResult, response, responseHeader, queueIdInt); - if (pushOk && this.brokerController.getBrokerConfig().isStoreReplyMessageEnable()) { + if (this.brokerController.getBrokerConfig().isStoreReplyMessageEnable()) { PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); - this.handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt); - } else { - responseHeader.setMsgId("0"); - responseHeader.setQueueId(0); - responseHeader.setQueueOffset(0L); + this.handlePutMessageResult(putMessageResult, request, msgInner, responseHeader, sendMessageContext, queueIdInt); } return response; } - private boolean pushReplyMessage(final ChannelHandlerContext ctx, final SendMessageRequestHeader requestHeader, - final Message msg, final RemotingCommand response) { + private PushReplyResult pushReplyMessage(final ChannelHandlerContext ctx, + final SendMessageRequestHeader requestHeader, + final Message msg) { ReplyMessageRequestHeader replyMessageRequestHeader = new ReplyMessageRequestHeader(); replyMessageRequestHeader.setBornHost(ctx.channel().remoteAddress().toString()); replyMessageRequestHeader.setStoreHost(this.getStoreHost().toString()); @@ -172,7 +174,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen request.setBody(msg.getBody()); String senderId = msg.getProperties().get(MessageConst.PROPERTY_MESSAGE_REPLY_TO); - boolean pushOk = false; + PushReplyResult pushReplyResult = new PushReplyResult(false); if (senderId != null) { Channel channel = this.brokerController.getProducerManager().findChannel(senderId); @@ -185,106 +187,102 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen assert pushResponse != null; switch (pushResponse.getCode()) { case ResponseCode.SUCCESS: { - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - pushOk = true; + pushReplyResult.setPushOk(true); break; } default: { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("push reply message to requester fail"); - log.warn("push reply message to <{}> return fail, remark: {}", senderId, response.getRemark()); + pushReplyResult.setPushOk(false); + pushReplyResult.setRemark("push reply message to " + senderId + "fail."); + log.warn("push reply message to <{}> return fail, response remark: {}", senderId, pushResponse.getRemark()); } } - } catch (InterruptedException e) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("push reply message to requester fail"); - log.warn("push reply message to <{}> fail. {}", senderId, channel, e); - } catch (RemotingException e) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("push reply message to requester fail"); + } catch (RemotingException | InterruptedException e) { + pushReplyResult.setPushOk(false); + pushReplyResult.setRemark("push reply message to " + senderId + "fail."); log.warn("push reply message to <{}> fail. {}", senderId, channel, e); } } else { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("channel of <" + senderId + "> not found"); - log.warn("push reply message fial, channel of <{}> not found.", senderId); + pushReplyResult.setPushOk(false); + pushReplyResult.setRemark("push reply message fail, channel of <" + senderId + "> not found."); + log.warn(pushReplyResult.getRemark()); } - return pushOk; + } else { + log.warn("REPLY_TO is null, can not reply message"); + pushReplyResult.setPushOk(false); + pushReplyResult.setRemark("reply message properties[" + MessageConst.PROPERTY_MESSAGE_REPLY_TO + "] is null"); + } + return pushReplyResult; + } + + private void handlePushReplyResult(PushReplyResult pushReplyResult, final RemotingCommand response, + final SendMessageResponseHeader responseHeader, int queueIdInt) { + + if (!pushReplyResult.isPushOk()) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(pushReplyResult.getRemark()); + } else { + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + //set to zore to avoid client decoding exception + responseHeader.setMsgId("0"); + responseHeader.setQueueId(queueIdInt); + responseHeader.setQueueOffset(0L); } - log.warn("REPLY_TO is null, can not reply message"); - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("REPLY_TO is null"); - return pushOk; } - private void handlePutMessageResult(PutMessageResult putMessageResult, final RemotingCommand response, + private void handlePutMessageResult(PutMessageResult putMessageResult, final RemotingCommand request, final MessageExt msg, final SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, - ChannelHandlerContext ctx, int queueIdInt) { if (putMessageResult == null) { - response.setRemark("push reply to requester success, but store putMessage return null"); + log.warn("process reply message, store putMessage return null"); return; } - boolean sendOK = false; + boolean putOk = false; switch (putMessageResult.getPutMessageStatus()) { // Success case PUT_OK: - sendOK = true; - response.setCode(ResponseCode.SUCCESS); - break; case FLUSH_DISK_TIMEOUT: - response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT); - sendOK = true; - break; case FLUSH_SLAVE_TIMEOUT: - response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT); - sendOK = true; - break; case SLAVE_NOT_AVAILABLE: - response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE); - sendOK = true; + putOk = true; break; // Failed case CREATE_MAPEDFILE_FAILED: -// response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("create mapped file failed, server is busy or broken."); + log.info("create mapped file failed, server is busy or broken."); break; case MESSAGE_ILLEGAL: + log.info( + "the message is illegal, maybe msg properties length limit 32k."); + break; case PROPERTIES_SIZE_EXCEEDED: -// response.setCode(ResponseCode.MESSAGE_ILLEGAL); - response.setRemark( - "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."); + log.info( + "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k."); break; case SERVICE_NOT_AVAILABLE: -// response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE); - response.setRemark( + log.info( "service not available now, maybe disk full, maybe your broker machine memory too small."); break; case OS_PAGECACHE_BUSY: -// response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while"); + log.info("[PC_SYNCHRONIZED]broker busy, start flow control for a while"); break; case UNKNOWN_ERROR: -// response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("UNKNOWN_ERROR"); + log.info("UNKNOWN_ERROR"); break; default: -// response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("UNKNOWN_ERROR DEFAULT"); + log.info("UNKNOWN_ERROR DEFAULT"); break; } String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER); - if (sendOK) { + if (putOk) { this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes()); this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum()); - response.setRemark(null); + responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId()); responseHeader.setQueueId(queueIdInt); responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); @@ -315,4 +313,30 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen } } } + + class PushReplyResult { + boolean pushOk; + String remark; + + public PushReplyResult(boolean pushOk) { + this.pushOk = pushOk; + remark = ""; + } + + public boolean isPushOk() { + return pushOk; + } + + public void setPushOk(boolean pushOk) { + this.pushOk = pushOk; + } + + public String getRemark() { + return remark; + } + + public void setRemark(String remark) { + this.remark = remark; + } + } } -- GitLab