提交 69097957 编写于 作者: D duhenglucky

Remove snode address in MessageExt clas

上级 3a4e8669
...@@ -182,6 +182,7 @@ public class SubscriptionData implements Comparable<SubscriptionData> { ...@@ -182,6 +182,7 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
"classFilterMode=" + classFilterMode + "classFilterMode=" + classFilterMode +
", topic='" + topic + '\'' + ", topic='" + topic + '\'' +
", subString='" + subString + '\'' + ", subString='" + subString + '\'' +
", messageQueueSet=" + messageQueueSet +
", tagsSet=" + tagsSet + ", tagsSet=" + tagsSet +
", codeSet=" + codeSet + ", codeSet=" + codeSet +
", subVersion=" + subVersion + ", subVersion=" + subVersion +
......
...@@ -44,6 +44,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { ...@@ -44,6 +44,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
@Override @Override
public void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel, public void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel,
String groupId) { String groupId) {
log.info("Register push session subscriptionDataSet: {}", subscriptionDataSet);
Set<MessageQueue> prevSubSet = this.clientSubscriptionTable.get(remotingChannel); Set<MessageQueue> prevSubSet = this.clientSubscriptionTable.get(remotingChannel);
Set<MessageQueue> keySet = new HashSet<>(); Set<MessageQueue> keySet = new HashSet<>();
for (SubscriptionData subscriptionData : subscriptionDataSet) { for (SubscriptionData subscriptionData : subscriptionDataSet) {
...@@ -64,6 +65,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { ...@@ -64,6 +65,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
if (keySet.size() > 0) { if (keySet.size() > 0) {
this.clientSubscriptionTable.putIfAbsent(remotingChannel, keySet); this.clientSubscriptionTable.putIfAbsent(remotingChannel, keySet);
} }
log.info("Register push session clientSubscriptionTable: {}", clientSubscriptionTable);
if (prevSubSet != null) { if (prevSubSet != null) {
for (MessageQueue messageQueue : prevSubSet) { for (MessageQueue messageQueue : prevSubSet) {
if (!keySet.contains(messageQueue)) { if (!keySet.contains(messageQueue)) {
...@@ -75,11 +77,13 @@ public class SubscriptionManagerImpl implements SubscriptionManager { ...@@ -75,11 +77,13 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
} }
} }
} }
log.info("Register push session clientSubscriptionTable: {}", clientSubscriptionTable);
} }
@Override @Override
public void removePushSession(RemotingChannel remotingChannel) { public void removePushSession(RemotingChannel remotingChannel) {
Set<MessageQueue> subSet = this.clientSubscriptionTable.get(remotingChannel); Set<MessageQueue> subSet = this.clientSubscriptionTable.get(remotingChannel);
log.info("subSet: {}", subSet);
if (subSet != null) { if (subSet != null) {
for (MessageQueue key : subSet) { for (MessageQueue key : subSet) {
Set clientSet = pushTable.get(key); Set clientSet = pushTable.get(key);
......
...@@ -88,11 +88,9 @@ public class PushServiceImpl implements PushService { ...@@ -88,11 +88,9 @@ public class PushServiceImpl implements PushService {
messageExt.setCommitLogOffset(sendMessageResponseHeader.getCommitLogOffset()); messageExt.setCommitLogOffset(sendMessageResponseHeader.getCommitLogOffset());
messageExt.setBornTimestamp(sendMessageRequestHeader.getBornTimestamp()); messageExt.setBornTimestamp(sendMessageRequestHeader.getBornTimestamp());
messageExt.setBornHost(sendMessageRequestHeader.getBornHost()); messageExt.setBornHost(sendMessageRequestHeader.getBornHost());
// messageExt.setStoreSize(sendMessageResponseHeader.getStoreSize());
messageExt.setStoreHost(RemotingUtil.string2SocketAddressWithIp(sendMessageResponseHeader.getStoreHost())); messageExt.setStoreHost(RemotingUtil.string2SocketAddressWithIp(sendMessageResponseHeader.getStoreHost()));
messageExt.setStoreTimestamp(sendMessageResponseHeader.getStoreTimestamp()); messageExt.setStoreTimestamp(sendMessageResponseHeader.getStoreTimestamp());
messageExt.setWaitStoreMsgOK(false); messageExt.setWaitStoreMsgOK(false);
messageExt.setSnodeAddress(sendMessageRequestHeader.getSnodeHost());
messageExt.setSysFlag(sendMessageRequestHeader.getSysFlag()); messageExt.setSysFlag(sendMessageRequestHeader.getSysFlag());
messageExt.setFlag(sendMessageRequestHeader.getFlag()); messageExt.setFlag(sendMessageRequestHeader.getFlag());
messageExt.setBody(message); messageExt.setBody(message);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册