提交 2fb8b5f2 编写于 作者: D duhenglucky

Add consumer group management for push module

上级 41366dda
......@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.snode.client;
import io.netty.channel.Channel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
......@@ -27,8 +26,9 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
/**
* TODO Refactor housekeeping service
*/
public class ClientHousekeepingService implements ChannelEventListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ProducerManager producerManager;
......@@ -74,7 +74,7 @@ public class ClientHousekeepingService implements ChannelEventListener {
public void onChannelClose(String remoteAddr, RemotingChannel remotingChannel) {
log.info("Remoting channel closed: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
this.producerManager.doChannelCloseEvent(remoteAddr, remotingChannel);
this.producerManager.doChannelCloseEvent(remoteAddr, remotingChannel);
this.consumerManager.doChannelCloseEvent(remoteAddr, remotingChannel);
}
@Override
......
......@@ -37,8 +37,8 @@ public class ConsumerGroupInfo {
private final String groupName;
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<>();
private final ConcurrentMap<RemotingChannel, ClientChannelInfo> channelInfoTable =
new ConcurrentHashMap<>(16);
private final ConcurrentMap<RemotingChannel, ClientChannelInfo> channelInfoTable = new ConcurrentHashMap<>(16);
private ConcurrentHashMap<RemotingChannel, Set<SubscriptionData>> channelSubscriptionTable = new ConcurrentHashMap<>(2048);
private volatile ConsumeType consumeType;
......@@ -129,7 +129,10 @@ public class ConsumerGroupInfo {
}
public void removeChannelSubscription(final RemotingChannel remotingChannel) {
this.channelSubscriptionTable.remove(remotingChannel);
Set<SubscriptionData> subscriptionDataSet = this.channelSubscriptionTable.remove(remotingChannel);
if (subscriptionDataSet != null) {
log.debug("Unregister a push session[{}] from consumerGroupInfo {}", this.groupName, subscriptionDataSet);
}
}
public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
......
......@@ -32,15 +32,19 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* TODO Refactor this manager
*/
public class ConsumerManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
new ConcurrentHashMap<>(1024);
private final ConcurrentMap<String/*Group*/, ConsumerGroupInfo> consumerTable = new ConcurrentHashMap<>(1024);
private final ConcurrentHashMap<String/*Topic*/, ConcurrentHashMap<Integer/*QueueId*/, ConcurrentHashMap<String/*ConsumerGroup*/, ClientChannelInfo>>> topicConsumerTable = new ConcurrentHashMap<>(2048);
private final ConsumerIdsChangeListener consumerIdsChangeListener;
private final ConcurrentHashMap<String/*Topic*/, ConcurrentHashMap<Integer/*QueueId*/, ClientChannelInfo>> topicConsumerTable = new ConcurrentHashMap<>(2048);
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) {
this.consumerIdsChangeListener = consumerIdsChangeListener;
......@@ -68,10 +72,10 @@ public class ConsumerManager {
return 0;
}
private void removePushSession(final ConsumerGroupInfo info, final RemotingChannel channel) {
private void clearPushSession(final String consumerGroup, final ConsumerGroupInfo info,
final RemotingChannel channel) {
Set<SubscriptionData> subscriptionDataSet = info.getSubscriotionDataSet(channel);
removeConsumerTopicTable(subscriptionDataSet, channel);
removeConsumerTopicTable(consumerGroup, subscriptionDataSet, channel);
}
public void doChannelCloseEvent(final String remoteAddr, final RemotingChannel channel) {
......@@ -79,7 +83,7 @@ public class ConsumerManager {
while (it.hasNext()) {
Entry<String, ConsumerGroupInfo> next = it.next();
ConsumerGroupInfo info = next.getValue();
removePushSession(info, channel);
clearPushSession(info.getGroupName(), info, channel);
boolean removed = info.doChannelCloseEvent(remoteAddr, channel);
if (removed) {
if (info.getChannelInfoTable().isEmpty()) {
......@@ -105,9 +109,9 @@ public class ConsumerManager {
consumerGroupInfo = prev != null ? prev : tmp;
}
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
boolean r2 = consumerGroupInfo.updateSubscription(subList);
consumerGroupInfo.updateChannelSubscription(clientChannelInfo, subList);
......@@ -128,7 +132,7 @@ public class ConsumerManager {
if (null != consumerGroupInfo) {
consumerGroupInfo.unregisterChannel(clientChannelInfo);
consumerGroupInfo.removeChannelSubscription(clientChannelInfo.getChannel());
removePushSession(consumerGroupInfo, clientChannelInfo.getChannel());
clearPushSession(group, consumerGroupInfo, clientChannelInfo.getChannel());
if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
ConsumerGroupInfo remove = this.consumerTable.remove(group);
if (remove != null) {
......@@ -189,46 +193,67 @@ public class ConsumerManager {
return groups;
}
public void updateTopicConsumerTable(Set<SubscriptionData> subscriptionDataSet,
public void registerPushSession(String consumerGroup, Set<SubscriptionData> subscriptionDataSet,
ClientChannelInfo clientChannelInfo) {
for (SubscriptionData subscriptionData : subscriptionDataSet) {
String topic = subscriptionData.getTopic();
for (Integer queueId : subscriptionData.getQueueIdSet()) {
ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
if (clientChannelInfoMap == null) {
clientChannelInfoMap = new ConcurrentHashMap<>();
ConcurrentHashMap prev = this.topicConsumerTable.putIfAbsent(topic, clientChannelInfoMap);
if (prev != null) {
clientChannelInfoMap = prev;
if (clientChannelInfo != null) {
for (SubscriptionData subscriptionData : subscriptionDataSet) {
String topic = subscriptionData.getTopic();
for (Integer queueId : subscriptionData.getQueueIdSet()) {
ConcurrentHashMap<Integer, ConcurrentHashMap<String, ClientChannelInfo>> clientChannelInfoMap = this.topicConsumerTable.get(topic);
if (clientChannelInfoMap == null) {
clientChannelInfoMap = new ConcurrentHashMap<>();
ConcurrentHashMap prev = this.topicConsumerTable.putIfAbsent(topic, clientChannelInfoMap);
if (prev != null) {
clientChannelInfoMap = prev;
}
}
log.info("Register push for consumer group: {} topic: {}, queueId: {}", consumerGroup, topic, queueId);
ConcurrentHashMap<String, ClientChannelInfo> consumerGroupChannelTable = clientChannelInfoMap.get(queueId);
if (consumerGroupChannelTable == null) {
consumerGroupChannelTable = new ConcurrentHashMap<>();
ConcurrentHashMap<String, ClientChannelInfo> preMap = clientChannelInfoMap.putIfAbsent(queueId, consumerGroupChannelTable);
if (preMap != null) {
consumerGroupChannelTable = preMap;
}
}
consumerGroupChannelTable.putIfAbsent(consumerGroup, clientChannelInfo);
clientChannelInfoMap.put(queueId, consumerGroupChannelTable);
}
clientChannelInfoMap.put(queueId, clientChannelInfo);
}
}
}
public ClientChannelInfo getClientInfoTable(String topic, Integer queueId) {
ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
public ConcurrentHashMap getClientInfoTable(String topic, Integer queueId) {
ConcurrentHashMap<Integer, ConcurrentHashMap<String, ClientChannelInfo>> clientChannelInfoMap = this.topicConsumerTable.get(topic);
if (clientChannelInfoMap != null) {
return clientChannelInfoMap.get(queueId);
}
return null;
}
public void removeConsumerTopicTable(Set<SubscriptionData> subscriptionDataSet,
public void removeConsumerTopicTable(String consumerGroup, Set<SubscriptionData> subscriptionDataSet,
RemotingChannel remotingChannel) {
for (SubscriptionData subscriptionData : subscriptionDataSet) {
String topic = subscriptionData.getTopic();
for (Integer queueId : subscriptionData.getQueueIdSet()) {
ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
if (clientChannelInfoMap != null) {
ClientChannelInfo old = clientChannelInfoMap.get(queueId);
if (old != null && old.getChannel() == remotingChannel) {
clientChannelInfoMap.remove(queueId, old);
if (subscriptionDataSet != null) {
for (SubscriptionData subscriptionData : subscriptionDataSet) {
String topic = subscriptionData.getTopic();
for (Integer queueId : subscriptionData.getQueueIdSet()) {
ConcurrentHashMap<Integer, ConcurrentHashMap<String, ClientChannelInfo>> clientChannelInfoMap = this.topicConsumerTable.get(topic);
if (clientChannelInfoMap != null) {
ConcurrentHashMap<String, ClientChannelInfo> queueConsumerGroupMap = clientChannelInfoMap.get(queueId);
if (queueConsumerGroupMap != null) {
ClientChannelInfo clientChannelInfo = queueConsumerGroupMap.get(consumerGroup);
if (clientChannelInfo.getChannel().equals(remotingChannel)) {
log.info("Remove push topic: {}, queueId: {}, consumerGroup:{} session", topic, queueId, consumerGroup);
queueConsumerGroupMap.remove(consumerGroup, clientChannelInfo);
}
}
if (clientChannelInfoMap.isEmpty()) {
log.info("All consumer offline, so remove this map");
this.topicConsumerTable.remove(topic, clientChannelInfoMap);
}
}
}
}
}
}
}
......@@ -101,7 +101,7 @@ public class HeartbeatProcessor implements RequestProcessor {
}
if (data.isRealPushEnable()) {
this.snodeController.getConsumerManager().updateTopicConsumerTable(data.getSubscriptionDataSet(), clientChannelInfo);
this.snodeController.getConsumerManager().registerPushSession(data.getGroupName(), data.getSubscriptionDataSet(), clientChannelInfo);
}
}
}
......
......@@ -16,7 +16,11 @@
*/
package org.apache.rocketmq.snode.service.impl;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
......@@ -76,15 +80,19 @@ public class PushServiceImpl implements PushService {
pushMessageHeader.setQueueOffset(sendMessageResponseHeader.getQueueOffset());
pushMessageHeader.setTopic(topic);
pushMessageHeader.setQueueId(queueId);
RemotingCommand pushMessage = RemotingCommand.createResponseCommand(PushMessageHeader.class);
RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.SNODE_PUSH_MESSAGE, pushMessageHeader);
pushMessage.setBody(message);
pushMessage.setCustomHeader(pushMessageHeader);
pushMessage.setCode(RequestCode.SNODE_PUSH_MESSAGE);
ClientChannelInfo clientChannelInfo = snodeController.getConsumerManager().getClientInfoTable(topic, queueId);
if (clientChannelInfo != null) {
log.warn("Push message to topic: {} queueId: {}, message:{}", topic, queueId, pushMessage);
RemotingChannel remotingChannel = clientChannelInfo.getChannel();
snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.defaultTimeoutMills);
ConcurrentHashMap consumerGroupTable = snodeController.getConsumerManager().getClientInfoTable(topic, queueId);
if (consumerGroupTable != null) {
Iterator<Map.Entry<String, ClientChannelInfo>> itChannel = consumerGroupTable.entrySet().iterator();
while (itChannel.hasNext()) {
Entry<String, ClientChannelInfo> clientChannelInfoEntry = itChannel.next();
RemotingChannel remotingChannel = clientChannelInfoEntry.getValue().getChannel();
if (remotingChannel.isWritable()) {
log.warn("Push message to topic: {} queueId: {} consumer group:{}, message:{}", topic, queueId, clientChannelInfoEntry.getKey(), pushMessage);
snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.defaultTimeoutMills);
}
}
} else {
log.warn("Get client info to topic: {} queueId: {} is null", topic, queueId);
}
......@@ -115,8 +123,8 @@ public class PushServiceImpl implements PushService {
@Override
public void pushMessage(final String topic, final Integer queueId, final byte[] message,
final RemotingCommand response) {
ClientChannelInfo clientChannelInfo = this.snodeController.getConsumerManager().getClientInfoTable(topic, queueId);
if (clientChannelInfo != null) {
ConcurrentHashMap<String, ClientChannelInfo> clientChannelInfoTable = this.snodeController.getConsumerManager().getClientInfoTable(topic, queueId);
if (clientChannelInfoTable != null) {
PushTask pushTask = new PushTask(topic, queueId, message, response);
pushMessageExecutorService.submit(pushTask);
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册