diff --git a/common/src/main/java/org/apache/rocketmq/common/message/mqtt/WillMessage.java b/common/src/main/java/org/apache/rocketmq/common/message/mqtt/WillMessage.java index 763e0d74c34f1b33b71521417e618308f7128f0a..0121e128e9b0f783cd706fe253773173d83c3246 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/mqtt/WillMessage.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/mqtt/WillMessage.java @@ -23,7 +23,7 @@ public class WillMessage { private byte[] body; - private boolean isRetain; + private boolean isRetain; private int qos; @@ -58,6 +58,7 @@ public class WillMessage { public void setQos(int qos) { this.qos = qos; } + public String getString() { return new String(body); } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MqttSubscriptionData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MqttSubscriptionData.java index da9747cc9b24f16202381a8c372740e941f956a3..33fa1a878cdf78f1524543678e6b4d2b4d298381 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MqttSubscriptionData.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MqttSubscriptionData.java @@ -29,7 +29,7 @@ public class MqttSubscriptionData extends SubscriptionData { } public MqttSubscriptionData(int qos, String clientId, String topicFilter) { - super(topicFilter,null); + super(topicFilter, null); this.qos = qos; this.clientId = clientId; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java index 24ec854af9353cb75a2da0f0d8f94288254f00b9..6d4fc20a875ae776d06a8eee0d1c542e1c7f3622 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/MqttRemotingServer.java @@ -67,7 +67,7 @@ import org.apache.rocketmq.remoting.util.ThreadUtils; public class MqttRemotingServer extends NettyRemotingServerAbstract implements RemotingServer { private static final InternalLogger log = InternalLoggerFactory - .getLogger(RemotingHelper.ROCKETMQ_REMOTING); + .getLogger(RemotingHelper.ROCKETMQ_REMOTING); private ServerBootstrap serverBootstrap; private EventLoopGroup eventLoopGroupSelector; private EventLoopGroup eventLoopGroupBoss; @@ -94,16 +94,16 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R } public MqttRemotingServer(final ServerConfig nettyServerConfig, - final ChannelEventListener channelEventListener) { + final ChannelEventListener channelEventListener) { init(nettyServerConfig, channelEventListener); } @Override public RemotingServer init(ServerConfig serverConfig, - ChannelEventListener channelEventListener) { + ChannelEventListener channelEventListener) { this.nettyServerConfig = serverConfig; super.init(nettyServerConfig.getServerOnewaySemaphoreValue(), - nettyServerConfig.getServerAsyncSemaphoreValue()); + nettyServerConfig.getServerAsyncSemaphoreValue()); this.serverBootstrap = new ServerBootstrap(); this.channelEventListener = channelEventListener; @@ -112,33 +112,33 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R publicThreadNums = 4; } this.publicExecutor = ThreadUtils.newFixedThreadPool( - publicThreadNums, - 10000, "MqttRemoting-PublicExecutor", true); + publicThreadNums, + 10000, "MqttRemoting-PublicExecutor", true); if (JvmUtils.isUseEpoll() && this.nettyServerConfig.isUseEpollNativeSelector()) { this.eventLoopGroupSelector = new EpollEventLoopGroup( - serverConfig.getServerSelectorThreads(), - ThreadUtils.newGenericThreadFactory("MqttNettyEpollIoThreads", - serverConfig.getServerSelectorThreads())); + serverConfig.getServerSelectorThreads(), + ThreadUtils.newGenericThreadFactory("MqttNettyEpollIoThreads", + serverConfig.getServerSelectorThreads())); this.eventLoopGroupBoss = new EpollEventLoopGroup( - serverConfig.getServerAcceptorThreads(), - ThreadUtils.newGenericThreadFactory("MqttNettyBossThreads", - serverConfig.getServerAcceptorThreads())); + serverConfig.getServerAcceptorThreads(), + ThreadUtils.newGenericThreadFactory("MqttNettyBossThreads", + serverConfig.getServerAcceptorThreads())); this.socketChannelClass = EpollServerSocketChannel.class; } else { this.eventLoopGroupBoss = new NioEventLoopGroup(serverConfig.getServerAcceptorThreads(), - ThreadUtils.newGenericThreadFactory("MqttNettyBossThreads", - serverConfig.getServerAcceptorThreads())); + ThreadUtils.newGenericThreadFactory("MqttNettyBossThreads", + serverConfig.getServerAcceptorThreads())); this.eventLoopGroupSelector = new NioEventLoopGroup( - serverConfig.getServerSelectorThreads(), - ThreadUtils.newGenericThreadFactory("MqttNettyNioIoThreads", - serverConfig.getServerSelectorThreads())); + serverConfig.getServerSelectorThreads(), + ThreadUtils.newGenericThreadFactory("MqttNettyNioIoThreads", + serverConfig.getServerSelectorThreads())); this.socketChannelClass = NioServerSocketChannel.class; } this.port = nettyServerConfig.getMqttListenPort(); this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( - serverConfig.getServerWorkerThreads(), - ThreadUtils.newGenericThreadFactory("MqttNettyWorkerThreads", - serverConfig.getServerWorkerThreads())); + serverConfig.getServerWorkerThreads(), + ThreadUtils.newGenericThreadFactory("MqttNettyWorkerThreads", + serverConfig.getServerWorkerThreads())); loadSslContext(); return this; } @@ -162,40 +162,40 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R public void start() { super.start(); ServerBootstrap childHandler = - this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) - .channel(socketChannelClass) - .option(ChannelOption.SO_BACKLOG, 1024) - .option(ChannelOption.SO_REUSEADDR, true) - .option(ChannelOption.SO_KEEPALIVE, false) - .childOption(ChannelOption.TCP_NODELAY, true) - .childOption(ChannelOption.SO_SNDBUF, - nettyServerConfig.getServerSocketSndBufSize()) - .childOption(ChannelOption.SO_RCVBUF, - nettyServerConfig.getServerSocketRcvBufSize()) - .localAddress(new InetSocketAddress(this.port)) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline() - .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, - new HandshakeHandler(TlsSystemConfig.tlsMode)) - .addLast(defaultEventExecutorGroup, - new MqttDecoder(), - MqttEncoder.INSTANCE, - new MqttMessage2RemotingCommandHandler(), - new RemotingCommand2MqttMessageHandler(), - new IdleStateHandler(nettyServerConfig - .getConnectionChannelReaderIdleSeconds(), - nettyServerConfig - .getConnectionChannelWriterIdleSeconds(), - nettyServerConfig - .getServerChannelMaxIdleTimeSeconds()), - new NettyConnectManageHandler(), - new NettyServerHandler() - - ); - } - }); + this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) + .channel(socketChannelClass) + .option(ChannelOption.SO_BACKLOG, 1024) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_KEEPALIVE, false) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.SO_SNDBUF, + nettyServerConfig.getServerSocketSndBufSize()) + .childOption(ChannelOption.SO_RCVBUF, + nettyServerConfig.getServerSocketRcvBufSize()) + .localAddress(new InetSocketAddress(this.port)) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline() + .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, + new HandshakeHandler(TlsSystemConfig.tlsMode)) + .addLast(defaultEventExecutorGroup, + new MqttDecoder(), + MqttEncoder.INSTANCE, + new MqttMessage2RemotingCommandHandler(), + new RemotingCommand2MqttMessageHandler(), + new IdleStateHandler(nettyServerConfig + .getConnectionChannelReaderIdleSeconds(), + nettyServerConfig + .getConnectionChannelWriterIdleSeconds(), + nettyServerConfig + .getServerChannelMaxIdleTimeSeconds()), + new NettyConnectManageHandler(), + new NettyServerHandler() + + ); + } + }); if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); @@ -207,7 +207,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", - e1); + e1); } startUpHouseKeepingService(); registerMessageHandler(); @@ -216,6 +216,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R private void registerMessageHandler() { } + @Override public void shutdown() { try { @@ -269,28 +270,28 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R @Override public RemotingCommand invokeSync(final RemotingChannel remotingChannel, - final RemotingCommand request, - final long timeoutMillis) - throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { + final RemotingCommand request, + final long timeoutMillis) + throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { return this.invokeSyncImpl(((NettyChannelImpl) remotingChannel).getChannel(), request, - timeoutMillis); + timeoutMillis); } @Override public void invokeAsync(RemotingChannel remotingChannel, RemotingCommand request, - long timeoutMillis, - InvokeCallback invokeCallback) - throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { + long timeoutMillis, + InvokeCallback invokeCallback) + throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { this.invokeAsyncImpl(((NettyChannelImpl) remotingChannel).getChannel(), request, - timeoutMillis, invokeCallback); + timeoutMillis, invokeCallback); } @Override public void invokeOneway(RemotingChannel remotingChannel, RemotingCommand request, - long timeoutMillis) throws InterruptedException, - RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { + long timeoutMillis) throws InterruptedException, + RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { this.invokeOnewayImpl(((NettyChannelImpl) remotingChannel).getChannel(), request, - timeoutMillis); + timeoutMillis); } @Override @@ -305,7 +306,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R @Override protected RemotingChannel getAndCreateChannel(String addr, long timeout) - throws InterruptedException { + throws InterruptedException { return null; } @@ -338,23 +339,23 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R case DISABLED: ctx.close(); log.warn( - "Clients intend to establish a SSL connection while this server is running in SSL disabled mode"); + "Clients intend to establish a SSL connection while this server is running in SSL disabled mode"); break; case PERMISSIVE: case ENFORCING: if (null != sslContext) { ctx.pipeline() - .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, - TLS_HANDLER_NAME, - sslContext.newHandler(ctx.channel().alloc())) - .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, - FILE_REGION_ENCODER_NAME, new FileRegionEncoder()); + .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, + TLS_HANDLER_NAME, + sslContext.newHandler(ctx.channel().alloc())) + .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, + FILE_REGION_ENCODER_NAME, new FileRegionEncoder()); log.info( - "Handlers prepended to channel pipeline to establish SSL connection"); + "Handlers prepended to channel pipeline to establish SSL connection"); } else { ctx.close(); log.error( - "Trying to establish a SSL connection but sslContext is null"); + "Trying to establish a SSL connection but sslContext is null"); } break; @@ -365,7 +366,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R } else if (tlsMode == TlsMode.ENFORCING) { ctx.close(); log.warn( - "Clients intend to establish an insecure connection while this server is running in SSL enforcing mode"); + "Clients intend to establish an insecure connection while this server is running in SSL enforcing mode"); } // reset the reader index so that handshake negotiation may proceed as normal. @@ -385,8 +386,8 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R @Override public void push(RemotingChannel remotingChannel, RemotingCommand request, - long timeoutMillis) throws InterruptedException, - RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { + long timeoutMillis) throws InterruptedException, + RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { this.invokeOneway(remotingChannel, request, timeoutMillis); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubAckPayload.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubAckPayload.java index 68d047d1e7e93b07c260a4d3d0767e63e2af3399..495ffbcf77786a856420ac0125416605d28170d3 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubAckPayload.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubAckPayload.java @@ -51,6 +51,7 @@ public final class RocketMQMqttSubAckPayload extends RemotingSerializable { public MqttSubAckPayload toMqttSubAckPayload() throws UnsupportedEncodingException { return new MqttSubAckPayload(this.grantedQoSLevels); } + @Override public String toString() { return StringUtil.simpleClassName(this) + '[' + "grantedQoSLevels=" + this.grantedQoSLevels + ']'; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubscribePayload.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubscribePayload.java index fceca774964688397b23e9b87f26c289203ee384..8b2a19d9f951f59b440135f6fb1e9c157324fd9f 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubscribePayload.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttSubscribePayload.java @@ -42,9 +42,10 @@ public final class RocketMQMqttSubscribePayload extends RemotingSerializable { } public void setTopicSubscriptions( - List topicSubscriptions) { + List topicSubscriptions) { this.topicSubscriptions = topicSubscriptions; } + public static RocketMQMqttSubscribePayload fromMqttSubscribePayload(MqttSubscribePayload payload) { return new RocketMQMqttSubscribePayload(payload.topicSubscriptions()); } @@ -52,6 +53,7 @@ public final class RocketMQMqttSubscribePayload extends RemotingSerializable { public MqttSubscribePayload toMqttSubscribePayload() throws UnsupportedEncodingException { return new MqttSubscribePayload(this.topicSubscriptions); } + @Override public String toString() { StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('['); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttUnSubscribePayload.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttUnSubscribePayload.java index dee32dd034ed971fc4305f204810222628372d11..8e7484383d219f35138b657dddd7c6d463d08ff4 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttUnSubscribePayload.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/RocketMQMqttUnSubscribePayload.java @@ -41,6 +41,7 @@ public class RocketMQMqttUnSubscribePayload extends RemotingSerializable { public void setTopics(List topics) { this.topics = Collections.unmodifiableList(topics); } + public static RocketMQMqttUnSubscribePayload fromMqttUnSubscribePayload(MqttUnsubscribePayload payload) { return new RocketMQMqttUnSubscribePayload(payload.topics()); } @@ -48,6 +49,7 @@ public class RocketMQMqttUnSubscribePayload extends RemotingSerializable { public MqttUnsubscribePayload toMqttUnsubscribePayload() throws UnsupportedEncodingException { return new MqttUnsubscribePayload(this.topics); } + @Override public String toString() { StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('['); @@ -55,7 +57,7 @@ public class RocketMQMqttUnSubscribePayload extends RemotingSerializable { builder.append("topicName = ").append(topics.get(i)).append(", "); } builder.append("topicName = ").append(topics.get(topics.size() - 1)) - .append(']'); + .append(']'); return builder.toString(); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectackEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectackEncodeDecode.java index 34a664234e80fa57fcfc1e58bb541b0d3e290305..f7c60190c7f3b3015e8000e8fe072462d7e1bb16 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectackEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttConnectackEncodeDecode.java @@ -38,13 +38,13 @@ public class MqttConnectackEncodeDecode implements Message2MessageEncodeDecode { @Override public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException { MqttHeader mqttHeader = (MqttHeader) remotingCommand - .decodeCommandCustomHeader(MqttHeader.class); + .decodeCommandCustomHeader(MqttHeader.class); return new MqttConnAckMessage( - new MqttFixedHeader(MqttMessageType.CONNACK, mqttHeader.isDup(), - MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), - mqttHeader.getRemainingLength()), new MqttConnAckVariableHeader( - MqttConnectReturnCode.valueOf(mqttHeader.getConnectReturnCode()), - mqttHeader.isSessionPresent())); + new MqttFixedHeader(MqttMessageType.CONNACK, mqttHeader.isDup(), + MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), + mqttHeader.getRemainingLength()), new MqttConnAckVariableHeader( + MqttConnectReturnCode.valueOf(mqttHeader.getConnectReturnCode()), + mqttHeader.isSessionPresent())); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPubackEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPubackEncodeDecode.java index 30c4562aea46f12434ffa17b1ba2fe6304bbfa1e..fde2fcc96fdf04ced076c77ab74b3061a6b51f4c 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPubackEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPubackEncodeDecode.java @@ -37,12 +37,12 @@ public class MqttPubackEncodeDecode implements Message2MessageEncodeDecode { @Override public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException { MqttHeader mqttHeader = (MqttHeader) remotingCommand - .decodeCommandCustomHeader(MqttHeader.class); + .decodeCommandCustomHeader(MqttHeader.class); return new MqttPubAckMessage( - new MqttFixedHeader(MqttMessageType.PUBACK, mqttHeader.isDup(), - MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), - mqttHeader.getRemainingLength()), - MqttMessageIdVariableHeader.from(mqttHeader.getMessageId())); + new MqttFixedHeader(MqttMessageType.PUBACK, mqttHeader.isDup(), + MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), + mqttHeader.getRemainingLength()), + MqttMessageIdVariableHeader.from(mqttHeader.getMessageId())); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java index af2d58ecb87001b6c45b7060b5697b0557098cf6..ee76e95dbe9f630288591f3aa199970c07c339e6 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPublishEncodeDecode.java @@ -37,7 +37,7 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode { RemotingCommand requestCommand = null; MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); MqttConnAckVariableHeader variableHeader = (MqttConnAckVariableHeader) mqttMessage - .variableHeader(); + .variableHeader(); MqttHeader mqttHeader = new MqttHeader(); mqttHeader.setMessageType(mqttFixedHeader.messageType().value()); @@ -50,7 +50,7 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode { mqttHeader.setSessionPresent(variableHeader.isSessionPresent()); requestCommand = RemotingCommand - .createRequestCommand(1000, mqttHeader); + .createRequestCommand(1000, mqttHeader); CodecHelper.makeCustomHeaderToNet(requestCommand); requestCommand.setBody(payload); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java index a27fa4c5cdcc6dcddb6a5ff2a9974f9efa30d2ee..8c49e61cd787694fccaaa582814bfcc37bfa7768 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubackEncodeDecode.java @@ -38,12 +38,12 @@ public class MqttSubackEncodeDecode implements Message2MessageEncodeDecode { @Override public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException { MqttHeader mqttHeader = (MqttHeader) remotingCommand - .decodeCommandCustomHeader(MqttHeader.class); + .decodeCommandCustomHeader(MqttHeader.class); return new MqttSubAckMessage( - new MqttFixedHeader(MqttMessageType.SUBACK, mqttHeader.isDup(), - MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), - mqttHeader.getRemainingLength()), - MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()),new MqttSubAckPayload()); + new MqttFixedHeader(MqttMessageType.SUBACK, mqttHeader.isDup(), + MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), + mqttHeader.getRemainingLength()), + MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()), new MqttSubAckPayload()); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java index 645a52f050f8fa50202d2b960415ebd360dcb4dc..338672810284714bdfa65ee1d03819d68d0a9a70 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttSubscribeEncodeDecode.java @@ -31,12 +31,12 @@ public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode { @Override public RemotingCommand decode(MqttMessage mqttMessage) { RocketMQMqttSubscribePayload payload = RocketMQMqttSubscribePayload - .fromMqttSubscribePayload(((MqttSubscribeMessage) mqttMessage).payload()); + .fromMqttSubscribePayload(((MqttSubscribeMessage) mqttMessage).payload()); RemotingCommand requestCommand = null; MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage - .variableHeader(); + .variableHeader(); MqttHeader mqttHeader = new MqttHeader(); mqttHeader.setMessageType(mqttFixedHeader.messageType().value()); @@ -48,7 +48,7 @@ public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode { mqttHeader.setMessageId(variableHeader.messageId()); requestCommand = RemotingCommand - .createRequestCommand(1000, mqttHeader); + .createRequestCommand(1000, mqttHeader); CodecHelper.makeCustomHeaderToNet(requestCommand); requestCommand.setBody(payload.encode()); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubackEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubackEncodeDecode.java index 863777a95a52b8d171315c778d4cf38c898958b8..21f51b640baf67a75dd106c5e9ddbdeda434956d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubackEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubackEncodeDecode.java @@ -37,12 +37,12 @@ public class MqttUnSubackEncodeDecode implements Message2MessageEncodeDecode { @Override public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException { MqttHeader mqttHeader = (MqttHeader) remotingCommand - .decodeCommandCustomHeader(MqttHeader.class); + .decodeCommandCustomHeader(MqttHeader.class); return new MqttUnsubAckMessage( - new MqttFixedHeader(MqttMessageType.UNSUBACK, mqttHeader.isDup(), - MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), - mqttHeader.getRemainingLength()), - MqttMessageIdVariableHeader.from(mqttHeader.getMessageId())); + new MqttFixedHeader(MqttMessageType.UNSUBACK, mqttHeader.isDup(), + MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), + mqttHeader.getRemainingLength()), + MqttMessageIdVariableHeader.from(mqttHeader.getMessageId())); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java index 37b6fe12c8d200031c2671ba3fce48bf4fc65c74..88c8518b8900f8c17d51bb33e0731cd4efda7d67 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttUnSubscribeEncodeDecode.java @@ -31,12 +31,12 @@ public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode @Override public RemotingCommand decode(MqttMessage mqttMessage) { RocketMQMqttUnSubscribePayload payload = RocketMQMqttUnSubscribePayload - .fromMqttUnSubscribePayload(((MqttUnsubscribeMessage) mqttMessage).payload()); + .fromMqttUnSubscribePayload(((MqttUnsubscribeMessage) mqttMessage).payload()); RemotingCommand requestCommand = null; MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage - .variableHeader(); + .variableHeader(); MqttHeader mqttHeader = new MqttHeader(); mqttHeader.setMessageType(mqttFixedHeader.messageType().value()); @@ -48,7 +48,7 @@ public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode mqttHeader.setMessageId(variableHeader.messageId()); requestCommand = RemotingCommand - .createRequestCommand(1000, mqttHeader); + .createRequestCommand(1000, mqttHeader); CodecHelper.makeCustomHeaderToNet(requestCommand); requestCommand.setBody(payload.encode()); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index b744fd221ef528e6b0db57ab6b7d3cc9e756413d..b5a9cec219c08afd65aecbd2ab1164381b27b8c5 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -79,7 +79,7 @@ import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl; public class SnodeController { private static final InternalLogger log = InternalLoggerFactory - .getLogger(LoggerName.SNODE_LOGGER_NAME); + .getLogger(LoggerName.SNODE_LOGGER_NAME); private final SnodeConfig snodeConfig; private final ServerConfig nettyServerConfig; @@ -118,12 +118,12 @@ public class SnodeController { private WillMessageService willMessageService; private final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryImpl( - "SnodeControllerScheduledThread")); + .newSingleThreadScheduledExecutor(new ThreadFactoryImpl( + "SnodeControllerScheduledThread")); public SnodeController(ServerConfig nettyServerConfig, - ClientConfig nettyClientConfig, - SnodeConfig snodeConfig) { + ClientConfig nettyClientConfig, + SnodeConfig snodeConfig) { this.nettyClientConfig = nettyClientConfig; this.nettyServerConfig = nettyServerConfig; this.snodeConfig = snodeConfig; @@ -131,37 +131,37 @@ public class SnodeController { this.nnodeService = new NnodeServiceImpl(this); this.scheduledService = new ScheduledServiceImpl(this); this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient() - .init(this.getNettyClientConfig(), null); + .init(this.getNettyClientConfig(), null); this.mqttRemotingClient = RemotingClientFactory.getInstance() - .createRemotingClient(RemotingUtil.MQTT_PROTOCOL) - .init(this.getNettyClientConfig(), null); + .createRemotingClient(RemotingUtil.MQTT_PROTOCOL) + .init(this.getNettyClientConfig(), null); this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeSendMessageMinPoolSize(), - snodeConfig.getSnodeSendMessageMaxPoolSize(), - 3000, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), - "SnodeSendMessageThread", - false); + snodeConfig.getSnodeSendMessageMinPoolSize(), + snodeConfig.getSnodeSendMessageMaxPoolSize(), + 3000, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), + "SnodeSendMessageThread", + false); this.pullMessageExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeSendMessageMinPoolSize(), - snodeConfig.getSnodeSendMessageMaxPoolSize(), - 3000, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), - "SnodePullMessageThread", - false); + snodeConfig.getSnodeSendMessageMinPoolSize(), + snodeConfig.getSnodeSendMessageMaxPoolSize(), + 3000, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), + "SnodePullMessageThread", + false); this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeHeartBeatCorePoolSize(), - snodeConfig.getSnodeHeartBeatMaxPoolSize(), - 1000 * 60, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()), - "SnodeHeartbeatThread", - true); + snodeConfig.getSnodeHeartBeatCorePoolSize(), + snodeConfig.getSnodeHeartBeatMaxPoolSize(), + 1000 * 60, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()), + "SnodeHeartbeatThread", + true); // this.consumerManagerExecutor = ThreadUtils.newThreadPoolExecutor( // snodeConfig.getSnodeSendMessageMinPoolSize(), @@ -173,27 +173,27 @@ public class SnodeController { // false); this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeSendMessageMinPoolSize(), - snodeConfig.getSnodeSendMessageMaxPoolSize(), - 3000, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), - "ConsumerManagerThread", - false); + snodeConfig.getSnodeSendMessageMinPoolSize(), + snodeConfig.getSnodeSendMessageMaxPoolSize(), + 3000, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), + "ConsumerManagerThread", + false); this.handleMqttMessageExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeHandleMqttMessageMinPoolSize(), - snodeConfig.getSnodeHandleMqttMessageMaxPoolSize(), - 3000, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeHandleMqttThreadPoolQueueCapacity()), - "SnodeHandleMqttMessageThread", - false); + snodeConfig.getSnodeHandleMqttMessageMinPoolSize(), + snodeConfig.getSnodeHandleMqttMessageMaxPoolSize(), + 3000, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeHandleMqttThreadPoolQueueCapacity()), + "SnodeHandleMqttMessageThread", + false); if (this.snodeConfig.getNamesrvAddr() != null) { this.nnodeService.updateNnodeAddressList(this.snodeConfig.getNamesrvAddr()); log.info("Set user specified name server address: {}", - this.snodeConfig.getNamesrvAddr()); + this.snodeConfig.getNamesrvAddr()); } this.subscriptionGroupManager = new SubscriptionGroupManager(this); @@ -210,7 +210,7 @@ public class SnodeController { this.consumerManager = new ConsumerManagerImpl(this); this.iotClientManager = new IOTClientManagerImpl(this); this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, - this.consumerManager, this.iotClientManager); + this.consumerManager, this.iotClientManager); this.slowConsumerService = new SlowConsumerServiceImpl(this); this.metricsService = new MetricsServiceImpl(); this.willMessageService = new WillMessageServiceImpl(this); @@ -222,7 +222,7 @@ public class SnodeController { private void initRemotingServerInterceptorGroup() { List remotingServerInterceptors = InterceptorFactory.getInstance() - .loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath()); + .loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath()); if (remotingServerInterceptors != null && remotingServerInterceptors.size() > 0) { if (this.remotingServerInterceptorGroup == null) { this.remotingServerInterceptorGroup = new InterceptorGroup(); @@ -230,17 +230,17 @@ public class SnodeController { for (Interceptor interceptor : remotingServerInterceptors) { this.remotingServerInterceptorGroup.registerInterceptor(interceptor); log.warn("Remoting server interceptor: {} registered!", - interceptor.interceptorName()); + interceptor.interceptorName()); } } } public boolean initialize() { this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer() - .init(this.nettyServerConfig, this.clientHousekeepingService); + .init(this.nettyServerConfig, this.clientHousekeepingService); this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer( - RemotingUtil.MQTT_PROTOCOL) - .init(this.nettyServerConfig, this.clientHousekeepingService); + RemotingUtil.MQTT_PROTOCOL) + .init(this.nettyServerConfig, this.clientHousekeepingService); this.registerProcessor(); initSnodeInterceptorGroup(); initRemotingServerInterceptorGroup(); @@ -258,7 +258,7 @@ public class SnodeController { } List accessValidators = ServiceProvider - .loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); + .loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); if (accessValidators == null || accessValidators.isEmpty()) { log.info("The snode dose not load the AccessValidator"); return; @@ -278,7 +278,7 @@ public class SnodeController { //Do not catch the exception RemotingCommand request = requestContext.getRequest(); String remoteAddr = RemotingUtil.socketAddress2IpString( - requestContext.getRemotingChannel().remoteAddress()); + requestContext.getRemotingChannel().remoteAddress()); validator.validate(validator.parse(request, remoteAddr)); } @@ -296,17 +296,17 @@ public class SnodeController { private void initSnodeInterceptorGroup() { List consumeMessageInterceptors = InterceptorFactory.getInstance() - .loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath()); + .loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath()); if (consumeMessageInterceptors != null && consumeMessageInterceptors.size() > 0) { this.consumeMessageInterceptorGroup = new InterceptorGroup(); for (Interceptor interceptor : consumeMessageInterceptors) { this.consumeMessageInterceptorGroup.registerInterceptor(interceptor); log.warn("Consume message interceptor: {} registered!", - interceptor.interceptorName()); + interceptor.interceptorName()); } } List sendMessageInterceptors = InterceptorFactory.getInstance() - .loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath()); + .loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath()); if (sendMessageInterceptors != null && sendMessageInterceptors.size() > 0) { this.sendMessageInterceptorGroup = new InterceptorGroup(); for (Interceptor interceptor : sendMessageInterceptors) { @@ -319,37 +319,37 @@ public class SnodeController { public void registerProcessor() { this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, - this.sendMessageExecutor); + this.sendMessageExecutor); this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, - this.sendMessageExecutor); + this.sendMessageExecutor); this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, - this.heartbeatExecutor); + this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, - this.heartbeatExecutor); + this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, - this.heartbeatExecutor); + this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, - this.pullMessageExecutor); + this.pullMessageExecutor); this.snodeServer - .registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, - this.consumerManageExecutor); + .registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, + this.consumerManageExecutor); this.snodeServer - .registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, - this.consumerManageExecutor); + .registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, + this.consumerManageExecutor); this.snodeServer - .registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, - this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, + .registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, + this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, - this.consumerManageExecutor); + this.consumerManageExecutor); this.snodeServer - .registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, - this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, + .registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, + this.consumerManageExecutor); this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, - defaultMqttMessageProcessor, handleMqttMessageExecutor); + defaultMqttMessageProcessor, handleMqttMessageExecutor); } @@ -476,7 +476,7 @@ public class SnodeController { } public void setRemotingServerInterceptorGroup( - InterceptorGroup remotingServerInterceptorGroup) { + InterceptorGroup remotingServerInterceptorGroup) { this.remotingServerInterceptorGroup = remotingServerInterceptorGroup; } @@ -545,7 +545,7 @@ public class SnodeController { } public void setWillMessageService( - WillMessageService willMessageService) { + WillMessageService willMessageService) { this.willMessageService = willMessageService; } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java index f48465c83142b6e2942f3d2f83d3322537455590..80ff2d1fb6c01b6b6a66732860ea9f2071631eae 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java @@ -62,39 +62,39 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { public DefaultMqttMessageProcessor(SnodeController snodeController) { this.snodeController = snodeController; registerMessageHandler(MqttMessageType.CONNECT, - new MqttConnectMessageHandler(this.snodeController)); + new MqttConnectMessageHandler(this.snodeController)); registerMessageHandler(MqttMessageType.DISCONNECT, - new MqttDisconnectMessageHandler(this.snodeController)); + new MqttDisconnectMessageHandler(this.snodeController)); registerMessageHandler(MqttMessageType.PINGREQ, - new MqttPingreqMessageHandler(this.snodeController)); + new MqttPingreqMessageHandler(this.snodeController)); registerMessageHandler(MqttMessageType.PUBLISH, - new MqttPublishMessageHandler(this.snodeController)); + new MqttPublishMessageHandler(this.snodeController)); registerMessageHandler(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this.snodeController)); registerMessageHandler(MqttMessageType.PUBCOMP, - new MqttPubcompMessageHandler(this.snodeController)); + new MqttPubcompMessageHandler(this.snodeController)); registerMessageHandler(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this.snodeController)); registerMessageHandler(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this.snodeController)); registerMessageHandler(MqttMessageType.SUBSCRIBE, - new MqttSubscribeMessageHandler(this.snodeController)); + new MqttSubscribeMessageHandler(this.snodeController)); registerMessageHandler(MqttMessageType.UNSUBSCRIBE, - new MqttUnsubscribeMessagHandler(this.snodeController)); + new MqttUnsubscribeMessagHandler(this.snodeController)); } @Override public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message) - throws RemotingCommandException, UnsupportedEncodingException { + throws RemotingCommandException, UnsupportedEncodingException { MqttHeader mqttHeader = (MqttHeader) message.decodeCommandCustomHeader(MqttHeader.class); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.valueOf(mqttHeader.getMessageType()), - mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), - mqttHeader.getRemainingLength()); + mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), + mqttHeader.getRemainingLength()); MqttMessage mqttMessage = null; switch (fixedHeader.messageType()) { case CONNECT: MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader( - mqttHeader.getName(), mqttHeader.getVersion(), mqttHeader.isHasUserName(), - mqttHeader.isHasPassword(), mqttHeader.isWillRetain(), - mqttHeader.getWillQos(), mqttHeader.isWillFlag(), - mqttHeader.isCleanSession(), mqttHeader.getKeepAliveTimeSeconds()); + mqttHeader.getName(), mqttHeader.getVersion(), mqttHeader.isHasUserName(), + mqttHeader.isHasPassword(), mqttHeader.isWillRetain(), + mqttHeader.getWillQos(), mqttHeader.isWillFlag(), + mqttHeader.isCleanSession(), mqttHeader.getKeepAliveTimeSeconds()); RocketMQMqttConnectPayload payload = decode(message.getBody(), RocketMQMqttConnectPayload.class); mqttMessage = new MqttConnectMessage(fixedHeader, variableHeader, payload.toMqttConnectPayload()); case DISCONNECT: diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java index 9755069d7bb2a8a896739191a0f5d9aff12e7cd9..b5229478a96befc3b9f2227b673afed1324161ea 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java @@ -31,39 +31,33 @@ import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; public class MqttDisconnectMessageHandler implements MessageHandler { - private static final InternalLogger log = InternalLoggerFactory - .getLogger(LoggerName.SNODE_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private final SnodeController snodeController; - public MqttDisconnectMessageHandler(SnodeController snodeController) { this.snodeController = snodeController; } /** - * handle the DISCONNECT message from the client - *
    - *
  1. discard the Will Message and Will Topic
  2. - *
  3. remove the client from the IOTClientManager
  4. - *
  5. disconnect the connection
  6. - *
+ * handle the DISCONNECT message from the client
  1. discard the Will Message and Will Topic
  2. remove + * the client from the IOTClientManager
  3. disconnect the connection
*/ @Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) { // TODO discard the Will Message and Will Topic MqttFixedHeader fixedHeader = message.fixedHeader(); if (fixedHeader.qosLevel() != MqttQoS.AT_MOST_ONCE || !fixedHeader.isDup() || !fixedHeader - .isRetain()) { + .isRetain()) { log.error( - "The reserved bits(qos/isDup/isRetain) are not zero. Qos={}, isDup={}, isRetain={}", - fixedHeader.qosLevel(), fixedHeader.isDup(), fixedHeader.isRetain()); + "The reserved bits(qos/isDup/isRetain) are not zero. Qos={}, isDup={}, isRetain={}", + fixedHeader.qosLevel(), fixedHeader.isDup(), fixedHeader.isRetain()); remotingChannel.close(); return null; } //discard will message associated with the current connection(client) Client client = snodeController.getIotClientManager() - .getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); + .getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); if (client != null) { snodeController.getWillMessageService().deleteWillMessage(client.getClientId()); } diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java index 1c4e8fde14b36e8222152078a8d151d164b7f888..85a862f4671e255ee6c0b3981aeb90d9a2136de4 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java @@ -97,6 +97,7 @@ public class DefaultMqttMessageProcessorTest { CodecHelper.makeCustomHeaderToNet(request); return request; } + private byte[] encode(Object obj) { String json = JSON.toJSONString(obj, false); if (json != null) { diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java index 643df011460c3c8d67dfa668ad7ae0f85848da13..4bb73dfc8f8809ebe9332cacfe9a7d024975cef8 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java @@ -44,16 +44,16 @@ public class MqttDisconnectMessageHandlerTest { public void testHandlerMessage() throws Exception { SnodeController snodeController = new SnodeController(new ServerConfig(), - new ClientConfig(), new SnodeConfig()); + new ClientConfig(), new SnodeConfig()); MqttDisconnectMessageHandler mqttDisconnectMessageHandler = new MqttDisconnectMessageHandler( - snodeController); + snodeController); Client client = new Client(); client.setRemotingChannel(remotingChannel); client.setClientId("123456"); snodeController.getIotClientManager().register(IOTClientManagerImpl.IOT_GROUP, client); snodeController.getWillMessageService().saveWillMessage("123456", new WillMessage()); MqttMessage mqttDisconnectMessage = new MqttMessage(new MqttFixedHeader( - MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200)); + MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200)); mqttDisconnectMessageHandler.handleMessage(mqttDisconnectMessage, remotingChannel); } diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java index 12a7fe7553d275bcaabdccae47ab4938a3f282f2..f9ca3536a3ec9fa189a0491b7779c5a5aa9d7207 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java @@ -34,7 +34,7 @@ public class WillMessageServiceImplTest extends SnodeTestBase { @Spy private SnodeController snodeController = new SnodeController(new ServerConfig(), - new ClientConfig(), new SnodeConfig()); + new ClientConfig(), new SnodeConfig()); private WillMessageService willMessageService; diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index 1d9a416497e48b6947d57046bae559e48a08311a..8cfd0f6dd8fbcd573e530250ff94d0ee2b431a61 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -27,16 +27,14 @@ import java.nio.channels.FileChannel; import java.nio.channels.OverlappingFileLockException; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.StorePathConfigHelper; -import org.junit.After; import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -62,7 +60,6 @@ public class DefaultMessageStoreTest { messageStore.start(); } - @Ignore @Test(expected = OverlappingFileLockException.class) public void test_repate_restart() throws Exception { QUEUE_TOTAL = 1; @@ -294,7 +291,7 @@ public class DefaultMessageStoreTest { private class MyMessageArrivingListener implements MessageArrivingListener { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, - byte[] filterBitMap, Map properties) { + byte[] filterBitMap, Map properties) { } } } diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java index 6f557762096f56e19996ee744515ed9e2e0d475a..8fec9ba2d1a8b2e08686ee7ed3e9f02548f94244 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java @@ -16,12 +16,10 @@ import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; public class DLedgerCommitlogTest extends MessageStoreTestBase { - @Ignore @Test public void testTruncateCQ() throws Exception { String base = createBaseDir(); @@ -77,7 +75,6 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { } - @Ignore @Test public void testRecover() throws Exception { String base = createBaseDir(); @@ -118,7 +115,6 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { } - @Ignore @Test public void testPutAndGetMessage() throws Exception { String base = createBaseDir(); @@ -159,7 +155,6 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { messageStore.shutdown(); } - @Ignore @Test public void testCommittedPos() throws Exception { String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort()); diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java index 2fac4c373641792fc994ebd01bfa423282f06bb0..5ec5a6531c4d5b0191d8512f1a87841672b73a70 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java @@ -5,13 +5,11 @@ import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.StoreTestBase; import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; public class MixCommitlogTest extends MessageStoreTestBase { - @Ignore @Test public void testFallBehindCQ() throws Exception { String base = createBaseDir(); @@ -51,7 +49,6 @@ public class MixCommitlogTest extends MessageStoreTestBase { } - @Ignore @Test public void testPutAndGet() throws Exception { String base = createBaseDir(); @@ -112,7 +109,6 @@ public class MixCommitlogTest extends MessageStoreTestBase { recoverDledgerStore.shutdown(); } } - @Ignore @Test public void testDeleteExpiredFiles() throws Exception { String base = createBaseDir();