huifer-how-to-limit-current.md 9.6 KB
Newer Older
1
# 如何限流?在工作中是怎么做的?说一下具体的实现?
Y
yanglbme 已提交
2

Y
yanglbme 已提交
3 4
- Author: [HuiFer](https://github.com/huifer)
- Description: 该文简单介绍限流相关技术以及实现
5 6

## 什么是限流
Y
yanglbme 已提交
7

8 9 10 11 12 13 14
> 限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。

## 限流方法

### 计数器

#### 实现方式
Y
yanglbme 已提交
15

Y
yanglbme 已提交
16
- 控制单位时间内的请求数量
Y
yanglbme 已提交
17

Y
yanglbme 已提交
18
```java
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56

import java.util.concurrent.atomic.AtomicInteger;

public class Counter {
    /**
     * 最大访问数量
     */
    private final int limit = 10;
    /**
     * 访问时间差
     */
    private final long timeout = 1000;
    /**
     * 请求时间
     */
    private long time;
    /**
     * 当前计数器
     */
    private AtomicInteger reqCount = new AtomicInteger(0);

    public boolean limit() {
        long now = System.currentTimeMillis();
        if (now < time + timeout) {
            // 单位时间内
            reqCount.addAndGet(1);
            return reqCount.get() <= limit;
        } else {
            // 超出单位时间
            time = now;
            reqCount = new AtomicInteger(0);
            return true;
        }
    }
}

```

Y
yanglbme 已提交
57 58 59
- 劣势
  - 假设在 00:01 时发生一个请求,在 00:01-00:58 之间不在发送请求,在 00:59 时发送剩下的所有请求 `n-1` (n 为限流请求数量),在下一分钟的 00:01 发送 n 个请求,这样在 2 秒钟内请求到达了 `2n - 1` 个.
    - 设每分钟请求数量为 60 个,每秒可以处理 1 个请求,用户在 00:59 发送 60 个请求,在 01:00 发送 60 个请求 此时 2 秒钟有 120 个请求(每秒 60 个请求),远远大于了每秒钟处理数量的阈值
60 61

### 滑动窗口
Y
yanglbme 已提交
62

63
#### 实现方式
Y
yanglbme 已提交
64

Y
yanglbme 已提交
65 66
- 滑动窗口是对计数器方式的改进, 增加一个时间粒度的度量单位
  - 把一分钟分成若干等分(6 份,每份 10 秒), 在每一份上设置独立计数器,在 00:00-00:09 之间发生请求计数器累加 1.当等分数量越大限流统计就越详细
67

Y
yanglbme 已提交
68
```java
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
package com.example.demo1.service;

import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.IntStream;

public class TimeWindow {
    private ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<Long>();

    /**
     * 间隔秒数
     */
    private int seconds;

    /**
     * 最大限流
     */
    private int max;

    public TimeWindow(int max, int seconds) {
        this.seconds = seconds;
        this.max = max;

        /**
         * 永续线程执行清理queue 任务
         */
        new Thread(() -> {
            while (true) {
                try {
                    // 等待 间隔秒数-1 执行清理操作
                    Thread.sleep((seconds - 1) * 1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                clean();
            }
        }).start();

    }

    public static void main(String[] args) throws Exception {

        final TimeWindow timeWindow = new TimeWindow(10, 1);

        // 测试3个线程
        IntStream.range(0, 3).forEach((i) -> {
            new Thread(() -> {

                while (true) {

                    try {
                        Thread.sleep(new Random().nextInt(20) * 100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    timeWindow.take();
                }

            }).start();

        });

    }

    /**
     * 获取令牌,并且添加时间
     */
    public void take() {

        long start = System.currentTimeMillis();
        try {

            int size = sizeOfValid();
            if (size > max) {
                System.err.println("超限");

            }
            synchronized (queue) {
                if (sizeOfValid() > max) {
                    System.err.println("超限");
                    System.err.println("queue中有 " + queue.size() + " 最大数量 " + max);
                }
                this.queue.offer(System.currentTimeMillis());
            }
            System.out.println("queue中有 " + queue.size() + " 最大数量 " + max);

        }

    }

    public int sizeOfValid() {
        Iterator<Long> it = queue.iterator();
        Long ms = System.currentTimeMillis() - seconds * 1000;
        int count = 0;
        while (it.hasNext()) {
            long t = it.next();
            if (t > ms) {
                // 在当前的统计时间范围内
                count++;
            }
        }

        return count;
    }

    /**
     * 清理过期的时间
     */
    public void clean() {
        Long c = System.currentTimeMillis() - seconds * 1000;

        Long tl = null;
        while ((tl = queue.peek()) != null && tl < c) {
            System.out.println("清理数据");
            queue.poll();
        }
    }

}

```

### Leaky Bucket 漏桶
Y
yanglbme 已提交
193

194
#### 实现方式
Y
yanglbme 已提交
195

Y
yanglbme 已提交
196
- 规定固定容量的桶, 有水进入, 有水流出. 对于流进的水我们无法估计进来的数量、速度, 对于流出的水我们可以控制速度.
Y
yanglbme 已提交
197

Y
yanglbme 已提交
198
```java
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
public class LeakBucket {
    /**
     * 时间
     */
    private long time;
    /**
     * 总量
     */
    private Double total;
    /**
     * 水流出去的速度
     */
    private Double rate;
    /**
     * 当前总量
     */
    private Double nowSize;

    public boolean limit() {
        long now = System.currentTimeMillis();
        nowSize = Math.max(0, (nowSize - (now - time) * rate));
        time = now;
        if ((nowSize + 1) < total) {
            nowSize++;
            return true;
        } else {
            return false;
        }

    }
}
```

### Token Bucket 令牌桶
Y
yanglbme 已提交
233

234 235
#### 实现方式

Y
yanglbme 已提交
236
- 规定固定容量的桶, token 以固定速度往桶内填充, 当桶满时 token 不会被继续放入, 每过来一个请求把 token 从桶中移除, 如果桶中没有 token 不能请求
Y
yanglbme 已提交
237

Y
yanglbme 已提交
238
```java
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
public class TokenBucket {
    /**
     * 时间
     */
    private long time;
    /**
     * 总量
     */
    private Double total;
    /**
     * token 放入速度
     */
    private Double rate;
    /**
     * 当前总量
     */
    private Double nowSize;

    public boolean limit() {
        long now = System.currentTimeMillis();
        nowSize = Math.min(total, nowSize + (now - time) * rate);
        time = now;
        if (nowSize < 1) {
            // 桶里没有token
            return false;
        } else {
            // 存在token
            nowSize -= 1;
            return true;
        }
    }

}
```

## 工作中的使用
Y
yanglbme 已提交
275

276
### spring cloud gateway
Y
yanglbme 已提交
277

Y
yanglbme 已提交
278
- spring cloud gateway 默认使用 redis 进行限流, 笔者一般只是修改修改参数属于拿来即用. 并没有去从头实现上述那些算法.
Y
yanglbme 已提交
279

Y
yanglbme 已提交
280
```xml
281 282 283 284 285 286 287 288 289 290
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
```

Y
yanglbme 已提交
291
```yaml
292 293 294 295
spring:
  cloud:
    gateway:
      routes:
Y
yanglbme 已提交
296
        - id: requestratelimiter_route
Y
yanglbme 已提交
297

Y
yanglbme 已提交
298 299 300 301
          uri: lb://pigx-upms
          order: 10000
          predicates:
            - Path=/admin/**
Y
yanglbme 已提交
302

Y
yanglbme 已提交
303 304
          filters:
            - name: RequestRateLimiter
Y
yanglbme 已提交
305

Y
yanglbme 已提交
306 307 308 309
              args:
                redis-rate-limiter.replenishRate: 1 # 令牌桶的容积
                redis-rate-limiter.burstCapacity: 3 # 流速 每秒
                key-resolver: "#{@remoteAddrKeyResolver}" #SPEL表达式去的对应的bean
Y
yanglbme 已提交
310

Y
yanglbme 已提交
311
            - StripPrefix=1
312 313
```

Y
yanglbme 已提交
314
```java
315 316 317 318 319 320 321
@Bean
KeyResolver remoteAddrKeyResolver() {
    return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName());
}
```

### sentinel
Y
yanglbme 已提交
322

Y
yanglbme 已提交
323
- 通过配置来控制每个 url 的流量
Y
yanglbme 已提交
324

Y
yanglbme 已提交
325
```xml
326 327 328 329 330
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
```
Y
yanglbme 已提交
331

Y
yanglbme 已提交
332
```yaml
333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350
spring:
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
    sentinel:
      transport:
        dashboard: localhost:8080
        port: 8720
      datasource:
        ds:
          nacos:
            server-addr: localhost:8848
            dataId: spring-cloud-sentinel-nacos
            groupId: DEFAULT_GROUP
            rule-type: flow
            namespace: xxxxxxxx
```
Y
yanglbme 已提交
351

Y
yanglbme 已提交
352
- 配置内容在 nacos 上进行编辑
Y
yanglbme 已提交
353

Y
yanglbme 已提交
354
```json
355
[
Y
yanglbme 已提交
356 357 358 359 360 361 362 363 364
  {
    "resource": "/hello",
    "limitApp": "default",
    "grade": 1,
    "count": 1,
    "strategy": 0,
    "controlBehavior": 0,
    "clusterMode": false
  }
365 366
]
```
Y
yanglbme 已提交
367

Y
yanglbme 已提交
368 369 370 371 372 373 374
- resource:资源名,即限流规则的作用对象。
- limitApp:流控针对的调用来源,若为 default 则不区分调用来源。
- grade:限流阈值类型,QPS 或线程数模式,0 代表根据并发数量来限流,1 代表根据 QPS 来进行流量控制。
- count:限流阈值
- strategy:判断的根据是资源自身,还是根据其它关联资源 (refResource),还是根据链路入口
- controlBehavior:流控效果(直接拒绝 / 排队等待 / 慢启动模式)
- clusterMode:是否为集群模式
Y
yanglbme 已提交
375

376
### 总结
Y
yanglbme 已提交
377

Y
yanglbme 已提交
378
> sentinel 和 spring cloud gateway 两个框架都是很好的限流框架, 但是在我使用中还没有将[spring-cloud-alibaba](https://github.com/alibaba/spring-cloud-alibaba)接入到项目中进行使用, 所以我会选择**spring cloud gateway**, 当接入完整的或者接入 Nacos 项目使用 setinel 会有更加好的体验.