提交 74a5b3f5 编写于 作者: D duhenglucky

Add request qps and request size per second flow control strategy

上级 e7a71b70
...@@ -17,12 +17,13 @@ ...@@ -17,12 +17,13 @@
package org.apache.rocketmq.common.flowcontrol; package org.apache.rocketmq.common.flowcontrol;
import com.alibaba.csp.sentinel.SphO; import com.alibaba.csp.sentinel.SphO;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
...@@ -36,12 +37,14 @@ public abstract class AbstractFlowControlService implements Interceptor { ...@@ -36,12 +37,14 @@ public abstract class AbstractFlowControlService implements Interceptor {
private final ThreadLocal<Boolean> acquiredThreadLocal = new ThreadLocal<Boolean>(); private final ThreadLocal<Boolean> acquiredThreadLocal = new ThreadLocal<Boolean>();
private final FlowControlConfig flowControlConfig; private final FlowControlConfig flowControlConfig;
public final String flowControlNameSeparator = "@";
public AbstractFlowControlService() { public AbstractFlowControlService() {
this.flowControlConfig = new FlowControlConfig(); this.flowControlConfig = new FlowControlConfig();
loadRules(this.flowControlConfig); loadRules(this.flowControlConfig);
} }
public abstract String getResourceKey(RequestContext requestContext); public abstract String getResourceName(RequestContext requestContext);
public abstract int getResourceCount(RequestContext requestContext); public abstract int getResourceCount(RequestContext requestContext);
...@@ -51,23 +54,24 @@ public abstract class AbstractFlowControlService implements Interceptor { ...@@ -51,23 +54,24 @@ public abstract class AbstractFlowControlService implements Interceptor {
@Override @Override
public void beforeRequest(RequestContext requestContext) { public void beforeRequest(RequestContext requestContext) {
String resourceKey = getResourceKey(requestContext); String resourceName = getResourceName(requestContext);
String flowControlType = getFlowControlType();
int resourceCount = getResourceCount(requestContext); int resourceCount = getResourceCount(requestContext);
String resourceKey = buildResourceName(flowControlType, resourceName);
log.info("resourceKey: {} resourceCount: {}", resourceKey, resourceCount);
resourceCount = resourceCount == 0 ? 1 : resourceCount; resourceCount = resourceCount == 0 ? 1 : resourceCount;
if (resourceKey != null) { boolean acquired = SphO.entry(resourceKey, resourceCount);
boolean acquired = SphO.entry(resourceKey, resourceCount); if (acquired) {
if (acquired) { this.acquiredThreadLocal.set(true);
this.acquiredThreadLocal.set(true); } else {
} else { rejectRequest(requestContext);
rejectRequest(requestContext);
}
} }
} }
@Override @Override
public void afterRequest(ResponseContext responseContext) { public void afterRequest(ResponseContext responseContext) {
Boolean acquired = this.acquiredThreadLocal.get(); Boolean acquired = this.acquiredThreadLocal.get();
if (acquired != null && acquired == true) { if (acquired != null && acquired) {
SphO.exit(); SphO.exit();
} }
} }
...@@ -75,7 +79,7 @@ public abstract class AbstractFlowControlService implements Interceptor { ...@@ -75,7 +79,7 @@ public abstract class AbstractFlowControlService implements Interceptor {
@Override @Override
public void onException(ExceptionContext exceptionContext) { public void onException(ExceptionContext exceptionContext) {
Boolean acquired = this.acquiredThreadLocal.get(); Boolean acquired = this.acquiredThreadLocal.get();
if (acquired != null && acquired == true) { if (acquired != null && acquired) {
SphO.exit(); SphO.exit();
} }
} }
...@@ -94,27 +98,39 @@ public abstract class AbstractFlowControlService implements Interceptor { ...@@ -94,27 +98,39 @@ public abstract class AbstractFlowControlService implements Interceptor {
log.warn("Get flow control config null by moduleName: {} ", moduleName); log.warn("Get flow control config null by moduleName: {} ", moduleName);
} }
} else { } else {
log.warn("flowControlConfig is null "); log.warn("FlowControlConfig is null ");
} }
return null; return null;
} }
private String buildResourceName(String flowControlType, String flowControlResourceName) {
StringBuffer sb = new StringBuffer(32);
sb.append(flowControlType).append(flowControlNameSeparator).append(flowControlResourceName);
return sb.toString();
}
private void loadRules(FlowControlConfig flowControlConfig) { private void loadRules(FlowControlConfig flowControlConfig) {
Map<String, Map<String, List<FlowControlRule>>> rules = flowControlConfig.getPlainFlowControlRules(); Map<String, Map<String, List<FlowControlRule>>> rules = flowControlConfig.getPlainFlowControlRules();
for (Map<String, List<FlowControlRule>> flowControlTypeMap : rules.values()) { List<FlowRule> sentinelRules = new ArrayList<FlowRule>();
for (List<FlowControlRule> list : flowControlTypeMap.values()) { for (Map<String, List<FlowControlRule>> rulesMap : rules.values()) {
Set<Map.Entry<String, List<FlowControlRule>>> entrySet = rulesMap.entrySet();
Iterator iterator = entrySet.iterator();
while (iterator.hasNext()) {
Map.Entry<String, List<FlowControlRule>> entry = (Map.Entry<String, List<FlowControlRule>>) iterator.next();
String flowControlType = entry.getKey();
List<FlowControlRule> list = entry.getValue();
for (FlowControlRule flowControlRule : list) { for (FlowControlRule flowControlRule : list) {
List<FlowRule> sentinelRules = new ArrayList<FlowRule>();
FlowRule rule1 = new FlowRule(); FlowRule rule1 = new FlowRule();
rule1.setResource(flowControlRule.getFlowControlResourceName()); rule1.setResource(buildResourceName(flowControlType, flowControlRule.getFlowControlResourceName()));
rule1.setCount(flowControlRule.getFlowControlResourceCount()); rule1.setCount(flowControlRule.getFlowControlResourceCount());
rule1.setGrade(flowControlRule.getFlowControlGrade()); rule1.setGrade(flowControlRule.getFlowControlGrade());
rule1.setLimitApp("default"); rule1.setLimitApp("default");
sentinelRules.add(rule1); sentinelRules.add(rule1);
FlowRuleManager.loadRules(sentinelRules);
} }
} }
} }
FlowRuleManager.loadRules(sentinelRules);
log.warn("Load Rules: {}" + FlowRuleManager.getRules());
} }
} }
...@@ -38,7 +38,7 @@ public class QPSFlowControlServiceImpl extends AbstractFlowControlService { ...@@ -38,7 +38,7 @@ public class QPSFlowControlServiceImpl extends AbstractFlowControlService {
} }
@Override @Override
public String getResourceKey(RequestContext requestContext) { public String getResourceName(RequestContext requestContext) {
if (RequestCode.HEART_BEAT == requestContext.getRequest().getCode()) { if (RequestCode.HEART_BEAT == requestContext.getRequest().getCode()) {
return null; return null;
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.flowcontrol;
import com.sun.org.apache.bcel.internal.generic.IF_ACMPEQ;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.flowcontrol.AbstractFlowControlService;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingRuntimeException;
import org.apache.rocketmq.remoting.interceptor.RequestContext;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
public class RequestSizeFlowControlServiceImpl extends AbstractFlowControlService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final AtomicLong logCount = new AtomicLong(0);
private final String flowControlType = "sizeLimit";
@Override
public String getResourceName(RequestContext requestContext) {
return requestContext.getRequest().getCode() + "";
}
/**
* @param requestContext
* @return Size of request KB
*/
@Override
public int getResourceCount(RequestContext requestContext) {
return requestContext.getRequest().getBody().length / 1024;
}
@Override
public String getFlowControlType() {
return flowControlType;
}
@Override
public void rejectRequest(RequestContext requestContext) {
if (logCount.getAndIncrement() % 100 == 0) {
log.warn("[REJECT]exceed system flow control config request size, start flow control for a while: requestContext: {} ", requestContext);
}
throw new RemotingRuntimeException(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECT]exceed system flow control config request size, start flow control for a while");
}
@Override
public String interceptorName() {
return "requestSizeFlowControlInterceptor";
}
}
...@@ -38,7 +38,7 @@ import org.apache.rocketmq.snode.SnodeController; ...@@ -38,7 +38,7 @@ import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.exception.SnodeException; import org.apache.rocketmq.snode.exception.SnodeException;
public class ConsumerOffsetManager { public class ConsumerOffsetManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private static final String TOPIC_GROUP_SEPARATOR = "@"; private static final String TOPIC_GROUP_SEPARATOR = "@";
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable = private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
......
org.apache.rocketmq.snode.flowcontrol.QPSFlowControlServiceImpl org.apache.rocketmq.snode.flowcontrol.QPSFlowControlServiceImpl
\ No newline at end of file org.apache.rocketmq.snode.flowcontrol.RequestSizeFlowControlServiceImpl
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册