提交 24ff3278 编写于 作者: 浅梦2013's avatar 浅梦2013

MqttPublishMessage payload 改为 byte 数组,简化代码

上级 30836e9a
......@@ -7,9 +7,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
import org.tio.utils.buffer.ByteBufferUtil;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
/**
* 客户端消息监听的另一种方式
......@@ -22,8 +21,8 @@ public class MqttClientMessageListener implements IMqttClientMessageListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientMessageListener.class);
@Override
public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, ByteBuffer payload) {
logger.info("topic:{} payload:{}", topic, ByteBufferUtil.toString(payload));
public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
}
}
......@@ -5,7 +5,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.tio.utils.buffer.ByteBufferUtil;
import java.nio.charset.StandardCharsets;
......@@ -25,7 +24,7 @@ public class ClientService {
public boolean sub() {
client.subQos0("/test/#", (context, topic, message, payload) -> {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
});
return true;
}
......
......@@ -17,7 +17,6 @@
package net.dreamlu.iot.mqtt.aliyun;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
import org.tio.utils.buffer.ByteBufferUtil;
import java.nio.charset.StandardCharsets;
import java.util.Timer;
......@@ -55,7 +54,7 @@ public class MqttClientTest {
.connect();
client.subQos0("/sys/" + productKey + '/' + deviceName + "/thing/event/property/post_reply", (context, topic, message, payload) -> {
System.out.println(topic + '\t' + ByteBufferUtil.toString(payload));
System.out.println(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
});
Timer timer = new Timer();
......
......@@ -19,7 +19,8 @@ package net.dreamlu.iot.mqtt.broker;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.utils.buffer.ByteBufferUtil;
import java.nio.charset.StandardCharsets;
/**
* 设备 A,这里默认 APP 应用端
......@@ -39,7 +40,7 @@ public class DeviceA {
.connect();
client.subQos0("/a/door/open", (context, topic, message, payload) -> {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
});
}
......
......@@ -19,7 +19,8 @@ package net.dreamlu.iot.mqtt.broker;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.utils.buffer.ByteBufferUtil;
import java.nio.charset.StandardCharsets;
/**
* 设备 B,这里默认 web 端
......@@ -39,7 +40,7 @@ public class DeviceB {
.connect();
client.subQos0("/a/door/open", (context, topic, message, payload) -> {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
});
}
}
......@@ -19,7 +19,6 @@ package net.dreamlu.iot.mqtt.client;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.utils.buffer.ByteBufferUtil;
import java.nio.charset.StandardCharsets;
......@@ -43,7 +42,7 @@ public class MqttClientSyncTest {
.connectSync();
client.subQos0("/test/#", (context, topic, message, payload) -> {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
});
client.unSubscribe("/test/#", "/test/123");
......
......@@ -23,9 +23,7 @@ import net.dreamlu.iot.mqtt.core.client.MqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.utils.buffer.ByteBufferUtil;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Timer;
import java.util.TimerTask;
......@@ -68,8 +66,8 @@ public class MqttClientTest {
}
@Override
public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, ByteBuffer payload) {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
}
});
......
......@@ -17,7 +17,6 @@
package net.dreamlu.iot.mqtt.huawei;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
import org.tio.utils.buffer.ByteBufferUtil;
import java.nio.charset.StandardCharsets;
import java.util.Timer;
......@@ -58,7 +57,7 @@ public class MqttClientTest {
String cmdRequestTopic = "$oc/devices/" + deviceId + "/sys/commands/#";
client.subQos0(cmdRequestTopic, (context, topic, message, payload) -> {
System.out.println(topic + '\t' + ByteBufferUtil.toString(payload));
System.out.println(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
});
// 属性上报消息
......
......@@ -19,7 +19,6 @@ package net.dreamlu.iot.mqtt.server;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.utils.buffer.ByteBufferUtil;
import java.nio.charset.StandardCharsets;
import java.util.Timer;
......@@ -49,7 +48,7 @@ public class MqttServerTest {
// mqtt 3.1 协议会校验 clientId 长度。
// .maxClientIdLength(64)
.messageListener((context, clientId, topic, qos, message) -> {
logger.info("clientId:{} payload:{}", clientId, ByteBufferUtil.toString(message.getPayload()));
logger.info("clientId:{} payload:{}", clientId, new String(message.payload(), StandardCharsets.UTF_8));
})
// 客户端连接状态监听
.connectStatusListener(new MqttConnectStatusListener())
......
......@@ -7,7 +7,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.tio.core.ChannelContext;
import org.tio.utils.buffer.ByteBufferUtil;
/**
* 监听器1,使用 Spring boot event 的方式
......@@ -29,7 +28,7 @@ public class MqttServerMessageListener1 {
public void onMessage(Message message) {
String clientId = message.getFromClientId();
ChannelContext context = mqttServerTemplate.getChannelContext(clientId);
logger.info("context:{} clientId:{} message:{} payload:{}", context, clientId, message, ByteBufferUtil.toString(message.getPayload()));
logger.info("context:{} clientId:{} message:{} payload:{}", context, clientId, message, new String(message.getPayload()));
}
}
......@@ -11,7 +11,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
import org.tio.utils.buffer.ByteBufferUtil;
import java.nio.charset.StandardCharsets;
/**
* 消息监听器示例2,优点:性能损失小
......@@ -27,7 +28,7 @@ public class MqttServerMessageListener2 implements IMqttMessageListener, SmartIn
@Override
public void onMessage(ChannelContext context, String clientId, String topic, MqttQoS qos, MqttPublishMessage message) {
logger.info("context:{} clientId:{} message:{} payload:{}", context, clientId, message, ByteBufferUtil.toString(message.getPayload()));
logger.info("context:{} clientId:{} message:{} payload:{}", context, clientId, message, new String(message.payload(), StandardCharsets.UTF_8));
}
@Override
......
......@@ -66,7 +66,7 @@ public class RedisMqttMessageDownReceiver implements MessageListener, Initializi
return;
}
String clientId = mqttMessage.getClientId();
ByteBuffer payload = mqttMessage.getPayload();
byte[] payload = mqttMessage.getPayload();
MqttQoS mqttQoS = MqttQoS.valueOf(mqttMessage.getQos());
boolean retain = mqttMessage.isRetain();
if (StringUtil.isBlank(clientId)) {
......
......@@ -7,7 +7,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
import org.tio.utils.buffer.ByteBufferUtil;
import java.nio.charset.StandardCharsets;
/**
* 消息监听
......@@ -20,6 +21,6 @@ public class MqttServerMessageListener implements IMqttMessageListener {
@Override
public void onMessage(ChannelContext context, String clientId, String topic, MqttQoS qoS, MqttPublishMessage message) {
logger.info("clientId:{} message:{} payload:{}", clientId, message, ByteBufferUtil.toString(message.getPayload()));
logger.info("clientId:{} message:{} payload:{}", clientId, message, new String(message.getPayload(), StandardCharsets.UTF_8));
}
}
......@@ -27,7 +27,6 @@ import org.tio.core.Tio;
import org.tio.utils.hutool.CollUtil;
import org.tio.utils.timer.TimerTaskService;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
......@@ -267,7 +266,6 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
}
pendingPublish.onPubAckReceived();
clientSession.removePendingPublish(messageId);
pendingPublish.getPayload().clear();
}
@Override
......@@ -317,7 +315,6 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
String topicName = pendingPublish.getMessage().variableHeader().topicName();
logger.info("MQTT Topic:{} successfully PubComp", topicName);
}
pendingPublish.getPayload().clear();
pendingPublish.onPubCompReceived();
clientSession.removePendingPublish(messageId);
}
......@@ -334,10 +331,9 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
if (subscriptionList.isEmpty()) {
logger.warn("Mqtt message to accept topic:{} subscriptionList is empty.", topicName);
} else {
final ByteBuffer payload = message.payload();
final byte[] payload = message.payload();
subscriptionList.forEach(subscription -> {
IMqttClientMessageListener listener = subscription.getListener();
payload.rewind();
executor.submit(() -> {
try {
listener.onMessage(context, topicName, message, payload);
......
......@@ -21,8 +21,6 @@ import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.codec.MqttSubAckMessage;
import org.tio.core.ChannelContext;
import java.nio.ByteBuffer;
/**
* mqtt 消息处理
*
......@@ -62,6 +60,6 @@ public interface IMqttClientMessageListener {
* @param message MqttPublishMessage
* @param payload payload
*/
void onMessage(ChannelContext context, String topic, MqttPublishMessage message, ByteBuffer payload);
void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload);
}
......@@ -32,7 +32,6 @@ import org.tio.core.Tio;
import org.tio.core.intf.Packet;
import org.tio.utils.timer.TimerTaskService;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
......@@ -263,17 +262,6 @@ public final class MqttClient {
return this;
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @return 是否发送成功
*/
public boolean publish(String topic, ByteBuffer payload) {
return publish(topic, payload, MqttQoS.AT_MOST_ONCE);
}
/**
* 发布消息
*
......@@ -285,18 +273,6 @@ public final class MqttClient {
return publish(topic, payload, MqttQoS.AT_MOST_ONCE);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @return 是否发送成功
*/
public boolean publish(String topic, ByteBuffer payload, MqttQoS qos) {
return publish(topic, payload, qos, false);
}
/**
* 发布消息
*
......@@ -309,18 +285,6 @@ public final class MqttClient {
return publish(topic, payload, qos, false);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public boolean publish(String topic, ByteBuffer payload, boolean retain) {
return publish(topic, payload, MqttQoS.AT_MOST_ONCE, retain);
}
/**
* 发布消息
*
......@@ -343,19 +307,6 @@ public final class MqttClient {
* @return 是否发送成功
*/
public boolean publish(String topic, byte[] payload, MqttQoS qos, boolean retain) {
return publish(topic, payload == null ? null : ByteBuffer.wrap(payload), qos, retain);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public boolean publish(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
return publish(topic, payload, qos, (publishBuilder) -> publishBuilder.retained(retain));
}
......@@ -369,7 +320,7 @@ public final class MqttClient {
* @param properties MqttProperties
* @return 是否发送成功
*/
public boolean publish(String topic, ByteBuffer payload, MqttQoS qos, boolean retain, MqttProperties properties) {
public boolean publish(String topic, byte[] payload, MqttQoS qos, boolean retain, MqttProperties properties) {
return publish(topic, payload, qos, (publishBuilder) -> publishBuilder.retained(retain).properties(properties));
}
......@@ -382,15 +333,12 @@ public final class MqttClient {
* @param builder PublishBuilder
* @return 是否发送成功
*/
public boolean publish(String topic, ByteBuffer payload, MqttQoS qos, Consumer<MqttMessageBuilders.PublishBuilder> builder) {
public boolean publish(String topic, byte[] payload, MqttQoS qos, Consumer<MqttMessageBuilders.PublishBuilder> builder) {
// 校验 topic
TopicUtil.validateTopicName(topic);
// qos 判断
boolean isHighLevelQoS = MqttQoS.AT_LEAST_ONCE == qos || MqttQoS.EXACTLY_ONCE == qos;
int messageId = isHighLevelQoS ? messageIdGenerator.getId() : -1;
if (payload == null) {
payload = ByteBuffer.allocate(0);
}
MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish();
// 自定义配置
builder.accept(publishBuilder);
......
......@@ -528,11 +528,10 @@ public final class MqttDecoder {
return new Result<>(new MqttUnsubscribePayload(unsubscribeTopics), numberOfBytesConsumed);
}
private static Result<ByteBuffer> decodePublishPayload(ByteBuffer buffer, int bytesRemainingInVariablePart) {
byte[] slice = new byte[bytesRemainingInVariablePart];
buffer.get(slice, 0, bytesRemainingInVariablePart);
ByteBuffer byteBuffer = ByteBuffer.wrap(slice);
return new Result<>(byteBuffer, bytesRemainingInVariablePart);
private static Result<byte[]> decodePublishPayload(ByteBuffer buffer, int bytesRemainingInVariablePart) {
byte[] payload = new byte[bytesRemainingInVariablePart];
buffer.get(payload, 0, bytesRemainingInVariablePart);
return new Result<>(payload, bytesRemainingInVariablePart);
}
private static Result<String> decodeString(ByteBuffer buffer) {
......
......@@ -369,7 +369,8 @@ public final class MqttEncoder {
MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttPublishVariableHeader variableHeader = message.variableHeader();
ByteBuffer payload = message.payload().duplicate();
// 处理 payload 为 null 的情况
byte[] payload = message.payload() == null ? ByteBufferUtil.EMPTY_BYTES : message.payload();
String topicName = variableHeader.topicName();
byte[] topicNameBytes = encodeStringUtf8(topicName);
......@@ -379,7 +380,7 @@ public final class MqttEncoder {
int variableHeaderBufferSize = 2 + topicNameBytes.length +
(mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0) + propertiesBytes.length;
int payloadBufferSize = payload.array().length;
int payloadBufferSize = payload.length;
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
......
......@@ -36,7 +36,7 @@ public final class MqttMessageBuilders {
private String topic;
private boolean retained;
private MqttQoS qos;
private ByteBuffer payload;
private byte[] payload;
private int messageId;
private MqttProperties mqttProperties;
......@@ -58,7 +58,7 @@ public final class MqttMessageBuilders {
return this;
}
public PublishBuilder payload(ByteBuffer payload) {
public PublishBuilder payload(byte[] payload) {
this.payload = payload;
return this;
}
......@@ -81,7 +81,7 @@ public final class MqttMessageBuilders {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retained, 0);
MqttPublishVariableHeader mqttVariableHeader =
new MqttPublishVariableHeader(topic, messageId, mqttProperties);
return new MqttPublishMessage(mqttFixedHeader, mqttVariableHeader, ByteBufferUtil.clone(payload));
return new MqttPublishMessage(mqttFixedHeader, mqttVariableHeader, payload);
}
}
......
......@@ -58,7 +58,7 @@ public final class MqttMessageFactory {
return new MqttPublishMessage(
mqttFixedHeader,
(MqttPublishVariableHeader) variableHeader,
(ByteBuffer) payload);
(byte[]) payload);
case PUBACK:
//Having MqttPubReplyMessageVariableHeader or MqttMessageIdVariableHeader
return new MqttPubAckMessage(mqttFixedHeader, (MqttMessageIdVariableHeader) variableHeader);
......
......@@ -16,18 +16,16 @@
package net.dreamlu.iot.mqtt.codec;
import java.nio.ByteBuffer;
/**
* See <a href="https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#publish">MQTTV3.1/publish</a>
*
* @author netty
* @author netty、L.cm
*/
public class MqttPublishMessage extends MqttMessage {
public MqttPublishMessage(
MqttFixedHeader mqttFixedHeader,
MqttPublishVariableHeader variableHeader,
ByteBuffer payload) {
byte[] payload) {
super(mqttFixedHeader, variableHeader, payload);
}
......@@ -37,12 +35,12 @@ public class MqttPublishMessage extends MqttMessage {
}
@Override
public ByteBuffer payload() {
return (ByteBuffer) super.payload();
public byte[] payload() {
return (byte[]) super.payload();
}
public ByteBuffer getPayload() {
return payload();
public byte[] getPayload() {
return this.payload();
}
}
......@@ -15,20 +15,20 @@ import java.util.function.Consumer;
* @author netty
*/
public final class MqttPendingPublish {
private final ByteBuffer payload;
private final byte[] payload;
private final MqttPublishMessage message;
private final MqttQoS qos;
private final RetryProcessor<MqttPublishMessage> pubRetryProcessor = new RetryProcessor<>();
private final RetryProcessor<MqttMessage> pubRelRetryProcessor = new RetryProcessor<>();
public MqttPendingPublish(ByteBuffer payload, MqttPublishMessage message, MqttQoS qos) {
public MqttPendingPublish(byte[] payload, MqttPublishMessage message, MqttQoS qos) {
this.payload = payload;
this.message = message;
this.qos = qos;
this.pubRetryProcessor.setOriginalMessage(message);
}
public ByteBuffer getPayload() {
public byte[] getPayload() {
return payload;
}
......@@ -42,7 +42,6 @@ public final class MqttPendingPublish {
public void startPublishRetransmissionTimer(TimerTaskService taskService, Consumer<MqttMessage> sendPacket) {
this.pubRetryProcessor.setHandle(((fixedHeader, originalMessage) -> {
this.payload.rewind();
sendPacket.accept(new MqttPublishMessage(fixedHeader, originalMessage.variableHeader(), this.payload));
}));
this.pubRetryProcessor.start(taskService);
......
......@@ -37,7 +37,6 @@ import org.tio.utils.hutool.StrUtil;
import org.tio.utils.timer.TimerTaskService;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
/**
......@@ -117,7 +116,7 @@ public final class MqttServer {
* @param payload 消息体
* @return 是否发送成功
*/
public boolean publish(String clientId, String topic, ByteBuffer payload) {
public boolean publish(String clientId, String topic, byte[] payload) {
return publish(clientId, topic, payload, MqttQoS.AT_MOST_ONCE);
}
......@@ -130,7 +129,7 @@ public final class MqttServer {
* @param qos MqttQoS
* @return 是否发送成功
*/
public boolean publish(String clientId, String topic, ByteBuffer payload, MqttQoS qos) {
public boolean publish(String clientId, String topic, byte[] payload, MqttQoS qos) {
return publish(clientId, topic, payload, qos, false);
}
......@@ -143,7 +142,7 @@ public final class MqttServer {
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public boolean publish(String clientId, String topic, ByteBuffer payload, boolean retain) {
public boolean publish(String clientId, String topic, byte[] payload, boolean retain) {
return publish(clientId, topic, payload, MqttQoS.AT_MOST_ONCE, retain);
}
......@@ -157,7 +156,7 @@ public final class MqttServer {
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public boolean publish(String clientId, String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
public boolean publish(String clientId, String topic, byte[] payload, MqttQoS qos, boolean retain) {
// 校验 topic
TopicUtil.validateTopicName(topic);
// 获取 context
......@@ -186,15 +185,9 @@ public final class MqttServer {
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
private boolean publish(ChannelContext context, String clientId, String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
private boolean publish(ChannelContext context, String clientId, String topic, byte[] payload, MqttQoS qos, boolean retain) {
boolean isHighLevelQoS = MqttQoS.AT_LEAST_ONCE == qos || MqttQoS.EXACTLY_ONCE == qos;
int messageId = isHighLevelQoS ? sessionManager.getMessageId(clientId) : -1;
// 下行 payload 为空时,构造一个空结构体
if (payload == null) {
payload = ByteBuffer.allocate(0);
} else {
payload.rewind();
}
if (retain) {
this.saveRetainMessage(topic, qos, payload);
}
......@@ -226,17 +219,6 @@ public final class MqttServer {
return publishAll(topic, payload, MqttQoS.AT_MOST_ONCE);
}
/**
* 发布消息给所以的在线设备
*
* @param topic topic
* @param payload 消息体
* @return 是否发送成功
*/
public boolean publishAll(String topic, ByteBuffer payload) {
return publishAll(topic, payload, MqttQoS.AT_MOST_ONCE);
}
/**
* 发布消息
*
......@@ -249,18 +231,6 @@ public final class MqttServer {
return publishAll(topic, payload, qos, false);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @return 是否发送成功
*/
public boolean publishAll(String topic, ByteBuffer payload, MqttQoS qos) {
return publishAll(topic, payload, qos, false);
}
/**
* 发布消息给所以的在线设备
*
......@@ -273,18 +243,6 @@ public final class MqttServer {
return publishAll(topic, payload, MqttQoS.AT_MOST_ONCE, retain);
}
/**
* 发布消息给所以的在线设备
*
* @param topic topic
* @param payload 消息体
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public boolean publishAll(String topic, ByteBuffer payload, boolean retain) {
return publishAll(topic, payload, MqttQoS.AT_MOST_ONCE, retain);
}
/**
* 发布消息给所以的在线设备
*
......@@ -295,19 +253,6 @@ public final class MqttServer {
* @return 是否发送成功
*/
public boolean publishAll(String topic, byte[] payload, MqttQoS qos, boolean retain) {
return publishAll(topic, payload == null ? null : ByteBuffer.wrap(payload), qos, retain);
}
/**
* 发布消息给所以的在线设备
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public boolean publishAll(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
// 校验 topic
TopicUtil.validateTopicName(topic);
// 查找订阅该 topic 的客户端
......@@ -316,10 +261,6 @@ public final class MqttServer {
logger.debug("Mqtt Topic:{} publishAll but subscribe client list is empty.", topic);
return false;
}
// 下行 payload 为空时,构造一个空结构体
if (payload == null) {
payload = ByteBuffer.allocate(0);
}
if (retain) {
this.saveRetainMessage(topic, qos, payload);
}
......@@ -362,7 +303,7 @@ public final class MqttServer {
* @param mqttQoS MqttQoS
* @param payload ByteBuffer
*/
private void saveRetainMessage(String topic, MqttQoS mqttQoS, ByteBuffer payload) {
private void saveRetainMessage(String topic, MqttQoS mqttQoS, byte[] payload) {
Message retainMessage = new Message();
retainMessage.setTopic(topic);
retainMessage.setQos(mqttQoS.value());
......
......@@ -37,7 +37,6 @@ import org.tio.http.common.Method;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.json.JsonUtil;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Function;
......@@ -131,7 +130,7 @@ public class MqttHttpApi {
message.setRetain(form.isRetain());
// payload 解码
if (StrUtil.isNotBlank(payload)) {
message.setPayload(ByteBuffer.wrap(PayloadEncode.decode(payload, form.getEncoding())));
message.setPayload(PayloadEncode.decode(payload, form.getEncoding()));
}
messageDispatcher.send(message);
}
......
......@@ -19,7 +19,7 @@ package net.dreamlu.iot.mqtt.core.server.model;
import net.dreamlu.iot.mqtt.core.server.enums.MessageType;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
/**
......@@ -76,7 +76,7 @@ public class Message implements Serializable {
/**
* 消息内容
*/
private ByteBuffer payload;
private byte[] payload;
/**
* 客户端的 IPAddress
*/
......@@ -178,11 +178,11 @@ public class Message implements Serializable {
this.retain = retain;
}
public ByteBuffer getPayload() {
public byte[] getPayload() {
return payload;
}
public void setPayload(ByteBuffer payload) {
public void setPayload(byte[] payload) {
this.payload = payload;
}
......@@ -219,12 +219,12 @@ public class Message implements Serializable {
return false;
}
Message message = (Message) o;
return dup == message.dup && qos == message.qos && retain == message.retain && timestamp == message.timestamp && Objects.equals(node, message.node) && Objects.equals(id, message.id) && Objects.equals(fromClientId, message.fromClientId) && Objects.equals(fromUsername, message.fromUsername) && Objects.equals(clientId, message.clientId) && Objects.equals(username, message.username) && Objects.equals(topic, message.topic) && messageType == message.messageType && Objects.equals(payload, message.payload) && Objects.equals(peerHost, message.peerHost) && Objects.equals(publishReceivedAt, message.publishReceivedAt);
return dup == message.dup && qos == message.qos && retain == message.retain && timestamp == message.timestamp && Objects.equals(node, message.node) && Objects.equals(id, message.id) && Objects.equals(fromClientId, message.fromClientId) && Objects.equals(fromUsername, message.fromUsername) && Objects.equals(clientId, message.clientId) && Objects.equals(username, message.username) && Objects.equals(topic, message.topic) && messageType == message.messageType && Arrays.equals(payload, message.payload) && Objects.equals(peerHost, message.peerHost) && Objects.equals(publishReceivedAt, message.publishReceivedAt);
}
@Override
public int hashCode() {
return Objects.hash(node, id, fromClientId, fromUsername, clientId, username, topic, messageType, dup, qos, retain, payload, peerHost, timestamp, publishReceivedAt);
return Objects.hash(node, id, fromClientId, fromUsername, clientId, username, topic, messageType, dup, qos, retain, Arrays.hashCode(payload), peerHost, timestamp, publishReceivedAt);
}
@Override
......
......@@ -323,11 +323,9 @@ public enum DefaultMessageSerializer implements IMessageSerializer {
protocolLength += topicBytes.length;
}
// 消息内容
ByteBuffer payload = message.getPayload();
byte[] payloadBytes = null;
byte[] payload = message.getPayload();
if (payload != null) {
payloadBytes = payload.array();
protocolLength += payloadBytes.length;
protocolLength += payload.length;
}
// 客户端的 IPAddress
String peerHost = message.getPeerHost();
......@@ -406,9 +404,9 @@ public enum DefaultMessageSerializer implements IMessageSerializer {
}
buffer.put((byte) byte1);
// 消息内容
if (payloadBytes != null) {
buffer.putInt(payloadBytes.length);
buffer.put(payloadBytes);
if (payload != null) {
buffer.putInt(payload.length);
buffer.put(payload);
} else {
buffer.put(EMPTY_INT_BYTES);
}
......@@ -463,11 +461,9 @@ public enum DefaultMessageSerializer implements IMessageSerializer {
protocolLength += topicBytes.length;
}
// 消息内容
ByteBuffer payload = message.getPayload();
byte[] payloadBytes = null;
byte[] payload = message.getPayload();
if (payload != null) {
payloadBytes = payload.array();
protocolLength += payloadBytes.length;
protocolLength += payload.length;
}
// 客户端的 IPAddress
String peerHost = message.getPeerHost();
......@@ -525,9 +521,9 @@ public enum DefaultMessageSerializer implements IMessageSerializer {
}
buffer.put((byte) byte1);
// 消息内容
if (payloadBytes != null) {
buffer.putInt(payloadBytes.length);
buffer.put(payloadBytes);
if (payload != null) {
buffer.putInt(payload.length);
buffer.put(payload);
} else {
buffer.put(EMPTY_INT_BYTES);
}
......@@ -748,7 +744,7 @@ public enum DefaultMessageSerializer implements IMessageSerializer {
if (payloadLen > 0) {
byte[] payloadBytes = new byte[payloadLen];
buffer.get(payloadBytes);
message.setPayload(ByteBuffer.wrap(payloadBytes));
message.setPayload(payloadBytes);
}
// 客户端的 peerHost IPAddress
byte peerHostLen = buffer.get();
......@@ -817,7 +813,7 @@ public enum DefaultMessageSerializer implements IMessageSerializer {
if (payloadLen > 0) {
byte[] payloadBytes = new byte[payloadLen];
buffer.get(payloadBytes);
message.setPayload(ByteBuffer.wrap(payloadBytes));
message.setPayload(payloadBytes);
}
// 客户端的 peerHost IPAddress
byte peerHostLen = buffer.get();
......
......@@ -43,7 +43,6 @@ import org.tio.core.Tio;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.timer.TimerTaskService;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
......@@ -158,7 +157,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
willMessage.setTopic(payload.willTopic());
byte[] willMessageInBytes = payload.willMessageInBytes();
if (willMessageInBytes != null) {
willMessage.setPayload(ByteBuffer.wrap(willMessageInBytes));
willMessage.setPayload(willMessageInBytes);
}
willMessage.setQos(variableHeader.willQos());
willMessage.setRetain(variableHeader.isWillRetain());
......@@ -270,7 +269,6 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
}
pendingPublish.onPubAckReceived();
sessionManager.removePendingPublish(clientId, messageId);
pendingPublish.getPayload().clear();
}
@Override
......@@ -320,7 +318,6 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
logger.debug("PubComp - clientId:{}, messageId:{}", clientId, messageId);
MqttPendingPublish pendingPublish = sessionManager.getPendingPublish(clientId, messageId);
if (pendingPublish != null) {
pendingPublish.getPayload().clear();
pendingPublish.onPubCompReceived();
sessionManager.removePendingPublish(clientId, messageId);
}
......@@ -457,11 +454,11 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
String topicName, MqttPublishMessage publishMessage) {
MqttFixedHeader fixedHeader = publishMessage.fixedHeader();
boolean isRetain = fixedHeader.isRetain();
ByteBuffer payload = publishMessage.payload();
byte[] payload = publishMessage.payload();
// 1. retain 消息逻辑
if (isRetain) {
// qos == 0 or payload is none,then clear previous retain message
if (MqttQoS.AT_MOST_ONCE == mqttQoS || payload == null || payload.array().length == 0) {
if (MqttQoS.AT_MOST_ONCE == mqttQoS || payload == null || payload.length == 0) {
this.messageStore.clearRetainMessage(topicName);
} else {
Message retainMessage = new Message();
......
......@@ -27,7 +27,6 @@ import org.tio.client.ClientChannelContext;
import org.tio.client.TioClient;
import org.tio.client.TioClientConfig;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Consumer;
......@@ -220,17 +219,6 @@ public class MqttClientKit {
return client.unSubscribe(topicFilters);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @return 是否发送成功
*/
public static boolean publish(String topic, ByteBuffer payload) {
return client.publish(topic, payload);
}
/**
* 发布消息
*
......@@ -242,18 +230,6 @@ public class MqttClientKit {
return client.publish(topic, payload);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @return 是否发送成功
*/
public static boolean publish(String topic, ByteBuffer payload, MqttQoS qos) {
return client.publish(topic, payload, qos);
}
/**
* 发布消息
*
......@@ -266,18 +242,6 @@ public class MqttClientKit {
return client.publish(topic, payload, qos);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public static boolean publish(String topic, ByteBuffer payload, boolean retain) {
return client.publish(topic, payload, retain);
}
/**
* 发布消息
*
......@@ -303,33 +267,6 @@ public class MqttClientKit {
return client.publish(topic, payload, qos, retain);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public static boolean publish(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
return client.publish(topic, payload, qos, retain);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @param retain 是否在服务器上保留消息
* @param properties MqttProperties
* @return 是否发送成功
*/
public static boolean publish(String topic, ByteBuffer payload, MqttQoS qos, boolean retain, MqttProperties properties) {
return client.publish(topic, payload, qos, retain, properties);
}
/**
* 发布消息
*
......@@ -339,7 +276,7 @@ public class MqttClientKit {
* @param builder PublishBuilder
* @return 是否发送成功
*/
public static boolean publish(String topic, ByteBuffer payload, MqttQoS qos, Consumer<MqttMessageBuilders.PublishBuilder> builder) {
public static boolean publish(String topic, byte[] payload, MqttQoS qos, Consumer<MqttMessageBuilders.PublishBuilder> builder) {
return client.publish(topic, payload, qos, builder);
}
......
......@@ -20,8 +20,6 @@ import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import org.tio.core.ChannelContext;
import java.nio.ByteBuffer;
/**
* mica mqtt server kit
*
......@@ -46,7 +44,7 @@ public class MqttServerKit {
* @param payload 消息体
* @return 是否发送成功
*/
public static boolean publish(String clientId, String topic, ByteBuffer payload) {
public static boolean publish(String clientId, String topic, byte[] payload) {
return mqttServer.publish(clientId, topic, payload);
}
......@@ -59,7 +57,7 @@ public class MqttServerKit {
* @param qos MqttQoS
* @return 是否发送成功
*/
public static boolean publish(String clientId, String topic, ByteBuffer payload, MqttQoS qos) {
public static boolean publish(String clientId, String topic, byte[] payload, MqttQoS qos) {
return mqttServer.publish(clientId, topic, payload, qos);
}
......@@ -72,7 +70,7 @@ public class MqttServerKit {
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public static boolean publish(String clientId, String topic, ByteBuffer payload, boolean retain) {
public static boolean publish(String clientId, String topic, byte[] payload, boolean retain) {
return mqttServer.publish(clientId, topic, payload, retain);
}
......@@ -86,58 +84,10 @@ public class MqttServerKit {
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public static boolean publish(String clientId, String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
public static boolean publish(String clientId, String topic, byte[] payload, MqttQoS qos, boolean retain) {
return mqttServer.publish(clientId, topic, payload, qos, retain);
}
/**
* 发布消息给所以的在线设备
*
* @param topic topic
* @param payload 消息体
* @return 是否发送成功
*/
public static boolean publishAll(String topic, ByteBuffer payload) {
return mqttServer.publishAll(topic, payload);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @return 是否发送成功
*/
public static boolean publishAll(String topic, ByteBuffer payload, MqttQoS qos) {
return mqttServer.publishAll(topic, payload, qos);
}
/**
* 发布消息给所以的在线设备
*
* @param topic topic
* @param payload 消息体
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public static boolean publishAll(String topic, ByteBuffer payload, boolean retain) {
return mqttServer.publishAll(topic, payload, retain);
}
/**
* 发布消息给所以的在线设备
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public static boolean publishAll(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
return mqttServer.publishAll(topic, payload, qos, retain);
}
/**
* 发布消息给所以的在线设备
*
......
......@@ -26,7 +26,6 @@ import org.tio.client.ClientChannelContext;
import org.tio.client.TioClient;
import org.tio.client.TioClientConfig;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
......@@ -180,54 +179,6 @@ public class MqttClientTemplate implements SmartInitializingSingleton, Disposabl
return client.publish(topic, payload, qos, retain);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @return 是否发送成功
*/
public boolean publish(String topic, ByteBuffer payload) {
return client.publish(topic, payload, MqttQoS.AT_MOST_ONCE);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @return 是否发送成功
*/
public boolean publish(String topic, ByteBuffer payload, MqttQoS qos) {
return client.publish(topic, payload, qos, false);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public boolean publish(String topic, ByteBuffer payload, boolean retain) {
return client.publish(topic, payload, MqttQoS.AT_MOST_ONCE, retain);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public boolean publish(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
return client.publish(topic, payload, qos, retain);
}
/**
* 重连
*/
......
......@@ -21,8 +21,6 @@ import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import org.tio.core.ChannelContext;
import java.nio.ByteBuffer;
/**
* mqtt Server 模板
*
......@@ -40,7 +38,7 @@ public class MqttServerTemplate {
* @param payload 消息体
* @return 是否发送成功
*/
public boolean publish(String clientId, String topic, ByteBuffer payload) {
public boolean publish(String clientId, String topic, byte[] payload) {
return mqttServer.publish(clientId, topic, payload);
}
......@@ -53,7 +51,7 @@ public class MqttServerTemplate {
* @param qos MqttQoS
* @return 是否发送成功
*/
public boolean publish(String clientId, String topic, ByteBuffer payload, MqttQoS qos) {
public boolean publish(String clientId, String topic, byte[] payload, MqttQoS qos) {
return mqttServer.publish(clientId, topic, payload, qos);
}
......@@ -66,7 +64,7 @@ public class MqttServerTemplate {
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public boolean publish(String clientId, String topic, ByteBuffer payload, boolean retain) {
public boolean publish(String clientId, String topic, byte[] payload, boolean retain) {
return mqttServer.publish(clientId, topic, payload, retain);
}
......@@ -80,7 +78,7 @@ public class MqttServerTemplate {
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public boolean publish(String clientId, String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
public boolean publish(String clientId, String topic, byte[] payload, MqttQoS qos, boolean retain) {
return mqttServer.publish(clientId, topic, payload, qos, retain);
}
......@@ -95,17 +93,6 @@ public class MqttServerTemplate {
return mqttServer.publishAll(topic, payload);
}
/**
* 发布消息给所以的在线设备
*
* @param topic topic
* @param payload 消息体
* @return 是否发送成功
*/
public boolean publishAll(String topic, ByteBuffer payload) {
return mqttServer.publishAll(topic, payload);
}
/**
* 发布消息
*
......@@ -118,18 +105,6 @@ public class MqttServerTemplate {
return mqttServer.publishAll(topic, payload, qos);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @return 是否发送成功
*/
public boolean publishAll(String topic, ByteBuffer payload, MqttQoS qos) {
return mqttServer.publishAll(topic, payload, qos);
}
/**
* 发布消息给所以的在线设备
*
......@@ -142,18 +117,6 @@ public class MqttServerTemplate {
return mqttServer.publishAll(topic, payload, retain);
}
/**
* 发布消息给所以的在线设备
*
* @param topic topic
* @param payload 消息体
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public boolean publishAll(String topic, ByteBuffer payload, boolean retain) {
return mqttServer.publishAll(topic, payload, retain);
}
/**
* 发布消息给所以的在线设备
*
......@@ -167,19 +130,6 @@ public class MqttServerTemplate {
return mqttServer.publishAll(topic, payload, qos, retain);
}
/**
* 发布消息给所以的在线设备
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public boolean publishAll(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
return mqttServer.publishAll(topic, payload, qos, retain);
}
/**
* 获取 ChannelContext
*
......
......@@ -24,7 +24,8 @@ import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import org.springframework.context.ApplicationEventPublisher;
import org.tio.core.ChannelContext;
import org.tio.utils.buffer.ByteBufferUtil;
import java.nio.charset.StandardCharsets;
/**
* 使用 Spring event 解耦消息监听
......@@ -39,7 +40,7 @@ public class SpringEventMqttMessageListener implements IMqttMessageListener {
@Override
public void onMessage(ChannelContext context, String clientId, String topic, MqttQoS qoS, MqttPublishMessage publishMessage, Message message) {
if (log.isDebugEnabled()) {
log.debug("mqtt server receive message clientId:{} message:{} payload:{}", clientId, message, ByteBufferUtil.toString(message.getPayload()));
log.debug("mqtt server receive message clientId:{} message:{} payload:{}", clientId, message, new String(message.getPayload(), StandardCharsets.UTF_8));
}
eventPublisher.publishEvent(message);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册