提交 c7fdb765 编写于 作者: D duhenglucky

Add support for one client instance with multi consumer group in push module

上级 5440b1c4
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
<artifactId>rocketmq-all</artifactId> <artifactId>rocketmq-all</artifactId>
<version>4.4.0-SNAPSHOT</version> <version>4.4.0-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging> <packaging>jar</packaging>
<artifactId>rocketmq-acl</artifactId> <artifactId>rocketmq-acl</artifactId>
...@@ -69,7 +70,7 @@ ...@@ -69,7 +70,7 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.rocketmq</groupId> <groupId>${project.groupId}</groupId>
<artifactId>rocketmq-remoting</artifactId> <artifactId>rocketmq-remoting</artifactId>
</dependency> </dependency>
......
...@@ -513,8 +513,8 @@ public class BrokerController { ...@@ -513,8 +513,8 @@ public class BrokerController {
*/ */
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor); this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, this.snodePullMessageProcessor, pullMessageExecutor); // this.remotingServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, this.snodePullMessageProcessor, pullMessageExecutor);
this.snodePullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); // this.snodePullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
/** /**
* QueryMessageProcessor * QueryMessageProcessor
*/ */
......
...@@ -19,7 +19,6 @@ package org.apache.rocketmq.broker.processor; ...@@ -19,7 +19,6 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion; import io.netty.channel.FileRegion;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
...@@ -41,8 +40,6 @@ import org.apache.rocketmq.common.constant.PermName; ...@@ -41,8 +40,6 @@ import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
...@@ -53,11 +50,13 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; ...@@ -53,11 +50,13 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent; import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
...@@ -78,12 +77,10 @@ public class PullMessageProcessor implements RequestProcessor { ...@@ -78,12 +77,10 @@ public class PullMessageProcessor implements RequestProcessor {
} }
@Override @Override
public RemotingCommand processRequest(RemotingChannel remotingChannel, public RemotingCommand processRequest(final RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException { RemotingCommand request) throws RemotingCommandException {
NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel; Channel channel = ((NettyChannelHandlerContextImpl)remotingChannel).getChannelHandlerContext().channel();
ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext(); return this.processRequest(channel, request, true);
return this.processRequest(ctx.channel(), request, true);
} }
@Override @Override
...@@ -100,7 +97,7 @@ public class PullMessageProcessor implements RequestProcessor { ...@@ -100,7 +97,7 @@ public class PullMessageProcessor implements RequestProcessor {
response.setOpaque(request.getOpaque()); response.setOpaque(request.getOpaque());
log.info("receive PullMessage request command, {}", request); log.debug("receive PullMessage request command, {}", request);
if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) { if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION); response.setCode(ResponseCode.NO_PERMISSION);
......
...@@ -115,7 +115,7 @@ public class SnodePullMessageProcessor implements RequestProcessor { ...@@ -115,7 +115,7 @@ public class SnodePullMessageProcessor implements RequestProcessor {
log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
requestHeader.getConsumerGroup()); requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark("parse the consumer's subscription failed"); response.setRemark(e.getMessage());
return response; return response;
} }
......
...@@ -31,6 +31,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere; ...@@ -31,6 +31,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.PushSubscriptionData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
public class RebalancePushImpl extends RebalanceImpl { public class RebalancePushImpl extends RebalanceImpl {
...@@ -54,16 +55,17 @@ public class RebalancePushImpl extends RebalanceImpl { ...@@ -54,16 +55,17 @@ public class RebalancePushImpl extends RebalanceImpl {
* When rebalance result changed, should update subscription's version to notify broker. * When rebalance result changed, should update subscription's version to notify broker.
* Fix: inconsistency subscription may lead to consumer miss messages. * Fix: inconsistency subscription may lead to consumer miss messages.
*/ */
SubscriptionData subscriptionData = this.subscriptionInner.get(topic); SubscriptionData sub = this.subscriptionInner.get(topic);
PushSubscriptionData subscriptionData = (PushSubscriptionData) sub;
long newVersion = System.currentTimeMillis(); long newVersion = System.currentTimeMillis();
log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion); log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
subscriptionData.setSubVersion(newVersion); subscriptionData.setSubVersion(newVersion);
Set<Integer> queueIdSet = new HashSet<Integer>(); Set<MessageQueue> queueIdSet = new HashSet<MessageQueue>();
for (MessageQueue messageQueue : mqAll) { for (MessageQueue messageQueue : mqAll) {
queueIdSet.add(messageQueue.getQueueId()); queueIdSet.add(messageQueue);
} }
subscriptionData.setQueueIdSet(queueIdSet); subscriptionData.setMessageQueueSet(queueIdSet);
int currentQueueCount = this.processQueueTable.size(); int currentQueueCount = this.processQueueTable.size();
if (currentQueueCount != 0) { if (currentQueueCount != 0) {
int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic(); int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
......
...@@ -131,7 +131,7 @@ public abstract class AbstractFlowControlService implements Interceptor { ...@@ -131,7 +131,7 @@ public abstract class AbstractFlowControlService implements Interceptor {
} }
} }
FlowRuleManager.loadRules(sentinelRules); FlowRuleManager.loadRules(sentinelRules);
log.warn("Load Rules: {}" + FlowRuleManager.getRules()); log.warn("Load Rules: {}", FlowRuleManager.getRules());
} }
} }
...@@ -33,6 +33,9 @@ public class PushMessageHeader implements CommandCustomHeader { ...@@ -33,6 +33,9 @@ public class PushMessageHeader implements CommandCustomHeader {
@CFNotNull @CFNotNull
private String topic; private String topic;
@CFNotNull
private String consumerGroup;
@Override @Override
public void checkFields() throws RemotingCommandException { public void checkFields() throws RemotingCommandException {
...@@ -69,4 +72,12 @@ public class PushMessageHeader implements CommandCustomHeader { ...@@ -69,4 +72,12 @@ public class PushMessageHeader implements CommandCustomHeader {
public void setMessageId(String messageId) { public void setMessageId(String messageId) {
this.messageId = messageId; this.messageId = messageId;
} }
public String getConsumerGroup() {
return consumerGroup;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
} }
...@@ -31,7 +31,6 @@ public class ConsumerData { ...@@ -31,7 +31,6 @@ public class ConsumerData {
private ConsumeFromWhere consumeFromWhere; private ConsumeFromWhere consumeFromWhere;
private Set<SubscriptionData> subscriptionDataSet = new HashSet<SubscriptionData>(); private Set<SubscriptionData> subscriptionDataSet = new HashSet<SubscriptionData>();
private boolean unitMode; private boolean unitMode;
private boolean realPushEnable = true;
public String getGroupName() { public String getGroupName() {
return groupName; return groupName;
...@@ -80,15 +79,6 @@ public class ConsumerData { ...@@ -80,15 +79,6 @@ public class ConsumerData {
public void setUnitMode(boolean isUnitMode) { public void setUnitMode(boolean isUnitMode) {
this.unitMode = isUnitMode; this.unitMode = isUnitMode;
} }
public boolean isRealPushEnable() {
return realPushEnable;
}
public void setRealPushEnable(boolean realPushEnable) {
this.realPushEnable = realPushEnable;
}
@Override @Override
public String toString() { public String toString() {
return "ConsumerData{" + return "ConsumerData{" +
...@@ -98,7 +88,6 @@ public class ConsumerData { ...@@ -98,7 +88,6 @@ public class ConsumerData {
", consumeFromWhere=" + consumeFromWhere + ", consumeFromWhere=" + consumeFromWhere +
", subscriptionDataSet=" + subscriptionDataSet + ", subscriptionDataSet=" + subscriptionDataSet +
", unitMode=" + unitMode + ", unitMode=" + unitMode +
", realPushEnable=" + realPushEnable +
'}'; '}';
} }
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.heartbeat;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageQueue;
public class PushSubscriptionData extends SubscriptionData {
private Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
public Set<MessageQueue> getMessageQueueSet() {
return messageQueueSet;
}
public void setMessageQueueSet(Set<MessageQueue> messageQueueSet) {
this.messageQueueSet = messageQueueSet;
}
}
...@@ -31,7 +31,6 @@ public class SubscriptionData implements Comparable<SubscriptionData> { ...@@ -31,7 +31,6 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
private boolean classFilterMode = false; private boolean classFilterMode = false;
private String topic; private String topic;
private String subString; private String subString;
private Set<Integer> queueIdSet = new HashSet<Integer>();
private Set<String> tagsSet = new HashSet<String>(); private Set<String> tagsSet = new HashSet<String>();
private Set<Integer> codeSet = new HashSet<Integer>(); private Set<Integer> codeSet = new HashSet<Integer>();
private long subVersion = System.currentTimeMillis(); private long subVersion = System.currentTimeMillis();
...@@ -114,14 +113,6 @@ public class SubscriptionData implements Comparable<SubscriptionData> { ...@@ -114,14 +113,6 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
this.expressionType = expressionType; this.expressionType = expressionType;
} }
public Set<Integer> getQueueIdSet() {
return queueIdSet;
}
public void setQueueIdSet(Set<Integer> queueIdSet) {
this.queueIdSet = queueIdSet;
}
@Override @Override
public int hashCode() { public int hashCode() {
final int prime = 31; final int prime = 31;
...@@ -129,7 +120,6 @@ public class SubscriptionData implements Comparable<SubscriptionData> { ...@@ -129,7 +120,6 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
result = prime * result + (classFilterMode ? 1231 : 1237); result = prime * result + (classFilterMode ? 1231 : 1237);
result = prime * result + ((codeSet == null) ? 0 : codeSet.hashCode()); result = prime * result + ((codeSet == null) ? 0 : codeSet.hashCode());
result = prime * result + ((subString == null) ? 0 : subString.hashCode()); result = prime * result + ((subString == null) ? 0 : subString.hashCode());
result = prime * result + ((queueIdSet == null) ? 0 : queueIdSet.hashCode());
result = prime * result + ((tagsSet == null) ? 0 : tagsSet.hashCode()); result = prime * result + ((tagsSet == null) ? 0 : tagsSet.hashCode());
result = prime * result + ((topic == null) ? 0 : topic.hashCode()); result = prime * result + ((topic == null) ? 0 : topic.hashCode());
result = prime * result + ((expressionType == null) ? 0 : expressionType.hashCode()); result = prime * result + ((expressionType == null) ? 0 : expressionType.hashCode());
...@@ -164,11 +154,6 @@ public class SubscriptionData implements Comparable<SubscriptionData> { ...@@ -164,11 +154,6 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
return false; return false;
} else if (!tagsSet.equals(other.tagsSet)) } else if (!tagsSet.equals(other.tagsSet))
return false; return false;
if (queueIdSet == null) {
if (other.queueIdSet != null)
return false;
} else if (!queueIdSet.equals(other.queueIdSet))
return false;
if (topic == null) { if (topic == null) {
if (other.topic != null) if (other.topic != null)
return false; return false;
...@@ -187,7 +172,6 @@ public class SubscriptionData implements Comparable<SubscriptionData> { ...@@ -187,7 +172,6 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
"classFilterMode=" + classFilterMode + "classFilterMode=" + classFilterMode +
", topic='" + topic + '\'' + ", topic='" + topic + '\'' +
", subString='" + subString + '\'' + ", subString='" + subString + '\'' +
", queueIdSet=" + queueIdSet +
", tagsSet=" + tagsSet + ", tagsSet=" + tagsSet +
", codeSet=" + codeSet + ", codeSet=" + codeSet +
", subVersion=" + subVersion + ", subVersion=" + subVersion +
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
*/ */
package org.apache.rocketmq.example.filter; package org.apache.rocketmq.example.filter;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
...@@ -24,20 +23,14 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; ...@@ -24,20 +23,14 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
public class Consumer { public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException, IOException { public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4"); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); consumer.subscribe("TagFilterTest", "TagA || TagC");
File classFile = new File(classLoader.getResource("MessageFilterImpl.java").getFile());
String filterCode = MixAll.file2String(classFile);
consumer.subscribe("TopicTest", "org.apache.rocketmq.example.filter.MessageFilterImpl",
filterCode);
consumer.registerMessageListener(new MessageListenerConcurrently() { consumer.registerMessageListener(new MessageListenerConcurrently() {
...@@ -53,4 +46,5 @@ public class Consumer { ...@@ -53,4 +46,5 @@ public class Consumer {
System.out.printf("Consumer Started.%n"); System.out.printf("Consumer Started.%n");
} }
} }
...@@ -16,31 +16,30 @@ ...@@ -16,31 +16,30 @@
*/ */
package org.apache.rocketmq.example.filter; package org.apache.rocketmq.example.filter;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer { public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException { public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start(); producer.start();
try { String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 6000000; i++) {
Message msg = new Message("TopicFilter7", for (int i = 0; i < 10; i++) {
"TagA", Message msg = new Message("TagFilterTest",
"OrderID001", tags[i % tags.length],
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
msg.putUserProperty("a", String.valueOf(i));
msg.putUserProperty("SequenceId", String.valueOf(i)); SendResult sendResult = producer.send(msg);
SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
} }
producer.shutdown(); producer.shutdown();
} }
} }
...@@ -17,28 +17,23 @@ ...@@ -17,28 +17,23 @@
package org.apache.rocketmq.example.filter; package org.apache.rocketmq.example.filter;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class SqlConsumer { public class SqlConsumer {
public static void main(String[] args) { public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
try {
consumer.subscribe("TopicTest", // Don't forget to set enablePropertyFilter=true in broker
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" + consumer.subscribe("SqlFilterTest",
"and (a is not null and a between 0 3)")); MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
} catch (MQClientException e) { "and (a is not null and a between 0 and 3)"));
e.printStackTrace();
return;
}
consumer.registerMessageListener(new MessageListenerConcurrently() { consumer.registerMessageListener(new MessageListenerConcurrently() {
...@@ -50,12 +45,7 @@ public class SqlConsumer { ...@@ -50,12 +45,7 @@ public class SqlConsumer {
} }
}); });
try { consumer.start();
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
return;
}
System.out.printf("Consumer Started.%n"); System.out.printf("Consumer Started.%n");
} }
} }
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.rocketmq.snode.client; package org.apache.rocketmq.snode.client;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.serialize.LanguageCode; import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.snode.client.impl.ClientRole; import org.apache.rocketmq.snode.client.impl.ClientRole;
...@@ -24,10 +25,10 @@ import org.apache.rocketmq.snode.client.impl.ClientRole; ...@@ -24,10 +25,10 @@ import org.apache.rocketmq.snode.client.impl.ClientRole;
public class Client { public class Client {
private ClientRole clientRole; private ClientRole clientRole;
private String groupId;
private String clientId; private String clientId;
private Set<String> groups;
private RemotingChannel remotingChannel; private RemotingChannel remotingChannel;
private int heartbeatInterval; private int heartbeatInterval;
...@@ -46,32 +47,23 @@ public class Client { ...@@ -46,32 +47,23 @@ public class Client {
this.clientRole = clientRole; this.clientRole = clientRole;
} }
@Override @Override public boolean equals(Object o) {
public boolean equals(Object o) {
if (this == o) if (this == o)
return true; return true;
if (o == null || getClass() != o.getClass()) if (!(o instanceof Client))
return false; return false;
Client client = (Client) o; Client client = (Client) o;
return version == client.version && return
clientRole == client.clientRole && version == client.version &&
Objects.equals(groupId, client.groupId) && clientRole == client.clientRole &&
Objects.equals(clientId, client.clientId) && Objects.equals(clientId, client.clientId) &&
Objects.equals(remotingChannel, client.remotingChannel) && Objects.equals(groups, client.groups) &&
language == client.language; Objects.equals(remotingChannel, client.remotingChannel) &&
} language == client.language;
@Override
public int hashCode() {
return Objects.hash(clientRole, groupId, clientId, remotingChannel, version, language);
}
public String getGroupId() {
return groupId;
} }
public void setGroupId(String groupId) { @Override public int hashCode() {
this.groupId = groupId; return Objects.hash(clientRole, clientId, groups, remotingChannel, heartbeatInterval, lastUpdateTimestamp, version, language);
} }
public RemotingChannel getRemotingChannel() { public RemotingChannel getRemotingChannel() {
...@@ -122,11 +114,19 @@ public class Client { ...@@ -122,11 +114,19 @@ public class Client {
this.language = language; this.language = language;
} }
public Set<String> getGroups() {
return groups;
}
public void setGroups(Set<String> groups) {
this.groups = groups;
}
@Override public String toString() { @Override public String toString() {
return "Client{" + return "Client{" +
"clientRole=" + clientRole + "clientRole=" + clientRole +
", groupId='" + groupId + '\'' +
", clientId='" + clientId + '\'' + ", clientId='" + clientId + '\'' +
", groups=" + groups +
", remotingChannel=" + remotingChannel + ", remotingChannel=" + remotingChannel +
", heartbeatInterval=" + heartbeatInterval + ", heartbeatInterval=" + heartbeatInterval +
", lastUpdateTimestamp=" + lastUpdateTimestamp + ", lastUpdateTimestamp=" + lastUpdateTimestamp +
......
...@@ -20,7 +20,7 @@ import java.util.List; ...@@ -20,7 +20,7 @@ import java.util.List;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
public interface ClientManager { public interface ClientManager {
boolean register(Client client); boolean register(String groupId, Client client);
void unRegister(String groupId, RemotingChannel remotingChannel); void unRegister(String groupId, RemotingChannel remotingChannel);
......
...@@ -16,10 +16,7 @@ ...@@ -16,10 +16,7 @@
*/ */
package org.apache.rocketmq.snode.client; package org.apache.rocketmq.snode.client;
import org.apache.rocketmq.remoting.RemotingChannel;
public interface SlowConsumerService { public interface SlowConsumerService {
boolean isSlowConsumer(long latestLogicOffset, String topic, int queueId, RemotingChannel remotingChannel, boolean isSlowConsumer(long latestLogicOffset, String topic, int queueId, String consumerGroup, String enodeName);
String enodeName);
} }
...@@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.client; ...@@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.client;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
...@@ -35,9 +36,10 @@ public interface SubscriptionManager { ...@@ -35,9 +36,10 @@ public interface SubscriptionManager {
Subscription getSubscription(String groupId); Subscription getSubscription(String groupId);
void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel, String groupId); void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel,
String groupId);
void removePushSession(RemotingChannel remotingChannel); void removePushSession(RemotingChannel remotingChannel);
Set<RemotingChannel> getPushableChannel(String topic, Integer queueId); Set<RemotingChannel> getPushableChannel(MessageQueue messageQueue);
} }
...@@ -82,7 +82,7 @@ public abstract class ClientManagerImpl implements ClientManager { ...@@ -82,7 +82,7 @@ public abstract class ClientManagerImpl implements ClientManager {
iter.remove(); iter.remove();
client.getRemotingChannel().close(); client.getRemotingChannel().close();
log.warn("SCAN: Remove expired channel from {}ClientTable. channel={}, group={}", client.getClientRole(), log.warn("SCAN: Remove expired channel from {}ClientTable. channel={}, group={}", client.getClientRole(),
RemotingHelper.parseChannelRemoteAddr(client.getRemotingChannel().remoteAddress()), client.getGroupId()); RemotingHelper.parseChannelRemoteAddr(client.getRemotingChannel().remoteAddress()), group);
if (channelTable.isEmpty()) { if (channelTable.isEmpty()) {
iterator.remove(); iterator.remove();
log.warn("SCAN: Remove group={} channel from {}ClientTable.", group, client.getClientRole()); log.warn("SCAN: Remove group={} channel from {}ClientTable.", group, client.getClientRole());
...@@ -93,13 +93,13 @@ public abstract class ClientManagerImpl implements ClientManager { ...@@ -93,13 +93,13 @@ public abstract class ClientManagerImpl implements ClientManager {
} }
@Override @Override
public boolean register(Client client) { public boolean register(String groupId, Client client) {
boolean updated = false; boolean updated = false;
if (client != null) { if (client != null) {
ConcurrentHashMap<RemotingChannel, Client> channelTable = groupClientTable.get(client.getGroupId()); ConcurrentHashMap<RemotingChannel, Client> channelTable = groupClientTable.get(groupId);
if (channelTable == null) { if (channelTable == null) {
channelTable = new ConcurrentHashMap(); channelTable = new ConcurrentHashMap();
ConcurrentHashMap prev = groupClientTable.putIfAbsent(client.getGroupId(), channelTable); ConcurrentHashMap prev = groupClientTable.putIfAbsent(groupId, channelTable);
channelTable = prev != null ? prev : channelTable; channelTable = prev != null ? prev : channelTable;
} }
...@@ -107,14 +107,14 @@ public abstract class ClientManagerImpl implements ClientManager { ...@@ -107,14 +107,14 @@ public abstract class ClientManagerImpl implements ClientManager {
if (oldClient == null) { if (oldClient == null) {
Client prev = channelTable.put(client.getRemotingChannel(), client); Client prev = channelTable.put(client.getRemotingChannel(), client);
if (prev != null) { if (prev != null) {
log.info("New client connected, group: {} {} {} channel: {}", client.getGroupId(), client.toString()); log.info("New client connected, group: {} {} {} channel: {}", groupId, client.toString());
updated = true; updated = true;
} }
oldClient = client; oldClient = client;
} else { } else {
if (!oldClient.getClientId().equals(client.getClientId())) { if (!oldClient.getClientId().equals(client.getClientId())) {
log.error("[BUG] client channel exist in snode, but clientId not equal. GROUP: {} OLD: {} NEW: {} ", log.error("[BUG] client channel exist in snode, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
client.getGroupId(), groupId,
oldClient.toString(), oldClient.toString(),
channelTable.toString()); channelTable.toString());
channelTable.put(client.getRemotingChannel(), client); channelTable.put(client.getRemotingChannel(), client);
...@@ -122,8 +122,8 @@ public abstract class ClientManagerImpl implements ClientManager { ...@@ -122,8 +122,8 @@ public abstract class ClientManagerImpl implements ClientManager {
} }
oldClient.setLastUpdateTimestamp(System.currentTimeMillis()); oldClient.setLastUpdateTimestamp(System.currentTimeMillis());
} }
log.debug("Register client role: {}, group: {}, last: {}", client.getClientRole(), client.getGroupId(), client.getLastUpdateTimestamp()); log.debug("Register client role: {}, group: {}, last: {}", client.getClientRole(), groupId, client.getLastUpdateTimestamp());
onRegister(client.getGroupId(), client.getRemotingChannel()); onRegister(groupId, client.getRemotingChannel());
return updated; return updated;
} }
......
...@@ -16,17 +16,11 @@ ...@@ -16,17 +16,11 @@
*/ */
package org.apache.rocketmq.snode.client.impl; package org.apache.rocketmq.snode.client.impl;
import io.netty.channel.Channel;
import io.netty.util.Attribute;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.SlowConsumerService; import org.apache.rocketmq.snode.client.SlowConsumerService;
import org.apache.rocketmq.snode.constant.SnodeConstant;
public class SlowConsumerServiceImpl implements SlowConsumerService { public class SlowConsumerServiceImpl implements SlowConsumerService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
...@@ -39,22 +33,13 @@ public class SlowConsumerServiceImpl implements SlowConsumerService { ...@@ -39,22 +33,13 @@ public class SlowConsumerServiceImpl implements SlowConsumerService {
@Override @Override
public boolean isSlowConsumer(long latestLogicOffset, String topic, int queueId, public boolean isSlowConsumer(long latestLogicOffset, String topic, int queueId,
RemotingChannel remotingChannel, String enodeName) { String consumerGroup, String enodeName) {
Client client = null; long ackedOffset = this.snodeController.getConsumerOffsetManager().queryOffset(enodeName, consumerGroup, topic, queueId);
if (remotingChannel instanceof NettyChannelImpl) { if (latestLogicOffset - ackedOffset > snodeController.getSnodeConfig().getSlowConsumerThreshold()) {
Channel channel = ((NettyChannelImpl) remotingChannel).getChannel(); log.warn("[SlowConsumer] group: {}, lastAckedOffset:{} nowOffset:{} ", consumerGroup, ackedOffset, latestLogicOffset);
Attribute<Client> clientAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ATTRIBUTE_KEY); return true;
if (clientAttribute != null) {
client = clientAttribute.get();
}
}
if (client != null) {
long ackedOffset = this.snodeController.getConsumerOffsetManager().queryOffset(enodeName, client.getGroupId(), topic, queueId);
if (latestLogicOffset - ackedOffset > snodeController.getSnodeConfig().getSlowConsumerThreshold()) {
log.warn("[SlowConsumer] group: {}, lastAckedOffset:{} nowOffset:{} ", client.getGroupId(), ackedOffset, latestLogicOffset);
return true;
}
} }
return false; return false;
} }
} }
...@@ -23,8 +23,10 @@ import java.util.Set; ...@@ -23,8 +23,10 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.PushSubscriptionData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
...@@ -36,38 +38,27 @@ public class SubscriptionManagerImpl implements SubscriptionManager { ...@@ -36,38 +38,27 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
private ConcurrentHashMap<String/*Consumer group*/, Subscription> groupSubscriptionTable = new ConcurrentHashMap<>(1024); private ConcurrentHashMap<String/*Consumer group*/, Subscription> groupSubscriptionTable = new ConcurrentHashMap<>(1024);
private ConcurrentHashMap<String/*Topic#QueueId*/, Set<RemotingChannel>> pushTable = new ConcurrentHashMap(); private ConcurrentHashMap<MessageQueue, Set<RemotingChannel>> pushTable = new ConcurrentHashMap();
private ConcurrentHashMap<RemotingChannel, Set<String>/*Topic#QueueId*/> clientSubscriptionTable = new ConcurrentHashMap<>(2048); private ConcurrentHashMap<RemotingChannel, Set<MessageQueue>> clientSubscriptionTable = new ConcurrentHashMap<>(2048);
private final String pushKeySeparator = "#";
private String buildPushKey(String topic, Integer queueId) {
if (topic != null && queueId != null) {
StringBuffer stringBuffer = new StringBuffer(32);
stringBuffer.append(topic).append(pushKeySeparator).append(queueId);
return stringBuffer.toString();
}
return null;
}
@Override @Override
public void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel, public void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel,
String groupId) { String groupId) {
Set<String> prevSubSet = this.clientSubscriptionTable.get(remotingChannel); Set<MessageQueue> prevSubSet = this.clientSubscriptionTable.get(remotingChannel);
Set<String> keySet = new HashSet<>(); Set<MessageQueue> keySet = new HashSet<>();
for (SubscriptionData subscriptionData : subscriptionDataSet) { for (SubscriptionData tmp : subscriptionDataSet) {
if (subscriptionData.getTopic() != null && subscriptionData.getQueueIdSet() != null && remotingChannel != null) { PushSubscriptionData subscriptionData = (PushSubscriptionData) tmp;
for (Integer queueId : subscriptionData.getQueueIdSet()) { if (subscriptionData.getTopic() != null && subscriptionData.getMessageQueueSet() != null && remotingChannel != null) {
String key = buildPushKey(subscriptionData.getTopic(), queueId); for (MessageQueue messageQueue : subscriptionData.getMessageQueueSet()) {
keySet.add(key); keySet.add(messageQueue);
Set<RemotingChannel> clientSet = pushTable.get(key); Set<RemotingChannel> clientSet = pushTable.get(messageQueue);
if (clientSet == null) { if (clientSet == null) {
clientSet = new HashSet<>(); clientSet = new HashSet<>();
Set<RemotingChannel> prev = pushTable.putIfAbsent(key, clientSet); Set<RemotingChannel> prev = pushTable.putIfAbsent(messageQueue, clientSet);
clientSet = prev != null ? prev : clientSet; clientSet = prev != null ? prev : clientSet;
} }
log.info("Register push session key: {}, group: {} remoting: {}", key, groupId, remotingChannel.remoteAddress()); log.info("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
clientSet.add(remotingChannel); clientSet.add(remotingChannel);
} }
} }
...@@ -76,12 +67,12 @@ public class SubscriptionManagerImpl implements SubscriptionManager { ...@@ -76,12 +67,12 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
this.clientSubscriptionTable.putIfAbsent(remotingChannel, keySet); this.clientSubscriptionTable.putIfAbsent(remotingChannel, keySet);
} }
if (prevSubSet != null) { if (prevSubSet != null) {
for (String key : prevSubSet) { for (MessageQueue messageQueue : prevSubSet) {
if (!keySet.contains(key)) { if (!keySet.contains(messageQueue)) {
Set clientSet = pushTable.get(key); Set clientSet = pushTable.get(messageQueue);
if (clientSet != null) { if (clientSet != null) {
clientSet.remove(remotingChannel); clientSet.remove(remotingChannel);
log.info("Remove key:{}", key); log.info("Remove subscription message queue:{}", messageQueue);
} }
} }
} }
...@@ -90,9 +81,9 @@ public class SubscriptionManagerImpl implements SubscriptionManager { ...@@ -90,9 +81,9 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
@Override @Override
public void removePushSession(RemotingChannel remotingChannel) { public void removePushSession(RemotingChannel remotingChannel) {
Set<String> subSet = this.clientSubscriptionTable.get(remotingChannel); Set<MessageQueue> subSet = this.clientSubscriptionTable.get(remotingChannel);
if (subSet != null) { if (subSet != null) {
for (String key : subSet) { for (MessageQueue key : subSet) {
Set clientSet = pushTable.get(key); Set clientSet = pushTable.get(key);
if (clientSet != null) { if (clientSet != null) {
log.info("Remove push key:{} remoting:{}", key, remotingChannel.remoteAddress()); log.info("Remove push key:{} remoting:{}", key, remotingChannel.remoteAddress());
...@@ -107,10 +98,8 @@ public class SubscriptionManagerImpl implements SubscriptionManager { ...@@ -107,10 +98,8 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
} }
@Override @Override
public Set<RemotingChannel> getPushableChannel(String topic, Integer queueId) { public Set<RemotingChannel> getPushableChannel(MessageQueue messageQueue) {
String key = buildPushKey(topic, queueId); return pushTable.get(messageQueue);
log.info("Get pushableChannel by key: {}", key);
return pushTable.get(key);
} }
private Subscription getSubscription(String groupId, ConsumeType consumeType, private Subscription getSubscription(String groupId, ConsumeType consumeType,
......
...@@ -18,6 +18,8 @@ package org.apache.rocketmq.snode.processor; ...@@ -18,6 +18,8 @@ package org.apache.rocketmq.snode.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.util.Attribute; import io.netty.util.Attribute;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
...@@ -72,30 +74,34 @@ public class HeartbeatProcessor implements RequestProcessor { ...@@ -72,30 +74,34 @@ public class HeartbeatProcessor implements RequestProcessor {
Client client = new Client(); Client client = new Client();
client.setClientId(heartbeatData.getClientID()); client.setClientId(heartbeatData.getClientID());
client.setRemotingChannel(remotingChannel); client.setRemotingChannel(remotingChannel);
Set<String> groupSet = new HashSet<>();
for (ProducerData producerData : heartbeatData.getProducerDataSet()) { for (ProducerData producerData : heartbeatData.getProducerDataSet()) {
client.setGroupId(producerData.getGroupName());
client.setClientRole(ClientRole.Producer); client.setClientRole(ClientRole.Producer);
this.snodeController.getProducerManager().register(client); groupSet.add(producerData.getGroupName());
this.snodeController.getProducerManager().register(producerData.getGroupName(), client);
} }
for (ConsumerData data : heartbeatData.getConsumerDataSet()) { log.info("Heartbeat consumerData: {}", heartbeatData.getConsumerDataSet());
client.setGroupId(data.getGroupName()); for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) {
client.setClientRole(ClientRole.Consumer); client.setClientRole(ClientRole.Consumer);
boolean channelChanged = this.snodeController.getConsumerManager().register(client); groupSet.add(consumerData.getGroupName());
boolean subscriptionChanged = this.snodeController.getSubscriptionManager().subscribe(data.getGroupName(), boolean channelChanged = this.snodeController.getConsumerManager().register(consumerData.getGroupName(), client);
data.getSubscriptionDataSet(), boolean subscriptionChanged = this.snodeController.getSubscriptionManager().subscribe(consumerData.getGroupName(),
data.getConsumeType(), consumerData.getSubscriptionDataSet(),
data.getMessageModel(), consumerData.getConsumeType(),
data.getConsumeFromWhere()); consumerData.getMessageModel(),
if (data.getConsumeType() == ConsumeType.CONSUME_PUSH) { consumerData.getConsumeFromWhere());
if (consumerData.getConsumeType() == ConsumeType.CONSUME_PUSH) {
NettyChannelImpl nettyChannel = new NettyChannelImpl(channel); NettyChannelImpl nettyChannel = new NettyChannelImpl(channel);
this.snodeController.getSubscriptionManager().registerPushSession(data.getSubscriptionDataSet(), nettyChannel, data.getGroupName()); this.snodeController.getSubscriptionManager().registerPushSession(consumerData.getSubscriptionDataSet(), nettyChannel, consumerData.getGroupName());
} }
if (subscriptionChanged || channelChanged) { if (subscriptionChanged || channelChanged) {
this.snodeController.getClientService().notifyConsumer(data.getGroupName()); this.snodeController.getClientService().notifyConsumer(consumerData.getGroupName());
} }
} }
if (groupSet.size() > 0) {
client.setGroups(groupSet);
}
clientAttribute.setIfAbsent(client); clientAttribute.setIfAbsent(client);
RemotingCommand response = RemotingCommand.createResponseCommand(null); RemotingCommand response = RemotingCommand.createResponseCommand(null);
......
...@@ -16,4 +16,5 @@ package org.apache.rocketmq.snode.service;/* ...@@ -16,4 +16,5 @@ package org.apache.rocketmq.snode.service;/*
*/ */
public interface AdminService { public interface AdminService {
} }
...@@ -16,12 +16,15 @@ ...@@ -16,12 +16,15 @@
*/ */
package org.apache.rocketmq.snode.service.impl; package org.apache.rocketmq.snode.service.impl;
import io.netty.channel.Channel;
import io.netty.util.Attribute;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.PushMessageHeader; import org.apache.rocketmq.common.protocol.header.PushMessageHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
...@@ -29,8 +32,11 @@ import org.apache.rocketmq.common.utils.ThreadUtils; ...@@ -29,8 +32,11 @@ import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.Subscription;
import org.apache.rocketmq.snode.constant.SnodeConstant; import org.apache.rocketmq.snode.constant.SnodeConstant;
import org.apache.rocketmq.snode.service.PushService; import org.apache.rocketmq.snode.service.PushService;
...@@ -80,24 +86,37 @@ public class PushServiceImpl implements PushService { ...@@ -80,24 +86,37 @@ public class PushServiceImpl implements PushService {
pushMessageHeader.setQueueId(queueId); pushMessageHeader.setQueueId(queueId);
RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.SNODE_PUSH_MESSAGE, pushMessageHeader); RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.SNODE_PUSH_MESSAGE, pushMessageHeader);
pushMessage.setBody(message); pushMessage.setBody(message);
Set<RemotingChannel> consumerTable = snodeController.getSubscriptionManager().getPushableChannel(topic, queueId); MessageQueue messageQueue = new MessageQueue(topic, enodeName, queueId);
Set<RemotingChannel> consumerTable = snodeController.getSubscriptionManager().getPushableChannel(messageQueue);
if (consumerTable != null) { if (consumerTable != null) {
for (RemotingChannel remotingChannel : consumerTable) { for (RemotingChannel remotingChannel : consumerTable) {
if (remotingChannel.isWritable()) { Client client = null;
boolean slowConsumer = snodeController.getSlowConsumerService().isSlowConsumer(sendMessageResponseHeader.getQueueOffset(), topic, queueId, remotingChannel, enodeName); if (remotingChannel instanceof NettyChannelImpl) {
if (slowConsumer) { Channel channel = ((NettyChannelImpl) remotingChannel).getChannel();
log.warn("[SlowConsumer]: {} closed as slow consumer", remotingChannel);//TODO metrics Attribute<Client> clientAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ATTRIBUTE_KEY);
remotingChannel.close(); if (clientAttribute != null) {
continue; client = clientAttribute.get();
}
}
if (client != null) {
for (String consumerGroup : client.getGroups()) {
Subscription subscription = snodeController.getSubscriptionManager().getSubscription(consumerGroup);
if (subscription.getSubscriptionData(topic) != null) {
boolean slowConsumer = snodeController.getSlowConsumerService().isSlowConsumer(sendMessageResponseHeader.getQueueOffset(), topic, queueId, consumerGroup, enodeName);
if (slowConsumer) {
log.warn("[SlowConsumer]: {} closed as slow consumer", remotingChannel);//TODO metrics
remotingChannel.close();//FIXME this action should be discussed
continue;
}
snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
}
} }
log.debug("Push message to remotingChannel: {}", remotingChannel.remoteAddress());
snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
} else { } else {
log.warn("Remoting channel is not writable: {}", remotingChannel.remoteAddress()); log.error("[NOTIFYME] Remoting channel: {} related client is null", remotingChannel.remoteAddress());
} }
} }
} else { } else {
log.warn("Get client info to topic: {} queueId: {} is null", topic, queueId); log.info("No online registered as push consumer and online for messageQueue: {} ", messageQueue);
} }
} catch (Exception ex) { } catch (Exception ex) {
log.warn("Push message to topic: {} queueId: {}", topic, queueId, ex); log.warn("Push message to topic: {} queueId: {}", topic, queueId, ex);
...@@ -116,13 +135,8 @@ public class PushServiceImpl implements PushService { ...@@ -116,13 +135,8 @@ public class PushServiceImpl implements PushService {
@Override @Override
public void pushMessage(final String enodeName, final String topic, final Integer queueId, final byte[] message, public void pushMessage(final String enodeName, final String topic, final Integer queueId, final byte[] message,
final RemotingCommand response) { final RemotingCommand response) {
Set<RemotingChannel> pushableChannels = this.snodeController.getSubscriptionManager().getPushableChannel(topic, queueId); PushTask pushTask = new PushTask(topic, queueId, message, response, enodeName);
if (pushableChannels != null) { pushMessageExecutorService.submit(pushTask);
PushTask pushTask = new PushTask(topic, queueId, message, response, enodeName);
pushMessageExecutorService.submit(pushTask);
} else {
log.info("Topic: {} QueueId: {} no need to push", topic, queueId);
}
} }
@Override @Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册