diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java index be88ff3bf0e4b5cb0b8bf758ab44a6d01e32c142..4f32af068f9a81029bfeef44bbc18f6a59ecb71b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java @@ -182,6 +182,7 @@ public class SubscriptionData implements Comparable { "classFilterMode=" + classFilterMode + ", topic='" + topic + '\'' + ", subString='" + subString + '\'' + + ", messageQueueSet=" + messageQueueSet + ", tagsSet=" + tagsSet + ", codeSet=" + codeSet + ", subVersion=" + subVersion + diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java index be337dae6c8d566fb42e3086a709290e45acda86..22e245ac29c00888b68fa4ddc22a972f30fc1048 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java @@ -44,6 +44,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { @Override public void registerPushSession(Set subscriptionDataSet, RemotingChannel remotingChannel, String groupId) { + log.info("Register push session subscriptionDataSet: {}", subscriptionDataSet); Set prevSubSet = this.clientSubscriptionTable.get(remotingChannel); Set keySet = new HashSet<>(); for (SubscriptionData subscriptionData : subscriptionDataSet) { @@ -64,6 +65,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { if (keySet.size() > 0) { this.clientSubscriptionTable.putIfAbsent(remotingChannel, keySet); } + log.info("Register push session clientSubscriptionTable: {}", clientSubscriptionTable); if (prevSubSet != null) { for (MessageQueue messageQueue : prevSubSet) { if (!keySet.contains(messageQueue)) { @@ -75,11 +77,13 @@ public class SubscriptionManagerImpl implements SubscriptionManager { } } } + log.info("Register push session clientSubscriptionTable: {}", clientSubscriptionTable); } @Override public void removePushSession(RemotingChannel remotingChannel) { Set subSet = this.clientSubscriptionTable.get(remotingChannel); + log.info("subSet: {}", subSet); if (subSet != null) { for (MessageQueue key : subSet) { Set clientSet = pushTable.get(key); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java index 37bb9f6e6c5ddd21624b6155186defd064822f93..8a81c4cf8e9a07a9b7930a586c4e84cd385e890c 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java @@ -88,11 +88,9 @@ public class PushServiceImpl implements PushService { messageExt.setCommitLogOffset(sendMessageResponseHeader.getCommitLogOffset()); messageExt.setBornTimestamp(sendMessageRequestHeader.getBornTimestamp()); messageExt.setBornHost(sendMessageRequestHeader.getBornHost()); -// messageExt.setStoreSize(sendMessageResponseHeader.getStoreSize()); messageExt.setStoreHost(RemotingUtil.string2SocketAddressWithIp(sendMessageResponseHeader.getStoreHost())); messageExt.setStoreTimestamp(sendMessageResponseHeader.getStoreTimestamp()); messageExt.setWaitStoreMsgOK(false); - messageExt.setSnodeAddress(sendMessageRequestHeader.getSnodeHost()); messageExt.setSysFlag(sendMessageRequestHeader.getSysFlag()); messageExt.setFlag(sendMessageRequestHeader.getFlag()); messageExt.setBody(message);