/*
 * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package net.dreamlu.iot.mqtt.core.client;

import net.dreamlu.iot.mqtt.codec.ByteBufferAllocator;
import net.dreamlu.iot.mqtt.codec.MqttConstant;
import net.dreamlu.iot.mqtt.codec.MqttProperties;
import net.dreamlu.iot.mqtt.codec.MqttVersion;
import org.tio.client.ClientChannelContext;
import org.tio.client.ClientTioConfig;
import org.tio.client.ReconnConf;
import org.tio.client.TioClient;
import org.tio.client.intf.ClientAioHandler;
import org.tio.client.intf.ClientAioListener;
import org.tio.core.Node;
import org.tio.core.ssl.SslConfig;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.thread.pool.DefaultThreadFactory;

import java.util.Objects;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * mqtt 客户端构造器
 *
 * @author L.cm
 */
public final class MqttClientCreator {

如梦技术's avatar
如梦技术 已提交
45 46 47 48
	/**
	 * 名称
	 */
	private String name = "Mica-Mqtt-Client";
49
	/**
如梦技术's avatar
如梦技术 已提交
50
	 * ip,可为空,默认为 127.0.0.1
51
	 */
如梦技术's avatar
如梦技术 已提交
52
	private String ip = "127.0.0.1";
53 54 55 56 57
	/**
	 * 端口,默认:1883
	 */
	private int port = 1883;
	/**
浅梦2013's avatar
浅梦2013 已提交
58
	 * 超时时间,t-io 配置,可为 null,默认为:5秒
59 60
	 */
	private Integer timeout;
61
	/**
如梦技术's avatar
如梦技术 已提交
62
	 * t-io 每次消息读取长度,跟 maxBytesInMessage 相关
63
	 */
如梦技术's avatar
如梦技术 已提交
64
	private int readBufferSize = MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
65 66 67 68
	/**
	 * 消息解析最大 bytes 长度,默认:8092
	 */
	private int maxBytesInMessage = MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
69 70 71 72
	/**
	 * mqtt 3.1 会校验此参数
	 */
	private int maxClientIdLength = MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
73 74 75 76 77 78 79
	/**
	 * Keep Alive (s)
	 */
	private int keepAliveSecs = 60;
	/**
	 * SSL配置
	 */
如梦技术's avatar
如梦技术 已提交
80
	private SslConfig sslConfig;
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
	/**
	 * 自动重连
	 */
	private boolean reconnect = true;
	/**
	 * 重连重试时间
	 */
	private Long reInterval;
	/**
	 * 客户端 id,默认:随机生成
	 */
	private String clientId;
	/**
	 * mqtt 协议,默认:3_1_1
	 */
如梦技术's avatar
如梦技术 已提交
96
	private MqttVersion version = MqttVersion.MQTT_3_1_1;
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
	/**
	 * 用户名
	 */
	private String username = null;
	/**
	 * 密码
	 */
	private String password = null;
	/**
	 * 清除会话
	 * <p>
	 * false 表示如果订阅的客户机断线了,那么要保存其要推送的消息,如果其重新连接时,则将这些消息推送。
	 * true 表示消除,表示客户机是第一次连接,消息所以以前的连接信息。
	 * </p>
	 */
	private boolean cleanSession = true;
	/**
	 * 遗嘱消息
	 */
	private MqttWillMessage willMessage;
如梦技术's avatar
如梦技术 已提交
117 118 119 120
	/**
	 * mqtt5 properties
	 */
	private MqttProperties properties;
121
	/**
如梦技术's avatar
如梦技术 已提交
122
	 * ByteBuffer Allocator,支持堆内存和堆外内存,默认为:堆内存
123 124
	 */
	private ByteBufferAllocator bufferAllocator = ByteBufferAllocator.HEAP;
125 126 127 128
	/**
	 * 连接监听器
	 */
	private IMqttClientConnectListener connectListener;
129

如梦技术's avatar
如梦技术 已提交
130 131 132 133 134
	public String getName() {
		return name;
	}

	public String getIp() {
135 136 137
		return ip;
	}

如梦技术's avatar
如梦技术 已提交
138
	public int getPort() {
139 140 141
		return port;
	}

如梦技术's avatar
如梦技术 已提交
142
	public Integer getTimeout() {
143 144 145
		return timeout;
	}

146 147 148 149
	public int getReadBufferSize() {
		return readBufferSize;
	}

150 151 152 153
	public int getMaxBytesInMessage() {
		return maxBytesInMessage;
	}

154 155 156 157
	public int getMaxClientIdLength() {
		return maxClientIdLength;
	}

如梦技术's avatar
如梦技术 已提交
158
	public int getKeepAliveSecs() {
159 160 161
		return keepAliveSecs;
	}

如梦技术's avatar
如梦技术 已提交
162
	public SslConfig getSslConfig() {
163 164 165
		return sslConfig;
	}

如梦技术's avatar
如梦技术 已提交
166
	public boolean isReconnect() {
167 168 169
		return reconnect;
	}

如梦技术's avatar
如梦技术 已提交
170
	public Long getReInterval() {
171 172 173 174 175 176 177
		return reInterval;
	}

	public String getClientId() {
		return clientId;
	}

如梦技术's avatar
如梦技术 已提交
178 179
	public MqttVersion getVersion() {
		return version;
180 181
	}

如梦技术's avatar
如梦技术 已提交
182
	public String getUsername() {
183 184 185
		return username;
	}

如梦技术's avatar
如梦技术 已提交
186
	public String getPassword() {
187 188 189
		return password;
	}

如梦技术's avatar
如梦技术 已提交
190
	public boolean isCleanSession() {
191 192 193
		return cleanSession;
	}

如梦技术's avatar
如梦技术 已提交
194
	public MqttWillMessage getWillMessage() {
195 196 197
		return willMessage;
	}

如梦技术's avatar
如梦技术 已提交
198 199 200 201
	public MqttProperties getProperties() {
		return properties;
	}

202 203 204 205
	public ByteBufferAllocator getBufferAllocator() {
		return bufferAllocator;
	}

206 207 208 209
	public IMqttClientConnectListener getConnectListener() {
		return connectListener;
	}

如梦技术's avatar
如梦技术 已提交
210 211 212 213 214
	public MqttClientCreator name(String name) {
		this.name = name;
		return this;
	}

215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
	public MqttClientCreator ip(String ip) {
		this.ip = ip;
		return this;
	}

	public MqttClientCreator port(int port) {
		this.port = port;
		return this;
	}

	public MqttClientCreator timeout(int timeout) {
		this.timeout = timeout;
		return this;
	}

230 231 232 233 234
	public MqttClientCreator readBufferSize(int readBufferSize) {
		this.readBufferSize = readBufferSize;
		return this;
	}

235 236 237 238 239
	public MqttClientCreator maxBytesInMessage(int maxBytesInMessage) {
		this.maxBytesInMessage = maxBytesInMessage;
		return this;
	}

240 241 242 243 244
	public MqttClientCreator maxClientIdLength(int maxClientIdLength) {
		this.maxClientIdLength = maxClientIdLength;
		return this;
	}

245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
	public MqttClientCreator keepAliveSecs(int keepAliveSecs) {
		this.keepAliveSecs = keepAliveSecs;
		return this;
	}

	public MqttClientCreator sslConfig(SslConfig sslConfig) {
		this.sslConfig = sslConfig;
		return this;
	}

	public MqttClientCreator reconnect(boolean reconnect) {
		this.reconnect = reconnect;
		return this;
	}

	public MqttClientCreator reInterval(long reInterval) {
		this.reInterval = reInterval;
		return this;
	}

	public MqttClientCreator clientId(String clientId) {
		this.clientId = clientId;
		return this;
	}

如梦技术's avatar
如梦技术 已提交
270 271
	public MqttClientCreator version(MqttVersion version) {
		this.version = version;
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300
		return this;
	}

	public MqttClientCreator username(String username) {
		this.username = username;
		return this;
	}

	public MqttClientCreator password(String password) {
		this.password = password;
		return this;
	}

	public MqttClientCreator cleanSession(boolean cleanSession) {
		this.cleanSession = cleanSession;
		return this;
	}

	public MqttClientCreator willMessage(MqttWillMessage willMessage) {
		this.willMessage = willMessage;
		return this;
	}

	public MqttClientCreator willMessage(Consumer<MqttWillMessage.Builder> consumer) {
		MqttWillMessage.Builder builder = MqttWillMessage.builder();
		consumer.accept(builder);
		return willMessage(builder.build());
	}

如梦技术's avatar
如梦技术 已提交
301 302 303 304
	public MqttClientCreator properties(MqttProperties properties) {
		this.properties = properties;
		return this;
	}
如梦技术's avatar
如梦技术 已提交
305

306 307 308 309 310
	public MqttClientCreator bufferAllocator(ByteBufferAllocator allocator) {
		this.bufferAllocator = allocator;
		return this;
	}

311 312 313 314 315
	public MqttClientCreator connectListener(IMqttClientConnectListener connectListener) {
		this.connectListener = connectListener;
		return this;
	}

316
	public MqttClient connect() {
317 318 319
		// 1. 生成 默认的 clientId
		String clientId = getClientId();
		if (StrUtil.isBlank(clientId)) {
如梦技术's avatar
如梦技术 已提交
320 321
			// 默认为:MICA-MQTT- 前缀和 36进制的纳秒数
			this.clientId("MICA-MQTT-" + Long.toString(System.nanoTime(), 36));
322
		}
如梦技术's avatar
如梦技术 已提交
323
		MqttClientStore clientStore = new MqttClientStore();
如梦技术's avatar
如梦技术 已提交
324
		ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, DefaultThreadFactory.getInstance("MqttClient"));
325
		IMqttClientProcessor processor = new DefaultMqttClientProcessor(clientStore, executor);
326
		// 2. 初始化 mqtt 处理器
327
		ClientAioHandler clientAioHandler = new MqttClientAioHandler(this, processor);
如梦技术's avatar
如梦技术 已提交
328
		ClientAioListener clientAioListener = new MqttClientAioListener(this, clientStore, executor);
329
		// 3. 重连配置
330 331
		ReconnConf reconnConf = null;
		if (this.reconnect) {
332 333
			if (this.reInterval != null && this.reInterval > 0) {
				reconnConf = new ReconnConf(this.reInterval);
334 335 336 337
			} else {
				reconnConf = new ReconnConf();
			}
		}
如梦技术's avatar
如梦技术 已提交
338 339 340
		// 4. tioConfig
		ClientTioConfig tioConfig = new ClientTioConfig(clientAioHandler, clientAioListener, reconnConf);
		tioConfig.setName(this.name);
浅梦2013's avatar
浅梦2013 已提交
341 342 343
		// 5. 心跳超时时间
		tioConfig.setHeartbeatTimeout(TimeUnit.SECONDS.toMillis(this.keepAliveSecs));
		// 6. mqtt 消息最大长度
344
		tioConfig.setReadBufferSize(this.readBufferSize);
浅梦2013's avatar
浅梦2013 已提交
345
		// 7. tioClient
346 347 348 349 350 351 352
		try {
			TioClient tioClient = new TioClient(tioConfig);
			ClientChannelContext context = tioClient.connect(new Node(this.ip, this.port), this.timeout);
			return new MqttClient(tioClient, this, context, clientStore, executor);
		} catch (Exception e) {
			throw new IllegalStateException("Mica mqtt client start fail.", e);
		}
353 354 355
	}

}