提交 9972871f 编写于 作者: 如梦技术's avatar 如梦技术 🐛

完善 mica-mqtt

上级 96417dbf
...@@ -29,7 +29,6 @@ import java.util.*; ...@@ -29,7 +29,6 @@ import java.util.*;
* @author L.cm * @author L.cm
*/ */
final class MqttClientStore { final class MqttClientStore {
/** /**
* 订阅的数据承载 * 订阅的数据承载
*/ */
...@@ -39,23 +38,23 @@ final class MqttClientStore { ...@@ -39,23 +38,23 @@ final class MqttClientStore {
private final Map<Integer, MqttPendingPublish> pendingPublishData = new LinkedHashMap<>(); private final Map<Integer, MqttPendingPublish> pendingPublishData = new LinkedHashMap<>();
private final Map<Integer, MqttPendingQos2Publish> pendingQos2PublishData = new LinkedHashMap<>(); private final Map<Integer, MqttPendingQos2Publish> pendingQos2PublishData = new LinkedHashMap<>();
void addPaddingSubscribe(int messageId, MqttPendingSubscription pendingSubscription) { protected void addPaddingSubscribe(int messageId, MqttPendingSubscription pendingSubscription) {
pendingSubscriptions.put(messageId, pendingSubscription); pendingSubscriptions.put(messageId, pendingSubscription);
} }
MqttPendingSubscription getPaddingSubscribe(int messageId) { protected MqttPendingSubscription getPaddingSubscribe(int messageId) {
return pendingSubscriptions.get(messageId); return pendingSubscriptions.get(messageId);
} }
MqttPendingSubscription removePaddingSubscribe(int messageId) { protected MqttPendingSubscription removePaddingSubscribe(int messageId) {
return pendingSubscriptions.remove(messageId); return pendingSubscriptions.remove(messageId);
} }
void addSubscription(MqttSubscription subscription) { protected void addSubscription(MqttSubscription subscription) {
subscriptions.add(subscription.getTopicFilter(), subscription); subscriptions.add(subscription.getTopicFilter(), subscription);
} }
List<MqttSubscription> getAndCleanSubscription() { protected List<MqttSubscription> getAndCleanSubscription() {
List<MqttSubscription> subscriptionList = new ArrayList<>(); List<MqttSubscription> subscriptionList = new ArrayList<>();
for (List<MqttSubscription> mqttSubscriptions : subscriptions.values()) { for (List<MqttSubscription> mqttSubscriptions : subscriptions.values()) {
subscriptionList.addAll(mqttSubscriptions); subscriptionList.addAll(mqttSubscriptions);
...@@ -65,7 +64,7 @@ final class MqttClientStore { ...@@ -65,7 +64,7 @@ final class MqttClientStore {
return data; return data;
} }
List<MqttSubscription> getMatchedSubscription(String topicName) { protected List<MqttSubscription> getMatchedSubscription(String topicName) {
List<MqttSubscription> subscriptionList = new ArrayList<>(); List<MqttSubscription> subscriptionList = new ArrayList<>();
for (List<MqttSubscription> mqttSubscriptions : subscriptions.values()) { for (List<MqttSubscription> mqttSubscriptions : subscriptions.values()) {
for (MqttSubscription subscription : mqttSubscriptions) { for (MqttSubscription subscription : mqttSubscriptions) {
...@@ -77,43 +76,43 @@ final class MqttClientStore { ...@@ -77,43 +76,43 @@ final class MqttClientStore {
return Collections.unmodifiableList(subscriptionList); return Collections.unmodifiableList(subscriptionList);
} }
void removeSubscriptions(String topicFilter) { protected void removeSubscriptions(String topicFilter) {
subscriptions.remove(topicFilter); subscriptions.remove(topicFilter);
} }
void addPaddingUnSubscribe(int messageId, MqttPendingUnSubscription pendingUnSubscription) { protected void addPaddingUnSubscribe(int messageId, MqttPendingUnSubscription pendingUnSubscription) {
pendingUnSubscriptions.put(messageId, pendingUnSubscription); pendingUnSubscriptions.put(messageId, pendingUnSubscription);
} }
MqttPendingUnSubscription getPaddingUnSubscribe(int messageId) { protected MqttPendingUnSubscription getPaddingUnSubscribe(int messageId) {
return pendingUnSubscriptions.get(messageId); return pendingUnSubscriptions.get(messageId);
} }
MqttPendingUnSubscription removePaddingUnSubscribe(int messageId) { protected MqttPendingUnSubscription removePaddingUnSubscribe(int messageId) {
return pendingUnSubscriptions.remove(messageId); return pendingUnSubscriptions.remove(messageId);
} }
void addPendingPublish(int messageId, MqttPendingPublish pendingPublish) { protected void addPendingPublish(int messageId, MqttPendingPublish pendingPublish) {
pendingPublishData.put(messageId, pendingPublish); pendingPublishData.put(messageId, pendingPublish);
} }
MqttPendingPublish getPendingPublish(int messageId) { protected MqttPendingPublish getPendingPublish(int messageId) {
return pendingPublishData.get(messageId); return pendingPublishData.get(messageId);
} }
MqttPendingPublish removePendingPublish(int messageId) { protected MqttPendingPublish removePendingPublish(int messageId) {
return pendingPublishData.remove(messageId); return pendingPublishData.remove(messageId);
} }
void addPendingQos2Publish(int messageId, MqttPendingQos2Publish pendingQos2Publish) { protected void addPendingQos2Publish(int messageId, MqttPendingQos2Publish pendingQos2Publish) {
pendingQos2PublishData.put(messageId, pendingQos2Publish); pendingQos2PublishData.put(messageId, pendingQos2Publish);
} }
MqttPendingQos2Publish getPendingQos2Publish(int messageId) { protected MqttPendingQos2Publish getPendingQos2Publish(int messageId) {
return pendingQos2PublishData.get(messageId); return pendingQos2PublishData.get(messageId);
} }
MqttPendingQos2Publish removePendingQos2Publish(int messageId) { protected MqttPendingQos2Publish removePendingQos2Publish(int messageId) {
return pendingQos2PublishData.remove(messageId); return pendingQos2PublishData.remove(messageId);
} }
......
...@@ -30,15 +30,15 @@ final class MqttPendingSubscription { ...@@ -30,15 +30,15 @@ final class MqttPendingSubscription {
this.retryProcessor.setOriginalMessage(message); this.retryProcessor.setOriginalMessage(message);
} }
MqttQoS getMqttQoS() { protected MqttQoS getMqttQoS() {
return mqttQoS; return mqttQoS;
} }
String getTopicFilter() { protected String getTopicFilter() {
return topicFilter; return topicFilter;
} }
MqttMessageListener getListener() { protected MqttMessageListener getListener() {
return listener; return listener;
} }
...@@ -46,13 +46,13 @@ final class MqttPendingSubscription { ...@@ -46,13 +46,13 @@ final class MqttPendingSubscription {
return new MqttSubscription(getMqttQoS(), getTopicFilter(), getListener()); return new MqttSubscription(getMqttQoS(), getTopicFilter(), getListener());
} }
void startRetransmitTimer(ScheduledThreadPoolExecutor executor, Consumer<MqttMessage> sendPacket) { protected void startRetransmitTimer(ScheduledThreadPoolExecutor executor, Consumer<MqttMessage> sendPacket) {
this.retryProcessor.setHandle((fixedHeader, originalMessage) -> this.retryProcessor.setHandle((fixedHeader, originalMessage) ->
sendPacket.accept(new MqttSubscribeMessage(fixedHeader, originalMessage.variableHeader(), originalMessage.payload()))); sendPacket.accept(new MqttSubscribeMessage(fixedHeader, originalMessage.variableHeader(), originalMessage.payload())));
this.retryProcessor.start(executor); this.retryProcessor.start(executor);
} }
void onSubAckReceived() { protected void onSubAckReceived() {
this.retryProcessor.stop(); this.retryProcessor.stop();
} }
......
...@@ -11,7 +11,6 @@ import java.util.function.Consumer; ...@@ -11,7 +11,6 @@ import java.util.function.Consumer;
* MqttPendingSubscription,参考于 netty-mqtt-client * MqttPendingSubscription,参考于 netty-mqtt-client
*/ */
final class MqttPendingUnSubscription { final class MqttPendingUnSubscription {
private final String topic; private final String topic;
private final RetryProcessor<MqttUnsubscribeMessage> retryProcessor = new RetryProcessor<>(); private final RetryProcessor<MqttUnsubscribeMessage> retryProcessor = new RetryProcessor<>();
...@@ -20,17 +19,17 @@ final class MqttPendingUnSubscription { ...@@ -20,17 +19,17 @@ final class MqttPendingUnSubscription {
this.retryProcessor.setOriginalMessage(unSubscribeMessage); this.retryProcessor.setOriginalMessage(unSubscribeMessage);
} }
String getTopic() { protected String getTopic() {
return topic; return topic;
} }
void startRetransmissionTimer(ScheduledThreadPoolExecutor executor, Consumer<MqttMessage> sendPacket) { protected void startRetransmissionTimer(ScheduledThreadPoolExecutor executor, Consumer<MqttMessage> sendPacket) {
this.retryProcessor.setHandle((fixedHeader, originalMessage) -> this.retryProcessor.setHandle((fixedHeader, originalMessage) ->
sendPacket.accept(new MqttUnsubscribeMessage(fixedHeader, originalMessage.variableHeader(), originalMessage.payload()))); sendPacket.accept(new MqttUnsubscribeMessage(fixedHeader, originalMessage.variableHeader(), originalMessage.payload())));
this.retryProcessor.start(executor); this.retryProcessor.start(executor);
} }
void onUnSubAckReceived() { protected void onUnSubAckReceived() {
this.retryProcessor.stop(); this.retryProcessor.stop();
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册