提交 452d6182 编写于 作者: 如梦技术's avatar 如梦技术 🐛

mqtt-server 简化,默认多设备可以直接互相订阅和处理消息。

上级 a472e683
......@@ -35,8 +35,6 @@ public class Server {
public static void main(String[] args) {
// 启动服务,mica-mqtt 1.3.x 已经默认为 broker 模式
MqttServer.create()
.ip("0.0.0.0")
.port(1883)
.debug()
.start();
}
......
......@@ -16,6 +16,7 @@
package net.dreamlu.iot.mqtt.server;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import org.slf4j.Logger;
......@@ -46,6 +47,9 @@ public class MqttServerTest {
// .maxBytesInMessage(1024 * 100)
// mqtt 3.1 协议会校验 clientId 长度。
// .maxClientIdLength(64)
.messageListener((context, clientId, message) -> {
logger.info("clientId:{} message:{} payload:{}", clientId, message, ByteBufferUtil.toString(message.getPayload()));
})
// 客户端连接状态监听
.connectStatusListener(new MqttConnectStatusListener())
// 开启 http
......
package net.dreamlu.iot.mqtt.server.listener;
import net.dreamlu.iot.mqtt.core.server.dispatcher.AbstractMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
/**
* @author wsq
*/
@Service
public class MqttServerMessageListener extends AbstractMqttMessageDispatcher {
public class MqttServerMessageListener implements IMqttMessageListener {
private static final Logger logger = LoggerFactory.getLogger(MqttServerMessageListener.class);
@Override
public void sendAll(Message message) {
logger.info("message:{}", message);
public void onMessage(ChannelContext context, String clientId, Message message) {
logger.info("clientId:{} message:{} payload:{}", clientId, message, ByteBufferUtil.toString(message.getPayload()));
}
}
......@@ -84,6 +84,10 @@ MqttServer mqttServer = MqttServer.create()
.maxBytesInMessage(1024 * 100)
// 自定义认证
.authHandler((clientId, userName, password) -> true)
// 消息监听
.messageListener((context, clientId, message) -> {
logger.info("clientId:{} message:{} payload:{}", clientId, message, ByteBufferUtil.toString(message.getPayload()));
})
// 堆内存和堆外内存选择,默认:堆内存
.bufferAllocator(ByteBufferAllocator.HEAP)
// 心跳超时时间,默认:120s
......
......@@ -26,6 +26,7 @@ import net.dreamlu.iot.mqtt.core.server.broker.DefaultMqttBrokerDispatcher;
import net.dreamlu.iot.mqtt.core.server.dispatcher.AbstractMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import net.dreamlu.iot.mqtt.core.server.http.core.MqttWebServer;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.session.InMemoryMqttSessionManager;
......@@ -129,6 +130,10 @@ public class MqttServerCreator {
* session 管理
*/
private IMqttSessionManager sessionManager;
/**
* 消息监听
*/
private IMqttMessageListener messageListener;
/**
* 连接状态监听
*/
......@@ -350,6 +355,15 @@ public class MqttServerCreator {
return this;
}
public IMqttMessageListener getMessageListener() {
return messageListener;
}
public MqttServerCreator messageListener(IMqttMessageListener messageListener) {
this.messageListener = messageListener;
return this;
}
public IMqttConnectStatusListener getConnectStatusListener() {
return connectStatusListener;
}
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server.event;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import org.tio.core.ChannelContext;
/**
* mqtt 消息处理
*
* @author L.cm
*/
@FunctionalInterface
public interface IMqttMessageListener {
/**
* 监听到消息
*
* @param context ChannelContext
* @param clientId clientId
* @param message Message
*/
void onMessage(ChannelContext context, String clientId, Message message);
}
......@@ -29,6 +29,7 @@ import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerUniqueIdService;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.enums.MessageType;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
......@@ -69,6 +70,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
private final IMqttServerPublishPermission publishPermission;
private final IMqttMessageDispatcher messageDispatcher;
private final IMqttConnectStatusListener connectStatusListener;
private final IMqttMessageListener messageListener;
private final ScheduledThreadPoolExecutor executor;
public DefaultMqttServerProcessor(MqttServerCreator serverCreator, ScheduledThreadPoolExecutor executor) {
......@@ -82,6 +84,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
this.publishPermission = serverCreator.getPublishPermission();
this.messageDispatcher = serverCreator.getMessageDispatcher();
this.connectStatusListener = serverCreator.getConnectStatusListener();
this.messageListener = serverCreator.getMessageListener();
this.executor = executor;
}
......@@ -445,6 +448,14 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
message.setPeerHost(clientNode.getIp() + ':' + clientNode.getPort());
message.setNode(serverCreator.getNodeName());
// 3. 消息发布
if (messageListener != null) {
try {
messageListener.onMessage(context, clientId, message);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
// 4. 消息流转
try {
messageDispatcher.send(message);
} catch (Throwable e) {
......
......@@ -62,29 +62,29 @@ mqtt:
### 2.3 可实现接口(注册成 Spring Bean 即可)
| 接口 | 是否必须 | 说明 |
| --------------------------- | -------------- | ------------------------- |
| IMqttServerUniqueIdService | 否 | 用于 clientId 不唯一时,自定义实现唯一标识,后续接口使用它替代 clientId |
| IMqttServerAuthHandler | 是 | 用于服务端认证 |
| IMqttServerSubscribeValidator | 否(建议实现) | 1.1.3 新增,用于对客户端订阅校验 |
| IMqttServerPublishPermission | 否(建议实现) | 1.2.2 新增,用于对客户端发布权限校验 |
| IMqttConnectStatusListener | 是 | 连接状态监听 |
| IMqttSessionManager | 否 | session 管理 |
| IMqttMessageStore | 集群是,单机否 | 遗嘱和保留消息存储 |
| AbstractMqttMessageDispatcher | 集群是,单机否 | 消息转发,(遗嘱、保留消息转发) |
| IpStatListener | 否 | t-io ip 状态监听 |
### 2.4 AbstractMqttMessageDispatcher (用于监听客户端上传的消息) 使用示例
| --------------------------- |------------| ------------------------- |
| IMqttServerUniqueIdService | 否 | 用于 clientId 不唯一时,自定义实现唯一标识,后续接口使用它替代 clientId |
| IMqttServerAuthHandler | 是 | 用于服务端认证 |
| IMqttServerSubscribeValidator | 否(建议实现) | 1.1.3 新增,用于对客户端订阅校验 |
| IMqttServerPublishPermission | 否(建议实现) | 1.2.2 新增,用于对客户端发布权限校验 |
| IMqttMessageListener | 否(1.3.x为否) | 消息监听 |
| IMqttConnectStatusListener | 是 | 连接状态监听 |
| IMqttSessionManager | 否 | session 管理 |
| IMqttMessageStore | 集群是,单机否 | 遗嘱和保留消息存储 |
| AbstractMqttMessageDispatcher | 集群是,单机否 | 消息转发,(遗嘱、保留消息转发) |
| IpStatListener | 否 | t-io ip 状态监听 |
### 2.4 IMqttMessageListener (用于监听客户端上传的消息) 使用示例
```java
@Service
public class MqttServerMessageListener extends AbstractMqttMessageDispatcher {
public class MqttServerMessageListener implements IMqttMessageListener {
private static final Logger logger = LoggerFactory.getLogger(MqttServerMessageListener.class);
@Override
public void sendAll(Message message) {
logger.info("message:{}", message);
public void onMessage(ChannelContext context, String clientId, Message message) {
logger.info("clientId:{} message:{} payload:{}", clientId, message, ByteBufferUtil.toString(message.getPayload()));
}
}
```
......
......@@ -24,6 +24,7 @@ import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerSubscribeValidator;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerUniqueIdService;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
import org.springframework.beans.factory.ObjectProvider;
......@@ -58,6 +59,7 @@ public class MqttServerConfiguration {
ObjectProvider<IMqttMessageDispatcher> messageDispatcherObjectProvider,
ObjectProvider<IMqttMessageStore> messageStoreObjectProvider,
ObjectProvider<IMqttSessionManager> sessionManagerObjectProvider,
ObjectProvider<IMqttMessageListener> messageListenerObjectProvider,
ObjectProvider<IMqttConnectStatusListener> connectStatusListenerObjectProvider,
ObjectProvider<IpStatListener> ipStatListenerObjectProvider,
ObjectProvider<MqttServerCustomizer> customizers) {
......@@ -91,6 +93,8 @@ public class MqttServerConfiguration {
if (StrUtil.isNotBlank(keyStorePath) && StrUtil.isNotBlank(trustStorePath) && StrUtil.isNotBlank(password)) {
serverCreator.useSsl(keyStorePath, trustStorePath, password);
}
// 自定义消息监听
messageListenerObjectProvider.ifAvailable(serverCreator::messageListener);
// 认证处理器
authHandlerObjectProvider.ifAvailable(serverCreator::authHandler);
// mqtt 内唯一id
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册