未验证 提交 3e014241 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #3134 from areyouok/dev_speed_G

[ISSUE 2883] [Part G] Optimise parse performance for SendMessageRequestHeaderV2
...@@ -19,8 +19,10 @@ package org.apache.rocketmq.broker.processor; ...@@ -19,8 +19,10 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Random; import java.util.Random;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
...@@ -288,9 +290,7 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc ...@@ -288,9 +290,7 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
switch (request.getCode()) { switch (request.getCode()) {
case RequestCode.SEND_BATCH_MESSAGE: case RequestCode.SEND_BATCH_MESSAGE:
case RequestCode.SEND_MESSAGE_V2: case RequestCode.SEND_MESSAGE_V2:
requestHeaderV2 = requestHeaderV2 = decodeSendMessageHeaderV2(request);
(SendMessageRequestHeaderV2) request
.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
case RequestCode.SEND_MESSAGE: case RequestCode.SEND_MESSAGE:
if (null == requestHeaderV2) { if (null == requestHeaderV2) {
requestHeader = requestHeader =
...@@ -305,6 +305,79 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc ...@@ -305,6 +305,79 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
return requestHeader; return requestHeader;
} }
static SendMessageRequestHeaderV2 decodeSendMessageHeaderV2(RemotingCommand request)
throws RemotingCommandException {
SendMessageRequestHeaderV2 r = new SendMessageRequestHeaderV2();
HashMap<String, String> fields = request.getExtFields();
if (fields == null) {
throw new RemotingCommandException("the ext fields is null");
}
String s = fields.get("a");
checkNotNull(s, "the custom field <a> is null");
r.setA(s);
s = fields.get("b");
checkNotNull(s, "the custom field <b> is null");
r.setB(s);
s = fields.get("c");
checkNotNull(s, "the custom field <c> is null");
r.setC(s);
s = fields.get("d");
checkNotNull(s, "the custom field <d> is null");
r.setD(Integer.parseInt(s));
s = fields.get("e");
checkNotNull(s, "the custom field <e> is null");
r.setE(Integer.parseInt(s));
s = fields.get("f");
checkNotNull(s, "the custom field <f> is null");
r.setF(Integer.parseInt(s));
s = fields.get("g");
checkNotNull(s, "the custom field <g> is null");
r.setG(Long.parseLong(s));
s = fields.get("h");
checkNotNull(s, "the custom field <h> is null");
r.setH(Integer.parseInt(s));
s = fields.get("i");
if (s != null) {
r.setI(s);
}
s = fields.get("j");
if (s != null) {
r.setJ(Integer.parseInt(s));
}
s = fields.get("k");
if (s != null) {
r.setK(Boolean.parseBoolean(s));
}
s = fields.get("l");
if (s != null) {
r.setL(Integer.parseInt(s));
}
s = fields.get("m");
if (s != null) {
r.setM(Boolean.parseBoolean(s));
}
return r;
}
private static void checkNotNull(String s, String msg) throws RemotingCommandException {
if (s == null) {
throw new RemotingCommandException(msg);
}
}
public void executeSendMessageHookAfter(final RemotingCommand response, final SendMessageContext context) { public void executeSendMessageHookAfter(final RemotingCommand response, final SendMessageContext context) {
if (hasSendMessageHook()) { if (hasSendMessageHook()) {
for (SendMessageHook hook : this.sendMessageHookList) { for (SendMessageHook hook : this.sendMessageHookList) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.processor;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Assert;
import org.junit.Test;
public class AbstractSendMessageProcessorTest {
@Test
public void testDecodeSendMessageHeaderV2() throws Exception {
Field[] declaredFields = SendMessageRequestHeaderV2.class.getDeclaredFields();
List<Field> declaredFieldsList = new ArrayList<>();
for (Field f : declaredFields) {
if (f.getName().startsWith("$")) {
continue;
}
f.setAccessible(true);
declaredFieldsList.add(f);
}
RemotingCommand command = RemotingCommand.createRequestCommand(0, null);
HashMap<String, String> m = buildExtFields(declaredFieldsList);
command.setExtFields(m);
check(command, declaredFieldsList);
}
private HashMap<String, String> buildExtFields(List<Field> fields) {
HashMap<String, String> extFields = new HashMap<>();
for (Field f: fields) {
Class<?> c = f.getType();
if (c.equals(String.class)) {
extFields.put(f.getName(), "str");
} else if (c.equals(Integer.class) || c.equals(int.class)) {
extFields.put(f.getName(), "123");
} else if (c.equals(Long.class) || c.equals(long.class)) {
extFields.put(f.getName(), "1234");
} else if (c.equals(Boolean.class) || c.equals(boolean.class)) {
extFields.put(f.getName(), "true");
} else {
throw new RuntimeException(f.getName() + ":" + f.getType().getName());
}
}
return extFields;
}
private void check(RemotingCommand command, List<Field> fields) throws Exception {
SendMessageRequestHeaderV2 o1 = (SendMessageRequestHeaderV2) command.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
SendMessageRequestHeaderV2 o2 = AbstractSendMessageProcessor.decodeSendMessageHeaderV2(command);
for (Field f : fields) {
Object value1 = f.get(o1);
Object value2 = f.get(o2);
if (value1 == null) {
Assert.assertNull(value2);
} else {
Assert.assertEquals(value1, value2);
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册