提交 2e825615 编写于 作者: 如梦技术's avatar 如梦技术 🐛

简化客户端使用。

上级 e567f948
...@@ -11,10 +11,12 @@ ...@@ -11,10 +11,12 @@
- 继续抽象,方便使用。 - 继续抽象,方便使用。
- 实现 `mqtt-broker` 功能。 - 实现 `mqtt-broker` 功能。
## 参考 ## 参考vs借鉴
- [netty codec-mqtt](https://github.com/netty/netty/tree/4.1/codec-mqtt) - [netty codec-mqtt](https://github.com/netty/netty/tree/4.1/codec-mqtt)
- [jmqtt](https://github.com/Cicizz/jmqtt)
- [iot-mqtt-server](https://gitee.com/recallcode/iot-mqtt-server) - [iot-mqtt-server](https://gitee.com/recallcode/iot-mqtt-server)
- [moquette](https://github.com/moquette-io/moquette) - [moquette](https://github.com/moquette-io/moquette)
- [netty-mqtt-client](https://github.com/jetlinks/netty-mqtt-client)
## 工具 ## 工具
- [mqttx 优雅的跨平台 MQTT 5.0 客户端工具](https://mqttx.app/cn/) - [mqttx 优雅的跨平台 MQTT 5.0 客户端工具](https://mqttx.app/cn/)
......
...@@ -16,27 +16,116 @@ ...@@ -16,27 +16,116 @@
package net.dreamlu.iot.mqtt.core.client; package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttMessage; import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.core.common.MqttMessageListener;
import org.tio.client.ClientChannelContext; import org.tio.client.ClientChannelContext;
import org.tio.client.TioClient; import org.tio.client.TioClient;
import org.tio.core.Tio; import org.tio.core.Tio;
import java.nio.ByteBuffer;
/** /**
* mqtt 客户端 * mqtt 客户端
* *
* @author L.cm * @author L.cm
*/ */
public class MqttClient { public final class MqttClient {
private final TioClient tioClient;
private final MqttClientCreator config;
private final ClientChannelContext context;
private MqttClientConfig clientConfig; public static MqttClientCreator create() {
private MqttClientProcessor processor; return new MqttClientCreator();
private TioClient tioClient; }
private ClientChannelContext clientContext;
// TODO add subscribe MqttClient(TioClient tioClient, MqttClientCreator config, ClientChannelContext context) {
this.tioClient = tioClient;
this.config = config;
this.context = context;
}
public void connect() { /**
* 订阅
*
* @param topicFilter topicFilter
* @param listener MqttMessageListener
* @return MqttClient
*/
public MqttClient subscribe(String topicFilter, MqttMessageListener listener) {
return this;
}
/**
* 订阅
*
* @param topicFilter topicFilter
* @param qos MqttQoS
* @param listener MqttMessageListener
* @return MqttClient
*/
public MqttClient subscribe(String topicFilter, MqttQoS qos, MqttMessageListener listener) {
return this;
}
/**
* 取消订阅
*
* @param topicFilter topicFilter
* @return MqttClient
*/
public MqttClient unSubscribe(String topicFilter) {
return this;
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @return 是否发送成功
*/
public Boolean publish(String topic, ByteBuffer payload) {
return publish(topic, payload, MqttQoS.AT_MOST_ONCE);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @return 是否发送成功
*/
public Boolean publish(String topic, ByteBuffer payload, MqttQoS qos) {
return publish(topic, payload, qos, false);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public Boolean publish(String topic, ByteBuffer payload, boolean retain) {
return publish(topic, payload, MqttQoS.AT_MOST_ONCE, retain);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public Boolean publish(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
MqttPublishMessage message = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retain, 0),
new MqttPublishVariableHeader(topic, 0), payload);
return Tio.send(context, message);
} }
/** /**
...@@ -44,17 +133,16 @@ public class MqttClient { ...@@ -44,17 +133,16 @@ public class MqttClient {
* *
* @throws Exception 异常 * @throws Exception 异常
*/ */
public void reconnect() throws Exception { public MqttClient reconnect() throws Exception {
checkState(); tioClient.reconnect(context, config.getTimeout());
tioClient.reconnect(clientContext, clientConfig.getTimeout()); return this;
} }
/** /**
* 断开 mqtt 连接 * 断开 mqtt 连接
*/ */
public void disconnect() { public void disconnect() {
checkState(); Tio.send(context, MqttMessage.DISCONNECT);
Tio.send(clientContext, MqttMessage.DISCONNECT);
} }
/** /**
...@@ -63,19 +151,11 @@ public class MqttClient { ...@@ -63,19 +151,11 @@ public class MqttClient {
* @return 是否停止成功 * @return 是否停止成功
*/ */
public boolean stop() { public boolean stop() {
checkState();
return tioClient.stop(); return tioClient.stop();
} }
public ClientChannelContext getClientContext() { public ClientChannelContext getContext() {
checkState(); return context;
return clientContext;
}
private void checkState() {
if (clientContext == null) {
throw new IllegalStateException("您需要先 connect。");
}
} }
} }
...@@ -31,10 +31,10 @@ import java.util.Objects; ...@@ -31,10 +31,10 @@ import java.util.Objects;
* @author L.cm * @author L.cm
*/ */
public class MqttClientAioListener extends DefaultClientAioListener { public class MqttClientAioListener extends DefaultClientAioListener {
private final MqttClientConfig clientConfig; private final MqttClientCreator clientConfig;
private final MqttWillMessage willMessage; private final MqttWillMessage willMessage;
public MqttClientAioListener(MqttClientConfig clientConfig) { public MqttClientAioListener(MqttClientCreator clientConfig) {
this.clientConfig = Objects.requireNonNull(clientConfig); this.clientConfig = Objects.requireNonNull(clientConfig);
this.willMessage = clientConfig.getWillMessage(); this.willMessage = clientConfig.getWillMessage();
} }
......
...@@ -17,23 +17,33 @@ ...@@ -17,23 +17,33 @@
package net.dreamlu.iot.mqtt.core.client; package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttVersion; import net.dreamlu.iot.mqtt.codec.MqttVersion;
import org.tio.client.ClientChannelContext;
import org.tio.client.ClientTioConfig;
import org.tio.client.ReconnConf;
import org.tio.client.TioClient;
import org.tio.client.intf.ClientAioHandler;
import org.tio.client.intf.ClientAioListener;
import org.tio.core.Node;
import org.tio.core.ssl.SslConfig; import org.tio.core.ssl.SslConfig;
import java.util.Objects;
import java.util.function.Consumer;
/** /**
* MqttClient 配置 * mqtt 客户端构造器
* *
* @author L.cm * @author L.cm
*/ */
public class MqttClientConfig { public final class MqttClientCreator {
/** /**
* ip,可为空,为空 t-io 默认为 127.0.0.1 * ip,可为空,为空 t-io 默认为 127.0.0.1
*/ */
private String ip; private String ip;
/** /**
* 端口 * 端口,默认:1883
*/ */
private int port; private int port = 1883;
/** /**
* 超时时间,t-io 配置,可为 null * 超时时间,t-io 配置,可为 null
*/ */
...@@ -82,108 +92,160 @@ public class MqttClientConfig { ...@@ -82,108 +92,160 @@ public class MqttClientConfig {
* 遗嘱消息 * 遗嘱消息
*/ */
private MqttWillMessage willMessage; private MqttWillMessage willMessage;
/**
* 客户端处理器
*/
private MqttClientProcessor processor;
public String getIp() { protected String getIp() {
return ip; return ip;
} }
public void setIp(String ip) { protected int getPort() {
this.ip = ip; return port;
} }
public int getPort() { protected Integer getTimeout() {
return port; return timeout;
} }
public void setPort(int port) { protected int getKeepAliveSecs() {
this.port = port; return keepAliveSecs;
} }
public Integer getTimeout() { protected SslConfig getSslConfig() {
return timeout; return sslConfig;
} }
public void setTimeout(Integer timeout) { protected boolean isReconnect() {
this.timeout = timeout; return reconnect;
} }
public Integer getKeepAliveSecs() { protected Long getReInterval() {
return keepAliveSecs; return reInterval;
} }
public void setKeepAliveSecs(Integer keepAliveSecs) { public String getClientId() {
this.keepAliveSecs = keepAliveSecs; return clientId;
} }
public SslConfig getSslConfig() { protected MqttVersion getProtocolVersion() {
return sslConfig; return protocolVersion;
} }
public void setSslConfig(SslConfig sslConfig) { protected String getUsername() {
this.sslConfig = sslConfig; return username;
} }
public boolean isReconnect() { protected String getPassword() {
return reconnect; return password;
} }
public void setReconnect(boolean reconnect) { protected boolean isCleanSession() {
this.reconnect = reconnect; return cleanSession;
} }
public Long getReInterval() { protected MqttWillMessage getWillMessage() {
return reInterval; return willMessage;
} }
public void setReInterval(Long reInterval) { protected MqttClientProcessor getProcessor() {
this.reInterval = reInterval; return processor;
} }
public String getClientId() { public MqttClientCreator ip(String ip) {
return clientId; this.ip = ip;
return this;
} }
public void setClientId(String clientId) { public MqttClientCreator port(int port) {
this.clientId = clientId; this.port = port;
return this;
} }
public MqttVersion getProtocolVersion() { public MqttClientCreator timeout(int timeout) {
return protocolVersion; this.timeout = timeout;
return this;
} }
public void setProtocolVersion(MqttVersion protocolVersion) { public MqttClientCreator keepAliveSecs(int keepAliveSecs) {
this.protocolVersion = protocolVersion; this.keepAliveSecs = keepAliveSecs;
return this;
} }
public String getUsername() { public MqttClientCreator sslConfig(SslConfig sslConfig) {
return username; this.sslConfig = sslConfig;
return this;
} }
public void setUsername(String username) { public MqttClientCreator reconnect(boolean reconnect) {
this.username = username; this.reconnect = reconnect;
return this;
} }
public String getPassword() { public MqttClientCreator reInterval(long reInterval) {
return password; this.reInterval = reInterval;
return this;
} }
public void setPassword(String password) { public MqttClientCreator clientId(String clientId) {
this.password = password; this.clientId = clientId;
return this;
} }
public boolean isCleanSession() { public MqttClientCreator protocolVersion(MqttVersion protocolVersion) {
return cleanSession; this.protocolVersion = protocolVersion;
return this;
} }
public void setCleanSession(boolean cleanSession) { public MqttClientCreator username(String username) {
this.cleanSession = cleanSession; this.username = username;
return this;
} }
public MqttWillMessage getWillMessage() { public MqttClientCreator password(String password) {
return willMessage; this.password = password;
return this;
}
public MqttClientCreator cleanSession(boolean cleanSession) {
this.cleanSession = cleanSession;
return this;
} }
public void setWillMessage(MqttWillMessage willMessage) { public MqttClientCreator willMessage(MqttWillMessage willMessage) {
this.willMessage = willMessage; this.willMessage = willMessage;
return this;
} }
public MqttClientCreator willMessage(Consumer<MqttWillMessage.Builder> consumer) {
MqttWillMessage.Builder builder = MqttWillMessage.builder();
consumer.accept(builder);
return willMessage(builder.build());
}
public MqttClientCreator processor(MqttClientProcessor processor) {
this.processor = Objects.requireNonNull(processor);
return this;
}
public MqttClient connect() throws Exception {
// 1. 初始化 mqtt 处理器
ClientAioHandler clientAioHandler = new MqttClientAioHandler(Objects.requireNonNull(this.processor));
ClientAioListener clientAioListener = new MqttClientAioListener(this);
// 2. 重连
ReconnConf reconnConf = null;
if (this.reconnect) {
if (reInterval != null && reInterval > 0) {
reconnConf = new ReconnConf(reInterval);
} else {
reconnConf = new ReconnConf();
}
}
// tioClient
TioClient tioClient = new TioClient(new ClientTioConfig(clientAioHandler, clientAioListener, reconnConf));
ClientChannelContext context = tioClient.connect(new Node(this.ip, this.port), this.timeout);
return new MqttClient(tioClient, this, context);
}
} }
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.common;
import java.nio.ByteBuffer;
/**
* mqtt 消息处理
*
* @author L.cm
*/
@FunctionalInterface
public interface MqttMessageListener {
/**
* 监听到消息
*
* @param topic topic
* @param payload payload
*/
void onMessage(String topic, ByteBuffer payload);
}
package net.dreamlu.iot.mqtt.client; package net.dreamlu.iot.mqtt.client;
import net.dreamlu.iot.mqtt.codec.*; import net.dreamlu.iot.mqtt.core.client.MqttClient;
import net.dreamlu.iot.mqtt.core.client.MqttClientAioHandler;
import net.dreamlu.iot.mqtt.core.client.MqttClientAioListener;
import net.dreamlu.iot.mqtt.core.client.MqttClientConfig;
import net.dreamlu.iot.mqtt.core.client.MqttClientProcessor;
import org.tio.client.ClientChannelContext;
import org.tio.client.ClientTioConfig;
import org.tio.client.ReconnConf;
import org.tio.client.TioClient;
import org.tio.client.intf.ClientAioHandler;
import org.tio.client.intf.ClientAioListener;
import org.tio.core.Node;
import org.tio.core.Tio;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Timer; import java.util.Timer;
...@@ -26,31 +14,19 @@ import java.util.TimerTask; ...@@ -26,31 +14,19 @@ import java.util.TimerTask;
public class MqttClientTest { public class MqttClientTest {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
MqttClientProcessor processor = new MqttClientProcessorImpl(); // 初始化 mqtt 客户端
ClientAioHandler clientAioHandler = new MqttClientAioHandler(processor); MqttClient client = MqttClient.create()
// 暂时用 set 后期抽成 builder .clientId("MqttClientTest")
MqttClientConfig clientConfig = new MqttClientConfig(); .username("admin")
clientConfig.setClientId("MqttClientTest"); .password("123456")
clientConfig.setUsername("admin"); .processor(new MqttClientProcessorImpl())
clientConfig.setPassword("123456"); .connect();
//
ClientAioListener clientAioListener = new MqttClientAioListener(clientConfig);
ReconnConf reconnConf = new ReconnConf();
ClientTioConfig tioConfig = new ClientTioConfig(clientAioHandler, clientAioListener, reconnConf);
TioClient tioClient = new TioClient(tioConfig);
ClientChannelContext context = tioClient.connect(new Node("127.0.0.1", 1883), 1000);
Timer timer = new Timer(); Timer timer = new Timer();
timer.schedule(new TimerTask() { timer.schedule(new TimerTask() {
@Override @Override
public void run() { public void run() {
if (!context.isClosed) { client.publish("testtopicxx", ByteBuffer.wrap("mica最牛皮".getBytes()));
MqttPublishMessage message = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttPublishVariableHeader("testtopicxx", 0), ByteBuffer.wrap("mica最牛皮".getBytes()));
Tio.send(context, message);
}
} }
}, 1000, 2000); }, 1000, 2000);
......
...@@ -54,6 +54,6 @@ public class MqttServerTest { ...@@ -54,6 +54,6 @@ public class MqttServerTest {
}, 1000, 2000); }, 1000, 2000);
// 启动 // 启动
tioServer.start("127.0.0.1", socketPort); tioServer.start("0.0.0.0", socketPort);
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册