diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientStore.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientStore.java index 4a9abe8d92e20fbcd91927074f82f271b3b74fff..06b8e834176900b95e05a896318f18008caa4344 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientStore.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientStore.java @@ -29,7 +29,6 @@ import java.util.*; * @author L.cm */ final class MqttClientStore { - /** * 订阅的数据承载 */ @@ -39,23 +38,23 @@ final class MqttClientStore { private final Map pendingPublishData = new LinkedHashMap<>(); private final Map pendingQos2PublishData = new LinkedHashMap<>(); - void addPaddingSubscribe(int messageId, MqttPendingSubscription pendingSubscription) { + protected void addPaddingSubscribe(int messageId, MqttPendingSubscription pendingSubscription) { pendingSubscriptions.put(messageId, pendingSubscription); } - MqttPendingSubscription getPaddingSubscribe(int messageId) { + protected MqttPendingSubscription getPaddingSubscribe(int messageId) { return pendingSubscriptions.get(messageId); } - MqttPendingSubscription removePaddingSubscribe(int messageId) { + protected MqttPendingSubscription removePaddingSubscribe(int messageId) { return pendingSubscriptions.remove(messageId); } - void addSubscription(MqttSubscription subscription) { + protected void addSubscription(MqttSubscription subscription) { subscriptions.add(subscription.getTopicFilter(), subscription); } - List getAndCleanSubscription() { + protected List getAndCleanSubscription() { List subscriptionList = new ArrayList<>(); for (List mqttSubscriptions : subscriptions.values()) { subscriptionList.addAll(mqttSubscriptions); @@ -65,7 +64,7 @@ final class MqttClientStore { return data; } - List getMatchedSubscription(String topicName) { + protected List getMatchedSubscription(String topicName) { List subscriptionList = new ArrayList<>(); for (List mqttSubscriptions : subscriptions.values()) { for (MqttSubscription subscription : mqttSubscriptions) { @@ -77,43 +76,43 @@ final class MqttClientStore { return Collections.unmodifiableList(subscriptionList); } - void removeSubscriptions(String topicFilter) { + protected void removeSubscriptions(String topicFilter) { subscriptions.remove(topicFilter); } - void addPaddingUnSubscribe(int messageId, MqttPendingUnSubscription pendingUnSubscription) { + protected void addPaddingUnSubscribe(int messageId, MqttPendingUnSubscription pendingUnSubscription) { pendingUnSubscriptions.put(messageId, pendingUnSubscription); } - MqttPendingUnSubscription getPaddingUnSubscribe(int messageId) { + protected MqttPendingUnSubscription getPaddingUnSubscribe(int messageId) { return pendingUnSubscriptions.get(messageId); } - MqttPendingUnSubscription removePaddingUnSubscribe(int messageId) { + protected MqttPendingUnSubscription removePaddingUnSubscribe(int messageId) { return pendingUnSubscriptions.remove(messageId); } - void addPendingPublish(int messageId, MqttPendingPublish pendingPublish) { + protected void addPendingPublish(int messageId, MqttPendingPublish pendingPublish) { pendingPublishData.put(messageId, pendingPublish); } - MqttPendingPublish getPendingPublish(int messageId) { + protected MqttPendingPublish getPendingPublish(int messageId) { return pendingPublishData.get(messageId); } - MqttPendingPublish removePendingPublish(int messageId) { + protected MqttPendingPublish removePendingPublish(int messageId) { return pendingPublishData.remove(messageId); } - void addPendingQos2Publish(int messageId, MqttPendingQos2Publish pendingQos2Publish) { + protected void addPendingQos2Publish(int messageId, MqttPendingQos2Publish pendingQos2Publish) { pendingQos2PublishData.put(messageId, pendingQos2Publish); } - MqttPendingQos2Publish getPendingQos2Publish(int messageId) { + protected MqttPendingQos2Publish getPendingQos2Publish(int messageId) { return pendingQos2PublishData.get(messageId); } - MqttPendingQos2Publish removePendingQos2Publish(int messageId) { + protected MqttPendingQos2Publish removePendingQos2Publish(int messageId) { return pendingQos2PublishData.remove(messageId); } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttPendingSubscription.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttPendingSubscription.java index 1ed24c690a254ba00f702e1d9e2b08bc1d133235..ee02f3d3d8a0100468c7c579840d3d17f27511fd 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttPendingSubscription.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttPendingSubscription.java @@ -30,15 +30,15 @@ final class MqttPendingSubscription { this.retryProcessor.setOriginalMessage(message); } - MqttQoS getMqttQoS() { + protected MqttQoS getMqttQoS() { return mqttQoS; } - String getTopicFilter() { + protected String getTopicFilter() { return topicFilter; } - MqttMessageListener getListener() { + protected MqttMessageListener getListener() { return listener; } @@ -46,13 +46,13 @@ final class MqttPendingSubscription { return new MqttSubscription(getMqttQoS(), getTopicFilter(), getListener()); } - void startRetransmitTimer(ScheduledThreadPoolExecutor executor, Consumer sendPacket) { + protected void startRetransmitTimer(ScheduledThreadPoolExecutor executor, Consumer sendPacket) { this.retryProcessor.setHandle((fixedHeader, originalMessage) -> sendPacket.accept(new MqttSubscribeMessage(fixedHeader, originalMessage.variableHeader(), originalMessage.payload()))); this.retryProcessor.start(executor); } - void onSubAckReceived() { + protected void onSubAckReceived() { this.retryProcessor.stop(); } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttPendingUnSubscription.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttPendingUnSubscription.java index b802cc65f6384348d79a5712e4d4a48708cd8b9b..ce4065f4c3fb487729a31c22f23d19a66a0599b6 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttPendingUnSubscription.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttPendingUnSubscription.java @@ -11,7 +11,6 @@ import java.util.function.Consumer; * MqttPendingSubscription,参考于 netty-mqtt-client */ final class MqttPendingUnSubscription { - private final String topic; private final RetryProcessor retryProcessor = new RetryProcessor<>(); @@ -20,17 +19,17 @@ final class MqttPendingUnSubscription { this.retryProcessor.setOriginalMessage(unSubscribeMessage); } - String getTopic() { + protected String getTopic() { return topic; } - void startRetransmissionTimer(ScheduledThreadPoolExecutor executor, Consumer sendPacket) { + protected void startRetransmissionTimer(ScheduledThreadPoolExecutor executor, Consumer sendPacket) { this.retryProcessor.setHandle((fixedHeader, originalMessage) -> sendPacket.accept(new MqttUnsubscribeMessage(fixedHeader, originalMessage.variableHeader(), originalMessage.payload()))); this.retryProcessor.start(executor); } - void onUnSubAckReceived() { + protected void onUnSubAckReceived() { this.retryProcessor.stop(); }