提交 5a3102d9 编写于 作者: C chengxiangwang

completing qos=0 message pub and sub

上级 e2c697bf
...@@ -56,12 +56,12 @@ public class SnodeConfig { ...@@ -56,12 +56,12 @@ public class SnodeConfig {
private int snodeSendThreadPoolQueueCapacity = 10000; private int snodeSendThreadPoolQueueCapacity = 10000;
private int snodeHandleMqttThreadPoolQueueCapacity = 10000;
private int snodeSendMessageMinPoolSize = 10; private int snodeSendMessageMinPoolSize = 10;
private int snodeSendMessageMaxPoolSize = 20; private int snodeSendMessageMaxPoolSize = 20;
private int snodeHandleMqttThreadPoolQueueCapacity = 10000;
private int snodeHandleMqttMessageMinPoolSize = 10; private int snodeHandleMqttMessageMinPoolSize = 10;
private int snodeHandleMqttMessageMaxPoolSize = 20; private int snodeHandleMqttMessageMaxPoolSize = 20;
...@@ -88,6 +88,12 @@ public class SnodeConfig { ...@@ -88,6 +88,12 @@ public class SnodeConfig {
private int snodePushMessageThreadPoolQueueCapacity = 10000; private int snodePushMessageThreadPoolQueueCapacity = 10000;
private int snodePushMqttMessageMinPoolSize = 10;
private int snodePushMqttMessageMaxPoolSize = 20;
private int snodePushMqttMessageThreadPoolQueueCapacity = 10000;
private int slowConsumerThreshold = 1024; private int slowConsumerThreshold = 1024;
private final String sendMessageInterceptorPath = "META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor"; private final String sendMessageInterceptorPath = "META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor";
...@@ -230,14 +236,6 @@ public class SnodeConfig { ...@@ -230,14 +236,6 @@ public class SnodeConfig {
this.snodeSendThreadPoolQueueCapacity = snodeSendThreadPoolQueueCapacity; this.snodeSendThreadPoolQueueCapacity = snodeSendThreadPoolQueueCapacity;
} }
public int getSnodeHandleMqttThreadPoolQueueCapacity() {
return snodeHandleMqttThreadPoolQueueCapacity;
}
public void setSnodeHandleMqttThreadPoolQueueCapacity(
int snodeHandleMqttThreadPoolQueueCapacity) {
this.snodeHandleMqttThreadPoolQueueCapacity = snodeHandleMqttThreadPoolQueueCapacity;
}
public int getSnodeSendMessageMinPoolSize() { public int getSnodeSendMessageMinPoolSize() {
return snodeSendMessageMinPoolSize; return snodeSendMessageMinPoolSize;
...@@ -279,6 +277,14 @@ public class SnodeConfig { ...@@ -279,6 +277,14 @@ public class SnodeConfig {
this.snodeId = snodeId; this.snodeId = snodeId;
} }
public int getSnodeHandleMqttThreadPoolQueueCapacity() {
return snodeHandleMqttThreadPoolQueueCapacity;
}
public void setSnodeHandleMqttThreadPoolQueueCapacity(int snodeHandleMqttThreadPoolQueueCapacity) {
this.snodeHandleMqttThreadPoolQueueCapacity = snodeHandleMqttThreadPoolQueueCapacity;
}
public int getSnodeHandleMqttMessageMinPoolSize() { public int getSnodeHandleMqttMessageMinPoolSize() {
return snodeHandleMqttMessageMinPoolSize; return snodeHandleMqttMessageMinPoolSize;
} }
...@@ -359,6 +365,30 @@ public class SnodeConfig { ...@@ -359,6 +365,30 @@ public class SnodeConfig {
this.snodePushMessageThreadPoolQueueCapacity = snodePushMessageThreadPoolQueueCapacity; this.snodePushMessageThreadPoolQueueCapacity = snodePushMessageThreadPoolQueueCapacity;
} }
public int getSnodePushMqttMessageMinPoolSize() {
return snodePushMqttMessageMinPoolSize;
}
public void setSnodePushMqttMessageMinPoolSize(int snodePushMqttMessageMinPoolSize) {
this.snodePushMqttMessageMinPoolSize = snodePushMqttMessageMinPoolSize;
}
public int getSnodePushMqttMessageMaxPoolSize() {
return snodePushMqttMessageMaxPoolSize;
}
public void setSnodePushMqttMessageMaxPoolSize(int snodePushMqttMessageMaxPoolSize) {
this.snodePushMqttMessageMaxPoolSize = snodePushMqttMessageMaxPoolSize;
}
public int getSnodePushMqttMessageThreadPoolQueueCapacity() {
return snodePushMqttMessageThreadPoolQueueCapacity;
}
public void setSnodePushMqttMessageThreadPoolQueueCapacity(int snodePushMqttMessageThreadPoolQueueCapacity) {
this.snodePushMqttMessageThreadPoolQueueCapacity = snodePushMqttMessageThreadPoolQueueCapacity;
}
public String getSendMessageInterceptorPath() { public String getSendMessageInterceptorPath() {
return sendMessageInterceptorPath; return sendMessageInterceptorPath;
} }
......
...@@ -68,8 +68,6 @@ public class MqttSubscriptionData extends SubscriptionData { ...@@ -68,8 +68,6 @@ public class MqttSubscriptionData extends SubscriptionData {
if (getClass() != obj.getClass()) if (getClass() != obj.getClass())
return false; return false;
MqttSubscriptionData other = (MqttSubscriptionData) obj; MqttSubscriptionData other = (MqttSubscriptionData) obj;
if (qos != other.qos)
return false;
if (clientId != other.clientId) { if (clientId != other.clientId) {
return false; return false;
} }
......
...@@ -62,5 +62,9 @@ ...@@ -62,5 +62,9 @@
<groupId>io.prometheus</groupId> <groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId> <artifactId>simpleclient_hotspot</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MqttSampleConsumer {
private static Logger log = LoggerFactory.getLogger(MqttSampleConsumer.class);
public static void main(String[] args) throws InterruptedException {
String topic = "mqtt-sample";
int qos = 0;
String broker = "tcp://127.0.0.1:1883";
String clinetId = "JavaSampleConsumer";
MemoryPersistence persistence = new MemoryPersistence();
{
try {
MqttClient sampleClient = new MqttClient(broker, clinetId, persistence);
MqttConnectOptions connectOptions = new MqttConnectOptions();
connectOptions.setCleanSession(true);
connectOptions.setKeepAliveInterval(6000);
log.info("Connecting to broker: " + broker);
sampleClient.connect(connectOptions);
log.info("Connected");
sampleClient.setCallback(new MqttCallback() {
@Override public void connectionLost(Throwable throwable) {
System.out.println("connection lost." + throwable.getLocalizedMessage());
}
@Override public void messageArrived(String s, MqttMessage message) throws Exception {
System.out.println(message.toString());
// System.exit(0);
}
@Override public void deliveryComplete(IMqttDeliveryToken token) {
try {
System.out.println("delivery complete." + token.getMessage().toString());
} catch (MqttException e) {
e.printStackTrace();
}
}
});
log.info("Subscribing topic: " + topic);
sampleClient.subscribe(topic, qos);
log.info("Subsrcribe success.");
Thread.sleep(100000000);
} catch (MqttException me) {
log.error("reason " + me.getReasonCode());
log.error("msg " + me.getMessage());
log.error("loc " + me.getLocalizedMessage());
log.error("cause " + me.getCause());
log.error("excep " + me);
me.printStackTrace();
me.printStackTrace();
System.exit(1);
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MqttSampleProducer {
private static Logger log = LoggerFactory.getLogger(MqttSampleProducer.class);
public static void main(String[] args) throws InterruptedException {
String topic = "mqtt-sample";
String messageContent = "hello mqtt";
int qos = 0;
String broker = "tcp://127.0.0.1:1883";
String clientId = "JavaSampleProducer";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setKeepAliveInterval(6000);
log.info("Connecting to broker: " + broker);
sampleClient.connect(connOpts);
log.info("Connected");
log.info("Publishing message: " + messageContent);
MqttMessage message = new MqttMessage(messageContent.getBytes());
message.setQos(qos);
message.setRetained(true);
sampleClient.publish(topic, message);
log.info("Message published");
/*sampleClient.disconnect();
log.info("Disconnected");*/
Thread.sleep(10000000);
} catch (MqttException me) {
log.error("reason " + me.getReasonCode());
log.error("msg " + me.getMessage());
log.error("loc " + me.getLocalizedMessage());
log.error("cause " + me.getCause());
log.error("excep " + me);
me.printStackTrace();
System.exit(1);
}
}
}
...@@ -648,6 +648,11 @@ ...@@ -648,6 +648,11 @@
<artifactId>simpleclient_hotspot</artifactId> <artifactId>simpleclient_hotspot</artifactId>
<version>0.6.0</version> <version>0.6.0</version>
</dependency> </dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
</project> </project>
...@@ -80,6 +80,8 @@ public class RemotingCommand { ...@@ -80,6 +80,8 @@ public class RemotingCommand {
private transient byte[] body; private transient byte[] body;
private Object payload;
public RemotingCommand() { public RemotingCommand() {
} }
...@@ -259,6 +261,14 @@ public class RemotingCommand { ...@@ -259,6 +261,14 @@ public class RemotingCommand {
this.body = body; this.body = body;
} }
public Object getPayload() {
return payload;
}
public void setPayload(Object payload) {
this.payload = payload;
}
public HashMap<String, String> getExtFields() { public HashMap<String, String> getExtFields() {
return extFields; return extFields;
} }
......
...@@ -42,7 +42,7 @@ public class MqttMessage2RemotingCommandHandler extends MessageToMessageDecoder< ...@@ -42,7 +42,7 @@ public class MqttMessage2RemotingCommandHandler extends MessageToMessageDecoder<
if (!(msg instanceof MqttMessage)) { if (!(msg instanceof MqttMessage)) {
return; return;
} }
RemotingCommand requestCommand = null; RemotingCommand requestCommand;
Message2MessageEncodeDecode message2MessageEncodeDecode = EncodeDecodeDispatcher Message2MessageEncodeDecode message2MessageEncodeDecode = EncodeDecodeDispatcher
.getEncodeDecodeDispatcher().get(msg.fixedHeader().messageType()); .getEncodeDecodeDispatcher().get(msg.fixedHeader().messageType());
if (message2MessageEncodeDecode == null) { if (message2MessageEncodeDecode == null) {
......
...@@ -43,8 +43,8 @@ public class RemotingCommand2MqttMessageHandler extends MessageToMessageEncoder< ...@@ -43,8 +43,8 @@ public class RemotingCommand2MqttMessageHandler extends MessageToMessageEncoder<
if (!(msg instanceof RemotingCommand)) { if (!(msg instanceof RemotingCommand)) {
return; return;
} }
MqttMessage mqttMessage = null; MqttMessage mqttMessage;
MqttHeader mqttHeader = (MqttHeader) msg.decodeCommandCustomHeader(MqttHeader.class); MqttHeader mqttHeader = (MqttHeader) msg.readCustomHeader();
Message2MessageEncodeDecode message2MessageEncodeDecode = EncodeDecodeDispatcher Message2MessageEncodeDecode message2MessageEncodeDecode = EncodeDecodeDispatcher
.getEncodeDecodeDispatcher().get( .getEncodeDecodeDispatcher().get(
MqttMessageType.valueOf(mqttHeader.getMessageType())); MqttMessageType.valueOf(mqttHeader.getMessageType()));
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.util.internal.StringUtil;
import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
/**
* Payload of {@link MqttConnectMessage}
*/
public final class RocketMQMqttConnectPayload extends RemotingSerializable {
private String clientIdentifier;
private String willTopic;
private String willMessage;
private String userName;
private String password;
public RocketMQMqttConnectPayload(
String clientIdentifier,
String willTopic,
String willMessage,
String userName,
String password) {
this.clientIdentifier = clientIdentifier;
this.willTopic = willTopic;
this.willMessage = willMessage;
this.userName = userName;
this.password = password;
}
public static RocketMQMqttConnectPayload fromMqttConnectPayload(MqttConnectPayload payload) {
return new RocketMQMqttConnectPayload(payload.clientIdentifier(), payload.willTopic(),
payload.willMessage(), payload.userName(), payload.password());
}
public MqttConnectPayload toMqttConnectPayload() throws UnsupportedEncodingException {
return new MqttConnectPayload(this.clientIdentifier, this.willTopic, this.willMessage.getBytes(
RemotingUtil.REMOTING_CHARSET), this.userName, this.password.getBytes(RemotingUtil.REMOTING_CHARSET));
}
public String getClientIdentifier() {
return clientIdentifier;
}
public String getWillTopic() {
return willTopic;
}
public String getWillMessage() {
return willMessage;
}
public String getUserName() {
return userName;
}
public String getPassword() {
return password;
}
public void setClientIdentifier(String clientIdentifier) {
this.clientIdentifier = clientIdentifier;
}
public void setWillTopic(String willTopic) {
this.willTopic = willTopic;
}
public void setWillMessage(String willMessage) {
this.willMessage = willMessage;
}
public void setUserName(String userName) {
this.userName = userName;
}
public void setPassword(String password) {
this.password = password;
}
@Override
public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this))
.append('[')
.append("clientIdentifier=").append(clientIdentifier)
.append(", willTopic=").append(willTopic)
.append(", willMessage=").append(willMessage)
.append(", userName=").append(userName)
.append(", password=").append(password)
.append(']')
.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.util.internal.StringUtil;
import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
/**
* Payload of {@link MqttSubAckMessage}
*/
public final class RocketMQMqttSubAckPayload extends RemotingSerializable {
private List<Integer> grantedQoSLevels;
public RocketMQMqttSubAckPayload(List<Integer> grantedQoSLevels) {
this.grantedQoSLevels = Collections.unmodifiableList(grantedQoSLevels);
}
public List<Integer> getGrantedQoSLevels() {
return grantedQoSLevels;
}
public void setGrantedQoSLevels(List<Integer> grantedQoSLevels) {
this.grantedQoSLevels = grantedQoSLevels;
}
public static RocketMQMqttSubAckPayload fromMqttSubAckPayload(MqttSubAckPayload payload) {
return new RocketMQMqttSubAckPayload(payload.grantedQoSLevels());
}
public MqttSubAckPayload toMqttSubAckPayload() throws UnsupportedEncodingException {
return new MqttSubAckPayload(this.grantedQoSLevels);
}
@Override
public String toString() {
return StringUtil.simpleClassName(this) + '[' + "grantedQoSLevels=" + this.grantedQoSLevels + ']';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.util.internal.StringUtil;
import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
/**
* Payload of {@link MqttSubscribeMessage}
*/
public final class RocketMQMqttSubscribePayload extends RemotingSerializable {
private List<MqttTopicSubscription> topicSubscriptions;
public RocketMQMqttSubscribePayload(List<MqttTopicSubscription> topicSubscriptions) {
this.topicSubscriptions = Collections.unmodifiableList(topicSubscriptions);
}
public List<MqttTopicSubscription> getTopicSubscriptions() {
return topicSubscriptions;
}
public void setTopicSubscriptions(
List<MqttTopicSubscription> topicSubscriptions) {
this.topicSubscriptions = topicSubscriptions;
}
public static RocketMQMqttSubscribePayload fromMqttSubscribePayload(MqttSubscribePayload payload) {
return new RocketMQMqttSubscribePayload(payload.topicSubscriptions());
}
public MqttSubscribePayload toMqttSubscribePayload() throws UnsupportedEncodingException {
return new MqttSubscribePayload(this.topicSubscriptions);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
for (int i = 0; i < topicSubscriptions.size() - 1; i++) {
builder.append(topicSubscriptions.get(i)).append(", ");
}
builder.append(topicSubscriptions.get(topicSubscriptions.size() - 1));
builder.append(']');
return builder.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.mqtt;
import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
import io.netty.util.internal.StringUtil;
import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
/**
* Payload of the {@link io.netty.handler.codec.mqtt.MqttUnsubscribeMessage}
*/
public class RocketMQMqttUnSubscribePayload extends RemotingSerializable {
private List<String> topics;
public RocketMQMqttUnSubscribePayload(List<String> topics) {
this.topics = topics;
}
public List<String> getTopics() {
return topics;
}
public void setTopics(List<String> topics) {
this.topics = Collections.unmodifiableList(topics);
}
public static RocketMQMqttUnSubscribePayload fromMqttUnSubscribePayload(MqttUnsubscribePayload payload) {
return new RocketMQMqttUnSubscribePayload(payload.topics());
}
public MqttUnsubscribePayload toMqttUnsubscribePayload() throws UnsupportedEncodingException {
return new MqttUnsubscribePayload(this.topics);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
for (int i = 0; i < topics.size() - 1; i++) {
builder.append("topicName = ").append(topics.get(i)).append(", ");
}
builder.append("topicName = ").append(topics.get(topics.size() - 1))
.append(']');
return builder.toString();
}
}
...@@ -29,15 +29,15 @@ public class EncodeDecodeDispatcher { ...@@ -29,15 +29,15 @@ public class EncodeDecodeDispatcher {
encodeDecodeDispatcher.put(MqttMessageType.CONNECT, new MqttConnectEncodeDecode()); encodeDecodeDispatcher.put(MqttMessageType.CONNECT, new MqttConnectEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.CONNACK, new MqttConnectackEncodeDecode()); encodeDecodeDispatcher.put(MqttMessageType.CONNACK, new MqttConnectackEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.DISCONNECT, null); encodeDecodeDispatcher.put(MqttMessageType.DISCONNECT, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBLISH, null); encodeDecodeDispatcher.put(MqttMessageType.PUBLISH, new MqttPublishEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.PUBACK, null); encodeDecodeDispatcher.put(MqttMessageType.PUBACK, new MqttPubackEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.PUBREC, null); encodeDecodeDispatcher.put(MqttMessageType.PUBREC, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBREL, null); encodeDecodeDispatcher.put(MqttMessageType.PUBREL, null);
encodeDecodeDispatcher.put(MqttMessageType.PUBCOMP, null); encodeDecodeDispatcher.put(MqttMessageType.PUBCOMP, null);
encodeDecodeDispatcher.put(MqttMessageType.SUBSCRIBE, null); encodeDecodeDispatcher.put(MqttMessageType.SUBSCRIBE, new MqttSubscribeEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.SUBACK, null); encodeDecodeDispatcher.put(MqttMessageType.SUBACK, new MqttSubackEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.UNSUBSCRIBE, null); encodeDecodeDispatcher.put(MqttMessageType.UNSUBSCRIBE, new MqttUnSubscribeEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.UNSUBACK, null); encodeDecodeDispatcher.put(MqttMessageType.UNSUBACK, new MqttUnSubackEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.PINGREQ, null); encodeDecodeDispatcher.put(MqttMessageType.PINGREQ, null);
encodeDecodeDispatcher.put(MqttMessageType.PINGRESP, null); encodeDecodeDispatcher.put(MqttMessageType.PINGRESP, null);
} }
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher; package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
...@@ -25,5 +26,5 @@ public interface Message2MessageEncodeDecode { ...@@ -25,5 +26,5 @@ public interface Message2MessageEncodeDecode {
RemotingCommand decode(MqttMessage mqttMessage); RemotingCommand decode(MqttMessage mqttMessage);
MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException; MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException, UnsupportedEncodingException;
} }
...@@ -21,18 +21,14 @@ import io.netty.handler.codec.mqtt.MqttConnectMessage; ...@@ -21,18 +21,14 @@ import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
public class MqttConnectEncodeDecode implements Message2MessageEncodeDecode { public class MqttConnectEncodeDecode implements Message2MessageEncodeDecode {
@Override @Override
public RemotingCommand decode(MqttMessage mqttMessage) { public RemotingCommand decode(MqttMessage mqttMessage) {
RocketMQMqttConnectPayload payload = RocketMQMqttConnectPayload RemotingCommand requestCommand;
.fromMqttConnectPayload(((MqttConnectMessage) mqttMessage).payload());
RemotingCommand requestCommand = null;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttConnectVariableHeader variableHeader = (MqttConnectVariableHeader) mqttMessage MqttConnectVariableHeader variableHeader = (MqttConnectVariableHeader) mqttMessage
.variableHeader(); .variableHeader();
...@@ -57,9 +53,7 @@ public class MqttConnectEncodeDecode implements Message2MessageEncodeDecode { ...@@ -57,9 +53,7 @@ public class MqttConnectEncodeDecode implements Message2MessageEncodeDecode {
requestCommand = RemotingCommand requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader); .createRequestCommand(1000, mqttHeader);
CodecHelper.makeCustomHeaderToNet(requestCommand); requestCommand.setPayload(((MqttConnectMessage) mqttMessage).payload());
requestCommand.setBody(payload.encode());
return requestCommand; return requestCommand;
} }
......
...@@ -37,9 +37,7 @@ public class MqttConnectackEncodeDecode implements Message2MessageEncodeDecode { ...@@ -37,9 +37,7 @@ public class MqttConnectackEncodeDecode implements Message2MessageEncodeDecode {
@Override @Override
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException { public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException {
MqttHeader mqttHeader = (MqttHeader) remotingCommand MqttHeader mqttHeader = (MqttHeader)remotingCommand.readCustomHeader();
.decodeCommandCustomHeader(MqttHeader.class);
return new MqttConnAckMessage( return new MqttConnAckMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, mqttHeader.isDup(), new MqttFixedHeader(MqttMessageType.CONNACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
......
...@@ -36,8 +36,7 @@ public class MqttPubackEncodeDecode implements Message2MessageEncodeDecode { ...@@ -36,8 +36,7 @@ public class MqttPubackEncodeDecode implements Message2MessageEncodeDecode {
@Override @Override
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException { public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException {
MqttHeader mqttHeader = (MqttHeader) remotingCommand MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader();
.decodeCommandCustomHeader(MqttHeader.class);
return new MqttPubAckMessage( return new MqttPubAckMessage(
new MqttFixedHeader(MqttMessageType.PUBACK, mqttHeader.isDup(), new MqttFixedHeader(MqttMessageType.PUBACK, mqttHeader.isDup(),
......
...@@ -18,11 +18,12 @@ ...@@ -18,11 +18,12 @@
package org.apache.rocketmq.remoting.transport.mqtt.dispatcher; package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.apache.rocketmq.remoting.netty.CodecHelper; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
...@@ -30,13 +31,9 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode { ...@@ -30,13 +31,9 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode {
@Override @Override
public RemotingCommand decode(MqttMessage mqttMessage) { public RemotingCommand decode(MqttMessage mqttMessage) {
ByteBuf byteBuf = ((MqttPublishMessage) mqttMessage).payload(); RemotingCommand requestCommand;
byte[] payload = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(payload);
RemotingCommand requestCommand = null;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttConnAckVariableHeader variableHeader = (MqttConnAckVariableHeader) mqttMessage MqttPublishVariableHeader variableHeader = (MqttPublishVariableHeader) mqttMessage
.variableHeader(); .variableHeader();
MqttHeader mqttHeader = new MqttHeader(); MqttHeader mqttHeader = new MqttHeader();
...@@ -46,19 +43,23 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode { ...@@ -46,19 +43,23 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode {
mqttHeader.setRetain(mqttFixedHeader.isRetain()); mqttHeader.setRetain(mqttFixedHeader.isRetain());
mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength()); mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength());
mqttHeader.setConnectReturnCode(variableHeader.connectReturnCode().name()); mqttHeader.setTopicName(variableHeader.topicName());
mqttHeader.setSessionPresent(variableHeader.isSessionPresent()); mqttHeader.setPacketId(variableHeader.packetId());
requestCommand = RemotingCommand requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader); .createRequestCommand(1000, mqttHeader);
CodecHelper.makeCustomHeaderToNet(requestCommand); //invoke copy to generate a new ByteBuf or increase refCnt by 1 by invoking retain() method, because release method is invoked in Message2MessageEncodeDecode.channelRead
requestCommand.setPayload(((MqttPublishMessage) mqttMessage).payload().copy());
requestCommand.setBody(payload);
return requestCommand; return requestCommand;
} }
@Override @Override
public MqttMessage encode(RemotingCommand remotingCommand) { public MqttMessage encode(RemotingCommand remotingCommand) {
return null; MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader();
return new MqttPublishMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()),
new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId()), (ByteBuf) remotingCommand.getPayload());
} }
} }
...@@ -24,6 +24,7 @@ import io.netty.handler.codec.mqtt.MqttMessageType; ...@@ -24,6 +24,7 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload; import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
...@@ -36,14 +37,12 @@ public class MqttSubackEncodeDecode implements Message2MessageEncodeDecode { ...@@ -36,14 +37,12 @@ public class MqttSubackEncodeDecode implements Message2MessageEncodeDecode {
} }
@Override @Override
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException { public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException, UnsupportedEncodingException {
MqttHeader mqttHeader = (MqttHeader) remotingCommand MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader();
.decodeCommandCustomHeader(MqttHeader.class);
return new MqttSubAckMessage( return new MqttSubAckMessage(
new MqttFixedHeader(MqttMessageType.SUBACK, mqttHeader.isDup(), new MqttFixedHeader(MqttMessageType.SUBACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()), mqttHeader.getRemainingLength()),
MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()), new MqttSubAckPayload()); MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()), (MqttSubAckPayload) remotingCommand.getPayload());
} }
} }
...@@ -21,19 +21,14 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader; ...@@ -21,19 +21,14 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttSubscribePayload;
public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode { public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode {
@Override @Override
public RemotingCommand decode(MqttMessage mqttMessage) { public RemotingCommand decode(MqttMessage mqttMessage) {
RocketMQMqttSubscribePayload payload = RocketMQMqttSubscribePayload RemotingCommand requestCommand;
.fromMqttSubscribePayload(((MqttSubscribeMessage) mqttMessage).payload());
RemotingCommand requestCommand = null;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage
.variableHeader(); .variableHeader();
...@@ -49,9 +44,7 @@ public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode { ...@@ -49,9 +44,7 @@ public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode {
requestCommand = RemotingCommand requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader); .createRequestCommand(1000, mqttHeader);
CodecHelper.makeCustomHeaderToNet(requestCommand); requestCommand.setPayload(((MqttSubscribeMessage) mqttMessage).payload());
requestCommand.setBody(payload.encode());
return requestCommand; return requestCommand;
} }
......
...@@ -36,8 +36,7 @@ public class MqttUnSubackEncodeDecode implements Message2MessageEncodeDecode { ...@@ -36,8 +36,7 @@ public class MqttUnSubackEncodeDecode implements Message2MessageEncodeDecode {
@Override @Override
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException { public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException {
MqttHeader mqttHeader = (MqttHeader) remotingCommand MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader();
.decodeCommandCustomHeader(MqttHeader.class);
return new MqttUnsubAckMessage( return new MqttUnsubAckMessage(
new MqttFixedHeader(MqttMessageType.UNSUBACK, mqttHeader.isDup(), new MqttFixedHeader(MqttMessageType.UNSUBACK, mqttHeader.isDup(),
......
...@@ -21,19 +21,14 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader; ...@@ -21,19 +21,14 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttUnSubscribePayload;
public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode { public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode {
@Override @Override
public RemotingCommand decode(MqttMessage mqttMessage) { public RemotingCommand decode(MqttMessage mqttMessage) {
RocketMQMqttUnSubscribePayload payload = RocketMQMqttUnSubscribePayload RemotingCommand requestCommand;
.fromMqttUnSubscribePayload(((MqttUnsubscribeMessage) mqttMessage).payload());
RemotingCommand requestCommand = null;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage
.variableHeader(); .variableHeader();
...@@ -44,14 +39,10 @@ public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode ...@@ -44,14 +39,10 @@ public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode
mqttHeader.setQosLevel(mqttFixedHeader.qosLevel().value()); mqttHeader.setQosLevel(mqttFixedHeader.qosLevel().value());
mqttHeader.setRetain(mqttFixedHeader.isRetain()); mqttHeader.setRetain(mqttFixedHeader.isRetain());
mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength()); mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength());
mqttHeader.setMessageId(variableHeader.messageId()); mqttHeader.setMessageId(variableHeader.messageId());
requestCommand = RemotingCommand requestCommand = RemotingCommand.createRequestCommand(1000, mqttHeader);
.createRequestCommand(1000, mqttHeader); requestCommand.setPayload(((MqttUnsubscribeMessage) mqttMessage).payload());
CodecHelper.makeCustomHeaderToNet(requestCommand);
requestCommand.setBody(payload.encode());
return requestCommand; return requestCommand;
} }
......
...@@ -72,6 +72,7 @@ import org.apache.rocketmq.snode.service.WillMessageService; ...@@ -72,6 +72,7 @@ import org.apache.rocketmq.snode.service.WillMessageService;
import org.apache.rocketmq.snode.service.impl.ClientServiceImpl; import org.apache.rocketmq.snode.service.impl.ClientServiceImpl;
import org.apache.rocketmq.snode.service.impl.LocalEnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.LocalEnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl; import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl;
import org.apache.rocketmq.snode.service.impl.MqttPushServiceImpl;
import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.PushServiceImpl; import org.apache.rocketmq.snode.service.impl.PushServiceImpl;
import org.apache.rocketmq.snode.service.impl.RemoteEnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.RemoteEnodeServiceImpl;
...@@ -118,6 +119,7 @@ public class SnodeController { ...@@ -118,6 +119,7 @@ public class SnodeController {
private SlowConsumerService slowConsumerService; private SlowConsumerService slowConsumerService;
private MetricsService metricsService; private MetricsService metricsService;
private WillMessageService willMessageService; private WillMessageService willMessageService;
private MqttPushServiceImpl mqttPushService;
private final ScheduledExecutorService scheduledExecutorService = Executors private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( .newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
...@@ -136,11 +138,15 @@ public class SnodeController { ...@@ -136,11 +138,15 @@ public class SnodeController {
} }
this.nnodeService = new NnodeServiceImpl(this); this.nnodeService = new NnodeServiceImpl(this);
this.scheduledService = new ScheduledServiceImpl(this); this.scheduledService = new ScheduledServiceImpl(this);
this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient() this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient();
.init(this.getNettyClientConfig(), null); if (this.remotingClient != null) {
this.remotingClient.init(this.getNettyClientConfig(), null);
}
this.mqttRemotingClient = RemotingClientFactory.getInstance() this.mqttRemotingClient = RemotingClientFactory.getInstance()
.createRemotingClient(RemotingUtil.MQTT_PROTOCOL) .createRemotingClient(RemotingUtil.MQTT_PROTOCOL);
.init(this.getNettyClientConfig(), null); if (this.mqttRemotingClient != null) {
this.mqttRemotingClient.init(this.getNettyClientConfig(), null);
}
this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor( this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(), snodeConfig.getSnodeSendMessageMinPoolSize(),
...@@ -211,6 +217,7 @@ public class SnodeController { ...@@ -211,6 +217,7 @@ public class SnodeController {
this.slowConsumerService = new SlowConsumerServiceImpl(this); this.slowConsumerService = new SlowConsumerServiceImpl(this);
this.metricsService = new MetricsServiceImpl(); this.metricsService = new MetricsServiceImpl();
this.willMessageService = new WillMessageServiceImpl(this); this.willMessageService = new WillMessageServiceImpl(this);
this.mqttPushService = new MqttPushServiceImpl(this);
} }
public SnodeConfig getSnodeConfig() { public SnodeConfig getSnodeConfig() {
...@@ -233,15 +240,21 @@ public class SnodeController { ...@@ -233,15 +240,21 @@ public class SnodeController {
} }
public boolean initialize() { public boolean initialize() {
this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig, this.clientHousekeepingService);
this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(RemotingUtil.MQTT_PROTOCOL)
.init(this.nettyServerConfig, this.clientHousekeepingService);
this.registerProcessor();
initSnodeInterceptorGroup(); initSnodeInterceptorGroup();
initRemotingServerInterceptorGroup(); initRemotingServerInterceptorGroup();
initAclInterceptorGroup(); initAclInterceptorGroup();
this.snodeServer.registerInterceptorGroup(this.remotingServerInterceptorGroup); this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer();
this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup); if (this.snodeServer != null) {
this.snodeServer.init(this.nettyServerConfig, this.clientHousekeepingService);
this.snodeServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
}
this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(
RemotingUtil.MQTT_PROTOCOL);
if (this.mqttRemotingServer != null) {
this.mqttRemotingServer.init(this.nettyServerConfig, this.clientHousekeepingService);
this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
}
registerProcessor();
return true; return true;
} }
...@@ -313,30 +326,40 @@ public class SnodeController { ...@@ -313,30 +326,40 @@ public class SnodeController {
} }
public void registerProcessor() { public void registerProcessor() {
this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor); if (snodeServer != null) {
this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor); this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor); this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.LOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.UNLOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.LOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor);
this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, defaultMqttMessageProcessor, handleMqttMessageExecutor); this.snodeServer.registerProcessor(RequestCode.UNLOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor);
}
if (mqttRemotingServer != null) {
this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, defaultMqttMessageProcessor, handleMqttMessageExecutor);
}
} }
public void start() { public void start() {
initialize(); initialize();
this.snodeServer.start(); if (snodeServer != null) {
this.mqttRemotingServer.start(); this.snodeServer.start();
}
if (mqttRemotingServer != null) {
this.mqttRemotingServer.start();
}
this.remotingClient.start(); this.remotingClient.start();
this.mqttRemotingClient.start(); if (mqttRemotingClient != null) {
this.mqttRemotingClient.start();
}
this.scheduledService.startScheduleTask(); this.scheduledService.startScheduleTask();
this.clientHousekeepingService.start(this.snodeConfig.getHouseKeepingInterval()); this.clientHousekeepingService.start(this.snodeConfig.getHouseKeepingInterval());
this.metricsService.start(this.snodeConfig.getMetricsExportPort()); this.metricsService.start(this.snodeConfig.getMetricsExportPort());
...@@ -526,4 +549,12 @@ public class SnodeController { ...@@ -526,4 +549,12 @@ public class SnodeController {
WillMessageService willMessageService) { WillMessageService willMessageService) {
this.willMessageService = willMessageService; this.willMessageService = willMessageService;
} }
public MqttPushServiceImpl getMqttPushService() {
return mqttPushService;
}
public void setMqttPushService(MqttPushServiceImpl mqttPushService) {
this.mqttPushService = mqttPushService;
}
} }
...@@ -38,6 +38,7 @@ import org.apache.rocketmq.logging.InternalLogger; ...@@ -38,6 +38,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.TlsMode; import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig; import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
...@@ -119,6 +120,22 @@ public class SnodeStartup { ...@@ -119,6 +120,22 @@ public class SnodeStartup {
System.exit(-2); System.exit(-2);
} }
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), snodeConfig);
String namesrvAddr = snodeConfig.getNamesrvAddr();
if (null != namesrvAddr) {
try {
String[] addrArray = namesrvAddr.split(";");
for (String addr : addrArray) {
RemotingUtil.string2SocketAddress(addr);
}
} catch (Exception e) {
System.out.printf(
"The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
namesrvAddr);
System.exit(-3);
}
}
MixAll.printObjectProperties(log, snodeConfig); MixAll.printObjectProperties(log, snodeConfig);
MixAll.printObjectProperties(log, snodeConfig.getNettyServerConfig()); MixAll.printObjectProperties(log, snodeConfig.getNettyServerConfig());
MixAll.printObjectProperties(log, snodeConfig.getNettyClientConfig()); MixAll.printObjectProperties(log, snodeConfig.getNettyClientConfig());
......
...@@ -44,6 +44,8 @@ public class Client { ...@@ -44,6 +44,8 @@ public class Client {
private boolean cleanSession; private boolean cleanSession;
private boolean willFlag;
private String snodeAddress; private String snodeAddress;
public ClientRole getClientRole() { public ClientRole getClientRole() {
...@@ -71,13 +73,14 @@ public class Client { ...@@ -71,13 +73,14 @@ public class Client {
language == client.language && language == client.language &&
isConnected == client.isConnected && isConnected == client.isConnected &&
cleanSession == client.cleanSession && cleanSession == client.cleanSession &&
willFlag == client.willFlag &&
snodeAddress == client.snodeAddress; snodeAddress == client.snodeAddress;
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(clientRole, clientId, groups, remotingChannel, heartbeatInterval, return Objects.hash(clientRole, clientId, groups, remotingChannel, heartbeatInterval,
lastUpdateTimestamp, version, language, isConnected, cleanSession, snodeAddress); lastUpdateTimestamp, version, language, isConnected, cleanSession, willFlag, snodeAddress);
} }
public RemotingChannel getRemotingChannel() { public RemotingChannel getRemotingChannel() {
...@@ -144,6 +147,14 @@ public class Client { ...@@ -144,6 +147,14 @@ public class Client {
this.cleanSession = cleanSession; this.cleanSession = cleanSession;
} }
public boolean isWillFlag() {
return willFlag;
}
public void setWillFlag(boolean willFlag) {
this.willFlag = willFlag;
}
public String getSnodeAddress() { public String getSnodeAddress() {
return snodeAddress; return snodeAddress;
} }
...@@ -173,6 +184,7 @@ public class Client { ...@@ -173,6 +184,7 @@ public class Client {
", language=" + language + ", language=" + language +
", isConnected=" + isConnected + ", isConnected=" + isConnected +
", cleanSession=" + cleanSession + ", cleanSession=" + cleanSession +
", willFlag=" + willFlag +
", snodeAddress=" + snodeAddress + ", snodeAddress=" + snodeAddress +
'}'; '}';
} }
......
...@@ -43,7 +43,7 @@ public class ClientHousekeepingService implements ChannelEventListener { ...@@ -43,7 +43,7 @@ public class ClientHousekeepingService implements ChannelEventListener {
public void start(long interval) { public void start(long interval) {
this.producerManager.startScan(interval); this.producerManager.startScan(interval);
this.consumerManager.startScan(interval); this.consumerManager.startScan(interval);
this.iotClientManager.startScan(interval); // this.iotClientManager.startScan(interval);
} }
public void shutdown() { public void shutdown() {
......
...@@ -144,7 +144,7 @@ public abstract class ClientManagerImpl implements ClientManager { ...@@ -144,7 +144,7 @@ public abstract class ClientManagerImpl implements ClientManager {
return updated; return updated;
} }
private void removeClient(String groupId, RemotingChannel remotingChannel) { protected void removeClient(String groupId, RemotingChannel remotingChannel) {
ConcurrentHashMap<RemotingChannel, Client> channelTable = groupClientTable.get(groupId); ConcurrentHashMap<RemotingChannel, Client> channelTable = groupClientTable.get(groupId);
if (channelTable != null) { if (channelTable != null) {
Client prev = channelTable.remove(remotingChannel); Client prev = channelTable.remove(remotingChannel);
...@@ -211,6 +211,9 @@ public abstract class ClientManagerImpl implements ClientManager { ...@@ -211,6 +211,9 @@ public abstract class ClientManagerImpl implements ClientManager {
} }
ConcurrentHashMap<RemotingChannel, Client> channelClientMap = groupClientTable ConcurrentHashMap<RemotingChannel, Client> channelClientMap = groupClientTable
.get(groupId); .get(groupId);
if (remotingChannel instanceof NettyChannelHandlerContextImpl) {
remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel());
}
return channelClientMap.get(remotingChannel); return channelClientMap.get(remotingChannel);
} }
} }
...@@ -19,9 +19,10 @@ package org.apache.rocketmq.snode.client.impl; ...@@ -19,9 +19,10 @@ package org.apache.rocketmq.snode.client.impl;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.heartbeat.MqttSubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
...@@ -35,7 +36,7 @@ public class IOTClientManagerImpl extends ClientManagerImpl { ...@@ -35,7 +36,7 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
public static final String IOT_GROUP = "IOT_GROUP"; public static final String IOT_GROUP = "IOT_GROUP";
private final SnodeController snodeController; private final SnodeController snodeController;
private final ConcurrentHashMap<String/*root topic*/, ConcurrentHashMap<Client, List<MqttSubscriptionData>>> topic2SubscriptionTable = new ConcurrentHashMap<>( private final ConcurrentHashMap<String/*root topic*/, ConcurrentHashMap<Client, Set<SubscriptionData>>> topic2SubscriptionTable = new ConcurrentHashMap<>(
1024); 1024);
private final ConcurrentHashMap<String/*clientId*/, Subscription> clientId2Subscription = new ConcurrentHashMap<>(1024); private final ConcurrentHashMap<String/*clientId*/, Subscription> clientId2Subscription = new ConcurrentHashMap<>(1024);
...@@ -47,9 +48,25 @@ public class IOTClientManagerImpl extends ClientManagerImpl { ...@@ -47,9 +48,25 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
//do the logic when client sends unsubscribe packet. //do the logic when client sends unsubscribe packet.
} }
@Override public void onClose(Set<String> groups, RemotingChannel remotingChannel) {
for (String groupId : groups) {
//remove client after invoking onClosed method(client may be used in onClosed)
onClosed(groupId, remotingChannel);
removeClient(groupId, remotingChannel);
}
}
@Override @Override
public void onClosed(String group, RemotingChannel remotingChannel) { public void onClosed(String group, RemotingChannel remotingChannel) {
//do the logic when connection is closed by any reason. //do the logic when connection is closed by any reason.
//step1. Clean subscription data if cleanSession=1
Client client = this.getClient(IOT_GROUP, remotingChannel);
if (client.isCleanSession()) {
cleanSessionState(client.getClientId());
}
//step2. Publish will message associated with current connection(Question: Does will message need to be deleted after publishing.)
//step3. If will retain is true, add the will message to retain message.
} }
@Override @Override
...@@ -63,10 +80,10 @@ public class IOTClientManagerImpl extends ClientManagerImpl { ...@@ -63,10 +80,10 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
public void cleanSessionState(String clientId) { public void cleanSessionState(String clientId) {
clientId2Subscription.remove(clientId); clientId2Subscription.remove(clientId);
for (Iterator<Map.Entry<String, ConcurrentHashMap<Client, List<MqttSubscriptionData>>>> iterator = topic2SubscriptionTable.entrySet().iterator(); iterator.hasNext(); ) { for (Iterator<Map.Entry<String, ConcurrentHashMap<Client, Set<SubscriptionData>>>> iterator = topic2SubscriptionTable.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, ConcurrentHashMap<Client, List<MqttSubscriptionData>>> next = iterator.next(); Map.Entry<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> next = iterator.next();
for (Iterator<Map.Entry<Client, List<MqttSubscriptionData>>> iterator1 = next.getValue().entrySet().iterator(); iterator1.hasNext(); ) { for (Iterator<Map.Entry<Client, Set<SubscriptionData>>> iterator1 = next.getValue().entrySet().iterator(); iterator1.hasNext(); ) {
Map.Entry<Client, List<MqttSubscriptionData>> next1 = iterator1.next(); Map.Entry<Client, Set<SubscriptionData>> next1 = iterator1.next();
if (!next1.getKey().getClientId().equals(clientId)) { if (!next1.getKey().getClientId().equals(clientId)) {
continue; continue;
} }
...@@ -84,7 +101,7 @@ public class IOTClientManagerImpl extends ClientManagerImpl { ...@@ -84,7 +101,7 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
return snodeController; return snodeController;
} }
public ConcurrentHashMap<String, ConcurrentHashMap<Client, List<MqttSubscriptionData>>> getTopic2SubscriptionTable() { public ConcurrentHashMap<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> getTopic2SubscriptionTable() {
return topic2SubscriptionTable; return topic2SubscriptionTable;
} }
......
...@@ -17,15 +17,20 @@ ...@@ -17,15 +17,20 @@
package org.apache.rocketmq.snode.processor; package org.apache.rocketmq.snode.processor;
import com.alibaba.fastjson.JSON; import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
...@@ -33,11 +38,9 @@ import org.apache.rocketmq.logging.InternalLogger; ...@@ -33,11 +38,9 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler; import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
...@@ -83,20 +86,31 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { ...@@ -83,20 +86,31 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
@Override @Override
public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message) public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message)
throws RemotingCommandException, UnsupportedEncodingException { throws RemotingCommandException, UnsupportedEncodingException {
MqttHeader mqttHeader = (MqttHeader) message.decodeCommandCustomHeader(MqttHeader.class); MqttHeader mqttHeader = (MqttHeader) message.readCustomHeader();
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.valueOf(mqttHeader.getMessageType()), MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.valueOf(mqttHeader.getMessageType()),
mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()); mqttHeader.getRemainingLength());
MqttMessage mqttMessage = null; MqttMessage mqttMessage = null;
switch (fixedHeader.messageType()) { switch (fixedHeader.messageType()) {
case CONNECT: case CONNECT:
MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader( MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(
mqttHeader.getName(), mqttHeader.getVersion(), mqttHeader.isHasUserName(), mqttHeader.getName(), mqttHeader.getVersion(), mqttHeader.isHasUserName(),
mqttHeader.isHasPassword(), mqttHeader.isWillRetain(), mqttHeader.isHasPassword(), mqttHeader.isWillRetain(),
mqttHeader.getWillQos(), mqttHeader.isWillFlag(), mqttHeader.getWillQos(), mqttHeader.isWillFlag(),
mqttHeader.isCleanSession(), mqttHeader.getKeepAliveTimeSeconds()); mqttHeader.isCleanSession(), mqttHeader.getKeepAliveTimeSeconds());
RocketMQMqttConnectPayload payload = decode(message.getBody(), RocketMQMqttConnectPayload.class); MqttConnectPayload mqttConnectPayload = (MqttConnectPayload) message.getPayload();
mqttMessage = new MqttConnectMessage(fixedHeader, variableHeader, payload.toMqttConnectPayload()); mqttMessage = new MqttConnectMessage(fixedHeader, mqttConnectVariableHeader, mqttConnectPayload);
break;
case PUBLISH:
MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(mqttHeader.getTopicName(), mqttHeader.getPacketId());
mqttMessage = new MqttPublishMessage(fixedHeader, mqttPublishVariableHeader, (ByteBuf) message.getPayload());
break;
case SUBSCRIBE:
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(mqttHeader.getMessageId());
mqttMessage = new MqttSubscribeMessage(fixedHeader, mqttMessageIdVariableHeader, (MqttSubscribePayload) message.getPayload());
break;
case UNSUBSCRIBE:
case PINGREQ:
case DISCONNECT: case DISCONNECT:
} }
return type2handler.get(MqttMessageType.valueOf(mqttHeader.getMessageType())).handleMessage(mqttMessage, remotingChannel); return type2handler.get(MqttMessageType.valueOf(mqttHeader.getMessageType())).handleMessage(mqttMessage, remotingChannel);
...@@ -107,11 +121,6 @@ public class DefaultMqttMessageProcessor implements RequestProcessor { ...@@ -107,11 +121,6 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
return false; return false;
} }
private <T> T decode(final byte[] data, Class<T> classOfT) {
final String json = new String(data, Charset.forName(RemotingUtil.REMOTING_CHARSET));
return JSON.parseObject(json, classOfT);
}
private void registerMessageHandler(MqttMessageType type, MessageHandler handler) { private void registerMessageHandler(MqttMessageType type, MessageHandler handler) {
type2handler.put(type, handler); type2handler.put(type, handler);
} }
......
...@@ -23,6 +23,7 @@ import io.netty.handler.codec.mqtt.MqttConnectReturnCode; ...@@ -23,6 +23,7 @@ import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.HashSet;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.mqtt.WillMessage; import org.apache.rocketmq.common.message.mqtt.WillMessage;
...@@ -92,6 +93,7 @@ public class MqttConnectMessageHandler implements MessageHandler { ...@@ -92,6 +93,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
} }
//treat a second CONNECT packet as a protocol violation and disconnect //treat a second CONNECT packet as a protocol violation and disconnect
if (isConnected(remotingChannel, payload.clientIdentifier())) { if (isConnected(remotingChannel, payload.clientIdentifier())) {
log.error("This client has been connected. The second CONNECT packet is treated as a protocol vialation and the connection will be closed.");
remotingChannel.close(); remotingChannel.close();
return null; return null;
} }
...@@ -118,6 +120,7 @@ public class MqttConnectMessageHandler implements MessageHandler { ...@@ -118,6 +120,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
Client client = new Client(); Client client = new Client();
client.setClientId(payload.clientIdentifier()); client.setClientId(payload.clientIdentifier());
client.setClientRole(ClientRole.IOTCLIENT); client.setClientRole(ClientRole.IOTCLIENT);
client.setGroups(new HashSet<String>(){{add("IOT_GROUP");}});
client.setConnected(true); client.setConnected(true);
client.setRemotingChannel(remotingChannel); client.setRemotingChannel(remotingChannel);
client.setLastUpdateTimestamp(System.currentTimeMillis()); client.setLastUpdateTimestamp(System.currentTimeMillis());
......
...@@ -62,7 +62,9 @@ public class MqttDisconnectMessageHandler implements MessageHandler { ...@@ -62,7 +62,9 @@ public class MqttDisconnectMessageHandler implements MessageHandler {
snodeController.getWillMessageService().deleteWillMessage(client.getClientId()); snodeController.getWillMessageService().deleteWillMessage(client.getClientId());
} }
client.setConnected(false); client.setConnected(false);
remotingChannel.close(); if (remotingChannel.isActive()) {
remotingChannel.close();
}
return null; return null;
} }
} }
...@@ -17,13 +17,27 @@ ...@@ -17,13 +17,27 @@
package org.apache.rocketmq.snode.processor.mqtthandler; package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.exception.WrongMessageTypeException;
import org.apache.rocketmq.snode.util.MqttUtil;
public class MqttPublishMessageHandler implements MessageHandler { public class MqttPublishMessageHandler implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeController snodeController; private final SnodeController snodeController;
public MqttPublishMessageHandler(SnodeController snodeController) { public MqttPublishMessageHandler(SnodeController snodeController) {
...@@ -32,7 +46,40 @@ public class MqttPublishMessageHandler implements MessageHandler { ...@@ -32,7 +46,40 @@ public class MqttPublishMessageHandler implements MessageHandler {
@Override @Override
public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) { public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
if (!(message instanceof MqttPublishMessage)) {
log.error("Wrong message type! Expected type: PUBLISH but {} was received.", message.fixedHeader().messageType());
throw new WrongMessageTypeException("Wrong message type exception.");
}
MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) message;
MqttFixedHeader fixedHeader = mqttPublishMessage.fixedHeader();
MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader();
if (MqttUtil.isQosLegal(fixedHeader.qosLevel())) {
log.error("The QoS level should be 0 or 1 or 2. The connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
remotingChannel.close();
return null;
}
ByteBuf payload = mqttPublishMessage.payload();
if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) {
snodeController.getMqttPushService().pushMessageQos0(variableHeader.topicName(), payload);
} else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) {
//Push messages to subscribers and add it to IN-FLIGHT messages
}
if (fixedHeader.qosLevel().value() > 0) {
RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class);
MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader();
if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) {
mqttHeader.setMessageType(MqttMessageType.PUBACK.value());
mqttHeader.setDup(false);
mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
mqttHeader.setRetain(false);
mqttHeader.setRemainingLength(2);
mqttHeader.setPacketId(0);
} else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) {
//PUBREC/PUBREL/PUBCOMP
}
return command;
}
return null; return null;
} }
} }
...@@ -25,7 +25,9 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage; ...@@ -25,7 +25,9 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload; import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription; import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
...@@ -36,7 +38,6 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; ...@@ -36,7 +38,6 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttSubAckPayload;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client; import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
...@@ -80,6 +81,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler { ...@@ -80,6 +81,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
if (payload.topicSubscriptions() == null || payload.topicSubscriptions().size() == 0) { if (payload.topicSubscriptions() == null || payload.topicSubscriptions().size() == 0) {
log.error("The payload of a SUBSCRIBE packet MUST contain at least one Topic Filter / QoS pair. This will be treated as protocol violation and the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString()); log.error("The payload of a SUBSCRIBE packet MUST contain at least one Topic Filter / QoS pair. This will be treated as protocol violation and the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
remotingChannel.close(); remotingChannel.close();
return null;
} }
if (isQosLegal(payload.topicSubscriptions())) { if (isQosLegal(payload.topicSubscriptions())) {
log.error("The QoS level of Topic Filter / QoS pairs should be 0 or 1 or 2. The connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString()); log.error("The QoS level of Topic Filter / QoS pairs should be 0 or 1 or 2. The connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
...@@ -97,12 +99,13 @@ public class MqttSubscribeMessageHandler implements MessageHandler { ...@@ -97,12 +99,13 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
mqttHeader.setDup(false); mqttHeader.setDup(false);
mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value()); mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
mqttHeader.setRetain(false); mqttHeader.setRetain(false);
// mqttHeader.setRemainingLength(0x02);
mqttHeader.setMessageId(mqttSubscribeMessage.variableHeader().messageId()); mqttHeader.setMessageId(mqttSubscribeMessage.variableHeader().messageId());
List<Integer> grantQoss = doSubscribe(client, payload.topicSubscriptions(), iotClientManager); List<Integer> grantQoss = doSubscribe(client, payload.topicSubscriptions(), iotClientManager);
RocketMQMqttSubAckPayload ackPayload = RocketMQMqttSubAckPayload.fromMqttSubAckPayload(new MqttSubAckPayload(grantQoss)); //Publish retained messages to subscribers.
command.setBody(ackPayload.encode()); MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(grantQoss);
command.setPayload(mqttSubAckPayload);
mqttHeader.setRemainingLength(0x02 + mqttSubAckPayload.grantedQoSLevels().size());
command.setRemark(null); command.setRemark(null);
command.setCode(ResponseCode.SUCCESS); command.setCode(ResponseCode.SUCCESS);
return command; return command;
...@@ -111,9 +114,9 @@ public class MqttSubscribeMessageHandler implements MessageHandler { ...@@ -111,9 +114,9 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
private List<Integer> doSubscribe(Client client, List<MqttTopicSubscription> mqttTopicSubscriptions, private List<Integer> doSubscribe(Client client, List<MqttTopicSubscription> mqttTopicSubscriptions,
IOTClientManagerImpl iotClientManager) { IOTClientManagerImpl iotClientManager) {
//do the logic when client sends subscribe packet. //do the logic when client sends subscribe packet.
//1.register clientId2Subscription //1.update clientId2Subscription
ConcurrentHashMap<String, Subscription> clientId2Subscription = iotClientManager.getClientId2Subscription(); ConcurrentHashMap<String, Subscription> clientId2Subscription = iotClientManager.getClientId2Subscription();
ConcurrentHashMap<String, ConcurrentHashMap<Client, List<MqttSubscriptionData>>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable(); ConcurrentHashMap<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable();
Subscription subscription = null; Subscription subscription = null;
if (clientId2Subscription.containsKey(client.getClientId())) { if (clientId2Subscription.containsKey(client.getClientId())) {
subscription = clientId2Subscription.get(client.getClientId()); subscription = clientId2Subscription.get(client.getClientId());
...@@ -128,7 +131,25 @@ public class MqttSubscribeMessageHandler implements MessageHandler { ...@@ -128,7 +131,25 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
grantQoss.add(actualQos); grantQoss.add(actualQos);
SubscriptionData subscriptionData = new MqttSubscriptionData(mqttTopicSubscription.qualityOfService().value(), client.getClientId(), mqttTopicSubscription.topicName()); SubscriptionData subscriptionData = new MqttSubscriptionData(mqttTopicSubscription.qualityOfService().value(), client.getClientId(), mqttTopicSubscription.topicName());
subscriptionDatas.put(mqttTopicSubscription.topicName(), subscriptionData); subscriptionDatas.put(mqttTopicSubscription.topicName(), subscriptionData);
//2.register topic2SubscriptionTable //2.update topic2SubscriptionTable
String rootTopic = MqttUtil.getRootTopic(mqttTopicSubscription.topicName());
ConcurrentHashMap<Client, Set<SubscriptionData>> client2SubscriptionData = topic2SubscriptionTable.get(rootTopic);
if (client2SubscriptionData == null) {
client2SubscriptionData = new ConcurrentHashMap<>();
ConcurrentHashMap<Client, Set<SubscriptionData>> prev = topic2SubscriptionTable.putIfAbsent(rootTopic, client2SubscriptionData);
if (prev != null) {
client2SubscriptionData = prev;
}
Set<SubscriptionData> subscriptionDataSet = client2SubscriptionData.get(client);
if (subscriptionDataSet == null) {
subscriptionDataSet = new HashSet<>();
Set<SubscriptionData> prevSubscriptionDataSet = client2SubscriptionData.putIfAbsent(client, subscriptionDataSet);
if (prevSubscriptionDataSet != null) {
subscriptionDataSet = prevSubscriptionDataSet;
}
subscriptionDataSet.add(subscriptionData);
}
}
} }
return grantQoss; return grantQoss;
} }
......
...@@ -17,34 +17,109 @@ ...@@ -17,34 +17,109 @@
package org.apache.rocketmq.snode.processor.mqtthandler; package org.apache.rocketmq.snode.processor.mqtthandler;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
import org.apache.rocketmq.snode.client.impl.Subscription;
import org.apache.rocketmq.snode.exception.WrongMessageTypeException;
import org.apache.rocketmq.snode.util.MqttUtil;
/** /**
* handle the UNSUBSCRIBE message from the client * handle the UNSUBSCRIBE message from the client
* <ol>
* <li>extract topic filters to be un-subscribed</li>
* <li>get the topics matching with the topic filters</li>
* <li>verify the authorization of the client to the </li>
* <li>remove subscription from the SubscriptionStore</li>
* </ol>
*/ */
public class MqttUnsubscribeMessagHandler implements MessageHandler { public class MqttUnsubscribeMessagHandler implements MessageHandler {
/* private SubscriptionStore subscriptionStore; private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
public MqttUnsubscribeMessagHandler(SubscriptionStore subscriptionStore) {
this.subscriptionStore = subscriptionStore;
}*/
private final SnodeController snodeController; private final SnodeController snodeController;
public MqttUnsubscribeMessagHandler(SnodeController snodeController) { public MqttUnsubscribeMessagHandler(SnodeController snodeController) {
this.snodeController = snodeController; this.snodeController = snodeController;
} }
@Override @Override
public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) { public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null; if (!(message instanceof MqttUnsubscribeMessage)) {
log.error("Wrong message type! Expected type: UNSUBSCRIBE but {} was received. MqttMessage={}", message.fixedHeader().messageType(), message.toString());
throw new WrongMessageTypeException("Wrong message type exception.");
}
MqttUnsubscribeMessage unsubscribeMessage = (MqttUnsubscribeMessage) message;
MqttFixedHeader fixedHeader = unsubscribeMessage.fixedHeader();
if (fixedHeader.isDup() || !fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE) || fixedHeader.isRetain()) {
log.error("Malformed value of reserved bits(bits 3,2,1,0) of fixed header. Expected=0010, received={}{}{}{}", fixedHeader.isDup() ? 1 : 0, Integer.toBinaryString(fixedHeader.qosLevel().value()), fixedHeader.isRetain() ? 1 : 0);
remotingChannel.close();
return null;
}
MqttUnsubscribePayload payload = unsubscribeMessage.payload();
if (payload.topics() == null || payload.topics().size() == 0) {
log.error("The payload of a UNSUBSCRIBE packet MUST contain at least one Topic Filter. This will be treated as protocol violation and the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
remotingChannel.close();
return null;
}
IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager();
Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
if (client == null) {
log.error("Can't find associated client, the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
remotingChannel.close();
return null;
}
RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class);
MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader();
mqttHeader.setMessageType(MqttMessageType.SUBACK.value());
mqttHeader.setDup(false);
mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
mqttHeader.setRetain(false);
mqttHeader.setMessageId(unsubscribeMessage.variableHeader().messageId());
doUnsubscribe(client, payload.topics(), iotClientManager);
mqttHeader.setRemainingLength(0x02);
command.setRemark(null);
command.setCode(ResponseCode.SUCCESS);
return command;
}
private void doUnsubscribe(Client client, List<String> topics, IOTClientManagerImpl iotClientManager) {
ConcurrentHashMap<String, Subscription> clientId2Subscription = iotClientManager.getClientId2Subscription();
ConcurrentHashMap<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable();
for (String topicFilter : topics) {
//1.update clientId2Subscription
if (clientId2Subscription.containsKey(client.getClientId())) {
Subscription subscription = clientId2Subscription.get(client.getClientId());
subscription.getSubscriptionTable().remove(topicFilter);
}
//2.update topic2SubscriptionTable
String rootTopic = MqttUtil.getRootTopic(topicFilter);
ConcurrentHashMap<Client, Set<SubscriptionData>> client2SubscriptionData = topic2SubscriptionTable.get(rootTopic);
if (client2SubscriptionData != null) {
Set<SubscriptionData> subscriptionDataSet = client2SubscriptionData.get(client);
if (subscriptionDataSet != null) {
Iterator<SubscriptionData> iterator = subscriptionDataSet.iterator();
while (iterator.hasNext()) {
if (iterator.next().getTopic().equals(topicFilter))
iterator.remove();
}
}
}
}
} }
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.service.impl;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.util.ReferenceCountUtil;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
import org.apache.rocketmq.snode.constant.SnodeConstant;
import org.apache.rocketmq.snode.util.MqttUtil;
public class MqttPushServiceImpl {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private SnodeController snodeController;
private ExecutorService pushMqttMessageExecutorService;
public MqttPushServiceImpl(final SnodeController snodeController) {
this.snodeController = snodeController;
pushMqttMessageExecutorService = ThreadUtils.newThreadPoolExecutor(
this.snodeController.getSnodeConfig().getSnodePushMqttMessageMinPoolSize(),
this.snodeController.getSnodeConfig().getSnodePushMqttMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(this.snodeController.getSnodeConfig().getSnodePushMqttMessageThreadPoolQueueCapacity()),
"SnodePushMqttMessageThread",
false);
}
public class MqttPushTask implements Runnable {
private AtomicBoolean canceled = new AtomicBoolean(false);
private final ByteBuf message;
private final String topic;
private final Integer qos;
private boolean retain;
private Integer packetId;
public MqttPushTask(final String topic, final ByteBuf message, final Integer qos, boolean retain,
Integer packetId) {
this.message = message;
this.topic = topic;
this.qos = qos;
this.retain = retain;
this.packetId = packetId;
}
@Override
public void run() {
if (!canceled.get()) {
try {
RemotingCommand requestCommand = buildRequestCommand(topic, qos, retain, packetId);
//find those clients publishing the message to
IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager();
ConcurrentHashMap<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable();
Set<Client> clients = new HashSet<>();
if (topic2SubscriptionTable.containsKey(MqttUtil.getRootTopic(topic))) {
ConcurrentHashMap<Client, Set<SubscriptionData>> client2SubscriptionDatas = topic2SubscriptionTable.get(MqttUtil.getRootTopic(topic));
for (Map.Entry<Client, Set<SubscriptionData>> entry : client2SubscriptionDatas.entrySet()) {
Set<SubscriptionData> subscriptionDatas = entry.getValue();
for (SubscriptionData subscriptionData : subscriptionDatas) {
if (MqttUtil.isMatch(subscriptionData.getTopic(), topic)) {
clients.add(entry.getKey());
break;
}
}
}
}
for (Client client : clients) {
RemotingChannel remotingChannel = client.getRemotingChannel();
if (client.getRemotingChannel() instanceof NettyChannelHandlerContextImpl) {
remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) client.getRemotingChannel()).getChannelHandlerContext().channel());
}
requestCommand.setPayload(message.copy());
snodeController.getMqttRemotingServer().push(remotingChannel, requestCommand, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
}
} catch (Exception ex) {
log.warn("Exception was thrown when pushing MQTT message to topic: {}, exception={}", topic, ex.getMessage());
}finally {
System.out.println("Release Bytebuf");
ReferenceCountUtil.release(message);
}
} else {
log.info("Push message to topic: {} canceled!", topic);
}
}
private RemotingCommand buildRequestCommand(final String topic, final Integer qos, boolean retain,
Integer packetId) {
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setMessageType(MqttMessageType.PUBLISH.value());
if (qos == 0) {
mqttHeader.setDup(false);//DUP is always 0 for qos=0 messages
} else {
mqttHeader.setDup(false);//DUP is depending on whether it is a re-delivery of an earlier attempt.
}
mqttHeader.setQosLevel(qos);
mqttHeader.setRetain(retain);
mqttHeader.setPacketId(packetId);
mqttHeader.setTopicName(topic);
mqttHeader.setRemainingLength(4 + topic.getBytes().length + message.readableBytes());
RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader);
// pushMessage.setPayload(message);
return pushMessage;
}
public void setCanceled(AtomicBoolean canceled) {
this.canceled = canceled;
}
}
public void pushMessageQos0(final String topic, final ByteBuf message) {
MqttPushTask pushTask = new MqttPushTask(topic, message, 0, false, 0);
pushMqttMessageExecutorService.submit(pushTask);
}
public void pushMessageQos1(final String topic, final ByteBuf message, final Integer qos, boolean retain,
Integer packetId) {
MqttPushTask pushTask = new MqttPushTask(topic, message, qos, retain, packetId);
pushMqttMessageExecutorService.submit(pushTask);
}
public void shutdown() {
this.pushMqttMessageExecutorService.shutdown();
}
}
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.rocketmq.snode.util; package org.apache.rocketmq.snode.util;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.UUID; import java.util.UUID;
import org.apache.rocketmq.snode.constant.MqttConstant; import org.apache.rocketmq.snode.constant.MqttConstant;
...@@ -33,4 +34,33 @@ public class MqttUtil { ...@@ -33,4 +34,33 @@ public class MqttUtil {
public static int actualQos(int qos) { public static int actualQos(int qos) {
return Math.min(MqttConstant.MAX_SUPPORTED_QOS, qos); return Math.min(MqttConstant.MAX_SUPPORTED_QOS, qos);
} }
public static boolean isQosLegal(MqttQoS qos) {
if (!qos.equals(MqttQoS.AT_LEAST_ONCE) && !qos.equals(MqttQoS.AT_MOST_ONCE) && !qos.equals(MqttQoS.EXACTLY_ONCE)) {
return false;
}
return false;
}
public static boolean isMatch(String topicFiter, String topic) {
if (!topicFiter.contains(MqttConstant.SUBSCRIPTION_FLAG_PLUS) && !topicFiter.contains(MqttConstant.SUBSCRIPTION_FLAG_SHARP)) {
return topicFiter.equals(topic);
}
String[] filterTopics = topicFiter.split(MqttConstant.SUBSCRIPTION_SEPARATOR);
String[] actualTopics = topic.split(MqttConstant.SUBSCRIPTION_SEPARATOR);
int i = 0;
for (; i < filterTopics.length && i < actualTopics.length; i++) {
if (MqttConstant.SUBSCRIPTION_FLAG_PLUS.equals(filterTopics[i])) {
continue;
}
if (MqttConstant.SUBSCRIPTION_FLAG_SHARP.equals(filterTopics[i])) {
return true;
}
if (!filterTopics[i].equals(actualTopics[i])) {
return false;
}
}
return i == actualTopics.length;
}
} }
...@@ -16,23 +16,18 @@ ...@@ -16,23 +16,18 @@
*/ */
package org.apache.rocketmq.snode.processor; package org.apache.rocketmq.snode.processor;
import com.alibaba.fastjson.JSON;
import io.netty.handler.codec.mqtt.MqttConnectPayload; import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
...@@ -93,16 +88,7 @@ public class DefaultMqttMessageProcessorTest { ...@@ -93,16 +88,7 @@ public class DefaultMqttMessageProcessorTest {
MqttHeader mqttHeader = createMqttConnectMesssageHeader(); MqttHeader mqttHeader = createMqttConnectMesssageHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader);
MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes()); MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes());
request.setBody(RocketMQMqttConnectPayload.fromMqttConnectPayload(payload).encode()); request.setPayload(payload);
CodecHelper.makeCustomHeaderToNet(request);
return request; return request;
} }
private byte[] encode(Object obj) {
String json = JSON.toJSONString(obj, false);
if (json != null) {
return json.getBytes(Charset.forName(RemotingUtil.REMOTING_CHARSET));
}
return null;
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册