messaging.md 38.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931
# 消息传递

Spring 框架为与消息传递系统集成提供了广泛的支持,从使用`JmsTemplate`的 JMS API 的简化使用到异步接收消息的完整基础设施。 Spring AMQP 为高级消息队列协议提供了类似的功能集。 Spring Boot 还提供了`RabbitTemplate`和 RabbitMQ 的自动配置选项。 Spring  WebSocket 原生地包括对 Stomp 消息传递的支持,并且 Spring 引导通过启动器和少量自动配置对此提供支持。 Spring Boot 还支持 Apache Kafka。

## 1. JMS

`javax.jms.ConnectionFactory`接口提供了一种标准方法,用于创建用于与 JMS 代理交互的`javax.jms.Connection`。尽管 Spring 需要`ConnectionFactory`来处理 JMS,但你通常不需要自己直接使用它,而是可以依赖于更高级别的消息传递抽象。(有关详细信息,请参见 Spring Framework 参考文档的[相关部分](https://docs.spring.io/spring-framework/docs/5.3.16/reference/html/integration.html#jms)。) Spring Boot 还自动配置发送和接收消息所需的基础设施。

### 1.1.ActiveMQ 支持

[ActiveMQ](https://activemq.apache.org/)在 Classpath 上可用时, Spring 引导也可以配置`ConnectionFactory`。如果存在代理,则会自动启动并配置嵌入式代理(前提是通过配置未指定代理 URL,并且在配置中未禁用嵌入式代理)。

|   |如果使用`spring-boot-starter-activemq`,则提供了连接或嵌入 ActiveMQ 实例所需的依赖关系, Spring 与 JMS 集成的基础设施也是如此。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

ActiveMQ 配置由`spring.activemq.*`中的外部配置属性控制。

默认情况下,ActiveMQ 被自动配置为使用[VM transport](https://activemq.apache.org/vm-transport-reference.html),它启动嵌入在同一 JVM 实例中的代理。

可以通过配置`spring.activemq.in-memory`属性禁用嵌入式代理,如以下示例所示:

属性

```
spring.activemq.in-memory=false
```

Yaml

```
spring:
  activemq:
    in-memory: false
```

如果你配置代理 URL,那么嵌入式代理也将被禁用,如以下示例所示:

属性

```
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
```

Yaml

```
spring:
  activemq:
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"
```

如果你想完全控制嵌入式代理,请参阅[ActiveMQ 文档](https://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html)以获取更多信息。

默认情况下,`CachingConnectionFactory`将本机`ConnectionFactory`包装成合理的设置,你可以通过`spring.jms.*`中的外部配置属性来控制这些设置:

属性

```
spring.jms.cache.session-cache-size=5
```

Yaml

```
spring:
  jms:
    cache:
      session-cache-size: 5
```

如果你更愿意使用本机池,那么可以通过向`org.messaginghub:pooled-jms`添加一个依赖项并相应地配置`JmsPoolConnectionFactory`来实现,如下面的示例所示:

属性

```
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
```

Yaml

```
spring:
  activemq:
    pool:
      enabled: true
      max-connections: 50
```

|   |有关更多支持的选项,请参见[“ActiveMQ属性”](https://github.com/spring-projects/spring-boot/tree/v2.6.4/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/jms/activemq/ActiveMQ属性.java)<br/>你还可以注册任意数量的实现`ActiveMQConnectionFactoryCustomizer`的 bean,以获得更高级的自定义。|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

默认情况下,ActiveMQ 会创建一个尚不存在的目标,以便根据其提供的名称对目标进行解析。

### 1.2.ActiveMQ Artemis 支持

Spring 当在 Classpath 上检测到[ActiveMQ Artemis](https://activemq.apache.org/components/artemis/)可用时,启动可以自动配置`ConnectionFactory`。如果存在代理,则会自动启动和配置嵌入式代理(除非已显式设置了 Mode 属性)。支持的模式是`embedded`(明确表示需要嵌入式代理,并且如果代理在 Classpath 上不可用,则应该发生错误)和`native`(使用`netty`传输协议连接到代理)。当对后者进行配置时, Spring 引导将配置一个`ConnectionFactory`,该代理连接到使用默认设置在本地计算机上运行的代理。

|   |如果使用`spring-boot-starter-artemis`,则提供了连接到现有 ActiveMQ Artemis 实例的必要依赖项,以及 Spring 与 JMS 集成的基础设施。<br/>`org.apache.activemq:artemis-jms-server`添加到应用程序中,可以使用嵌入式模式。|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

ActiveMQ Artemis 配置由`spring.artemis.*`中的外部配置属性控制。例如,你可以在`application.properties`中声明以下部分:

属性

```
spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
```

Yaml

```
spring:
  artemis:
    mode: native
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"
```

嵌入代理时,你可以选择是否要启用持久性,并列出应该使其可用的目标。它们可以指定为逗号分隔的列表,以使用默认选项创建它们,或者你可以分别为高级队列和主题配置定义类型`org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration``org.apache.activemq.artemis.jms.server.config.TopicConfiguration`的 Bean(s)。

默认情况下,`CachingConnectionFactory`将本机`ConnectionFactory`包装成合理的设置,你可以通过`spring.jms.*`中的外部配置属性来控制这些设置:

属性

```
spring.jms.cache.session-cache-size=5
```

Yaml

```
spring:
  jms:
    cache:
      session-cache-size: 5
```

如果你更愿意使用本机池,那么可以通过向`org.messaginghub:pooled-jms`添加一个依赖项并相应地配置`JmsPoolConnectionFactory`来实现,如下面的示例所示:

属性

```
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
```

Yaml

```
spring:
  artemis:
    pool:
      enabled: true
      max-connections: 50
```

有关更多支持的选项,请参见[` 青蒿素特性’](https://github.com/spring-projects/spring-boot/tree/v2.6.4/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/jms/artemis/Artemis属性.java)

不涉及 JNDI 查找,并且使用 Artemis 配置中的`name`属性或通过配置提供的名称,根据它们的名称解析目标。

### 1.3.使用 JNDI ConnectionFactory

如果你在应用程序服务器中运行你的应用程序, Spring 引导将尝试通过使用 JNDI 来定位 JMS`ConnectionFactory`。默认情况下,将检查`java:/JmsXA``java:/XAConnectionFactory`位置。如果需要指定替代位置,可以使用`spring.jms.jndi-name`属性,如下例所示:

属性

```
spring.jms.jndi-name=java:/MyConnectionFactory
```

Yaml

```
spring:
  jms:
    jndi-name: "java:/MyConnectionFactory"
```

### 1.4.发送消息

Spring 的`JmsTemplate`是自动配置的,你可以直接将其自动连接到自己的 bean 中,如以下示例所示:

```
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final JmsTemplate jmsTemplate;

    public MyBean(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    // ...

    public void someMethod() {
        this.jmsTemplate.convertAndSend("hello");
    }

}

```

|   |[jmsmessagingtemplate’](https://docs.spring.io/spring-framework/docs/5.3.16/javadoc-api/org/springframework/jms/core/JmsMessagingTemplate.html)可以以类似的方式注入。<br/>如果定义了一个`DestinationResolver`或一个`MessageConverter` Bean,则自动将其关联到自动配置的`JmsTemplate`。|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

### 1.5.接收消息

当存在 JMS 基础设施时,任何 Bean 都可以用`@JmsListener`进行注释,以创建侦听器端点。如果没有`JmsListenerContainerFactory`被定义,一个默认的是自动配置的。如果定义了`DestinationResolver``MessageConverter``javax.jms.ExceptionListener`bean,则它们将自动与默认工厂关联。

默认情况下,默认工厂是事务性的。如果在存在`JtaTransactionManager`的基础设施中运行,则默认情况下它与侦听器容器关联。如果没有,则启用`sessionTransacted`标志。在后一种场景中,可以通过在侦听器方法(或其委托)上添加`@Transactional`,将本地数据存储事务与传入消息的处理相关联。这可以确保在本地事务完成后确认传入消息。这还包括发送在同一 JMS 会话上执行的响应消息。

以下组件在`someQueue`目标上创建一个侦听器端点:

```
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @JmsListener(destination = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}

```

|   |有关更多详细信息,请参见[the Javadoc of `@EnableJms`](https://docs.spring.io/spring-framework/docs/5.3.16/javadoc-api/org/springframework/jms/annotation/EnableJms.html)。|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|

如果你需要创建更多的`JmsListenerContainerFactory`实例,或者如果你想要重写默认值, Spring boot 提供一个`DefaultJmsListenerContainerFactoryConfigurer`,你可以使用它初始化`DefaultJmsListenerContainerFactory`,并使用与自动配置的设置相同的设置。

例如,下面的示例公开了另一个使用特定`MessageConverter`的工厂:

```
import javax.jms.ConnectionFactory;

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {

    @Bean
    public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }

    private ConnectionFactory getCustomConnectionFactory() {
        return ...
    }

}

```

然后,你可以在任何`@JmsListener`-注释的方法中使用工厂,如下所示:

```
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @JmsListener(destination = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }

}

```

## 2. AMQP

高级消息队列协议是一种面向消息中间件的平台中立、线路级协议。 Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。 Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了几种便利,包括`spring-boot-starter-amqp`“starter”。

### 2.1.RabbitMQ 支持

[RabbitMQ](https://www.rabbitmq.com/)是一种基于 AMQP 协议的轻量级、可靠、可扩展和可移植的消息代理。 Spring 使用`RabbitMQ`通过 AMQP 协议进行通信。

RabbitMQ 配置由`spring.rabbitmq.*`中的外部配置属性控制。例如,你可以在`application.properties`中声明以下部分:

属性

```
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
```

Yaml

```
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"
```

或者,你可以使用`addresses`属性配置相同的连接:

属性

```
spring.rabbitmq.addresses=amqp://admin:[email protected]
```

Yaml

```
spring:
  rabbitmq:
    addresses: "amqp://admin:[email protected]"
```

|   |当以这种方式指定地址时,`host``port`属性将被忽略。<br/>如果地址使用`amqps`协议,则将自动启用 SSL 支持。|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|

有关更多支持的基于属性的配置选项,请参见[`Rabbit属性’](https://github.com/spring-projects/spring-boot/tree/v2.6.4/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/Rabbit属性.java)。要配置 Spring AMQP 使用的 RabbitMQ`ConnectionFactory`的较低级别的详细信息,请定义`ConnectionFactoryCustomizer` Bean。

如果上下文中存在`ConnectionNameStrategy` Bean,则将自动使用它来命名由自动配置的`CachingConnectionFactory`创建的连接。

|   |有关更多详细信息,请参见[理解 RabbitMQ 使用的协议 AMQP](https://spring.io/blog/2010/06/14/understanding-amqp-the-protocol-used-by-rabbitmq/)。|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------|

### 2.2.发送消息

Spring 的`AmqpTemplate``AmqpAdmin`是自动配置的,你可以将它们直接自动连接到自己的 bean 中,如以下示例所示:

```
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final AmqpAdmin amqpAdmin;

    private final AmqpTemplate amqpTemplate;

    public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
        this.amqpAdmin = amqpAdmin;
        this.amqpTemplate = amqpTemplate;
    }

    // ...

    public void someMethod() {
        this.amqpAdmin.getQueueInfo("someQueue");
    }

    public void someOtherMethod() {
        this.amqpTemplate.convertAndSend("hello");
    }

}

```

|   |[RabbitMessagingTemplate](https://docs.spring.io/spring-amqp/docs/2.4.2/api/org/springframework/amqp/rabbit/core/RabbitMessagingTemplate.html)可以以类似的方式注入。<br/>如果定义了`MessageConverter` Bean,则自动将其关联到自动配置的`AmqpTemplate`。|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

如果有必要,任何被定义为 Bean 的`org.springframework.amqp.core.Queue`都会自动用于在 RabbitMQ 实例上声明相应的队列。

要重试操作,你可以在`AmqpTemplate`上启用重试(例如,在代理连接丢失的情况下):

属性

```
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
```

Yaml

```
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"
```

默认情况下,重试是禁用的。你还可以通过声明一个`RabbitRetryTemplateCustomizer` Bean 以编程方式定制`RetryTemplate`

如果你需要创建更多的`RabbitTemplate`实例,或者如果你想要重写默认设置, Spring boot 提供了一个`RabbitTemplateConfigurer` Bean,你可以使用它初始化`RabbitTemplate`,并使用与自动配置所使用的工厂相同的设置。

### 2.3.接收消息

当 Rabbit 基础结构存在时,任何 Bean 都可以用`@RabbitListener`进行注释,以创建侦听器端点。如果没有`RabbitListenerContainerFactory`已被定义,一个默认的`SimpleRabbitListenerContainerFactory`将被自动配置,并且你可以使用`spring.rabbitmq.listener.type`属性切换到一个直接容器。如果定义了`MessageConverter``MessageRecoverer` Bean,则会自动与默认工厂关联。

以下示例组件在`someQueue`队列上创建一个侦听器端点:

```
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}

```

|   |有关更多详细信息,请参见[the Javadoc of `@EnableRabbit`](https://docs.spring.io/spring-amqp/docs/2.4.2/api/org/springframework/amqp/rabbit/annotation/EnableRabbit.html)。|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|

如果你需要创建更多的`RabbitListenerContainerFactory`实例,或者你想要重写默认值, Spring Boot 提供了一个`SimpleRabbitListenerContainerFactoryConfigurer`和一个`DirectRabbitListenerContainerFactoryConfigurer`,你可以使用它们初始化一个`SimpleRabbitListenerContainerFactory`和一个`DirectRabbitListenerContainerFactory`,其设置与自动配置所使用的工厂相同。

|   |选择哪种容器类型并不重要。<br/>这两个 bean 是通过自动配置公开的。|
|---|-------------------------------------------------------------------------------------------------------------|

例如,下面的配置类公开了另一个使用特定`MessageConverter`的工厂:

```
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }

    private ConnectionFactory getCustomConnectionFactory() {
        return ...
    }

}

```

然后,你可以在任何`@RabbitListener`-注释的方法中使用工厂,如下所示:

```
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }

}

```

你可以启用重试来处理侦听器抛出异常的情况。默认情况下,使用`RejectAndDontRequeueRecoverer`,但你可以定义自己的`MessageRecoverer`。当重复尝试用完时,消息将被拒绝,如果代理配置为这样做,消息将被丢弃或路由到死信交换。默认情况下,重试是禁用的。你还可以通过声明一个`RabbitRetryTemplateCustomizer` Bean 以编程方式定制`RetryTemplate`

|   |默认情况下,如果禁用了重试,并且侦听器抛出了异常,交付将无限期地重试。<br/>你可以通过两种方式修改此行为:将`defaultRequeueRejected`属性设置为`false`所以尝试进行零次重新交付或抛出`AmqpRejectAndDontRequeueException`以表示消息应被拒绝。<br/>后者是在启用重试并达到最大交付尝试次数时使用的机制。|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

## 3. Apache Kafka 支持

[Apache Kafka](https://kafka.apache.org/)通过提供`spring-kafka`项目的自动配置而得到支持。

Kafka 配置由`spring.kafka.*`中的外部配置属性控制。例如,你可以在`application.properties`中声明以下部分:

属性

```
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
```

Yaml

```
spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"
```

|   |要在启动时创建一个主题,请添加类型`NewTopic`的 Bean。<br/>如果该主题已经存在,则忽略 Bean。|
|---|------------------------------------------------------------------------------------------------------------------|

有关更多支持的选项,请参见[Kafkaproperties’](https://github.com/spring-projects/spring-boot/tree/v2.6.4/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/Kafka属性.java)

### 3.1.发送消息

Spring 的`KafkaTemplate`是自动配置的,你可以直接在自己的 bean 中自动连接它,如以下示例所示:

```
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

    public void someMethod() {
        this.kafkaTemplate.send("someTopic", "Hello");
    }

}

```

|   |如果定义了属性`spring.kafka.producer.transaction-id-prefix`,则自动配置一个`KafkaTransactionManager`<br/>同样,如果定义了一个`RecordMessageConverter` Bean,则自动将其关联到自动配置的`KafkaTemplate`。|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

### 3.2.接收消息

当存在 Apache Kafka 基础设施时,任何 Bean 都可以用`@KafkaListener`进行注释,以创建侦听器端点。如果没有`KafkaListenerContainerFactory`被定义,默认的一个是自动配置与`spring.kafka.listener.*`中定义的键。

以下组件在`someTopic`主题上创建一个侦听器端点:

```
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }

}

```

如果定义了`KafkaTransactionManager` Bean,则自动将其关联到容器工厂。类似地,如果定义了 a`RecordFilterStrategy``CommonErrorHandler``AfterRollbackProcessor``ConsumerAwareRebalanceListener` Bean,则自动将其与默认工厂关联。

根据侦听器类型,与默认工厂关联的是`RecordMessageConverter``BatchMessageConverter` Bean。如果批处理侦听器只存在一个`RecordMessageConverter` Bean,则将其包装在`BatchMessageConverter`中。

|   |自定义`ChainedKafkaTransactionManager`必须标记`@Primary`,因为它通常引用自动配置的`KafkaTransactionManager` Bean。|
|---|------------------------------------------------------------------------------------------------------------------------------------------------|

### 3.3.卡夫卡溪流

Spring 对于 Apache,Kafka 提供了一个工厂 Bean 来创建`StreamsBuilder`对象并管理其流的生命周期。 Spring 启动自动配置所需的`KafkaStreamsConfiguration` Bean,只要`kafka-streams`在 Classpath 上并且 Kafka 流是由`@EnableKafkaStreams`注释启用的。

启用 Kafka 流意味着必须设置应用程序 ID 和引导程序服务器。前者可以使用`spring.kafka.streams.application-id`进行配置,如果没有设置,则默认为`spring.application.name`。后者可以全局设置,也可以只针对流专门重写。

使用专用属性可以获得几个附加属性;可以使用`spring.kafka.streams.properties`名称空间设置其他任意 Kafka 属性。有关更多信息,请参见[features.html](features.html#messaging.kafka.additional-properties)

要使用工厂 Bean,将`StreamsBuilder`连接到你的`@Bean`中,如以下示例所示:

```
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }

    private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
        return new KeyValue<>(key, value.toUpperCase());
    }

}

```

默认情况下,由它创建的`StreamBuilder`对象管理的流是自动启动的。你可以使用`spring.kafka.streams.auto-startup`属性自定义此行为。

### 3.4.其他卡夫卡物业

Auto Configuration 支持的属性显示在附录的[“集成属性”](application-properties.html#appendix.application-properties.integration)部分中。请注意,在大多数情况下,这些属性(连字符或 CamelCase)直接映射到 Apache Kafka 点线属性。有关详细信息,请参见 Apache Kafka 文档。

这些属性中的前几个应用于所有组件(生产者、消费者、管理员和流),但如果你希望使用不同的值,则可以在组件级别指定这些属性。Apache Kafka 将属性的重要性指定为高、中或低。 Spring 引导自动配置支持所有高重要性属性、一些选定的中低属性以及不具有默认值的任何属性。

只有 Kafka 支持的属性的一个子集可以直接通过`Kafka属性`类获得。如果你希望为生产者或消费者配置不受直接支持的附加属性,请使用以下属性:

属性

```
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
```

Yaml

```
spring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"
```

这将普通的`prop.one`Kafka 属性设置为`first`(适用于生产者、消费者和管理员),`prop.two`管理属性设置为`second``prop.three`消费者属性设置为`third``prop.four`生产者属性设置为`fourth``prop.five`流属性设置为`fifth`

还可以将 Spring kafka`JsonDeserializer`配置如下:

属性

```
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
```

Yaml

```
spring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"
```

类似地,你可以禁用`JsonSerializer`在标题中发送类型信息的默认行为:

Properties

```
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
```

Yaml

```
spring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false
```

|   |以这种方式设置的属性将覆盖 Spring 引导明确支持的任何配置项。|
|---|------------------------------------------------------------------------------------------------|

### 3.5.嵌入式 Kafka 测试

Spring 对于 Apache Kafka 提供了一种方便的方式来测试带有嵌入式 Apache Kafka 代理的项目。要使用此功能,请使用`spring-kafka-test`模块中的`@EmbeddedKafka`对测试类进行注释。有关更多信息,请参见 Apache Kafka[参考手册](https://docs.spring.io/spring-kafka/docs/2.8.3/reference/html/#embedded-kafka-annotation)的 Spring。

要使 Spring 引导自动配置与前面提到的嵌入式 Apache Kafka 代理一起工作,你需要将嵌入式代理地址(由`EmbeddedKafkaBroker`填充)的系统属性重新映射到 Apache Kafka 的 Spring 引导配置属性中。有几种方法可以做到这一点:

* 提供一个系统属性,将嵌入式代理地址映射到测试类中的`spring.kafka.bootstrap-servers`:

```
static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}

```

*`@EmbeddedKafka`注释上配置属性名:

```
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

    // ...

}

```

* 在配置属性中使用占位符:

Properties

```
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
```

Yaml

```
spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"
```

## 4. RSocket

[RSocket](https://rsocket.io)是一种用于字节流传输的二进制协议。它支持通过在单个连接上传递异步消息的对称交互模型。

Spring 框架的`spring-messaging`模块在客户端和服务器端为 RSocket 请求者和响应者提供支持。有关更多详细信息,请参见 Spring 框架引用的[RSocket section](https://docs.spring.io/spring-framework/docs/5.3.16/reference/html/web-reactive.html#rsocket-spring),包括 RSocket 协议的概述。

### 4.1.RSocket 策略自动配置

Spring 引导自动配置`RSocketStrategies` Bean,该配置提供了编码和解码 RSocket 有效负载所需的所有基础设施。默认情况下,自动配置将尝试配置以下内容(按顺序):

1. [CBOR](https://cbor.io/)带 Jackson 的编解码器

2. 具有 Jackson 的 JSON 编解码器

`spring-boot-starter-rsocket`启动器提供了这两个依赖项。请参阅[Jackson 支助科](features.html#features.json.jackson)以了解有关自定义可能性的更多信息。

开发人员可以通过创建实现`RSocketStrategiesCustomizer`接口的 bean 来定制`RSocketStrategies`组件。请注意,它们的`@Order`很重要,因为它决定了编解码器的顺序。

### 4.2.RSocket 服务器自动配置

Spring 启动提供了 RSocket 服务器的自动配置。所需的依赖关系由`spring-boot-starter-rsocket`提供。

Spring 启动允许在 WebSocket 上从 WebFlux 服务器公开 RSocket,或者在独立的 RSocket 服务器上公开 RSocket。这取决于应用程序的类型及其配置。

对于 WebFlux 应用程序(类型为`WebApplicationType.REACTIVE`),只有在以下属性匹配的情况下,才会将 RSocket 服务器插入到 Web 服务器中:

Properties

```
spring.rsocket.server.mapping-path=/rsocket
spring.rsocket.server.transport=websocket
```

Yaml

```
spring:
  rsocket:
    server:
      mapping-path: "/rsocket"
      transport: "websocket"
```

|   |将 RSocket 插入到 Web 服务器中只受 Reactor Netty 的支持,因为 RSocket 本身是用该库构建的。|
|---|----------------------------------------------------------------------------------------------------------------------|

或者,RSocket TCP 或 WebSocket 服务器作为独立的嵌入式服务器启动。除了依赖关系需求外,唯一需要的配置是为该服务器定义一个端口:

Properties

```
spring.rsocket.server.port=9898
```

Yaml

```
spring:
  rsocket:
    server:
      port: 9898
```

### 4.3. Spring 消息 RSocket 支持

Spring 启动将自动为 RSocket 配置 Spring 消息传递基础设施。

这意味着 Spring 引导将创建一个`RSocketMessageHandler` Bean,它将处理对应用程序的 RSocket 请求。

### 4.4.使用 RSocketRequester 调用 RSocket 服务

一旦在服务器和客户端之间建立了`RSocket`通道,任何一方都可以向另一方发送或接收请求。

作为服务器,你可以在 RSocket`@Controller`的任何处理程序方法上插入`RSocketRequester`实例。作为客户机,你需要首先配置和建立一个 RSocket 连接。 Spring 对于这种情况,引导自动配置带有预期编解码器的`RSocketRequester.Builder`,并应用任何`RSocketConnectorConfigurer` Bean。

`RSocketRequester.Builder`实例是一个原型 Bean,这意味着每个注入点都将为你提供一个新实例。这是有目的的,因为这个构建器是有状态的,你不应该使用相同的实例创建具有不同设置的请求者。

下面的代码展示了一个典型的示例:

```
import reactor.core.publisher.Mono;

import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    private final RSocketRequester rsocketRequester;

    public MyService(RSocketRequester.Builder rsocketRequesterBuilder) {
        this.rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898);
    }

    public Mono<User> someRSocketCall(String name) {
        return this.rsocketRequester.route("user").data(name).retrieveMono(User.class);
    }

}

```

## 5. Spring 整合

Spring Boot 为使用[Spring Integration](https://spring.io/projects/spring-integration)提供了几种便利,包括`spring-boot-starter-integration`“starter”。 Spring 集成在消息传递和其他传输(例如 HTTP、TCP 和其他传输)上提供了抽象。如果 Spring 集成在你的 Classpath 上可用,则通过`@EnableIntegration`注释对其进行初始化。

Spring 集成轮询逻辑依赖于[on the auto-configured `TaskScheduler`](features.html#features.task-execution-and-scheduling)。默认的`PollerMetadata`(每秒轮询无界的消息数量)可以使用`spring.integration.poller.*`配置属性进行定制。

Spring 引导还配置了由存在额外的 Spring 集成模块触发的一些特征。如果`spring-integration-jmx`也在 Classpath 上,则消息处理统计信息将在 JMX 上发布。如果`spring-integration-jdbc`可用,则可以在启动时创建默认的数据库模式,如下所示:

Properties

```
spring.integration.jdbc.initialize-schema=always
```

Yaml

```
spring:
  integration:
    jdbc:
      initialize-schema: "always"
```

如果`spring-integration-rsocket`可用,开发人员可以使用`"spring.rsocket.server.*"`属性配置 RSocket 服务器,并让它使用`IntegrationRSocketEndpoint``RSocketOutboundGateway`组件来处理传入的 RSocket 消息。该基础结构可以处理 Spring 集成 RSocket 通道适配器和`@MessageMapping`处理程序(已配置`"spring.integration.rsocket.server.message-mapping-enabled"`)。

Spring 启动还可以使用配置属性自动配置`ClientRSocketConnector`:

Properties

```
# Connecting to a RSocket server over TCP
spring.integration.rsocket.client.host=example.org
spring.integration.rsocket.client.port=9898
```

Yaml

```
# Connecting to a RSocket server over TCP
spring:
  integration:
    rsocket:
      client:
        host: "example.org"
        port: 9898
```

Properties

```
# Connecting to a RSocket Server over WebSocket
spring.integration.rsocket.client.uri=ws://example.org
```

Yaml

```
# Connecting to a RSocket Server over WebSocket
spring:
  integration:
    rsocket:
      client:
        uri: "ws://example.org"
```

有关更多详细信息,请参见[“IntegrationAutoConfiguration”](https://github.com/spring-projects/spring-boot/tree/v2.6.4/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java)[“整合属性”](https://github.com/spring-projects/spring-boot/tree/v2.6.4/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java)类。

## 6. 接下来读什么?

下一节介绍如何在应用程序中启用[IO capabilities](io.html#io)。你可以在本节中阅读有关[caching](io.html#io.caching)[mail](io.html#io.email)[validation](io.html#io.validation)[rest clients](io.html#io.rest-client)等内容。