diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index 29ffc4bb76b94c2e7cd1f93d960cee667ed8d94c..2debf7904fa1432957f12458957689e3cee02ec9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -19,8 +19,10 @@ package org.apache.rocketmq.broker.processor; import io.netty.channel.ChannelHandlerContext; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Random; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; @@ -288,9 +290,7 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc switch (request.getCode()) { case RequestCode.SEND_BATCH_MESSAGE: case RequestCode.SEND_MESSAGE_V2: - requestHeaderV2 = - (SendMessageRequestHeaderV2) request - .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); + requestHeaderV2 = decodeSendMessageHeaderV2(request); case RequestCode.SEND_MESSAGE: if (null == requestHeaderV2) { requestHeader = @@ -305,6 +305,79 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc return requestHeader; } + static SendMessageRequestHeaderV2 decodeSendMessageHeaderV2(RemotingCommand request) + throws RemotingCommandException { + SendMessageRequestHeaderV2 r = new SendMessageRequestHeaderV2(); + HashMap fields = request.getExtFields(); + if (fields == null) { + throw new RemotingCommandException("the ext fields is null"); + } + + String s = fields.get("a"); + checkNotNull(s, "the custom field is null"); + r.setA(s); + + s = fields.get("b"); + checkNotNull(s, "the custom field is null"); + r.setB(s); + + s = fields.get("c"); + checkNotNull(s, "the custom field is null"); + r.setC(s); + + s = fields.get("d"); + checkNotNull(s, "the custom field is null"); + r.setD(Integer.parseInt(s)); + + s = fields.get("e"); + checkNotNull(s, "the custom field is null"); + r.setE(Integer.parseInt(s)); + + s = fields.get("f"); + checkNotNull(s, "the custom field is null"); + r.setF(Integer.parseInt(s)); + + s = fields.get("g"); + checkNotNull(s, "the custom field is null"); + r.setG(Long.parseLong(s)); + + s = fields.get("h"); + checkNotNull(s, "the custom field is null"); + r.setH(Integer.parseInt(s)); + + s = fields.get("i"); + if (s != null) { + r.setI(s); + } + + s = fields.get("j"); + if (s != null) { + r.setJ(Integer.parseInt(s)); + } + + s = fields.get("k"); + if (s != null) { + r.setK(Boolean.parseBoolean(s)); + } + + s = fields.get("l"); + if (s != null) { + r.setL(Integer.parseInt(s)); + } + + s = fields.get("m"); + if (s != null) { + r.setM(Boolean.parseBoolean(s)); + } + return r; + } + + private static void checkNotNull(String s, String msg) throws RemotingCommandException { + if (s == null) { + throw new RemotingCommandException(msg); + } + } + public void executeSendMessageHookAfter(final RemotingCommand response, final SendMessageContext context) { if (hasSendMessageHook()) { for (SendMessageHook hook : this.sendMessageHookList) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..da2611bd5469badd0a753aa22341171e319b35d3 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessorTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.processor; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.junit.Assert; +import org.junit.Test; + +public class AbstractSendMessageProcessorTest { + @Test + public void testDecodeSendMessageHeaderV2() throws Exception { + Field[] declaredFields = SendMessageRequestHeaderV2.class.getDeclaredFields(); + List declaredFieldsList = new ArrayList<>(); + for (Field f : declaredFields) { + if (f.getName().startsWith("$")) { + continue; + } + f.setAccessible(true); + declaredFieldsList.add(f); + } + RemotingCommand command = RemotingCommand.createRequestCommand(0, null); + HashMap m = buildExtFields(declaredFieldsList); + command.setExtFields(m); + check(command, declaredFieldsList); + } + + private HashMap buildExtFields(List fields) { + HashMap extFields = new HashMap<>(); + for (Field f: fields) { + Class c = f.getType(); + if (c.equals(String.class)) { + extFields.put(f.getName(), "str"); + } else if (c.equals(Integer.class) || c.equals(int.class)) { + extFields.put(f.getName(), "123"); + } else if (c.equals(Long.class) || c.equals(long.class)) { + extFields.put(f.getName(), "1234"); + } else if (c.equals(Boolean.class) || c.equals(boolean.class)) { + extFields.put(f.getName(), "true"); + } else { + throw new RuntimeException(f.getName() + ":" + f.getType().getName()); + } + } + return extFields; + } + + private void check(RemotingCommand command, List fields) throws Exception { + SendMessageRequestHeaderV2 o1 = (SendMessageRequestHeaderV2) command.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); + SendMessageRequestHeaderV2 o2 = AbstractSendMessageProcessor.decodeSendMessageHeaderV2(command); + for (Field f : fields) { + Object value1 = f.get(o1); + Object value2 = f.get(o2); + if (value1 == null) { + Assert.assertNull(value2); + } else { + Assert.assertEquals(value1, value2); + } + } + } + +}