README.md 9.4 KB
Newer Older
小傅哥's avatar
小傅哥 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
# RabbitMQ

本文的宗旨在于通过简单干净实践的方式教会读者,使用 Docker 配置 RabbitMQ 在 DDD 结构下验证使用。关于 MQ 的场景内容已经在 [RocketMQ](https://bugstack.cn/md/road-map/rocketmq.html) 一节中做了讲解,本文只要为大家扩展另外一种 MQ 的使用。方便有需要的伙伴可以做技术栈替换。

[RabbitMQ](https://www.rabbitmq.com/) 是一个由 Erlang 开发的 AMQP (Advanced Message Queuing Protocol) 的开源实现。非常轻量,用于部署,有自己提供好的管理后台,非常容易上手使用。在功能上支持订阅、广播、路由和通配符,可以适合各类场景诉求。

本文涉及的工程:
- xfg-dev-tech-rabbitmq:[https://gitcode.net/KnowledgePlanet/road-map/xfg-dev-tech-rabbitmq](https://gitcode.net/KnowledgePlanet/road-map/xfg-dev-tech-rabbitmq)
- RabbitMQ Docker 安装:[docs/dev-ops/docker-compose.yml](https://gitcode.net/KnowledgePlanet/road-map/xfg-dev-tech-rabbitmq/-/blob/master/docs/dev-ops/docker-compose.yml)

## 一、环境安装

因为本文是在 [RocketMQ](https://bugstack.cn/md/road-map/rocketmq.html) 一节的扩展,所以只讲解下技术使用即可。

本案例涉及了 RabbitMQ 的使用,都已经在工程中提供了安装脚本,可以按需执行。—— 前置条件已安装 [Docker](https://bugstack.cn/md/road-map/docker.html) 环境。

<div align="center">
    <img src="./docs/images/roadmap-rabbitmq-01.png?raw=true" width="750px">
</div>

<div align="center">
    <img src="./docs/images/roadmap-rabbitmq-02.png?raw=true" width="750px">
</div>

- Mac 电脑会比较好安装一些,直接在 IntelliJ IDEA 点击小绿色按钮即可完成安装。安装完成后进入 [http://localhost:9000/#!/2/docker/containers](http://localhost:9000/#!/2/docker/containers) - 可看到 RabbitMQ 运行。
- Windows 电脑安装 Docker 需要折腾下
- Linux 服务器,需要上传整个 dev-ops 后在云服务器执行脚本安装;`docker-compose -f docker-compose.yml up -d`

## 二、配置主题

登录 RabbitMQ 管理后台:[http://127.0.0.1:15672/#/](http://127.0.0.1:15672/#/) - `账密:admin/admin` 

<div align="center">
    <img src="./docs/images/roadmap-rabbitmq-03.png?raw=true" width="750px">
</div>

进入到后台以后,先如图配置个主题消息,后面会使用到这个主题发送和监听消息信息。

## 三、测试案例

### 1. yml 配置

**文件**`application-dev.yml`

```yml
spring:
  # RabbitMQ 配置
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    listener:
      simple:
        prefetch: 1 # 每次投递n个消息,消费完在投递n个
```

- 测试前,需要在工程中添加 RabbitMQ 连接配置信息。
- prefetch 是消息投递的数量,实际场景可以适当配置的大一些。

### 2. 消费配置

进入到 `xfg-dev-tech-trigger` 是监听 MQ 消息的地方。

#### 2.1 普通消息

```java
@Slf4j
@Component
public class Customer {

    /**
     * queuesToDeclare:支持多个队列,将队列绑定到默认交换机上,routeKey为队列名称。
     *
     * @param msg 接收到的消息
     */
    @RabbitListener(queuesToDeclare = @Queue(value = "testQueue"))
    public void listener(String msg) {
        log.info("接收消息:{}", msg);
        // 通过抛异常,验证消息重试
//        throw new RuntimeException("Err");
    }

}
```

- 异常可以随着你的测试开启,开启后会接收到重试的消息。

#### 2.2 广播消息

```java
@Slf4j
@Component
public class FanoutCustomer {

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "fanoutCustomer"),
                    exchange = @Exchange(
                            value = "fanoutExchange",
                            type = ExchangeTypes.FANOUT
                    )
            )
    )
    public void listener(String msg) {
        log.info("接收消息【广播模式】:{}", msg);
    }

}
```

- 广播模式,所有的消费放都监听到消息。

#### 2.3 路由消息

```java
@Slf4j
@Component
public class RouteCustomer {

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "routeQueue1"),
                    exchange = @Exchange(value = "routeExchange", type = ExchangeTypes.DIRECT),
                    key = "routeKey1"
            )
    )
    public void listener01(String msg) {
        log.info("接收消息【路由模式】:{}", msg);
    }

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "routeQueue2"),
                    exchange = @Exchange(value = "routeExchange", type = ExchangeTypes.DIRECT),
                    key = "routeKey2"
            )
    )
    public void listener02(String msg) {
        log.info("接收消息【路由模式】:{}", msg);
    }

}
```

- 路由模式,会根据实际发送消息时候路由选择配置,让指定的消费方接收消息。比如实际场景中有监听订单的消息,但订单有很多种,比如自营、三方以及不同支付渠道,那么可以让不同的监听者只收取自己的消息信息。

#### 2.3 通配符消息

```java
@Slf4j
@Component
public class TopicCustomer {

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "topicQueue1"),
                    exchange = @Exchange(value = "topicExchange", type = ExchangeTypes.TOPIC),
                    key = "topic.*" // `*`:匹配一个单词,就只有一个单词
            )
    )
    public void listener01(String msg) {
        log.info("接收消息【通配符模式】listener01:{}", msg);
    }

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "topicQueue2"),
                    exchange = @Exchange(value = "topicExchange", type = ExchangeTypes.TOPIC),
                    key = "topic.#" // `#`:匹配一个或多个词
            )
    )
    public void listener02(String msg) {
        log.info("接收消息【通配符模式】listener02:{}", msg);
    }

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "topicQueue3"),
                    exchange = @Exchange(value = "topicExchange", type = ExchangeTypes.TOPIC),
                    key = "topic.y.#" // `#`:匹配一个或多个词
            )
    )
    public void listener03(String msg) {
        log.info("接收消息【通配符模式】listener03:{}", msg);
    }

}
```

- 通配符可以起到过滤的作用,比如在实际场景中,你需要根据过往mq的类型,做部分的监听。那么可以根据通配符配置来搞定。

## 四、测试验证

```java
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApiTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test_product() throws InterruptedException {
        // 发送消息
        rabbitTemplate.convertAndSend("testQueue", "基本消息");
        // 等待
        new CountDownLatch(1).await();
    }

    @Test
    public void test_product_fanout() throws InterruptedException {
        rabbitTemplate.convertAndSend("fanoutExchange", "", "广播消息");
        // 等待
        new CountDownLatch(1).await();
    }

    @Test
    public void test_product_route() throws InterruptedException {
        rabbitTemplate.convertAndSend("routeExchange", "routeKey1", "路由模式,消息1");
        rabbitTemplate.convertAndSend("routeExchange", "routeKey2", "路由模式,消息2");
        // 等待
        new CountDownLatch(1).await();
    }

    @Test
    public void test_product_topic() throws InterruptedException {
        rabbitTemplate.convertAndSend("topicExchange", "topic.x", "通配符模式,消息1");
        rabbitTemplate.convertAndSend("topicExchange", "topic.y.z", "通配符模式,消息2");
        // 等待
        new CountDownLatch(1).await();
    }

}
```

```java
22:29:46.792 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  Customer               - 接收消息基本消息
22:30:40.525 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] INFO  FanoutCustomer         - 接收消息广播模式】:广播消息
22:31:27.117 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#3-1] INFO  RouteCustomer          - 接收消息路由模式】:路由模式消息2
22:31:27.117 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#2-1] INFO  RouteCustomer          - 接收消息路由模式】:路由模式消息1
10:32:08.359 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#6-1] INFO  TopicCustomer          - 接收消息通配符模式listener03通配符模式消息2
10:32:08.359 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#4-1] INFO  TopicCustomer          - 接收消息通配符模式listener01通配符模式消息1
10:32:08.359 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#5-1] INFO  TopicCustomer          - 接收消息通配符模式listener02通配符模式消息1
10:32:08.372 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#5-1] INFO  TopicCustomer          - 接收消息通配符模式listener02通配符模式消息2
```

- 以上案例,分别测试;基本消息、广播消息、路由消息、通配符消息。