提交 45fd3cd9 编写于 作者: 如梦技术's avatar 如梦技术 🐛

微调。

上级 c2485cdb
......@@ -16,6 +16,9 @@
package net.dreamlu.iot.mqtt.core.client;
import org.tio.client.TioClient;
import org.tio.core.ChannelContext;
/**
* mqtt 客户端
*
......@@ -23,10 +26,21 @@ package net.dreamlu.iot.mqtt.core.client;
*/
public class MqttClient {
// 1. 服务端信息 ip、端口、timeout,考虑 ssl
// 2. 重连配置
// 3. 客户端信息 clientId、userName、pwd
// 4. 自定义消息处理
// 5. sub 的 topic 和 监听的配置,sub 是自定义业务处理器的行为,不应该在这里面配置
private MqttClientConfig clientConfig;
private MqttClientProcessor processor;
private TioClient tioClient;
private ChannelContext context;
public void connect() {
}
public void disconnect() {
}
public boolean stop() {
return false;
}
}
......@@ -16,11 +16,14 @@
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttConnectMessage;
import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import org.tio.client.DefaultClientAioListener;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.utils.hutool.StrUtil;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
/**
* mqtt 客户端监听器
......@@ -28,26 +31,38 @@ import org.tio.core.Tio;
* @author L.cm
*/
public class MqttClientAioListener extends DefaultClientAioListener {
private final String clientId;
private final String username;
private final byte[] password;
private final MqttClientConfig clientConfig;
private final MqttWillMessage willMessage;
public MqttClientAioListener(String clientId, String username, byte[] password) {
this.clientId = clientId;
this.username = username;
this.password = password;
public MqttClientAioListener(MqttClientConfig clientConfig) {
this.clientConfig = Objects.requireNonNull(clientConfig);
this.willMessage = clientConfig.getWillMessage();
}
@Override
public void onAfterConnected(ChannelContext context, boolean isConnected, boolean isReconnect) {
if (isConnected) {
// 1. 建立连接后发送 mqtt 连接的消息
MqttConnectMessage message = MqttMessageBuilders.connect()
.clientId(clientId)
.username(username)
.password(password)
.build();
Tio.send(context, message);
MqttMessageBuilders.ConnectBuilder builder = MqttMessageBuilders.connect()
.clientId(clientConfig.getClientId())
.username(clientConfig.getUsername())
.keepAlive(clientConfig.getKeepAliveSecs())
.cleanSession(clientConfig.isCleanSession())
.protocolVersion(clientConfig.getProtocolVersion())
.willFlag(willMessage != null);
// 密码
String password = clientConfig.getPassword();
if (StrUtil.isNotBlank(password)) {
builder.password(password.getBytes(StandardCharsets.UTF_8));
}
// 遗嘱消息
if (willMessage != null) {
builder.willTopic(willMessage.getTopic())
.willMessage(willMessage.getMessage())
.willRetain(willMessage.isRetain())
.willQoS(willMessage.getQos());
}
Tio.send(context, builder.build());
}
}
......
......@@ -41,7 +41,7 @@ public class MqttClientConfig {
/**
* Keep Alive (s)
*/
private Integer keepAliveSecs;
private int keepAliveSecs = 60;
/**
* SSL配置
*/
......@@ -83,4 +83,107 @@ public class MqttClientConfig {
*/
private MqttWillMessage willMessage;
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public Integer getTimeout() {
return timeout;
}
public void setTimeout(Integer timeout) {
this.timeout = timeout;
}
public Integer getKeepAliveSecs() {
return keepAliveSecs;
}
public void setKeepAliveSecs(Integer keepAliveSecs) {
this.keepAliveSecs = keepAliveSecs;
}
public SslConfig getSslConfig() {
return sslConfig;
}
public void setSslConfig(SslConfig sslConfig) {
this.sslConfig = sslConfig;
}
public boolean isReconnect() {
return reconnect;
}
public void setReconnect(boolean reconnect) {
this.reconnect = reconnect;
}
public Long getReInterval() {
return reInterval;
}
public void setReInterval(Long reInterval) {
this.reInterval = reInterval;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public MqttVersion getProtocolVersion() {
return protocolVersion;
}
public void setProtocolVersion(MqttVersion protocolVersion) {
this.protocolVersion = protocolVersion;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public boolean isCleanSession() {
return cleanSession;
}
public void setCleanSession(boolean cleanSession) {
this.cleanSession = cleanSession;
}
public MqttWillMessage getWillMessage() {
return willMessage;
}
public void setWillMessage(MqttWillMessage willMessage) {
this.willMessage = willMessage;
}
}
......@@ -18,6 +18,8 @@ package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
/**
......@@ -27,7 +29,7 @@ import java.util.Objects;
*/
public final class MqttWillMessage {
private final String topic;
private final String message;
private final byte[] message;
/**
* 遗嘱消息保留标志
*/
......@@ -37,7 +39,7 @@ public final class MqttWillMessage {
*/
private final MqttQoS qos;
private MqttWillMessage(String topic, String message, boolean retain, MqttQoS qos) {
private MqttWillMessage(String topic, byte[] message, boolean retain, MqttQoS qos) {
this.topic = topic;
this.message = message;
this.retain = retain;
......@@ -48,7 +50,7 @@ public final class MqttWillMessage {
return topic;
}
public String getMessage() {
public byte[] getMessage() {
return message;
}
......@@ -66,7 +68,7 @@ public final class MqttWillMessage {
public static final class Builder {
private String topic;
private String message;
private byte[] message;
private boolean retain;
private MqttQoS qos;
......@@ -75,11 +77,16 @@ public final class MqttWillMessage {
return this;
}
public Builder message(String message) {
public Builder message(byte[] message) {
this.message = Objects.requireNonNull(message);
return this;
}
public Builder messageText(String message) {
this.message = Objects.requireNonNull(message).getBytes(StandardCharsets.UTF_8);
return this;
}
public Builder retain(boolean retain) {
this.retain = retain;
return this;
......@@ -119,7 +126,7 @@ public final class MqttWillMessage {
public String toString() {
return "MqttWillMessage{" +
"topic='" + topic + '\'' +
", message='" + message + '\'' +
", message='" + Arrays.toString(message) + '\'' +
", retain=" + retain +
", qos=" + qos +
'}';
......
......@@ -3,6 +3,7 @@ package net.dreamlu.iot.mqtt.client;
import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.core.client.MqttClientAioHandler;
import net.dreamlu.iot.mqtt.core.client.MqttClientAioListener;
import net.dreamlu.iot.mqtt.core.client.MqttClientConfig;
import net.dreamlu.iot.mqtt.core.client.MqttClientProcessor;
import org.tio.client.ClientChannelContext;
import org.tio.client.ClientTioConfig;
......@@ -27,7 +28,13 @@ public class MqttClientTest {
public static void main(String[] args) throws Exception {
MqttClientProcessor processor = new MqttClientProcessorImpl();
ClientAioHandler clientAioHandler = new MqttClientAioHandler(processor);
ClientAioListener clientAioListener = new MqttClientAioListener("MqttClientTest", "admin", "123456".getBytes());
// 暂时用 set 后期抽成 builder
MqttClientConfig clientConfig = new MqttClientConfig();
clientConfig.setClientId("MqttClientTest");
clientConfig.setUsername("admin");
clientConfig.setPassword("123456");
//
ClientAioListener clientAioListener = new MqttClientAioListener(clientConfig);
ReconnConf reconnConf = new ReconnConf();
ClientTioConfig tioConfig = new ClientTioConfig(clientAioHandler, clientAioListener, reconnConf);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册