提交 7f234337 编写于 作者: C chengxiangwang

optimize logic with dispatcher;add some test case

上级 4d88d5b4
......@@ -17,8 +17,6 @@
package org.apache.rocketmq.remoting.protocol;
import com.alibaba.fastjson.annotation.JSONField;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -176,34 +174,6 @@ public class RemotingCommand {
return CodecHelper.decodeCommandCustomHeader(this, classHeader);
}
public void makeCustomHeaderToNet() {
if (this.customHeader != null) {
Field[] fields = customHeader.getClass().getDeclaredFields();
if (null == this.extFields) {
this.extFields = new HashMap<String, String>();
}
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String name = field.getName();
if (!name.startsWith("this")) {
Object value = null;
try {
field.setAccessible(true);
value = field.get(this.customHeader);
} catch (Exception e) {
log.error("Failed to access field [{}]", name, e);
}
if (value != null) {
this.extFields.put(name, value.toString());
}
}
}
}
}
}
public ByteBuffer encodeHeader(final int bodyLength) {
return CodecHelper.encodeHeader(bodyLength, this);
}
......
......@@ -17,19 +17,15 @@
package org.apache.rocketmq.remoting.transport.mqtt;
import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import java.nio.charset.Charset;
import java.util.List;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.dispatcher.EncodeDecodeDispatcher;
import org.apache.rocketmq.remoting.transport.mqtt.dispatcher.Message2MessageEncodeDecode;
public class Mqtt2RemotingCommandHandler extends MessageToMessageDecoder<MqttMessage> {
public class MqttMessage2RemotingCommandHandler extends MessageToMessageDecoder<MqttMessage> {
/**
* Decode from one message to an other. This method will be called for each written message that
......@@ -48,59 +44,11 @@ public class Mqtt2RemotingCommandHandler extends MessageToMessageDecoder<MqttMes
return;
}
RemotingCommand requestCommand = null;
MqttFixedHeader mqttFixedHeader = msg.fixedHeader();
Object variableHeader = msg.variableHeader();
switch (msg.fixedHeader().messageType()) {
case CONNECT:
RocketMQMqttConnectPayload payload = RocketMQMqttConnectPayload
.fromMqttConnectPayload(((MqttConnectMessage) msg).payload());
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setMessageType(mqttFixedHeader.messageType().value());
mqttHeader.setDup(mqttFixedHeader.isDup());
mqttHeader.setQosLevel(mqttFixedHeader.qosLevel().value());
mqttHeader.setRetain(mqttFixedHeader.isRetain());
mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength());
MqttConnectVariableHeader mqttConnectVariableHeader = (MqttConnectVariableHeader) variableHeader;
mqttHeader.setName(mqttConnectVariableHeader.name());
mqttHeader.setVersion(mqttConnectVariableHeader.version());
mqttHeader.setHasUserName(mqttConnectVariableHeader.hasUserName());
mqttHeader.setHasPassword(mqttConnectVariableHeader.hasPassword());
mqttHeader.setWillRetain(mqttConnectVariableHeader.isWillRetain());
mqttHeader.setWillQos(mqttConnectVariableHeader.willQos());
mqttHeader.setWillFlag(mqttConnectVariableHeader.isWillFlag());
mqttHeader.setCleanSession(mqttConnectVariableHeader.isCleanSession());
mqttHeader
.setKeepAliveTimeSeconds(mqttConnectVariableHeader.keepAliveTimeSeconds());
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
requestCommand.makeCustomHeaderToNet();
requestCommand.setBody(payload.encode());
out.add(requestCommand);
case CONNACK:
case DISCONNECT:
case PUBLISH:
case PUBACK:
case PUBREC:
case PUBREL:
case PUBCOMP:
case SUBSCRIBE:
case SUBACK:
case UNSUBSCRIBE:
case UNSUBACK:
case PINGREQ:
case PINGRESP:
}
}
private byte[] encode(Object obj) {
String json = JSON.toJSONString(obj, false);
if (json != null) {
return json.getBytes(Charset.forName(RemotingUtil.REMOTING_CHARSET));
Message2MessageEncodeDecode message2MessageEncodeDecode = EncodeDecodeDispatcher
.getEncodeDecodeDispatcher().get(msg.fixedHeader().messageType());
if (message2MessageEncodeDecode != null) {
requestCommand = message2MessageEncodeDecode.decode(msg);
}
return null;
out.add(requestCommand);
}
}
......@@ -188,22 +188,22 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R
try {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
throw new RemotingTimeoutException("InvokeSync call timeout");
}
RemotingCommand response = this.invokeSyncWithInterceptor(remotingChannel, request, timeoutMillis - costTime);
return response;
} catch (RemotingException remotingException) {
if (remotingException instanceof RemotingSendRequestException) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
log.warn("InvokeSync: send request exception, so close the channel[{}]", addr);
this.closeRemotingChannel(addr, remotingChannel);
throw (RemotingSendRequestException) remotingException;
}
if (remotingException instanceof RemotingTimeoutException) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeRemotingChannel(addr, remotingChannel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
log.warn("InvokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
log.warn("InvokeSync: wait response timeout exception, the channel[{}]", addr);
throw (RemotingTimeoutException) remotingException;
}
}
......@@ -229,11 +229,11 @@ public class MqttRemotingClient extends NettyRemotingClientAbstract implements R
try {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsync call timeout");
throw new RemotingTooMuchRequestException("InvokeAsync call timeout");
}
this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
} catch (RemotingSendRequestException e) {
log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
log.warn("InvokeAsync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
......
......@@ -182,8 +182,8 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
.addLast(defaultEventExecutorGroup,
new MqttDecoder(),
MqttEncoder.INSTANCE,
new Mqtt2RemotingCommandHandler(),
new RemotingCommand2MqttHandler(),
new MqttMessage2RemotingCommandHandler(),
new RemotingCommand2MqttMessageHandler(),
new IdleStateHandler(nettyServerConfig
.getConnectionChannelReaderIdleSeconds(),
nettyServerConfig
......
......@@ -22,7 +22,7 @@ import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class RemotingCommand2MqttHandler extends MessageToMessageEncoder<RemotingCommand> {
public class RemotingCommand2MqttMessageHandler extends MessageToMessageEncoder<RemotingCommand> {
/**
* Encode from one message to an other. This method will be called for each written message that
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.handler.codec.mqtt.MqttMessageType;
import java.util.HashMap;
import java.util.Map;
public class EncodeDecodeDispatcher {
private static Map<MqttMessageType, Message2MessageEncodeDecode> encodeDecodeDispatcher = new HashMap<>();
static {
encodeDecodeDispatcher.put(MqttMessageType.CONNECT, new MqttConnectEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.CONNACK, new MqttConnectackEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.DISCONNECT, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBLISH, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBACK, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBREC, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBREL, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBCOMP, null);
encodeDecodeDispatcher.put(MqttMessageType.SUBSCRIBE, null);
encodeDecodeDispatcher.put(MqttMessageType.SUBACK, null);
encodeDecodeDispatcher.put(MqttMessageType.UNSUBSCRIBE, null);
encodeDecodeDispatcher.put(MqttMessageType.UNSUBACK, null);
encodeDecodeDispatcher.put(MqttMessageType.PINGREQ, null);
encodeDecodeDispatcher.put(MqttMessageType.PINGRESP, null);
}
public static Map<MqttMessageType, Message2MessageEncodeDecode> getEncodeDecodeDispatcher() {
return encodeDecodeDispatcher;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface Message2MessageEncodeDecode {
RemotingCommand decode(MqttMessage mqttMessage);
MqttMessage encode(RemotingCommand remotingCommand);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
public class MqttConnectEncodeDecode implements Message2MessageEncodeDecode {
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
RocketMQMqttConnectPayload payload = RocketMQMqttConnectPayload
.fromMqttConnectPayload(((MqttConnectMessage) mqttMessage).payload());
RemotingCommand requestCommand = null;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttConnectVariableHeader variableHeader = (MqttConnectVariableHeader) mqttMessage
.variableHeader();
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setMessageType(mqttFixedHeader.messageType().value());
mqttHeader.setDup(mqttFixedHeader.isDup());
mqttHeader.setQosLevel(mqttFixedHeader.qosLevel().value());
mqttHeader.setRetain(mqttFixedHeader.isRetain());
mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength());
mqttHeader.setName(variableHeader.name());
mqttHeader.setVersion(variableHeader.version());
mqttHeader.setHasUserName(variableHeader.hasUserName());
mqttHeader.setHasPassword(variableHeader.hasPassword());
mqttHeader.setWillRetain(variableHeader.isWillRetain());
mqttHeader.setWillQos(variableHeader.willQos());
mqttHeader.setWillFlag(variableHeader.isWillFlag());
mqttHeader.setCleanSession(variableHeader.isCleanSession());
mqttHeader
.setKeepAliveTimeSeconds(variableHeader.keepAliveTimeSeconds());
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
CodecHelper.makeCustomHeaderToNet(requestCommand);
requestCommand.setBody(payload.encode());
return requestCommand;
}
@Override
public MqttMessage encode(RemotingCommand remotingCommand) {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class MqttConnectackEncodeDecode implements Message2MessageEncodeDecode {
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
return null;
}
@Override
public MqttMessage encode(RemotingCommand remotingCommand) {
return null;
}
}
......@@ -16,10 +16,39 @@
*/
package org.apache.rocketmq.snode.processor;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class MqttConnectMessageHandlerTest {
@Mock
private RemotingChannel remotingChannel;
@Test
public void testHandlerMessage() throws Exception {
MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(
new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig()));
MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes());
MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(new MqttFixedHeader(
MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200),new MqttConnectVariableHeader(null,4,false,false,false,0,false,false,50),new MqttConnectPayload("abcd", "ttest", "message".getBytes(),"user","password".getBytes()));
mqttConnectMessageHandler.handleMessage(mqttConnectMessage, remotingChannel);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册