提交 28a8d649 编写于 作者: D duhenglucky

Fix the push subscription not update issue

上级 84bbb8f3
...@@ -36,6 +36,8 @@ public class PushMessageHeader implements CommandCustomHeader { ...@@ -36,6 +36,8 @@ public class PushMessageHeader implements CommandCustomHeader {
@CFNotNull @CFNotNull
private String consumerGroup; private String consumerGroup;
private String enodeName;
@Override @Override
public void checkFields() throws RemotingCommandException { public void checkFields() throws RemotingCommandException {
...@@ -80,4 +82,23 @@ public class PushMessageHeader implements CommandCustomHeader { ...@@ -80,4 +82,23 @@ public class PushMessageHeader implements CommandCustomHeader {
public void setConsumerGroup(String consumerGroup) { public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup; this.consumerGroup = consumerGroup;
} }
public String getEnodeName() {
return enodeName;
}
public void setEnodeName(String enodeName) {
this.enodeName = enodeName;
}
@Override public String toString() {
return "PushMessageHeader{" +
"queueOffset=" + queueOffset +
", messageId='" + messageId + '\'' +
", queueId=" + queueId +
", topic='" + topic + '\'' +
", consumerGroup='" + consumerGroup + '\'' +
", enodeName='" + enodeName + '\'' +
'}';
}
} }
...@@ -44,7 +44,6 @@ public class SubscriptionManagerImpl implements SubscriptionManager { ...@@ -44,7 +44,6 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
@Override @Override
public void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel, public void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel,
String groupId) { String groupId) {
log.info("Register push session subscriptionDataSet: {}", subscriptionDataSet);
Set<MessageQueue> prevSubSet = this.clientSubscriptionTable.get(remotingChannel); Set<MessageQueue> prevSubSet = this.clientSubscriptionTable.get(remotingChannel);
Set<MessageQueue> keySet = new HashSet<>(); Set<MessageQueue> keySet = new HashSet<>();
for (SubscriptionData subscriptionData : subscriptionDataSet) { for (SubscriptionData subscriptionData : subscriptionDataSet) {
...@@ -57,15 +56,14 @@ public class SubscriptionManagerImpl implements SubscriptionManager { ...@@ -57,15 +56,14 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
Set<RemotingChannel> prev = pushTable.putIfAbsent(messageQueue, clientSet); Set<RemotingChannel> prev = pushTable.putIfAbsent(messageQueue, clientSet);
clientSet = prev != null ? prev : clientSet; clientSet = prev != null ? prev : clientSet;
} }
log.info("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress()); log.debug("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
clientSet.add(remotingChannel); clientSet.add(remotingChannel);
} }
} }
} }
if (keySet.size() > 0) { if (keySet.size() > 0) {
this.clientSubscriptionTable.putIfAbsent(remotingChannel, keySet); this.clientSubscriptionTable.put(remotingChannel, keySet);
} }
log.info("Register push session clientSubscriptionTable: {}", clientSubscriptionTable);
if (prevSubSet != null) { if (prevSubSet != null) {
for (MessageQueue messageQueue : prevSubSet) { for (MessageQueue messageQueue : prevSubSet) {
if (!keySet.contains(messageQueue)) { if (!keySet.contains(messageQueue)) {
...@@ -77,7 +75,6 @@ public class SubscriptionManagerImpl implements SubscriptionManager { ...@@ -77,7 +75,6 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
} }
} }
} }
log.info("Register push session clientSubscriptionTable: {}", clientSubscriptionTable);
} }
@Override @Override
......
...@@ -95,7 +95,7 @@ public class PushServiceImpl implements PushService { ...@@ -95,7 +95,7 @@ public class PushServiceImpl implements PushService {
messageExt.setFlag(sendMessageRequestHeader.getFlag()); messageExt.setFlag(sendMessageRequestHeader.getFlag());
messageExt.setBody(message); messageExt.setBody(message);
messageExt.setBodyCRC(UtilAll.crc32(message)); messageExt.setBodyCRC(UtilAll.crc32(message));
log.info("MessageExt:{}", messageExt); log.debug("MessageExt:{}", messageExt);
return messageExt; return messageExt;
} }
...@@ -103,9 +103,9 @@ public class PushServiceImpl implements PushService { ...@@ -103,9 +103,9 @@ public class PushServiceImpl implements PushService {
public void run() { public void run() {
if (!canceled.get()) { if (!canceled.get()) {
try { try {
log.info("sendMessageResponse:{}", sendMessageResponse); log.debug("sendMessageResponse: {}", sendMessageResponse);
SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) sendMessageResponse.decodeCommandCustomHeader(SendMessageResponseHeader.class); SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) sendMessageResponse.decodeCommandCustomHeader(SendMessageResponseHeader.class);
log.info("sendMessageResponseHeader:{}", sendMessageResponseHeader); log.debug("sendMessageResponseHeader: {}", sendMessageResponseHeader);
MessageQueue messageQueue = new MessageQueue(sendMessageRequestHeader.getTopic(), sendMessageRequestHeader.getEnodeName(), sendMessageRequestHeader.getQueueId()); MessageQueue messageQueue = new MessageQueue(sendMessageRequestHeader.getTopic(), sendMessageRequestHeader.getEnodeName(), sendMessageRequestHeader.getQueueId());
Set<RemotingChannel> consumerTable = snodeController.getSubscriptionManager().getPushableChannel(messageQueue); Set<RemotingChannel> consumerTable = snodeController.getSubscriptionManager().getPushableChannel(messageQueue);
if (consumerTable != null) { if (consumerTable != null) {
...@@ -113,6 +113,7 @@ public class PushServiceImpl implements PushService { ...@@ -113,6 +113,7 @@ public class PushServiceImpl implements PushService {
pushMessageHeader.setQueueOffset(sendMessageResponseHeader.getQueueOffset()); pushMessageHeader.setQueueOffset(sendMessageResponseHeader.getQueueOffset());
pushMessageHeader.setTopic(sendMessageRequestHeader.getTopic()); pushMessageHeader.setTopic(sendMessageRequestHeader.getTopic());
pushMessageHeader.setQueueId(sendMessageResponseHeader.getQueueId()); pushMessageHeader.setQueueId(sendMessageResponseHeader.getQueueId());
pushMessageHeader.setEnodeName(sendMessageRequestHeader.getEnodeName());
RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.SNODE_PUSH_MESSAGE, pushMessageHeader); RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.SNODE_PUSH_MESSAGE, pushMessageHeader);
MessageExt messageExt = buildMessageExt(sendMessageResponseHeader, message, sendMessageRequestHeader); MessageExt messageExt = buildMessageExt(sendMessageResponseHeader, message, sendMessageRequestHeader);
pushMessage.setBody(MessageDecoder.encode(messageExt, false)); pushMessage.setBody(MessageDecoder.encode(messageExt, false));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册