diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index 95602206a8d2caee19fd72021098988d668d4da4..e18e8f39bb03246c679f55aa02b1be7974087a45 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -305,44 +305,44 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc return requestHeader; } - private SendMessageRequestHeaderV2 decodeSendMessageHeaderV2(RemotingCommand request) { + static SendMessageRequestHeaderV2 decodeSendMessageHeaderV2(RemotingCommand request) + throws RemotingCommandException { SendMessageRequestHeaderV2 r = new SendMessageRequestHeaderV2(); HashMap fields = request.getExtFields(); if (fields == null) { - // keep same behavior with CommandCustomHeader.decodeCommandCustomHeader - return r; + throw new RemotingCommandException("the ext fields is null"); } String s = fields.get("a"); - Objects.requireNonNull(s, "the custom field is null"); + checkNotNull(s, "the custom field is null"); r.setA(s); s = fields.get("b"); - Objects.requireNonNull(s, "the custom field is null"); + checkNotNull(s, "the custom field is null"); r.setB(s); s = fields.get("c"); - Objects.requireNonNull(s, "the custom field is null"); + checkNotNull(s, "the custom field is null"); r.setC(s); s = fields.get("d"); - Objects.requireNonNull(s, "the custom field is null"); + checkNotNull(s, "the custom field is null"); r.setD(Integer.parseInt(s)); s = fields.get("e"); - Objects.requireNonNull(s, "the custom field is null"); + checkNotNull(s, "the custom field is null"); r.setE(Integer.parseInt(s)); s = fields.get("f"); - Objects.requireNonNull(s, "the custom field is null"); + checkNotNull(s, "the custom field is null"); r.setF(Integer.parseInt(s)); s = fields.get("g"); - Objects.requireNonNull(s, "the custom field is null"); + checkNotNull(s, "the custom field is null"); r.setG(Long.parseLong(s)); s = fields.get("h"); - Objects.requireNonNull(s, "the custom field is null"); + checkNotNull(s, "the custom field is null"); r.setH(Integer.parseInt(s)); s = fields.get("i"); @@ -372,6 +372,12 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc return r; } + private static void checkNotNull(String s, String msg) throws RemotingCommandException { + if (s == null) { + throw new RemotingCommandException(msg); + } + } + public void executeSendMessageHookAfter(final RemotingCommand response, final SendMessageContext context) { if (hasSendMessageHook()) { for (SendMessageHook hook : this.sendMessageHookList) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..da2611bd5469badd0a753aa22341171e319b35d3 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessorTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.processor; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.junit.Assert; +import org.junit.Test; + +public class AbstractSendMessageProcessorTest { + @Test + public void testDecodeSendMessageHeaderV2() throws Exception { + Field[] declaredFields = SendMessageRequestHeaderV2.class.getDeclaredFields(); + List declaredFieldsList = new ArrayList<>(); + for (Field f : declaredFields) { + if (f.getName().startsWith("$")) { + continue; + } + f.setAccessible(true); + declaredFieldsList.add(f); + } + RemotingCommand command = RemotingCommand.createRequestCommand(0, null); + HashMap m = buildExtFields(declaredFieldsList); + command.setExtFields(m); + check(command, declaredFieldsList); + } + + private HashMap buildExtFields(List fields) { + HashMap extFields = new HashMap<>(); + for (Field f: fields) { + Class c = f.getType(); + if (c.equals(String.class)) { + extFields.put(f.getName(), "str"); + } else if (c.equals(Integer.class) || c.equals(int.class)) { + extFields.put(f.getName(), "123"); + } else if (c.equals(Long.class) || c.equals(long.class)) { + extFields.put(f.getName(), "1234"); + } else if (c.equals(Boolean.class) || c.equals(boolean.class)) { + extFields.put(f.getName(), "true"); + } else { + throw new RuntimeException(f.getName() + ":" + f.getType().getName()); + } + } + return extFields; + } + + private void check(RemotingCommand command, List fields) throws Exception { + SendMessageRequestHeaderV2 o1 = (SendMessageRequestHeaderV2) command.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); + SendMessageRequestHeaderV2 o2 = AbstractSendMessageProcessor.decodeSendMessageHeaderV2(command); + for (Field f : fields) { + Object value1 = f.get(o1); + Object value2 = f.get(o2); + if (value1 == null) { + Assert.assertNull(value2); + } else { + Assert.assertEquals(value1, value2); + } + } + } + +}