提交 044222b5 编写于 作者: D duhenglucky

Fix the subscription data type error issue

上级 c9282de2
...@@ -31,7 +31,6 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere; ...@@ -31,7 +31,6 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.PushSubscriptionData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
public class RebalancePushImpl extends RebalanceImpl { public class RebalancePushImpl extends RebalanceImpl {
...@@ -55,8 +54,7 @@ public class RebalancePushImpl extends RebalanceImpl { ...@@ -55,8 +54,7 @@ public class RebalancePushImpl extends RebalanceImpl {
* When rebalance result changed, should update subscription's version to notify broker. * When rebalance result changed, should update subscription's version to notify broker.
* Fix: inconsistency subscription may lead to consumer miss messages. * Fix: inconsistency subscription may lead to consumer miss messages.
*/ */
SubscriptionData sub = this.subscriptionInner.get(topic); SubscriptionData subscriptionData = this.subscriptionInner.get(topic);
PushSubscriptionData subscriptionData = (PushSubscriptionData) sub;
long newVersion = System.currentTimeMillis(); long newVersion = System.currentTimeMillis();
log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion); log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
subscriptionData.setSubVersion(newVersion); subscriptionData.setSubVersion(newVersion);
......
...@@ -74,6 +74,7 @@ public abstract class AbstractFlowControlService implements Interceptor { ...@@ -74,6 +74,7 @@ public abstract class AbstractFlowControlService implements Interceptor {
Boolean acquired = this.acquiredThreadLocal.get(); Boolean acquired = this.acquiredThreadLocal.get();
if (acquired != null && acquired) { if (acquired != null && acquired) {
SphO.exit(); SphO.exit();
this.acquiredThreadLocal.remove();
} }
} }
...@@ -82,6 +83,7 @@ public abstract class AbstractFlowControlService implements Interceptor { ...@@ -82,6 +83,7 @@ public abstract class AbstractFlowControlService implements Interceptor {
Boolean acquired = this.acquiredThreadLocal.get(); Boolean acquired = this.acquiredThreadLocal.get();
if (acquired != null && acquired) { if (acquired != null && acquired) {
SphO.exit(); SphO.exit();
this.acquiredThreadLocal.remove();
} }
} }
......
...@@ -56,7 +56,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { ...@@ -56,7 +56,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
Set<RemotingChannel> prev = pushTable.putIfAbsent(messageQueue, clientSet); Set<RemotingChannel> prev = pushTable.putIfAbsent(messageQueue, clientSet);
clientSet = prev != null ? prev : clientSet; clientSet = prev != null ? prev : clientSet;
} }
log.info("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress()); log.debug("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
clientSet.add(remotingChannel); clientSet.add(remotingChannel);
} }
} }
......
...@@ -20,7 +20,6 @@ import io.netty.channel.Channel; ...@@ -20,7 +20,6 @@ import io.netty.channel.Channel;
import io.netty.util.Attribute; import io.netty.util.Attribute;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
...@@ -75,15 +74,12 @@ public class HeartbeatProcessor implements RequestProcessor { ...@@ -75,15 +74,12 @@ public class HeartbeatProcessor implements RequestProcessor {
Client client = new Client(); Client client = new Client();
client.setClientId(heartbeatData.getClientID()); client.setClientId(heartbeatData.getClientID());
client.setRemotingChannel(remotingChannel); client.setRemotingChannel(remotingChannel);
Set<String> groupSet = new HashSet<>();
for (ProducerData producerData : heartbeatData.getProducerDataSet()) { for (ProducerData producerData : heartbeatData.getProducerDataSet()) {
client.setClientRole(ClientRole.Producer); client.setClientRole(ClientRole.Producer);
if (!MixAll.CLIENT_INNER_PRODUCER_GROUP.equals(producerData.getGroupName())) {
groupSet.add(producerData.getGroupName());
}
this.snodeController.getProducerManager().register(producerData.getGroupName(), client); this.snodeController.getProducerManager().register(producerData.getGroupName(), client);
} }
Set<String> groupSet = new HashSet<>();
for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) { for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) {
client.setClientRole(ClientRole.Consumer); client.setClientRole(ClientRole.Consumer);
groupSet.add(consumerData.getGroupName()); groupSet.add(consumerData.getGroupName());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册