diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/Mqtt2RemotingCommandHandler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/Mqtt2RemotingCommandHandler.java index 5721f58648cd0ee99211eac97b36d9040b2bf2eb..f999752e13bcf140014efdd2cc7124be9e6e5567 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/Mqtt2RemotingCommandHandler.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/Mqtt2RemotingCommandHandler.java @@ -21,7 +21,6 @@ import com.alibaba.fastjson.JSON; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.handler.codec.mqtt.MqttConnectMessage; -import io.netty.handler.codec.mqtt.MqttConnectPayload; import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; @@ -54,11 +53,12 @@ public class Mqtt2RemotingCommandHandler extends MessageToMessageDecoder(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), - "SnodeSendMessageThread", - false); + snodeConfig.getSnodeSendMessageMinPoolSize(), + snodeConfig.getSnodeSendMessageMaxPoolSize(), + 3000, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), + "SnodeSendMessageThread", + false); this.pullMessageExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeSendMessageMinPoolSize(), - snodeConfig.getSnodeSendMessageMaxPoolSize(), - 3000, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), - "SnodePullMessageThread", - false); + snodeConfig.getSnodeSendMessageMinPoolSize(), + snodeConfig.getSnodeSendMessageMaxPoolSize(), + 3000, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), + "SnodePullMessageThread", + false); this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeHeartBeatCorePoolSize(), - snodeConfig.getSnodeHeartBeatMaxPoolSize(), - 1000 * 60, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()), - "SnodeHeartbeatThread", - true); + snodeConfig.getSnodeHeartBeatCorePoolSize(), + snodeConfig.getSnodeHeartBeatMaxPoolSize(), + 1000 * 60, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()), + "SnodeHeartbeatThread", + true); this.consumerManagerExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeSendMessageMinPoolSize(), - snodeConfig.getSnodeSendMessageMaxPoolSize(), - 3000, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), - "SnodePullMessageThread", - false); + snodeConfig.getSnodeSendMessageMinPoolSize(), + snodeConfig.getSnodeSendMessageMaxPoolSize(), + 3000, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), + "SnodePullMessageThread", + false); this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeSendMessageMinPoolSize(), - snodeConfig.getSnodeSendMessageMaxPoolSize(), - 3000, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), - "ConsumerManagerThread", - false); + snodeConfig.getSnodeSendMessageMinPoolSize(), + snodeConfig.getSnodeSendMessageMaxPoolSize(), + 3000, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), + "ConsumerManagerThread", + false); this.handleMqttMessageExecutor = ThreadUtils.newThreadPoolExecutor( snodeConfig.getSnodeHandleMqttMessageMinPoolSize(), @@ -192,7 +198,8 @@ public class SnodeController { if (this.snodeConfig.getNamesrvAddr() != null) { this.nnodeService.updateNnodeAddressList(this.snodeConfig.getNamesrvAddr()); - log.info("Set user specified name server address: {}", this.snodeConfig.getNamesrvAddr()); + log.info("Set user specified name server address: {}", + this.snodeConfig.getNamesrvAddr()); } this.subscriptionGroupManager = new SubscriptionGroupManager(this); @@ -208,7 +215,8 @@ public class SnodeController { this.producerManager = new ProducerManagerImpl(); this.consumerManager = new ConsumerManagerImpl(this); this.iotClientManager = new IOTClientManagerImpl(this); - this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, this.consumerManager, this.iotClientManager); + this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, + this.consumerManager, this.iotClientManager); this.slowConsumerService = new SlowConsumerServiceImpl(this); } @@ -217,22 +225,26 @@ public class SnodeController { } private void initRemotingServerInterceptorGroup() { - List remotingServerInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath()); + List remotingServerInterceptors = InterceptorFactory.getInstance() + .loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath()); if (remotingServerInterceptors != null && remotingServerInterceptors.size() > 0) { if (this.remotingServerInterceptorGroup == null) { this.remotingServerInterceptorGroup = new InterceptorGroup(); } for (Interceptor interceptor : remotingServerInterceptors) { this.remotingServerInterceptorGroup.registerInterceptor(interceptor); - log.warn("Remoting server interceptor: {} registered!", interceptor.interceptorName()); + log.warn("Remoting server interceptor: {} registered!", + interceptor.interceptorName()); } } } public boolean initialize() { - this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig, this.clientHousekeepingService); + this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer() + .init(this.nettyServerConfig, this.clientHousekeepingService); this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer( - RemotingUtil.MQTT_PROTOCOL).init(this.nettyServerConfig, this.clientHousekeepingService); + RemotingUtil.MQTT_PROTOCOL) + .init(this.nettyServerConfig, this.clientHousekeepingService); this.registerProcessor(); initSnodeInterceptorGroup(); initRemotingServerInterceptorGroup(); @@ -249,13 +261,14 @@ public class SnodeController { return; } - List accessValidators = ServiceProvider.loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); + List accessValidators = ServiceProvider + .loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); if (accessValidators == null || accessValidators.isEmpty()) { log.info("The snode dose not load the AccessValidator"); return; } - for (AccessValidator accessValidator: accessValidators) { + for (AccessValidator accessValidator : accessValidators) { final AccessValidator validator = accessValidator; this.remotingServerInterceptorGroup.registerInterceptor(new Interceptor() { @@ -264,31 +277,40 @@ public class SnodeController { return "snodeRequestAclControlInterceptor"; } - @Override public void beforeRequest(RequestContext requestContext) { + @Override + public void beforeRequest(RequestContext requestContext) { //Do not catch the exception RemotingCommand request = requestContext.getRequest(); - String remoteAddr = RemotingUtil.socketAddress2IpString(requestContext.getRemotingChannel().remoteAddress()); + String remoteAddr = RemotingUtil.socketAddress2IpString( + requestContext.getRemotingChannel().remoteAddress()); validator.validate(validator.parse(request, remoteAddr)); } - @Override public void afterRequest(ResponseContext responseContext) { } + @Override + public void afterRequest(ResponseContext responseContext) { + } - @Override public void onException(ExceptionContext exceptionContext) { } + @Override + public void onException(ExceptionContext exceptionContext) { + } }); } } private void initSnodeInterceptorGroup() { - List consumeMessageInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath()); + List consumeMessageInterceptors = InterceptorFactory.getInstance() + .loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath()); if (consumeMessageInterceptors != null && consumeMessageInterceptors.size() > 0) { this.consumeMessageInterceptorGroup = new InterceptorGroup(); for (Interceptor interceptor : consumeMessageInterceptors) { this.consumeMessageInterceptorGroup.registerInterceptor(interceptor); - log.warn("Consume message interceptor: {} registered!", interceptor.interceptorName()); + log.warn("Consume message interceptor: {} registered!", + interceptor.interceptorName()); } } - List sendMessageInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath()); + List sendMessageInterceptors = InterceptorFactory.getInstance() + .loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath()); if (sendMessageInterceptors != null && sendMessageInterceptors.size() > 0) { this.sendMessageInterceptorGroup = new InterceptorGroup(); for (Interceptor interceptor : sendMessageInterceptors) { @@ -300,30 +322,56 @@ public class SnodeController { } public void registerProcessor() { - this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor); - this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor); - this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor); - this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor); - this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor); - this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor); - this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, + this.sendMessageExecutor); + this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, + this.sendMessageExecutor); + this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, + this.heartbeatExecutor); + this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, + this.heartbeatExecutor); + this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, + this.heartbeatExecutor); + this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, + this.pullMessageExecutor); + this.snodeServer + .registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, + this.consumerManageExecutor); + this.snodeServer + .registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, + this.consumerManageExecutor); + this.snodeServer + .registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, + this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, + this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, + this.consumerManageExecutor); + this.snodeServer + .registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, + this.consumerManageExecutor); this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, defaultMqttMessageProcessor, handleMqttMessageExecutor); - defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.CONNECT, new MqttConnectMessageHandler(this)); - defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.DISCONNECT, new MqttDisconnectMessageHandler(this)); - defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PINGREQ, new MqttPingreqMessageHandler(this)); - defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBLISH, new MqttPublishMessageHandler(this)); - defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this)); - defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBCOMP, new MqttPubcompMessageHandler(this)); - defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this)); - defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this)); - defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.SUBSCRIBE, new MqttSubscribeMessageHandler(this)); - defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.UNSUBSCRIBE, new MqttUnsubscribeMessagHandler(this)); + defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.CONNECT, + new MqttConnectMessageHandler(this)); + defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.DISCONNECT, + new MqttDisconnectMessageHandler(this)); + defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PINGREQ, + new MqttPingreqMessageHandler(this)); + defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBLISH, + new MqttPublishMessageHandler(this)); + defaultMqttMessageProcessor + .registerMessageHanlder(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this)); + defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBCOMP, + new MqttPubcompMessageHandler(this)); + defaultMqttMessageProcessor + .registerMessageHanlder(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this)); + defaultMqttMessageProcessor + .registerMessageHanlder(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this)); + defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.SUBSCRIBE, + new MqttSubscribeMessageHandler(this)); + defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.UNSUBSCRIBE, + new MqttUnsubscribeMessagHandler(this)); } public void start() { @@ -361,16 +409,16 @@ public class SnodeController { if (this.mqttRemotingClient != null) { this.mqttRemotingClient.shutdown(); } - if(this.mqttRemotingServer != null) { + if (this.mqttRemotingServer != null) { this.mqttRemotingServer.shutdown(); } - if(this.scheduledService != null){ + if (this.scheduledService != null) { this.scheduledService.shutdown(); } - if(this.clientHousekeepingService != null) { + if (this.clientHousekeepingService != null) { this.clientHousekeepingService.shutdown(); } - if(this.pushService != null) { + if (this.pushService != null) { this.pushService.shutdown(); } } @@ -448,7 +496,7 @@ public class SnodeController { } public void setRemotingServerInterceptorGroup( - InterceptorGroup remotingServerInterceptorGroup) { + InterceptorGroup remotingServerInterceptorGroup) { this.remotingServerInterceptorGroup = remotingServerInterceptorGroup; } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java index 795f93c8c1e1f61ac59fdf7a9f87c9e4f8755465..1a3ced1ac10fc83ef03215187d10c44ba9dbf0d2 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java @@ -19,11 +19,12 @@ package org.apache.rocketmq.snode.processor; import com.alibaba.fastjson.JSON; import io.netty.handler.codec.mqtt.MqttConnectMessage; -import io.netty.handler.codec.mqtt.MqttConnectPayload; import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; +import java.io.UnsupportedEncodingException; import java.nio.charset.Charset; import java.util.HashMap; import java.util.Map; @@ -33,6 +34,7 @@ import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload; import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler; public class DefaultMqttMessageProcessor implements RequestProcessor { @@ -44,26 +46,24 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { @Override public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message) - throws RemotingCommandException { - //TODO + throws RemotingCommandException, UnsupportedEncodingException { MqttHeader mqttHeader = (MqttHeader) message.decodeCommandCustomHeader(MqttHeader.class); - MqttFixedHeader fixedHeader = new MqttFixedHeader(mqttHeader.getMessageType(), - mqttHeader.isDup(), mqttHeader.getQosLevel(), mqttHeader.isRetain(), + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.valueOf(mqttHeader.getMessageType()), + mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), mqttHeader.getRemainingLength()); MqttMessage mqttMessage = null; - switch (mqttHeader.getMessageType()) { + switch (fixedHeader.messageType()) { case CONNECT: MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader( mqttHeader.getName(), mqttHeader.getVersion(), mqttHeader.isHasUserName(), mqttHeader.isHasPassword(), mqttHeader.isWillRetain(), mqttHeader.getWillQos(), mqttHeader.isWillFlag(), mqttHeader.isCleanSession(), mqttHeader.getKeepAliveTimeSeconds()); - MqttConnectPayload payload = decode(message.getBody(), MqttConnectPayload.class); - mqttMessage = new MqttConnectMessage(fixedHeader, variableHeader, payload); - + RocketMQMqttConnectPayload payload = decode(message.getBody(), RocketMQMqttConnectPayload.class); + mqttMessage = new MqttConnectMessage(fixedHeader, variableHeader, payload.toMqttConnectPayload()); case DISCONNECT: } - return type2handler.get(mqttHeader.getMessageType()).handleMessage(mqttMessage, remotingChannel); + return type2handler.get(MqttMessageType.valueOf(mqttHeader.getMessageType())).handleMessage(mqttMessage, remotingChannel); } @Override diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java index e4f3e838f921e12fc1cfbdbf65234b8eef91506d..c1b1633cabf5ea80e45e624f1f0d2f7d860a9fb7 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java @@ -35,18 +35,12 @@ public class MqttConnectMessageHandler implements MessageHandler { private static final int MIN_AVAILABLE_VERSION = 3; private static final int MAX_AVAILABLE_VERSION = 4; -/* private ClientManager clientManager; - - public MqttConnectMessageHandler(ClientManager clientManager) { - this.clientManager = clientManager; - }*/ - public MqttConnectMessageHandler(SnodeController snodeController) { this.snodeController = snodeController; } - @Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) { -// MqttClient client = (MqttClient) message.getClient(); + @Override + public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) { if (!(message instanceof MqttConnectMessage)) { return null; } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java index a6fe2e4c22a5b4abb97854ff890ac33ef03631bb..d07ec9de97e1f0efbfd516cf98c5f89f0aa87ea6 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java @@ -26,26 +26,18 @@ public class MqttDisconnectMessageHandler implements MessageHandler { private final SnodeController snodeController; - /* private ClientManager clientManager; - - public MqttDisconnectMessageHandler(ClientManager clientManager) { - this.clientManager = clientManager; - }*/ - public MqttDisconnectMessageHandler(SnodeController snodeController) { - this.snodeController = snodeController; - } + + public MqttDisconnectMessageHandler(SnodeController snodeController) { + this.snodeController = snodeController; + } /** - * handle the DISCONNECT message from the client - *
    - *
  1. discard the Will Message and Will Topic
  2. - *
  3. remove the client from the ClientManager
  4. - *
  5. disconnect the connection
  6. - *
- * @param message - * @return + * handle the DISCONNECT message from the client
  1. discard the Will Message and Will + * Topic
  2. remove the client from the ClientManager
  3. disconnect the + * connection
*/ - @Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) { + @Override + public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) { // TODO discard the Will Message and Will Topic return null; } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java index e119bdc1837d4e4443be9fc193ea2e2b3074ce43..6fec9f4808852c6ef1ddbf67f8e2d1db3362cd69 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java @@ -24,17 +24,14 @@ import org.apache.rocketmq.snode.SnodeController; public class MqttPublishMessageHandler implements MessageHandler { -/* private MessageStore messageStore; - - public MqttPublishMessageHandler(MessageStore messageStore) { - this.messageStore = messageStore; - }*/ -private final SnodeController snodeController; + private final SnodeController snodeController; public MqttPublishMessageHandler(SnodeController snodeController) { this.snodeController = snodeController; } - @Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) { + + @Override + public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) { return null; } diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..2a2a4459c28f3103c12ef8f9e9e4bab1652479fa --- /dev/null +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.snode.processor; + +import com.alibaba.fastjson.JSON; +import io.netty.handler.codec.mqtt.MqttConnectPayload; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; +import java.io.UnsupportedEncodingException; +import java.nio.charset.Charset; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.remoting.ClientConfig; +import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.ServerConfig; +import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.CodecHelper; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload; +import org.apache.rocketmq.snode.SnodeController; +import org.apache.rocketmq.snode.config.SnodeConfig; +import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class DefaultMqttMessageProcessorTest { + private DefaultMqttMessageProcessor defaultMqttMessageProcessor; + + @Spy + private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig()); + + @Mock + private RemotingChannel remotingChannel; + + private String topic = "SnodeTopic"; + + private String group = "SnodeGroup"; + + private String enodeName = "enodeName"; + + @Before + public void init() { + defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(); + } + + @Test + public void testProcessRequest() throws RemotingCommandException, UnsupportedEncodingException { + RemotingCommand request = createMqttConnectMesssageCommand(); + defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.CONNECT, new MqttConnectMessageHandler(snodeController)); + defaultMqttMessageProcessor.processRequest(remotingChannel, request); + } + + private MqttHeader createMqttConnectMesssageHeader() { + MqttHeader mqttHeader = new MqttHeader(); + mqttHeader.setMessageType(MqttMessageType.CONNECT.value()); + mqttHeader.setDup(false); + mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value()); + mqttHeader.setRetain(false); + mqttHeader.setRemainingLength(200); + + mqttHeader.setName("MQTT"); + mqttHeader.setVersion(4); + mqttHeader.setHasUserName(false); + mqttHeader.setHasPassword(false); + mqttHeader.setWillRetain(false); + mqttHeader.setWillQos(0); + mqttHeader.setWillFlag(false); + mqttHeader.setCleanSession(false); + mqttHeader.setKeepAliveTimeSeconds(60); + + return mqttHeader; + } + + private RemotingCommand createMqttConnectMesssageCommand() { + MqttHeader mqttHeader = createMqttConnectMesssageHeader(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader); + MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes()); + request.setBody(RocketMQMqttConnectPayload.fromMqttConnectPayload(payload).encode()); + CodecHelper.makeCustomHeaderToNet(request); + return request; + } + private byte[] encode(Object obj) { + String json = JSON.toJSONString(obj, false); + if (json != null) { + return json.getBytes(Charset.forName(RemotingUtil.REMOTING_CHARSET)); + } + return null; + } +} diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..abfff90ce0d8366ca73e8f317d9d1845f57e03a1 --- /dev/null +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.snode.processor; + +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class MqttConnectMessageHandlerTest { + +}