提交 f8da6ffa 编写于 作者: C chengxiangwang

optimize Client of IOT; add in-flight window

上级 e010ca74
......@@ -39,13 +39,9 @@ public class Client {
private LanguageCode language;
private boolean isConnected;
private boolean cleanSession;
private String snodeAddress;
private boolean willFlag;
private String snodeAddress;
public ClientRole getClientRole() {
return clientRole;
......@@ -70,16 +66,13 @@ public class Client {
Objects.equals(groups, client.groups) &&
Objects.equals(remotingChannel, client.remotingChannel) &&
language == client.language &&
isConnected == client.isConnected &&
cleanSession == client.cleanSession &&
willFlag == client.willFlag &&
snodeAddress == client.snodeAddress;
}
@Override
public int hashCode() {
return Objects.hash(clientRole, clientId, groups, remotingChannel, heartbeatInterval,
lastUpdateTimestamp, version, language, isConnected, cleanSession, willFlag, snodeAddress);
lastUpdateTimestamp, version, language, snodeAddress);
}
public RemotingChannel getRemotingChannel() {
......@@ -130,30 +123,6 @@ public class Client {
this.language = language;
}
public boolean isConnected() {
return isConnected;
}
public void setConnected(boolean connected) {
isConnected = connected;
}
public boolean isCleanSession() {
return cleanSession;
}
public void setCleanSession(boolean cleanSession) {
this.cleanSession = cleanSession;
}
public boolean isWillFlag() {
return willFlag;
}
public void setWillFlag(boolean willFlag) {
this.willFlag = willFlag;
}
public String getSnodeAddress() {
return snodeAddress;
}
......@@ -181,9 +150,6 @@ public class Client {
", lastUpdateTimestamp=" + lastUpdateTimestamp +
", version=" + version +
", language=" + language +
", isConnected=" + isConnected +
", cleanSession=" + cleanSession +
", willFlag=" + willFlag +
", snodeAddress=" + snodeAddress +
'}';
}
......
......@@ -17,8 +17,8 @@
package org.apache.rocketmq.mqtt.client;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
......@@ -37,8 +37,6 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
public static final String IOT_GROUP = "IOT_GROUP";
// private final ConcurrentHashMap<String/*root topic*/, ConcurrentHashMap<Client, Set<SubscriptionData>>> topic2SubscriptionTable = new ConcurrentHashMap<>(
// 1024);
private final ConcurrentHashMap<String/*root topic*/, Set<Client>> topic2Clients = new ConcurrentHashMap<>(
1024);
private final ConcurrentHashMap<String/*clientId*/, Subscription> clientId2Subscription = new ConcurrentHashMap<>(1024);
......@@ -47,10 +45,6 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
public IOTClientManagerImpl() {
}
public void onUnsubscribe(Client client, List<String> topics) {
//do the logic when client sends unsubscribe packet.
}
@Override public void onClose(Set<String> groups, RemotingChannel remotingChannel) {
for (String groupId : groups) {
//remove client after invoking onClosed method(client may be used in onClosed)
......@@ -63,9 +57,12 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
public void onClosed(String group, RemotingChannel remotingChannel) {
//do the logic when connection is closed by any reason.
//step1. Clean subscription data if cleanSession=1
Client client = this.getClient(IOT_GROUP, remotingChannel);
MQTTSession client = (MQTTSession) this.getClient(IOT_GROUP, remotingChannel);
if (client.isCleanSession()) {
cleanSessionState(client.getClientId());
} else {
client.setConnected(false);
//TODO update persistent store
}
//step2. Publish will message associated with current connection(Question: Does will message need to be deleted after publishing.)
......@@ -82,42 +79,30 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
}
public void cleanSessionState(String clientId) {
/* clientId2Subscription.remove(clientId);
for (Iterator<Map.Entry<String, ConcurrentHashMap<Client, Set<SubscriptionData>>>> iterator = topic2SubscriptionTable.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> next = iterator.next();
for (Iterator<Map.Entry<Client, Set<SubscriptionData>>> iterator1 = next.getValue().entrySet().iterator(); iterator1.hasNext(); ) {
Map.Entry<Client, Set<SubscriptionData>> next1 = iterator1.next();
if (!next1.getKey().getClientId().equals(clientId)) {
continue;
}
iterator1.remove();
}
if (next.getValue() == null || next.getValue().size() == 0) {
iterator.remove();
}
}*/
Map<String, Set<Client>> toBeRemoveFromPersistentStore = new HashMap<>();
for (Iterator<Map.Entry<String, Set<Client>>> iterator = topic2Clients.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, Set<Client>> next = iterator.next();
Iterator<Client> iterator1 = next.getValue().iterator();
while (iterator1.hasNext()) {
if (iterator1.next().getClientId().equals(clientId)) {
Client client = iterator1.next();
if (client.getClientId().equals(clientId)) {
iterator1.remove();
Set<Client> clients = toBeRemoveFromPersistentStore.getOrDefault((next.getKey()), new HashSet<>());
clients.add(client);
toBeRemoveFromPersistentStore.put(next.getKey(), clients);
}
}
}
//TODO update persistent store base on toBeRemoveFromPersistentStore
clientId2Subscription.remove(clientId);
//remove offline messages
//TODO update persistent store
//TODO remove offline messages
}
public Subscription getSubscriptionByClientId(String clientId) {
return clientId2Subscription.get(clientId);
}
/* public ConcurrentHashMap<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> getTopic2SubscriptionTable() {
return topic2SubscriptionTable;
}*/
public ConcurrentHashMap<String/*root topic*/, Set<Client>> getTopic2Clients() {
return topic2Clients;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.client;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
public class InFlightMessage {
final String topic;
final MqttQoS publishingQos;
final ByteBuf payload;
InFlightMessage(String topic, MqttQoS publishingQos, ByteBuf payload) {
this.topic = topic;
this.publishingQos = publishingQos;
this.payload = payload;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.client;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.mqtt.exception.MqttRuntimeException;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
public class MQTTSession extends Client {
private boolean cleanSession;
private boolean isConnected;
private boolean willFlag;
private final AtomicInteger inflightSlots = new AtomicInteger(10);
private final Map<Integer, InFlightMessage> inflightWindow = new HashMap<>();
private final DelayQueue<InFlightPacket> inflightTimeouts = new DelayQueue<>();
private final AtomicInteger lastPacketId = new AtomicInteger(0);
private Hashtable inUsePacketIds = new Hashtable();
private int nextPacketId = 0;
static class InFlightPacket implements Delayed {
final int packetId;
private long startTime;
InFlightPacket(int packetId, long delayInMilliseconds) {
this.packetId = packetId;
this.startTime = System.currentTimeMillis() + delayInMilliseconds;
}
@Override
public long getDelay(TimeUnit unit) {
long diff = startTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
if ((this.startTime - ((InFlightPacket) o).startTime) == 0) {
return 0;
}
if ((this.startTime - ((InFlightPacket) o).startTime) > 0) {
return 1;
} else {
return -1;
}
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Client)) {
return false;
}
Client client = (Client) o;
return Objects.equals(this.getClientId(), client.getClientId());
}
public boolean isConnected() {
return isConnected;
}
public void setConnected(boolean connected) {
isConnected = connected;
}
public boolean isCleanSession() {
return cleanSession;
}
public void setCleanSession(boolean cleanSession) {
this.cleanSession = cleanSession;
}
public boolean isWillFlag() {
return willFlag;
}
public void setWillFlag(boolean willFlag) {
this.willFlag = willFlag;
}
public void pushMessageAtQos(MqttHeader mqttHeader, ByteBuf payload,
DefaultMqttMessageProcessor defaultMqttMessageProcessor) {
if (mqttHeader.getQosLevel() > 0) {
// IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager();
// ConcurrentHashMap<String, Subscription> clientId2Subscription = iotClientManager.getClientId2Subscription();
// Subscription subscription = clientId2Subscription.get(this.getClientId());
// Enumeration<String> topicFilters = subscription.getSubscriptionTable().keys();
// while (topicFilters.hasMoreElements()) {
// String topicFilter = topicFilters.nextElement();
// }
inflightSlots.decrementAndGet();
mqttHeader.setPacketId(getNextPacketId());
inflightWindow.put(mqttHeader.getPacketId(), new InFlightMessage(mqttHeader.getTopicName(), ));
}
defaultMqttMessageProcessor.getMqttPushService().pushMessageQos(mqttHeader, payload, this);
}
private synchronized void releasePacketId(int msgId) {
this.inUsePacketIds.remove(new Integer(msgId));
}
private synchronized int getNextPacketId() {
int startingMessageId = this.nextPacketId;
int loopCount = 0;
do {
++this.nextPacketId;
if (this.nextPacketId > 65535) {
this.nextPacketId = 1;
}
if (this.nextPacketId == startingMessageId) {
++loopCount;
if (loopCount == 2) {
throw new MqttRuntimeException("Could not get available packetId.");
}
}
}
while (this.inUsePacketIds.containsKey(new Integer(this.nextPacketId)));
Integer id = new Integer(this.nextPacketId);
this.inUsePacketIds.put(id, id);
return this.nextPacketId;
}
}
......@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.Subscription;
import org.apache.rocketmq.common.exception.MQClientException;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.client.MQTTSession;
import org.apache.rocketmq.mqtt.util.MqttUtil;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
......@@ -55,12 +56,14 @@ public interface MessageHandler {
if (topic2Clients.containsKey(MqttUtil.getRootTopic(topic))) {
Set<Client> clients = topic2Clients.get(MqttUtil.getRootTopic(topic));
for (Client client : clients) {
Subscription subscription = clientId2Subscription.get(client.getClientId());
Enumeration<String> keys = subscription.getSubscriptionTable().keys();
while (keys.hasMoreElements()) {
String topicFilter = keys.nextElement();
if (MqttUtil.isMatch(topicFilter, topic)) {
clientsTobePush.add(client);
if(((MQTTSession)client).isConnected()) {
Subscription subscription = clientId2Subscription.get(client.getClientId());
Enumeration<String> keys = subscription.getSubscriptionTable().keys();
while (keys.hasMoreElements()) {
String topicFilter = keys.nextElement();
if (MqttUtil.isMatch(topicFilter, topic)) {
clientsTobePush.add(client);
}
}
}
}
......
......@@ -25,7 +25,6 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.HashSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.client.ClientManager;
import org.apache.rocketmq.common.client.ClientRole;
import org.apache.rocketmq.common.client.Subscription;
......@@ -35,6 +34,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.client.MQTTSession;
import org.apache.rocketmq.mqtt.exception.MqttConnectException;
import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
......@@ -117,7 +117,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
}
}
Client client = new Client();
MQTTSession client = new MQTTSession();
client.setClientId(payload.clientIdentifier());
client.setClientRole(ClientRole.IOTCLIENT);
client.setGroups(new HashSet<String>() {
......@@ -169,7 +169,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
private boolean isConnected(RemotingChannel remotingChannel, String clientId) {
ClientManager iotClientManager = defaultMqttMessageProcessor.getIotClientManager();
Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
MQTTSession client = (MQTTSession) iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
if (client != null && client.getClientId().equals(clientId) && client.isConnected()) {
return true;
}
......
......@@ -20,11 +20,11 @@ package org.apache.rocketmq.mqtt.mqtthandler.impl;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.client.MQTTSession;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel;
......@@ -56,7 +56,7 @@ public class MqttDisconnectMessageHandler implements MessageHandler {
}
//discard will message associated with the current connection(client)
Client client = defaultMqttMessageProcessor.getIotClientManager()
MQTTSession client = (MQTTSession)defaultMqttMessageProcessor.getIotClientManager()
.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
if (client != null) {
defaultMqttMessageProcessor.getWillMessageService().deleteWillMessage(client.getClientId());
......
......@@ -17,17 +17,14 @@
package org.apache.rocketmq.mqtt.mqtthandler.impl;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
import org.apache.rocketmq.mqtt.client.MQTTSession;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel;
......@@ -52,7 +49,7 @@ public class MqttPingreqMessageHandler implements MessageHandler {
@Override
public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager();
Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
MQTTSession client = (MQTTSession)iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
log.debug("Handle MQTT client: {} Pingreq.", client.getClientId());
RemotingCommand response = RemotingCommand.createResponseCommand(MqttHeader.class);
if (client != null && client.isConnected()) {
......
......@@ -38,6 +38,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
import org.apache.rocketmq.mqtt.client.MQTTSession;
import org.apache.rocketmq.mqtt.constant.MqttConstant;
import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
......@@ -74,7 +75,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) message;
MqttSubscribePayload payload = mqttSubscribeMessage.payload();
IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager();
Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
MQTTSession client = (MQTTSession)iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
if (client == null) {
log.error("Can't find associated client, the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
remotingChannel.close();
......@@ -98,6 +99,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class);
MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader();
mqttHeader.setMessageType(MqttMessageType.SUBACK.value());
// dup/qos/retain value are always as below of SUBACK
mqttHeader.setDup(false);
mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
mqttHeader.setRetain(false);
......@@ -124,7 +126,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
subscription = clientId2Subscription.get(client.getClientId());
} else {
subscription = new Subscription();
subscription.setCleanSession(client.isCleanSession());
subscription.setCleanSession(((MQTTSession)client).isCleanSession());
}
ConcurrentHashMap<String, SubscriptionData> subscriptionDatas = subscription.getSubscriptionTable();
List<Integer> grantQoss = new ArrayList<>();
......@@ -147,6 +149,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
}
}
}
//TODO update persistent store of topic2Clients and clientId2Subscription
return grantQoss;
}
......
......@@ -84,7 +84,8 @@ public class MqttUnsubscribeMessagHandler implements MessageHandler {
RemotingCommand command = RemotingCommand.createResponseCommand(MqttHeader.class);
MqttHeader mqttHeader = (MqttHeader) command.readCustomHeader();
mqttHeader.setMessageType(MqttMessageType.SUBACK.value());
// dup/qos/retain value are always as below of UNSUBACK
mqttHeader.setMessageType(MqttMessageType.UNSUBACK.value());
mqttHeader.setDup(false);
mqttHeader.setQosLevel(MqttQoS.AT_MOST_ONCE.value());
mqttHeader.setRetain(false);
......@@ -127,5 +128,7 @@ public class MqttUnsubscribeMessagHandler implements MessageHandler {
iterator.remove();
}
}
//TODO update persistent store
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.service;
import io.netty.buffer.ByteBuf;
import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
public interface MqttPushService {
void pushMessageQos(MqttHeader mqttHeader, final ByteBuf message, Client client);
}
......@@ -17,9 +17,7 @@
package org.apache.rocketmq.mqtt.service.impl;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.util.ReferenceCountUtil;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
......@@ -33,13 +31,14 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.mqtt.constant.MqttConstant;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.mqtt.service.MqttPushService;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
public class MqttPushServiceImpl {
public class MqttPushServiceImpl implements MqttPushService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
private ExecutorService pushMqttMessageExecutorService;
......@@ -57,22 +56,23 @@ public class MqttPushServiceImpl {
false);
}
static class MqttPushTask implements Runnable {
public static class MqttPushTask implements Runnable {
private AtomicBoolean canceled = new AtomicBoolean(false);
private final ByteBuf message;
private final String topic;
private final Integer qos;
private boolean retain;
private Integer packetId;
private final MqttHeader mqttHeader;
private Client client;
// private final String topic;
// private final Integer qos;
// private boolean retain;
// private Integer packetId;
public MqttPushTask(final String topic, final ByteBuf message, final Integer qos, boolean retain,
Integer packetId, Client client) {
public MqttPushTask(final MqttHeader mqttHeader, final ByteBuf message, Client client) {
this.message = message;
this.topic = topic;
this.qos = qos;
this.retain = retain;
this.packetId = packetId;
this.mqttHeader = mqttHeader;
// this.topic = topic;
// this.qos = qos;
// this.retain = retain;
// this.packetId = packetId;
this.client = client;
}
......@@ -80,7 +80,7 @@ public class MqttPushServiceImpl {
public void run() {
if (!canceled.get()) {
try {
RemotingCommand requestCommand = buildRequestCommand(topic, qos, retain, packetId);
RemotingCommand requestCommand = buildRequestCommand(this.mqttHeader);
RemotingChannel remotingChannel = client.getRemotingChannel();
if (client.getRemotingChannel() instanceof NettyChannelHandlerContextImpl) {
......@@ -91,29 +91,22 @@ public class MqttPushServiceImpl {
requestCommand.setBody(body);
defaultMqttMessageProcessor.getMqttRemotingServer().push(remotingChannel, requestCommand, MqttConstant.DEFAULT_TIMEOUT_MILLS);
} catch (Exception ex) {
log.warn("Exception was thrown when pushing MQTT message to topic: {}, exception={}", topic, ex.getMessage());
log.warn("Exception was thrown when pushing MQTT message to topic: {}, clientId:{}, exception={}", mqttHeader.getTopicName(), client.getClientId(), ex.getMessage());
} finally {
ReferenceCountUtil.release(message);
}
} else {
log.info("Push message to topic: {} canceled!", topic);
log.info("Push message to topic: {}, clientId:{}, canceled!", mqttHeader.getTopicName(), client.getClientId());
}
}
private RemotingCommand buildRequestCommand(final String topic, final Integer qos, boolean retain,
Integer packetId) {
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setMessageType(MqttMessageType.PUBLISH.value());
if (qos == 0) {
mqttHeader.setDup(false);//DUP is always 0 for qos=0 messages
} else {
mqttHeader.setDup(false);//DUP is depending on whether it is a re-delivery of an earlier attempt.
}
mqttHeader.setQosLevel(qos);
mqttHeader.setRetain(retain);
mqttHeader.setPacketId(packetId);
mqttHeader.setTopicName(topic);
mqttHeader.setRemainingLength(4 + topic.getBytes().length + message.readableBytes());
private RemotingCommand buildRequestCommand(MqttHeader mqttHeader) {
// if (qos == 0) {
// mqttHeader.setDup(false);//DUP is always 0 for qos=0 messages
// } else {
// mqttHeader.setDup(false);//DUP is depending on whether it is a re-delivery of an earlier attempt.
// }
// mqttHeader.setRemainingLength(4 + topic.getBytes().length + message.readableBytes());
RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.MQTT_MESSAGE, mqttHeader);
return pushMessage;
......@@ -125,21 +118,9 @@ public class MqttPushServiceImpl {
}
public void pushMessageQos0(final String topic, final ByteBuf message, Set<Client> clientsTobePublish) {
//For clientIds connected to the current snode
for (Client client : clientsTobePublish) {
MqttPushTask pushTask = new MqttPushTask(topic, message, 0, false, 0, client);
pushMqttMessageExecutorService.submit(pushTask);
}
}
public void pushMessageQos1(final String topic, final ByteBuf message, boolean retain, Integer packetId,
Set<Client> clientsTobePublish) {
for (Client client : clientsTobePublish) {
MqttPushTask pushTask = new MqttPushTask(topic, message, 1, retain, packetId, client);
pushMqttMessageExecutorService.submit(pushTask);
}
public void pushMessageQos(MqttHeader mqttHeader, final ByteBuf message, Client client) {
MqttPushTask pushTask = new MqttPushTask(mqttHeader, message, client);
pushMqttMessageExecutorService.submit(pushTask);
}
public void shutdown() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册