# 消息传递 Spring 框架为与消息传递系统集成提供了广泛的支持,从使用`JmsTemplate`的 JMS API 的简化使用到异步接收消息的完整基础设施。 Spring AMQP 为高级消息队列协议提供了类似的功能集。 Spring Boot 还提供了`RabbitTemplate`和 RabbitMQ 的自动配置选项。 Spring WebSocket 原生地包括对 Stomp 消息传递的支持,并且 Spring 引导通过启动器和少量自动配置对此提供支持。 Spring Boot 还支持 Apache Kafka。 ## 1. JMS `javax.jms.ConnectionFactory`接口提供了一种标准方法,用于创建用于与 JMS 代理交互的`javax.jms.Connection`。尽管 Spring 需要`ConnectionFactory`来处理 JMS,但你通常不需要自己直接使用它,而是可以依赖于更高级别的消息传递抽象。(有关详细信息,请参见 Spring Framework 参考文档的[相关部分](https://docs.spring.io/spring-framework/docs/5.3.16/reference/html/integration.html#jms)。) Spring Boot 还自动配置发送和接收消息所需的基础设施。 ### 1.1.ActiveMQ 支持 当[ActiveMQ](https://activemq.apache.org/)在 Classpath 上可用时, Spring 引导也可以配置`ConnectionFactory`。如果存在代理,则会自动启动并配置嵌入式代理(前提是通过配置未指定代理 URL,并且在配置中未禁用嵌入式代理)。 | |如果使用`spring-boot-starter-activemq`,则提供了连接或嵌入 ActiveMQ 实例所需的依赖关系, Spring 与 JMS 集成的基础设施也是如此。| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ActiveMQ 配置由`spring.activemq.*`中的外部配置属性控制。 默认情况下,ActiveMQ 被自动配置为使用[VM transport](https://activemq.apache.org/vm-transport-reference.html),它启动嵌入在同一 JVM 实例中的代理。 可以通过配置`spring.activemq.in-memory`属性禁用嵌入式代理,如以下示例所示: 属性 ``` spring.activemq.in-memory=false ``` Yaml ``` spring: activemq: in-memory: false ``` 如果你配置代理 URL,那么嵌入式代理也将被禁用,如以下示例所示: 属性 ``` spring.activemq.broker-url=tcp://192.168.1.210:9876 spring.activemq.user=admin spring.activemq.password=secret ``` Yaml ``` spring: activemq: broker-url: "tcp://192.168.1.210:9876" user: "admin" password: "secret" ``` 如果你想完全控制嵌入式代理,请参阅[ActiveMQ 文档](https://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html)以获取更多信息。 默认情况下,`CachingConnectionFactory`将本机`ConnectionFactory`包装成合理的设置,你可以通过`spring.jms.*`中的外部配置属性来控制这些设置: 属性 ``` spring.jms.cache.session-cache-size=5 ``` Yaml ``` spring: jms: cache: session-cache-size: 5 ``` 如果你更愿意使用本机池,那么可以通过向`org.messaginghub:pooled-jms`添加一个依赖项并相应地配置`JmsPoolConnectionFactory`来实现,如下面的示例所示: 属性 ``` spring.activemq.pool.enabled=true spring.activemq.pool.max-connections=50 ``` Yaml ``` spring: activemq: pool: enabled: true max-connections: 50 ``` | |有关更多支持的选项,请参见[“ActiveMQ属性”](https://github.com/spring-projects/spring-boot/tree/v2.6.4/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/jms/activemq/ActiveMQ属性.java)。
你还可以注册任意数量的实现`ActiveMQConnectionFactoryCustomizer`的 bean,以获得更高级的自定义。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 默认情况下,ActiveMQ 会创建一个尚不存在的目标,以便根据其提供的名称对目标进行解析。 ### 1.2.ActiveMQ Artemis 支持 Spring 当在 Classpath 上检测到[ActiveMQ Artemis](https://activemq.apache.org/components/artemis/)可用时,启动可以自动配置`ConnectionFactory`。如果存在代理,则会自动启动和配置嵌入式代理(除非已显式设置了 Mode 属性)。支持的模式是`embedded`(明确表示需要嵌入式代理,并且如果代理在 Classpath 上不可用,则应该发生错误)和`native`(使用`netty`传输协议连接到代理)。当对后者进行配置时, Spring 引导将配置一个`ConnectionFactory`,该代理连接到使用默认设置在本地计算机上运行的代理。 | |如果使用`spring-boot-starter-artemis`,则提供了连接到现有 ActiveMQ Artemis 实例的必要依赖项,以及 Spring 与 JMS 集成的基础设施。
将`org.apache.activemq:artemis-jms-server`添加到应用程序中,可以使用嵌入式模式。| |---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ActiveMQ Artemis 配置由`spring.artemis.*`中的外部配置属性控制。例如,你可以在`application.properties`中声明以下部分: 属性 ``` spring.artemis.mode=native spring.artemis.broker-url=tcp://192.168.1.210:9876 spring.artemis.user=admin spring.artemis.password=secret ``` Yaml ``` spring: artemis: mode: native broker-url: "tcp://192.168.1.210:9876" user: "admin" password: "secret" ``` 嵌入代理时,你可以选择是否要启用持久性,并列出应该使其可用的目标。它们可以指定为逗号分隔的列表,以使用默认选项创建它们,或者你可以分别为高级队列和主题配置定义类型`org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration`或`org.apache.activemq.artemis.jms.server.config.TopicConfiguration`的 Bean(s)。 默认情况下,`CachingConnectionFactory`将本机`ConnectionFactory`包装成合理的设置,你可以通过`spring.jms.*`中的外部配置属性来控制这些设置: 属性 ``` spring.jms.cache.session-cache-size=5 ``` Yaml ``` spring: jms: cache: session-cache-size: 5 ``` 如果你更愿意使用本机池,那么可以通过向`org.messaginghub:pooled-jms`添加一个依赖项并相应地配置`JmsPoolConnectionFactory`来实现,如下面的示例所示: 属性 ``` spring.artemis.pool.enabled=true spring.artemis.pool.max-connections=50 ``` Yaml ``` spring: artemis: pool: enabled: true max-connections: 50 ``` 有关更多支持的选项,请参见[` 青蒿素特性’](https://github.com/spring-projects/spring-boot/tree/v2.6.4/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/jms/artemis/Artemis属性.java)。 不涉及 JNDI 查找,并且使用 Artemis 配置中的`name`属性或通过配置提供的名称,根据它们的名称解析目标。 ### 1.3.使用 JNDI ConnectionFactory 如果你在应用程序服务器中运行你的应用程序, Spring 引导将尝试通过使用 JNDI 来定位 JMS`ConnectionFactory`。默认情况下,将检查`java:/JmsXA`和`java:/XAConnectionFactory`位置。如果需要指定替代位置,可以使用`spring.jms.jndi-name`属性,如下例所示: 属性 ``` spring.jms.jndi-name=java:/MyConnectionFactory ``` Yaml ``` spring: jms: jndi-name: "java:/MyConnectionFactory" ``` ### 1.4.发送消息 Spring 的`JmsTemplate`是自动配置的,你可以直接将其自动连接到自己的 bean 中,如以下示例所示: ``` import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; @Component public class MyBean { private final JmsTemplate jmsTemplate; public MyBean(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } // ... public void someMethod() { this.jmsTemplate.convertAndSend("hello"); } } ``` | |[jmsmessagingtemplate’](https://docs.spring.io/spring-framework/docs/5.3.16/javadoc-api/org/springframework/jms/core/JmsMessagingTemplate.html)可以以类似的方式注入。
如果定义了一个`DestinationResolver`或一个`MessageConverter` Bean,则自动将其关联到自动配置的`JmsTemplate`。| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ### 1.5.接收消息 当存在 JMS 基础设施时,任何 Bean 都可以用`@JmsListener`进行注释,以创建侦听器端点。如果没有`JmsListenerContainerFactory`被定义,一个默认的是自动配置的。如果定义了`DestinationResolver`、`MessageConverter`或`javax.jms.ExceptionListener`bean,则它们将自动与默认工厂关联。 默认情况下,默认工厂是事务性的。如果在存在`JtaTransactionManager`的基础设施中运行,则默认情况下它与侦听器容器关联。如果没有,则启用`sessionTransacted`标志。在后一种场景中,可以通过在侦听器方法(或其委托)上添加`@Transactional`,将本地数据存储事务与传入消息的处理相关联。这可以确保在本地事务完成后确认传入消息。这还包括发送在同一 JMS 会话上执行的响应消息。 以下组件在`someQueue`目标上创建一个侦听器端点: ``` import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class MyBean { @JmsListener(destination = "someQueue") public void processMessage(String content) { // ... } } ``` | |有关更多详细信息,请参见[the Javadoc of `@EnableJms`](https://docs.spring.io/spring-framework/docs/5.3.16/javadoc-api/org/springframework/jms/annotation/EnableJms.html)。| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------| 如果你需要创建更多的`JmsListenerContainerFactory`实例,或者如果你想要重写默认值, Spring boot 提供一个`DefaultJmsListenerContainerFactoryConfigurer`,你可以使用它初始化`DefaultJmsListenerContainerFactory`,并使用与自动配置的设置相同的设置。 例如,下面的示例公开了另一个使用特定`MessageConverter`的工厂: ``` import javax.jms.ConnectionFactory; import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; @Configuration(proxyBeanMethods = false) public class MyJmsConfiguration { @Bean public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); ConnectionFactory connectionFactory = getCustomConnectionFactory(); configurer.configure(factory, connectionFactory); factory.setMessageConverter(new MyMessageConverter()); return factory; } private ConnectionFactory getCustomConnectionFactory() { return ... } } ``` 然后,你可以在任何`@JmsListener`-注释的方法中使用工厂,如下所示: ``` import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class MyBean { @JmsListener(destination = "someQueue", containerFactory = "myFactory") public void processMessage(String content) { // ... } } ``` ## 2. AMQP 高级消息队列协议是一种面向消息中间件的平台中立、线路级协议。 Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。 Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了几种便利,包括`spring-boot-starter-amqp`“starter”。 ### 2.1.RabbitMQ 支持 [RabbitMQ](https://www.rabbitmq.com/)是一种基于 AMQP 协议的轻量级、可靠、可扩展和可移植的消息代理。 Spring 使用`RabbitMQ`通过 AMQP 协议进行通信。 RabbitMQ 配置由`spring.rabbitmq.*`中的外部配置属性控制。例如,你可以在`application.properties`中声明以下部分: 属性 ``` spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=secret ``` Yaml ``` spring: rabbitmq: host: "localhost" port: 5672 username: "admin" password: "secret" ``` 或者,你可以使用`addresses`属性配置相同的连接: 属性 ``` spring.rabbitmq.addresses=amqp://admin:[email protected] ``` Yaml ``` spring: rabbitmq: addresses: "amqp://admin:[email protected]" ``` | |当以这种方式指定地址时,`host`和`port`属性将被忽略。
如果地址使用`amqps`协议,则将自动启用 SSL 支持。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------| 有关更多支持的基于属性的配置选项,请参见[`Rabbit属性’](https://github.com/spring-projects/spring-boot/tree/v2.6.4/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/Rabbit属性.java)。要配置 Spring AMQP 使用的 RabbitMQ`ConnectionFactory`的较低级别的详细信息,请定义`ConnectionFactoryCustomizer` Bean。 如果上下文中存在`ConnectionNameStrategy` Bean,则将自动使用它来命名由自动配置的`CachingConnectionFactory`创建的连接。 | |有关更多详细信息,请参见[理解 RabbitMQ 使用的协议 AMQP](https://spring.io/blog/2010/06/14/understanding-amqp-the-protocol-used-by-rabbitmq/)。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------| ### 2.2.发送消息 Spring 的`AmqpTemplate`和`AmqpAdmin`是自动配置的,你可以将它们直接自动连接到自己的 bean 中,如以下示例所示: ``` import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.stereotype.Component; @Component public class MyBean { private final AmqpAdmin amqpAdmin; private final AmqpTemplate amqpTemplate; public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) { this.amqpAdmin = amqpAdmin; this.amqpTemplate = amqpTemplate; } // ... public void someMethod() { this.amqpAdmin.getQueueInfo("someQueue"); } public void someOtherMethod() { this.amqpTemplate.convertAndSend("hello"); } } ``` | |[RabbitMessagingTemplate](https://docs.spring.io/spring-amqp/docs/2.4.2/api/org/springframework/amqp/rabbit/core/RabbitMessagingTemplate.html)可以以类似的方式注入。
如果定义了`MessageConverter` Bean,则自动将其关联到自动配置的`AmqpTemplate`。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 如果有必要,任何被定义为 Bean 的`org.springframework.amqp.core.Queue`都会自动用于在 RabbitMQ 实例上声明相应的队列。 要重试操作,你可以在`AmqpTemplate`上启用重试(例如,在代理连接丢失的情况下): 属性 ``` spring.rabbitmq.template.retry.enabled=true spring.rabbitmq.template.retry.initial-interval=2s ``` Yaml ``` spring: rabbitmq: template: retry: enabled: true initial-interval: "2s" ``` 默认情况下,重试是禁用的。你还可以通过声明一个`RabbitRetryTemplateCustomizer` Bean 以编程方式定制`RetryTemplate`。 如果你需要创建更多的`RabbitTemplate`实例,或者如果你想要重写默认设置, Spring boot 提供了一个`RabbitTemplateConfigurer` Bean,你可以使用它初始化`RabbitTemplate`,并使用与自动配置所使用的工厂相同的设置。 ### 2.3.接收消息 当 Rabbit 基础结构存在时,任何 Bean 都可以用`@RabbitListener`进行注释,以创建侦听器端点。如果没有`RabbitListenerContainerFactory`已被定义,一个默认的`SimpleRabbitListenerContainerFactory`将被自动配置,并且你可以使用`spring.rabbitmq.listener.type`属性切换到一个直接容器。如果定义了`MessageConverter`或`MessageRecoverer` Bean,则会自动与默认工厂关联。 以下示例组件在`someQueue`队列上创建一个侦听器端点: ``` import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class MyBean { @RabbitListener(queues = "someQueue") public void processMessage(String content) { // ... } } ``` | |有关更多详细信息,请参见[the Javadoc of `@EnableRabbit`](https://docs.spring.io/spring-amqp/docs/2.4.2/api/org/springframework/amqp/rabbit/annotation/EnableRabbit.html)。| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------| 如果你需要创建更多的`RabbitListenerContainerFactory`实例,或者你想要重写默认值, Spring Boot 提供了一个`SimpleRabbitListenerContainerFactoryConfigurer`和一个`DirectRabbitListenerContainerFactoryConfigurer`,你可以使用它们初始化一个`SimpleRabbitListenerContainerFactory`和一个`DirectRabbitListenerContainerFactory`,其设置与自动配置所使用的工厂相同。 | |选择哪种容器类型并不重要。
这两个 bean 是通过自动配置公开的。| |---|-------------------------------------------------------------------------------------------------------------| 例如,下面的配置类公开了另一个使用特定`MessageConverter`的工厂: ``` import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration(proxyBeanMethods = false) public class MyRabbitConfiguration { @Bean public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); ConnectionFactory connectionFactory = getCustomConnectionFactory(); configurer.configure(factory, connectionFactory); factory.setMessageConverter(new MyMessageConverter()); return factory; } private ConnectionFactory getCustomConnectionFactory() { return ... } } ``` 然后,你可以在任何`@RabbitListener`-注释的方法中使用工厂,如下所示: ``` import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class MyBean { @RabbitListener(queues = "someQueue", containerFactory = "myFactory") public void processMessage(String content) { // ... } } ``` 你可以启用重试来处理侦听器抛出异常的情况。默认情况下,使用`RejectAndDontRequeueRecoverer`,但你可以定义自己的`MessageRecoverer`。当重复尝试用完时,消息将被拒绝,如果代理配置为这样做,消息将被丢弃或路由到死信交换。默认情况下,重试是禁用的。你还可以通过声明一个`RabbitRetryTemplateCustomizer` Bean 以编程方式定制`RetryTemplate`。 | |默认情况下,如果禁用了重试,并且侦听器抛出了异常,交付将无限期地重试。
你可以通过两种方式修改此行为:将`defaultRequeueRejected`属性设置为`false`所以尝试进行零次重新交付或抛出`AmqpRejectAndDontRequeueException`以表示消息应被拒绝。
后者是在启用重试并达到最大交付尝试次数时使用的机制。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ## 3. Apache Kafka 支持 [Apache Kafka](https://kafka.apache.org/)通过提供`spring-kafka`项目的自动配置而得到支持。 Kafka 配置由`spring.kafka.*`中的外部配置属性控制。例如,你可以在`application.properties`中声明以下部分: 属性 ``` spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=myGroup ``` Yaml ``` spring: kafka: bootstrap-servers: "localhost:9092" consumer: group-id: "myGroup" ``` | |要在启动时创建一个主题,请添加类型`NewTopic`的 Bean。
如果该主题已经存在,则忽略 Bean。| |---|------------------------------------------------------------------------------------------------------------------| 有关更多支持的选项,请参见[Kafkaproperties’](https://github.com/spring-projects/spring-boot/tree/v2.6.4/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/Kafka属性.java)。 ### 3.1.发送消息 Spring 的`KafkaTemplate`是自动配置的,你可以直接在自己的 bean 中自动连接它,如以下示例所示: ``` import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @Component public class MyBean { private final KafkaTemplate kafkaTemplate; public MyBean(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } // ... public void someMethod() { this.kafkaTemplate.send("someTopic", "Hello"); } } ``` | |如果定义了属性`spring.kafka.producer.transaction-id-prefix`,则自动配置一个`KafkaTransactionManager`。
同样,如果定义了一个`RecordMessageConverter` Bean,则自动将其关联到自动配置的`KafkaTemplate`。| |---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ### 3.2.接收消息 当存在 Apache Kafka 基础设施时,任何 Bean 都可以用`@KafkaListener`进行注释,以创建侦听器端点。如果没有`KafkaListenerContainerFactory`被定义,默认的一个是自动配置与`spring.kafka.listener.*`中定义的键。 以下组件在`someTopic`主题上创建一个侦听器端点: ``` import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class MyBean { @KafkaListener(topics = "someTopic") public void processMessage(String content) { // ... } } ``` 如果定义了`KafkaTransactionManager` Bean,则自动将其关联到容器工厂。类似地,如果定义了 a`RecordFilterStrategy`、`CommonErrorHandler`、`AfterRollbackProcessor`或`ConsumerAwareRebalanceListener` Bean,则自动将其与默认工厂关联。 根据侦听器类型,与默认工厂关联的是`RecordMessageConverter`或`BatchMessageConverter` Bean。如果批处理侦听器只存在一个`RecordMessageConverter` Bean,则将其包装在`BatchMessageConverter`中。 | |自定义`ChainedKafkaTransactionManager`必须标记`@Primary`,因为它通常引用自动配置的`KafkaTransactionManager` Bean。| |---|------------------------------------------------------------------------------------------------------------------------------------------------| ### 3.3.卡夫卡溪流 Spring 对于 Apache,Kafka 提供了一个工厂 Bean 来创建`StreamsBuilder`对象并管理其流的生命周期。 Spring 启动自动配置所需的`KafkaStreamsConfiguration` Bean,只要`kafka-streams`在 Classpath 上并且 Kafka 流是由`@EnableKafkaStreams`注释启用的。 启用 Kafka 流意味着必须设置应用程序 ID 和引导程序服务器。前者可以使用`spring.kafka.streams.application-id`进行配置,如果没有设置,则默认为`spring.application.name`。后者可以全局设置,也可以只针对流专门重写。 使用专用属性可以获得几个附加属性;可以使用`spring.kafka.streams.properties`名称空间设置其他任意 Kafka 属性。有关更多信息,请参见[features.html](features.html#messaging.kafka.additional-properties)。 要使用工厂 Bean,将`StreamsBuilder`连接到你的`@Bean`中,如以下示例所示: ``` import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Produced; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.support.serializer.JsonSerde; @Configuration(proxyBeanMethods = false) @EnableKafkaStreams public class MyKafkaStreamsConfiguration { @Bean public KStream kStream(StreamsBuilder streamsBuilder) { KStream stream = streamsBuilder.stream("ks1In"); stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>())); return stream; } private KeyValue uppercaseValue(Integer key, String value) { return new KeyValue<>(key, value.toUpperCase()); } } ``` 默认情况下,由它创建的`StreamBuilder`对象管理的流是自动启动的。你可以使用`spring.kafka.streams.auto-startup`属性自定义此行为。 ### 3.4.其他卡夫卡物业 Auto Configuration 支持的属性显示在附录的[“集成属性”](application-properties.html#appendix.application-properties.integration)部分中。请注意,在大多数情况下,这些属性(连字符或 CamelCase)直接映射到 Apache Kafka 点线属性。有关详细信息,请参见 Apache Kafka 文档。 这些属性中的前几个应用于所有组件(生产者、消费者、管理员和流),但如果你希望使用不同的值,则可以在组件级别指定这些属性。Apache Kafka 将属性的重要性指定为高、中或低。 Spring 引导自动配置支持所有高重要性属性、一些选定的中低属性以及不具有默认值的任何属性。 只有 Kafka 支持的属性的一个子集可以直接通过`Kafka属性`类获得。如果你希望为生产者或消费者配置不受直接支持的附加属性,请使用以下属性: 属性 ``` spring.kafka.properties[prop.one]=first spring.kafka.admin.properties[prop.two]=second spring.kafka.consumer.properties[prop.three]=third spring.kafka.producer.properties[prop.four]=fourth spring.kafka.streams.properties[prop.five]=fifth ``` Yaml ``` spring: kafka: properties: "[prop.one]": "first" admin: properties: "[prop.two]": "second" consumer: properties: "[prop.three]": "third" producer: properties: "[prop.four]": "fourth" streams: properties: "[prop.five]": "fifth" ``` 这将普通的`prop.one`Kafka 属性设置为`first`(适用于生产者、消费者和管理员),`prop.two`管理属性设置为`second`,`prop.three`消费者属性设置为`third`,`prop.four`生产者属性设置为`fourth`,`prop.five`流属性设置为`fifth`。 还可以将 Spring kafka`JsonDeserializer`配置如下: 属性 ``` spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another ``` Yaml ``` spring: kafka: consumer: value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer" properties: "[spring.json.value.default.type]": "com.example.Invoice" "[spring.json.trusted.packages]": "com.example.main,com.example.another" ``` 类似地,你可以禁用`JsonSerializer`在标题中发送类型信息的默认行为: Properties ``` spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.producer.properties[spring.json.add.type.headers]=false ``` Yaml ``` spring: kafka: producer: value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer" properties: "[spring.json.add.type.headers]": false ``` | |以这种方式设置的属性将覆盖 Spring 引导明确支持的任何配置项。| |---|------------------------------------------------------------------------------------------------| ### 3.5.嵌入式 Kafka 测试 Spring 对于 Apache Kafka 提供了一种方便的方式来测试带有嵌入式 Apache Kafka 代理的项目。要使用此功能,请使用`spring-kafka-test`模块中的`@EmbeddedKafka`对测试类进行注释。有关更多信息,请参见 Apache Kafka[参考手册](https://docs.spring.io/spring-kafka/docs/2.8.3/reference/html/#embedded-kafka-annotation)的 Spring。 要使 Spring 引导自动配置与前面提到的嵌入式 Apache Kafka 代理一起工作,你需要将嵌入式代理地址(由`EmbeddedKafkaBroker`填充)的系统属性重新映射到 Apache Kafka 的 Spring 引导配置属性中。有几种方法可以做到这一点: * 提供一个系统属性,将嵌入式代理地址映射到测试类中的`spring.kafka.bootstrap-servers`: ``` static { System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers"); } ``` * 在`@EmbeddedKafka`注释上配置属性名: ``` import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.test.context.EmbeddedKafka; @SpringBootTest @EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers") class MyTest { // ... } ``` * 在配置属性中使用占位符: Properties ``` spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers} ``` Yaml ``` spring: kafka: bootstrap-servers: "${spring.embedded.kafka.brokers}" ``` ## 4. RSocket [RSocket](https://rsocket.io)是一种用于字节流传输的二进制协议。它支持通过在单个连接上传递异步消息的对称交互模型。 Spring 框架的`spring-messaging`模块在客户端和服务器端为 RSocket 请求者和响应者提供支持。有关更多详细信息,请参见 Spring 框架引用的[RSocket section](https://docs.spring.io/spring-framework/docs/5.3.16/reference/html/web-reactive.html#rsocket-spring),包括 RSocket 协议的概述。 ### 4.1.RSocket 策略自动配置 Spring 引导自动配置`RSocketStrategies` Bean,该配置提供了编码和解码 RSocket 有效负载所需的所有基础设施。默认情况下,自动配置将尝试配置以下内容(按顺序): 1. [CBOR](https://cbor.io/)带 Jackson 的编解码器 2. 具有 Jackson 的 JSON 编解码器 `spring-boot-starter-rsocket`启动器提供了这两个依赖项。请参阅[Jackson 支助科](features.html#features.json.jackson)以了解有关自定义可能性的更多信息。 开发人员可以通过创建实现`RSocketStrategiesCustomizer`接口的 bean 来定制`RSocketStrategies`组件。请注意,它们的`@Order`很重要,因为它决定了编解码器的顺序。 ### 4.2.RSocket 服务器自动配置 Spring 启动提供了 RSocket 服务器的自动配置。所需的依赖关系由`spring-boot-starter-rsocket`提供。 Spring 启动允许在 WebSocket 上从 WebFlux 服务器公开 RSocket,或者在独立的 RSocket 服务器上公开 RSocket。这取决于应用程序的类型及其配置。 对于 WebFlux 应用程序(类型为`WebApplicationType.REACTIVE`),只有在以下属性匹配的情况下,才会将 RSocket 服务器插入到 Web 服务器中: Properties ``` spring.rsocket.server.mapping-path=/rsocket spring.rsocket.server.transport=websocket ``` Yaml ``` spring: rsocket: server: mapping-path: "/rsocket" transport: "websocket" ``` | |将 RSocket 插入到 Web 服务器中只受 Reactor Netty 的支持,因为 RSocket 本身是用该库构建的。| |---|----------------------------------------------------------------------------------------------------------------------| 或者,RSocket TCP 或 WebSocket 服务器作为独立的嵌入式服务器启动。除了依赖关系需求外,唯一需要的配置是为该服务器定义一个端口: Properties ``` spring.rsocket.server.port=9898 ``` Yaml ``` spring: rsocket: server: port: 9898 ``` ### 4.3. Spring 消息 RSocket 支持 Spring 启动将自动为 RSocket 配置 Spring 消息传递基础设施。 这意味着 Spring 引导将创建一个`RSocketMessageHandler` Bean,它将处理对应用程序的 RSocket 请求。 ### 4.4.使用 RSocketRequester 调用 RSocket 服务 一旦在服务器和客户端之间建立了`RSocket`通道,任何一方都可以向另一方发送或接收请求。 作为服务器,你可以在 RSocket`@Controller`的任何处理程序方法上插入`RSocketRequester`实例。作为客户机,你需要首先配置和建立一个 RSocket 连接。 Spring 对于这种情况,引导自动配置带有预期编解码器的`RSocketRequester.Builder`,并应用任何`RSocketConnectorConfigurer` Bean。 `RSocketRequester.Builder`实例是一个原型 Bean,这意味着每个注入点都将为你提供一个新实例。这是有目的的,因为这个构建器是有状态的,你不应该使用相同的实例创建具有不同设置的请求者。 下面的代码展示了一个典型的示例: ``` import reactor.core.publisher.Mono; import org.springframework.messaging.rsocket.RSocketRequester; import org.springframework.stereotype.Service; @Service public class MyService { private final RSocketRequester rsocketRequester; public MyService(RSocketRequester.Builder rsocketRequesterBuilder) { this.rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898); } public Mono someRSocketCall(String name) { return this.rsocketRequester.route("user").data(name).retrieveMono(User.class); } } ``` ## 5. Spring 整合 Spring Boot 为使用[Spring Integration](https://spring.io/projects/spring-integration)提供了几种便利,包括`spring-boot-starter-integration`“starter”。 Spring 集成在消息传递和其他传输(例如 HTTP、TCP 和其他传输)上提供了抽象。如果 Spring 集成在你的 Classpath 上可用,则通过`@EnableIntegration`注释对其进行初始化。 Spring 集成轮询逻辑依赖于[on the auto-configured `TaskScheduler`](features.html#features.task-execution-and-scheduling)。默认的`PollerMetadata`(每秒轮询无界的消息数量)可以使用`spring.integration.poller.*`配置属性进行定制。 Spring 引导还配置了由存在额外的 Spring 集成模块触发的一些特征。如果`spring-integration-jmx`也在 Classpath 上,则消息处理统计信息将在 JMX 上发布。如果`spring-integration-jdbc`可用,则可以在启动时创建默认的数据库模式,如下所示: Properties ``` spring.integration.jdbc.initialize-schema=always ``` Yaml ``` spring: integration: jdbc: initialize-schema: "always" ``` 如果`spring-integration-rsocket`可用,开发人员可以使用`"spring.rsocket.server.*"`属性配置 RSocket 服务器,并让它使用`IntegrationRSocketEndpoint`或`RSocketOutboundGateway`组件来处理传入的 RSocket 消息。该基础结构可以处理 Spring 集成 RSocket 通道适配器和`@MessageMapping`处理程序(已配置`"spring.integration.rsocket.server.message-mapping-enabled"`)。 Spring 启动还可以使用配置属性自动配置`ClientRSocketConnector`: Properties ``` # Connecting to a RSocket server over TCP spring.integration.rsocket.client.host=example.org spring.integration.rsocket.client.port=9898 ``` Yaml ``` # Connecting to a RSocket server over TCP spring: integration: rsocket: client: host: "example.org" port: 9898 ``` Properties ``` # Connecting to a RSocket Server over WebSocket spring.integration.rsocket.client.uri=ws://example.org ``` Yaml ``` # Connecting to a RSocket Server over WebSocket spring: integration: rsocket: client: uri: "ws://example.org" ``` 有关更多详细信息,请参见[“IntegrationAutoConfiguration”](https://github.com/spring-projects/spring-boot/tree/v2.6.4/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java)和[“整合属性”](https://github.com/spring-projects/spring-boot/tree/v2.6.4/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java)类。 ## 6. 接下来读什么? 下一节介绍如何在应用程序中启用[IO capabilities](io.html#io)。你可以在本节中阅读有关[caching](io.html#io.caching)、[mail](io.html#io.email)、[validation](io.html#io.validation)、[rest clients](io.html#io.rest-client)等内容。