提交 5440b1c4 编写于 作者: D duhenglucky

Add slow consumer process service for push model

上级 2be22d2f
......@@ -39,10 +39,12 @@ import org.apache.rocketmq.remoting.interceptor.InterceptorFactory;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.snode.client.ClientHousekeepingService;
import org.apache.rocketmq.snode.client.ClientManager;
import org.apache.rocketmq.snode.client.SlowConsumerService;
import org.apache.rocketmq.snode.client.SubscriptionGroupManager;
import org.apache.rocketmq.snode.client.SubscriptionManager;
import org.apache.rocketmq.snode.client.impl.ConsumerManagerImpl;
import org.apache.rocketmq.snode.client.impl.ProducerManagerImpl;
import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl;
import org.apache.rocketmq.snode.client.impl.SubscriptionManagerImpl;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
......@@ -77,10 +79,8 @@ public class SnodeController {
private NnodeService nnodeService;
private ExecutorService consumerManagerExecutor;
private ScheduledService scheduledService;
// private ProducerManager producerManager;
// private ConsumerManager consumerManager;
private ClientManager producerManagerImpl;
private ClientManager consumerManagerImpl;
private ClientManager producerManager;
private ClientManager consumerManager;
private SubscriptionManager subscriptionManager;
private ClientHousekeepingService clientHousekeepingService;
private SubscriptionGroupManager subscriptionGroupManager;
......@@ -94,6 +94,7 @@ public class SnodeController {
private InterceptorGroup sendMessageInterceptorGroup;
private PushService pushService;
private ClientService clientService;
private SlowConsumerService slowConsumerService;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"SnodeControllerScheduledThread"));
......@@ -172,9 +173,10 @@ public class SnodeController {
this.pushService = new PushServiceImpl(this);
this.clientService = new ClientServiceImpl(this);
this.subscriptionManager = new SubscriptionManagerImpl();
this.producerManagerImpl = new ProducerManagerImpl();
this.consumerManagerImpl = new ConsumerManagerImpl(this);
this.clientHousekeepingService = new ClientHousekeepingService(this.producerManagerImpl, this.consumerManagerImpl);
this.producerManager = new ProducerManagerImpl();
this.consumerManager = new ConsumerManagerImpl(this);
this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, this.consumerManager);
this.slowConsumerService = new SlowConsumerServiceImpl(this);
}
public SnodeConfig getSnodeConfig() {
......@@ -257,18 +259,10 @@ public class SnodeController {
this.pushService.shutdown();
}
// public ProducerManager getProducerManager() {
// return producerManager;
// }
public RemotingServer getSnodeServer() {
return snodeServer;
}
// public ConsumerManager getConsumerManager() {
// return consumerManager;
// }
public SubscriptionGroupManager getSubscriptionGroupManager() {
return subscriptionGroupManager;
}
......@@ -326,20 +320,20 @@ public class SnodeController {
this.remotingServerInterceptorGroup = remotingServerInterceptorGroup;
}
public ClientManager getProducerManagerImpl() {
return producerManagerImpl;
public ClientManager getProducerManager() {
return producerManager;
}
public void setProducerManagerImpl(ClientManager producerManagerImpl) {
this.producerManagerImpl = producerManagerImpl;
public void setProducerManager(ClientManager producerManager) {
this.producerManager = producerManager;
}
public ClientManager getConsumerManagerImpl() {
return consumerManagerImpl;
public ClientManager getConsumerManager() {
return consumerManager;
}
public void setConsumerManagerImpl(ClientManager consumerManagerImpl) {
this.consumerManagerImpl = consumerManagerImpl;
public void setConsumerManager(ClientManager consumerManager) {
this.consumerManager = consumerManager;
}
public SubscriptionManager getSubscriptionManager() {
......@@ -357,4 +351,12 @@ public class SnodeController {
public void setClientService(ClientService clientService) {
this.clientService = clientService;
}
public SlowConsumerService getSlowConsumerService() {
return slowConsumerService;
}
public void setSlowConsumerService(SlowConsumerService slowConsumerService) {
this.slowConsumerService = slowConsumerService;
}
}
......@@ -53,9 +53,7 @@ public class Client {
if (o == null || getClass() != o.getClass())
return false;
Client client = (Client) o;
return heartbeatInterval == client.heartbeatInterval &&
lastUpdateTimestamp == client.lastUpdateTimestamp &&
version == client.version &&
return version == client.version &&
clientRole == client.clientRole &&
Objects.equals(groupId, client.groupId) &&
Objects.equals(clientId, client.clientId) &&
......@@ -65,7 +63,7 @@ public class Client {
@Override
public int hashCode() {
return Objects.hash(clientRole, groupId, clientId, remotingChannel, heartbeatInterval, lastUpdateTimestamp, version, language);
return Objects.hash(clientRole, groupId, clientId, remotingChannel, version, language);
}
public String getGroupId() {
......@@ -123,4 +121,19 @@ public class Client {
public void setLanguage(LanguageCode language) {
this.language = language;
}
@Override public String toString() {
return "Client{" +
"clientRole=" + clientRole +
", groupId='" + groupId + '\'' +
", clientId='" + clientId + '\'' +
", remotingChannel=" + remotingChannel +
", heartbeatInterval=" + heartbeatInterval +
", lastUpdateTimestamp=" + lastUpdateTimestamp +
", version=" + version +
", language=" + language +
'}';
}
}
......@@ -52,9 +52,9 @@ public class ClientHousekeepingService implements ChannelEventListener {
private ClientRole clientRole(RemotingChannel remotingChannel) {
if (remotingChannel instanceof NettyChannelImpl) {
Channel channel = ((NettyChannelImpl) remotingChannel).getChannel();
Attribute<ClientRole> clientRoleAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ROLE_ATTRIBUTE_KEY);
if (clientRoleAttribute != null) {
return clientRoleAttribute.get();
Attribute<Client> clientAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ATTRIBUTE_KEY);
if (clientAttribute != null) {
return clientAttribute.get().getClientRole();
}
}
log.warn("RemotingChannel type error: {}", remotingChannel.getClass());
......
......@@ -20,5 +20,6 @@ import org.apache.rocketmq.remoting.RemotingChannel;
public interface SlowConsumerService {
boolean isSlowConsumer(long latestLogicOffset, String topic, String queueId, RemotingChannel remotingChannel);
boolean isSlowConsumer(long latestLogicOffset, String topic, int queueId, RemotingChannel remotingChannel,
String enodeName);
}
......@@ -16,8 +16,6 @@
*/
package org.apache.rocketmq.snode.client;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.DataVersion;
......@@ -26,20 +24,15 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import org.apache.rocketmq.snode.SnodeController;
public class SubscriptionGroupManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
new ConcurrentHashMap<>(1024);
private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable = new ConcurrentHashMap<>(1024);
private final DataVersion dataVersion = new DataVersion();
private transient SnodeController snodeController;
public SubscriptionGroupManager() {
this.init();
}
private transient SnodeController snodeController;
public SubscriptionGroupManager(SnodeController snodeController) {
this.snodeController = snodeController;
......@@ -47,51 +40,6 @@ public class SubscriptionGroupManager {
}
private void init() {
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP);
this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig);
}
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP);
this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig);
}
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP);
this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig);
}
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP);
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig);
}
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP);
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig);
}
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP);
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig);
}
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP);
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig);
}
}
public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
......@@ -133,32 +81,7 @@ public class SubscriptionGroupManager {
return subscriptionGroupConfig;
}
public String encode() {
return this.encode(false);
}
public void decode(String jsonString) {
if (jsonString != null) {
SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
if (obj != null) {
this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);
this.dataVersion.assignNewOne(obj.dataVersion);
this.printLoadDataWhenFirstBoot(obj);
}
}
}
public String encode(final boolean prettyFormat) {
return RemotingSerializable.toJson(this, prettyFormat);
}
private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {
Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator();
while (it.hasNext()) {
Entry<String, SubscriptionGroupConfig> next = it.next();
log.info("load exist subscription group, {}", next.getValue().toString());
}
}
public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
return subscriptionGroupTable;
......
......@@ -35,9 +35,9 @@ public interface SubscriptionManager {
Subscription getSubscription(String groupId);
void registerPush(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel, String groupId);
void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel, String groupId);
void removePush(RemotingChannel remotingChannel);
void removePushSession(RemotingChannel remotingChannel);
Set<RemotingChannel> getPushableChannel(String topic, Integer queueId);
}
......@@ -122,6 +122,7 @@ public abstract class ClientManagerImpl implements ClientManager {
}
oldClient.setLastUpdateTimestamp(System.currentTimeMillis());
}
log.debug("Register client role: {}, group: {}, last: {}", client.getClientRole(), client.getGroupId(), client.getLastUpdateTimestamp());
onRegister(client.getGroupId(), client.getRemotingChannel());
return updated;
}
......
......@@ -39,12 +39,12 @@ public class ConsumerManagerImpl extends ClientManagerImpl {
@Override
public void onClosed(String groupId, RemotingChannel remotingChannel) {
this.snodeController.getClientService().notifyConsumer(groupId);
this.snodeController.getSubscriptionManager().removePush(remotingChannel);
this.snodeController.getSubscriptionManager().removePushSession(remotingChannel);
}
@Override
public void onUnregister(String groupId, RemotingChannel remotingChannel) {
this.snodeController.getClientService().notifyConsumer(groupId);
this.snodeController.getSubscriptionManager().removePush(remotingChannel);
this.snodeController.getSubscriptionManager().removePushSession(remotingChannel);
}
}
......@@ -16,14 +16,45 @@
*/
package org.apache.rocketmq.snode.client.impl;
import io.netty.channel.Channel;
import io.netty.util.Attribute;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.SlowConsumerService;
import org.apache.rocketmq.snode.constant.SnodeConstant;
public class SlowConsumerServiceImpl implements SlowConsumerService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeController snodeController;
public SlowConsumerServiceImpl(SnodeController snodeController) {
this.snodeController = snodeController;
}
@Override
public boolean isSlowConsumer(long latestLogicOffset, String topic, String queueId,
RemotingChannel remotingChannel) {
public boolean isSlowConsumer(long latestLogicOffset, String topic, int queueId,
RemotingChannel remotingChannel, String enodeName) {
Client client = null;
if (remotingChannel instanceof NettyChannelImpl) {
Channel channel = ((NettyChannelImpl) remotingChannel).getChannel();
Attribute<Client> clientAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ATTRIBUTE_KEY);
if (clientAttribute != null) {
client = clientAttribute.get();
}
}
if (client != null) {
long ackedOffset = this.snodeController.getConsumerOffsetManager().queryOffset(enodeName, client.getGroupId(), topic, queueId);
if (latestLogicOffset - ackedOffset > snodeController.getSnodeConfig().getSlowConsumerThreshold()) {
log.warn("[SlowConsumer] group: {}, lastAckedOffset:{} nowOffset:{} ", client.getGroupId(), ackedOffset, latestLogicOffset);
return true;
}
}
return false;
}
}
......@@ -52,7 +52,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
}
@Override
public void registerPush(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel,
public void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel,
String groupId) {
Set<String> prevSubSet = this.clientSubscriptionTable.get(remotingChannel);
Set<String> keySet = new HashSet<>();
......@@ -89,7 +89,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
}
@Override
public void removePush(RemotingChannel remotingChannel) {
public void removePushSession(RemotingChannel remotingChannel) {
Set<String> subSet = this.clientSubscriptionTable.get(remotingChannel);
if (subSet != null) {
for (String key : subSet) {
......
......@@ -284,4 +284,11 @@ public class SnodeConfig {
return remotingServerInterceptorPath;
}
public int getSlowConsumerThreshold() {
return slowConsumerThreshold;
}
public void setSlowConsumerThreshold(int slowConsumerThreshold) {
this.slowConsumerThreshold = slowConsumerThreshold;
}
}
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.snode.constant;
import io.netty.util.AttributeKey;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.ClientRole;
public class SnodeConstant {
......@@ -30,10 +31,5 @@ public class SnodeConstant {
public static final AttributeKey<ClientRole> NETTY_CLIENT_ROLE_ATTRIBUTE_KEY = AttributeKey.valueOf("netty.client.role");
public static final String NETTY_PRODUCER_ROLE_ATTRIBUTE_VALUE = "Producer";
public static final String NETTY_CONSUMER_ROLE_ATTRIBUTE_VALUE = "Consumer";
public static final String NETTY_IOT_ROLE_ATTRIBUTE_VALUE = "IOTGroup";
public static final AttributeKey<Client> NETTY_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("netty.client");
}
......@@ -39,7 +39,7 @@ public class ConsumerOffsetManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private static final String TOPIC_GROUP_SEPARATOR = "@";
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
private ConcurrentMap<String/* Enode@Topic@Group */, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<>(512);
private transient SnodeController snodeController;
......@@ -88,11 +88,10 @@ public class ConsumerOffsetManager {
return result;
}
public void commitOffset(final String enodeName, final String clientHost, final String group, final String topic,
final int queueId,
final long offset) {
// topic@group
// Topic@group
String key = buildKey(enodeName, topic, group);
this.commitOffset(clientHost, key, queueId, offset);
}
......@@ -101,12 +100,13 @@ public class ConsumerOffsetManager {
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
if (null == map) {
map = new ConcurrentHashMap<>(32);
ConcurrentMap<Integer, Long> prev = this.offsetTable.putIfAbsent(key, map);
map = prev != null ? prev : map;
map.put(queueId, offset);
this.offsetTable.put(key, map);
} else {
Long storeOffset = map.put(queueId, offset);
if (storeOffset != null && offset < storeOffset) {
log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
log.warn("[NOTIFYME]update consumer offset less than store. clientHost: {}, key: {}, queueId: {}, requestOffset: {}, storeOffset: {}", clientHost, key, queueId, offset, storeOffset);
}
}
}
......@@ -123,18 +123,6 @@ public class ConsumerOffsetManager {
return -1;
}
public String encode() {
return this.encode(false);
}
public void decode(String jsonString) {
if (jsonString != null) {
ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
if (obj != null) {
this.offsetTable = obj.offsetTable;
}
}
}
public String encode(final boolean prettyFormat) {
return RemotingSerializable.toJson(this, prettyFormat);
......
......@@ -126,7 +126,7 @@ public class ConsumerManageProcessor implements RequestProcessor {
(GetConsumerListByGroupRequestHeader) request
.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
List<String> clientIds = this.snodeController.getConsumerManagerImpl().getAllClientId(requestHeader.getConsumerGroup());
List<String> clientIds = this.snodeController.getConsumerManager().getAllClientId(requestHeader.getConsumerGroup());
if (!clientIds.isEmpty()) {
GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();
body.setConsumerIdList(clientIds);
......
......@@ -63,47 +63,41 @@ public class HeartbeatProcessor implements RequestProcessor {
private RemotingCommand register(RemotingChannel remotingChannel, RemotingCommand request) {
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
Channel channel;
ClientRole role = null;
Attribute<ClientRole> clientRoleAttribute = null;
Channel channel = null;
Attribute<Client> clientAttribute = null;
if (remotingChannel instanceof NettyChannelHandlerContextImpl) {
channel = ((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel();
clientRoleAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ROLE_ATTRIBUTE_KEY);
clientAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ATTRIBUTE_KEY);
}
Client client = new Client();
client.setClientId(heartbeatData.getClientID());
client.setRemotingChannel(remotingChannel);
for (ProducerData producerData : heartbeatData.getProducerDataSet()) {
role = ClientRole.Producer;
client.setGroupId(producerData.getGroupName());
client.setClientRole(role);
this.snodeController.getProducerManagerImpl().register(client);
client.setClientRole(ClientRole.Producer);
this.snodeController.getProducerManager().register(client);
}
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
client.setGroupId(data.getGroupName());
role = ClientRole.Consumer;
client.setClientRole(role);
boolean channelChanged = this.snodeController.getConsumerManagerImpl().register(client);
client.setClientRole(ClientRole.Consumer);
boolean channelChanged = this.snodeController.getConsumerManager().register(client);
boolean subscriptionChanged = this.snodeController.getSubscriptionManager().subscribe(data.getGroupName(),
data.getSubscriptionDataSet(),
data.getConsumeType(),
data.getMessageModel(),
data.getConsumeFromWhere());
if (data.getConsumeType() == ConsumeType.CONSUME_PUSH) {
NettyChannelImpl nettyChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl)remotingChannel).getChannelHandlerContext().channel());
this.snodeController.getSubscriptionManager().registerPush(data.getSubscriptionDataSet(), nettyChannel, data.getGroupName());
NettyChannelImpl nettyChannel = new NettyChannelImpl(channel);
this.snodeController.getSubscriptionManager().registerPushSession(data.getSubscriptionDataSet(), nettyChannel, data.getGroupName());
}
if (subscriptionChanged || channelChanged) {
this.snodeController.getClientService().notifyConsumer(data.getGroupName());
}
}
if (role != null) {
log.debug("Set channel attribute value: {}", role);
clientRoleAttribute.setIfAbsent(role);
}
clientAttribute.setIfAbsent(client);
RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
......@@ -118,13 +112,13 @@ public class HeartbeatProcessor implements RequestProcessor {
final String producerGroup = requestHeader.getProducerGroup();
if (producerGroup != null) {
this.snodeController.getProducerManagerImpl().unRegister(producerGroup, remotingChannel);
this.snodeController.getProducerManager().unRegister(producerGroup, remotingChannel);
}
final String consumerGroup = requestHeader.getConsumerGroup();
if (consumerGroup != null) {
this.snodeController.getConsumerManagerImpl().unRegister(consumerGroup, remotingChannel);
this.snodeController.getSubscriptionManager().removePush(remotingChannel);
this.snodeController.getConsumerManager().unRegister(consumerGroup, remotingChannel);
this.snodeController.getSubscriptionManager().removePushSession(remotingChannel);
this.snodeController.getClientService().notifyConsumer(consumerGroup);
}
......
......@@ -75,7 +75,7 @@ public class SendMessageProcessor implements RequestProcessor {
}
remotingChannel.reply(data);
if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) {
this.snodeController.getPushService().pushMessage(topic, queueId, message, data);
this.snodeController.getPushService().pushMessage(enodeName, topic, queueId, message, data);
}
} else {
if (this.snodeController.getSendMessageInterceptorGroup() != null) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.service;
public class ConsumerOffsetService {
}
......@@ -20,10 +20,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface PushService {
void pushMessage(final String topic, final Integer queueId, final byte[] message,
void pushMessage(final String enodeName, final String topic, final Integer queueId, final byte[] message,
final RemotingCommand response);
void start();
void shutdown();
}
......@@ -42,7 +42,7 @@ public class ClientServiceImpl implements ClientService {
SubscriptionGroupConfig subscriptionGroupConfig = snodeController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
boolean notifyConsumer = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
if (notifyConsumer) {
List<RemotingChannel> remotingChannels = snodeController.getConsumerManagerImpl().getChannels(group);
List<RemotingChannel> remotingChannels = snodeController.getConsumerManager().getChannels(group);
if (remotingChannels != null && snodeController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(group).isNotifyConsumerIdsChangedEnable()) {
for (RemotingChannel remotingChannel : remotingChannels) {
NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
......
......@@ -58,13 +58,15 @@ public class PushServiceImpl implements PushService {
private final Integer queueId;
private final String topic;
private final RemotingCommand response;
private final String enodeName;
public PushTask(final String topic, final Integer queueId, final byte[] message,
final RemotingCommand response) {
final RemotingCommand response, final String enodeName) {
this.message = message;
this.queueId = queueId;
this.topic = topic;
this.response = response;
this.enodeName = enodeName;
}
@Override
......@@ -79,11 +81,16 @@ public class PushServiceImpl implements PushService {
RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.SNODE_PUSH_MESSAGE, pushMessageHeader);
pushMessage.setBody(message);
Set<RemotingChannel> consumerTable = snodeController.getSubscriptionManager().getPushableChannel(topic, queueId);
log.info("Push message to consumerTable: {}", consumerTable);
if (consumerTable != null) {
for (RemotingChannel remotingChannel : consumerTable) {
if (remotingChannel.isWritable()) {
log.info("Push message to remotingChannel: {}", remotingChannel.remoteAddress());
boolean slowConsumer = snodeController.getSlowConsumerService().isSlowConsumer(sendMessageResponseHeader.getQueueOffset(), topic, queueId, remotingChannel, enodeName);
if (slowConsumer) {
log.warn("[SlowConsumer]: {} closed as slow consumer", remotingChannel);//TODO metrics
remotingChannel.close();
continue;
}
log.debug("Push message to remotingChannel: {}", remotingChannel.remoteAddress());
snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
} else {
log.warn("Remoting channel is not writable: {}", remotingChannel.remoteAddress());
......@@ -107,21 +114,17 @@ public class PushServiceImpl implements PushService {
}
@Override
public void pushMessage(final String topic, final Integer queueId, final byte[] message,
public void pushMessage(final String enodeName, final String topic, final Integer queueId, final byte[] message,
final RemotingCommand response) {
Set<RemotingChannel> pushableChannels = this.snodeController.getSubscriptionManager().getPushableChannel(topic, queueId);
if (pushableChannels != null) {
PushTask pushTask = new PushTask(topic, queueId, message, response);
PushTask pushTask = new PushTask(topic, queueId, message, response, enodeName);
pushMessageExecutorService.submit(pushTask);
} else {
log.info("Topic: {} QueueId: {} no need to push", topic, queueId);
}
}
@Override
public void start() {
}
@Override
public void shutdown() {
this.pushMessageExecutorService.shutdown();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册