/*
 * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

如梦技术's avatar
如梦技术 已提交
17
package net.dreamlu.iot.mqtt.core.server.support;
18 19

import net.dreamlu.iot.mqtt.codec.*;
如梦技术's avatar
如梦技术 已提交
20 21
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
22 23 24
import net.dreamlu.iot.mqtt.core.server.MqttConst;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.MqttServerProcessor;
如梦技术's avatar
如梦技术 已提交
25
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerAuthHandler;
26
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerPublishPermission;
如梦技术's avatar
如梦技术 已提交
27
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerSubscribeValidator;
28
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerUniqueIdService;
29
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
30
import net.dreamlu.iot.mqtt.core.server.enums.MessageType;
31 32 33
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import net.dreamlu.iot.mqtt.core.server.model.Message;
如梦技术's avatar
如梦技术 已提交
34
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
35
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
36 37 38
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
39
import org.tio.core.Node;
40 41 42
import org.tio.core.Tio;
import org.tio.utils.hutool.StrUtil;

如梦技术's avatar
如梦技术 已提交
43
import java.nio.ByteBuffer;
如梦技术's avatar
如梦技术 已提交
44
import java.util.ArrayList;
45 46 47 48 49 50 51 52
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;

/**
 * mqtt broker 处理器
 *
 * @author L.cm
 */
如梦技术's avatar
如梦技术 已提交
53 54
public class DefaultMqttServerProcessor implements MqttServerProcessor {
	private static final Logger logger = LoggerFactory.getLogger(DefaultMqttServerProcessor.class);
55 56 57 58
	/**
	 * 默认的超时时间
	 */
	private static final long DEFAULT_HEARTBEAT_TIMEOUT = 120_000L;
59 60 61 62 63
	/**
	 * 2 倍客户端 keepAlive 时间
	 */
	private static final long KEEP_ALIVE_UNIT = 2000L;
	private final long heartbeatTimeout;
64
	private final IMqttMessageStore messageStore;
如梦技术's avatar
如梦技术 已提交
65 66
	private final IMqttSessionManager sessionManager;
	private final IMqttServerAuthHandler authHandler;
67
	private final IMqttServerUniqueIdService uniqueIdService;
如梦技术's avatar
如梦技术 已提交
68
	private final IMqttServerSubscribeValidator subscribeValidator;
69
	private final IMqttServerPublishPermission publishPermission;
70
	private final IMqttMessageDispatcher messageDispatcher;
浅梦2013's avatar
浅梦2013 已提交
71
	private final IMqttConnectStatusListener connectStatusListener;
72
	private final IMqttMessageListener messageListener;
73
	private final String nodeName;
74 75
	private final ScheduledThreadPoolExecutor executor;

浅梦2013's avatar
浅梦2013 已提交
76
	/**
	 * Creates the processor, taking every collaborator from the given server creator.
	 *
	 * @param serverCreator source of the configured stores, handlers, listeners and node name
	 * @param executor      scheduler used for the retransmission timers of pending publishes
	 */
	public DefaultMqttServerProcessor(MqttServerCreator serverCreator, ScheduledThreadPoolExecutor executor) {
		// Fall back to the built-in default when no heartbeat timeout is configured.
		if (serverCreator.getHeartbeatTimeout() == null) {
			this.heartbeatTimeout = DEFAULT_HEARTBEAT_TIMEOUT;
		} else {
			this.heartbeatTimeout = serverCreator.getHeartbeatTimeout();
		}

		// storage and session state
		this.messageStore = serverCreator.getMessageStore();
		this.sessionManager = serverCreator.getSessionManager();

		// authentication / authorization hooks
		this.authHandler = serverCreator.getAuthHandler();
		this.uniqueIdService = serverCreator.getUniqueIdService();
		this.subscribeValidator = serverCreator.getSubscribeValidator();
		this.publishPermission = serverCreator.getPublishPermission();

		// dispatching and listeners
		this.messageDispatcher = serverCreator.getMessageDispatcher();
		this.connectStatusListener = serverCreator.getConnectStatusListener();
		this.messageListener = serverCreator.getMessageListener();

		this.nodeName = serverCreator.getNodeName();
		this.executor = executor;
	}

	@Override
	public void processConnect(ChannelContext context, MqttConnectMessage mqttMessage) {
		MqttConnectPayload payload = mqttMessage.payload();
94
		// 参数
95
		String clientId = payload.clientIdentifier();
96
		String userName = payload.userName();
97
		String password = payload.password();
98
		// 1. 获取唯一id,用于 mqtt 内部绑定,部分用户的业务采用 userName 作为唯一id,故抽象之,默认:uniqueId == clientId
99
		String uniqueId = uniqueIdService.getUniqueId(context, clientId, userName, password);
100 101 102
		// 2. 客户端必须提供 uniqueId, 不管 cleanSession 是否为1, 此处没有参考标准协议实现
		if (StrUtil.isBlank(uniqueId)) {
			connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
103 104
			return;
		}
105
		// 3. 认证
106
		if (!authHandler.verifyAuthenticate(context, uniqueId, clientId, userName, password)) {
107
			connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
108 109
			return;
		}
110 111
		// 4. 判断 uniqueId 是否在多个地方使用,如果在其他地方有使用,先解绑
		ChannelContext otherContext = Tio.getByBsId(context.getTioConfig(), uniqueId);
浅梦2013's avatar
浅梦2013 已提交
112 113
		if (otherContext != null) {
			Tio.unbindBsId(otherContext);
114 115
			String remark = String.format("uniqueId:[%s] clientId:[%s] now bind on new context id:[%s]", uniqueId, clientId, context.getId());
			Tio.remove(otherContext, remark);
116
			cleanSession(uniqueId);
浅梦2013's avatar
浅梦2013 已提交
117
		}
118 119
		// 4.5 广播上线消息,避免一个 uniqueId 多个集群服务器中连接。
		sendConnected(context, uniqueId);
120 121
		// 5. 绑定 uniqueId
		Tio.bindBsId(context, uniqueId);
122
		MqttConnectVariableHeader variableHeader = mqttMessage.variableHeader();
123
		// 6. 心跳超时时间,当然这个值如果小于全局配置(默认:120s),定时检查的时间间隔还是以全局为准,只是在判断时用此值
124
		int keepAliveSeconds = variableHeader.keepAliveTimeSeconds();
125 126 127
		// 2倍客户端 keepAlive 时间作为服务端心跳超时时间,如果配置同全局默认不设置,节约内存
		if (keepAliveSeconds > 0 && heartbeatTimeout != keepAliveSeconds * KEEP_ALIVE_UNIT) {
			context.setHeartbeatTimeout(keepAliveSeconds * KEEP_ALIVE_UNIT);
128
		}
129 130
		// 7. session 处理,先默认全部连接关闭时清除,mqtt5 为 CleanStart,
		// 按照 mqtt 协议的规则是下一次连接时清除,emq 是添加了全局 session 超时,关闭时激活 session 有效期倒计时
131
//		boolean cleanSession = variableHeader.isCleanSession();
132 133 134 135 136 137 138
//		if (cleanSession) {
//			// TODO L.cm 考虑 session 处理 可参数: https://www.emqx.com/zh/blog/mqtt-session
//			// mqtt v5.0 会话超时时间
//			MqttProperties properties = variableHeader.properties();
//			Integer sessionExpiryInterval = properties.getPropertyValue(MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL);
//			System.out.println(sessionExpiryInterval);
//		}
139
		// 8. 存储遗嘱消息
140 141 142
		boolean willFlag = variableHeader.isWillFlag();
		if (willFlag) {
			Message willMessage = new Message();
143
			willMessage.setMessageType(MessageType.DOWN_STREAM);
144
			willMessage.setFromClientId(uniqueId);
145
			willMessage.setFromUsername(userName);
146
			willMessage.setTopic(payload.willTopic());
147 148 149 150
			byte[] willMessageInBytes = payload.willMessageInBytes();
			if (willMessageInBytes != null) {
				willMessage.setPayload(ByteBuffer.wrap(willMessageInBytes));
			}
151 152
			willMessage.setQos(variableHeader.willQos());
			willMessage.setRetain(variableHeader.isWillRetain());
153 154 155 156
			willMessage.setTimestamp(System.currentTimeMillis());
			Node clientNode = context.getClientNode();
			// 客户端 ip:端口
			willMessage.setPeerHost(clientNode.getIp() + ':' + clientNode.getPort());
157
			willMessage.setNode(nodeName);
158
			messageStore.addWillMessage(uniqueId, willMessage);
159
		}
160 161 162 163 164 165 166 167 168
		// 9. 在线状态
		try {
			connectStatusListener.online(context, uniqueId);
		} catch (Throwable e) {
			logger.error("mqtt connectStatusListener", e);
			connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
			return;
		}
		// 10. 返回 ack
169
		connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_ACCEPTED);
170 171
	}

172
	private static void connAckByReturnCode(String clientId, String uniqueId, ChannelContext context, MqttConnectReasonCode returnCode) {
173 174 175 176 177
		MqttConnAckMessage message = MqttMessageBuilders.connAck()
			.returnCode(returnCode)
			.sessionPresent(false)
			.build();
		Tio.send(context, message);
178
		logger.info("Connect ack send - clientId: {} uniqueId:{} returnCode:{}", clientId, uniqueId, returnCode);
179 180
	}

181 182 183 184 185 186 187 188 189 190 191
	/**
	 * Publishes a CONNECT event for the given unique id through the message dispatcher.
	 *
	 * @param context  channel of the connecting client, used for the peer host
	 * @param uniqueId unique id of the connecting client
	 */
	private void sendConnected(ChannelContext context, String uniqueId) {
		Node clientNode = context.getClientNode();
		// client ip:port
		String peerHost = clientNode.getIp() + ':' + clientNode.getPort();

		Message connectEvent = new Message();
		connectEvent.setClientId(uniqueId);
		connectEvent.setMessageType(MessageType.CONNECT);
		connectEvent.setNode(nodeName);
		connectEvent.setTimestamp(System.currentTimeMillis());
		connectEvent.setPeerHost(peerHost);
		messageDispatcher.send(connectEvent);
	}

192 193 194 195 196 197 198 199
	/**
	 * Removes the session of the given client; any failure is logged and swallowed so it
	 * cannot interrupt the caller.
	 *
	 * @param clientId id whose session is removed
	 */
	private void cleanSession(String clientId) {
		try {
			sessionManager.remove(clientId);
		} catch (Throwable cause) {
			logger.error("Mqtt server clientId:{} session clean error.", clientId, cause);
		}
	}

200 201 202 203 204 205 206
	@Override
	public void processPublish(ChannelContext context, MqttPublishMessage message) {
		String clientId = context.getBsId();
		MqttFixedHeader fixedHeader = message.fixedHeader();
		MqttQoS mqttQoS = fixedHeader.qosLevel();
		MqttPublishVariableHeader variableHeader = message.variableHeader();
		String topicName = variableHeader.topicName();
207
		// 1. 判断是否有发布权限,没有权限则断开 mqtt 连接 mqtt 5.x qos1、qos2 可以响应 reasonCode
208
		if (publishPermission != null && !publishPermission.verifyPermission(context, clientId, topicName, mqttQoS, fixedHeader.isRetain())) {
209
			Tio.remove(context, "Mqtt clientId:" + clientId + " publish topic: " + topicName + " no permission.");
210 211 212
			return;
		}
		// 2. 处理发布逻辑
213
		int packetId = variableHeader.packetId();
如梦技术's avatar
如梦技术 已提交
214
		logger.debug("Publish - clientId:{} topicName:{} mqttQoS:{} packetId:{}", clientId, topicName, mqttQoS, packetId);
215 216
		switch (mqttQoS) {
			case AT_MOST_ONCE:
217
				invokeListenerForPublish(context, clientId, mqttQoS, topicName, message);
218 219
				break;
			case AT_LEAST_ONCE:
220
				invokeListenerForPublish(context, clientId, mqttQoS, topicName, message);
221 222 223 224
				if (packetId != -1) {
					MqttMessage messageAck = MqttMessageBuilders.pubAck()
						.packetId(packetId)
						.build();
如梦技术's avatar
如梦技术 已提交
225 226
					Boolean resultPubAck = Tio.send(context, messageAck);
					logger.debug("Publish - PubAck send clientId:{} topicName:{} mqttQoS:{} packetId:{} result:{}", clientId, topicName, mqttQoS, packetId, resultPubAck);
227 228 229 230 231 232 233
				}
				break;
			case EXACTLY_ONCE:
				if (packetId != -1) {
					MqttFixedHeader pubRecFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
					MqttMessage pubRecMessage = new MqttMessage(pubRecFixedHeader, MqttMessageIdVariableHeader.from(packetId));
					MqttPendingQos2Publish pendingQos2Publish = new MqttPendingQos2Publish(message, pubRecMessage);
如梦技术's avatar
如梦技术 已提交
234 235
					Boolean resultPubRec = Tio.send(context, pubRecMessage);
					logger.debug("Publish - PubRec send clientId:{} topicName:{} mqttQoS:{} packetId:{} result:{}", clientId, topicName, mqttQoS, packetId, resultPubRec);
如梦技术's avatar
如梦技术 已提交
236
					sessionManager.addPendingQos2Publish(clientId, packetId, pendingQos2Publish);
237 238 239 240 241
					pendingQos2Publish.startPubRecRetransmitTimer(executor, msg -> Tio.send(context, msg));
				}
				break;
			case FAILURE:
			default:
如梦技术's avatar
如梦技术 已提交
242
				break;
243 244 245 246 247 248 249
		}
	}

	@Override
	public void processPubAck(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
		int messageId = variableHeader.messageId();
		String clientId = context.getBsId();
浅梦2013's avatar
浅梦2013 已提交
250
		logger.debug("PubAck - clientId:{}, messageId:{}", clientId, messageId);
如梦技术's avatar
如梦技术 已提交
251
		MqttPendingPublish pendingPublish = sessionManager.getPendingPublish(clientId, messageId);
如梦技术's avatar
如梦技术 已提交
252 253 254 255
		if (pendingPublish == null) {
			return;
		}
		pendingPublish.onPubAckReceived();
如梦技术's avatar
如梦技术 已提交
256
		sessionManager.removePendingPublish(clientId, messageId);
257
		pendingPublish.getPayload().clear();
258 259 260 261 262 263
	}

	@Override
	public void processPubRec(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
		String clientId = context.getBsId();
		int messageId = variableHeader.messageId();
浅梦2013's avatar
浅梦2013 已提交
264
		logger.debug("PubRec - clientId:{}, messageId:{}", clientId, messageId);
如梦技术's avatar
如梦技术 已提交
265
		MqttPendingPublish pendingPublish = sessionManager.getPendingPublish(clientId, messageId);
如梦技术's avatar
如梦技术 已提交
266 267 268
		if (pendingPublish == null) {
			return;
		}
如梦技术's avatar
如梦技术 已提交
269 270 271 272 273 274 275 276
		pendingPublish.onPubAckReceived();

		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
		MqttMessage pubRelMessage = new MqttMessage(fixedHeader, variableHeader);
		Tio.send(context, pubRelMessage);

		pendingPublish.setPubRelMessage(pubRelMessage);
		pendingPublish.startPubRelRetransmissionTimer(executor, msg -> Tio.send(context, msg));
277 278 279 280 281
	}

	@Override
	public void processPubRel(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
		String clientId = context.getBsId();
如梦技术's avatar
如梦技术 已提交
282
		int messageId = variableHeader.messageId();
浅梦2013's avatar
浅梦2013 已提交
283
		logger.debug("PubRel - clientId:{}, messageId:{}", clientId, messageId);
如梦技术's avatar
如梦技术 已提交
284
		MqttPendingQos2Publish pendingQos2Publish = sessionManager.getPendingQos2Publish(clientId, messageId);
如梦技术's avatar
如梦技术 已提交
285 286 287
		if (pendingQos2Publish != null) {
			MqttPublishMessage incomingPublish = pendingQos2Publish.getIncomingPublish();
			String topicName = incomingPublish.variableHeader().topicName();
288 289
			MqttFixedHeader incomingFixedHeader = incomingPublish.fixedHeader();
			MqttQoS mqttQoS = incomingFixedHeader.qosLevel();
290
			invokeListenerForPublish(context, clientId, mqttQoS, topicName, incomingPublish);
如梦技术's avatar
如梦技术 已提交
291
			pendingQos2Publish.onPubRelReceived();
如梦技术's avatar
如梦技术 已提交
292
			sessionManager.removePendingQos2Publish(clientId, messageId);
如梦技术's avatar
如梦技术 已提交
293
		}
294 295
		MqttMessage message = MqttMessageFactory.newMessage(
			new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0),
如梦技术's avatar
如梦技术 已提交
296
			MqttMessageIdVariableHeader.from(messageId), null);
297 298 299 300 301 302 303
		Tio.send(context, message);
	}

	@Override
	public void processPubComp(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
		int messageId = variableHeader.messageId();
		String clientId = context.getBsId();
浅梦2013's avatar
浅梦2013 已提交
304
		logger.debug("PubComp - clientId:{}, messageId:{}", clientId, messageId);
如梦技术's avatar
如梦技术 已提交
305
		MqttPendingPublish pendingPublish = sessionManager.getPendingPublish(clientId, messageId);
如梦技术's avatar
如梦技术 已提交
306
		if (pendingPublish != null) {
307
			pendingPublish.getPayload().clear();
如梦技术's avatar
如梦技术 已提交
308
			pendingPublish.onPubCompReceived();
如梦技术's avatar
如梦技术 已提交
309
			sessionManager.removePendingPublish(clientId, messageId);
如梦技术's avatar
如梦技术 已提交
310
		}
311 312 313 314 315 316
	}

	@Override
	public void processSubscribe(ChannelContext context, MqttSubscribeMessage message) {
		String clientId = context.getBsId();
		int messageId = message.variableHeader().messageId();
317
		// 1. 校验订阅的 topicFilter
318 319 320 321 322 323 324
		List<MqttTopicSubscription> topicSubscriptionList = message.payload().topicSubscriptions();
		List<MqttQoS> grantedQosList = new ArrayList<>();
		// 校验订阅
		List<String> subscribedTopicList = new ArrayList<>();
		boolean enableSubscribeValidator = subscribeValidator != null;
		for (MqttTopicSubscription subscription : topicSubscriptionList) {
			String topicFilter = subscription.topicName();
如梦技术's avatar
如梦技术 已提交
325
			MqttQoS mqttQoS = subscription.qualityOfService();
326
			// 校验是否可以订阅
327
			if (enableSubscribeValidator && !subscribeValidator.verifyTopicFilter(context, clientId, topicFilter, mqttQoS)) {
328 329 330 331 332 333 334 335
				grantedQosList.add(MqttQoS.FAILURE);
				logger.error("Subscribe - clientId:{} topicFilter:{} mqttQoS:{} valid failed messageId:{}", clientId, topicFilter, mqttQoS, messageId);
			} else {
				grantedQosList.add(mqttQoS);
				subscribedTopicList.add(topicFilter);
				sessionManager.addSubscribe(topicFilter, clientId, mqttQoS.value());
				logger.info("Subscribe - clientId:{} topicFilter:{} mqttQoS:{} messageId:{}", clientId, topicFilter, mqttQoS, messageId);
			}
如梦技术's avatar
如梦技术 已提交
336
		}
337 338
		// 3. 返回 ack
		MqttMessage subAckMessage = MqttMessageBuilders.subAck()
339
			.addGrantedQosList(grantedQosList)
340 341 342
			.packetId(messageId)
			.build();
		Tio.send(context, subAckMessage);
343
		// 4. 发送保留消息
344
		for (String topic : subscribedTopicList) {
345 346 347 348 349
			List<Message> retainMessageList = messageStore.getRetainMessage(topic);
			if (retainMessageList != null && !retainMessageList.isEmpty()) {
				for (Message retainMessage : retainMessageList) {
					messageDispatcher.send(clientId, retainMessage);
				}
350 351
			}
		}
352 353 354 355 356 357
	}

	@Override
	public void processUnSubscribe(ChannelContext context, MqttUnsubscribeMessage message) {
		String clientId = context.getBsId();
		int messageId = message.variableHeader().messageId();
如梦技术's avatar
如梦技术 已提交
358 359
		List<String> topicFilterList = message.payload().topics();
		for (String topicFilter : topicFilterList) {
360
			sessionManager.removeSubscribe(topicFilter, clientId);
如梦技术's avatar
如梦技术 已提交
361
		}
浅梦2013's avatar
浅梦2013 已提交
362
		logger.info("UnSubscribe - clientId:{} Topic:{} messageId:{}", clientId, topicFilterList, messageId);
363 364 365 366 367 368 369 370 371
		MqttMessage unSubMessage = MqttMessageBuilders.unsubAck()
			.packetId(messageId)
			.build();
		Tio.send(context, unSubMessage);
	}

	@Override
	public void processPingReq(ChannelContext context) {
		String clientId = context.getBsId();
如梦技术's avatar
如梦技术 已提交
372
		logger.debug("PingReq - clientId:{}", clientId);
373 374 375 376 377 378
		Tio.send(context, MqttMessage.PINGRESP);
	}

	@Override
	public void processDisConnect(ChannelContext context) {
		String clientId = context.getBsId();
浅梦2013's avatar
浅梦2013 已提交
379
		logger.info("DisConnect - clientId:{} contextId:{}", clientId, context.getId());
380 381
		// 设置正常断开的标识
		context.set(MqttConst.DIS_CONNECTED, (byte) 1);
382
		Tio.remove(context, "Mqtt DisConnect");
383 384 385 386 387
	}

	/**
	 * 处理订阅的消息
	 *
388 389 390 391
	 * @param context        ChannelContext
	 * @param clientId       clientId
	 * @param topicName      topicName
	 * @param publishMessage MqttPublishMessage
392
	 */
393
	private void invokeListenerForPublish(ChannelContext context, String clientId, MqttQoS mqttQoS,
394 395
										  String topicName, MqttPublishMessage publishMessage) {
		MqttFixedHeader fixedHeader = publishMessage.fixedHeader();
396
		boolean isRetain = fixedHeader.isRetain();
397
		ByteBuffer payload = publishMessage.payload();
398 399 400 401 402 403 404 405 406
		// 1. retain 消息逻辑
		if (isRetain) {
			// qos == 0 or payload is none,then clear previous retain message
			if (MqttQoS.AT_MOST_ONCE == mqttQoS || payload == null || payload.array().length == 0) {
				this.messageStore.clearRetainMessage(topicName);
			} else {
				Message retainMessage = new Message();
				retainMessage.setTopic(topicName);
				retainMessage.setQos(mqttQoS.value());
407
				retainMessage.setPayload(payload);
408
				retainMessage.setFromClientId(clientId);
409
				retainMessage.setMessageType(MessageType.DOWN_STREAM);
410
				retainMessage.setRetain(true);
411
				retainMessage.setDup(fixedHeader.isDup());
412
				retainMessage.setTimestamp(System.currentTimeMillis());
413 414 415
				Node clientNode = context.getClientNode();
				// 客户端 ip:端口
				retainMessage.setPeerHost(clientNode.getIp() + ':' + clientNode.getPort());
416
				retainMessage.setNode(nodeName);
417 418 419
				this.messageStore.addRetainMessage(topicName, retainMessage);
			}
		}
420 421 422 423 424 425 426 427 428 429 430
		// 2. message
		MqttPublishVariableHeader variableHeader = publishMessage.variableHeader();
		// messageId
		int packetId = variableHeader.packetId();
		Message message = new Message();
		message.setId(packetId);
		// 注意:broker 消息转发是不需要设置 toClientId 而是应该按 topic 找到订阅的客户端进行发送
		message.setFromClientId(clientId);
		message.setTopic(topicName);
		message.setQos(mqttQoS.value());
		if (payload != null) {
431
			message.setPayload(payload);
432
		}
433
		message.setMessageType(MessageType.UP_STREAM);
如梦技术's avatar
如梦技术 已提交
434
		message.setRetain(isRetain);
435 436 437 438 439 440 441
		message.setDup(fixedHeader.isDup());
		message.setTimestamp(System.currentTimeMillis());
		Node clientNode = context.getClientNode();
		// 客户端 ip:端口
		message.setPeerHost(clientNode.getIp() + ':' + clientNode.getPort());
		message.setNode(nodeName);
		// 3. 消息发布
442
		try {
443
			messageListener.onMessage(context, clientId, message);
444 445 446
		} catch (Throwable e) {
			logger.error(e.getMessage(), e);
		}
447 448 449
	}

}