提交 8c372b45 编写于 作者: Q qqeasonchen

optimize ReplyMessageProcessor

上级 28936fa1
...@@ -23,6 +23,7 @@ import org.apache.rocketmq.broker.BrokerController; ...@@ -23,6 +23,7 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
...@@ -34,6 +35,8 @@ import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader; ...@@ -34,6 +35,8 @@ import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
...@@ -43,6 +46,7 @@ import org.apache.rocketmq.store.PutMessageResult; ...@@ -43,6 +46,7 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
public class ReplyMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { public class ReplyMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
public ReplyMessageProcessor(final BrokerController brokerController) { public ReplyMessageProcessor(final BrokerController brokerController) {
super(brokerController); super(brokerController);
...@@ -136,22 +140,20 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen ...@@ -136,22 +140,20 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen
msgInner.setStoreHost(this.getStoreHost()); msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
boolean pushOk = this.pushReplyMessage(ctx, requestHeader, msgInner, response); PushReplyResult pushReplyResult = this.pushReplyMessage(ctx, requestHeader, msgInner);
this.handlePushReplyResult(pushReplyResult, response, responseHeader, queueIdInt);
if (pushOk && this.brokerController.getBrokerConfig().isStoreReplyMessageEnable()) { if (this.brokerController.getBrokerConfig().isStoreReplyMessageEnable()) {
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
this.handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt); this.handlePutMessageResult(putMessageResult, request, msgInner, responseHeader, sendMessageContext, queueIdInt);
} else {
responseHeader.setMsgId("0");
responseHeader.setQueueId(0);
responseHeader.setQueueOffset(0L);
} }
return response; return response;
} }
private boolean pushReplyMessage(final ChannelHandlerContext ctx, final SendMessageRequestHeader requestHeader, private PushReplyResult pushReplyMessage(final ChannelHandlerContext ctx,
final Message msg, final RemotingCommand response) { final SendMessageRequestHeader requestHeader,
final Message msg) {
ReplyMessageRequestHeader replyMessageRequestHeader = new ReplyMessageRequestHeader(); ReplyMessageRequestHeader replyMessageRequestHeader = new ReplyMessageRequestHeader();
replyMessageRequestHeader.setBornHost(ctx.channel().remoteAddress().toString()); replyMessageRequestHeader.setBornHost(ctx.channel().remoteAddress().toString());
replyMessageRequestHeader.setStoreHost(this.getStoreHost().toString()); replyMessageRequestHeader.setStoreHost(this.getStoreHost().toString());
...@@ -172,7 +174,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen ...@@ -172,7 +174,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen
request.setBody(msg.getBody()); request.setBody(msg.getBody());
String senderId = msg.getProperties().get(MessageConst.PROPERTY_MESSAGE_REPLY_TO); String senderId = msg.getProperties().get(MessageConst.PROPERTY_MESSAGE_REPLY_TO);
boolean pushOk = false; PushReplyResult pushReplyResult = new PushReplyResult(false);
if (senderId != null) { if (senderId != null) {
Channel channel = this.brokerController.getProducerManager().findChannel(senderId); Channel channel = this.brokerController.getProducerManager().findChannel(senderId);
...@@ -185,106 +187,102 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen ...@@ -185,106 +187,102 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen
assert pushResponse != null; assert pushResponse != null;
switch (pushResponse.getCode()) { switch (pushResponse.getCode()) {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
response.setCode(ResponseCode.SUCCESS); pushReplyResult.setPushOk(true);
response.setRemark(null);
pushOk = true;
break; break;
} }
default: { default: {
response.setCode(ResponseCode.SYSTEM_ERROR); pushReplyResult.setPushOk(false);
response.setRemark("push reply message to requester fail"); pushReplyResult.setRemark("push reply message to " + senderId + "fail.");
log.warn("push reply message to <{}> return fail, remark: {}", senderId, response.getRemark()); log.warn("push reply message to <{}> return fail, response remark: {}", senderId, pushResponse.getRemark());
} }
} }
} catch (InterruptedException e) { } catch (RemotingException | InterruptedException e) {
response.setCode(ResponseCode.SYSTEM_ERROR); pushReplyResult.setPushOk(false);
response.setRemark("push reply message to requester fail"); pushReplyResult.setRemark("push reply message to " + senderId + "fail.");
log.warn("push reply message to <{}> fail. {}", senderId, channel, e);
} catch (RemotingException e) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("push reply message to requester fail");
log.warn("push reply message to <{}> fail. {}", senderId, channel, e); log.warn("push reply message to <{}> fail. {}", senderId, channel, e);
} }
} else { } else {
response.setCode(ResponseCode.SYSTEM_ERROR); pushReplyResult.setPushOk(false);
response.setRemark("channel of <" + senderId + "> not found"); pushReplyResult.setRemark("push reply message fail, channel of <" + senderId + "> not found.");
log.warn("push reply message fial, channel of <{}> not found.", senderId); log.warn(pushReplyResult.getRemark());
} }
return pushOk; } else {
log.warn("REPLY_TO is null, can not reply message");
pushReplyResult.setPushOk(false);
pushReplyResult.setRemark("reply message properties[" + MessageConst.PROPERTY_MESSAGE_REPLY_TO + "] is null");
}
return pushReplyResult;
}
private void handlePushReplyResult(PushReplyResult pushReplyResult, final RemotingCommand response,
final SendMessageResponseHeader responseHeader, int queueIdInt) {
if (!pushReplyResult.isPushOk()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(pushReplyResult.getRemark());
} else {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
//set to zore to avoid client decoding exception
responseHeader.setMsgId("0");
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(0L);
} }
log.warn("REPLY_TO is null, can not reply message");
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("REPLY_TO is null");
return pushOk;
} }
private void handlePutMessageResult(PutMessageResult putMessageResult, final RemotingCommand response, private void handlePutMessageResult(PutMessageResult putMessageResult,
final RemotingCommand request, final MessageExt msg, final RemotingCommand request, final MessageExt msg,
final SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, final SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext,
ChannelHandlerContext ctx,
int queueIdInt) { int queueIdInt) {
if (putMessageResult == null) { if (putMessageResult == null) {
response.setRemark("push reply to requester success, but store putMessage return null"); log.warn("process reply message, store putMessage return null");
return; return;
} }
boolean sendOK = false; boolean putOk = false;
switch (putMessageResult.getPutMessageStatus()) { switch (putMessageResult.getPutMessageStatus()) {
// Success // Success
case PUT_OK: case PUT_OK:
sendOK = true;
response.setCode(ResponseCode.SUCCESS);
break;
case FLUSH_DISK_TIMEOUT: case FLUSH_DISK_TIMEOUT:
response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
sendOK = true;
break;
case FLUSH_SLAVE_TIMEOUT: case FLUSH_SLAVE_TIMEOUT:
response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
sendOK = true;
break;
case SLAVE_NOT_AVAILABLE: case SLAVE_NOT_AVAILABLE:
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE); putOk = true;
sendOK = true;
break; break;
// Failed // Failed
case CREATE_MAPEDFILE_FAILED: case CREATE_MAPEDFILE_FAILED:
// response.setCode(ResponseCode.SYSTEM_ERROR); log.info("create mapped file failed, server is busy or broken.");
response.setRemark("create mapped file failed, server is busy or broken.");
break; break;
case MESSAGE_ILLEGAL: case MESSAGE_ILLEGAL:
log.info(
"the message is illegal, maybe msg properties length limit 32k.");
break;
case PROPERTIES_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED:
// response.setCode(ResponseCode.MESSAGE_ILLEGAL); log.info(
response.setRemark( "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k.");
"the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
break; break;
case SERVICE_NOT_AVAILABLE: case SERVICE_NOT_AVAILABLE:
// response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE); log.info(
response.setRemark(
"service not available now, maybe disk full, maybe your broker machine memory too small."); "service not available now, maybe disk full, maybe your broker machine memory too small.");
break; break;
case OS_PAGECACHE_BUSY: case OS_PAGECACHE_BUSY:
// response.setCode(ResponseCode.SYSTEM_ERROR); log.info("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
break; break;
case UNKNOWN_ERROR: case UNKNOWN_ERROR:
// response.setCode(ResponseCode.SYSTEM_ERROR); log.info("UNKNOWN_ERROR");
response.setRemark("UNKNOWN_ERROR");
break; break;
default: default:
// response.setCode(ResponseCode.SYSTEM_ERROR); log.info("UNKNOWN_ERROR DEFAULT");
response.setRemark("UNKNOWN_ERROR DEFAULT");
break; break;
} }
String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER); String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
if (sendOK) { if (putOk) {
this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(), this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes()); putMessageResult.getAppendMessageResult().getWroteBytes());
this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum()); this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
response.setRemark(null);
responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId()); responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
responseHeader.setQueueId(queueIdInt); responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
...@@ -315,4 +313,30 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen ...@@ -315,4 +313,30 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen
} }
} }
} }
class PushReplyResult {
boolean pushOk;
String remark;
public PushReplyResult(boolean pushOk) {
this.pushOk = pushOk;
remark = "";
}
public boolean isPushOk() {
return pushOk;
}
public void setPushOk(boolean pushOk) {
this.pushOk = pushOk;
}
public String getRemark() {
return remark;
}
public void setRemark(String remark) {
this.remark = remark;
}
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册