diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index ced7c2014f4a5d24b5832971142e686ebc404df0..95602206a8d2caee19fd72021098988d668d4da4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -19,8 +19,10 @@ package org.apache.rocketmq.broker.processor; import io.netty.channel.ChannelHandlerContext; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Random; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; @@ -288,9 +290,7 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc switch (request.getCode()) { case RequestCode.SEND_BATCH_MESSAGE: case RequestCode.SEND_MESSAGE_V2: - requestHeaderV2 = - (SendMessageRequestHeaderV2) request - .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); + requestHeaderV2 = decodeSendMessageHeaderV2(request); case RequestCode.SEND_MESSAGE: if (null == requestHeaderV2) { requestHeader = @@ -305,6 +305,73 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc return requestHeader; } + private SendMessageRequestHeaderV2 decodeSendMessageHeaderV2(RemotingCommand request) { + SendMessageRequestHeaderV2 r = new SendMessageRequestHeaderV2(); + HashMap fields = request.getExtFields(); + if (fields == null) { + // keep same behavior with CommandCustomHeader.decodeCommandCustomHeader + return r; + } + + String s = fields.get("a"); + Objects.requireNonNull(s, "the custom field is null"); + r.setA(s); + + s = fields.get("b"); + Objects.requireNonNull(s, "the custom field is null"); + r.setB(s); + + s = fields.get("c"); + Objects.requireNonNull(s, "the custom field is null"); + r.setC(s); + + s = fields.get("d"); + Objects.requireNonNull(s, "the custom field is null"); + r.setD(Integer.parseInt(s)); + + s = fields.get("e"); + Objects.requireNonNull(s, "the custom field is null"); + r.setE(Integer.parseInt(s)); + + s = fields.get("f"); + Objects.requireNonNull(s, "the custom field is null"); + r.setF(Integer.parseInt(s)); + + s = fields.get("g"); + Objects.requireNonNull(s, "the custom field is null"); + r.setG(Long.parseLong(s)); + + s = fields.get("h"); + Objects.requireNonNull(s, "the custom field is null"); + r.setH(Integer.parseInt(s)); + + s = fields.get("i"); + if (s != null) { + r.setI(s); + } + + s = fields.get("j"); + if (s != null) { + r.setJ(Integer.parseInt(s)); + } + + s = fields.get("k"); + if (s != null) { + r.setK(Boolean.parseBoolean(s)); + } + + s = fields.get("l"); + if (s != null) { + r.setL(Integer.parseInt(s)); + } + + s = fields.get("m"); + if (s != null) { + r.setM(Boolean.parseBoolean(s)); + } + return r; + } + public void executeSendMessageHookAfter(final RemotingCommand response, final SendMessageContext context) { if (hasSendMessageHook()) { for (SendMessageHook hook : this.sendMessageHookList) {