From 74a5b3f5d4e2a1e489e0242e2e6b8cd9cb63f5d5 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Tue, 15 Jan 2019 11:29:59 +0800 Subject: [PATCH] Add request qps and request size per second flow control strategy --- .../AbstractFlowControlService.java | 54 +++++++++------ .../QPSFlowControlServiceImpl.java | 2 +- .../RequestSizeFlowControlServiceImpl.java | 65 +++++++++++++++++++ .../snode/offset/ConsumerOffsetManager.java | 2 +- ...node.interceptor.RemotingServerInterceptor | 3 +- 5 files changed, 104 insertions(+), 22 deletions(-) create mode 100644 snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/RequestSizeFlowControlServiceImpl.java diff --git a/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java index fc4ef40b..a2d8adc5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java +++ b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java @@ -17,12 +17,13 @@ package org.apache.rocketmq.common.flowcontrol; import com.alibaba.csp.sentinel.SphO; -import com.alibaba.csp.sentinel.slots.block.RuleConstant; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; @@ -36,12 +37,14 @@ public abstract class AbstractFlowControlService implements Interceptor { private final ThreadLocal acquiredThreadLocal = new ThreadLocal(); private final FlowControlConfig flowControlConfig; + public final String flowControlNameSeparator = "@"; + public AbstractFlowControlService() { this.flowControlConfig = new FlowControlConfig(); loadRules(this.flowControlConfig); } - public abstract String getResourceKey(RequestContext requestContext); + public abstract String getResourceName(RequestContext requestContext); public abstract int getResourceCount(RequestContext requestContext); @@ -51,23 +54,24 @@ public abstract class AbstractFlowControlService implements Interceptor { @Override public void beforeRequest(RequestContext requestContext) { - String resourceKey = getResourceKey(requestContext); + String resourceName = getResourceName(requestContext); + String flowControlType = getFlowControlType(); int resourceCount = getResourceCount(requestContext); + String resourceKey = buildResourceName(flowControlType, resourceName); + log.info("resourceKey: {} resourceCount: {}", resourceKey, resourceCount); resourceCount = resourceCount == 0 ? 1 : resourceCount; - if (resourceKey != null) { - boolean acquired = SphO.entry(resourceKey, resourceCount); - if (acquired) { - this.acquiredThreadLocal.set(true); - } else { - rejectRequest(requestContext); - } + boolean acquired = SphO.entry(resourceKey, resourceCount); + if (acquired) { + this.acquiredThreadLocal.set(true); + } else { + rejectRequest(requestContext); } } @Override public void afterRequest(ResponseContext responseContext) { Boolean acquired = this.acquiredThreadLocal.get(); - if (acquired != null && acquired == true) { + if (acquired != null && acquired) { SphO.exit(); } } @@ -75,7 +79,7 @@ public abstract class AbstractFlowControlService implements Interceptor { @Override public void onException(ExceptionContext exceptionContext) { Boolean acquired = this.acquiredThreadLocal.get(); - if (acquired != null && acquired == true) { + if (acquired != null && acquired) { SphO.exit(); } } @@ -94,27 +98,39 @@ public abstract class AbstractFlowControlService implements Interceptor { log.warn("Get flow control config null by moduleName: {} ", moduleName); } } else { - log.warn("flowControlConfig is null "); + log.warn("FlowControlConfig is null "); } return null; } + private String buildResourceName(String flowControlType, String flowControlResourceName) { + StringBuffer sb = new StringBuffer(32); + sb.append(flowControlType).append(flowControlNameSeparator).append(flowControlResourceName); + return sb.toString(); + } + private void loadRules(FlowControlConfig flowControlConfig) { Map>> rules = flowControlConfig.getPlainFlowControlRules(); - for (Map> flowControlTypeMap : rules.values()) { - for (List list : flowControlTypeMap.values()) { + List sentinelRules = new ArrayList(); + for (Map> rulesMap : rules.values()) { + Set>> entrySet = rulesMap.entrySet(); + Iterator iterator = entrySet.iterator(); + while (iterator.hasNext()) { + Map.Entry> entry = (Map.Entry>) iterator.next(); + String flowControlType = entry.getKey(); + List list = entry.getValue(); for (FlowControlRule flowControlRule : list) { - List sentinelRules = new ArrayList(); FlowRule rule1 = new FlowRule(); - rule1.setResource(flowControlRule.getFlowControlResourceName()); + rule1.setResource(buildResourceName(flowControlType, flowControlRule.getFlowControlResourceName())); rule1.setCount(flowControlRule.getFlowControlResourceCount()); rule1.setGrade(flowControlRule.getFlowControlGrade()); rule1.setLimitApp("default"); sentinelRules.add(rule1); - FlowRuleManager.loadRules(sentinelRules); } } } + FlowRuleManager.loadRules(sentinelRules); + log.warn("Load Rules: {}" + FlowRuleManager.getRules()); } - } + diff --git a/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java index 96a1e962..743ac9b4 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java @@ -38,7 +38,7 @@ public class QPSFlowControlServiceImpl extends AbstractFlowControlService { } @Override - public String getResourceKey(RequestContext requestContext) { + public String getResourceName(RequestContext requestContext) { if (RequestCode.HEART_BEAT == requestContext.getRequest().getCode()) { return null; } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/RequestSizeFlowControlServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/RequestSizeFlowControlServiceImpl.java new file mode 100644 index 00000000..90886941 --- /dev/null +++ b/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/RequestSizeFlowControlServiceImpl.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.snode.flowcontrol; + +import com.sun.org.apache.bcel.internal.generic.IF_ACMPEQ; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.flowcontrol.AbstractFlowControlService; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.exception.RemotingRuntimeException; +import org.apache.rocketmq.remoting.interceptor.RequestContext; +import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; + +public class RequestSizeFlowControlServiceImpl extends AbstractFlowControlService { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); + private final AtomicLong logCount = new AtomicLong(0); + private final String flowControlType = "sizeLimit"; + + @Override + public String getResourceName(RequestContext requestContext) { + return requestContext.getRequest().getCode() + ""; + } + + /** + * @param requestContext + * @return Size of request KB + */ + @Override + public int getResourceCount(RequestContext requestContext) { + return requestContext.getRequest().getBody().length / 1024; + } + + @Override + public String getFlowControlType() { + return flowControlType; + } + + @Override + public void rejectRequest(RequestContext requestContext) { + if (logCount.getAndIncrement() % 100 == 0) { + log.warn("[REJECT]exceed system flow control config request size, start flow control for a while: requestContext: {} ", requestContext); + } + throw new RemotingRuntimeException(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECT]exceed system flow control config request size, start flow control for a while"); + } + + @Override + public String interceptorName() { + return "requestSizeFlowControlInterceptor"; + } +} diff --git a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java index e295a158..3d53ef10 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java @@ -38,7 +38,7 @@ import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.exception.SnodeException; public class ConsumerOffsetManager { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private static final String TOPIC_GROUP_SEPARATOR = "@"; private ConcurrentMap> offsetTable = diff --git a/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.RemotingServerInterceptor b/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.RemotingServerInterceptor index 5b1dd525..9c742a22 100644 --- a/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.RemotingServerInterceptor +++ b/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.RemotingServerInterceptor @@ -1 +1,2 @@ -org.apache.rocketmq.snode.flowcontrol.QPSFlowControlServiceImpl \ No newline at end of file +org.apache.rocketmq.snode.flowcontrol.QPSFlowControlServiceImpl +org.apache.rocketmq.snode.flowcontrol.RequestSizeFlowControlServiceImpl \ No newline at end of file -- GitLab