提交 133be328 编写于 作者: 鲸落和鲨掉's avatar 鲸落和鲨掉

分布式动态配置中心控制限流、切面控制超时熔断

上级 a4c8bafb
......@@ -117,6 +117,10 @@
<artifactId>business-behavior-monitor-sdk</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-javanica</artifactId>
</dependency>
<!-- 工程模块;启动依赖 trigger->domain, infrastructure-->
<dependency>
<groupId>cn.bugstack</groupId>
......
......@@ -4,12 +4,14 @@ import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@Configurable
@EnableScheduling
@EnableDubbo
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class Application {
public static void main(String[] args){
......
package cn.bugstack.aop;
import cn.bugstack.types.annotations.DCCValue;
import cn.bugstack.types.annotations.RateLimiterAccessInterceptor;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
/**
* @ClassName: RateLimiterAOP
* @Description:
* @Author: zhaoyongfeng
* @Date: 2025/1/19 17:04
*/
@Slf4j
@Aspect
@Component
public class RateLimiterAOP {
@DCCValue("rateLimiterSwitch:close")
private String rateLimiterSwitch;
// 个人限频记录1分钟
private final Cache<String, RateLimiter> loginRecord = CacheBuilder.newBuilder()
.expireAfterWrite(1, TimeUnit.MINUTES)
.build();
// 个人限频黑名单24h - 分布式业务场景,可以记录到 Redis 中
private final Cache<String, Long> blacklist = CacheBuilder.newBuilder()
.expireAfterWrite(24, TimeUnit.HOURS)
.build();
@Pointcut("@annotation(cn.bugstack.types.annotations.RateLimiterAccessInterceptor)")
public void aopPoint() {
}
@Around("aopPoint() && @annotation(rateLimiterAccessInterceptor)")
public Object doRouter(ProceedingJoinPoint jp, RateLimiterAccessInterceptor rateLimiterAccessInterceptor) throws Throwable{
// 0. 限流开关【open 开启、close 关闭】关闭后,不会走限流策略
if (StringUtils.isBlank(rateLimiterSwitch) || "close".equals(rateLimiterSwitch)) {
return jp.proceed();
}
String key = rateLimiterAccessInterceptor.key();
if (StringUtils.isBlank(key)) {
throw new RuntimeException("annotation RateLimiter uId is null!");
}
// 获取拦截字段
String keyAttr = getAttrValue(key, jp.getArgs());
log.info("aop attr {}", keyAttr);
// 黑名单拦截
if (!"all".equals(keyAttr) && rateLimiterAccessInterceptor.blacklistCount() != 0 && null != blacklist.getIfPresent(keyAttr) && blacklist.getIfPresent(keyAttr) > rateLimiterAccessInterceptor.blacklistCount()) {
log.info("限流-黑名单拦截(24h):{}", keyAttr);
return fallbackMethodResult(jp, rateLimiterAccessInterceptor.fallbackMethod());
}
// 获取限流 -> Guava 缓存1分钟
RateLimiter rateLimiter = loginRecord.getIfPresent(keyAttr);
if (null == rateLimiter) {
rateLimiter = RateLimiter.create(rateLimiterAccessInterceptor.permitsPerSecond());
loginRecord.put(keyAttr, rateLimiter);
}
// 限流拦截
if (!rateLimiter.tryAcquire()) {
if (rateLimiterAccessInterceptor.blacklistCount() != 0) {
if (null == blacklist.getIfPresent(keyAttr)) {
blacklist.put(keyAttr, 1L);
} else {
blacklist.put(keyAttr, blacklist.getIfPresent(keyAttr) + 1L);
}
}
log.info("限流-超频次拦截:{}", keyAttr);
return fallbackMethodResult(jp, rateLimiterAccessInterceptor.fallbackMethod());
}
// 返回结果
return jp.proceed();
}
/**
* @description: 调用用户配置的回调方法,当拦截后,返回回调结果。
* @return: java.lang.Object
* @author zhaoyongfeng
* @date: 2025/1/19 17:59
*/
private Object fallbackMethodResult(JoinPoint jp, String fallbackMethod) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Signature sig = jp.getSignature();
MethodSignature methodSignature = (MethodSignature) sig;
Method method = jp.getTarget().getClass().getMethod(fallbackMethod, methodSignature.getParameterTypes());
return method.invoke(jp.getThis(), jp.getArgs());
}
/**
* 实际根据自身业务调整,主要是为了获取通过某个值做拦截
*/
public String getAttrValue(String attr, Object[] args) {
if (args[0] instanceof String) {
return args[0].toString();
}
String filedValue = null;
for (Object arg : args) {
try {
if (StringUtils.isNotBlank(filedValue)) {
break;
}
// filedValue = BeanUtils.getProperty(arg, attr);
// fix: 使用lombok时,uId这种字段的get方法与idea生成的get方法不同,会导致获取不到属性值,改成反射获取解决
filedValue = String.valueOf(this.getValueByName(arg, attr));
} catch (Exception e) {
log.error("获取路由属性值失败 attr:{}", attr, e);
}
}
return filedValue;
}
/**
* 获取对象的特定属性值
*
* @param item 对象
* @param name 属性名
* @return 属性值
* @author tang
*/
private Object getValueByName(Object item, String name) {
try {
Field field = getFieldByName(item, name);
if (field == null) {
return null;
}
field.setAccessible(true);
Object o = field.get(item);
field.setAccessible(false);
return o;
} catch (IllegalAccessException e) {
return null;
}
}
/**
* 根据名称获取方法,该方法同时兼顾继承类获取父类的属性
*
* @param item 对象
* @param name 属性名
* @return 该属性对应方法
* @author tang
*/
private Field getFieldByName(Object item, String name) {
try {
Field field;
try {
field = item.getClass().getDeclaredField(name);
} catch (NoSuchFieldException e) {
field = item.getClass().getSuperclass().getDeclaredField(name);
}
return field;
} catch (NoSuchFieldException e) {
return null;
}
}
}
......@@ -5,6 +5,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Configuration;
......@@ -64,8 +66,14 @@ public class DCCValueBeanFactory implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> beanClass = bean.getClass();
Field[] fields = beanClass.getDeclaredFields();
// 注意;增加 AOP 代理后,获得类的方式要通过 AopProxyUtils.getTargetClass(bean); 不能直接 bean.class 因为代理后类的结构发生变化,这样不能获得到自己的自定义注解了。
Class<?> targetBeanClass = bean.getClass();
Object targetBeanObject = bean;
if (AopUtils.isAopProxy(bean)) {
targetBeanClass = AopUtils.getTargetClass(bean);
targetBeanObject = AopProxyUtils.getSingletonTarget(bean);
}
Field[] fields = targetBeanClass.getDeclaredFields();
for (Field field : fields) {
if (!field.isAnnotationPresent(DCCValue.class)) {
continue;
......@@ -89,7 +97,7 @@ public class DCCValueBeanFactory implements BeanPostProcessor {
client.create().creatingParentsIfNeeded().forPath(keyPath);
if (StringUtils.isNotBlank(defaultValue)) {
field.setAccessible(true);
field.set(bean, defaultValue);
field.set(targetBeanObject, defaultValue);
field.setAccessible(false);
}
log.info("DCC 节点监听 创建节点 {}", keyPath);
......@@ -97,7 +105,7 @@ public class DCCValueBeanFactory implements BeanPostProcessor {
String configValue = new String(client.getData().forPath(keyPath));
if (StringUtils.isNotBlank(configValue)) {
field.setAccessible(true);
field.set(bean, configValue);
field.set(targetBeanObject, configValue);
field.setAccessible(false);
log.info("DCC 节点监听 设置配置 {} {} {}", keyPath, field.getName(), configValue);
}
......@@ -106,7 +114,7 @@ public class DCCValueBeanFactory implements BeanPostProcessor {
throw new RuntimeException(e);
}
dccObjGroup.put(BASE_CONFIG_PATH_CONFIG.concat("/").concat(key), bean);
dccObjGroup.put(BASE_CONFIG_PATH_CONFIG.concat("/").concat(key), targetBeanObject);
}
return bean;
}
......
package cn.bugstack.domain.strategy.service.rule.chain.impl;
import cn.bugstack.domain.strategy.repository.IStrategyRepository;
import cn.bugstack.domain.strategy.service.rule.chain.AbstractLogicChain;
import cn.bugstack.domain.strategy.service.rule.chain.factory.DefaultChainFactory;
import cn.bugstack.types.common.Constants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author Fuzhengwei bugstack.cn @小傅哥
* @description 黑名单责任链
* @create 2024-01-20 10:23
*/
@Slf4j
@Component("rule_blacklist")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class BackListLogicChain extends AbstractLogicChain {
@Resource
private IStrategyRepository repository;
@Override
public DefaultChainFactory.StrategyAwardVO logic(String userId, Long strategyId) {
log.info("抽奖责任链-黑名单开始 userId: {} strategyId: {} ruleModel: {}", userId, strategyId, ruleModel());
// 查询规则值配置
String ruleValue = repository.queryStrategyRuleValue(strategyId, ruleModel());
String[] splitRuleValue = ruleValue.split(Constants.COLON);
Integer awardId = Integer.parseInt(splitRuleValue[0]);
// 黑名单抽奖判断
String[] userBlackIds = splitRuleValue[1].split(Constants.SPLIT);
for (String userBlackId : userBlackIds) {
if (userId.equals(userBlackId)) {
log.info("抽奖责任链-黑名单接管 userId: {} strategyId: {} ruleModel: {} awardId: {}", userId, strategyId, ruleModel(), awardId);
return DefaultChainFactory.StrategyAwardVO.builder()
.awardId(awardId)
.logicModel(ruleModel())
// 写入默认配置黑名单奖品值 0.01 ~ 1 积分,也可以配置到数据库表中
.awardRuleValue("0.01,1")
.build();
}
}
// 过滤其他责任链
log.info("抽奖责任链-黑名单放行 userId: {} strategyId: {} ruleModel: {}", userId, strategyId, ruleModel());
return next().logic(userId, strategyId);
}
@Override
protected String ruleModel() {
return DefaultChainFactory.LogicModel.RULE_BLACKLIST.getCode();
}
}
......@@ -46,6 +46,10 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-javanica</artifactId>
</dependency>
<!-- 系统模块 -->
<dependency>
<groupId>cn.bugstack</groupId>
......
......@@ -26,9 +26,12 @@ import cn.bugstack.trigger.api.IRaffleActivityService;
import cn.bugstack.trigger.api.dto.*;
import cn.bugstack.trigger.api.response.Response;
import cn.bugstack.types.annotations.DCCValue;
import cn.bugstack.types.annotations.RateLimiterAccessInterceptor;
import cn.bugstack.types.enums.ResponseCode;
import cn.bugstack.types.exception.AppException;
import com.alibaba.fastjson.JSON;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
......@@ -133,12 +136,18 @@ public class RaffleActivityController implements IRaffleActivityService {
* "activityId": 100301
* }'
*/
@RateLimiterAccessInterceptor(key = "userId", fallbackMethod = "drawRateLimiterError", permitsPerSecond = 1.0d, blacklistCount = 1)
@HystrixCommand(commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "150")
}, fallbackMethod = "drawHystrixError"
)
@RequestMapping(value = "draw", method = RequestMethod.POST)
@Override
public Response<ActivityDrawResponseDTO> draw(@RequestBody ActivityDrawRequestDTO request) {
try {
log.info("活动抽奖开始 userId:{} activityId:{}", request.getUserId(), request.getActivityId());
if (!"open".equals(degradeSwitch)) {
// 0. 降级开关【open 开启、close 关闭】
if (StringUtils.isNotBlank(degradeSwitch) && "open".equals(degradeSwitch)) {
return Response.<ActivityDrawResponseDTO>builder()
.code(ResponseCode.DEGRADE_SWITCH.getCode())
.info(ResponseCode.DEGRADE_SWITCH.getInfo())
......@@ -200,7 +209,21 @@ public class RaffleActivityController implements IRaffleActivityService {
.build();
}
}
public Response<ActivityDrawResponseDTO> drawRateLimiterError(@RequestBody ActivityDrawRequestDTO request) {
log.info("活动抽奖限流 userId:{} activityId:{}", request.getUserId(), request.getActivityId());
return Response.<ActivityDrawResponseDTO>builder()
.code(ResponseCode.RATE_LIMITER.getCode())
.info(ResponseCode.RATE_LIMITER.getInfo())
.build();
}
public Response<ActivityDrawResponseDTO> drawHystrixError(@RequestBody ActivityDrawRequestDTO request) {
log.info("活动抽奖熔断 userId:{} activityId:{}", request.getUserId(), request.getActivityId());
return Response.<ActivityDrawResponseDTO>builder()
.code(ResponseCode.HYSTRIX.getCode())
.info(ResponseCode.HYSTRIX.getInfo())
.build();
}
/**
* 日历签到返利接口
*
......
package cn.bugstack.types.annotations;
import java.lang.annotation.*;
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@Documented
public @interface RateLimiterAccessInterceptor {
/** 用哪个字段作为拦截标识,未配置则默认走全部 */
String key() default "all";
/** 限制频次(每秒请求次数) */
double permitsPerSecond();
/** 黑名单拦截(多少次限制后加入黑名单)0 不限制 */
double blacklistCount() default 0;
/** 拦截后的执行方法 */
String fallbackMethod();
}
......@@ -14,6 +14,8 @@ public enum ResponseCode {
ILLEGAL_PARAMETER("0002", "非法参数"),
INDEX_DUP("0003", "唯一索引冲突"),
DEGRADE_SWITCH("0004", "活动已降级"),
RATE_LIMITER("0005", "访问限流拦截"),
HYSTRIX("0006", "访问熔断拦截"),
STRATEGY_RULE_WEIGHT_IS_NULL("ERR_BIZ_001", "业务异常,策略规则中 rule_weight 权重规则已适用但未配置"),
UN_ASSEMBLED_STRATEGY_ARMORY("ERR_BIZ_002", "抽奖策略配置未装配,请通过IStrategyArmory完成装配"),
ACTIVITY_STATE_ERROR("ERR_BIZ_003", "活动未开启(非open状态)"),
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -142,6 +142,11 @@
<artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-javanica</artifactId>
<version>1.5.18</version>
</dependency>
<!-- 工程模块 -->
<dependency>
<groupId>cn.bugstack</groupId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册