/*
 * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package net.dreamlu.iot.mqtt.core.client;

如梦技术's avatar
如梦技术 已提交
19 20 21 22
import net.dreamlu.iot.mqtt.codec.ByteBufferAllocator;
import net.dreamlu.iot.mqtt.codec.MqttConstant;
import net.dreamlu.iot.mqtt.codec.MqttProperties;
import net.dreamlu.iot.mqtt.codec.MqttVersion;
23 24 25 26 27 28 29 30
import org.tio.client.ClientChannelContext;
import org.tio.client.ClientTioConfig;
import org.tio.client.ReconnConf;
import org.tio.client.TioClient;
import org.tio.client.intf.ClientAioHandler;
import org.tio.client.intf.ClientAioListener;
import org.tio.core.Node;
import org.tio.core.ssl.SslConfig;
31
import org.tio.utils.hutool.StrUtil;
如梦技术's avatar
如梦技术 已提交
32
import org.tio.utils.thread.pool.DefaultThreadFactory;
33

如梦技术's avatar
如梦技术 已提交
34
import java.util.concurrent.ScheduledThreadPoolExecutor;
浅梦2013's avatar
浅梦2013 已提交
35
import java.util.concurrent.TimeUnit;
36 37 38 39 40 41 42 43 44
import java.util.function.Consumer;

/**
 * mqtt 客户端构造器
 *
 * @author L.cm
 */
public final class MqttClientCreator {

如梦技术's avatar
如梦技术 已提交
45 46 47 48
	/**
	 * 名称
	 */
	private String name = "Mica-Mqtt-Client";
49
	/**
如梦技术's avatar
如梦技术 已提交
50
	 * ip,可为空,默认为 127.0.0.1
51
	 */
如梦技术's avatar
如梦技术 已提交
52
	private String ip = "127.0.0.1";
53 54 55 56 57
	/**
	 * 端口,默认:1883
	 */
	private int port = 1883;
	/**
浅梦2013's avatar
浅梦2013 已提交
58
	 * 超时时间,t-io 配置,可为 null,默认为:5秒
59 60
	 */
	private Integer timeout;
61
	/**
如梦技术's avatar
如梦技术 已提交
62
	 * t-io 每次消息读取长度,跟 maxBytesInMessage 相关
63
	 */
如梦技术's avatar
如梦技术 已提交
64
	private int readBufferSize = MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
65 66 67 68
	/**
	 * 消息解析最大 bytes 长度,默认:8092
	 */
	private int maxBytesInMessage = MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
69 70 71 72
	/**
	 * mqtt 3.1 会校验此参数
	 */
	private int maxClientIdLength = MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
73 74 75 76 77 78 79
	/**
	 * Keep Alive (s)
	 */
	private int keepAliveSecs = 60;
	/**
	 * SSL配置
	 */
如梦技术's avatar
如梦技术 已提交
80
	private SslConfig sslConfig;
81 82 83 84 85
	/**
	 * 自动重连
	 */
	private boolean reconnect = true;
	/**
86
	 * 重连的间隔时间,单位毫秒,默认:5000
87
	 */
88 89 90 91 92
	private long reInterval = 5000;
	/**
	 * 连续重连次数,当连续重连这么多次都失败时,不再重连。0和负数则一直重连
	 */
	private int retryCount = 0;
93 94 95 96
	/**
	 * 重连,重新订阅一个批次大小,默认:20
	 */
	private int reSubscribeBatchSize = 20;
97 98 99 100 101 102 103
	/**
	 * 客户端 id,默认:随机生成
	 */
	private String clientId;
	/**
	 * mqtt 协议,默认:3_1_1
	 */
如梦技术's avatar
如梦技术 已提交
104
	private MqttVersion version = MqttVersion.MQTT_3_1_1;
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
	/**
	 * 用户名
	 */
	private String username = null;
	/**
	 * 密码
	 */
	private String password = null;
	/**
	 * 清除会话
	 * <p>
	 * false 表示如果订阅的客户机断线了,那么要保存其要推送的消息,如果其重新连接时,则将这些消息推送。
	 * true 表示消除,表示客户机是第一次连接,消息所以以前的连接信息。
	 * </p>
	 */
	private boolean cleanSession = true;
121 122 123 124
	/**
	 * mqtt 5.0 session 有效期,单位秒
	 */
	private Integer sessionExpiryIntervalSecs;
125 126 127 128
	/**
	 * 遗嘱消息
	 */
	private MqttWillMessage willMessage;
如梦技术's avatar
如梦技术 已提交
129 130 131 132
	/**
	 * mqtt5 properties
	 */
	private MqttProperties properties;
133
	/**
如梦技术's avatar
如梦技术 已提交
134
	 * ByteBuffer Allocator,支持堆内存和堆外内存,默认为:堆内存
135 136
	 */
	private ByteBufferAllocator bufferAllocator = ByteBufferAllocator.HEAP;
137 138 139 140
	/**
	 * 连接监听器
	 */
	private IMqttClientConnectListener connectListener;
141 142 143 144
	/**
	 * 客户端 session
	 */
	private IMqttClientSession clientSession;
145 146 147 148
	/**
	 * messageId 生成器
	 */
	private IMqttClientMessageIdGenerator messageIdGenerator;
149

如梦技术's avatar
如梦技术 已提交
150 151 152 153 154
	public String getName() {
		return name;
	}

	public String getIp() {
155 156 157
		return ip;
	}

如梦技术's avatar
如梦技术 已提交
158
	public int getPort() {
159 160 161
		return port;
	}

如梦技术's avatar
如梦技术 已提交
162
	public Integer getTimeout() {
163 164 165
		return timeout;
	}

166 167 168 169
	public int getReadBufferSize() {
		return readBufferSize;
	}

170 171 172 173
	public int getMaxBytesInMessage() {
		return maxBytesInMessage;
	}

174 175 176 177
	public int getMaxClientIdLength() {
		return maxClientIdLength;
	}

如梦技术's avatar
如梦技术 已提交
178
	public int getKeepAliveSecs() {
179 180 181
		return keepAliveSecs;
	}

如梦技术's avatar
如梦技术 已提交
182
	public SslConfig getSslConfig() {
183 184 185
		return sslConfig;
	}

如梦技术's avatar
如梦技术 已提交
186
	public boolean isReconnect() {
187 188 189
		return reconnect;
	}

190 191 192 193 194
	public int getRetryCount() {
		return retryCount;
	}

	public long getReInterval() {
195 196 197
		return reInterval;
	}

198 199 200 201
	public int getReSubscribeBatchSize() {
		return reSubscribeBatchSize;
	}

202 203 204 205
	public String getClientId() {
		return clientId;
	}

如梦技术's avatar
如梦技术 已提交
206 207
	public MqttVersion getVersion() {
		return version;
208 209
	}

如梦技术's avatar
如梦技术 已提交
210
	public String getUsername() {
211 212 213
		return username;
	}

如梦技术's avatar
如梦技术 已提交
214
	public String getPassword() {
215 216 217
		return password;
	}

如梦技术's avatar
如梦技术 已提交
218
	public boolean isCleanSession() {
219 220 221
		return cleanSession;
	}

222 223 224 225
	public Integer getSessionExpiryIntervalSecs() {
		return sessionExpiryIntervalSecs;
	}

如梦技术's avatar
如梦技术 已提交
226
	public MqttWillMessage getWillMessage() {
227 228 229
		return willMessage;
	}

如梦技术's avatar
如梦技术 已提交
230 231 232 233
	public MqttProperties getProperties() {
		return properties;
	}

234 235 236 237
	public ByteBufferAllocator getBufferAllocator() {
		return bufferAllocator;
	}

238 239 240 241
	public IMqttClientConnectListener getConnectListener() {
		return connectListener;
	}

242 243 244 245
	public IMqttClientSession getClientSession() {
		return clientSession;
	}

246 247 248 249
	public IMqttClientMessageIdGenerator getMessageIdGenerator() {
		return messageIdGenerator;
	}

如梦技术's avatar
如梦技术 已提交
250 251 252 253 254
	public MqttClientCreator name(String name) {
		this.name = name;
		return this;
	}

255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
	public MqttClientCreator ip(String ip) {
		this.ip = ip;
		return this;
	}

	public MqttClientCreator port(int port) {
		this.port = port;
		return this;
	}

	public MqttClientCreator timeout(int timeout) {
		this.timeout = timeout;
		return this;
	}

270 271 272 273 274
	public MqttClientCreator readBufferSize(int readBufferSize) {
		this.readBufferSize = readBufferSize;
		return this;
	}

275 276 277 278 279
	public MqttClientCreator maxBytesInMessage(int maxBytesInMessage) {
		this.maxBytesInMessage = maxBytesInMessage;
		return this;
	}

280 281 282 283 284
	public MqttClientCreator maxClientIdLength(int maxClientIdLength) {
		this.maxClientIdLength = maxClientIdLength;
		return this;
	}

285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
	public MqttClientCreator keepAliveSecs(int keepAliveSecs) {
		this.keepAliveSecs = keepAliveSecs;
		return this;
	}

	public MqttClientCreator sslConfig(SslConfig sslConfig) {
		this.sslConfig = sslConfig;
		return this;
	}

	public MqttClientCreator reconnect(boolean reconnect) {
		this.reconnect = reconnect;
		return this;
	}

300
	public MqttClientCreator retryCount(int retryCount) {
301 302 303 304
		this.retryCount = retryCount;
		return this;
	}

305 306 307 308 309
	public MqttClientCreator reInterval(long reInterval) {
		this.reInterval = reInterval;
		return this;
	}

310 311 312 313 314
	public MqttClientCreator reSubscribeBatchSize(int reSubscribeBatchSize) {
		this.reSubscribeBatchSize = reSubscribeBatchSize;
		return this;
	}

315 316 317 318 319
	public MqttClientCreator clientId(String clientId) {
		this.clientId = clientId;
		return this;
	}

如梦技术's avatar
如梦技术 已提交
320 321
	public MqttClientCreator version(MqttVersion version) {
		this.version = version;
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
		return this;
	}

	public MqttClientCreator username(String username) {
		this.username = username;
		return this;
	}

	public MqttClientCreator password(String password) {
		this.password = password;
		return this;
	}

	public MqttClientCreator cleanSession(boolean cleanSession) {
		this.cleanSession = cleanSession;
		return this;
	}

340 341 342 343 344
	public MqttClientCreator sessionExpiryIntervalSecs(Integer sessionExpiryIntervalSecs) {
		this.sessionExpiryIntervalSecs = sessionExpiryIntervalSecs;
		return this;
	}

345 346 347 348 349 350 351 352 353 354 355
	public MqttClientCreator willMessage(MqttWillMessage willMessage) {
		this.willMessage = willMessage;
		return this;
	}

	public MqttClientCreator willMessage(Consumer<MqttWillMessage.Builder> consumer) {
		MqttWillMessage.Builder builder = MqttWillMessage.builder();
		consumer.accept(builder);
		return willMessage(builder.build());
	}

如梦技术's avatar
如梦技术 已提交
356 357 358 359
	public MqttClientCreator properties(MqttProperties properties) {
		this.properties = properties;
		return this;
	}
如梦技术's avatar
如梦技术 已提交
360

361 362 363 364 365
	public MqttClientCreator bufferAllocator(ByteBufferAllocator allocator) {
		this.bufferAllocator = allocator;
		return this;
	}

366 367 368 369 370
	public MqttClientCreator connectListener(IMqttClientConnectListener connectListener) {
		this.connectListener = connectListener;
		return this;
	}

371 372 373 374 375
	public MqttClientCreator clientSession(IMqttClientSession clientSession) {
		this.clientSession = clientSession;
		return this;
	}

376 377 378 379 380
	public MqttClientCreator messageIdGenerator(IMqttClientMessageIdGenerator messageIdGenerator) {
		this.messageIdGenerator = messageIdGenerator;
		return this;
	}

381
	public MqttClient connect() {
382 383 384
		// 1. 生成 默认的 clientId
		String clientId = getClientId();
		if (StrUtil.isBlank(clientId)) {
如梦技术's avatar
如梦技术 已提交
385 386
			// 默认为:MICA-MQTT- 前缀和 36进制的纳秒数
			this.clientId("MICA-MQTT-" + Long.toString(System.nanoTime(), 36));
387
		}
388 389 390 391
		// 2. 客户端 session
		if (this.clientSession == null) {
			this.clientSession = new DefaultMqttClientSession();
		}
392 393 394 395
		// 3. 消息id 生成器
		if (this.messageIdGenerator == null) {
			this.messageIdGenerator = new DefaultMqttClientMessageIdGenerator();
		}
如梦技术's avatar
如梦技术 已提交
396
		ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, DefaultThreadFactory.getInstance("MqttClient"));
397
		IMqttClientProcessor processor = new DefaultMqttClientProcessor(this, executor);
398
		// 4. 初始化 mqtt 处理器
399
		ClientAioHandler clientAioHandler = new MqttClientAioHandler(this, processor);
400
		ClientAioListener clientAioListener = new MqttClientAioListener(this);
401
		// 5. 重连配置
402 403
		ReconnConf reconnConf = null;
		if (this.reconnect) {
404
			reconnConf = new ReconnConf(this.reInterval, this.retryCount);
405
		}
406
		// 6. tioConfig
如梦技术's avatar
如梦技术 已提交
407 408
		ClientTioConfig tioConfig = new ClientTioConfig(clientAioHandler, clientAioListener, reconnConf);
		tioConfig.setName(this.name);
409
		// 7. 心跳超时时间
浅梦2013's avatar
浅梦2013 已提交
410
		tioConfig.setHeartbeatTimeout(TimeUnit.SECONDS.toMillis(this.keepAliveSecs));
411
		// 8. mqtt 消息最大长度
412
		tioConfig.setReadBufferSize(this.readBufferSize);
413 414 415
		// 9. ssl 证书设置
		tioConfig.setSslConfig(this.sslConfig);
		// 10. tioClient
416 417 418
		try {
			TioClient tioClient = new TioClient(tioConfig);
			ClientChannelContext context = tioClient.connect(new Node(this.ip, this.port), this.timeout);
419
			return new MqttClient(tioClient, this, context, executor);
420 421 422
		} catch (Exception e) {
			throw new IllegalStateException("Mica mqtt client start fail.", e);
		}
423 424 425
	}

}