MqttServerConfiguration.java 7.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17
package net.dreamlu.iot.mqtt.spring.server.config;
18

19 20
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
如梦技术's avatar
如梦技术 已提交
21
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerAuthHandler;
22
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerPublishPermission;
如梦技术's avatar
如梦技术 已提交
23
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerSubscribeValidator;
24
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerUniqueIdService;
25 26
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
27
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
28
import net.dreamlu.iot.mqtt.core.server.event.IMqttSessionListener;
29 30
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
31
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttServerAuthHandler;
32 33 34
import net.dreamlu.iot.mqtt.spring.server.MqttServerCustomizer;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import net.dreamlu.iot.mqtt.spring.server.event.SpringEventMqttConnectStatusListener;
35
import org.springframework.beans.factory.ObjectProvider;
浅梦2013's avatar
浅梦2013 已提交
36
import org.springframework.boot.autoconfigure.AutoConfiguration;
37
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
浅梦2013's avatar
浅梦2013 已提交
38
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
39
import org.springframework.boot.context.properties.EnableConfigurationProperties;
40
import org.springframework.context.ApplicationEventPublisher;
41
import org.springframework.context.annotation.Bean;
42 43
import org.tio.core.stat.IpStatListener;
import org.tio.utils.hutool.StrUtil;
44 45 46 47 48 49

/**
 * mqtt server 配置
 *
 * @author L.cm
 */
浅梦2013's avatar
浅梦2013 已提交
50
@AutoConfiguration
浅梦2013's avatar
浅梦2013 已提交
51 52 53
@ConditionalOnProperty(
	prefix = MqttServerProperties.PREFIX,
	name = "enabled",
54 55
	havingValue = "true",
	matchIfMissing = true
浅梦2013's avatar
浅梦2013 已提交
56
)
57 58 59
@EnableConfigurationProperties(MqttServerProperties.class)
public class MqttServerConfiguration {

60 61 62 63 64 65
	@Bean
	@ConditionalOnMissingBean
	public IMqttConnectStatusListener springEventMqttConnectStatusListener(ApplicationEventPublisher eventPublisher) {
		return new SpringEventMqttConnectStatusListener(eventPublisher);
	}

66 67
	@Bean
	public MqttServerCreator mqttServerCreator(MqttServerProperties properties,
68
											   ObjectProvider<IMqttServerAuthHandler> authHandlerObjectProvider,
69
											   ObjectProvider<IMqttServerUniqueIdService> uniqueIdServiceObjectProvider,
如梦技术's avatar
如梦技术 已提交
70
											   ObjectProvider<IMqttServerSubscribeValidator> subscribeValidatorObjectProvider,
71
											   ObjectProvider<IMqttServerPublishPermission> publishPermissionObjectProvider,
72 73 74
											   ObjectProvider<IMqttMessageDispatcher> messageDispatcherObjectProvider,
											   ObjectProvider<IMqttMessageStore> messageStoreObjectProvider,
											   ObjectProvider<IMqttSessionManager> sessionManagerObjectProvider,
75
											   ObjectProvider<IMqttSessionListener> sessionListenerObjectProvider,
76
											   ObjectProvider<IMqttMessageListener> messageListenerObjectProvider,
77 78
											   ObjectProvider<IMqttConnectStatusListener> connectStatusListenerObjectProvider,
											   ObjectProvider<IpStatListener> ipStatListenerObjectProvider,
79 80 81 82 83 84
											   ObjectProvider<MqttServerCustomizer> customizers) {
		MqttServerCreator serverCreator = MqttServer.create()
			.name(properties.getName())
			.ip(properties.getIp())
			.port(properties.getPort())
			.heartbeatTimeout(properties.getHeartbeatTimeout())
85
			.keepaliveBackoff(properties.getKeepaliveBackoff())
86 87
			.readBufferSize((int) properties.getReadBufferSize().toBytes())
			.maxBytesInMessage((int) properties.getMaxBytesInMessage().toBytes())
88
			.bufferAllocator(properties.getBufferAllocator())
浅梦2013's avatar
浅梦2013 已提交
89
			.maxClientIdLength(properties.getMaxClientIdLength())
90
			.webPort(properties.getWebPort())
浅梦2013's avatar
浅梦2013 已提交
91
			.websocketEnable(properties.isWebsocketEnable())
92
			.httpEnable(properties.isHttpEnable())
93 94
			.nodeName(properties.getNodeName())
			.statEnable(properties.isStatEnable());
95 96 97
		if (properties.isDebug()) {
			serverCreator.debug();
		}
98 99

		// http 认证
100 101 102 103
		MqttServerProperties.HttpBasicAuth httpBasicAuth = properties.getHttpBasicAuth();
		if (serverCreator.isHttpEnable() && httpBasicAuth.isEnable()) {
			serverCreator.httpBasicAuth(httpBasicAuth.getUsername(), httpBasicAuth.getPassword());
		}
104 105 106 107 108 109 110 111
		MqttServerProperties.Ssl ssl = properties.getSsl();
		String keyStorePath = ssl.getKeyStorePath();
		String trustStorePath = ssl.getTrustStorePath();
		String password = ssl.getPassword();
		// ssl 配置
		if (StrUtil.isNotBlank(keyStorePath) && StrUtil.isNotBlank(trustStorePath) && StrUtil.isNotBlank(password)) {
			serverCreator.useSsl(keyStorePath, trustStorePath, password);
		}
112 113
		// 自定义消息监听
		messageListenerObjectProvider.ifAvailable(serverCreator::messageListener);
114
		// 认证处理器
115 116 117 118
		IMqttServerAuthHandler authHandler = authHandlerObjectProvider.getIfAvailable(() -> {
			MqttServerProperties.MqttAuth mqttAuth = properties.getAuth();
			return mqttAuth.isEnable() ? new DefaultMqttServerAuthHandler(mqttAuth.getUsername(), mqttAuth.getPassword()) : null;
		});
119
		serverCreator.authHandler(authHandler);
120 121
		// mqtt 内唯一id
		uniqueIdServiceObjectProvider.ifAvailable(serverCreator::uniqueIdService);
如梦技术's avatar
如梦技术 已提交
122 123
		// 订阅校验
		subscribeValidatorObjectProvider.ifAvailable(serverCreator::subscribeValidator);
124 125
		// 订阅权限校验
		publishPermissionObjectProvider.ifAvailable(serverCreator::publishPermission);
126
		// 消息转发
127
		messageDispatcherObjectProvider.ifAvailable(serverCreator::messageDispatcher);
128
		// 消息存储
129
		messageStoreObjectProvider.ifAvailable(serverCreator::messageStore);
130
		// session 管理
131
		sessionManagerObjectProvider.ifAvailable(serverCreator::sessionManager);
132 133
		// session 监听
		sessionListenerObjectProvider.ifAvailable(serverCreator::sessionListener);
134
		// 状态监听
135
		connectStatusListenerObjectProvider.ifAvailable(serverCreator::connectStatusListener);
136
		// ip 状态监听
137
		ipStatListenerObjectProvider.ifAvailable(serverCreator::ipStatListener);
138
		// 自定义处理
139
		customizers.ifAvailable((customizer) -> customizer.customize(serverCreator));
140 141 142 143
		return serverCreator;
	}

	@Bean
浅梦2013's avatar
浅梦2013 已提交
144
	public MqttServer mqttServer(MqttServerCreator mqttServerCreator) {
145
		return mqttServerCreator.build();
浅梦2013's avatar
浅梦2013 已提交
146 147 148
	}

	@Bean
149 150
	public MqttServerLauncher mqttServerLauncher(MqttServer mqttServer) {
		return new MqttServerLauncher(mqttServer);
151 152
	}

浅梦2013's avatar
浅梦2013 已提交
153 154 155 156 157
	@Bean
	public MqttServerTemplate mqttServerTemplate(MqttServer mqttServer) {
		return new MqttServerTemplate(mqttServer);
	}

158
}