DefaultMqttClientProcessor.java 13.9 KB
Newer Older
1
/*
2
 * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package net.dreamlu.iot.mqtt.core.client;
如梦技术's avatar
如梦技术 已提交
18 19

import net.dreamlu.iot.mqtt.codec.*;
如梦技术's avatar
如梦技术 已提交
20
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
21
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
22 23
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
如梦技术's avatar
如梦技术 已提交
24
import org.tio.core.ChannelContext;
25
import org.tio.core.Node;
26
import org.tio.core.Tio;
浅梦2013's avatar
浅梦2013 已提交
27 28
import org.tio.utils.hutool.CollUtil;
import org.tio.utils.timer.TimerTaskService;
如梦技术's avatar
如梦技术 已提交
29

30
import java.util.ArrayList;
如梦技术's avatar
如梦技术 已提交
31
import java.util.List;
浅梦2013's avatar
浅梦2013 已提交
32
import java.util.concurrent.ExecutorService;
33
import java.util.stream.Collectors;
如梦技术's avatar
如梦技术 已提交
34 35

/**
36
 * 默认的 mqtt 消息处理器
如梦技术's avatar
如梦技术 已提交
37 38 39
 *
 * @author L.cm
 */
如梦技术's avatar
如梦技术 已提交
40
public class DefaultMqttClientProcessor implements IMqttClientProcessor {
41
	private static final Logger logger = LoggerFactory.getLogger(DefaultMqttClientProcessor.class);
42
	private final int reSubscribeBatchSize;
43
	private final IMqttClientSession clientSession;
44
	private final IMqttClientConnectListener connectListener;
45
	private final IMqttClientMessageIdGenerator messageIdGenerator;
浅梦2013's avatar
浅梦2013 已提交
46
	private final TimerTaskService taskService;
浅梦2013's avatar
浅梦2013 已提交
47
	private final ExecutorService executor;
48

49
	public DefaultMqttClientProcessor(MqttClientCreator mqttClientCreator) {
50
		this.reSubscribeBatchSize = mqttClientCreator.getReSubscribeBatchSize();
51
		this.clientSession = mqttClientCreator.getClientSession();
52
		this.connectListener = mqttClientCreator.getConnectListener();
53
		this.messageIdGenerator = mqttClientCreator.getMessageIdGenerator();
浅梦2013's avatar
浅梦2013 已提交
54
		this.taskService = mqttClientCreator.getTaskService();
如梦技术's avatar
如梦技术 已提交
55
		this.executor = mqttClientCreator.getMqttExecutor();
如梦技术's avatar
如梦技术 已提交
56 57 58 59 60 61
	}

	@Override
	public void processDecodeFailure(ChannelContext context, MqttMessage message, Throwable ex) {
		// 客户端失败,默认记录异常日志
		logger.error(ex.getMessage(), ex);
62
	}
如梦技术's avatar
如梦技术 已提交
63 64 65

	@Override
	public void processConAck(ChannelContext context, MqttConnAckMessage message) {
66
		MqttConnAckVariableHeader connAckVariableHeader = message.variableHeader();
67
		MqttConnectReasonCode returnCode = connAckVariableHeader.connectReturnCode();
如梦技术's avatar
如梦技术 已提交
68
		switch (returnCode) {
69
			case CONNECTION_ACCEPTED:
70
				// 1. 连接成功的日志
71 72 73 74
				if (logger.isInfoEnabled()) {
					Node node = context.getServerNode();
					logger.info("MqttClient contextId:{} connection:{}:{} succeeded!", context.getId(), node.getIp(), node.getPort());
				}
75 76
				// 2. 发布连接通知
				publishConnectEvent(context);
77 78
				// 3. 如果 session 不存在重连时发送重新订阅,更改 ip、端口之后需要重新发送订阅
				if (!connAckVariableHeader.isSessionPresent() || MqttClient.isNeedReSub(context)) {
79 80
					reSendSubscription(context);
				}
81 82 83 84 85 86 87
				break;
			case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD:
			case CONNECTION_REFUSED_IDENTIFIER_REJECTED:
			case CONNECTION_REFUSED_NOT_AUTHORIZED:
			case CONNECTION_REFUSED_SERVER_UNAVAILABLE:
			case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION:
			default:
如梦技术's avatar
如梦技术 已提交
88
				String remark = "MqttClient connect error error ReturnCode:" + returnCode;
89
				Tio.close(context, remark);
如梦技术's avatar
如梦技术 已提交
90
				break;
91
		}
如梦技术's avatar
如梦技术 已提交
92 93
	}

94 95 96 97 98 99 100 101 102 103
	/**
	 * 发布连接成功事件
	 *
	 * @param context ChannelContext
	 */
	private void publishConnectEvent(ChannelContext context) {
		// 先判断是否配置监听
		if (connectListener == null) {
			return;
		}
104 105 106 107 108 109 110 111
		// 触发客户端连接事件
		executor.submit(() -> {
			try {
				connectListener.onConnected(context, context.isReconnect);
			} catch (Throwable e) {
				logger.error(e.getMessage(), e);
			}
		});
112 113 114 115 116 117 118
	}

	/**
	 * 批量重新订阅
	 *
	 * @param context ChannelContext
	 */
119
	private void reSendSubscription(ChannelContext context) {
120 121 122 123
		List<MqttClientSubscription> reSubscriptionList = clientSession.getAndCleanSubscription();
		// 1. 判断是否为空
		if (reSubscriptionList.isEmpty()) {
			return;
124
		}
125 126 127 128 129 130 131 132 133 134 135 136 137
		// 2. 订阅的数量
		int subscribedSize = reSubscriptionList.size();
		if (subscribedSize <= reSubscribeBatchSize) {
			reSendSubscription(context, reSubscriptionList);
		} else {
			List<List<MqttClientSubscription>> partitionList = CollUtil.partition(reSubscriptionList, reSubscribeBatchSize);
			for (List<MqttClientSubscription> partition : partitionList) {
				reSendSubscription(context, partition);
			}
		}
	}

	/**
138
	 * 批量重新订阅
139 140 141 142 143
	 *
	 * @param context            ChannelContext
	 * @param reSubscriptionList reSubscriptionList
	 */
	private void reSendSubscription(ChannelContext context, List<MqttClientSubscription> reSubscriptionList) {
144
		// 2. 批量重新订阅
145
		List<MqttTopicSubscription> topicSubscriptionList = reSubscriptionList.stream().map(MqttClientSubscription::toTopicSubscription).collect(Collectors.toList());
146
		int messageId = messageIdGenerator.getId();
147
		MqttSubscribeMessage message = MqttMessageBuilders.subscribe().addSubscriptions(topicSubscriptionList).messageId(messageId).build();
148 149 150
		MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(reSubscriptionList, message);
		Boolean result = Tio.send(context, message);
		logger.info("MQTT subscriptionList:{} messageId:{} resubscribing result:{}", reSubscriptionList, messageId, result);
浅梦2013's avatar
浅梦2013 已提交
151
		pendingSubscription.startRetransmitTimer(taskService, (msg) -> Tio.send(context, msg));
152
		clientSession.addPaddingSubscribe(messageId, pendingSubscription);
153 154
	}

如梦技术's avatar
如梦技术 已提交
155
	@Override
156
	public void processSubAck(ChannelContext context, MqttSubAckMessage message) {
如梦技术's avatar
如梦技术 已提交
157
		int messageId = message.variableHeader().messageId();
如梦技术's avatar
如梦技术 已提交
158
		logger.debug("MqttClient SubAck messageId:{}", messageId);
159
		MqttPendingSubscription paddingSubscribe = clientSession.getPaddingSubscribe(messageId);
如梦技术's avatar
如梦技术 已提交
160 161 162
		if (paddingSubscribe == null) {
			return;
		}
163
		List<MqttClientSubscription> subscriptionList = paddingSubscribe.getSubscriptionList();
164
		MqttSubAckPayload subAckPayload = message.payload();
165
		List<Integer> reasonCodeList = subAckPayload.reasonCodes();
166
		// reasonCodes 为空
167 168
		if (reasonCodeList.isEmpty()) {
			logger.error("MqttClient subscriptionList:{} subscribe failed reasonCodes is empty messageId:{}", subscriptionList, messageId);
169 170
			return;
		}
171 172 173 174
		// 找出订阅成功的数据
		List<MqttClientSubscription> subscribedList = new ArrayList<>();
		for (int i = 0; i < subscriptionList.size(); i++) {
			MqttClientSubscription subscription = subscriptionList.get(i);
175
			String topicFilter = subscription.getTopicFilter();
176 177 178 179 180 181
			Integer reasonCode = reasonCodeList.get(i);
			// reasonCodes 范围
			if (reasonCode == null || reasonCode < 0 || reasonCode > 2) {
				logger.error("MqttClient topicFilter:{} subscribe failed reasonCodes:{} messageId:{}", topicFilter, reasonCode, messageId);
			} else {
				subscribedList.add(subscription);
182
			}
183
		}
184
		logger.info("MQTT subscriptionList:{} subscribed successfully messageId:{}", subscribedList, messageId);
如梦技术's avatar
如梦技术 已提交
185
		paddingSubscribe.onSubAckReceived();
186
		clientSession.removePaddingSubscribe(messageId);
187
		clientSession.addSubscriptionList(subscribedList);
188 189
		// 触发已经监听的事件
		subscribedList.forEach(clientSubscription -> {
浅梦2013's avatar
浅梦2013 已提交
190 191
			String topicFilter = clientSubscription.getTopicFilter();
			MqttQoS mqttQoS = clientSubscription.getMqttQoS();
192
			IMqttClientMessageListener subscriptionListener = clientSubscription.getListener();
浅梦2013's avatar
浅梦2013 已提交
193 194
			executor.execute(() -> {
				try {
如梦技术's avatar
如梦技术 已提交
195
					subscriptionListener.onSubscribed(context, topicFilter, mqttQoS, message);
浅梦2013's avatar
浅梦2013 已提交
196 197 198 199
				} catch (Throwable e) {
					logger.error("MQTT topicFilter:{} subscribed onSubscribed event error.", subscribedList, e);
				}
			});
200
		});
如梦技术's avatar
如梦技术 已提交
201 202 203 204
	}

	@Override
	public void processPublish(ChannelContext context, MqttPublishMessage message) {
如梦技术's avatar
如梦技术 已提交
205
		MqttFixedHeader mqttFixedHeader = message.fixedHeader();
如梦技术's avatar
如梦技术 已提交
206 207 208 209
		MqttPublishVariableHeader variableHeader = message.variableHeader();
		String topicName = variableHeader.topicName();
		MqttQoS mqttQoS = mqttFixedHeader.qosLevel();
		int packetId = variableHeader.packetId();
如梦技术's avatar
如梦技术 已提交
210
		logger.debug("MqttClient received publish topic:{} qoS:{} packetId:{}", topicName, mqttQoS, packetId);
如梦技术's avatar
如梦技术 已提交
211 212
		switch (mqttFixedHeader.qosLevel()) {
			case AT_MOST_ONCE:
213
				invokeListenerForPublish(context, topicName, message);
如梦技术's avatar
如梦技术 已提交
214 215
				break;
			case AT_LEAST_ONCE:
216
				invokeListenerForPublish(context, topicName, message);
如梦技术's avatar
如梦技术 已提交
217
				if (packetId != -1) {
218
					MqttMessage messageAck = MqttMessageBuilders.pubAck().packetId(packetId).build();
如梦技术's avatar
如梦技术 已提交
219 220
					Tio.send(context, messageAck);
				}
如梦技术's avatar
如梦技术 已提交
221 222
				break;
			case EXACTLY_ONCE:
如梦技术's avatar
如梦技术 已提交
223 224 225
				if (packetId != -1) {
					MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
					MqttMessage pubRecMessage = new MqttMessage(fixedHeader, MqttMessageIdVariableHeader.from(packetId));
226 227
					Boolean resultPubRec = Tio.send(context, pubRecMessage);
					logger.debug("Publish - PubRec send topicName:{} mqttQoS:{} packetId:{} result:{}", topicName, mqttQoS, packetId, resultPubRec);
如梦技术's avatar
如梦技术 已提交
228
					MqttPendingQos2Publish pendingQos2Publish = new MqttPendingQos2Publish(message, pubRecMessage);
229
					clientSession.addPendingQos2Publish(packetId, pendingQos2Publish);
浅梦2013's avatar
浅梦2013 已提交
230
					pendingQos2Publish.startPubRecRetransmitTimer(taskService, msg -> Tio.send(context, msg));
如梦技术's avatar
如梦技术 已提交
231
				}
如梦技术's avatar
如梦技术 已提交
232 233 234
				break;
			case FAILURE:
			default:
如梦技术's avatar
如梦技术 已提交
235 236 237 238 239
		}
	}

	@Override
	public void processUnSubAck(MqttUnsubAckMessage message) {
如梦技术's avatar
如梦技术 已提交
240
		int messageId = message.variableHeader().messageId();
如梦技术's avatar
如梦技术 已提交
241
		logger.debug("MqttClient UnSubAck messageId:{}", messageId);
242
		MqttPendingUnSubscription pendingUnSubscription = clientSession.getPaddingUnSubscribe(messageId);
如梦技术's avatar
如梦技术 已提交
243
		if (pendingUnSubscription == null) {
如梦技术's avatar
如梦技术 已提交
244 245
			return;
		}
246
		List<String> unSubscriptionTopics = pendingUnSubscription.getTopics();
247
		if (logger.isInfoEnabled()) {
248
			logger.info("MQTT Topic:{} successfully unSubscribed messageId:{}", unSubscriptionTopics, messageId);
249
		}
如梦技术's avatar
如梦技术 已提交
250
		pendingUnSubscription.onUnSubAckReceived();
251
		clientSession.removePaddingUnSubscribe(messageId);
252
		clientSession.removeSubscriptions(unSubscriptionTopics);
如梦技术's avatar
如梦技术 已提交
253 254 255 256
	}

	@Override
	public void processPubAck(MqttPubAckMessage message) {
如梦技术's avatar
如梦技术 已提交
257
		int messageId = message.variableHeader().messageId();
如梦技术's avatar
如梦技术 已提交
258
		logger.debug("MqttClient PubAck messageId:{}", messageId);
259
		MqttPendingPublish pendingPublish = clientSession.getPendingPublish(messageId);
如梦技术's avatar
如梦技术 已提交
260 261 262
		if (pendingPublish == null) {
			return;
		}
263 264 265 266
		if (logger.isInfoEnabled()) {
			String topicName = pendingPublish.getMessage().variableHeader().topicName();
			logger.info("MQTT Topic:{} successfully PubAck messageId:{}", topicName, messageId);
		}
如梦技术's avatar
如梦技术 已提交
267
		pendingPublish.onPubAckReceived();
268
		clientSession.removePendingPublish(messageId);
如梦技术's avatar
如梦技术 已提交
269 270 271 272
	}

	@Override
	public void processPubRec(ChannelContext context, MqttMessage message) {
如梦技术's avatar
如梦技术 已提交
273 274
		int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
		logger.debug("MqttClient PubRec messageId:{}", messageId);
275
		MqttPendingPublish pendingPublish = clientSession.getPendingPublish(messageId);
浅梦2013's avatar
浅梦2013 已提交
276 277 278
		if (pendingPublish == null) {
			return;
		}
如梦技术's avatar
如梦技术 已提交
279 280 281 282 283 284 285 286
		pendingPublish.onPubAckReceived();

		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
		MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
		MqttMessage pubRelMessage = new MqttMessage(fixedHeader, variableHeader);
		Tio.send(context, pubRelMessage);

		pendingPublish.setPubRelMessage(pubRelMessage);
浅梦2013's avatar
浅梦2013 已提交
287
		pendingPublish.startPubRelRetransmissionTimer(taskService, (msg) -> Tio.send(context, msg));
如梦技术's avatar
如梦技术 已提交
288 289 290 291
	}

	@Override
	public void processPubRel(ChannelContext context, MqttMessage message) {
如梦技术's avatar
如梦技术 已提交
292 293
		int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
		logger.debug("MqttClient PubRel messageId:{}", messageId);
294
		MqttPendingQos2Publish pendingQos2Publish = clientSession.getPendingQos2Publish(messageId);
如梦技术's avatar
如梦技术 已提交
295 296 297
		if (pendingQos2Publish != null) {
			MqttPublishMessage incomingPublish = pendingQos2Publish.getIncomingPublish();
			String topicName = incomingPublish.variableHeader().topicName();
298
			this.invokeListenerForPublish(context, topicName, incomingPublish);
如梦技术's avatar
如梦技术 已提交
299
			pendingQos2Publish.onPubRelReceived();
300
			clientSession.removePendingQos2Publish(messageId);
如梦技术's avatar
如梦技术 已提交
301 302 303 304
		}
		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
		MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
		Tio.send(context, new MqttMessage(fixedHeader, variableHeader));
如梦技术's avatar
如梦技术 已提交
305 306 307 308
	}

	@Override
	public void processPubComp(MqttMessage message) {
如梦技术's avatar
如梦技术 已提交
309
		int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
310
		MqttPendingPublish pendingPublish = clientSession.getPendingPublish(messageId);
311 312 313
		if (pendingPublish == null) {
			return;
		}
314 315
		if (logger.isInfoEnabled()) {
			String topicName = pendingPublish.getMessage().variableHeader().topicName();
316
			logger.info("MQTT Topic:{} successfully PubComp", topicName);
317
		}
如梦技术's avatar
如梦技术 已提交
318
		pendingPublish.onPubCompReceived();
319
		clientSession.removePendingPublish(messageId);
如梦技术's avatar
如梦技术 已提交
320 321 322 323 324
	}

	/**
	 * 处理订阅的消息
	 *
325
	 * @param context   ChannelContext
如梦技术's avatar
如梦技术 已提交
326 327 328
	 * @param topicName topicName
	 * @param message   MqttPublishMessage
	 */
329
	private void invokeListenerForPublish(ChannelContext context, String topicName, MqttPublishMessage message) {
330
		List<MqttClientSubscription> subscriptionList = clientSession.getMatchedSubscription(topicName);
如梦技术's avatar
如梦技术 已提交
331 332 333
		if (subscriptionList.isEmpty()) {
			logger.warn("Mqtt message to accept topic:{} subscriptionList is empty.", topicName);
		} else {
334
			final byte[] payload = message.payload();
如梦技术's avatar
如梦技术 已提交
335 336
			subscriptionList.forEach(subscription -> {
				IMqttClientMessageListener listener = subscription.getListener();
如梦技术's avatar
如梦技术 已提交
337 338
				executor.submit(() -> {
					try {
如梦技术's avatar
如梦技术 已提交
339
						listener.onMessage(context, topicName, message, payload);
如梦技术's avatar
如梦技术 已提交
340 341 342 343
					} catch (Throwable e) {
						logger.error(e.getMessage(), e);
					}
				});
如梦技术's avatar
如梦技术 已提交
344 345
			});
		}
如梦技术's avatar
如梦技术 已提交
346
	}
347

如梦技术's avatar
如梦技术 已提交
348
}