提交 4d88d5b4 编写于 作者: C chengxiangwang

fix checkstyle error

上级 686b5c79
...@@ -21,7 +21,6 @@ import com.alibaba.fastjson.JSON; ...@@ -21,7 +21,6 @@ import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
...@@ -54,11 +53,12 @@ public class Mqtt2RemotingCommandHandler extends MessageToMessageDecoder<MqttMes ...@@ -54,11 +53,12 @@ public class Mqtt2RemotingCommandHandler extends MessageToMessageDecoder<MqttMes
switch (msg.fixedHeader().messageType()) { switch (msg.fixedHeader().messageType()) {
case CONNECT: case CONNECT:
MqttConnectPayload payload = ((MqttConnectMessage) msg).payload(); RocketMQMqttConnectPayload payload = RocketMQMqttConnectPayload
.fromMqttConnectPayload(((MqttConnectMessage) msg).payload());
MqttHeader mqttHeader = new MqttHeader(); MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setMessageType(mqttFixedHeader.messageType()); mqttHeader.setMessageType(mqttFixedHeader.messageType().value());
mqttHeader.setDup(mqttFixedHeader.isDup()); mqttHeader.setDup(mqttFixedHeader.isDup());
mqttHeader.setQosLevel(mqttFixedHeader.qosLevel()); mqttHeader.setQosLevel(mqttFixedHeader.qosLevel().value());
mqttHeader.setRetain(mqttFixedHeader.isRetain()); mqttHeader.setRetain(mqttFixedHeader.isRetain());
mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength()); mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength());
...@@ -78,7 +78,7 @@ public class Mqtt2RemotingCommandHandler extends MessageToMessageDecoder<MqttMes ...@@ -78,7 +78,7 @@ public class Mqtt2RemotingCommandHandler extends MessageToMessageDecoder<MqttMes
.createRequestCommand(1000, mqttHeader); .createRequestCommand(1000, mqttHeader);
requestCommand.makeCustomHeaderToNet(); requestCommand.makeCustomHeaderToNet();
requestCommand.setBody(encode(payload)); requestCommand.setBody(payload.encode());
out.add(requestCommand); out.add(requestCommand);
case CONNACK: case CONNACK:
case DISCONNECT: case DISCONNECT:
......
...@@ -21,8 +21,6 @@ ...@@ -21,8 +21,6 @@
package org.apache.rocketmq.remoting.transport.mqtt; package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
...@@ -31,11 +29,11 @@ public class MqttHeader implements CommandCustomHeader { ...@@ -31,11 +29,11 @@ public class MqttHeader implements CommandCustomHeader {
//fix header members //fix header members
@CFNotNull @CFNotNull
private MqttMessageType messageType; private Integer messageType;
@CFNotNull @CFNotNull
private boolean isDup; private boolean isDup;
@CFNotNull @CFNotNull
private MqttQoS qosLevel; private Integer qosLevel;
@CFNotNull @CFNotNull
private boolean isRetain; private boolean isRetain;
@CFNotNull @CFNotNull
...@@ -57,11 +55,11 @@ public class MqttHeader implements CommandCustomHeader { ...@@ -57,11 +55,11 @@ public class MqttHeader implements CommandCustomHeader {
private String topicName; private String topicName;
private Integer packetId; private Integer packetId;
public MqttMessageType getMessageType() { public Integer getMessageType() {
return messageType; return messageType;
} }
public void setMessageType(MqttMessageType messageType) { public void setMessageType(Integer messageType) {
this.messageType = messageType; this.messageType = messageType;
} }
...@@ -73,11 +71,11 @@ public class MqttHeader implements CommandCustomHeader { ...@@ -73,11 +71,11 @@ public class MqttHeader implements CommandCustomHeader {
isDup = dup; isDup = dup;
} }
public MqttQoS getQosLevel() { public Integer getQosLevel() {
return qosLevel; return qosLevel;
} }
public void setQosLevel(MqttQoS qosLevel) { public void setQosLevel(Integer qosLevel) {
this.qosLevel = qosLevel; this.qosLevel = qosLevel;
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.util.internal.StringUtil;
import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
/**
* Payload of {@link MqttConnectMessage}
*/
public final class RocketMQMqttConnectPayload extends RemotingSerializable {
private String clientIdentifier;
private String willTopic;
private String willMessage;
private String userName;
private String password;
public RocketMQMqttConnectPayload(
String clientIdentifier,
String willTopic,
String willMessage,
String userName,
String password) {
this.clientIdentifier = clientIdentifier;
this.willTopic = willTopic;
this.willMessage = willMessage;
this.userName = userName;
this.password = password;
}
public static RocketMQMqttConnectPayload fromMqttConnectPayload(MqttConnectPayload payload) {
return new RocketMQMqttConnectPayload(payload.clientIdentifier(), payload.willTopic(),
payload.willMessage(), payload.userName(), payload.password());
}
public MqttConnectPayload toMqttConnectPayload() throws UnsupportedEncodingException {
return new MqttConnectPayload(this.clientIdentifier, this.willTopic, this.willMessage.getBytes(
RemotingUtil.REMOTING_CHARSET), this.userName, this.password.getBytes(RemotingUtil.REMOTING_CHARSET));
}
public String getClientIdentifier() {
return clientIdentifier;
}
public String getWillTopic() {
return willTopic;
}
public String getWillMessage() {
return willMessage;
}
public String getUserName() {
return userName;
}
public String getPassword() {
return password;
}
public void setClientIdentifier(String clientIdentifier) {
this.clientIdentifier = clientIdentifier;
}
public void setWillTopic(String willTopic) {
this.willTopic = willTopic;
}
public void setWillMessage(String willMessage) {
this.willMessage = willMessage;
}
public void setUserName(String userName) {
this.userName = userName;
}
public void setPassword(String password) {
this.password = password;
}
@Override
public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this))
.append('[')
.append("clientIdentifier=").append(clientIdentifier)
.append(", willTopic=").append(willTopic)
.append(", willMessage=").append(willMessage)
.append(", userName=").append(userName)
.append(", password=").append(password)
.append(']')
.toString();
}
}
...@@ -84,7 +84,9 @@ import org.apache.rocketmq.snode.service.impl.PushServiceImpl; ...@@ -84,7 +84,9 @@ import org.apache.rocketmq.snode.service.impl.PushServiceImpl;
import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl; import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl;
public class SnodeController { public class SnodeController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory
.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeConfig snodeConfig; private final SnodeConfig snodeConfig;
private final ServerConfig nettyServerConfig; private final ServerConfig nettyServerConfig;
...@@ -121,65 +123,69 @@ public class SnodeController { ...@@ -121,65 +123,69 @@ public class SnodeController {
private ClientService clientService; private ClientService clientService;
private SlowConsumerService slowConsumerService; private SlowConsumerService slowConsumerService;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( private final ScheduledExecutorService scheduledExecutorService = Executors
"SnodeControllerScheduledThread")); .newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"SnodeControllerScheduledThread"));
public SnodeController(ServerConfig nettyServerConfig, public SnodeController(ServerConfig nettyServerConfig,
ClientConfig nettyClientConfig, ClientConfig nettyClientConfig,
SnodeConfig snodeConfig) { SnodeConfig snodeConfig) {
this.nettyClientConfig = nettyClientConfig; this.nettyClientConfig = nettyClientConfig;
this.nettyServerConfig = nettyServerConfig; this.nettyServerConfig = nettyServerConfig;
this.snodeConfig = snodeConfig; this.snodeConfig = snodeConfig;
this.enodeService = new EnodeServiceImpl(this); this.enodeService = new EnodeServiceImpl(this);
this.nnodeService = new NnodeServiceImpl(this); this.nnodeService = new NnodeServiceImpl(this);
this.scheduledService = new ScheduledServiceImpl(this); this.scheduledService = new ScheduledServiceImpl(this);
this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient().init(this.getNettyClientConfig(), null); this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient()
this.mqttRemotingClient = RemotingClientFactory.getInstance().createRemotingClient(RemotingUtil.MQTT_PROTOCOL).init(this.getNettyClientConfig(), null); .init(this.getNettyClientConfig(), null);
this.mqttRemotingClient = RemotingClientFactory.getInstance()
.createRemotingClient(RemotingUtil.MQTT_PROTOCOL)
.init(this.getNettyClientConfig(), null);
this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor( this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(), snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(), snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000, 3000,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodeSendMessageThread", "SnodeSendMessageThread",
false); false);
this.pullMessageExecutor = ThreadUtils.newThreadPoolExecutor( this.pullMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(), snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(), snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000, 3000,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodePullMessageThread", "SnodePullMessageThread",
false); false);
this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor( this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeHeartBeatCorePoolSize(), snodeConfig.getSnodeHeartBeatCorePoolSize(),
snodeConfig.getSnodeHeartBeatMaxPoolSize(), snodeConfig.getSnodeHeartBeatMaxPoolSize(),
1000 * 60, 1000 * 60,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()), new ArrayBlockingQueue<>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()),
"SnodeHeartbeatThread", "SnodeHeartbeatThread",
true); true);
this.consumerManagerExecutor = ThreadUtils.newThreadPoolExecutor( this.consumerManagerExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(), snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(), snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000, 3000,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodePullMessageThread", "SnodePullMessageThread",
false); false);
this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor( this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(), snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(), snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000, 3000,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"ConsumerManagerThread", "ConsumerManagerThread",
false); false);
this.handleMqttMessageExecutor = ThreadUtils.newThreadPoolExecutor( this.handleMqttMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeHandleMqttMessageMinPoolSize(), snodeConfig.getSnodeHandleMqttMessageMinPoolSize(),
...@@ -192,7 +198,8 @@ public class SnodeController { ...@@ -192,7 +198,8 @@ public class SnodeController {
if (this.snodeConfig.getNamesrvAddr() != null) { if (this.snodeConfig.getNamesrvAddr() != null) {
this.nnodeService.updateNnodeAddressList(this.snodeConfig.getNamesrvAddr()); this.nnodeService.updateNnodeAddressList(this.snodeConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}", this.snodeConfig.getNamesrvAddr()); log.info("Set user specified name server address: {}",
this.snodeConfig.getNamesrvAddr());
} }
this.subscriptionGroupManager = new SubscriptionGroupManager(this); this.subscriptionGroupManager = new SubscriptionGroupManager(this);
...@@ -208,7 +215,8 @@ public class SnodeController { ...@@ -208,7 +215,8 @@ public class SnodeController {
this.producerManager = new ProducerManagerImpl(); this.producerManager = new ProducerManagerImpl();
this.consumerManager = new ConsumerManagerImpl(this); this.consumerManager = new ConsumerManagerImpl(this);
this.iotClientManager = new IOTClientManagerImpl(this); this.iotClientManager = new IOTClientManagerImpl(this);
this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, this.consumerManager, this.iotClientManager); this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager,
this.consumerManager, this.iotClientManager);
this.slowConsumerService = new SlowConsumerServiceImpl(this); this.slowConsumerService = new SlowConsumerServiceImpl(this);
} }
...@@ -217,22 +225,26 @@ public class SnodeController { ...@@ -217,22 +225,26 @@ public class SnodeController {
} }
private void initRemotingServerInterceptorGroup() { private void initRemotingServerInterceptorGroup() {
List<Interceptor> remotingServerInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath()); List<Interceptor> remotingServerInterceptors = InterceptorFactory.getInstance()
.loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath());
if (remotingServerInterceptors != null && remotingServerInterceptors.size() > 0) { if (remotingServerInterceptors != null && remotingServerInterceptors.size() > 0) {
if (this.remotingServerInterceptorGroup == null) { if (this.remotingServerInterceptorGroup == null) {
this.remotingServerInterceptorGroup = new InterceptorGroup(); this.remotingServerInterceptorGroup = new InterceptorGroup();
} }
for (Interceptor interceptor : remotingServerInterceptors) { for (Interceptor interceptor : remotingServerInterceptors) {
this.remotingServerInterceptorGroup.registerInterceptor(interceptor); this.remotingServerInterceptorGroup.registerInterceptor(interceptor);
log.warn("Remoting server interceptor: {} registered!", interceptor.interceptorName()); log.warn("Remoting server interceptor: {} registered!",
interceptor.interceptorName());
} }
} }
} }
public boolean initialize() { public boolean initialize() {
this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig, this.clientHousekeepingService); this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer()
.init(this.nettyServerConfig, this.clientHousekeepingService);
this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer( this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(
RemotingUtil.MQTT_PROTOCOL).init(this.nettyServerConfig, this.clientHousekeepingService); RemotingUtil.MQTT_PROTOCOL)
.init(this.nettyServerConfig, this.clientHousekeepingService);
this.registerProcessor(); this.registerProcessor();
initSnodeInterceptorGroup(); initSnodeInterceptorGroup();
initRemotingServerInterceptorGroup(); initRemotingServerInterceptorGroup();
...@@ -249,13 +261,14 @@ public class SnodeController { ...@@ -249,13 +261,14 @@ public class SnodeController {
return; return;
} }
List<AccessValidator> accessValidators = ServiceProvider.loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); List<AccessValidator> accessValidators = ServiceProvider
.loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
if (accessValidators == null || accessValidators.isEmpty()) { if (accessValidators == null || accessValidators.isEmpty()) {
log.info("The snode dose not load the AccessValidator"); log.info("The snode dose not load the AccessValidator");
return; return;
} }
for (AccessValidator accessValidator: accessValidators) { for (AccessValidator accessValidator : accessValidators) {
final AccessValidator validator = accessValidator; final AccessValidator validator = accessValidator;
this.remotingServerInterceptorGroup.registerInterceptor(new Interceptor() { this.remotingServerInterceptorGroup.registerInterceptor(new Interceptor() {
...@@ -264,31 +277,40 @@ public class SnodeController { ...@@ -264,31 +277,40 @@ public class SnodeController {
return "snodeRequestAclControlInterceptor"; return "snodeRequestAclControlInterceptor";
} }
@Override public void beforeRequest(RequestContext requestContext) { @Override
public void beforeRequest(RequestContext requestContext) {
//Do not catch the exception //Do not catch the exception
RemotingCommand request = requestContext.getRequest(); RemotingCommand request = requestContext.getRequest();
String remoteAddr = RemotingUtil.socketAddress2IpString(requestContext.getRemotingChannel().remoteAddress()); String remoteAddr = RemotingUtil.socketAddress2IpString(
requestContext.getRemotingChannel().remoteAddress());
validator.validate(validator.parse(request, remoteAddr)); validator.validate(validator.parse(request, remoteAddr));
} }
@Override public void afterRequest(ResponseContext responseContext) { } @Override
public void afterRequest(ResponseContext responseContext) {
}
@Override public void onException(ExceptionContext exceptionContext) { } @Override
public void onException(ExceptionContext exceptionContext) {
}
}); });
} }
} }
private void initSnodeInterceptorGroup() { private void initSnodeInterceptorGroup() {
List<Interceptor> consumeMessageInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath()); List<Interceptor> consumeMessageInterceptors = InterceptorFactory.getInstance()
.loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath());
if (consumeMessageInterceptors != null && consumeMessageInterceptors.size() > 0) { if (consumeMessageInterceptors != null && consumeMessageInterceptors.size() > 0) {
this.consumeMessageInterceptorGroup = new InterceptorGroup(); this.consumeMessageInterceptorGroup = new InterceptorGroup();
for (Interceptor interceptor : consumeMessageInterceptors) { for (Interceptor interceptor : consumeMessageInterceptors) {
this.consumeMessageInterceptorGroup.registerInterceptor(interceptor); this.consumeMessageInterceptorGroup.registerInterceptor(interceptor);
log.warn("Consume message interceptor: {} registered!", interceptor.interceptorName()); log.warn("Consume message interceptor: {} registered!",
interceptor.interceptorName());
} }
} }
List<Interceptor> sendMessageInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath()); List<Interceptor> sendMessageInterceptors = InterceptorFactory.getInstance()
.loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath());
if (sendMessageInterceptors != null && sendMessageInterceptors.size() > 0) { if (sendMessageInterceptors != null && sendMessageInterceptors.size() > 0) {
this.sendMessageInterceptorGroup = new InterceptorGroup(); this.sendMessageInterceptorGroup = new InterceptorGroup();
for (Interceptor interceptor : sendMessageInterceptors) { for (Interceptor interceptor : sendMessageInterceptors) {
...@@ -300,30 +322,56 @@ public class SnodeController { ...@@ -300,30 +322,56 @@ public class SnodeController {
} }
public void registerProcessor() { public void registerProcessor() {
this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor); this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor,
this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor); this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor,
this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor); this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor,
this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor); this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor,
this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor,
this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor,
this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor); this.pullMessageExecutor);
this.snodeServer
.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer
.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer
.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer
.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor,
this.consumerManageExecutor);
this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE,
defaultMqttMessageProcessor, handleMqttMessageExecutor); defaultMqttMessageProcessor, handleMqttMessageExecutor);
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.CONNECT, new MqttConnectMessageHandler(this)); defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.CONNECT,
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.DISCONNECT, new MqttDisconnectMessageHandler(this)); new MqttConnectMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PINGREQ, new MqttPingreqMessageHandler(this)); defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.DISCONNECT,
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBLISH, new MqttPublishMessageHandler(this)); new MqttDisconnectMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this)); defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PINGREQ,
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBCOMP, new MqttPubcompMessageHandler(this)); new MqttPingreqMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this)); defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBLISH,
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this)); new MqttPublishMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.SUBSCRIBE, new MqttSubscribeMessageHandler(this)); defaultMqttMessageProcessor
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.UNSUBSCRIBE, new MqttUnsubscribeMessagHandler(this)); .registerMessageHanlder(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBCOMP,
new MqttPubcompMessageHandler(this));
defaultMqttMessageProcessor
.registerMessageHanlder(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this));
defaultMqttMessageProcessor
.registerMessageHanlder(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.SUBSCRIBE,
new MqttSubscribeMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.UNSUBSCRIBE,
new MqttUnsubscribeMessagHandler(this));
} }
public void start() { public void start() {
...@@ -361,16 +409,16 @@ public class SnodeController { ...@@ -361,16 +409,16 @@ public class SnodeController {
if (this.mqttRemotingClient != null) { if (this.mqttRemotingClient != null) {
this.mqttRemotingClient.shutdown(); this.mqttRemotingClient.shutdown();
} }
if(this.mqttRemotingServer != null) { if (this.mqttRemotingServer != null) {
this.mqttRemotingServer.shutdown(); this.mqttRemotingServer.shutdown();
} }
if(this.scheduledService != null){ if (this.scheduledService != null) {
this.scheduledService.shutdown(); this.scheduledService.shutdown();
} }
if(this.clientHousekeepingService != null) { if (this.clientHousekeepingService != null) {
this.clientHousekeepingService.shutdown(); this.clientHousekeepingService.shutdown();
} }
if(this.pushService != null) { if (this.pushService != null) {
this.pushService.shutdown(); this.pushService.shutdown();
} }
} }
...@@ -448,7 +496,7 @@ public class SnodeController { ...@@ -448,7 +496,7 @@ public class SnodeController {
} }
public void setRemotingServerInterceptorGroup( public void setRemotingServerInterceptorGroup(
InterceptorGroup remotingServerInterceptorGroup) { InterceptorGroup remotingServerInterceptorGroup) {
this.remotingServerInterceptorGroup = remotingServerInterceptorGroup; this.remotingServerInterceptorGroup = remotingServerInterceptorGroup;
} }
......
...@@ -19,11 +19,12 @@ package org.apache.rocketmq.snode.processor; ...@@ -19,11 +19,12 @@ package org.apache.rocketmq.snode.processor;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
...@@ -33,6 +34,7 @@ import org.apache.rocketmq.remoting.common.RemotingUtil; ...@@ -33,6 +34,7 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler; import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler;
public class DefaultMqttMessageProcessor implements RequestProcessor { public class DefaultMqttMessageProcessor implements RequestProcessor {
...@@ -44,26 +46,24 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { ...@@ -44,26 +46,24 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
@Override @Override
public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message) public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message)
throws RemotingCommandException { throws RemotingCommandException, UnsupportedEncodingException {
//TODO
MqttHeader mqttHeader = (MqttHeader) message.decodeCommandCustomHeader(MqttHeader.class); MqttHeader mqttHeader = (MqttHeader) message.decodeCommandCustomHeader(MqttHeader.class);
MqttFixedHeader fixedHeader = new MqttFixedHeader(mqttHeader.getMessageType(), MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.valueOf(mqttHeader.getMessageType()),
mqttHeader.isDup(), mqttHeader.getQosLevel(), mqttHeader.isRetain(), mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()); mqttHeader.getRemainingLength());
MqttMessage mqttMessage = null; MqttMessage mqttMessage = null;
switch (mqttHeader.getMessageType()) { switch (fixedHeader.messageType()) {
case CONNECT: case CONNECT:
MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader( MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
mqttHeader.getName(), mqttHeader.getVersion(), mqttHeader.isHasUserName(), mqttHeader.getName(), mqttHeader.getVersion(), mqttHeader.isHasUserName(),
mqttHeader.isHasPassword(), mqttHeader.isWillRetain(), mqttHeader.isHasPassword(), mqttHeader.isWillRetain(),
mqttHeader.getWillQos(), mqttHeader.isWillFlag(), mqttHeader.getWillQos(), mqttHeader.isWillFlag(),
mqttHeader.isCleanSession(), mqttHeader.getKeepAliveTimeSeconds()); mqttHeader.isCleanSession(), mqttHeader.getKeepAliveTimeSeconds());
MqttConnectPayload payload = decode(message.getBody(), MqttConnectPayload.class); RocketMQMqttConnectPayload payload = decode(message.getBody(), RocketMQMqttConnectPayload.class);
mqttMessage = new MqttConnectMessage(fixedHeader, variableHeader, payload); mqttMessage = new MqttConnectMessage(fixedHeader, variableHeader, payload.toMqttConnectPayload());
case DISCONNECT: case DISCONNECT:
} }
return type2handler.get(mqttHeader.getMessageType()).handleMessage(mqttMessage, remotingChannel); return type2handler.get(MqttMessageType.valueOf(mqttHeader.getMessageType())).handleMessage(mqttMessage, remotingChannel);
} }
@Override @Override
......
...@@ -35,18 +35,12 @@ public class MqttConnectMessageHandler implements MessageHandler { ...@@ -35,18 +35,12 @@ public class MqttConnectMessageHandler implements MessageHandler {
private static final int MIN_AVAILABLE_VERSION = 3; private static final int MIN_AVAILABLE_VERSION = 3;
private static final int MAX_AVAILABLE_VERSION = 4; private static final int MAX_AVAILABLE_VERSION = 4;
/* private ClientManager clientManager;
public MqttConnectMessageHandler(ClientManager clientManager) {
this.clientManager = clientManager;
}*/
public MqttConnectMessageHandler(SnodeController snodeController) { public MqttConnectMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController; this.snodeController = snodeController;
} }
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) { @Override
// MqttClient client = (MqttClient) message.getClient(); public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
if (!(message instanceof MqttConnectMessage)) { if (!(message instanceof MqttConnectMessage)) {
return null; return null;
} }
......
...@@ -26,26 +26,18 @@ public class MqttDisconnectMessageHandler implements MessageHandler { ...@@ -26,26 +26,18 @@ public class MqttDisconnectMessageHandler implements MessageHandler {
private final SnodeController snodeController; private final SnodeController snodeController;
/* private ClientManager clientManager;
public MqttDisconnectMessageHandler(SnodeController snodeController) {
public MqttDisconnectMessageHandler(ClientManager clientManager) { this.snodeController = snodeController;
this.clientManager = clientManager; }
}*/
public MqttDisconnectMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
/** /**
* handle the DISCONNECT message from the client * handle the DISCONNECT message from the client <ol> <li>discard the Will Message and Will
* <ol> * Topic</li> <li>remove the client from the ClientManager</li> <li>disconnect the
* <li>discard the Will Message and Will Topic</li> * connection</li> </ol>
* <li>remove the client from the ClientManager</li>
* <li>disconnect the connection</li>
* </ol>
* @param message
* @return
*/ */
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) { @Override
public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
// TODO discard the Will Message and Will Topic // TODO discard the Will Message and Will Topic
return null; return null;
} }
......
...@@ -24,17 +24,14 @@ import org.apache.rocketmq.snode.SnodeController; ...@@ -24,17 +24,14 @@ import org.apache.rocketmq.snode.SnodeController;
public class MqttPublishMessageHandler implements MessageHandler { public class MqttPublishMessageHandler implements MessageHandler {
/* private MessageStore messageStore; private final SnodeController snodeController;
public MqttPublishMessageHandler(MessageStore messageStore) {
this.messageStore = messageStore;
}*/
private final SnodeController snodeController;
public MqttPublishMessageHandler(SnodeController snodeController) { public MqttPublishMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController; this.snodeController = snodeController;
} }
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
@Override
public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null; return null;
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor;
import com.alibaba.fastjson.JSON;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMqttMessageProcessorTest {
private DefaultMqttMessageProcessor defaultMqttMessageProcessor;
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
@Mock
private RemotingChannel remotingChannel;
private String topic = "SnodeTopic";
private String group = "SnodeGroup";
private String enodeName = "enodeName";
@Before
public void init() {
defaultMqttMessageProcessor = new DefaultMqttMessageProcessor();
}
@Test
public void testProcessRequest() throws RemotingCommandException, UnsupportedEncodingException {
RemotingCommand request = createMqttConnectMesssageCommand();
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.CONNECT, new MqttConnectMessageHandler(snodeController));
defaultMqttMessageProcessor.processRequest(remotingChannel, request);
}
private MqttHeader createMqttConnectMesssageHeader() {
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setMessageType(MqttMessageType.CONNECT.value());
mqttHeader.setDup(false);
mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
mqttHeader.setRetain(false);
mqttHeader.setRemainingLength(200);
mqttHeader.setName("MQTT");
mqttHeader.setVersion(4);
mqttHeader.setHasUserName(false);
mqttHeader.setHasPassword(false);
mqttHeader.setWillRetain(false);
mqttHeader.setWillQos(0);
mqttHeader.setWillFlag(false);
mqttHeader.setCleanSession(false);
mqttHeader.setKeepAliveTimeSeconds(60);
return mqttHeader;
}
private RemotingCommand createMqttConnectMesssageCommand() {
MqttHeader mqttHeader = createMqttConnectMesssageHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader);
MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes());
request.setBody(RocketMQMqttConnectPayload.fromMqttConnectPayload(payload).encode());
CodecHelper.makeCustomHeaderToNet(request);
return request;
}
private byte[] encode(Object obj) {
String json = JSON.toJSONString(obj, false);
if (json != null) {
return json.getBytes(Charset.forName(RemotingUtil.REMOTING_CHARSET));
}
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class MqttConnectMessageHandlerTest {
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册