提交 1769061e 编写于 作者: 如梦技术's avatar 如梦技术 🐛

统一接口参数

上级 1a74b996
......@@ -24,7 +24,7 @@ public class ClientService {
}
public boolean sub() {
client.subQos0("/test/#", (context, topic, message, payload) -> {
client.subQos0("/test/#", (topic, message, payload) -> {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});
return true;
......
......@@ -54,7 +54,7 @@ public class MqttClientTest {
.clientId(clientId)
.connect();
client.subQos0("/sys/" + productKey + '/' + deviceName + "/thing/event/property/post_reply", (context, topic, message, payload) -> {
client.subQos0("/sys/" + productKey + '/' + deviceName + "/thing/event/property/post_reply", (topic, message, payload) -> {
System.out.println(topic + '\t' + ByteBufferUtil.toString(payload));
});
......
......@@ -38,7 +38,7 @@ public class DeviceA {
.password("123456")
.connect();
client.subQos0("/a/door/open", (context, topic, message, payload) -> {
client.subQos0("/a/door/open", (topic, message, payload) -> {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});
}
......
......@@ -38,7 +38,7 @@ public class DeviceB {
.password("123456")
.connect();
client.subQos0("/a/door/open", (context, topic, message, payload) -> {
client.subQos0("/a/door/open", (topic, message, payload) -> {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});
}
......
......@@ -42,7 +42,7 @@ public class MqttClientSyncTest {
// 同步连接,注意:连接会阻塞
.connectSync();
client.subQos0("/test/#", (context, topic, message, payload) -> {
client.subQos0("/test/#", (topic, message, payload) -> {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});
......
......@@ -23,7 +23,6 @@ import net.dreamlu.iot.mqtt.core.client.IMqttClientMessageListener;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
......@@ -62,13 +61,13 @@ public class MqttClientTest {
client.subQos0("/test/123", new IMqttClientMessageListener() {
@Override
public void onSubscribed(ChannelContext context, String topicFilter, MqttQoS mqttQoS) {
public void onSubscribed(String topicFilter, MqttQoS mqttQoS) {
// 订阅成功之后触发,可在此处做一些业务逻辑
logger.info("topicFilter:{} MqttQoS:{} 订阅成功!!!", topicFilter, mqttQoS);
}
@Override
public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, ByteBuffer payload) {
public void onMessage(String topic, MqttPublishMessage message, ByteBuffer payload) {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
}
});
......
......@@ -57,7 +57,7 @@ public class MqttClientTest {
// 订阅命令下发topic
String cmdRequestTopic = "$oc/devices/" + deviceId + "/sys/commands/#";
client.subQos0(cmdRequestTopic, (context, topic, message, payload) -> {
client.subQos0(cmdRequestTopic, (topic, message, payload) -> {
System.out.println(topic + '\t' + ByteBufferUtil.toString(payload));
});
......
......@@ -61,7 +61,7 @@ MqttClient client = MqttClient.create()
.connect();
// 消息订阅,同类方法 subxxx
client.subQos0("/test/#", (topic, payload) -> {
client.subQos0("/test/#", (topic, message, payload) -> {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});
// 取消订阅
......
......@@ -193,7 +193,7 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
IMqttClientMessageListener subscriptionListener = clientSubscription.getListener();
executor.execute(() -> {
try {
subscriptionListener.onSubscribed(context, topicFilter, mqttQoS, message);
subscriptionListener.onSubscribed(topicFilter, mqttQoS, message);
} catch (Throwable e) {
logger.error("MQTT topicFilter:{} subscribed onSubscribed event error.", subscribedList, e);
}
......@@ -340,7 +340,7 @@ public class DefaultMqttClientProcessor implements IMqttClientProcessor {
payload.rewind();
executor.submit(() -> {
try {
listener.onMessage(context, topicName, message, payload);
listener.onMessage(topicName, message, payload);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
......
......@@ -19,7 +19,6 @@ package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.codec.MqttSubAckMessage;
import org.tio.core.ChannelContext;
import java.nio.ByteBuffer;
......@@ -34,34 +33,31 @@ public interface IMqttClientMessageListener {
/**
* 订阅成功之后的事件
*
* @param context ChannelContext
* @param topicFilter topicFilter
* @param mqttQoS MqttQoS
* @param message MqttSubAckMessage
*/
default void onSubscribed(ChannelContext context, String topicFilter, MqttQoS mqttQoS, MqttSubAckMessage message) {
onSubscribed(context, topicFilter, mqttQoS);
default void onSubscribed(String topicFilter, MqttQoS mqttQoS, MqttSubAckMessage message) {
onSubscribed(topicFilter, mqttQoS);
}
/**
* 订阅成功之后的事件
*
* @param context ChannelContext
* @param topicFilter topicFilter
* @param mqttQoS MqttQoS
*/
default void onSubscribed(ChannelContext context, String topicFilter, MqttQoS mqttQoS) {
default void onSubscribed(String topicFilter, MqttQoS mqttQoS) {
}
/**
* 监听到消息
*
* @param context ChannelContext
* @param topic topic
* @param message MqttPublishMessage
* @param payload payload
*/
void onMessage(ChannelContext context, String topic, MqttPublishMessage message, ByteBuffer payload);
void onMessage(String topic, MqttPublishMessage message, ByteBuffer payload);
}
......@@ -74,7 +74,7 @@ public class MqttClientConnectListener implements IMqttClientConnectListener {
```java
public class TestMqttClientMessageListener implements IMqttClientMessageListener {
@Override
public void onMessage(String topic, MqttPublishMessage mqttPublishMessage, ByteBuffer byteBuffer) {
public void onMessage(String topic, MqttPublishMessage message, ByteBuffer byteBuffer) {
System.out.println("收到消息 topic:" + topic + "内容:\n" + ByteBufferUtil.toString(byteBuffer));
}
}
......
......@@ -165,7 +165,7 @@ public class MainService {
}
public boolean sub() {
client.subQos0("/test/#", (topic, payload) -> {
client.subQos0("/test/#", (topic, message, payload) -> {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});
return true;
......
......@@ -81,7 +81,7 @@ public class MqttClientSubscribeDetector implements BeanPostProcessor {
// 4. 订阅
MqttClientTemplate clientTemplate = getMqttClientTemplate(applicationContext, subscribe.clientTemplateBean());
String[] topicFilters = getTopicFilters(applicationContext, subscribe.value());
clientTemplate.subscribe(topicFilters, subscribe.qos(), (context, topic, message, payload) ->
clientTemplate.subscribe(topicFilters, subscribe.qos(), (topic, message, payload) ->
ReflectionUtils.invokeMethod(method, bean, topic, payload)
);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册