未验证 提交 b3dfa1c4 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #733 from xiangwangcheng/snode

[ISSUE #730]refactor Remoting module to support mqtt encode/decode and implement part of logic of mqtt CONNECT request
......@@ -180,4 +180,6 @@ public class RequestCode {
public static final int MQTT_MESSAGE = 1000;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*//*
*/
/**
* $Id: EndTransactionResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*//*
package org.apache.rocketmq.common.protocol.header.mqtt;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class MqttHeader implements CommandCustomHeader {
//fix header members
@CFNotNull
private MqttMessageType messageType;
@CFNotNull
private boolean isDup;
@CFNotNull
private MqttQoS qosLevel;
@CFNotNull
private boolean isRetain;
@CFNotNull
private int remainingLength;
//variable header members
private MqttConnectReturnCode connectReturnCode;
private boolean sessionPresent;
private String name;
private Integer version;
private boolean hasUserName;
private boolean hasPassword;
private boolean isWillRetain;
private Integer willQos;
private boolean isWillFlag;
private boolean isCleanSession;
private Integer keepAliveTimeSeconds;
private Integer messageId;
private String topicName;
private Integer packetId;
public MqttMessageType getMessageType() {
return messageType;
}
public void setMessageType(MqttMessageType messageType) {
this.messageType = messageType;
}
public boolean isDup() {
return isDup;
}
public void setDup(boolean dup) {
isDup = dup;
}
public MqttQoS getQosLevel() {
return qosLevel;
}
public void setQosLevel(MqttQoS qosLevel) {
this.qosLevel = qosLevel;
}
public boolean isRetain() {
return isRetain;
}
public void setRetain(boolean retain) {
isRetain = retain;
}
public int getRemainingLength() {
return remainingLength;
}
public void setRemainingLength(int remainingLength) {
this.remainingLength = remainingLength;
}
public MqttConnectReturnCode getConnectReturnCode() {
return connectReturnCode;
}
public void setConnectReturnCode(MqttConnectReturnCode connectReturnCode) {
this.connectReturnCode = connectReturnCode;
}
public boolean isSessionPresent() {
return sessionPresent;
}
public void setSessionPresent(boolean sessionPresent) {
this.sessionPresent = sessionPresent;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getVersion() {
return version;
}
public void setVersion(Integer version) {
this.version = version;
}
public boolean isHasUserName() {
return hasUserName;
}
public void setHasUserName(boolean hasUserName) {
this.hasUserName = hasUserName;
}
public boolean isHasPassword() {
return hasPassword;
}
public void setHasPassword(boolean hasPassword) {
this.hasPassword = hasPassword;
}
public boolean isWillRetain() {
return isWillRetain;
}
public void setWillRetain(boolean willRetain) {
isWillRetain = willRetain;
}
public Integer getWillQos() {
return willQos;
}
public void setWillQos(Integer willQos) {
this.willQos = willQos;
}
public boolean isWillFlag() {
return isWillFlag;
}
public void setWillFlag(boolean willFlag) {
isWillFlag = willFlag;
}
public boolean isCleanSession() {
return isCleanSession;
}
public void setCleanSession(boolean cleanSession) {
isCleanSession = cleanSession;
}
public Integer getKeepAliveTimeSeconds() {
return keepAliveTimeSeconds;
}
public void setKeepAliveTimeSeconds(Integer keepAliveTimeSeconds) {
this.keepAliveTimeSeconds = keepAliveTimeSeconds;
}
public Integer getMessageId() {
return messageId;
}
public void setMessageId(Integer messageId) {
this.messageId = messageId;
}
public String getTopicName() {
return topicName;
}
public void setTopicName(String topicName) {
this.topicName = topicName;
}
public Integer getPacketId() {
return packetId;
}
public void setPacketId(Integer packetId) {
this.packetId = packetId;
}
@Override
public void checkFields() throws RemotingCommandException {
}
}
*/
......@@ -39,7 +39,7 @@ public class RemotingServerFactory {
}
public RemotingServer createRemotingServer(String protocol) {
return ServiceProvider.createInstance(protocolPathMap.get(protocol), RemotingClient.class);
return ServiceProvider.createInstance(protocolPathMap.get(protocol), RemotingServer.class);
}
public RemotingServer createRemotingServer() {
......
......@@ -20,6 +20,7 @@ import org.apache.rocketmq.remoting.netty.NettySystemConfig;
public class ServerConfig implements Cloneable {
private int listenPort = 8888;
private int mqttListenPort = 1883;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 8;
private int serverSelectorThreads = 3;
......@@ -75,6 +76,14 @@ public class ServerConfig implements Cloneable {
this.listenPort = listenPort;
}
public int getMqttListenPort() {
return mqttListenPort;
}
public void setMqttListenPort(int mqttListenPort) {
this.mqttListenPort = mqttListenPort;
}
public int getServerWorkerThreads() {
return serverWorkerThreads;
}
......
......@@ -43,6 +43,7 @@ public class RemotingUtil {
private static boolean isWindowsPlatform = false;
public static final String DEFAULT_PROTOCOL = "rocketmq";
public static final String HTTP2_PROTOCOL = "http2";
public static final String MQTT_PROTOCOL = "mqtt";
public static final String REMOTING_CHARSET = "UTF-8";
static {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.channel.Channel;
public class DisconnectChannelEvent {
private Channel channel;
public DisconnectChannelEvent(Channel channel) {
this.channel = channel;
}
public Channel getChannel() {
return channel;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* $Id: EndTransactionResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class MqttHeader implements CommandCustomHeader {
//fix header members
@CFNotNull
private Integer messageType;
@CFNotNull
private boolean isDup;
@CFNotNull
private Integer qosLevel;
@CFNotNull
private boolean isRetain;
@CFNotNull
private int remainingLength;
//variable header members
private MqttConnectReturnCode connectReturnCode;
private boolean sessionPresent;
private String name;
private Integer version;
private boolean hasUserName;
private boolean hasPassword;
private boolean isWillRetain;
private Integer willQos;
private boolean isWillFlag;
private boolean isCleanSession;
private Integer keepAliveTimeSeconds;
private Integer messageId;
private String topicName;
private Integer packetId;
public Integer getMessageType() {
return messageType;
}
public void setMessageType(Integer messageType) {
this.messageType = messageType;
}
public boolean isDup() {
return isDup;
}
public void setDup(boolean dup) {
isDup = dup;
}
public Integer getQosLevel() {
return qosLevel;
}
public void setQosLevel(Integer qosLevel) {
this.qosLevel = qosLevel;
}
public boolean isRetain() {
return isRetain;
}
public void setRetain(boolean retain) {
isRetain = retain;
}
public int getRemainingLength() {
return remainingLength;
}
public void setRemainingLength(int remainingLength) {
this.remainingLength = remainingLength;
}
public MqttConnectReturnCode getConnectReturnCode() {
return connectReturnCode;
}
public void setConnectReturnCode(MqttConnectReturnCode connectReturnCode) {
this.connectReturnCode = connectReturnCode;
}
public boolean isSessionPresent() {
return sessionPresent;
}
public void setSessionPresent(boolean sessionPresent) {
this.sessionPresent = sessionPresent;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getVersion() {
return version;
}
public void setVersion(Integer version) {
this.version = version;
}
public boolean isHasUserName() {
return hasUserName;
}
public void setHasUserName(boolean hasUserName) {
this.hasUserName = hasUserName;
}
public boolean isHasPassword() {
return hasPassword;
}
public void setHasPassword(boolean hasPassword) {
this.hasPassword = hasPassword;
}
public boolean isWillRetain() {
return isWillRetain;
}
public void setWillRetain(boolean willRetain) {
isWillRetain = willRetain;
}
public Integer getWillQos() {
return willQos;
}
public void setWillQos(Integer willQos) {
this.willQos = willQos;
}
public boolean isWillFlag() {
return isWillFlag;
}
public void setWillFlag(boolean willFlag) {
isWillFlag = willFlag;
}
public boolean isCleanSession() {
return isCleanSession;
}
public void setCleanSession(boolean cleanSession) {
isCleanSession = cleanSession;
}
public Integer getKeepAliveTimeSeconds() {
return keepAliveTimeSeconds;
}
public void setKeepAliveTimeSeconds(Integer keepAliveTimeSeconds) {
this.keepAliveTimeSeconds = keepAliveTimeSeconds;
}
public Integer getMessageId() {
return messageId;
}
public void setMessageId(Integer messageId) {
this.messageId = messageId;
}
public String getTopicName() {
return topicName;
}
public void setTopicName(String topicName) {
this.topicName = topicName;
}
public Integer getPacketId() {
return packetId;
}
public void setPacketId(Integer packetId) {
this.packetId = packetId;
}
@Override
public void checkFields() throws RemotingCommandException {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.mqtt.MqttMessage;
import java.util.List;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.dispatcher.EncodeDecodeDispatcher;
import org.apache.rocketmq.remoting.transport.mqtt.dispatcher.Message2MessageEncodeDecode;
public class MqttMessage2RemotingCommandHandler extends MessageToMessageDecoder<MqttMessage> {
/**
* Decode from one message to an other. This method will be called for each written message that
* can be handled by this encoder.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageDecoder}
* belongs to
* @param msg the message to decode to an other one
* @param out the {@link List} to which decoded messages should be added
* @throws Exception is thrown if an error occurs
*/
@Override
protected void decode(ChannelHandlerContext ctx, MqttMessage msg, List<Object> out)
throws Exception {
if (!(msg instanceof MqttMessage)) {
return;
}
RemotingCommand requestCommand = null;
Message2MessageEncodeDecode message2MessageEncodeDecode = EncodeDecodeDispatcher
.getEncodeDecodeDispatcher().get(msg.fixedHeader().messageType());
if (message2MessageEncodeDecode != null) {
requestCommand = message2MessageEncodeDecode.decode(msg);
}
out.add(requestCommand);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.io.IOException;
import java.security.cert.CertificateException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.netty.TlsHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.NettyRemotingClientAbstract;
import org.apache.rocketmq.remoting.util.ThreadUtils;
public class MqttRemotingClient extends NettyRemotingClientAbstract implements RemotingClient {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private ClientConfig nettyClientConfig;
private Bootstrap bootstrap = new Bootstrap();
private EventLoopGroup eventLoopGroupWorker;
private ExecutorService publicExecutor;
/**
* Invoke the callback methods in this executor when process response.
*/
private ExecutorService callbackExecutor;
private ChannelEventListener channelEventListener;
private DefaultEventExecutorGroup defaultEventExecutorGroup;
private InterceptorGroup interceptorGroup;
public MqttRemotingClient() {
super();
}
public MqttRemotingClient(final ClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
}
public MqttRemotingClient(final ClientConfig nettyClientConfig,
final ChannelEventListener channelEventListener) {
super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
init(nettyClientConfig, channelEventListener);
}
@Override
public RemotingClient init(ClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
this.nettyClientConfig = nettyClientConfig;
this.channelEventListener = channelEventListener;
this.eventLoopGroupWorker = new NioEventLoopGroup(nettyClientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
nettyClientConfig.getClientWorkerThreads()));
this.publicExecutor = ThreadUtils.newFixedThreadPool(
nettyClientConfig.getClientCallbackExecutorThreads(),
10000, "Remoting-PublicExecutor", true);
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", nettyClientConfig.getClientWorkerThreads()));
if (nettyClientConfig.isUseTLS()) {
try {
sslContext = TlsHelper.buildSslContext(true);
log.info("SSL enabled for client");
} catch (IOException e) {
log.error("Failed to create SSLContext", e);
} catch (CertificateException e) {
log.error("Failed to create SSLContext", e);
throw new RuntimeException("Failed to create SSLContext", e);
}
}
return this;
}
@Override
public Bootstrap getBootstrap() {
return this.bootstrap;
}
@Override
public void start() {
bootstrap = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
defaultEventExecutorGroup,
MqttEncoder.INSTANCE,
new MqttDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
startUpHouseKeepingService();
}
@Override
public void shutdown() {
super.shutdown();
try {
clearChannels();
if (this.eventLoopGroupWorker != null) {
this.eventLoopGroupWorker.shutdownGracefully();
}
if (this.defaultEventExecutorGroup != null) {
this.defaultEventExecutorGroup.shutdownGracefully();
}
if (this.publicExecutor != null) {
this.publicExecutor.shutdown();
}
} catch (Exception e) {
log.error("NettyRemotingClient shutdown exception, ", e);
}
}
@Override
public void registerInterceptorGroup(InterceptorGroup interceptorGroup) {
this.interceptorGroup = interceptorGroup;
}
@Override
public void updateNameServerAddressList(List<String> addrs) {
super.updateNameServerAddressList(addrs);
}
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis();
final RemotingChannel remotingChannel = this.getAndCreateChannel(addr, timeoutMillis);
if (remotingChannel != null && remotingChannel.isActive()) {
try {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("InvokeSync call timeout");
}
RemotingCommand response = this.invokeSyncWithInterceptor(remotingChannel, request, timeoutMillis - costTime);
return response;
} catch (RemotingException remotingException) {
if (remotingException instanceof RemotingSendRequestException) {
log.warn("InvokeSync: send request exception, so close the channel[{}]", addr);
this.closeRemotingChannel(addr, remotingChannel);
throw (RemotingSendRequestException) remotingException;
}
if (remotingException instanceof RemotingTimeoutException) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeRemotingChannel(addr, remotingChannel);
log.warn("InvokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("InvokeSync: wait response timeout exception, the channel[{}]", addr);
throw (RemotingTimeoutException) remotingException;
}
}
} else {
this.closeRemotingChannel(addr, remotingChannel);
throw new RemotingConnectException(addr);
}
return null;
}
@Override
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis();
final RemotingChannel remotingChannel = this.getAndCreateChannel(addr, timeoutMillis);
Channel channel = null;
if (remotingChannel instanceof NettyChannelImpl) {
channel = ((NettyChannelImpl) remotingChannel).getChannel();
}
if (channel != null && channel.isActive()) {
try {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("InvokeAsync call timeout");
}
this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
} catch (RemotingSendRequestException e) {
log.warn("InvokeAsync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
@Override
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
final RemotingChannel remotingChannel = getAndCreateChannel(addr, timeoutMillis);
if (remotingChannel != null && remotingChannel.isActive()) {
try {
this.invokeOnewayWithInterceptor(remotingChannel, request, timeoutMillis);
} catch (RemotingSendRequestException e) {
log.warn("InvokeOneway: send request exception, so close the channel[{}]", addr);
this.closeRemotingChannel(addr, remotingChannel);
throw e;
}
} else {
this.closeRemotingChannel(addr, remotingChannel);
throw new RemotingConnectException(addr);
}
}
@Override
public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = this.publicExecutor;
}
Pair<RequestProcessor, ExecutorService> pair = new Pair<>(processor, executorThis);
this.processorTable.put(requestCode, pair);
}
@Override
public List<String> getNameServerAddressList() {
return this.namesrvAddrList.get();
}
@Override
public ChannelEventListener getChannelEventListener() {
return channelEventListener;
}
@Override
public InterceptorGroup getInterceptorGroup() {
return this.interceptorGroup;
}
@Override
public ExecutorService getCallbackExecutor() {
return callbackExecutor != null ? callbackExecutor : publicExecutor;
}
@Override
public void setCallbackExecutor(final ExecutorService callbackExecutor) {
this.callbackExecutor = callbackExecutor;
}
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.cert.CertificateException;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.netty.FileRegionEncoder;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.netty.TlsHelper;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.NettyRemotingServerAbstract;
import org.apache.rocketmq.remoting.util.JvmUtils;
import org.apache.rocketmq.remoting.util.ThreadUtils;
public class MqttRemotingServer extends NettyRemotingServerAbstract implements RemotingServer {
private static final InternalLogger log = InternalLoggerFactory
.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private ServerBootstrap serverBootstrap;
private EventLoopGroup eventLoopGroupSelector;
private EventLoopGroup eventLoopGroupBoss;
private ServerConfig nettyServerConfig;
private ExecutorService publicExecutor;
private ChannelEventListener channelEventListener;
private DefaultEventExecutorGroup defaultEventExecutorGroup;
private Class<? extends ServerSocketChannel> socketChannelClass;
private int port = 1883;
private InterceptorGroup interceptorGroup;
private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
private static final String TLS_HANDLER_NAME = "sslHandler";
private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
public MqttRemotingServer() {
super();
}
public MqttRemotingServer(final ServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
}
public MqttRemotingServer(final ServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
init(nettyServerConfig, channelEventListener);
}
@Override
public RemotingServer init(ServerConfig serverConfig,
ChannelEventListener channelEventListener) {
this.nettyServerConfig = serverConfig;
super.init(nettyServerConfig.getServerOnewaySemaphoreValue(),
nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
this.publicExecutor = ThreadUtils.newFixedThreadPool(
publicThreadNums,
10000, "Remoting-PublicExecutor", true);
if (JvmUtils.isUseEpoll() && this.nettyServerConfig.isUseEpollNativeSelector()) {
this.eventLoopGroupSelector = new EpollEventLoopGroup(
serverConfig.getServerSelectorThreads(),
ThreadUtils.newGenericThreadFactory("NettyEpollIoThreads",
serverConfig.getServerSelectorThreads()));
this.eventLoopGroupBoss = new EpollEventLoopGroup(
serverConfig.getServerAcceptorThreads(),
ThreadUtils.newGenericThreadFactory("NettyBossThreads",
serverConfig.getServerAcceptorThreads()));
this.socketChannelClass = EpollServerSocketChannel.class;
} else {
this.eventLoopGroupBoss = new NioEventLoopGroup(serverConfig.getServerAcceptorThreads(),
ThreadUtils.newGenericThreadFactory("NettyBossThreads",
serverConfig.getServerAcceptorThreads()));
this.eventLoopGroupSelector = new NioEventLoopGroup(
serverConfig.getServerSelectorThreads(),
ThreadUtils.newGenericThreadFactory("NettyNioIoThreads",
serverConfig.getServerSelectorThreads()));
this.socketChannelClass = NioServerSocketChannel.class;
}
this.port = nettyServerConfig.getMqttListenPort();
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
serverConfig.getServerWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyWorkerThreads",
serverConfig.getServerWorkerThreads()));
loadSslContext();
return this;
}
public void loadSslContext() {
TlsMode tlsMode = TlsSystemConfig.tlsMode;
log.info("Server is running in TLS {} mode", tlsMode.getName());
if (tlsMode != TlsMode.DISABLED) {
try {
sslContext = TlsHelper.buildSslContext(false);
log.info("SSLContext created for server");
} catch (CertificateException e) {
log.error("Failed to create SSLContext for server", e);
} catch (IOException e) {
log.error("Failed to create SSLContext for server", e);
}
}
}
@Override
public void start() {
super.start();
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(socketChannelClass)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF,
nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF,
nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
new MqttDecoder(),
MqttEncoder.INSTANCE,
new MqttMessage2RemotingCommandHandler(),
new RemotingCommand2MqttMessageHandler(),
new IdleStateHandler(nettyServerConfig
.getConnectionChannelReaderIdleSeconds(),
nettyServerConfig
.getConnectionChannelWriterIdleSeconds(),
nettyServerConfig
.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler()
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException",
e1);
}
startUpHouseKeepingService();
registerMessageHandler();
}
private void registerMessageHandler() {
}
@Override
public void shutdown() {
try {
super.shutdown();
if (this.eventLoopGroupBoss != null) {
this.eventLoopGroupBoss.shutdownGracefully();
}
if (this.eventLoopGroupSelector != null) {
this.eventLoopGroupSelector.shutdownGracefully();
}
if (this.defaultEventExecutorGroup != null) {
this.defaultEventExecutorGroup.shutdownGracefully();
}
} catch (Exception e) {
log.error("NettyRemotingServer shutdown exception, ", e);
}
if (this.publicExecutor != null) {
try {
this.publicExecutor.shutdown();
} catch (Exception e) {
log.error("NettyRemotingServer shutdown exception, ", e);
}
}
}
@Override
public void registerInterceptorGroup(InterceptorGroup interceptorGroup) {
this.interceptorGroup = interceptorGroup;
}
@Override
public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) {
executor = executor == null ? this.publicExecutor : executor;
registerNettyProcessor(requestCode, processor, executor);
}
@Override
public void registerDefaultProcessor(RequestProcessor processor, ExecutorService executor) {
this.defaultRequestProcessor = new Pair<>(processor, executor);
}
@Override
public int localListenPort() {
return this.port;
}
@Override
public Pair<RequestProcessor, ExecutorService> getProcessorPair(int requestCode) {
return processorTable.get(requestCode);
}
@Override
public RemotingCommand invokeSync(final RemotingChannel remotingChannel,
final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
return this.invokeSyncImpl(((NettyChannelImpl) remotingChannel).getChannel(), request,
timeoutMillis);
}
@Override
public void invokeAsync(RemotingChannel remotingChannel, RemotingCommand request,
long timeoutMillis,
InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
this.invokeAsyncImpl(((NettyChannelImpl) remotingChannel).getChannel(), request,
timeoutMillis, invokeCallback);
}
@Override
public void invokeOneway(RemotingChannel remotingChannel, RemotingCommand request,
long timeoutMillis) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
this.invokeOnewayImpl(((NettyChannelImpl) remotingChannel).getChannel(), request,
timeoutMillis);
}
@Override
public ChannelEventListener getChannelEventListener() {
return this.channelEventListener;
}
@Override
public InterceptorGroup getInterceptorGroup() {
return this.interceptorGroup;
}
@Override
protected RemotingChannel getAndCreateChannel(String addr, long timeout)
throws InterruptedException {
return null;
}
@Override
public ExecutorService getCallbackExecutor() {
return this.publicExecutor;
}
class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final TlsMode tlsMode;
private static final byte HANDSHAKE_MAGIC_CODE = 0x16;
HandshakeHandler(TlsMode tlsMode) {
this.tlsMode = tlsMode;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// mark the current position so that we can peek the first byte to determine if the content is starting with
// TLS handshake
msg.markReaderIndex();
byte b = msg.getByte(0);
if (b == HANDSHAKE_MAGIC_CODE) {
switch (tlsMode) {
case DISABLED:
ctx.close();
log.warn(
"Clients intend to establish a SSL connection while this server is running in SSL disabled mode");
break;
case PERMISSIVE:
case ENFORCING:
if (null != sslContext) {
ctx.pipeline()
.addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
TLS_HANDLER_NAME,
sslContext.newHandler(ctx.channel().alloc()))
.addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME,
FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
log.info(
"Handlers prepended to channel pipeline to establish SSL connection");
} else {
ctx.close();
log.error(
"Trying to establish a SSL connection but sslContext is null");
}
break;
default:
log.warn("Unknown TLS mode");
break;
}
} else if (tlsMode == TlsMode.ENFORCING) {
ctx.close();
log.warn(
"Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
}
// reset the reader index so that handshake negotiation may proceed as normal.
msg.resetReaderIndex();
try {
// Remove this handler
ctx.pipeline().remove(this);
} catch (NoSuchElementException e) {
log.error("Error while removing HandshakeHandler", e);
}
// Hand over this message to the next .
ctx.fireChannelRead(msg.retain());
}
}
@Override
public void push(RemotingChannel remotingChannel, RemotingCommand request,
long timeoutMillis) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
this.invokeOneway(remotingChannel, request, timeoutMillis);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class RemotingCommand2MqttMessageHandler extends MessageToMessageEncoder<RemotingCommand> {
/**
* Encode from one message to an other. This method will be called for each written message that
* can be handled by this encoder.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageEncoder}
* belongs to
* @param msg the message to encode to an other one
* @param out the {@link List} into which the encoded msg should be added needs to do some kind
* of aggregation
* @throws Exception is thrown if an error occurs
*/
@Override
protected void encode(ChannelHandlerContext ctx, RemotingCommand msg, List<Object> out)
throws Exception {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.util.internal.StringUtil;
import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
/**
* Payload of {@link MqttConnectMessage}
*/
public final class RocketMQMqttConnectPayload extends RemotingSerializable {
private String clientIdentifier;
private String willTopic;
private String willMessage;
private String userName;
private String password;
public RocketMQMqttConnectPayload(
String clientIdentifier,
String willTopic,
String willMessage,
String userName,
String password) {
this.clientIdentifier = clientIdentifier;
this.willTopic = willTopic;
this.willMessage = willMessage;
this.userName = userName;
this.password = password;
}
public static RocketMQMqttConnectPayload fromMqttConnectPayload(MqttConnectPayload payload) {
return new RocketMQMqttConnectPayload(payload.clientIdentifier(), payload.willTopic(),
payload.willMessage(), payload.userName(), payload.password());
}
public MqttConnectPayload toMqttConnectPayload() throws UnsupportedEncodingException {
return new MqttConnectPayload(this.clientIdentifier, this.willTopic, this.willMessage.getBytes(
RemotingUtil.REMOTING_CHARSET), this.userName, this.password.getBytes(RemotingUtil.REMOTING_CHARSET));
}
public String getClientIdentifier() {
return clientIdentifier;
}
public String getWillTopic() {
return willTopic;
}
public String getWillMessage() {
return willMessage;
}
public String getUserName() {
return userName;
}
public String getPassword() {
return password;
}
public void setClientIdentifier(String clientIdentifier) {
this.clientIdentifier = clientIdentifier;
}
public void setWillTopic(String willTopic) {
this.willTopic = willTopic;
}
public void setWillMessage(String willMessage) {
this.willMessage = willMessage;
}
public void setUserName(String userName) {
this.userName = userName;
}
public void setPassword(String password) {
this.password = password;
}
@Override
public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this))
.append('[')
.append("clientIdentifier=").append(clientIdentifier)
.append(", willTopic=").append(willTopic)
.append(", willMessage=").append(willMessage)
.append(", userName=").append(userName)
.append(", password=").append(password)
.append(']')
.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.handler.codec.mqtt.MqttMessageType;
import java.util.HashMap;
import java.util.Map;
public class EncodeDecodeDispatcher {
private static Map<MqttMessageType, Message2MessageEncodeDecode> encodeDecodeDispatcher = new HashMap<>();
static {
encodeDecodeDispatcher.put(MqttMessageType.CONNECT, new MqttConnectEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.CONNACK, new MqttConnectackEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.DISCONNECT, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBLISH, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBACK, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBREC, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBREL, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBCOMP, null);
encodeDecodeDispatcher.put(MqttMessageType.SUBSCRIBE, null);
encodeDecodeDispatcher.put(MqttMessageType.SUBACK, null);
encodeDecodeDispatcher.put(MqttMessageType.UNSUBSCRIBE, null);
encodeDecodeDispatcher.put(MqttMessageType.UNSUBACK, null);
encodeDecodeDispatcher.put(MqttMessageType.PINGREQ, null);
encodeDecodeDispatcher.put(MqttMessageType.PINGRESP, null);
}
public static Map<MqttMessageType, Message2MessageEncodeDecode> getEncodeDecodeDispatcher() {
return encodeDecodeDispatcher;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface Message2MessageEncodeDecode {
RemotingCommand decode(MqttMessage mqttMessage);
MqttMessage encode(RemotingCommand remotingCommand);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
public class MqttConnectEncodeDecode implements Message2MessageEncodeDecode {
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
RocketMQMqttConnectPayload payload = RocketMQMqttConnectPayload
.fromMqttConnectPayload(((MqttConnectMessage) mqttMessage).payload());
RemotingCommand requestCommand = null;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttConnectVariableHeader variableHeader = (MqttConnectVariableHeader) mqttMessage
.variableHeader();
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setMessageType(mqttFixedHeader.messageType().value());
mqttHeader.setDup(mqttFixedHeader.isDup());
mqttHeader.setQosLevel(mqttFixedHeader.qosLevel().value());
mqttHeader.setRetain(mqttFixedHeader.isRetain());
mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength());
mqttHeader.setName(variableHeader.name());
mqttHeader.setVersion(variableHeader.version());
mqttHeader.setHasUserName(variableHeader.hasUserName());
mqttHeader.setHasPassword(variableHeader.hasPassword());
mqttHeader.setWillRetain(variableHeader.isWillRetain());
mqttHeader.setWillQos(variableHeader.willQos());
mqttHeader.setWillFlag(variableHeader.isWillFlag());
mqttHeader.setCleanSession(variableHeader.isCleanSession());
mqttHeader
.setKeepAliveTimeSeconds(variableHeader.keepAliveTimeSeconds());
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
CodecHelper.makeCustomHeaderToNet(requestCommand);
requestCommand.setBody(payload.encode());
return requestCommand;
}
@Override
public MqttMessage encode(RemotingCommand remotingCommand) {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class MqttConnectackEncodeDecode implements Message2MessageEncodeDecode {
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
return null;
}
@Override
public MqttMessage encode(RemotingCommand remotingCommand) {
return null;
}
}
rocketmq=org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient
http2=org.apache.rocketmq.remoting.transport.http2.Http2ClientImpl
\ No newline at end of file
http2=org.apache.rocketmq.remoting.transport.http2.Http2ClientImpl
mqtt=org.apache.rocketmq.remoting.transport.mqtt.MqttRemotingClient
\ No newline at end of file
rocketmq=org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer
http2=org.apache.rocketmq.remoting.transport.http2.Http2ServerImpl
mqtt=org.apache.rocketmq.remoting.transport.mqtt.MqttRemotingServer
......@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.snode;
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
......@@ -41,8 +43,6 @@ import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.slf4j.LoggerFactory;
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
public class SnodeStartup {
private static InternalLogger log;
public static Properties properties = null;
......@@ -86,7 +86,7 @@ public class SnodeStartup {
final ClientConfig nettyClientConfig = new ClientConfig();
nettyServerConfig.setListenPort(snodeConfig.getListenPort());
nettyServerConfig.setListenPort(11911);
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
......
......@@ -39,6 +39,8 @@ public class Client {
private LanguageCode language;
private boolean isConnected;
public ClientRole getClientRole() {
return clientRole;
}
......@@ -59,11 +61,12 @@ public class Client {
Objects.equals(clientId, client.clientId) &&
Objects.equals(groups, client.groups) &&
Objects.equals(remotingChannel, client.remotingChannel) &&
language == client.language;
language == client.language &&
isConnected == client.isConnected();
}
@Override public int hashCode() {
return Objects.hash(clientRole, clientId, groups, remotingChannel, heartbeatInterval, lastUpdateTimestamp, version, language);
return Objects.hash(clientRole, clientId, groups, remotingChannel, heartbeatInterval, lastUpdateTimestamp, version, language, isConnected);
}
public RemotingChannel getRemotingChannel() {
......@@ -114,6 +117,14 @@ public class Client {
this.language = language;
}
public boolean isConnected() {
return isConnected;
}
public void setConnected(boolean connected) {
isConnected = connected;
}
public Set<String> getGroups() {
return groups;
}
......@@ -132,6 +143,7 @@ public class Client {
", lastUpdateTimestamp=" + lastUpdateTimestamp +
", version=" + version +
", language=" + language +
", isConnected=" + isConnected +
'}';
}
}
......
......@@ -32,21 +32,25 @@ public class ClientHousekeepingService implements ChannelEventListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ClientManager producerManager;
private final ClientManager consumerManager;
private final ClientManager iotClientManager;
public ClientHousekeepingService(final ClientManager producerManager,
final ClientManager consumerManager) {
final ClientManager consumerManager, final ClientManager iotClientManager) {
this.producerManager = producerManager;
this.consumerManager = consumerManager;
this.iotClientManager = iotClientManager;
}
public void start(long interval) {
this.producerManager.startScan(interval);
this.consumerManager.startScan(interval);
this.iotClientManager.startScan(interval);
}
public void shutdown() {
this.producerManager.shutdown();
this.consumerManager.shutdown();
this.iotClientManager.shutdown();
}
private ClientRole clientRole(RemotingChannel remotingChannel) {
......@@ -74,6 +78,9 @@ public class ClientHousekeepingService implements ChannelEventListener {
case Producer:
this.producerManager.onClose(remoteAddress, remotingChannel);
return;
case IOTCLIENT:
this.iotClientManager.onClose(remoteAddress, remotingChannel);
return;
default:
}
}
......
......@@ -30,6 +30,8 @@ public interface ClientManager {
List<String> getAllClientId(String groupId);
Client getClient(String groupId, RemotingChannel remotingChannel);
void startScan(long interval);
void shutdown();
......
......@@ -35,12 +35,15 @@ import org.apache.rocketmq.snode.client.ClientManager;
public abstract class ClientManagerImpl implements ClientManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory
.getLogger(LoggerName.SNODE_LOGGER_NAME);
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
.newSingleThreadScheduledExecutor(
new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
private final ConcurrentHashMap<String/*Producer or Consumer Group*/, ConcurrentHashMap<RemotingChannel, Client>> groupClientTable = new ConcurrentHashMap<>(1024);
private final ConcurrentHashMap<String/*Producer or Consumer Group*/, ConcurrentHashMap<RemotingChannel, Client>> groupClientTable = new ConcurrentHashMap<>(
1024);
public abstract void onClosed(String group, RemotingChannel remotingChannel);
......@@ -72,7 +75,8 @@ public abstract class ClientManagerImpl implements ClientManager {
while (iterator.hasNext()) {
Map.Entry entry = (Map.Entry) iterator.next();
String group = (String) entry.getKey();
ConcurrentHashMap<RemotingChannel, Client> channelTable = (ConcurrentHashMap<RemotingChannel, Client>) entry.getValue();
ConcurrentHashMap<RemotingChannel, Client> channelTable = (ConcurrentHashMap<RemotingChannel, Client>) entry
.getValue();
Iterator iter = channelTable.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry channelTableEntry = (Map.Entry) iter.next();
......@@ -81,11 +85,15 @@ public abstract class ClientManagerImpl implements ClientManager {
if (interval > CHANNEL_EXPIRED_TIMEOUT) {
iter.remove();
client.getRemotingChannel().close();
log.warn("SCAN: Remove expired channel from {}ClientTable. channel={}, group={}", client.getClientRole(),
RemotingHelper.parseChannelRemoteAddr(client.getRemotingChannel().remoteAddress()), group);
log.warn(
"SCAN: Remove expired channel from {}ClientTable. channel={}, group={}",
client.getClientRole(),
RemotingHelper.parseChannelRemoteAddr(
client.getRemotingChannel().remoteAddress()), group);
if (channelTable.isEmpty()) {
iterator.remove();
log.warn("SCAN: Remove group={} channel from {}ClientTable.", group, client.getClientRole());
log.warn("SCAN: Remove group={} channel from {}ClientTable.", group,
client.getClientRole());
}
}
}
......@@ -107,22 +115,25 @@ public abstract class ClientManagerImpl implements ClientManager {
if (oldClient == null) {
Client prev = channelTable.put(client.getRemotingChannel(), client);
if (prev != null) {
log.info("New client connected, group: {} {} {} channel: {}", groupId, client.toString());
log.info("New client connected, group: {} {} {} channel: {}", groupId,
client.toString());
updated = true;
}
oldClient = client;
} else {
if (!oldClient.getClientId().equals(client.getClientId())) {
log.error("[BUG] client channel exist in snode, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
groupId,
oldClient.toString(),
channelTable.toString());
log.error(
"[BUG] client channel exist in snode, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
groupId,
oldClient.toString(),
channelTable.toString());
channelTable.put(client.getRemotingChannel(), client);
}
}
oldClient.setLastUpdateTimestamp(System.currentTimeMillis());
}
log.debug("Register client role: {}, group: {}, last: {}", client.getClientRole(), groupId, client.getLastUpdateTimestamp());
log.debug("Register client role: {}, group: {}, last: {}", client.getClientRole(), groupId,
client.getLastUpdateTimestamp());
onRegister(groupId, client.getRemotingChannel());
return updated;
}
......@@ -136,7 +147,8 @@ public abstract class ClientManagerImpl implements ClientManager {
}
if (channelTable.isEmpty()) {
groupClientTable.remove(groupId);
log.info("Unregister client ok, no any connection, and remove consumer group, {}", groupId);
log.info("Unregister client ok, no any connection, and remove consumer group, {}",
groupId);
}
}
}
......@@ -171,7 +183,8 @@ public abstract class ClientManagerImpl implements ClientManager {
List<String> result = new ArrayList<>();
Map<RemotingChannel, Client> channelClientMap = this.groupClientTable.get(groupId);
if (channelClientMap != null) {
Iterator<Map.Entry<RemotingChannel, Client>> it = channelClientMap.entrySet().iterator();
Iterator<Map.Entry<RemotingChannel, Client>> it = channelClientMap.entrySet()
.iterator();
while (it.hasNext()) {
Map.Entry<RemotingChannel, Client> entry = it.next();
Client client = entry.getValue();
......@@ -180,4 +193,15 @@ public abstract class ClientManagerImpl implements ClientManager {
}
return result;
}
@Override
public Client getClient(String groupId, RemotingChannel remotingChannel) {
assert groupId != null && remotingChannel != null;
if (!groupClientTable.containsKey(groupId)) {
return null;
}
ConcurrentHashMap<RemotingChannel, Client> channelClientMap = groupClientTable
.get(groupId);
return channelClientMap.get(remotingChannel);
}
}
......@@ -17,9 +17,17 @@
package org.apache.rocketmq.snode.client.impl;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.snode.SnodeController;
public class IOTClientManagerImpl extends ClientManagerImpl {
public static final String IOTGROUP = "IOTGROUP";
private final SnodeController snodeController;
public IOTClientManagerImpl(SnodeController snodeController) {
this.snodeController = snodeController;
}
@Override
public void onClosed(String group, RemotingChannel remotingChannel) {
......@@ -33,4 +41,8 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
@Override public void onRegister(String group, RemotingChannel remotingChannel) {
}
public SnodeController getSnodeController() {
return snodeController;
}
}
......@@ -24,7 +24,6 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY;
public class SnodeConfig {
......@@ -51,10 +50,16 @@ public class SnodeConfig {
private int snodeSendThreadPoolQueueCapacity = 10000;
private int snodeHandleMqttThreadPoolQueueCapacity = 10000;
private int snodeSendMessageMinPoolSize = 10;
private int snodeSendMessageMaxPoolSize = 20;
private int snodeHandleMqttMessageMinPoolSize = 10;
private int snodeHandleMqttMessageMaxPoolSize = 20;
private int snodeHeartBeatCorePoolSize = 1;
private int snodeHeartBeatMaxPoolSize = 2;
......@@ -211,6 +216,15 @@ public class SnodeConfig {
this.snodeSendThreadPoolQueueCapacity = snodeSendThreadPoolQueueCapacity;
}
public int getSnodeHandleMqttThreadPoolQueueCapacity() {
return snodeHandleMqttThreadPoolQueueCapacity;
}
public void setSnodeHandleMqttThreadPoolQueueCapacity(
int snodeHandleMqttThreadPoolQueueCapacity) {
this.snodeHandleMqttThreadPoolQueueCapacity = snodeHandleMqttThreadPoolQueueCapacity;
}
public int getSnodeSendMessageMinPoolSize() {
return snodeSendMessageMinPoolSize;
}
......@@ -251,6 +265,22 @@ public class SnodeConfig {
this.snodeId = snodeId;
}
public int getSnodeHandleMqttMessageMinPoolSize() {
return snodeHandleMqttMessageMinPoolSize;
}
public void setSnodeHandleMqttMessageMinPoolSize(int snodeHandleMqttMessageMinPoolSize) {
this.snodeHandleMqttMessageMinPoolSize = snodeHandleMqttMessageMinPoolSize;
}
public int getSnodeHandleMqttMessageMaxPoolSize() {
return snodeHandleMqttMessageMaxPoolSize;
}
public void setSnodeHandleMqttMessageMaxPoolSize(int snodeHandleMqttMessageMaxPoolSize) {
this.snodeHandleMqttMessageMaxPoolSize = snodeHandleMqttMessageMaxPoolSize;
}
public String getSnodeName() {
return snodeName;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.constant;
public class MqttConstant {
public static final int MAX_SUPPORTED_QOS = 0;
public static final String SUBSCRIPTION_FLAG_PLUS = "+";
public static final String SUBSCRIPTION_FLAG_SHARP = "#";
public static final String SUBSCRIPTION_SEPARATOR = "/";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor;
import com.alibaba.fastjson.JSON;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler;
public class DefaultMqttMessageProcessor implements RequestProcessor {
private Map<MqttMessageType, MessageHandler> type2handler = new HashMap<>();
private static final int MIN_AVAILABLE_VERSION = 3;
private static final int MAX_AVAILABLE_VERSION = 4;
@Override
public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message)
throws RemotingCommandException, UnsupportedEncodingException {
MqttHeader mqttHeader = (MqttHeader) message.decodeCommandCustomHeader(MqttHeader.class);
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.valueOf(mqttHeader.getMessageType()),
mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength());
MqttMessage mqttMessage = null;
switch (fixedHeader.messageType()) {
case CONNECT:
MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
mqttHeader.getName(), mqttHeader.getVersion(), mqttHeader.isHasUserName(),
mqttHeader.isHasPassword(), mqttHeader.isWillRetain(),
mqttHeader.getWillQos(), mqttHeader.isWillFlag(),
mqttHeader.isCleanSession(), mqttHeader.getKeepAliveTimeSeconds());
RocketMQMqttConnectPayload payload = decode(message.getBody(), RocketMQMqttConnectPayload.class);
mqttMessage = new MqttConnectMessage(fixedHeader, variableHeader, payload.toMqttConnectPayload());
case DISCONNECT:
}
return type2handler.get(MqttMessageType.valueOf(mqttHeader.getMessageType())).handleMessage(mqttMessage, remotingChannel);
}
@Override
public boolean rejectRequest() {
return false;
}
private <T> T decode(final byte[] data, Class<T> classOfT) {
final String json = new String(data, Charset.forName(RemotingUtil.REMOTING_CHARSET));
return JSON.parseObject(json, classOfT);
}
public void registerMessageHanlder(MqttMessageType type, MessageHandler handler) {
type2handler.put(type, handler);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface MessageHandler {
/**
* Handle message from client
*
* @param message
*/
RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.ClientManager;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
public class MqttConnectMessageHandler implements MessageHandler {
private final SnodeController snodeController;
private static final int MIN_AVAILABLE_VERSION = 3;
private static final int MAX_AVAILABLE_VERSION = 4;
public MqttConnectMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
@Override
public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
if (!(message instanceof MqttConnectMessage)) {
return null;
}
MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) message;
MqttConnectPayload payload = mqttConnectMessage.payload();
MqttConnectReturnCode returnCode;
MqttConnAckMessage ackMessage;
if (isConnected(remotingChannel, mqttConnectMessage.payload().clientIdentifier())) {
}
// ChannelHandlerContext ctx = client.getCtx();
return null;
}
private boolean isConnected(RemotingChannel remotingChannel, String clientId) {
ClientManager iotClientManager = snodeController.getIotClientManager();
Client client = iotClientManager.getClient(IOTClientManagerImpl.IOTGROUP, remotingChannel);
if (client != null && client.getClientId().equals(clientId) && client.isConnected()) {
return true;
}
return false;
}
private boolean isServiceAviable(MqttConnectMessage connectMessage) {
int version = connectMessage.variableHeader().version();
return version >= MIN_AVAILABLE_VERSION && version <= MAX_AVAILABLE_VERSION;
}
private boolean checkPassword(byte[] bytes) {
return true;
}
private boolean checkUsername(String s) {
return true;
}
private boolean isAuthorized(MqttConnectMessage message) {
return true;
}
private boolean isClientIdValid(String s) {
return true;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttDisconnectMessageHandler implements MessageHandler {
private final SnodeController snodeController;
public MqttDisconnectMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* handle the DISCONNECT message from the client <ol> <li>discard the Will Message and Will
* Topic</li> <li>remove the client from the ClientManager</li> <li>disconnect the
* connection</li> </ol>
*/
@Override
public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
// TODO discard the Will Message and Will Topic
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttMessageForwarder implements MessageHandler {
private final SnodeController snodeController;
/* private SubscriptionStore subscriptionStore;
public MqttMessageForwarder(SubscriptionStore subscriptionStore) {
this.subscriptionStore = subscriptionStore;
}*/
public MqttMessageForwarder(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* handle PUBLISH message from client
*
* @param message
* @return whether the message is handled successfully
*/
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttMessageSender implements MessageHandler {
private final SnodeController snodeController;
public MqttMessageSender(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* send the PUBLISH message to client
*
* @param message
* @return whether the message is handled successfully
*/
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttPingreqMessageHandler implements MessageHandler {
private final SnodeController snodeController;
public MqttPingreqMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* handle the PINGREQ message from client
* <ol>
* <li>check client exists</li>
* <li>check client is connected</li>
* <li>generate the PINGRESP message</li>
* <li>send the PINGRESP message to the client</li>
* </ol>
*
* @param message
* @return
*/
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttPubackMessageHandler implements MessageHandler {
private final SnodeController snodeController;
public MqttPubackMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* handle the PUBACK message from the client
* <ol>
* <li>remove the message from the published in-flight messages</li>
* <li>ack the message in the MessageStore</li>
* </ol>
* @param
* @return
*/
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttPubcompMessageHandler implements MessageHandler {
private final SnodeController snodeController;
public MqttPubcompMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* handle the PUBCOMP message from the client
* @param message
* @return
*/
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttPublishMessageHandler implements MessageHandler {
private final SnodeController snodeController;
public MqttPublishMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
@Override
public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttPubrecMessageHandler implements MessageHandler {
private final SnodeController snodeController;
public MqttPubrecMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* handle the PUBREC message from the clinet
* @param message
* @return
*/
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttPubrelMessageHandler implements MessageHandler {
private final SnodeController snodeController;
public MqttPubrelMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* handle the PUBREL message from the client
* @param message
* @return
*/
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class MqttSubscribeMessageHandler implements MessageHandler {
/* private SubscriptionStore subscriptionStore;
public MqttSubscribeMessageHandler(SubscriptionStore subscriptionStore) {
this.subscriptionStore = subscriptionStore;
}*/
private final SnodeController snodeController;
public MqttSubscribeMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* handle the SUBSCRIBE message from the client
* <ol>
* <li>validate the topic filters in each subscription</li>
* <li>set actual qos of each filter</li>
* <li>get the topics matching given filters</li>
* <li>check the client authorization of each topic</li>
* <li>generate SUBACK message which includes the subscription result for each TopicFilter</li>
* <li>send SUBACK message to the client</li>
* </ol>
*
* @param message the message wrapping MqttSubscriptionMessage
* @return
*/
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
/**
* handle the UNSUBSCRIBE message from the client
* <ol>
* <li>extract topic filters to be un-subscribed</li>
* <li>get the topics matching with the topic filters</li>
* <li>verify the authorization of the client to the </li>
* <li>remove subscription from the SubscriptionStore</li>
* </ol>
*/
public class MqttUnsubscribeMessagHandler implements MessageHandler {
/* private SubscriptionStore subscriptionStore;
public MqttUnsubscribeMessagHandler(SubscriptionStore subscriptionStore) {
this.subscriptionStore = subscriptionStore;
}*/
private final SnodeController snodeController;
public MqttUnsubscribeMessagHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
@Override
public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.util;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import org.apache.rocketmq.snode.constant.MqttConstant;
public class MessageUtil {
public static final String MQTT_QOS_LEVEL = "MQTT_QOS_LEVEL";
public static final String MQTT_IS_RETAIN = "MQTT_IS_RETAIN";
public static final String MQTT_PACKET_ID = "MQTT_PACKET_ID";
public static final String MQTT_TOPIC_NAME = "MQTT_TOPIC_NAME";
public static final String MQTT_REMAINING_LENGTH = "MQTT_REMAINING_LENGTH";
public static final String MQTT_IS_DUP = "MQTT_IS_DUP";
public static final String MQTT_CLIENT_NAME = "MQTT_CLIENT_NAME";
public static final String MQTT_IS_CLEAN_SESSION = "MQTT_IS_CLEAN_SESSION";
public static final String MQTT_KEEP_ALIVE_TIME = "MQTT_KEEP_ALIVE_TIME";
public static final String MQTT_PROTOCOL_VERSION = "MQTT_PROTOCOL_VERSION";
public static MqttSubAckMessage getMqttSubackMessage(MqttSubscribeMessage message,
MqttSubAckPayload payload) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.SUBACK,
false,
message.fixedHeader().qosLevel(),
message.fixedHeader().isRetain(),
0
);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader
.from(message.variableHeader().messageId());
return new MqttSubAckMessage(fixedHeader, variableHeader, payload);
}
public static MqttPublishMessage getMqttPublishMessage(MqttMessage message, boolean isDup) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBLISH,
isDup,
message.fixedHeader().qosLevel(),
message.fixedHeader().isRetain(),
message.fixedHeader().remainingLength()
);
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(
((MqttPublishVariableHeader)message.variableHeader()).topicName(),
((MqttPublishVariableHeader)message.variableHeader()).packetId()
);
ByteBuf buf = Unpooled.buffer();
buf.writeBytes((byte[]) message.payload());
return new MqttPublishMessage(fixedHeader, variableHeader, buf);
}
public static MqttConnAckMessage getMqttConnackMessage(MqttConnectMessage message,
MqttConnectReturnCode returnCode) {
assert message.fixedHeader().messageType() == MqttMessageType.CONNECT;
MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(
returnCode,
message.variableHeader().isCleanSession()
);
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.CONNACK,
message.fixedHeader().isDup(),
message.fixedHeader().qosLevel(),
message.fixedHeader().isRetain(),
0);
return new MqttConnAckMessage(fixedHeader, variableHeader);
}
public static MqttPubAckMessage getMqttPubackMessage(MqttPublishMessage message) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBACK,
message.fixedHeader().isDup(),
message.fixedHeader().qosLevel(),
message.fixedHeader().isRetain(),
message.fixedHeader().remainingLength()
);
return new MqttPubAckMessage(fixedHeader,
MqttMessageIdVariableHeader.from(message.variableHeader().packetId()));
}
public static MqttMessage getMqttPubrecMessage(MqttPublishMessage message) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBREC,
message.fixedHeader().isDup(),
message.fixedHeader().qosLevel(),
message.fixedHeader().isRetain(),
message.fixedHeader().remainingLength()
);
return new MqttMessage(fixedHeader);
}
public static MqttMessage getMqttPubrelMessage(MqttMessage message) {
assert message.fixedHeader().messageType() == MqttMessageType.PUBREC;
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBREL,
message.fixedHeader().isDup(),
message.fixedHeader().qosLevel(),
message.fixedHeader().isRetain(),
message.fixedHeader().remainingLength()
);
return new MqttMessage(fixedHeader);
}
public static MqttMessage getMqttPubcompMessage(MqttMessage message) {
assert message.fixedHeader().messageType() == MqttMessageType.PUBREL;
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBCOMP,
message.fixedHeader().isDup(),
message.fixedHeader().qosLevel(),
message.fixedHeader().isRetain(),
message.fixedHeader().remainingLength()
);
return new MqttMessage(fixedHeader);
}
public static MqttMessage getMqttPingrespMessage(MqttMessage message) {
assert message.fixedHeader().messageType() == MqttMessageType.PINGREQ;
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PINGRESP,
message.fixedHeader().isDup(),
message.fixedHeader().qosLevel(),
message.fixedHeader().isRetain(),
0
);
return new MqttMessage(fixedHeader);
}
public static MqttUnsubAckMessage getMqttUnsubackMessage(MqttUnsubscribeMessage message) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.UNSUBACK,
message.fixedHeader().isDup(),
message.fixedHeader().qosLevel(),
message.fixedHeader().isRetain(),
0
);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader
.from(message.variableHeader().messageId());
return new MqttUnsubAckMessage(fixedHeader, variableHeader);
}
public static int actualQos(int qos) {
return Math.min(MqttConstant.MAX_SUPPORTED_QOS, qos);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor;
import com.alibaba.fastjson.JSON;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMqttMessageProcessorTest {
private DefaultMqttMessageProcessor defaultMqttMessageProcessor;
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
@Mock
private RemotingChannel remotingChannel;
private String topic = "SnodeTopic";
private String group = "SnodeGroup";
private String enodeName = "enodeName";
@Before
public void init() {
defaultMqttMessageProcessor = new DefaultMqttMessageProcessor();
}
@Test
public void testProcessRequest() throws RemotingCommandException, UnsupportedEncodingException {
RemotingCommand request = createMqttConnectMesssageCommand();
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.CONNECT, new MqttConnectMessageHandler(snodeController));
defaultMqttMessageProcessor.processRequest(remotingChannel, request);
}
private MqttHeader createMqttConnectMesssageHeader() {
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setMessageType(MqttMessageType.CONNECT.value());
mqttHeader.setDup(false);
mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
mqttHeader.setRetain(false);
mqttHeader.setRemainingLength(200);
mqttHeader.setName("MQTT");
mqttHeader.setVersion(4);
mqttHeader.setHasUserName(false);
mqttHeader.setHasPassword(false);
mqttHeader.setWillRetain(false);
mqttHeader.setWillQos(0);
mqttHeader.setWillFlag(false);
mqttHeader.setCleanSession(false);
mqttHeader.setKeepAliveTimeSeconds(60);
return mqttHeader;
}
private RemotingCommand createMqttConnectMesssageCommand() {
MqttHeader mqttHeader = createMqttConnectMesssageHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader);
MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes());
request.setBody(RocketMQMqttConnectPayload.fromMqttConnectPayload(payload).encode());
CodecHelper.makeCustomHeaderToNet(request);
return request;
}
private byte[] encode(Object obj) {
String json = JSON.toJSONString(obj, false);
if (json != null) {
return json.getBytes(Charset.forName(RemotingUtil.REMOTING_CHARSET));
}
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class MqttConnectMessageHandlerTest {
@Mock
private RemotingChannel remotingChannel;
@Test
public void testHandlerMessage() throws Exception {
MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(
new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig()));
MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes());
MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(new MqttFixedHeader(
MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200),new MqttConnectVariableHeader(null,4,false,false,false,0,false,false,50),new MqttConnectPayload("abcd", "ttest", "message".getBytes(),"user","password".getBytes()));
mqttConnectMessageHandler.handleMessage(mqttConnectMessage, remotingChannel);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册