提交 56a1902f 编写于 作者: D duhenglucky

Add push sessiom management module

上级 cd4403ce
......@@ -40,6 +40,9 @@ public class ConsumerGroupInfo {
new ConcurrentHashMap<String, SubscriptionData>();
private final ConcurrentMap<RemotingChannel, ClientChannelInfo> channelInfoTable =
new ConcurrentHashMap<>(16);
private final ConcurrentHashMap<RemotingChannel, Set<SubscriptionData>> channelSubscriptionTable = new ConcurrentHashMap<>(16);
private volatile ConsumeType consumeType;
private volatile MessageModel messageModel;
private volatile ConsumeFromWhere consumeFromWhere;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class PushMessageHeader implements CommandCustomHeader {
@CFNotNull
private long queueOffset;
@CFNotNull
private String messageId;
@CFNotNull
private Integer queueId;
@CFNotNull
private String topic;
@Override
public void checkFields() throws RemotingCommandException {
}
public long getQueueOffset() {
return queueOffset;
}
public void setQueueOffset(long queueOffset) {
this.queueOffset = queueOffset;
}
public Integer getQueueId() {
return queueId;
}
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
}
......@@ -31,6 +31,7 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
private boolean classFilterMode = false;
private String topic;
private String subString;
private Set<Integer> queueIdSet = new HashSet<Integer>();
private Set<String> tagsSet = new HashSet<String>();
private Set<Integer> codeSet = new HashSet<Integer>();
private long subVersion = System.currentTimeMillis();
......@@ -113,6 +114,14 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
this.expressionType = expressionType;
}
public Set<Integer> getQueueIdSet() {
return queueIdSet;
}
public void setQueueIdSet(Set<Integer> queueIdSet) {
this.queueIdSet = queueIdSet;
}
@Override
public int hashCode() {
final int prime = 31;
......@@ -120,6 +129,7 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
result = prime * result + (classFilterMode ? 1231 : 1237);
result = prime * result + ((codeSet == null) ? 0 : codeSet.hashCode());
result = prime * result + ((subString == null) ? 0 : subString.hashCode());
result = prime * result + ((queueIdSet == null) ? 0 : queueIdSet.hashCode());
result = prime * result + ((tagsSet == null) ? 0 : tagsSet.hashCode());
result = prime * result + ((topic == null) ? 0 : topic.hashCode());
result = prime * result + ((expressionType == null) ? 0 : expressionType.hashCode());
......@@ -154,6 +164,11 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
return false;
} else if (!tagsSet.equals(other.tagsSet))
return false;
if (queueIdSet == null) {
if (other.queueIdSet != null)
return false;
} else if (!queueIdSet.equals(other.queueIdSet))
return false;
if (topic == null) {
if (other.topic != null)
return false;
......@@ -167,11 +182,18 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
return true;
}
@Override
public String toString() {
return "SubscriptionData [classFilterMode=" + classFilterMode + ", topic=" + topic + ", subString="
+ subString + ", tagsSet=" + tagsSet + ", codeSet=" + codeSet + ", subVersion=" + subVersion
+ ", expressionType=" + expressionType + "]";
@Override public String toString() {
return "SubscriptionData{" +
"classFilterMode=" + classFilterMode +
", topic='" + topic + '\'' +
", subString='" + subString + '\'' +
", queueIdSet=" + queueIdSet +
", tagsSet=" + tagsSet +
", codeSet=" + codeSet +
", subVersion=" + subVersion +
", expressionType='" + expressionType + '\'' +
", filterClassSource='" + filterClassSource + '\'' +
'}';
}
@Override
......
......@@ -18,7 +18,7 @@ package org.apache.rocketmq.common.protocol.route;/*
public class SnodeData {
private String snodeName;
private String addr;
private String address;
private String clusterName;
......@@ -38,19 +38,19 @@ public class SnodeData {
this.snodeName = snodeName;
}
public String getAddr() {
return addr;
public String getAddress() {
return address;
}
public void setAddr(String addr) {
this.addr = addr;
public void setAddress(String address) {
this.address = address;
}
@Override
public String toString() {
return "SnodeData{" +
"snodeName='" + snodeName + '\'' +
", addr='" + addr + '\'' +
", address='" + address + '\'' +
", clusterName='" + clusterName + '\'' +
'}';
}
......
......@@ -38,6 +38,8 @@ public class SubscriptionGroupConfig {
private boolean notifyConsumerIdsChangedEnable = true;
private boolean realPushEnable = false;
public String getGroupName() {
return groupName;
}
......@@ -110,6 +112,14 @@ public class SubscriptionGroupConfig {
this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable;
}
public boolean isRealPushEnable() {
return realPushEnable;
}
public void setRealPushEnable(boolean realPushEnable) {
this.realPushEnable = realPushEnable;
}
@Override
public int hashCode() {
final int prime = 31;
......@@ -118,6 +128,7 @@ public class SubscriptionGroupConfig {
result = prime * result + (consumeBroadcastEnable ? 1231 : 1237);
result = prime * result + (consumeEnable ? 1231 : 1237);
result = prime * result + (consumeFromMinEnable ? 1231 : 1237);
result = prime * result + (realPushEnable ? 1231 : 1237);
result = prime * result + (notifyConsumerIdsChangedEnable ? 1231 : 1237);
result = prime * result + ((groupName == null) ? 0 : groupName.hashCode());
result = prime * result + retryMaxTimes;
......@@ -144,6 +155,8 @@ public class SubscriptionGroupConfig {
return false;
if (consumeFromMinEnable != other.consumeFromMinEnable)
return false;
if (realPushEnable != other.realPushEnable)
return false;
if (groupName == null) {
if (other.groupName != null)
return false;
......@@ -160,13 +173,18 @@ public class SubscriptionGroupConfig {
return true;
}
@Override
public String toString() {
return "SubscriptionGroupConfig [groupName=" + groupName + ", consumeEnable=" + consumeEnable
+ ", consumeFromMinEnable=" + consumeFromMinEnable + ", consumeBroadcastEnable="
+ consumeBroadcastEnable + ", retryQueueNums=" + retryQueueNums + ", retryMaxTimes="
+ retryMaxTimes + ", brokerId=" + brokerId + ", whichBrokerWhenConsumeSlowly="
+ whichBrokerWhenConsumeSlowly + ", notifyConsumerIdsChangedEnable="
+ notifyConsumerIdsChangedEnable + "]";
@Override public String toString() {
return "SubscriptionGroupConfig{" +
"groupName='" + groupName + '\'' +
", consumeEnable=" + consumeEnable +
", consumeFromMinEnable=" + consumeFromMinEnable +
", consumeBroadcastEnable=" + consumeBroadcastEnable +
", retryQueueNums=" + retryQueueNums +
", retryMaxTimes=" + retryMaxTimes +
", brokerId=" + brokerId +
", whichBrokerWhenConsumeSlowly=" + whichBrokerWhenConsumeSlowly +
", notifyConsumerIdsChangedEnable=" + notifyConsumerIdsChangedEnable +
", realPushEnable=" + realPushEnable +
'}';
}
}
......@@ -119,7 +119,7 @@ public class RouteInfoManager {
snodeSet.add(snodeName);
}
SnodeData snodeData = new SnodeData();
snodeData.setAddr(snodeAddr);
snodeData.setAddress(snodeAddr);
snodeData.setSnodeName(snodeName);
snodeData.setClusterName(clusterName);
snodeTable.put(snodeName, snodeData);
......
......@@ -34,7 +34,9 @@ public interface RemotingServer extends RemotingService {
Pair<RequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
void push(final String addr, final String sessionId, RemotingCommand remotingCommand);
void push(RemotingChannel remotingChannel, RemotingCommand request,
long timeoutMillis) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
RemotingCommand invokeSync(final RemotingChannel remotingChannel, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
......
......@@ -161,7 +161,8 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
}
@Override
public void push(String addr, String sessionId, RemotingCommand remotingCommand) {
public void push(RemotingChannel remotingChannel, RemotingCommand request,
long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
}
......
......@@ -347,7 +347,9 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements
}
@Override
public void push(String addr, String sessionId, RemotingCommand remotingCommand) {
public void push(RemotingChannel remotingChannel, RemotingCommand request,
long timeoutMillis) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
this.invokeOneway(remotingChannel, request, timeoutMillis);
}
}
......@@ -15,6 +15,7 @@
* limitations under the License.
*/
package org.apache.rocketmq.snode;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
......@@ -38,6 +39,7 @@ import org.apache.rocketmq.snode.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.snode.client.ConsumerManager;
import org.apache.rocketmq.snode.client.DefaultConsumerIdsChangeListener;
import org.apache.rocketmq.snode.client.ProducerManager;
import org.apache.rocketmq.snode.client.PushSessionManager;
import org.apache.rocketmq.snode.client.SubscriptionGroupManager;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.interceptor.InterceptorFactory;
......@@ -45,14 +47,16 @@ import org.apache.rocketmq.snode.interceptor.InterceptorGroup;
import org.apache.rocketmq.snode.interceptor.Interceptor;
import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
import org.apache.rocketmq.snode.processor.ConsumerManageProcessor;
import org.apache.rocketmq.snode.processor.HearbeatProcessor;
import org.apache.rocketmq.snode.processor.HeartbeatProcessor;
import org.apache.rocketmq.snode.processor.PullMessageProcessor;
import org.apache.rocketmq.snode.processor.SendMessageProcessor;
import org.apache.rocketmq.snode.service.EnodeService;
import org.apache.rocketmq.snode.service.NnodeService;
import org.apache.rocketmq.snode.service.PushService;
import org.apache.rocketmq.snode.service.ScheduledService;
import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.PushServiceImpl;
import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl;
public class SnodeController {
......@@ -79,9 +83,11 @@ public class SnodeController {
private ConsumerManageProcessor consumerManageProcessor;
private SendMessageProcessor sendMessageProcessor;
private PullMessageProcessor pullMessageProcessor;
private HearbeatProcessor hearbeatProcessor;
private HeartbeatProcessor hearbeatProcessor;
private InterceptorGroup consumeMessageInterceptorGroup;
private InterceptorGroup sendMessageInterceptorGroup;
private PushSessionManager pushSessionManager;
private PushService pushService;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"SnodeControllerScheduledThread"));
......@@ -156,8 +162,11 @@ public class SnodeController {
this.consumerOffsetManager = new ConsumerOffsetManager(this);
this.consumerManageProcessor = new ConsumerManageProcessor(this);
this.sendMessageProcessor = new SendMessageProcessor(this);
this.hearbeatProcessor = new HearbeatProcessor(this);
this.hearbeatProcessor = new HeartbeatProcessor(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
this.pushSessionManager = new PushSessionManager();
this.pushService = new PushServiceImpl(this);
}
public SnodeConfig getSnodeConfig() {
......@@ -219,6 +228,7 @@ public class SnodeController {
this.remotingClient.shutdown();
this.scheduledService.shutdown();
this.clientHousekeepingService.shutdown();
this.pushService.shutdown();
}
public ProducerManager getProducerManager() {
......@@ -264,4 +274,12 @@ public class SnodeController {
public InterceptorGroup getSendMessageInterceptorGroup() {
return sendMessageInterceptorGroup;
}
public PushSessionManager getPushSessionManager() {
return pushSessionManager;
}
public PushService getPushService() {
return pushService;
}
}
......@@ -72,28 +72,22 @@ public class ClientHousekeepingService implements ChannelEventListener {
@Override
public void onChannelClose(String remoteAddr, RemotingChannel remotingChannel) {
NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel;
Channel channel = nettyChannel.getChannel();
log.info("Remoting channel closed: {}", RemotingHelper.parseChannelRemoteAddr(channel.remoteAddress()));
this.producerManager.doChannelCloseEvent(remoteAddr, channel);
this.producerManager.doChannelCloseEvent(remoteAddr, channel);
log.info("Remoting channel closed: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
this.producerManager.doChannelCloseEvent(remoteAddr, remotingChannel);
this.producerManager.doChannelCloseEvent(remoteAddr, remotingChannel);
}
@Override
public void onChannelException(String remoteAddr, RemotingChannel remotingChannel) {
NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel;
Channel channel = nettyChannel.getChannel();
log.info("Remoting channel exception: {}", RemotingHelper.parseChannelRemoteAddr(channel.remoteAddress()));
this.producerManager.doChannelCloseEvent(remoteAddr, channel);
this.consumerManager.doChannelCloseEvent(remoteAddr, channel);
log.info("Remoting channel exception: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
this.producerManager.doChannelCloseEvent(remoteAddr, remotingChannel);
this.consumerManager.doChannelCloseEvent(remoteAddr, remotingChannel);
}
@Override
public void onChannelIdle(String remoteAddr, RemotingChannel remotingChannel) {
NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel;
Channel channel = nettyChannel.getChannel();
log.info("Remoting channel idle: {}", RemotingHelper.parseChannelRemoteAddr(channel.remoteAddress()));
this.producerManager.doChannelCloseEvent(remoteAddr, channel);
this.consumerManager.doChannelCloseEvent(remoteAddr, channel);
log.info("Remoting channel idle: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
this.producerManager.doChannelCloseEvent(remoteAddr, remotingChannel);
this.consumerManager.doChannelCloseEvent(remoteAddr, remotingChannel);
}
}
......@@ -16,7 +16,7 @@
*/
package org.apache.rocketmq.snode.client;
import io.netty.channel.Channel;
import java.nio.channels.Channel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
......@@ -40,6 +40,8 @@ public class ConsumerGroupInfo {
new ConcurrentHashMap<>();
private final ConcurrentMap<RemotingChannel, ClientChannelInfo> channelInfoTable =
new ConcurrentHashMap<>(16);
private ConcurrentHashMap<RemotingChannel, Set<SubscriptionData>> channelSubscriptionTable = new ConcurrentHashMap<>(2048);
private volatile ConsumeType consumeType;
private volatile MessageModel messageModel;
private volatile ConsumeFromWhere consumeFromWhere;
......@@ -53,6 +55,10 @@ public class ConsumerGroupInfo {
this.consumeFromWhere = consumeFromWhere;
}
public Set<SubscriptionData> getSubscriotionDataSet(RemotingChannel remotingChannel) {
return channelSubscriptionTable.get(remotingChannel);
}
public ClientChannelInfo findChannel(final String clientId) {
Iterator<Entry<RemotingChannel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
while (it.hasNext()) {
......@@ -102,11 +108,15 @@ public class ConsumerGroupInfo {
}
}
public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
public boolean doChannelCloseEvent(final String remoteAddr, final RemotingChannel channel) {
final Set<SubscriptionData> subscriptionDataSet = this.channelSubscriptionTable.remove(channel);
if (subscriptionDataSet != null) {
log.warn("NETTY EVENT: remove not active channel[{}] from subscription table groupChannelTable, consumer group: {}",
subscriptionDataSet.toString(), groupName);
}
final ClientChannelInfo info = this.channelInfoTable.remove(channel);
if (info != null) {
log.warn(
"NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}",
log.warn("NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}",
info.toString(), groupName);
return true;
}
......@@ -114,6 +124,15 @@ public class ConsumerGroupInfo {
return false;
}
public void updateChannelSubscription(final ClientChannelInfo newClient,
final Set<SubscriptionData> subscriptionDataSet) {
this.channelSubscriptionTable.put(newClient.getChannel(), subscriptionDataSet);
}
public void removeChannelSubscription(final RemotingChannel remotingChannel) {
this.channelSubscriptionTable.remove(remotingChannel);
}
public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
boolean updated = false;
......
......@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.snode.client;
import io.netty.channel.Channel;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map.Entry;
......@@ -32,25 +31,21 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
public class ConsumerManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
new ConcurrentHashMap<>(1024);
private final ConsumerIdsChangeListener consumerIdsChangeListener;
private final ConcurrentHashMap<String/*Topic*/, ConcurrentHashMap<Integer/*QueueId*/, ClientChannelInfo>> topicConsumerTable = new ConcurrentHashMap<>(2048);
public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) {
this.consumerIdsChangeListener = consumerIdsChangeListener;
}
/**
* public ClientChannelInfo findChannel(final String group, final String clientId) { ConsumerGroupInfo
* consumerGroupInfo = this.consumerTable.get(group); if (consumerGroupInfo != null) { return
* consumerGroupInfo.findChannel(clientId); } return null; }
**/
public SubscriptionData findSubscriptionData(final String group, final String topic) {
ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
if (consumerGroupInfo != null) {
......@@ -73,11 +68,18 @@ public class ConsumerManager {
return 0;
}
public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
private void removePushSession(final ConsumerGroupInfo info, final RemotingChannel channel) {
Set<SubscriptionData> subscriptionDataSet = info.getSubscriotionDataSet(channel);
removeConsumerTopicTable(subscriptionDataSet, channel);
}
public void doChannelCloseEvent(final String remoteAddr, final RemotingChannel channel) {
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConsumerGroupInfo> next = it.next();
ConsumerGroupInfo info = next.getValue();
removePushSession(info, channel);
boolean removed = info.doChannelCloseEvent(remoteAddr, channel);
if (removed) {
if (info.getChannelInfoTable().isEmpty()) {
......@@ -109,12 +111,13 @@ public class ConsumerManager {
consumeFromWhere);
boolean r2 = consumerGroupInfo.updateSubscription(subList);
consumerGroupInfo.updateChannelSubscription(clientChannelInfo, subList);
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
......@@ -125,6 +128,8 @@ public class ConsumerManager {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null != consumerGroupInfo) {
consumerGroupInfo.unregisterChannel(clientChannelInfo);
consumerGroupInfo.removeChannelSubscription(clientChannelInfo.getChannel());
removePushSession(consumerGroupInfo, clientChannelInfo.getChannel());
if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
ConsumerGroupInfo remove = this.consumerTable.remove(group);
if (remove != null) {
......@@ -184,4 +189,47 @@ public class ConsumerManager {
}
return groups;
}
public void updateTopicConsumerTable(Set<SubscriptionData> subscriptionDataSet,
ClientChannelInfo clientChannelInfo) {
for (SubscriptionData subscriptionData : subscriptionDataSet) {
String topic = subscriptionData.getTopic();
for (Integer queueId : subscriptionData.getQueueIdSet()) {
ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
if (clientChannelInfoMap == null) {
clientChannelInfoMap = new ConcurrentHashMap<>();
ConcurrentHashMap prev = this.topicConsumerTable.putIfAbsent(topic, clientChannelInfoMap);
if (prev != null) {
clientChannelInfoMap = prev;
}
}
clientChannelInfoMap.put(queueId, clientChannelInfo);
}
}
}
public ClientChannelInfo getClientInfoTable(String topic, long queueId) {
ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
if (clientChannelInfoMap != null) {
return clientChannelInfoMap.get(queueId);
}
return null;
}
public void removeConsumerTopicTable(Set<SubscriptionData> subscriptionDataSet,
RemotingChannel remotingChannel) {
for (SubscriptionData subscriptionData : subscriptionDataSet) {
String topic = subscriptionData.getTopic();
for (Integer queueId : subscriptionData.getQueueIdSet()) {
ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
if (clientChannelInfoMap != null) {
ClientChannelInfo old = clientChannelInfoMap.get(queueId);
if (old != null && old.getChannel() == remotingChannel) {
clientChannelInfoMap.remove(queueId, old);
}
}
}
}
}
}
......@@ -99,7 +99,7 @@ public class ProducerManager {
}
}
public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
public void doChannelCloseEvent(final String remoteAddr, final RemotingChannel channel) {
if (channel != null) {
try {
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
......
package org.apache.rocketmq.snode.client;/*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
......@@ -14,10 +14,43 @@ package org.apache.rocketmq.snode.client;/*
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.client;
import java.util.concurrent.ConcurrentHashMap;
public class PushSessionManager {
private ConcurrentHashMap<String/*topic*/, PushSession> topicPushSessionMap = new ConcurrentHashMap<>(1024);
private final ConcurrentHashMap<String/*Topic*/, ConcurrentHashMap<Integer/*QueueId*/, ClientChannelInfo>> topicConsumerTable = new ConcurrentHashMap<>(2048);
public void updateTopicConsumerTable(String topic, int queueId, ClientChannelInfo clientChannelInfo) {
ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
if (clientChannelInfoMap == null) {
clientChannelInfoMap = new ConcurrentHashMap<>();
ConcurrentHashMap prev = this.topicConsumerTable.putIfAbsent(topic, clientChannelInfoMap);
if (prev != null) {
clientChannelInfoMap = prev;
}
}
clientChannelInfoMap.put(queueId, clientChannelInfo);
}
public ClientChannelInfo getClientInfoTable(String topic, long queueId) {
ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
if (clientChannelInfoMap != null) {
return clientChannelInfoMap.get(queueId);
}
return null;
}
public void removeConsumerTopicTable(String topic, Integer queueId, ClientChannelInfo clientChannelInfo) {
ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
if (clientChannelInfoMap != null) {
ClientChannelInfo old = clientChannelInfoMap.get(queueId);
//TODO Thread safe issue: wait for the next heartbeat
if (old == clientChannelInfo) {
clientChannelInfoMap.remove(queueId, clientChannelInfo);
}
}
}
}
......@@ -30,7 +30,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import org.apache.rocketmq.snode.SnodeController;
public class SubscriptionGroupManager extends ConfigManager {
public class SubscriptionGroupManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
......@@ -134,18 +134,10 @@ public class SubscriptionGroupManager extends ConfigManager {
return subscriptionGroupConfig;
}
@Override
public String encode() {
return this.encode(false);
}
@Override
public String configFilePath() {
//TODO get subscription persist request code
return null;
}
@Override
public void decode(String jsonString) {
if (jsonString != null) {
SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
......
......@@ -15,6 +15,7 @@
* limitations under the License.
*/
package org.apache.rocketmq.snode.config;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
......@@ -60,6 +61,13 @@ public class SnodeConfig {
private boolean autoCreateSubscriptionGroup = true;
private int snodePushMessageMinPoolSize = 10;
private int snodePushMessageMaxPoolSize = 20;
private int snodePushMessageThreadPoolQueueCapacity = 10000;
private int listenPort = 11911;
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
......@@ -232,4 +240,28 @@ public class SnodeConfig {
public void setVipChannelEnabled(boolean vipChannelEnabled) {
this.vipChannelEnabled = vipChannelEnabled;
}
public int getSnodePushMessageMinPoolSize() {
return snodePushMessageMinPoolSize;
}
public void setSnodePushMessageMinPoolSize(int snodePushMessageMinPoolSize) {
this.snodePushMessageMinPoolSize = snodePushMessageMinPoolSize;
}
public int getSnodePushMessageMaxPoolSize() {
return snodePushMessageMaxPoolSize;
}
public void setSnodePushMessageMaxPoolSize(int snodePushMessageMaxPoolSize) {
this.snodePushMessageMaxPoolSize = snodePushMessageMaxPoolSize;
}
public int getSnodePushMessageThreadPoolQueueCapacity() {
return snodePushMessageThreadPoolQueueCapacity;
}
public void setSnodePushMessageThreadPoolQueueCapacity(int snodePushMessageThreadPoolQueueCapacity) {
this.snodePushMessageThreadPoolQueueCapacity = snodePushMessageThreadPoolQueueCapacity;
}
}
......@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.snode.processor;
import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
......@@ -34,11 +35,11 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.ClientChannelInfo;
public class HearbeatProcessor implements RequestProcessor {
public class HeartbeatProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeController snodeController;
public HearbeatProcessor(SnodeController snodeController) {
public HeartbeatProcessor(SnodeController snodeController) {
this.snodeController = snodeController;
}
......@@ -97,6 +98,10 @@ public class HearbeatProcessor implements RequestProcessor {
RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress())
);
}
if (subscriptionGroupConfig.isRealPushEnable()) {
this.snodeController.getConsumerManager().updateTopicConsumerTable(data.getSubscriptionDataSet(), clientChannelInfo);
}
}
}
RemotingCommand response = RemotingCommand.createResponseCommand(null);
......
/*
package org.apache.rocketmq.snode.service;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
......@@ -14,38 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.client;
import io.netty.channel.Channel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
public class PushSession {
private SubscriptionData subscriptionData;
public interface MetricsService {
private Channel channel;
private String consumerGroup;
public SubscriptionData getSubscriptionData() {
return subscriptionData;
}
public void setSubscriptionData(SubscriptionData subscriptionData) {
this.subscriptionData = subscriptionData;
}
public Channel getChannel() {
return channel;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
public String getConsumerGroup() {
return consumerGroup;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
}
......@@ -15,12 +15,18 @@
* limitations under the License.
*/
package org.apache.rocketmq.snode.service;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RemotingChannel;
public interface PushService {
boolean registerPushSession(String consumerGroup);
void unregisterPushSession(String consumerGroup);
void pushMessage(Message message);
void pushMessage(final String messageId, final byte[] message, final Integer queueId, final String topic,
final long queueOffset);
void start();
void shutdown();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.service.impl;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.header.PushMessageHeader;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.ClientChannelInfo;
import org.apache.rocketmq.snode.constant.SnodeConstant;
import org.apache.rocketmq.snode.service.PushService;
public class PushServiceImpl implements PushService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private SnodeController snodeController;
private ExecutorService pushMessageExecutorService;
public PushServiceImpl(final SnodeController snodeController) {
this.snodeController = snodeController;
pushMessageExecutorService = ThreadUtils.newThreadPoolExecutor(
this.snodeController.getSnodeConfig().getSnodePushMessageMinPoolSize(),
this.snodeController.getSnodeConfig().getSnodePushMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(this.snodeController.getSnodeConfig().getSnodeSendThreadPoolQueueCapacity()),
"SnodePushMessageThread",
false);
}
public class PushTask implements Runnable {
private AtomicBoolean canceled;
private final String messageId;
private final byte[] message;
private final Integer queueId;
private final String topic;
private final long queueOffset;
public PushTask(final String messageId, final byte[] message, final Integer queueId, final String topic,
final long queueOffset) {
this.messageId = messageId;
this.message = message;
this.queueId = queueId;
this.topic = topic;
this.queueOffset = queueOffset;
}
@Override
public void run() {
if (!canceled.get()) {
PushMessageHeader pushMessageHeader = new PushMessageHeader();
pushMessageHeader.setMessageId(this.messageId);
pushMessageHeader.setQueueOffset(queueOffset);
pushMessageHeader.setTopic(topic);
pushMessageHeader.setQueueId(queueId);
RemotingCommand pushMessage = RemotingCommand.createResponseCommand(PushMessageHeader.class);
pushMessage.setBody(message);
pushMessage.setCustomHeader(pushMessageHeader);
try {
ClientChannelInfo clientChannelInfo = snodeController.getPushSessionManager().getClientInfoTable(topic, queueId);
if (clientChannelInfo != null) {
RemotingChannel remotingChannel = clientChannelInfo.getChannel();
snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.defaultTimeoutMills);
} else {
log.warn("Get client info to topic: {} queueId: {} is null", topic, queueId);
}
} catch (Exception ex) {
log.warn("Push message to topic: {} queueId: {} ex:{}", topic, queueId, ex);
}
}
}
public AtomicBoolean getCanceled() {
return canceled;
}
public void setCanceled(AtomicBoolean canceled) {
this.canceled = canceled;
}
}
@Override
public boolean registerPushSession(String consumerGroup) {
return false;
}
@Override
public void unregisterPushSession(String consumerGroup) {
}
@Override
public void pushMessage(final String messageId, final byte[] message, final Integer queueId, final String topic,
final long queueOffset) {
PushTask pushTask = new PushTask(messageId, message, queueId, topic, queueOffset);
pushMessageExecutorService.submit(pushTask);
}
@Override
public void start() {
}
@Override
public void shutdown() {
this.pushMessageExecutorService.shutdown();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册