提交 e49c6169 编写于 作者: D duhenglucky

Add slow consumer resolve strategy

上级 36481471
......@@ -401,4 +401,8 @@ public class SnodeController {
public void setSlowConsumerService(SlowConsumerService slowConsumerService) {
this.slowConsumerService = slowConsumerService;
}
public void setConsumerOffsetManager(ConsumerOffsetManager consumerOffsetManager) {
this.consumerOffsetManager = consumerOffsetManager;
}
}
......@@ -16,7 +16,29 @@
*/
package org.apache.rocketmq.snode.client;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface SlowConsumerService {
boolean isSlowConsumer(long latestLogicOffset, String topic, int queueId, String consumerGroup, String enodeName);
/**
* Check whether this consumer is slow consumer, slow consumer means this consumer's acked offset is far behind
* current offset.
*
* @param currentOffset Current offset in consumer queue.
* @param topic
* @param queueId
* @param consumerGroup
* @param enodeName
* @return If this consumer is slow consumer, return true, otherwise false.
*/
boolean isSlowConsumer(long currentOffset, String topic, int queueId, String consumerGroup, String enodeName);
/**
* When a consumer is checked as slow consumer, this method will be invoked to do next action.
*
* @param pushMessage The message will be pushed to consumer.
* @param remotingChannel Consumer channel.
*/
void slowConsumerResolve(RemotingCommand pushMessage, RemotingChannel remotingChannel);
}
......@@ -19,6 +19,8 @@ package org.apache.rocketmq.snode.client.impl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.SlowConsumerService;
......@@ -32,14 +34,19 @@ public class SlowConsumerServiceImpl implements SlowConsumerService {
}
@Override
public boolean isSlowConsumer(long latestLogicOffset, String topic, int queueId,
public boolean isSlowConsumer(long currentOffset, String topic, int queueId,
String consumerGroup, String enodeName) {
long ackedOffset = this.snodeController.getConsumerOffsetManager().queryOffset(enodeName, consumerGroup, topic, queueId);
if (latestLogicOffset - ackedOffset > snodeController.getSnodeConfig().getSlowConsumerThreshold()) {
log.warn("[SlowConsumer] group: {}, lastAckedOffset:{} nowOffset:{} ", consumerGroup, ackedOffset, latestLogicOffset);
if (currentOffset - ackedOffset > snodeController.getSnodeConfig().getSlowConsumerThreshold()) {
log.warn("[SlowConsumer] group: {}, lastAckedOffset:{} nowOffset:{} ", consumerGroup, ackedOffset, currentOffset);
return true;
}
return false;
}
@Override
public void slowConsumerResolve(RemotingCommand pushMessage, RemotingChannel remotingChannel) {
log.warn("[SlowConsumer] RemotingChannel address: {}", remotingChannel.remoteAddress());
}
}
package org.apache.rocketmq.snode.service;/*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
......@@ -14,6 +14,7 @@ package org.apache.rocketmq.snode.service;/*
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.service;
public interface ClientService {
......
......@@ -19,6 +19,8 @@ package org.apache.rocketmq.snode.service;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
......@@ -27,20 +29,76 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface EnodeService {
void sendHearbeat(RemotingCommand remotingCommand);
/**
* Send Heartbeat data to enode server to keep alive
*
* @param remotingCommand Heartbeat request
*/
void sendHeartbeat(RemotingCommand remotingCommand);
/**
* Send message to enode.
*
* @param enodeName Enode server name
* @param request {@link SendMessageRequestHeaderV2} Send message request header
* @return Send message response future
*/
CompletableFuture<RemotingCommand> sendMessage(final String enodeName, final RemotingCommand request);
/**
* Pull message from enode server.
*
* @param enodeName Enode server name
* @param request {@link PullMessageRequestHeader} Pull message request header
* @return Pull message Response future
*/
CompletableFuture<RemotingCommand> pullMessage(final String enodeName, final RemotingCommand request);
/**
* Create topic to enode server.
*
* @param enodeName Enode server name
* @param topicConfig {@link TopicConfig} Topic config information
* @return
* @throws InterruptedException
* @throws RemotingTimeoutException
* @throws RemotingSendRequestException
* @throws RemotingConnectException
*/
RemotingCommand creatTopic(String enodeName,
TopicConfig topicConfig) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
/**
* Update Enode address from name server
*
* @param clusterName Cluster name, keep insistence with enode cluster
* @throws InterruptedException
* @throws RemotingTimeoutException
* @throws RemotingSendRequestException
* @throws RemotingConnectException
* @throws MQBrokerException
*/
void updateEnodeAddress(String clusterName) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException;
/**
* Persist subscription config data to storage server, maybe file or to enode, in this situation, will persist to
* enode
*
* @param subscriptionGroupConfig {@link SubscriptionGroupConfig} Group subscription config
* @return
*/
boolean persistSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig);
/**
* Persist offset information of consumer group to storage server.
*
* @param enodeName Which enode server.
* @param groupName Consumer group name.
* @param topic Related topic.
* @param queueId QueueId of related topic.
* @param offset Current offset of target queue of subscribed topic.
*/
void persistOffset(String enodeName, String groupName, String topic, int queueId, long offset);
RemotingCommand loadOffset(String enodeName, String consumerGroup, String topic,
......
......@@ -61,14 +61,14 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
public void sendHearbeat(RemotingCommand remotingCommand) {
public void sendHeartbeat(RemotingCommand remotingCommand) {
for (Map.Entry<String, HashMap<Long, String>> entry : enodeTable.entrySet()) {
String enodeAddr = entry.getValue().get(MixAll.MASTER_ID);
if (enodeAddr != null) {
try {
this.snodeController.getRemotingClient().invokeSync(enodeAddr, remotingCommand, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
} catch (Exception ex) {
log.warn("Send heart beat faild:{} ,ex:{}", enodeAddr, ex);
log.warn("Send heart beat failed:{}, ex:{}", enodeAddr, ex);
}
}
}
......
......@@ -104,8 +104,8 @@ public class PushServiceImpl implements PushService {
if (subscription.getSubscriptionData(topic) != null) {
boolean slowConsumer = snodeController.getSlowConsumerService().isSlowConsumer(sendMessageResponseHeader.getQueueOffset(), topic, queueId, consumerGroup, enodeName);
if (slowConsumer) {
log.warn("[SlowConsumer]: {} closed as slow consumer", remotingChannel);
remotingChannel.close();
log.warn("[SlowConsumer]: {} is slow consumer", remotingChannel);
snodeController.getSlowConsumerService().slowConsumerResolve(pushMessage, remotingChannel);
continue;
}
snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
......
......@@ -65,7 +65,7 @@ public class ScheduledServiceImpl implements ScheduledService {
@Override
public void run() {
try {
snodeController.getEnodeService().sendHearbeat(enodeHeartbeat);
snodeController.getEnodeService().sendHeartbeat(enodeHeartbeat);
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.service;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.SlowConsumerService;
import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class SlowConsumerServiceImplTest {
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
private final String enodeName = "testEndoe";
private final String topic = "SnodeTopic";
private final String group = "SnodeGroup";
private final Integer queue = 1;
private SlowConsumerService slowConsumerService;
@Mock
private ConsumerOffsetManager consumerOffsetManager;
@Before
public void init() {
slowConsumerService = new SlowConsumerServiceImpl(snodeController);
}
@Test
public void isSlowConsumerTest() {
snodeController.setConsumerOffsetManager(consumerOffsetManager);
when(snodeController.getConsumerOffsetManager().queryOffset(anyString(), anyString(), anyString(), anyInt())).thenReturn(1024L);
this.snodeController.getSnodeConfig().setSlowConsumerThreshold(100);
boolean slowConsumer = slowConsumerService.isSlowConsumer(2000, topic, queue, group, enodeName);
assertThat(slowConsumer).isTrue();
}
@Test
public void isSlowConsumerTestFalse() {
snodeController.setConsumerOffsetManager(consumerOffsetManager);
when(snodeController.getConsumerOffsetManager().queryOffset(anyString(), anyString(), anyString(), anyInt())).thenReturn(1024L);
this.snodeController.getSnodeConfig().setSlowConsumerThreshold(100);
boolean slowConsumer = slowConsumerService.isSlowConsumer(1025, topic, queue, group, enodeName);
assertThat(slowConsumer).isFalse();
}
}
......@@ -29,6 +29,6 @@ snode:
- flowControlResourceName: 310
flowControlGrade: 1
flowControlBehavior: 1
flowControlResourceCount: 5.00 #KB/S
flowControlResourceCount: 5000.00 #KB/S
topicLimit:
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册