diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index 18631ea9bfa294f99989fe902f0a0e0fd53c9546..0707db5dc4b48a635c33fbdcb134263338d5d47e 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -39,10 +39,12 @@ import org.apache.rocketmq.remoting.interceptor.InterceptorFactory; import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.snode.client.ClientHousekeepingService; import org.apache.rocketmq.snode.client.ClientManager; +import org.apache.rocketmq.snode.client.SlowConsumerService; import org.apache.rocketmq.snode.client.SubscriptionGroupManager; import org.apache.rocketmq.snode.client.SubscriptionManager; import org.apache.rocketmq.snode.client.impl.ConsumerManagerImpl; import org.apache.rocketmq.snode.client.impl.ProducerManagerImpl; +import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl; import org.apache.rocketmq.snode.client.impl.SubscriptionManagerImpl; import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.offset.ConsumerOffsetManager; @@ -77,10 +79,8 @@ public class SnodeController { private NnodeService nnodeService; private ExecutorService consumerManagerExecutor; private ScheduledService scheduledService; -// private ProducerManager producerManager; -// private ConsumerManager consumerManager; - private ClientManager producerManagerImpl; - private ClientManager consumerManagerImpl; + private ClientManager producerManager; + private ClientManager consumerManager; private SubscriptionManager subscriptionManager; private ClientHousekeepingService clientHousekeepingService; private SubscriptionGroupManager subscriptionGroupManager; @@ -94,6 +94,7 @@ public class SnodeController { private InterceptorGroup sendMessageInterceptorGroup; private PushService pushService; private ClientService clientService; + private SlowConsumerService slowConsumerService; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( "SnodeControllerScheduledThread")); @@ -172,9 +173,10 @@ public class SnodeController { this.pushService = new PushServiceImpl(this); this.clientService = new ClientServiceImpl(this); this.subscriptionManager = new SubscriptionManagerImpl(); - this.producerManagerImpl = new ProducerManagerImpl(); - this.consumerManagerImpl = new ConsumerManagerImpl(this); - this.clientHousekeepingService = new ClientHousekeepingService(this.producerManagerImpl, this.consumerManagerImpl); + this.producerManager = new ProducerManagerImpl(); + this.consumerManager = new ConsumerManagerImpl(this); + this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, this.consumerManager); + this.slowConsumerService = new SlowConsumerServiceImpl(this); } public SnodeConfig getSnodeConfig() { @@ -257,18 +259,10 @@ public class SnodeController { this.pushService.shutdown(); } -// public ProducerManager getProducerManager() { -// return producerManager; -// } - public RemotingServer getSnodeServer() { return snodeServer; } -// public ConsumerManager getConsumerManager() { -// return consumerManager; -// } - public SubscriptionGroupManager getSubscriptionGroupManager() { return subscriptionGroupManager; } @@ -326,20 +320,20 @@ public class SnodeController { this.remotingServerInterceptorGroup = remotingServerInterceptorGroup; } - public ClientManager getProducerManagerImpl() { - return producerManagerImpl; + public ClientManager getProducerManager() { + return producerManager; } - public void setProducerManagerImpl(ClientManager producerManagerImpl) { - this.producerManagerImpl = producerManagerImpl; + public void setProducerManager(ClientManager producerManager) { + this.producerManager = producerManager; } - public ClientManager getConsumerManagerImpl() { - return consumerManagerImpl; + public ClientManager getConsumerManager() { + return consumerManager; } - public void setConsumerManagerImpl(ClientManager consumerManagerImpl) { - this.consumerManagerImpl = consumerManagerImpl; + public void setConsumerManager(ClientManager consumerManager) { + this.consumerManager = consumerManager; } public SubscriptionManager getSubscriptionManager() { @@ -357,4 +351,12 @@ public class SnodeController { public void setClientService(ClientService clientService) { this.clientService = clientService; } + + public SlowConsumerService getSlowConsumerService() { + return slowConsumerService; + } + + public void setSlowConsumerService(SlowConsumerService slowConsumerService) { + this.slowConsumerService = slowConsumerService; + } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java b/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java index 2a3d52ac8a53d436f17494cbbdcb9d2c0bbb43bf..2376b706b31de688bfbbac28a6f6d8611a074164 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java @@ -53,9 +53,7 @@ public class Client { if (o == null || getClass() != o.getClass()) return false; Client client = (Client) o; - return heartbeatInterval == client.heartbeatInterval && - lastUpdateTimestamp == client.lastUpdateTimestamp && - version == client.version && + return version == client.version && clientRole == client.clientRole && Objects.equals(groupId, client.groupId) && Objects.equals(clientId, client.clientId) && @@ -65,7 +63,7 @@ public class Client { @Override public int hashCode() { - return Objects.hash(clientRole, groupId, clientId, remotingChannel, heartbeatInterval, lastUpdateTimestamp, version, language); + return Objects.hash(clientRole, groupId, clientId, remotingChannel, version, language); } public String getGroupId() { @@ -123,4 +121,19 @@ public class Client { public void setLanguage(LanguageCode language) { this.language = language; } + + @Override public String toString() { + return "Client{" + + "clientRole=" + clientRole + + ", groupId='" + groupId + '\'' + + ", clientId='" + clientId + '\'' + + ", remotingChannel=" + remotingChannel + + ", heartbeatInterval=" + heartbeatInterval + + ", lastUpdateTimestamp=" + lastUpdateTimestamp + + ", version=" + version + + ", language=" + language + + '}'; + } } + + diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java index a86d14f01b09289688bdd3925735de3f1b651a7d..42ceacf254848dd67516c95131fb0a1bfbce7d47 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java @@ -52,9 +52,9 @@ public class ClientHousekeepingService implements ChannelEventListener { private ClientRole clientRole(RemotingChannel remotingChannel) { if (remotingChannel instanceof NettyChannelImpl) { Channel channel = ((NettyChannelImpl) remotingChannel).getChannel(); - Attribute clientRoleAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ROLE_ATTRIBUTE_KEY); - if (clientRoleAttribute != null) { - return clientRoleAttribute.get(); + Attribute clientAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ATTRIBUTE_KEY); + if (clientAttribute != null) { + return clientAttribute.get().getClientRole(); } } log.warn("RemotingChannel type error: {}", remotingChannel.getClass()); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/SlowConsumerService.java b/snode/src/main/java/org/apache/rocketmq/snode/client/SlowConsumerService.java index b341e69bbf0caab176ec1351941dff26faddf074..1fdea74baf7273fc0092c322cc5df506e7fce8a1 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/SlowConsumerService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/SlowConsumerService.java @@ -20,5 +20,6 @@ import org.apache.rocketmq.remoting.RemotingChannel; public interface SlowConsumerService { - boolean isSlowConsumer(long latestLogicOffset, String topic, String queueId, RemotingChannel remotingChannel); + boolean isSlowConsumer(long latestLogicOffset, String topic, int queueId, RemotingChannel remotingChannel, + String enodeName); } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java index e153ef1c24ee08bfe3bb5829b37e953fc4542908..ab013b8b9c4b788769b007cb442d215ae64e7447 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java @@ -16,8 +16,6 @@ */ package org.apache.rocketmq.snode.client; -import java.util.Iterator; -import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.DataVersion; @@ -26,20 +24,15 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; -import org.apache.rocketmq.remoting.serialize.RemotingSerializable; import org.apache.rocketmq.snode.SnodeController; public class SubscriptionGroupManager { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); - private final ConcurrentMap subscriptionGroupTable = - new ConcurrentHashMap<>(1024); + private final ConcurrentMap subscriptionGroupTable = new ConcurrentHashMap<>(1024); private final DataVersion dataVersion = new DataVersion(); - private transient SnodeController snodeController; - public SubscriptionGroupManager() { - this.init(); - } + private transient SnodeController snodeController; public SubscriptionGroupManager(SnodeController snodeController) { this.snodeController = snodeController; @@ -47,51 +40,6 @@ public class SubscriptionGroupManager { } private void init() { - { - SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); - subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP); - this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig); - } - - { - SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); - subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP); - this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig); - } - - { - SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); - subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP); - this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig); - } - - { - SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); - subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP); - subscriptionGroupConfig.setConsumeBroadcastEnable(true); - this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig); - } - - { - SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); - subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP); - subscriptionGroupConfig.setConsumeBroadcastEnable(true); - this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig); - } - - { - SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); - subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP); - subscriptionGroupConfig.setConsumeBroadcastEnable(true); - this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig); - } - - { - SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); - subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP); - subscriptionGroupConfig.setConsumeBroadcastEnable(true); - this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig); - } } public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) { @@ -133,32 +81,7 @@ public class SubscriptionGroupManager { return subscriptionGroupConfig; } - public String encode() { - return this.encode(false); - } - public void decode(String jsonString) { - if (jsonString != null) { - SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class); - if (obj != null) { - this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable); - this.dataVersion.assignNewOne(obj.dataVersion); - this.printLoadDataWhenFirstBoot(obj); - } - } - } - - public String encode(final boolean prettyFormat) { - return RemotingSerializable.toJson(this, prettyFormat); - } - - private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) { - Iterator> it = sgm.getSubscriptionGroupTable().entrySet().iterator(); - while (it.hasNext()) { - Entry next = it.next(); - log.info("load exist subscription group, {}", next.getValue().toString()); - } - } public ConcurrentMap getSubscriptionGroupTable() { return subscriptionGroupTable; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java index 099dc1231fe503637de62e802bb93965874241a9..a40bbb6c63b12f9393c4792f78cc8bd441efd4f3 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java @@ -35,9 +35,9 @@ public interface SubscriptionManager { Subscription getSubscription(String groupId); - void registerPush(Set subscriptionDataSet, RemotingChannel remotingChannel, String groupId); + void registerPushSession(Set subscriptionDataSet, RemotingChannel remotingChannel, String groupId); - void removePush(RemotingChannel remotingChannel); + void removePushSession(RemotingChannel remotingChannel); Set getPushableChannel(String topic, Integer queueId); } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java index 9aae903950b3c786155f7f60aec63c58fffdff79..34d79486538bc013d2b9c80def450d72041e7d34 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java @@ -122,6 +122,7 @@ public abstract class ClientManagerImpl implements ClientManager { } oldClient.setLastUpdateTimestamp(System.currentTimeMillis()); } + log.debug("Register client role: {}, group: {}, last: {}", client.getClientRole(), client.getGroupId(), client.getLastUpdateTimestamp()); onRegister(client.getGroupId(), client.getRemotingChannel()); return updated; } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java index cf9492c0f34be4d07aa6eab9ba13817eab708e16..fb6693c92ec9cdc026c30eff58ae3249bab29f56 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java @@ -39,12 +39,12 @@ public class ConsumerManagerImpl extends ClientManagerImpl { @Override public void onClosed(String groupId, RemotingChannel remotingChannel) { this.snodeController.getClientService().notifyConsumer(groupId); - this.snodeController.getSubscriptionManager().removePush(remotingChannel); + this.snodeController.getSubscriptionManager().removePushSession(remotingChannel); } @Override public void onUnregister(String groupId, RemotingChannel remotingChannel) { this.snodeController.getClientService().notifyConsumer(groupId); - this.snodeController.getSubscriptionManager().removePush(remotingChannel); + this.snodeController.getSubscriptionManager().removePushSession(remotingChannel); } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java index 9b95301eb58aab9593c20fcf62c55176cf6effab..acf6d5004ff4181f631cead37f0f0cd77c527f62 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java @@ -16,14 +16,45 @@ */ package org.apache.rocketmq.snode.client.impl; +import io.netty.channel.Channel; +import io.netty.util.Attribute; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.netty.NettyChannelImpl; +import org.apache.rocketmq.snode.SnodeController; +import org.apache.rocketmq.snode.client.Client; import org.apache.rocketmq.snode.client.SlowConsumerService; +import org.apache.rocketmq.snode.constant.SnodeConstant; public class SlowConsumerServiceImpl implements SlowConsumerService { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); + + private final SnodeController snodeController; + + public SlowConsumerServiceImpl(SnodeController snodeController) { + this.snodeController = snodeController; + } @Override - public boolean isSlowConsumer(long latestLogicOffset, String topic, String queueId, - RemotingChannel remotingChannel) { + public boolean isSlowConsumer(long latestLogicOffset, String topic, int queueId, + RemotingChannel remotingChannel, String enodeName) { + Client client = null; + if (remotingChannel instanceof NettyChannelImpl) { + Channel channel = ((NettyChannelImpl) remotingChannel).getChannel(); + Attribute clientAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ATTRIBUTE_KEY); + if (clientAttribute != null) { + client = clientAttribute.get(); + } + } + if (client != null) { + long ackedOffset = this.snodeController.getConsumerOffsetManager().queryOffset(enodeName, client.getGroupId(), topic, queueId); + if (latestLogicOffset - ackedOffset > snodeController.getSnodeConfig().getSlowConsumerThreshold()) { + log.warn("[SlowConsumer] group: {}, lastAckedOffset:{} nowOffset:{} ", client.getGroupId(), ackedOffset, latestLogicOffset); + return true; + } + } return false; } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java index e44f9a9607125befec059781047eedc22f57a2d7..9011c5fd2b38fecf4531947dd27c229ae4efecdc 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java @@ -52,7 +52,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { } @Override - public void registerPush(Set subscriptionDataSet, RemotingChannel remotingChannel, + public void registerPushSession(Set subscriptionDataSet, RemotingChannel remotingChannel, String groupId) { Set prevSubSet = this.clientSubscriptionTable.get(remotingChannel); Set keySet = new HashSet<>(); @@ -89,7 +89,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { } @Override - public void removePush(RemotingChannel remotingChannel) { + public void removePushSession(RemotingChannel remotingChannel) { Set subSet = this.clientSubscriptionTable.get(remotingChannel); if (subSet != null) { for (String key : subSet) { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java index fa1681909c20ad1453376259dfdc5691746736f6..3143d7d1739651f2e4488306a2003d5c4a1b7d85 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java @@ -284,4 +284,11 @@ public class SnodeConfig { return remotingServerInterceptorPath; } + public int getSlowConsumerThreshold() { + return slowConsumerThreshold; + } + + public void setSlowConsumerThreshold(int slowConsumerThreshold) { + this.slowConsumerThreshold = slowConsumerThreshold; + } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java index 9fe315e8807746c436aaa9882406dfd14293a82e..16deb6d1eae8f07afcf2745c398b69b9ed85bb25 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.snode.constant; import io.netty.util.AttributeKey; +import org.apache.rocketmq.snode.client.Client; import org.apache.rocketmq.snode.client.impl.ClientRole; public class SnodeConstant { @@ -30,10 +31,5 @@ public class SnodeConstant { public static final AttributeKey NETTY_CLIENT_ROLE_ATTRIBUTE_KEY = AttributeKey.valueOf("netty.client.role"); - public static final String NETTY_PRODUCER_ROLE_ATTRIBUTE_VALUE = "Producer"; - - public static final String NETTY_CONSUMER_ROLE_ATTRIBUTE_VALUE = "Consumer"; - - public static final String NETTY_IOT_ROLE_ATTRIBUTE_VALUE = "IOTGroup"; - + public static final AttributeKey NETTY_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("netty.client"); } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java index 7cd485c83a3054380b58f6cf4558767def7206e2..7c417364a014ccda2de1c4d3c73cad09955619b4 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java @@ -39,7 +39,7 @@ public class ConsumerOffsetManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private static final String TOPIC_GROUP_SEPARATOR = "@"; - private ConcurrentMap> offsetTable = + private ConcurrentMap> offsetTable = new ConcurrentHashMap<>(512); private transient SnodeController snodeController; @@ -88,11 +88,10 @@ public class ConsumerOffsetManager { return result; } - public void commitOffset(final String enodeName, final String clientHost, final String group, final String topic, final int queueId, final long offset) { - // topic@group + // Topic@group String key = buildKey(enodeName, topic, group); this.commitOffset(clientHost, key, queueId, offset); } @@ -101,12 +100,13 @@ public class ConsumerOffsetManager { ConcurrentMap map = this.offsetTable.get(key); if (null == map) { map = new ConcurrentHashMap<>(32); + ConcurrentMap prev = this.offsetTable.putIfAbsent(key, map); + map = prev != null ? prev : map; map.put(queueId, offset); - this.offsetTable.put(key, map); } else { Long storeOffset = map.put(queueId, offset); if (storeOffset != null && offset < storeOffset) { - log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset); + log.warn("[NOTIFYME]update consumer offset less than store. clientHost: {}, key: {}, queueId: {}, requestOffset: {}, storeOffset: {}", clientHost, key, queueId, offset, storeOffset); } } } @@ -123,18 +123,6 @@ public class ConsumerOffsetManager { return -1; } - public String encode() { - return this.encode(false); - } - - public void decode(String jsonString) { - if (jsonString != null) { - ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class); - if (obj != null) { - this.offsetTable = obj.offsetTable; - } - } - } public String encode(final boolean prettyFormat) { return RemotingSerializable.toJson(this, prettyFormat); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java index 9f044ec9412908fc2c04a43bbe48a46f986b9bde..4cd54d6ba2b9fdf0fa07e0794da4913ca9d8722b 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java @@ -126,7 +126,7 @@ public class ConsumerManageProcessor implements RequestProcessor { (GetConsumerListByGroupRequestHeader) request .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class); - List clientIds = this.snodeController.getConsumerManagerImpl().getAllClientId(requestHeader.getConsumerGroup()); + List clientIds = this.snodeController.getConsumerManager().getAllClientId(requestHeader.getConsumerGroup()); if (!clientIds.isEmpty()) { GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody(); body.setConsumerIdList(clientIds); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java index 55bbe8205487e452cb1c1e1adf59c943d41404d9..3d2c03ac0bbebe575c21819c625cddd043e358a7 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java @@ -63,47 +63,41 @@ public class HeartbeatProcessor implements RequestProcessor { private RemotingCommand register(RemotingChannel remotingChannel, RemotingCommand request) { HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); - Channel channel; - ClientRole role = null; - Attribute clientRoleAttribute = null; + Channel channel = null; + Attribute clientAttribute = null; if (remotingChannel instanceof NettyChannelHandlerContextImpl) { channel = ((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel(); - clientRoleAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ROLE_ATTRIBUTE_KEY); + clientAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ATTRIBUTE_KEY); } Client client = new Client(); client.setClientId(heartbeatData.getClientID()); client.setRemotingChannel(remotingChannel); for (ProducerData producerData : heartbeatData.getProducerDataSet()) { - role = ClientRole.Producer; client.setGroupId(producerData.getGroupName()); - client.setClientRole(role); - this.snodeController.getProducerManagerImpl().register(client); + client.setClientRole(ClientRole.Producer); + this.snodeController.getProducerManager().register(client); } for (ConsumerData data : heartbeatData.getConsumerDataSet()) { client.setGroupId(data.getGroupName()); - role = ClientRole.Consumer; - client.setClientRole(role); - boolean channelChanged = this.snodeController.getConsumerManagerImpl().register(client); + client.setClientRole(ClientRole.Consumer); + boolean channelChanged = this.snodeController.getConsumerManager().register(client); boolean subscriptionChanged = this.snodeController.getSubscriptionManager().subscribe(data.getGroupName(), data.getSubscriptionDataSet(), data.getConsumeType(), data.getMessageModel(), data.getConsumeFromWhere()); if (data.getConsumeType() == ConsumeType.CONSUME_PUSH) { - NettyChannelImpl nettyChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl)remotingChannel).getChannelHandlerContext().channel()); - this.snodeController.getSubscriptionManager().registerPush(data.getSubscriptionDataSet(), nettyChannel, data.getGroupName()); + NettyChannelImpl nettyChannel = new NettyChannelImpl(channel); + this.snodeController.getSubscriptionManager().registerPushSession(data.getSubscriptionDataSet(), nettyChannel, data.getGroupName()); } if (subscriptionChanged || channelChanged) { this.snodeController.getClientService().notifyConsumer(data.getGroupName()); } } - if (role != null) { - log.debug("Set channel attribute value: {}", role); - clientRoleAttribute.setIfAbsent(role); - } + clientAttribute.setIfAbsent(client); RemotingCommand response = RemotingCommand.createResponseCommand(null); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); @@ -118,13 +112,13 @@ public class HeartbeatProcessor implements RequestProcessor { final String producerGroup = requestHeader.getProducerGroup(); if (producerGroup != null) { - this.snodeController.getProducerManagerImpl().unRegister(producerGroup, remotingChannel); + this.snodeController.getProducerManager().unRegister(producerGroup, remotingChannel); } final String consumerGroup = requestHeader.getConsumerGroup(); if (consumerGroup != null) { - this.snodeController.getConsumerManagerImpl().unRegister(consumerGroup, remotingChannel); - this.snodeController.getSubscriptionManager().removePush(remotingChannel); + this.snodeController.getConsumerManager().unRegister(consumerGroup, remotingChannel); + this.snodeController.getSubscriptionManager().removePushSession(remotingChannel); this.snodeController.getClientService().notifyConsumer(consumerGroup); } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java index f9f6be22a0925e887a663b90cde70212b24adc03..b498fa551dea655d923e0fd77c6ee5a63a684f89 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java @@ -75,7 +75,7 @@ public class SendMessageProcessor implements RequestProcessor { } remotingChannel.reply(data); if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) { - this.snodeController.getPushService().pushMessage(topic, queueId, message, data); + this.snodeController.getPushService().pushMessage(enodeName, topic, queueId, message, data); } } else { if (this.snodeController.getSendMessageInterceptorGroup() != null) { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/ConsumerOffsetService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/ConsumerOffsetService.java deleted file mode 100644 index 3699b08cb3ce1a0f9bc56d9fcd066b54d04b22ab..0000000000000000000000000000000000000000 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/ConsumerOffsetService.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.snode.service; -public class ConsumerOffsetService { - -} diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java index fd61303984518eb5e666014063f0d08afea76d27..db24055b067a7aaf861da24482ee7a0110a6548c 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java @@ -20,10 +20,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; public interface PushService { - void pushMessage(final String topic, final Integer queueId, final byte[] message, + void pushMessage(final String enodeName, final String topic, final Integer queueId, final byte[] message, final RemotingCommand response); - void start(); - void shutdown(); } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java index 5ce42bd771e11514b3473ac27b13e62a5fd66e9c..6e49c6a53ed933ca3de8283ad10d9748993fc855 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java @@ -42,7 +42,7 @@ public class ClientServiceImpl implements ClientService { SubscriptionGroupConfig subscriptionGroupConfig = snodeController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group); boolean notifyConsumer = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable(); if (notifyConsumer) { - List remotingChannels = snodeController.getConsumerManagerImpl().getChannels(group); + List remotingChannels = snodeController.getConsumerManager().getChannels(group); if (remotingChannels != null && snodeController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(group).isNotifyConsumerIdsChangedEnable()) { for (RemotingChannel remotingChannel : remotingChannels) { NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader(); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java index e1b2562d1d422e3b3a458a6d13c07855e33b5aa9..f6a3ac63fa88fe3d807d2498272a108c413f3347 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java @@ -58,13 +58,15 @@ public class PushServiceImpl implements PushService { private final Integer queueId; private final String topic; private final RemotingCommand response; + private final String enodeName; public PushTask(final String topic, final Integer queueId, final byte[] message, - final RemotingCommand response) { + final RemotingCommand response, final String enodeName) { this.message = message; this.queueId = queueId; this.topic = topic; this.response = response; + this.enodeName = enodeName; } @Override @@ -79,11 +81,16 @@ public class PushServiceImpl implements PushService { RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.SNODE_PUSH_MESSAGE, pushMessageHeader); pushMessage.setBody(message); Set consumerTable = snodeController.getSubscriptionManager().getPushableChannel(topic, queueId); - log.info("Push message to consumerTable: {}", consumerTable); if (consumerTable != null) { for (RemotingChannel remotingChannel : consumerTable) { if (remotingChannel.isWritable()) { - log.info("Push message to remotingChannel: {}", remotingChannel.remoteAddress()); + boolean slowConsumer = snodeController.getSlowConsumerService().isSlowConsumer(sendMessageResponseHeader.getQueueOffset(), topic, queueId, remotingChannel, enodeName); + if (slowConsumer) { + log.warn("[SlowConsumer]: {} closed as slow consumer", remotingChannel);//TODO metrics + remotingChannel.close(); + continue; + } + log.debug("Push message to remotingChannel: {}", remotingChannel.remoteAddress()); snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.DEFAULT_TIMEOUT_MILLS); } else { log.warn("Remoting channel is not writable: {}", remotingChannel.remoteAddress()); @@ -107,21 +114,17 @@ public class PushServiceImpl implements PushService { } @Override - public void pushMessage(final String topic, final Integer queueId, final byte[] message, + public void pushMessage(final String enodeName, final String topic, final Integer queueId, final byte[] message, final RemotingCommand response) { Set pushableChannels = this.snodeController.getSubscriptionManager().getPushableChannel(topic, queueId); if (pushableChannels != null) { - PushTask pushTask = new PushTask(topic, queueId, message, response); + PushTask pushTask = new PushTask(topic, queueId, message, response, enodeName); pushMessageExecutorService.submit(pushTask); } else { log.info("Topic: {} QueueId: {} no need to push", topic, queueId); } } - @Override - public void start() { - } - @Override public void shutdown() { this.pushMessageExecutorService.shutdown();