diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index 0f424a83a602f8ad3e80cad1c07db0c161f45e96..9d0cb7b0c10a9634bef4ae210abd4051593285b2 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -401,4 +401,8 @@ public class SnodeController { public void setSlowConsumerService(SlowConsumerService slowConsumerService) { this.slowConsumerService = slowConsumerService; } + + public void setConsumerOffsetManager(ConsumerOffsetManager consumerOffsetManager) { + this.consumerOffsetManager = consumerOffsetManager; + } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/SlowConsumerService.java b/snode/src/main/java/org/apache/rocketmq/snode/client/SlowConsumerService.java index 369b24537feea93890d54242d36a1d6bcfbcab53..15d74874132429782766a5766e5a071d27214ee6 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/SlowConsumerService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/SlowConsumerService.java @@ -16,7 +16,29 @@ */ package org.apache.rocketmq.snode.client; +import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + public interface SlowConsumerService { - boolean isSlowConsumer(long latestLogicOffset, String topic, int queueId, String consumerGroup, String enodeName); + /** + * Check whether this consumer is slow consumer, slow consumer means this consumer's acked offset is far behind + * current offset. + * + * @param currentOffset Current offset in consumer queue. + * @param topic + * @param queueId + * @param consumerGroup + * @param enodeName + * @return If this consumer is slow consumer, return true, otherwise false. + */ + boolean isSlowConsumer(long currentOffset, String topic, int queueId, String consumerGroup, String enodeName); + + /** + * When a consumer is checked as slow consumer, this method will be invoked to do next action. + * + * @param pushMessage The message will be pushed to consumer. + * @param remotingChannel Consumer channel. + */ + void slowConsumerResolve(RemotingCommand pushMessage, RemotingChannel remotingChannel); } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java index f7485c22b3d0dbdf823bd6a6a136fd3f1f829e6c..b96f4ee38ef0d4384e2a78a6f95c584f8be97d0c 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java @@ -19,6 +19,8 @@ package org.apache.rocketmq.snode.client.impl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.client.SlowConsumerService; @@ -32,14 +34,19 @@ public class SlowConsumerServiceImpl implements SlowConsumerService { } @Override - public boolean isSlowConsumer(long latestLogicOffset, String topic, int queueId, + public boolean isSlowConsumer(long currentOffset, String topic, int queueId, String consumerGroup, String enodeName) { long ackedOffset = this.snodeController.getConsumerOffsetManager().queryOffset(enodeName, consumerGroup, topic, queueId); - if (latestLogicOffset - ackedOffset > snodeController.getSnodeConfig().getSlowConsumerThreshold()) { - log.warn("[SlowConsumer] group: {}, lastAckedOffset:{} nowOffset:{} ", consumerGroup, ackedOffset, latestLogicOffset); + if (currentOffset - ackedOffset > snodeController.getSnodeConfig().getSlowConsumerThreshold()) { + log.warn("[SlowConsumer] group: {}, lastAckedOffset:{} nowOffset:{} ", consumerGroup, ackedOffset, currentOffset); return true; } return false; } + + @Override + public void slowConsumerResolve(RemotingCommand pushMessage, RemotingChannel remotingChannel) { + log.warn("[SlowConsumer] RemotingChannel address: {}", remotingChannel.remoteAddress()); + } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/ClientService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/ClientService.java index 8447fecc3e82237fd1c3349ce35f9153447f4e05..58e672c11c311d885adb159a3415afe937f14d57 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/ClientService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/ClientService.java @@ -1,4 +1,4 @@ -package org.apache.rocketmq.snode.service;/* +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -14,6 +14,7 @@ package org.apache.rocketmq.snode.service;/* * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.rocketmq.snode.service; public interface ClientService { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java index 03679b37ed3cb0a8fdf51699c441ac57f224d9cb..a82dafbf9a1fc78fcacdc6e103bf2cd716dce3c1 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java @@ -19,6 +19,8 @@ package org.apache.rocketmq.snode.service; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingConnectException; @@ -27,20 +29,76 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public interface EnodeService { - void sendHearbeat(RemotingCommand remotingCommand); + /** + * Send Heartbeat data to enode server to keep alive + * + * @param remotingCommand Heartbeat request + */ + void sendHeartbeat(RemotingCommand remotingCommand); + /** + * Send message to enode. + * + * @param enodeName Enode server name + * @param request {@link SendMessageRequestHeaderV2} Send message request header + * @return Send message response future + */ CompletableFuture sendMessage(final String enodeName, final RemotingCommand request); + /** + * Pull message from enode server. + * + * @param enodeName Enode server name + * @param request {@link PullMessageRequestHeader} Pull message request header + * @return Pull message Response future + */ CompletableFuture pullMessage(final String enodeName, final RemotingCommand request); + /** + * Create topic to enode server. + * + * @param enodeName Enode server name + * @param topicConfig {@link TopicConfig} Topic config information + * @return + * @throws InterruptedException + * @throws RemotingTimeoutException + * @throws RemotingSendRequestException + * @throws RemotingConnectException + */ RemotingCommand creatTopic(String enodeName, TopicConfig topicConfig) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; + /** + * Update Enode address from name server + * + * @param clusterName Cluster name, keep insistence with enode cluster + * @throws InterruptedException + * @throws RemotingTimeoutException + * @throws RemotingSendRequestException + * @throws RemotingConnectException + * @throws MQBrokerException + */ void updateEnodeAddress(String clusterName) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException; + /** + * Persist subscription config data to storage server, maybe file or to enode, in this situation, will persist to + * enode + * + * @param subscriptionGroupConfig {@link SubscriptionGroupConfig} Group subscription config + * @return + */ boolean persistSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig); + /** + * Persist offset information of consumer group to storage server. + * + * @param enodeName Which enode server. + * @param groupName Consumer group name. + * @param topic Related topic. + * @param queueId QueueId of related topic. + * @param offset Current offset of target queue of subscribed topic. + */ void persistOffset(String enodeName, String groupName, String topic, int queueId, long offset); RemotingCommand loadOffset(String enodeName, String consumerGroup, String topic, diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java index 346661586fedac1c8b5b6371d943923de0941008..3a1a7fb101b7dd3368605b1dd7508155ea4dd70b 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java @@ -61,14 +61,14 @@ public class EnodeServiceImpl implements EnodeService { } @Override - public void sendHearbeat(RemotingCommand remotingCommand) { + public void sendHeartbeat(RemotingCommand remotingCommand) { for (Map.Entry> entry : enodeTable.entrySet()) { String enodeAddr = entry.getValue().get(MixAll.MASTER_ID); if (enodeAddr != null) { try { this.snodeController.getRemotingClient().invokeSync(enodeAddr, remotingCommand, SnodeConstant.DEFAULT_TIMEOUT_MILLS); } catch (Exception ex) { - log.warn("Send heart beat faild:{} ,ex:{}", enodeAddr, ex); + log.warn("Send heart beat failed:{}, ex:{}", enodeAddr, ex); } } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java index f668eea21deedce47fea5095845c140ae1b23735..da6e9e5d61f6684b58e6863e50ea9142fe30359c 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java @@ -104,8 +104,8 @@ public class PushServiceImpl implements PushService { if (subscription.getSubscriptionData(topic) != null) { boolean slowConsumer = snodeController.getSlowConsumerService().isSlowConsumer(sendMessageResponseHeader.getQueueOffset(), topic, queueId, consumerGroup, enodeName); if (slowConsumer) { - log.warn("[SlowConsumer]: {} closed as slow consumer", remotingChannel); - remotingChannel.close(); + log.warn("[SlowConsumer]: {} is slow consumer", remotingChannel); + snodeController.getSlowConsumerService().slowConsumerResolve(pushMessage, remotingChannel); continue; } snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.DEFAULT_TIMEOUT_MILLS); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java index 7e80778af88efc6f16648f21e87af02a453f9380..5370e64efbdb4f03d0e8a542c8608ff22a15b6a5 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java @@ -65,7 +65,7 @@ public class ScheduledServiceImpl implements ScheduledService { @Override public void run() { try { - snodeController.getEnodeService().sendHearbeat(enodeHeartbeat); + snodeController.getEnodeService().sendHeartbeat(enodeHeartbeat); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java new file mode 100644 index 0000000000000000000000000000000000000000..6a25009e04f8db0ef02a8e84fe81993359b602e4 --- /dev/null +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.snode.service; + +import org.apache.rocketmq.remoting.ClientConfig; +import org.apache.rocketmq.remoting.ServerConfig; +import org.apache.rocketmq.snode.SnodeController; +import org.apache.rocketmq.snode.client.SlowConsumerService; +import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl; +import org.apache.rocketmq.snode.config.SnodeConfig; +import org.apache.rocketmq.snode.offset.ConsumerOffsetManager; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class SlowConsumerServiceImplTest { + @Spy + private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig()); + + private final String enodeName = "testEndoe"; + + private final String topic = "SnodeTopic"; + + private final String group = "SnodeGroup"; + + private final Integer queue = 1; + + private SlowConsumerService slowConsumerService; + + @Mock + private ConsumerOffsetManager consumerOffsetManager; + + @Before + public void init() { + slowConsumerService = new SlowConsumerServiceImpl(snodeController); + } + + @Test + public void isSlowConsumerTest() { + snodeController.setConsumerOffsetManager(consumerOffsetManager); + when(snodeController.getConsumerOffsetManager().queryOffset(anyString(), anyString(), anyString(), anyInt())).thenReturn(1024L); + this.snodeController.getSnodeConfig().setSlowConsumerThreshold(100); + boolean slowConsumer = slowConsumerService.isSlowConsumer(2000, topic, queue, group, enodeName); + assertThat(slowConsumer).isTrue(); + } + + @Test + public void isSlowConsumerTestFalse() { + snodeController.setConsumerOffsetManager(consumerOffsetManager); + when(snodeController.getConsumerOffsetManager().queryOffset(anyString(), anyString(), anyString(), anyInt())).thenReturn(1024L); + this.snodeController.getSnodeConfig().setSlowConsumerThreshold(100); + boolean slowConsumer = slowConsumerService.isSlowConsumer(1025, topic, queue, group, enodeName); + assertThat(slowConsumer).isFalse(); + } +} diff --git a/snode/src/test/resources/conf/flow_control.yml b/snode/src/test/resources/conf/flow_control.yml index 17f6a7e7edfe435affaf0ab6ac08b06a7fe866ce..8f0ee05c031596fc4d19b4485df83a1a988e8443 100644 --- a/snode/src/test/resources/conf/flow_control.yml +++ b/snode/src/test/resources/conf/flow_control.yml @@ -29,6 +29,6 @@ snode: - flowControlResourceName: 310 flowControlGrade: 1 flowControlBehavior: 1 - flowControlResourceCount: 5.00 #KB/S + flowControlResourceCount: 5000.00 #KB/S topicLimit: \ No newline at end of file