提交 5b561494 编写于 作者: C chengxiangwang

add SessionManager, WillMessageService;finish CONNECT/DISCONNECT logic

上级 41f87a41
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.message.mqtt;
import io.netty.handler.codec.mqtt.MqttQoS;
public class RetainMessage {
private byte[] byteBuf;
private MqttQoS qoS;
public byte[] getByteBuf() {
return byteBuf;
}
public void setByteBuf(byte[] byteBuf) {
this.byteBuf = byteBuf;
}
public MqttQoS getQoS() {
return qoS;
}
public void setQoS(MqttQoS qoS) {
this.qoS = qoS;
}
public String getString() {
return new String(byteBuf);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.message.mqtt;
public class WillMessage {
private String willTopic;
private byte[] body;
private boolean isRetain;
private int qos;
public String getWillTopic() {
return willTopic;
}
public void setWillTopic(String willTopic) {
this.willTopic = willTopic;
}
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
public boolean isRetain() {
return isRetain;
}
public void setRetain(boolean retain) {
isRetain = retain;
}
public int getQos() {
return qos;
}
public void setQos(int qos) {
this.qos = qos;
}
public String getString() {
return new String(body);
}
}
......@@ -20,7 +20,6 @@
*/
package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......@@ -40,7 +39,7 @@ public class MqttHeader implements CommandCustomHeader {
private int remainingLength;
//variable header members
private MqttConnectReturnCode connectReturnCode;
private String connectReturnCode;
private boolean sessionPresent;
private String name;
private Integer version;
......@@ -95,11 +94,11 @@ public class MqttHeader implements CommandCustomHeader {
this.remainingLength = remainingLength;
}
public MqttConnectReturnCode getConnectReturnCode() {
public String getConnectReturnCode() {
return connectReturnCode;
}
public void setConnectReturnCode(MqttConnectReturnCode connectReturnCode) {
public void setConnectReturnCode(String connectReturnCode) {
this.connectReturnCode = connectReturnCode;
}
......
......@@ -113,31 +113,31 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
}
this.publicExecutor = ThreadUtils.newFixedThreadPool(
publicThreadNums,
10000, "Remoting-PublicExecutor", true);
10000, "MqttRemoting-PublicExecutor", true);
if (JvmUtils.isUseEpoll() && this.nettyServerConfig.isUseEpollNativeSelector()) {
this.eventLoopGroupSelector = new EpollEventLoopGroup(
serverConfig.getServerSelectorThreads(),
ThreadUtils.newGenericThreadFactory("NettyEpollIoThreads",
ThreadUtils.newGenericThreadFactory("MqttNettyEpollIoThreads",
serverConfig.getServerSelectorThreads()));
this.eventLoopGroupBoss = new EpollEventLoopGroup(
serverConfig.getServerAcceptorThreads(),
ThreadUtils.newGenericThreadFactory("NettyBossThreads",
ThreadUtils.newGenericThreadFactory("MqttNettyBossThreads",
serverConfig.getServerAcceptorThreads()));
this.socketChannelClass = EpollServerSocketChannel.class;
} else {
this.eventLoopGroupBoss = new NioEventLoopGroup(serverConfig.getServerAcceptorThreads(),
ThreadUtils.newGenericThreadFactory("NettyBossThreads",
ThreadUtils.newGenericThreadFactory("MqttNettyBossThreads",
serverConfig.getServerAcceptorThreads()));
this.eventLoopGroupSelector = new NioEventLoopGroup(
serverConfig.getServerSelectorThreads(),
ThreadUtils.newGenericThreadFactory("NettyNioIoThreads",
ThreadUtils.newGenericThreadFactory("MqttNettyNioIoThreads",
serverConfig.getServerSelectorThreads()));
this.socketChannelClass = NioServerSocketChannel.class;
}
this.port = nettyServerConfig.getMqttListenPort();
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
serverConfig.getServerWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyWorkerThreads",
ThreadUtils.newGenericThreadFactory("MqttNettyWorkerThreads",
serverConfig.getServerWorkerThreads()));
loadSslContext();
return this;
......
......@@ -19,8 +19,12 @@ package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import java.util.List;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.dispatcher.EncodeDecodeDispatcher;
import org.apache.rocketmq.remoting.transport.mqtt.dispatcher.Message2MessageEncodeDecode;
public class RemotingCommand2MqttMessageHandler extends MessageToMessageEncoder<RemotingCommand> {
......@@ -38,6 +42,17 @@ public class RemotingCommand2MqttMessageHandler extends MessageToMessageEncoder<
@Override
protected void encode(ChannelHandlerContext ctx, RemotingCommand msg, List<Object> out)
throws Exception {
if (!(msg instanceof RemotingCommand)) {
return;
}
MqttMessage mqttMessage = null;
MqttHeader mqttHeader = (MqttHeader) msg.decodeCommandCustomHeader(MqttHeader.class);
Message2MessageEncodeDecode message2MessageEncodeDecode = EncodeDecodeDispatcher
.getEncodeDecodeDispatcher().get(
MqttMessageType.valueOf(mqttHeader.getMessageType()));
if (message2MessageEncodeDecode != null) {
mqttMessage = message2MessageEncodeDecode.encode(msg);
}
out.add(mqttMessage);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.util.internal.StringUtil;
import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
/**
* Payload of {@link MqttSubscribeMessage}
*/
public final class RocketMQMqttSubscribePayload extends RemotingSerializable {
private List<MqttTopicSubscription> topicSubscriptions;
public RocketMQMqttSubscribePayload(List<MqttTopicSubscription> topicSubscriptions) {
this.topicSubscriptions = Collections.unmodifiableList(topicSubscriptions);
}
public List<MqttTopicSubscription> getTopicSubscriptions() {
return topicSubscriptions;
}
public void setTopicSubscriptions(
List<MqttTopicSubscription> topicSubscriptions) {
this.topicSubscriptions = topicSubscriptions;
}
public static RocketMQMqttSubscribePayload fromMqttSubscribePayload(MqttSubscribePayload payload) {
return new RocketMQMqttSubscribePayload(payload.topicSubscriptions());
}
public MqttSubscribePayload toMqttSubscribePayload() throws UnsupportedEncodingException {
return new MqttSubscribePayload(this.topicSubscriptions);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
for (int i = 0; i < topicSubscriptions.size() - 1; i++) {
builder.append(topicSubscriptions.get(i)).append(", ");
}
builder.append(topicSubscriptions.get(topicSubscriptions.size() - 1));
builder.append(']');
return builder.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
import io.netty.util.internal.StringUtil;
import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
/**
* Payload of the {@link io.netty.handler.codec.mqtt.MqttUnsubscribeMessage}
*/
public class RocketMQMqttUnSubscribePayload extends RemotingSerializable {
private List<String> topics;
public RocketMQMqttUnSubscribePayload(List<String> topics) {
this.topics = topics;
}
public List<String> getTopics() {
return topics;
}
public void setTopics(List<String> topics) {
this.topics = Collections.unmodifiableList(topics);
}
public static RocketMQMqttUnSubscribePayload fromMqttUnSubscribePayload(MqttUnsubscribePayload payload) {
return new RocketMQMqttUnSubscribePayload(payload.topics());
}
public MqttUnsubscribePayload toMqttUnsubscribePayload() throws UnsupportedEncodingException {
return new MqttUnsubscribePayload(this.topics);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
for (int i = 0; i < topics.size() - 1; i++) {
builder.append("topicName = ").append(topics.get(i)).append(", ");
}
builder.append("topicName = ").append(topics.get(topics.size() - 1))
.append(']');
return builder.toString();
}
}
......@@ -18,11 +18,12 @@
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface Message2MessageEncodeDecode {
RemotingCommand decode(MqttMessage mqttMessage);
MqttMessage encode(RemotingCommand remotingCommand);
MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException;
}
......@@ -17,8 +17,16 @@
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
public class MqttConnectackEncodeDecode implements Message2MessageEncodeDecode {
......@@ -28,7 +36,15 @@ public class MqttConnectackEncodeDecode implements Message2MessageEncodeDecode {
}
@Override
public MqttMessage encode(RemotingCommand remotingCommand) {
return null;
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException {
MqttHeader mqttHeader = (MqttHeader) remotingCommand
.decodeCommandCustomHeader(MqttHeader.class);
return new MqttConnAckMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()), new MqttConnAckVariableHeader(
MqttConnectReturnCode.valueOf(mqttHeader.getConnectReturnCode()),
mqttHeader.isSessionPresent()));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
public class MqttPubackEncodeDecode implements Message2MessageEncodeDecode {
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
return null;
}
@Override
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException {
MqttHeader mqttHeader = (MqttHeader) remotingCommand
.decodeCommandCustomHeader(MqttHeader.class);
return new MqttPubAckMessage(
new MqttFixedHeader(MqttMessageType.PUBACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()),
MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode {
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
ByteBuf byteBuf = ((MqttPublishMessage) mqttMessage).payload();
byte[] payload = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(payload);
RemotingCommand requestCommand = null;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttConnAckVariableHeader variableHeader = (MqttConnAckVariableHeader) mqttMessage
.variableHeader();
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setMessageType(mqttFixedHeader.messageType().value());
mqttHeader.setDup(mqttFixedHeader.isDup());
mqttHeader.setQosLevel(mqttFixedHeader.qosLevel().value());
mqttHeader.setRetain(mqttFixedHeader.isRetain());
mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength());
mqttHeader.setConnectReturnCode(variableHeader.connectReturnCode().name());
mqttHeader.setSessionPresent(variableHeader.isSessionPresent());
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
CodecHelper.makeCustomHeaderToNet(requestCommand);
requestCommand.setBody(payload);
return requestCommand;
}
@Override
public MqttMessage encode(RemotingCommand remotingCommand) {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
public class MqttSubackEncodeDecode implements Message2MessageEncodeDecode {
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
return null;
}
@Override
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException {
MqttHeader mqttHeader = (MqttHeader) remotingCommand
.decodeCommandCustomHeader(MqttHeader.class);
return new MqttSubAckMessage(
new MqttFixedHeader(MqttMessageType.SUBACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()),
MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()),new MqttSubAckPayload());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttSubscribePayload;
public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode {
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
RocketMQMqttSubscribePayload payload = RocketMQMqttSubscribePayload
.fromMqttSubscribePayload(((MqttSubscribeMessage) mqttMessage).payload());
RemotingCommand requestCommand = null;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage
.variableHeader();
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setMessageType(mqttFixedHeader.messageType().value());
mqttHeader.setDup(mqttFixedHeader.isDup());
mqttHeader.setQosLevel(mqttFixedHeader.qosLevel().value());
mqttHeader.setRetain(mqttFixedHeader.isRetain());
mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength());
mqttHeader.setMessageId(variableHeader.messageId());
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
CodecHelper.makeCustomHeaderToNet(requestCommand);
requestCommand.setBody(payload.encode());
return requestCommand;
}
@Override
public MqttMessage encode(RemotingCommand remotingCommand) {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
public class MqttUnSubackEncodeDecode implements Message2MessageEncodeDecode {
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
return null;
}
@Override
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException {
MqttHeader mqttHeader = (MqttHeader) remotingCommand
.decodeCommandCustomHeader(MqttHeader.class);
return new MqttUnsubAckMessage(
new MqttFixedHeader(MqttMessageType.UNSUBACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()),
MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttUnSubscribePayload;
public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode {
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
RocketMQMqttUnSubscribePayload payload = RocketMQMqttUnSubscribePayload
.fromMqttUnSubscribePayload(((MqttUnsubscribeMessage) mqttMessage).payload());
RemotingCommand requestCommand = null;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage
.variableHeader();
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setMessageType(mqttFixedHeader.messageType().value());
mqttHeader.setDup(mqttFixedHeader.isDup());
mqttHeader.setQosLevel(mqttFixedHeader.qosLevel().value());
mqttHeader.setRetain(mqttFixedHeader.isRetain());
mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength());
mqttHeader.setMessageId(variableHeader.messageId());
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
CodecHelper.makeCustomHeaderToNet(requestCommand);
requestCommand.setBody(payload.encode());
return requestCommand;
}
@Override
public MqttMessage encode(RemotingCommand remotingCommand) {
return null;
}
}
......@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.snode;
import io.netty.handler.codec.mqtt.MqttMessageType;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
......@@ -62,33 +61,26 @@ import org.apache.rocketmq.snode.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.snode.processor.HeartbeatProcessor;
import org.apache.rocketmq.snode.processor.PullMessageProcessor;
import org.apache.rocketmq.snode.processor.SendMessageProcessor;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttDisconnectMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPingreqMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubackMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubcompMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPublishMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubrecMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubrelMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttSubscribeMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttUnsubscribeMessagHandler;
import org.apache.rocketmq.snode.service.ClientService;
import org.apache.rocketmq.snode.service.EnodeService;
import org.apache.rocketmq.snode.service.MetricsService;
import org.apache.rocketmq.snode.service.NnodeService;
import org.apache.rocketmq.snode.service.PushService;
import org.apache.rocketmq.snode.service.ScheduledService;
import org.apache.rocketmq.snode.service.WillMessageService;
import org.apache.rocketmq.snode.service.impl.ClientServiceImpl;
import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl;
import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.PushServiceImpl;
import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl;
import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl;
import org.apache.rocketmq.snode.session.SessionManagerImpl;
public class SnodeController {
private static final InternalLogger log = InternalLoggerFactory
.getLogger(LoggerName.SNODE_LOGGER_NAME);
.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeConfig snodeConfig;
private final ServerConfig nettyServerConfig;
......@@ -108,6 +100,7 @@ public class SnodeController {
private ClientManager producerManager;
private ClientManager consumerManager;
private ClientManager iotClientManager;
private SessionManagerImpl sessionManager;
private SubscriptionManager subscriptionManager;
private ClientHousekeepingService clientHousekeepingService;
private SubscriptionGroupManager subscriptionGroupManager;
......@@ -124,14 +117,15 @@ public class SnodeController {
private ClientService clientService;
private SlowConsumerService slowConsumerService;
private MetricsService metricsService;
private WillMessageService willMessageService;
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"SnodeControllerScheduledThread"));
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"SnodeControllerScheduledThread"));
public SnodeController(ServerConfig nettyServerConfig,
ClientConfig nettyClientConfig,
SnodeConfig snodeConfig) {
ClientConfig nettyClientConfig,
SnodeConfig snodeConfig) {
this.nettyClientConfig = nettyClientConfig;
this.nettyServerConfig = nettyServerConfig;
this.snodeConfig = snodeConfig;
......@@ -139,37 +133,37 @@ public class SnodeController {
this.nnodeService = new NnodeServiceImpl(this);
this.scheduledService = new ScheduledServiceImpl(this);
this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient()
.init(this.getNettyClientConfig(), null);
.init(this.getNettyClientConfig(), null);
this.mqttRemotingClient = RemotingClientFactory.getInstance()
.createRemotingClient(RemotingUtil.MQTT_PROTOCOL)
.init(this.getNettyClientConfig(), null);
.createRemotingClient(RemotingUtil.MQTT_PROTOCOL)
.init(this.getNettyClientConfig(), null);
this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodeSendMessageThread",
false);
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodeSendMessageThread",
false);
this.pullMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodePullMessageThread",
false);
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodePullMessageThread",
false);
this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeHeartBeatCorePoolSize(),
snodeConfig.getSnodeHeartBeatMaxPoolSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()),
"SnodeHeartbeatThread",
true);
snodeConfig.getSnodeHeartBeatCorePoolSize(),
snodeConfig.getSnodeHeartBeatMaxPoolSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()),
"SnodeHeartbeatThread",
true);
// this.consumerManagerExecutor = ThreadUtils.newThreadPoolExecutor(
// snodeConfig.getSnodeSendMessageMinPoolSize(),
......@@ -181,27 +175,27 @@ public class SnodeController {
// false);
this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"ConsumerManagerThread",
false);
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"ConsumerManagerThread",
false);
this.handleMqttMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeHandleMqttMessageMinPoolSize(),
snodeConfig.getSnodeHandleMqttMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeHandleMqttThreadPoolQueueCapacity()),
"SnodeHandleMqttMessageThread",
false);
snodeConfig.getSnodeHandleMqttMessageMinPoolSize(),
snodeConfig.getSnodeHandleMqttMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeHandleMqttThreadPoolQueueCapacity()),
"SnodeHandleMqttMessageThread",
false);
if (this.snodeConfig.getNamesrvAddr() != null) {
this.nnodeService.updateNnodeAddressList(this.snodeConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}",
this.snodeConfig.getNamesrvAddr());
this.snodeConfig.getNamesrvAddr());
}
this.subscriptionGroupManager = new SubscriptionGroupManager(this);
......@@ -210,17 +204,19 @@ public class SnodeController {
this.sendMessageProcessor = new SendMessageProcessor(this);
this.heartbeatProcessor = new HeartbeatProcessor(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
this.defaultMqttMessageProcessor = new DefaultMqttMessageProcessor();
this.defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(this);
this.pushService = new PushServiceImpl(this);
this.clientService = new ClientServiceImpl(this);
this.subscriptionManager = new SubscriptionManagerImpl();
this.producerManager = new ProducerManagerImpl();
this.consumerManager = new ConsumerManagerImpl(this);
this.iotClientManager = new IOTClientManagerImpl(this);
this.sessionManager = new SessionManagerImpl(this);
this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager,
this.consumerManager, this.iotClientManager);
this.consumerManager, this.iotClientManager);
this.slowConsumerService = new SlowConsumerServiceImpl(this);
this.metricsService = new MetricsServiceImpl();
this.willMessageService = new WillMessageServiceImpl(this);
}
public SnodeConfig getSnodeConfig() {
......@@ -229,7 +225,7 @@ public class SnodeController {
private void initRemotingServerInterceptorGroup() {
List<Interceptor> remotingServerInterceptors = InterceptorFactory.getInstance()
.loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath());
.loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath());
if (remotingServerInterceptors != null && remotingServerInterceptors.size() > 0) {
if (this.remotingServerInterceptorGroup == null) {
this.remotingServerInterceptorGroup = new InterceptorGroup();
......@@ -237,17 +233,17 @@ public class SnodeController {
for (Interceptor interceptor : remotingServerInterceptors) {
this.remotingServerInterceptorGroup.registerInterceptor(interceptor);
log.warn("Remoting server interceptor: {} registered!",
interceptor.interceptorName());
interceptor.interceptorName());
}
}
}
public boolean initialize() {
this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer()
.init(this.nettyServerConfig, this.clientHousekeepingService);
.init(this.nettyServerConfig, this.clientHousekeepingService);
this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(
RemotingUtil.MQTT_PROTOCOL)
.init(this.nettyServerConfig, this.clientHousekeepingService);
RemotingUtil.MQTT_PROTOCOL)
.init(this.nettyServerConfig, this.clientHousekeepingService);
this.registerProcessor();
initSnodeInterceptorGroup();
initRemotingServerInterceptorGroup();
......@@ -265,7 +261,7 @@ public class SnodeController {
}
List<AccessValidator> accessValidators = ServiceProvider
.loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
.loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
if (accessValidators == null || accessValidators.isEmpty()) {
log.info("The snode dose not load the AccessValidator");
return;
......@@ -285,7 +281,7 @@ public class SnodeController {
//Do not catch the exception
RemotingCommand request = requestContext.getRequest();
String remoteAddr = RemotingUtil.socketAddress2IpString(
requestContext.getRemotingChannel().remoteAddress());
requestContext.getRemotingChannel().remoteAddress());
validator.validate(validator.parse(request, remoteAddr));
}
......@@ -303,17 +299,17 @@ public class SnodeController {
private void initSnodeInterceptorGroup() {
List<Interceptor> consumeMessageInterceptors = InterceptorFactory.getInstance()
.loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath());
.loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath());
if (consumeMessageInterceptors != null && consumeMessageInterceptors.size() > 0) {
this.consumeMessageInterceptorGroup = new InterceptorGroup();
for (Interceptor interceptor : consumeMessageInterceptors) {
this.consumeMessageInterceptorGroup.registerInterceptor(interceptor);
log.warn("Consume message interceptor: {} registered!",
interceptor.interceptorName());
interceptor.interceptorName());
}
}
List<Interceptor> sendMessageInterceptors = InterceptorFactory.getInstance()
.loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath());
.loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath());
if (sendMessageInterceptors != null && sendMessageInterceptors.size() > 0) {
this.sendMessageInterceptorGroup = new InterceptorGroup();
for (Interceptor interceptor : sendMessageInterceptors) {
......@@ -326,57 +322,37 @@ public class SnodeController {
public void registerProcessor() {
this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor,
this.sendMessageExecutor);
this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor,
this.sendMessageExecutor);
this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor,
this.heartbeatExecutor);
this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor,
this.heartbeatExecutor);
this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor,
this.heartbeatExecutor);
this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor,
this.pullMessageExecutor);
this.pullMessageExecutor);
this.snodeServer
.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor,
this.consumerManageExecutor);
.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer
.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer
.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.consumerManageExecutor);
this.snodeServer
.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor,
.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor);
this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE,
defaultMqttMessageProcessor, handleMqttMessageExecutor);
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.CONNECT,
new MqttConnectMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.DISCONNECT,
new MqttDisconnectMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PINGREQ,
new MqttPingreqMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBLISH,
new MqttPublishMessageHandler(this));
defaultMqttMessageProcessor
.registerMessageHanlder(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBCOMP,
new MqttPubcompMessageHandler(this));
defaultMqttMessageProcessor
.registerMessageHanlder(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this));
defaultMqttMessageProcessor
.registerMessageHanlder(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.SUBSCRIBE,
new MqttSubscribeMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.UNSUBSCRIBE,
new MqttUnsubscribeMessagHandler(this));
defaultMqttMessageProcessor, handleMqttMessageExecutor);
}
......@@ -503,7 +479,7 @@ public class SnodeController {
}
public void setRemotingServerInterceptorGroup(
InterceptorGroup remotingServerInterceptorGroup) {
InterceptorGroup remotingServerInterceptorGroup) {
this.remotingServerInterceptorGroup = remotingServerInterceptorGroup;
}
......@@ -531,6 +507,14 @@ public class SnodeController {
this.iotClientManager = iotClientManager;
}
public SessionManagerImpl getSessionManager() {
return sessionManager;
}
public void setSessionManager(SessionManagerImpl sessionManager) {
this.sessionManager = sessionManager;
}
public SubscriptionManager getSubscriptionManager() {
return subscriptionManager;
}
......@@ -566,4 +550,13 @@ public class SnodeController {
public void setMetricsService(MetricsService metricsService) {
this.metricsService = metricsService;
}
public WillMessageService getWillMessageService() {
return willMessageService;
}
public void setWillMessageService(
WillMessageService willMessageService) {
this.willMessageService = willMessageService;
}
}
......@@ -21,8 +21,10 @@ import java.util.Set;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.snode.client.impl.ClientRole;
import org.apache.rocketmq.snode.session.Session;
public class Client {
private ClientRole clientRole;
private String clientId;
......@@ -41,6 +43,8 @@ public class Client {
private boolean isConnected;
private Session session;
public ClientRole getClientRole() {
return clientRole;
}
......@@ -49,14 +53,16 @@ public class Client {
this.clientRole = clientRole;
}
@Override public boolean equals(Object o) {
if (this == o)
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
if (!(o instanceof Client))
}
if (!(o instanceof Client)) {
return false;
}
Client client = (Client) o;
return
version == client.version &&
return version == client.version &&
clientRole == client.clientRole &&
Objects.equals(clientId, client.clientId) &&
Objects.equals(groups, client.groups) &&
......@@ -65,8 +71,10 @@ public class Client {
isConnected == client.isConnected();
}
@Override public int hashCode() {
return Objects.hash(clientRole, clientId, groups, remotingChannel, heartbeatInterval, lastUpdateTimestamp, version, language, isConnected);
@Override
public int hashCode() {
return Objects.hash(clientRole, clientId, groups, remotingChannel, heartbeatInterval,
lastUpdateTimestamp, version, language, isConnected);
}
public RemotingChannel getRemotingChannel() {
......@@ -125,6 +133,14 @@ public class Client {
isConnected = connected;
}
public Session getSession() {
return session;
}
public void setSession(Session session) {
session = session;
}
public Set<String> getGroups() {
return groups;
}
......@@ -133,18 +149,20 @@ public class Client {
this.groups = groups;
}
@Override public String toString() {
@Override
public String toString() {
return "Client{" +
"clientRole=" + clientRole +
", clientId='" + clientId + '\'' +
", groups=" + groups +
", remotingChannel=" + remotingChannel +
", heartbeatInterval=" + heartbeatInterval +
", lastUpdateTimestamp=" + lastUpdateTimestamp +
", version=" + version +
", language=" + language +
", isConnected=" + isConnected +
'}';
"clientRole=" + clientRole +
", clientId='" + clientId + '\'' +
", groups=" + groups +
", remotingChannel=" + remotingChannel +
", heartbeatInterval=" + heartbeatInterval +
", lastUpdateTimestamp=" + lastUpdateTimestamp +
", version=" + version +
", language=" + language +
", isConnected=" + isConnected +
", session=" + session +
'}';
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.exception;
public class MqttConnectException extends RuntimeException{
public MqttConnectException(String message) {
super(message);
}
}
......@@ -38,15 +38,47 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttDisconnectMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPingreqMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubackMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubcompMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPublishMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubrecMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubrelMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttSubscribeMessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttUnsubscribeMessagHandler;
public class DefaultMqttMessageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private Map<MqttMessageType, MessageHandler> type2handler = new HashMap<>();
private final SnodeController snodeController;
private static final int MIN_AVAILABLE_VERSION = 3;
private static final int MAX_AVAILABLE_VERSION = 4;
public DefaultMqttMessageProcessor(SnodeController snodeController) {
this.snodeController = snodeController;
registerMessageHandler(MqttMessageType.CONNECT,
new MqttConnectMessageHandler(this.snodeController));
registerMessageHandler(MqttMessageType.DISCONNECT,
new MqttDisconnectMessageHandler(this.snodeController));
registerMessageHandler(MqttMessageType.PINGREQ,
new MqttPingreqMessageHandler(this.snodeController));
registerMessageHandler(MqttMessageType.PUBLISH,
new MqttPublishMessageHandler(this.snodeController));
registerMessageHandler(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this.snodeController));
registerMessageHandler(MqttMessageType.PUBCOMP,
new MqttPubcompMessageHandler(this.snodeController));
registerMessageHandler(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this.snodeController));
registerMessageHandler(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this.snodeController));
registerMessageHandler(MqttMessageType.SUBSCRIBE,
new MqttSubscribeMessageHandler(this.snodeController));
registerMessageHandler(MqttMessageType.UNSUBSCRIBE,
new MqttUnsubscribeMessagHandler(this.snodeController));
}
@Override
public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message)
......@@ -80,7 +112,7 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
return JSON.parseObject(json, classOfT);
}
public void registerMessageHanlder(MqttMessageType type, MessageHandler handler) {
private void registerMessageHandler(MqttMessageType type, MessageHandler handler) {
type2handler.put(type, handler);
}
}
......@@ -17,20 +17,33 @@
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.mqtt.WillMessage;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.ClientManager;
import org.apache.rocketmq.snode.client.impl.ClientRole;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
import org.apache.rocketmq.snode.exception.MqttConnectException;
import org.apache.rocketmq.snode.session.Session;
import org.apache.rocketmq.snode.session.SessionManagerImpl;
public class MqttConnectMessageHandler implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeController snodeController;
private static final int MIN_AVAILABLE_VERSION = 3;
private static final int MAX_AVAILABLE_VERSION = 4;
......@@ -47,14 +60,99 @@ public class MqttConnectMessageHandler implements MessageHandler {
MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) message;
MqttConnectPayload payload = mqttConnectMessage.payload();
MqttConnectReturnCode returnCode;
MqttConnAckMessage ackMessage;
RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class);
MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader();
mqttHeader.setMessageType(MqttMessageType.CONNACK.value());
mqttHeader.setDup(false);
mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
mqttHeader.setRetain(false);
mqttHeader.setRemainingLength(0x02);
/* TODO when clientId.length=0 and cleanSession=0, the server should assign a unique clientId to the client.*/
//validate clientId
if (StringUtils.isBlank(payload.clientIdentifier()) && !mqttConnectMessage.variableHeader()
.isCleanSession()) {
mqttHeader.setConnectReturnCode(
MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED.name());
mqttHeader.setSessionPresent(false);
command.setCode(ResponseCode.SYSTEM_ERROR);
command.setRemark("CONNECTION_REFUSED_IDENTIFIER_REJECTED");
return command;
}
//authentication
if (mqttConnectMessage.variableHeader().hasPassword() && mqttConnectMessage.variableHeader()
.hasUserName()
&& !authorized(payload.userName(), payload.password())) {
mqttHeader.setConnectReturnCode(
MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD.name());
mqttHeader.setSessionPresent(false);
command.setCode(ResponseCode.SYSTEM_ERROR);
command.setRemark("CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD");
return command;
}
//process a second CONNECT packet as a protocol violation and disconnect
if (isConnected(remotingChannel, payload.clientIdentifier())) {
remotingChannel.close();
return null;
}
//set Session Present according to whether the server has already stored Session State for the clientId
if (mqttConnectMessage.variableHeader().isCleanSession()) {
mqttHeader.setSessionPresent(false);
} else {
if (alreadyStoredSession(payload.clientIdentifier())) {
mqttHeader.setSessionPresent(true);
} else {
mqttHeader.setSessionPresent(false);
}
}
ClientManager iotClientManager = snodeController.getIotClientManager();
SessionManagerImpl sessionManager = snodeController.getSessionManager();
Client client = new Client();
client.setClientId(payload.clientIdentifier());
client.setClientRole(ClientRole.IOTCLIENT);
client.setConnected(true);
client.setRemotingChannel(remotingChannel);
client.setLastUpdateTimestamp(System.currentTimeMillis());
//register remotingChannel<--->client
iotClientManager.register(IOTClientManagerImpl.IOTGROUP, client);
Session session = new Session();
session.setClientId(client.getClientId());
//register client<--->session
sessionManager.register(client.getClientId(), session);
if (isConnected(remotingChannel, mqttConnectMessage.payload().clientIdentifier())) {
//save will message if have
if (mqttConnectMessage.variableHeader().isWillFlag()) {
if (payload.willTopic() == null || payload.willMessageInBytes() == null) {
log.error("Will message and will topic can not be null.");
throw new MqttConnectException("Will message and will topic can not be null.");
}
WillMessage willMessage = new WillMessage();
willMessage.setQos(mqttConnectMessage.variableHeader().willQos());
willMessage.setWillTopic(payload.willTopic());
willMessage.setRetain(mqttConnectMessage.variableHeader().isWillRetain());
willMessage.setBody(payload.willMessageInBytes());
snodeController.getWillMessageService().saveWillMessage(client.getClientId(), willMessage);
}
mqttHeader.setConnectReturnCode(MqttConnectReturnCode.CONNECTION_ACCEPTED.name());
command.setCode(ResponseCode.SUCCESS);
command.setRemark(null);
return command;
}
private boolean alreadyStoredSession(String clientId) {
SessionManagerImpl sessionManager = snodeController.getSessionManager();
Session session = sessionManager.getSession(clientId);
if (session != null && session.getClientId().equals(clientId)) {
return true;
}
// ChannelHandlerContext ctx = client.getCtx();
return null;
return false;
}
private boolean authorized(String username, String password) {
//TODO
return true;
}
private boolean isConnected(RemotingChannel remotingChannel, String clientId) {
......@@ -66,7 +164,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
return false;
}
private boolean isServiceAviable(MqttConnectMessage connectMessage) {
private boolean isServiceAvailable(MqttConnectMessage connectMessage) {
int version = connectMessage.variableHeader().version();
return version >= MIN_AVAILABLE_VERSION && version <= MAX_AVAILABLE_VERSION;
}
......
......@@ -17,13 +17,22 @@
package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
public class MqttDisconnectMessageHandler implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory
.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeController snodeController;
......@@ -32,13 +41,34 @@ public class MqttDisconnectMessageHandler implements MessageHandler {
}
/**
* handle the DISCONNECT message from the client <ol> <li>discard the Will Message and Will
* Topic</li> <li>remove the client from the ClientManager</li> <li>disconnect the
* connection</li> </ol>
* handle the DISCONNECT message from the client
* <ol>
* <li>discard the Will Message and Will Topic</li>
* <li>remove the client from the IOTClientManager</li>
* <li>disconnect the connection</li>
* </ol>
*/
@Override
public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
// TODO discard the Will Message and Will Topic
MqttFixedHeader fixedHeader = message.fixedHeader();
if (fixedHeader.qosLevel() != MqttQoS.AT_MOST_ONCE || !fixedHeader.isDup() || !fixedHeader
.isRetain()) {
log.error(
"The reserved bits(qos/isDup/isRetain) are not zero. Qos={}, isDup={}, isRetain={}",
fixedHeader.qosLevel(), fixedHeader.isDup(), fixedHeader.isRetain());
remotingChannel.close();
return null;
}
//discard will message associated with the current connection(client)
Client client = snodeController.getIotClientManager()
.getClient(IOTClientManagerImpl.IOTGROUP, remotingChannel);
if (client != null) {
snodeController.getWillMessageService().deleteWillMessage(client.getClientId());
}
client.setConnected(false);
remotingChannel.close();
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.service;
import org.apache.rocketmq.common.message.mqtt.WillMessage;
public interface WillMessageService {
void saveWillMessage(String clientId, WillMessage willMessage);
void sendWillMessage(String clientId);
void deleteWillMessage(String clientId);
}
......@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.service.impl;
import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.service.impl;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.message.mqtt.WillMessage;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.service.WillMessageService;
public class WillMessageServiceImpl implements WillMessageService {
private static ConcurrentHashMap<String/*clientId*/, WillMessage> willMessageTable = new ConcurrentHashMap<>();
private final SnodeController snodeController;
public WillMessageServiceImpl(SnodeController snodeController) {
this.snodeController = snodeController;
}
@Override
public void saveWillMessage(String clientId, WillMessage willMessage) {
willMessageTable.put(clientId, willMessage);
}
@Override
public void sendWillMessage(String clientId) {
}
@Override
public void deleteWillMessage(String clientId) {
willMessageTable.remove(clientId);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.session;
import java.util.Objects;
public class Session {
private String clientId;
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public long getLastUpdateTimestamp() {
return lastUpdateTimestamp;
}
public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
this.lastUpdateTimestamp = lastUpdateTimestamp;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Session)) {
return false;
}
Session session = (Session) o;
return Objects.equals(clientId, session.clientId);
}
@Override
public int hashCode() {
return Objects.hash(clientId, lastUpdateTimestamp);
}
@Override
public String toString() {
return "Session{" +
"clientId='" + clientId + '\'' +
", lastUpdateTimestamp=" + lastUpdateTimestamp +
'}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.session;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.snode.SnodeController;
public class SessionManagerImpl {
private static final InternalLogger log = InternalLoggerFactory
.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final ConcurrentHashMap<String/*clientId*/, Session> clientSessionTable = new ConcurrentHashMap<>(
1024);
private final SnodeController snodeController;
public SessionManagerImpl(SnodeController snodeController) {
this.snodeController = snodeController;
}
public boolean register(String clientId, Session session) {
boolean updated = false;
if (clientId != null && session != null) {
Session prev = clientSessionTable.put(clientId, session);
if (prev != null) {
log.info("Session updated, clientId: {} session: {}", clientId,
session);
updated = true;
} else {
log.info("New session registered, clientId: {} session: {}", clientId,
session);
}
session.setLastUpdateTimestamp(System.currentTimeMillis());
}
return updated;
}
public void unRegister(String clientId) {
Session prev = clientSessionTable.remove(clientId);
if (prev != null) {
log.info("Unregister session: {} of client, {}", prev, clientId);
}
}
public Session getSession(String clientId) {
return clientSessionTable.get(clientId);
}
public SnodeController getSnodeController() {
return snodeController;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.util;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import org.apache.rocketmq.snode.constant.MqttConstant;
public class MessageUtil {
public static final String MQTT_QOS_LEVEL = "MQTT_QOS_LEVEL";
public static final String MQTT_IS_RETAIN = "MQTT_IS_RETAIN";
public static final String MQTT_PACKET_ID = "MQTT_PACKET_ID";
public static final String MQTT_TOPIC_NAME = "MQTT_TOPIC_NAME";
public static final String MQTT_REMAINING_LENGTH = "MQTT_REMAINING_LENGTH";
public static final String MQTT_IS_DUP = "MQTT_IS_DUP";
public static final String MQTT_CLIENT_NAME = "MQTT_CLIENT_NAME";
public static final String MQTT_IS_CLEAN_SESSION = "MQTT_IS_CLEAN_SESSION";
public static final String MQTT_KEEP_ALIVE_TIME = "MQTT_KEEP_ALIVE_TIME";
public static final String MQTT_PROTOCOL_VERSION = "MQTT_PROTOCOL_VERSION";
public static MqttSubAckMessage getMqttSubackMessage(MqttSubscribeMessage message,
MqttSubAckPayload payload) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.SUBACK,
false,
message.fixedHeader().qosLevel(),
message.fixedHeader().isRetain(),
0
);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader
.from(message.variableHeader().messageId());
return new MqttSubAckMessage(fixedHeader, variableHeader, payload);
}
public static MqttPublishMessage getMqttPublishMessage(MqttMessage message, boolean isDup) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBLISH,
isDup,
message.fixedHeader().qosLevel(),
message.fixedHeader().isRetain(),
message.fixedHeader().remainingLength()
);
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(
((MqttPublishVariableHeader)message.variableHeader()).topicName(),
((MqttPublishVariableHeader)message.variableHeader()).packetId()
);
ByteBuf buf = Unpooled.buffer();
buf.writeBytes((byte[]) message.payload());
return new MqttPublishMessage(fixedHeader, variableHeader, buf);
}
public static MqttConnAckMessage getMqttConnackMessage(MqttConnectMessage message,
MqttConnectReturnCode returnCode) {
assert message.fixedHeader().messageType() == MqttMessageType.CONNECT;
MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(
returnCode,
message.variableHeader().isCleanSession()
);
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.CONNACK,
message.fixedHeader().isDup(),
message.fixedHeader().qosLevel(),
message.fixedHeader().isRetain(),
0);
return new MqttConnAckMessage(fixedHeader, variableHeader);
}
public static MqttPubAckMessage getMqttPubackMessage(MqttPublishMessage message) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBACK,
message.fixedHeader().isDup(),
message.fixedHeader().qosLevel(),
message.fixedHeader().isRetain(),
message.fixedHeader().remainingLength()
);
return new MqttPubAckMessage(fixedHeader,
MqttMessageIdVariableHeader.from(message.variableHeader().packetId()));
}
public static MqttMessage getMqttPubrecMessage(MqttPublishMessage message) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBREC,
message.fixedHeader().isDup(),
message.fixedHeader().qosLevel(),
message.fixedHeader().isRetain(),
message.fixedHeader().remainingLength()
);
return new MqttMessage(fixedHeader);
}
public static MqttMessage getMqttPubrelMessage(MqttMessage message) {
assert message.fixedHeader().messageType() == MqttMessageType.PUBREC;
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBREL,
message.fixedHeader().isDup(),
message.fixedHeader().qosLevel(),
message.fixedHeader().isRetain(),
message.fixedHeader().remainingLength()
);
return new MqttMessage(fixedHeader);
}
public static MqttMessage getMqttPubcompMessage(MqttMessage message) {
assert message.fixedHeader().messageType() == MqttMessageType.PUBREL;
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBCOMP,
message.fixedHeader().isDup(),
message.fixedHeader().qosLevel(),
message.fixedHeader().isRetain(),
message.fixedHeader().remainingLength()
);
return new MqttMessage(fixedHeader);
}
public static MqttMessage getMqttPingrespMessage(MqttMessage message) {
assert message.fixedHeader().messageType() == MqttMessageType.PINGREQ;
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PINGRESP,
message.fixedHeader().isDup(),
message.fixedHeader().qosLevel(),
message.fixedHeader().isRetain(),
0
);
return new MqttMessage(fixedHeader);
}
public static MqttUnsubAckMessage getMqttUnsubackMessage(MqttUnsubscribeMessage message) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.UNSUBACK,
message.fixedHeader().isDup(),
message.fixedHeader().qosLevel(),
message.fixedHeader().isRetain(),
0
);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader
.from(message.variableHeader().messageId());
return new MqttUnsubAckMessage(fixedHeader, variableHeader);
}
public static int actualQos(int qos) {
return Math.min(MqttConstant.MAX_SUPPORTED_QOS, qos);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.util;
import java.util.UUID;
public class MqttUtil {
public static String generateClientId() {
return UUID.randomUUID().toString();
}
}
......@@ -34,7 +34,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -60,13 +59,12 @@ public class DefaultMqttMessageProcessorTest {
@Before
public void init() {
defaultMqttMessageProcessor = new DefaultMqttMessageProcessor();
defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(snodeController);
}
@Test
public void testProcessRequest() throws RemotingCommandException, UnsupportedEncodingException {
RemotingCommand request = createMqttConnectMesssageCommand();
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.CONNECT, new MqttConnectMessageHandler(snodeController));
defaultMqttMessageProcessor.processRequest(remotingChannel, request);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.message.mqtt.WillMessage;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttDisconnectMessageHandler;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class MqttDisconnectMessageHandlerTest {
@Mock
private RemotingChannel remotingChannel;
@Test
public void testHandlerMessage() throws Exception {
SnodeController snodeController = new SnodeController(new ServerConfig(),
new ClientConfig(), new SnodeConfig());
MqttDisconnectMessageHandler mqttDisconnectMessageHandler = new MqttDisconnectMessageHandler(
snodeController);
Client client = new Client();
client.setRemotingChannel(remotingChannel);
client.setClientId("123456");
snodeController.getIotClientManager().register(IOTClientManagerImpl.IOTGROUP, client);
snodeController.getWillMessageService().saveWillMessage("123456", new WillMessage());
MqttMessage mqttDisconnectMessage = new MqttMessage(new MqttFixedHeader(
MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200));
mqttDisconnectMessageHandler.handleMessage(mqttDisconnectMessage, remotingChannel);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.service;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.message.mqtt.WillMessage;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.SnodeTestBase;
import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class WillMessageServiceImplTest extends SnodeTestBase {
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(),
new ClientConfig(), new SnodeConfig());
private WillMessageService willMessageService;
@Before
public void init() {
willMessageService = new WillMessageServiceImpl(snodeController);
}
@Test
public void saveWillMessageTest() {
willMessageService.saveWillMessage("testClientId", new WillMessage());
}
@Test
public void deleteWillMessageTest() {
willMessageService.saveWillMessage("testClientId", new WillMessage());
willMessageService.deleteWillMessage("testClientId");
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册