(window.webpackJsonp=window.webpackJsonp||[]).push([[407],{839:function(e,a,t){"use strict";t.r(a);var r=t(56),n=Object(r.a)({},(function(){var e=this,a=e.$createElement,t=e._self._c||a;return t("ContentSlotsDistributor",{attrs:{"slot-key":e.$parent.slotKey}},[t("h1",{attrs:{id:"消息传递"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#消息传递"}},[e._v("#")]),e._v(" 消息传递")]),e._v(" "),t("p",[e._v("Spring 框架为与消息传递系统集成提供了广泛的支持,从使用"),t("code",[e._v("JmsTemplate")]),e._v("的 JMS API 的简化使用到异步接收消息的完整基础设施。 Spring AMQP 为高级消息队列协议提供了类似的功能集。 Spring Boot 还提供了"),t("code",[e._v("RabbitTemplate")]),e._v("和 RabbitMQ 的自动配置选项。 Spring WebSocket 原生地包括对 Stomp 消息传递的支持,并且 Spring 引导通过启动器和少量自动配置对此提供支持。 Spring Boot 还支持 Apache Kafka。")]),e._v(" "),t("h2",{attrs:{id:"_1-jms"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_1-jms"}},[e._v("#")]),e._v(" 1. JMS")]),e._v(" "),t("p",[t("code",[e._v("javax.jms.ConnectionFactory")]),e._v("接口提供了一种标准方法来创建用于与 JMS 代理交互的"),t("code",[e._v("javax.jms.Connection")]),e._v("。尽管 Spring 需要"),t("code",[e._v("ConnectionFactory")]),e._v("来处理 JMS,但你通常不需要自己直接使用它,而是可以依赖于更高级别的消息传递抽象。(有关详细信息,请参见 Spring Framework 参考文档的"),t("a",{attrs:{href:"https://docs.spring.io/spring-framework/docs/5.3.16/reference/html/integration.html#jms",target:"_blank",rel:"noopener noreferrer"}},[e._v("相关部分"),t("OutboundLink")],1),e._v("。) Spring Boot 还自动配置发送和接收消息所需的基础设施。")]),e._v(" "),t("h3",{attrs:{id:"_1-1-activemq-支持"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_1-1-activemq-支持"}},[e._v("#")]),e._v(" 1.1.ActiveMQ 支持")]),e._v(" "),t("p",[e._v("当"),t("a",{attrs:{href:"https://activemq.apache.org/",target:"_blank",rel:"noopener noreferrer"}},[e._v("ActiveMQ"),t("OutboundLink")],1),e._v("在 Classpath 上可用时, Spring 引导也可以配置"),t("code",[e._v("ConnectionFactory")]),e._v("。如果存在代理,则会自动启动并配置嵌入式代理(前提是通过配置未指定代理 URL,并且在配置中未禁用嵌入式代理)。")]),e._v(" "),t("table",[t("thead",[t("tr",[t("th"),e._v(" "),t("th",[e._v("如果使用"),t("code",[e._v("spring-boot-starter-activemq")]),e._v(",则提供了连接或嵌入 ActiveMQ 实例所需的依赖关系, Spring 与 JMS 集成的基础设施也是如此。")])])]),e._v(" "),t("tbody")]),e._v(" "),t("p",[e._v("ActiveMQ 配置由"),t("code",[e._v("spring.activemq.*")]),e._v("中的外部配置属性控制。")]),e._v(" "),t("p",[e._v("默认情况下,ActiveMQ 被自动配置为使用"),t("a",{attrs:{href:"https://activemq.apache.org/vm-transport-reference.html",target:"_blank",rel:"noopener noreferrer"}},[e._v("VM 传输"),t("OutboundLink")],1),e._v(",它启动嵌入在同一 JVM 实例中的代理。")]),e._v(" "),t("p",[e._v("可以通过配置"),t("code",[e._v("spring.activemq.in-memory")]),e._v("属性禁用嵌入式代理,如以下示例所示:")]),e._v(" "),t("p",[e._v("属性")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring.activemq.in-memory=false\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring:\n activemq:\n in-memory: false\n")])])]),t("p",[e._v("如果你配置代理 URL,那么嵌入式代理也将被禁用,如以下示例所示:")]),e._v(" "),t("p",[e._v("属性")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring.activemq.broker-url=tcp://192.168.1.210:9876\nspring.activemq.user=admin\nspring.activemq.password=secret\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('spring:\n activemq:\n broker-url: "tcp://192.168.1.210:9876"\n user: "admin"\n password: "secret"\n')])])]),t("p",[e._v("如果你想完全控制嵌入式代理,请参阅"),t("a",{attrs:{href:"https://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html",target:"_blank",rel:"noopener noreferrer"}},[e._v("ActiveMQ 文档"),t("OutboundLink")],1),e._v("以获取更多信息。")]),e._v(" "),t("p",[e._v("默认情况下,"),t("code",[e._v("CachingConnectionFactory")]),e._v("将本机"),t("code",[e._v("ConnectionFactory")]),e._v("包装成合理的设置,你可以通过"),t("code",[e._v("spring.jms.*")]),e._v("中的外部配置属性来控制这些设置:")]),e._v(" "),t("p",[e._v("属性")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring.jms.cache.session-cache-size=5\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring:\n jms:\n cache:\n session-cache-size: 5\n")])])]),t("p",[e._v("如果你更愿意使用本机池,那么可以通过向"),t("code",[e._v("org.messaginghub:pooled-jms")]),e._v("添加一个依赖项并相应地配置"),t("code",[e._v("JmsPoolConnectionFactory")]),e._v("来实现,如下面的示例所示:")]),e._v(" "),t("p",[e._v("属性")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring.activemq.pool.enabled=true\nspring.activemq.pool.max-connections=50\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring:\n activemq:\n pool:\n enabled: true\n max-connections: 50\n")])])]),t("table",[t("thead",[t("tr",[t("th"),e._v(" "),t("th",[e._v("参见["),t("code",[e._v("ActiveMQ属性")]),e._v("](https://github.com/ Spring-projects/ Spring-boot/tree/v2.6.4/ Spring-boot-project/ Spring-boot-autofigure/SRC/main/java/org/springframework/boot/autofigure/jms/activemq/activemqproperties.java)以获得更多支持的选项。"),t("br"),e._v("还可以注册任意数量的 bean 来实现"),t("code",[e._v("ActiveMQConnectionFactoryCustomizer")]),e._v("以进行更高级的定制。")])])]),e._v(" "),t("tbody")]),e._v(" "),t("p",[e._v("默认情况下,ActiveMQ 会创建一个尚不存在的目标,以便根据其提供的名称对目标进行解析。")]),e._v(" "),t("h3",{attrs:{id:"_1-2-activemq-artemis-支持"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_1-2-activemq-artemis-支持"}},[e._v("#")]),e._v(" 1.2.ActiveMQ Artemis 支持")]),e._v(" "),t("p",[e._v("Spring 当在 Classpath 上检测到"),t("a",{attrs:{href:"https://activemq.apache.org/components/artemis/",target:"_blank",rel:"noopener noreferrer"}},[e._v("ActiveMQ Artemis"),t("OutboundLink")],1),e._v("可用时,启动可以自动配置"),t("code",[e._v("ConnectionFactory")]),e._v("。如果存在代理,则会自动启动和配置嵌入式代理(除非已显式设置了 Mode 属性)。支持的模式是"),t("code",[e._v("embedded")]),e._v("(明确表示需要嵌入式代理,并且如果代理在 Classpath 上不可用,则应该发生错误)和"),t("code",[e._v("native")]),e._v("(使用"),t("code",[e._v("netty")]),e._v("传输协议连接到代理)。当对后者进行配置时, Spring 引导将配置一个"),t("code",[e._v("ConnectionFactory")]),e._v(",该代理连接到使用默认设置在本地计算机上运行的代理。")]),e._v(" "),t("table",[t("thead",[t("tr",[t("th"),e._v(" "),t("th",[e._v("如果使用"),t("code",[e._v("spring-boot-starter-artemis")]),e._v(",则提供了连接到现有 ActiveMQ Artemis 实例的必要依赖项,以及 Spring 与 JMS 集成的基础设施。"),t("br"),e._v("将"),t("code",[e._v("org.apache.activemq:artemis-jms-server")]),e._v("添加到应用程序中,可以使用嵌入式模式。")])])]),e._v(" "),t("tbody")]),e._v(" "),t("p",[e._v("ActiveMQ Artemis 配置由"),t("code",[e._v("spring.artemis.*")]),e._v("中的外部配置属性控制。例如,你可以在"),t("code",[e._v("application.properties")]),e._v("中声明以下部分:")]),e._v(" "),t("p",[e._v("属性")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring.artemis.mode=native\nspring.artemis.broker-url=tcp://192.168.1.210:9876\nspring.artemis.user=admin\nspring.artemis.password=secret\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('spring:\n artemis:\n mode: native\n broker-url: "tcp://192.168.1.210:9876"\n user: "admin"\n password: "secret"\n')])])]),t("p",[e._v("嵌入代理时,你可以选择是否要启用持久性,并列出应该使其可用的目标。它们可以指定为逗号分隔的列表,以使用默认选项创建它们,或者你可以分别为高级队列和主题配置定义类型"),t("code",[e._v("org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration")]),e._v("或"),t("code",[e._v("org.apache.activemq.artemis.jms.server.config.TopicConfiguration")]),e._v("的 Bean(s)。")]),e._v(" "),t("p",[e._v("默认情况下,"),t("code",[e._v("CachingConnectionFactory")]),e._v("将本机"),t("code",[e._v("ConnectionFactory")]),e._v("包装成合理的设置,你可以通过"),t("code",[e._v("spring.jms.*")]),e._v("中的外部配置属性来控制这些设置:")]),e._v(" "),t("p",[e._v("属性")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring.jms.cache.session-cache-size=5\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring:\n jms:\n cache:\n session-cache-size: 5\n")])])]),t("p",[e._v("如果你更愿意使用本机池,那么可以通过向"),t("code",[e._v("org.messaginghub:pooled-jms")]),e._v("添加一个依赖项并相应地配置"),t("code",[e._v("JmsPoolConnectionFactory")]),e._v("来实现,如下面的示例所示:")]),e._v(" "),t("p",[e._v("属性")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring.artemis.pool.enabled=true\nspring.artemis.pool.max-connections=50\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring:\n artemis:\n pool:\n enabled: true\n max-connections: 50\n")])])]),t("p",[e._v("参见["),t("code",[e._v("Artemis属性")]),e._v("](https://github.com/ Spring-projects/ Spring-boot/tree/v2.6.4/ Spring-boot-project/ Spring-boot-autofigure/SRC/main/java/org/springframework/boot/autofigure/jms/artemis/artemproperties.java)以获得更多支持的选项。")]),e._v(" "),t("p",[e._v("不涉及 JNDI 查找,并且使用 Artemis 配置中的"),t("code",[e._v("name")]),e._v("属性或通过配置提供的名称,根据它们的名称解析目标。")]),e._v(" "),t("h3",{attrs:{id:"_1-3-使用-jndi-connectionfactory"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_1-3-使用-jndi-connectionfactory"}},[e._v("#")]),e._v(" 1.3.使用 JNDI ConnectionFactory")]),e._v(" "),t("p",[e._v("如果你在应用程序服务器中运行你的应用程序, Spring 引导将尝试通过使用 JNDI 来定位 JMS"),t("code",[e._v("ConnectionFactory")]),e._v("。默认情况下,将检查"),t("code",[e._v("java:/JmsXA")]),e._v("和"),t("code",[e._v("java:/XAConnectionFactory")]),e._v("位置。如果需要指定替代位置,可以使用"),t("code",[e._v("spring.jms.jndi-name")]),e._v("属性,如下例所示:")]),e._v(" "),t("p",[e._v("属性")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring.jms.jndi-name=java:/MyConnectionFactory\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('spring:\n jms:\n jndi-name: "java:/MyConnectionFactory"\n')])])]),t("h3",{attrs:{id:"_1-4-发送消息"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_1-4-发送消息"}},[e._v("#")]),e._v(" 1.4.发送消息")]),e._v(" "),t("p",[e._v("Spring 的"),t("code",[e._v("JmsTemplate")]),e._v("是自动配置的,你可以直接将其自动连接到自己的 bean 中,如以下示例所示:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('import org.springframework.jms.core.JmsTemplate;\nimport org.springframework.stereotype.Component;\n\n@Component\npublic class MyBean {\n\n private final JmsTemplate jmsTemplate;\n\n public MyBean(JmsTemplate jmsTemplate) {\n this.jmsTemplate = jmsTemplate;\n }\n\n // ...\n\n public void someMethod() {\n this.jmsTemplate.convertAndSend("hello");\n }\n\n}\n\n')])])]),t("table",[t("thead",[t("tr",[t("th"),e._v(" "),t("th",[e._v("["),t("code",[e._v("JmsMessagingTemplate")]),e._v("](https://DOCS. Spring.io/ Spring-framework/DOCS/5.3.16/javadoc-api/org/springframework/jms/core/jmssagingtemplate.html)可以以类似的方式注入。"),t("br"),e._v("如果定义了"),t("code",[e._v("DestinationResolver")]),e._v("或"),t("code",[e._v("MessageConverter")]),e._v(" Bean,则自动关联到自动配置的"),t("code",[e._v("JmsTemplate")]),e._v("。")])])]),e._v(" "),t("tbody")]),e._v(" "),t("h3",{attrs:{id:"_1-5-接收消息"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_1-5-接收消息"}},[e._v("#")]),e._v(" 1.5.接收消息")]),e._v(" "),t("p",[e._v("当存在 JMS 基础设施时,任何 Bean 都可以用"),t("code",[e._v("@JmsListener")]),e._v("进行注释,以创建侦听器端点。如果没有"),t("code",[e._v("JmsListenerContainerFactory")]),e._v("被定义,一个默认的是自动配置的。如果定义了"),t("code",[e._v("DestinationResolver")]),e._v("、"),t("code",[e._v("MessageConverter")]),e._v("或"),t("code",[e._v("javax.jms.ExceptionListener")]),e._v("bean,则它们将自动与默认工厂关联。")]),e._v(" "),t("p",[e._v("默认情况下,默认工厂是事务性的。如果在存在"),t("code",[e._v("JtaTransactionManager")]),e._v("的基础设施中运行,则默认情况下它与侦听器容器关联。如果没有,则启用"),t("code",[e._v("sessionTransacted")]),e._v("标志。在后一种场景中,可以通过在侦听器方法(或其委托)上添加"),t("code",[e._v("@Transactional")]),e._v(",将本地数据存储事务与传入消息的处理相关联。这可以确保在本地事务完成后确认传入消息。这还包括发送在同一 JMS 会话上执行的响应消息。")]),e._v(" "),t("p",[e._v("以下组件在"),t("code",[e._v("someQueue")]),e._v("目标上创建一个侦听器端点:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('import org.springframework.jms.annotation.JmsListener;\nimport org.springframework.stereotype.Component;\n\n@Component\npublic class MyBean {\n\n @JmsListener(destination = "someQueue")\n public void processMessage(String content) {\n // ...\n }\n\n}\n\n')])])]),t("table",[t("thead",[t("tr",[t("th"),e._v(" "),t("th",[e._v("有关更多详细信息,请参见["),t("code",[e._v("@EnableJms")]),e._v("的 Javadoc](https://DOCS. Spring.io/ Spring-framework/DOCS/5.3.16/javadoc-api/org/springframework/jms/annotation/enablejms.html)。")])])]),e._v(" "),t("tbody")]),e._v(" "),t("p",[e._v("如果你需要创建更多的"),t("code",[e._v("JmsListenerContainerFactory")]),e._v("实例,或者如果你想重写默认值, Spring Boot 提供了一个"),t("code",[e._v("DefaultJmsListenerContainerFactoryConfigurer")]),e._v(",你可以使用它初始化"),t("code",[e._v("DefaultJmsListenerContainerFactory")]),e._v(",并使用与自动配置的设置相同的设置。")]),e._v(" "),t("p",[e._v("例如,下面的示例公开了另一个使用特定"),t("code",[e._v("MessageConverter")]),e._v("的工厂:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("import javax.jms.ConnectionFactory;\n\nimport org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;\nimport org.springframework.context.annotation.Bean;\nimport org.springframework.context.annotation.Configuration;\nimport org.springframework.jms.config.DefaultJmsListenerContainerFactory;\n\n@Configuration(proxyBeanMethods = false)\npublic class MyJmsConfiguration {\n\n @Bean\n public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {\n DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();\n ConnectionFactory connectionFactory = getCustomConnectionFactory();\n configurer.configure(factory, connectionFactory);\n factory.setMessageConverter(new MyMessageConverter());\n return factory;\n }\n\n private ConnectionFactory getCustomConnectionFactory() {\n return ...\n }\n\n}\n\n")])])]),t("p",[e._v("然后,你可以在任何"),t("code",[e._v("@JmsListener")]),e._v("注释的方法中使用工厂,如下所示:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('import org.springframework.jms.annotation.JmsListener;\nimport org.springframework.stereotype.Component;\n\n@Component\npublic class MyBean {\n\n @JmsListener(destination = "someQueue", containerFactory = "myFactory")\n public void processMessage(String content) {\n // ...\n }\n\n}\n\n')])])]),t("h2",{attrs:{id:"_2-amqp"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_2-amqp"}},[e._v("#")]),e._v(" 2. AMQP")]),e._v(" "),t("p",[e._v("高级消息队列协议是一种面向消息中间件的平台中立、线路级协议。 Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。 Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了几种便利,包括"),t("code",[e._v("spring-boot-starter-amqp")]),e._v("“starter”。")]),e._v(" "),t("h3",{attrs:{id:"_2-1-rabbitmq-支持"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_2-1-rabbitmq-支持"}},[e._v("#")]),e._v(" 2.1.RabbitMQ 支持")]),e._v(" "),t("p",[t("a",{attrs:{href:"https://www.rabbitmq.com/",target:"_blank",rel:"noopener noreferrer"}},[e._v("RabbitMQ"),t("OutboundLink")],1),e._v("是一种基于 AMQP 协议的轻量级、可靠、可扩展和可移植的消息代理。 Spring 使用"),t("code",[e._v("RabbitMQ")]),e._v("通过 AMQP 协议进行通信。")]),e._v(" "),t("p",[e._v("RabbitMQ 配置由"),t("code",[e._v("spring.rabbitmq.*")]),e._v("中的外部配置属性控制。例如,你可以在"),t("code",[e._v("application.properties")]),e._v("中声明以下部分:")]),e._v(" "),t("p",[e._v("属性")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring.rabbitmq.host=localhost\nspring.rabbitmq.port=5672\nspring.rabbitmq.username=admin\nspring.rabbitmq.password=secret\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('spring:\n rabbitmq:\n host: "localhost"\n port: 5672\n username: "admin"\n password: "secret"\n')])])]),t("p",[e._v("或者,你可以使用"),t("code",[e._v("addresses")]),e._v("属性配置相同的连接:")]),e._v(" "),t("p",[e._v("属性")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring.rabbitmq.addresses=amqp://admin:[email protected]\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('spring:\n rabbitmq:\n addresses: "amqp://admin:[email protected]"\n')])])]),t("table",[t("thead",[t("tr",[t("th"),e._v(" "),t("th",[e._v("当以这种方式指定地址时,"),t("code",[e._v("host")]),e._v("和"),t("code",[e._v("port")]),e._v("属性将被忽略。"),t("br"),e._v("如果地址使用"),t("code",[e._v("amqps")]),e._v("协议,则将自动启用 SSL 支持。")])])]),e._v(" "),t("tbody")]),e._v(" "),t("p",[e._v("参见["),t("code",[e._v("Rabbit属性")]),e._v("](https://github.com/ Spring-projects/ Spring-boot/tree/v2.6.4/ Spring-boot-project/ Spring-boot-autofigure/SRC/main/java/org/springframework/boot/autofigure/amqp/rabbitproperties.java)以获得更多支持的基于属性的配置选项。要配置 Spring AMQP 使用的 RabbitMQ"),t("code",[e._v("ConnectionFactory")]),e._v("的较低级别的详细信息,请定义"),t("code",[e._v("ConnectionFactoryCustomizer")]),e._v(" Bean。")]),e._v(" "),t("p",[e._v("如果在上下文中存在"),t("code",[e._v("ConnectionNameStrategy")]),e._v(" Bean,则将自动使用它来命名由自动配置的"),t("code",[e._v("CachingConnectionFactory")]),e._v("创建的连接。")]),e._v(" "),t("table",[t("thead",[t("tr",[t("th"),e._v(" "),t("th",[e._v("有关更多详细信息,请参见"),t("a",{attrs:{href:"https://spring.io/blog/2010/06/14/understanding-amqp-the-protocol-used-by-rabbitmq/",target:"_blank",rel:"noopener noreferrer"}},[e._v("理解 RabbitMQ 使用的协议 AMQP"),t("OutboundLink")],1),e._v("。")])])]),e._v(" "),t("tbody")]),e._v(" "),t("h3",{attrs:{id:"_2-2-发送消息"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_2-2-发送消息"}},[e._v("#")]),e._v(" 2.2.发送消息")]),e._v(" "),t("p",[e._v("Spring 的"),t("code",[e._v("AmqpTemplate")]),e._v("和"),t("code",[e._v("AmqpAdmin")]),e._v("是自动配置的,你可以将它们直接自动连接到自己的 bean 中,如以下示例所示:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('import org.springframework.amqp.core.AmqpAdmin;\nimport org.springframework.amqp.core.AmqpTemplate;\nimport org.springframework.stereotype.Component;\n\n@Component\npublic class MyBean {\n\n private final AmqpAdmin amqpAdmin;\n\n private final AmqpTemplate amqpTemplate;\n\n public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {\n this.amqpAdmin = amqpAdmin;\n this.amqpTemplate = amqpTemplate;\n }\n\n // ...\n\n public void someMethod() {\n this.amqpAdmin.getQueueInfo("someQueue");\n }\n\n public void someOtherMethod() {\n this.amqpTemplate.convertAndSend("hello");\n }\n\n}\n\n')])])]),t("table",[t("thead",[t("tr",[t("th"),e._v(" "),t("th",[e._v("["),t("code",[e._v("RabbitMessagingTemplate")]),e._v("](https://DOCS. Spring.io/ Spring-amqp/DOCS/2.4.2/api/org/springframework/amqp/rabbit/core/rabbitmessagingtemplate.html)可以以类似的方式注入。"),t("br"),e._v("如果定义了"),t("code",[e._v("MessageConverter")]),e._v(" Bean,则自动关联到自动配置的"),t("code",[e._v("AmqpTemplate")]),e._v("。")])])]),e._v(" "),t("tbody")]),e._v(" "),t("p",[e._v("如果有必要,任何被定义为 Bean 的"),t("code",[e._v("org.springframework.amqp.core.Queue")]),e._v("都会自动用于在 RabbitMQ 实例上声明相应的队列。")]),e._v(" "),t("p",[e._v("要重试操作,你可以在"),t("code",[e._v("AmqpTemplate")]),e._v("上启用重试(例如,在代理连接丢失的情况下):")]),e._v(" "),t("p",[e._v("属性")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring.rabbitmq.template.retry.enabled=true\nspring.rabbitmq.template.retry.initial-interval=2s\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('spring:\n rabbitmq:\n template:\n retry:\n enabled: true\n initial-interval: "2s"\n')])])]),t("p",[e._v("默认情况下,重试是禁用的。你还可以通过声明一个"),t("code",[e._v("RabbitRetryTemplateCustomizer")]),e._v(" Bean 以编程方式定制"),t("code",[e._v("RetryTemplate")]),e._v("。")]),e._v(" "),t("p",[e._v("如果你需要创建更多的"),t("code",[e._v("RabbitTemplate")]),e._v("实例,或者如果你想要重写默认值, Spring boot 提供了一个"),t("code",[e._v("RabbitTemplateConfigurer")]),e._v(" Bean,你可以使用它初始化"),t("code",[e._v("RabbitTemplate")]),e._v(",并使用与自动配置所使用的工厂相同的设置。")]),e._v(" "),t("h3",{attrs:{id:"_2-3-接收消息"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_2-3-接收消息"}},[e._v("#")]),e._v(" 2.3.接收消息")]),e._v(" "),t("p",[e._v("当 Rabbit 基础结构存在时,任何 Bean 都可以用"),t("code",[e._v("@RabbitListener")]),e._v("进行注释,以创建侦听器端点。如果没有"),t("code",[e._v("RabbitListenerContainerFactory")]),e._v("已被定义,一个默认的"),t("code",[e._v("SimpleRabbitListenerContainerFactory")]),e._v("将被自动配置,并且你可以使用"),t("code",[e._v("spring.rabbitmq.listener.type")]),e._v("属性切换到一个直接容器。如果定义了"),t("code",[e._v("MessageConverter")]),e._v("或"),t("code",[e._v("MessageRecoverer")]),e._v(" Bean,则会自动与默认工厂关联。")]),e._v(" "),t("p",[e._v("以下示例组件在"),t("code",[e._v("someQueue")]),e._v("队列上创建一个侦听器端点:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('import org.springframework.amqp.rabbit.annotation.RabbitListener;\nimport org.springframework.stereotype.Component;\n\n@Component\npublic class MyBean {\n\n @RabbitListener(queues = "someQueue")\n public void processMessage(String content) {\n // ...\n }\n\n}\n\n')])])]),t("table",[t("thead",[t("tr",[t("th"),e._v(" "),t("th",[e._v("有关更多详细信息,请参见["),t("code",[e._v("@EnableRabbit")]),e._v("的 javadoc](https://DOCS. Spring.io/ Spring-amqp/DOCS/2.4.2/api/org/springframework/amqp/rabbit/annotation/enablerabbit.html)。")])])]),e._v(" "),t("tbody")]),e._v(" "),t("p",[e._v("如果你需要创建更多的"),t("code",[e._v("RabbitListenerContainerFactory")]),e._v("实例,或者你想要重写默认值, Spring Boot 提供了一个"),t("code",[e._v("SimpleRabbitListenerContainerFactoryConfigurer")]),e._v("和一个"),t("code",[e._v("DirectRabbitListenerContainerFactoryConfigurer")]),e._v(",你可以使用它们初始化一个"),t("code",[e._v("SimpleRabbitListenerContainerFactory")]),e._v("和一个"),t("code",[e._v("DirectRabbitListenerContainerFactory")]),e._v(",其设置与自动配置所使用的工厂相同。")]),e._v(" "),t("table",[t("thead",[t("tr",[t("th"),e._v(" "),t("th",[e._v("选择哪种容器类型并不重要。"),t("br"),e._v("这两个 bean 是通过自动配置公开的。")])])]),e._v(" "),t("tbody")]),e._v(" "),t("p",[e._v("例如,下面的配置类公开了另一个使用特定"),t("code",[e._v("MessageConverter")]),e._v("的工厂:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;\nimport org.springframework.amqp.rabbit.connection.ConnectionFactory;\nimport org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;\nimport org.springframework.context.annotation.Bean;\nimport org.springframework.context.annotation.Configuration;\n\n@Configuration(proxyBeanMethods = false)\npublic class MyRabbitConfiguration {\n\n @Bean\n public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {\n SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();\n ConnectionFactory connectionFactory = getCustomConnectionFactory();\n configurer.configure(factory, connectionFactory);\n factory.setMessageConverter(new MyMessageConverter());\n return factory;\n }\n\n private ConnectionFactory getCustomConnectionFactory() {\n return ...\n }\n\n}\n\n")])])]),t("p",[e._v("然后,你可以在任何"),t("code",[e._v("@RabbitListener")]),e._v("-注释的方法中使用工厂,如下所示:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('import org.springframework.amqp.rabbit.annotation.RabbitListener;\nimport org.springframework.stereotype.Component;\n\n@Component\npublic class MyBean {\n\n @RabbitListener(queues = "someQueue", containerFactory = "myFactory")\n public void processMessage(String content) {\n // ...\n }\n\n}\n\n')])])]),t("p",[e._v("你可以启用重试来处理侦听器抛出异常的情况。默认情况下,使用"),t("code",[e._v("RejectAndDontRequeueRecoverer")]),e._v(",但你可以定义自己的"),t("code",[e._v("MessageRecoverer")]),e._v("。当重复尝试用完时,消息将被拒绝,如果代理配置为这样做,消息将被丢弃或路由到死信交换。默认情况下,重试是禁用的。你还可以通过声明一个"),t("code",[e._v("RabbitRetryTemplateCustomizer")]),e._v(" Bean 以编程方式定制"),t("code",[e._v("RetryTemplate")]),e._v("。")]),e._v(" "),t("table",[t("thead",[t("tr",[t("th"),e._v(" "),t("th",[e._v("默认情况下,如果禁用了重试,并且侦听器抛出了异常,交付将无限期地重试。"),t("br"),e._v("你可以通过两种方式修改此行为:将"),t("code",[e._v("defaultRequeueRejected")]),e._v("属性设置为"),t("code",[e._v("false")]),e._v("所以尝试进行零次重新交付或抛出"),t("code",[e._v("AmqpRejectAndDontRequeueException")]),e._v("以表示消息应被拒绝。"),t("br"),e._v("后者是在启用重试并达到最大交付尝试次数时使用的机制。")])])]),e._v(" "),t("tbody")]),e._v(" "),t("h2",{attrs:{id:"_3-apache-kafka-支持"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_3-apache-kafka-支持"}},[e._v("#")]),e._v(" 3. Apache Kafka 支持")]),e._v(" "),t("p",[t("a",{attrs:{href:"https://kafka.apache.org/",target:"_blank",rel:"noopener noreferrer"}},[e._v("Apache Kafka"),t("OutboundLink")],1),e._v("通过提供"),t("code",[e._v("spring-kafka")]),e._v("项目的自动配置而得到支持。")]),e._v(" "),t("p",[e._v("Kafka 配置由"),t("code",[e._v("spring.kafka.*")]),e._v("中的外部配置属性控制。例如,你可以在"),t("code",[e._v("application.properties")]),e._v("中声明以下部分:")]),e._v(" "),t("p",[e._v("属性")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring.kafka.bootstrap-servers=localhost:9092\nspring.kafka.consumer.group-id=myGroup\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('spring:\n kafka:\n bootstrap-servers: "localhost:9092"\n consumer:\n group-id: "myGroup"\n')])])]),t("table",[t("thead",[t("tr",[t("th"),e._v(" "),t("th",[e._v("要在启动时创建主题,请添加类型"),t("code",[e._v("NewTopic")]),e._v("的 Bean。如果主题已经存在,则忽略 Bean。")])])]),e._v(" "),t("tbody")]),e._v(" "),t("p",[e._v("参见["),t("code",[e._v("Kafka属性")]),e._v("](https://github.com/ Spring-projects/ Spring-boot/tree/v2.6.4/ Spring-boot-project/ Spring-boot-autofigure/SRC/main/java/org/springframework/boot/autofigure/kafka/kaproperties.java)以获得更多支持的选项。")]),e._v(" "),t("h3",{attrs:{id:"_3-1-发送消息"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_3-1-发送消息"}},[e._v("#")]),e._v(" 3.1.发送消息")]),e._v(" "),t("p",[e._v("Spring 的"),t("code",[e._v("KafkaTemplate")]),e._v("是自动配置的,你可以直接在自己的 bean 中自动连接它,如以下示例所示:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('import org.springframework.kafka.core.KafkaTemplate;\nimport org.springframework.stereotype.Component;\n\n@Component\npublic class MyBean {\n\n private final KafkaTemplate kafkaTemplate;\n\n public MyBean(KafkaTemplate kafkaTemplate) {\n this.kafkaTemplate = kafkaTemplate;\n }\n\n // ...\n\n public void someMethod() {\n this.kafkaTemplate.send("someTopic", "Hello");\n }\n\n}\n\n')])])]),t("table",[t("thead",[t("tr",[t("th"),e._v(" "),t("th",[e._v("如果定义了属性"),t("code",[e._v("spring.kafka.producer.transaction-id-prefix")]),e._v(",则自动配置一个"),t("code",[e._v("KafkaTransactionManager")]),e._v("。"),t("br"),e._v("同样,如果定义了一个"),t("code",[e._v("RecordMessageConverter")]),e._v(" Bean,则自动将其关联到自动配置的"),t("code",[e._v("KafkaTemplate")]),e._v("。")])])]),e._v(" "),t("tbody")]),e._v(" "),t("h3",{attrs:{id:"_3-2-接收消息"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_3-2-接收消息"}},[e._v("#")]),e._v(" 3.2.接收消息")]),e._v(" "),t("p",[e._v("当存在 Apache Kafka 基础设施时,任何 Bean 都可以用"),t("code",[e._v("@KafkaListener")]),e._v("进行注释,以创建侦听器端点。如果没有"),t("code",[e._v("KafkaListenerContainerFactory")]),e._v("被定义,默认的一个是自动配置与"),t("code",[e._v("spring.kafka.listener.*")]),e._v("中定义的键。")]),e._v(" "),t("p",[e._v("以下组件在"),t("code",[e._v("someTopic")]),e._v("主题上创建一个侦听器端点:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('import org.springframework.kafka.annotation.KafkaListener;\nimport org.springframework.stereotype.Component;\n\n@Component\npublic class MyBean {\n\n @KafkaListener(topics = "someTopic")\n public void processMessage(String content) {\n // ...\n }\n\n}\n\n')])])]),t("p",[e._v("如果定义了"),t("code",[e._v("KafkaTransactionManager")]),e._v(" Bean,则它将自动与容器工厂关联。类似地,如果定义了一个"),t("code",[e._v("RecordFilterStrategy")]),e._v("、"),t("code",[e._v("CommonErrorHandler")]),e._v("、"),t("code",[e._v("AfterRollbackProcessor")]),e._v("或"),t("code",[e._v("ConsumerAwareRebalanceListener")]),e._v(" Bean,则它将自动关联到默认工厂。")]),e._v(" "),t("p",[e._v("根据侦听器类型,与默认工厂关联的是"),t("code",[e._v("RecordMessageConverter")]),e._v("或"),t("code",[e._v("BatchMessageConverter")]),e._v(" Bean。如果批处理侦听器仅存在一个"),t("code",[e._v("RecordMessageConverter")]),e._v(" Bean,则将其包装在"),t("code",[e._v("BatchMessageConverter")]),e._v("中。")]),e._v(" "),t("table",[t("thead",[t("tr",[t("th"),e._v(" "),t("th",[e._v("自定义"),t("code",[e._v("ChainedKafkaTransactionManager")]),e._v("必须标记"),t("code",[e._v("@Primary")]),e._v(",因为它通常引用自动配置的"),t("code",[e._v("KafkaTransactionManager")]),e._v(" Bean。")])])]),e._v(" "),t("tbody")]),e._v(" "),t("h3",{attrs:{id:"_3-3-卡夫卡溪流"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_3-3-卡夫卡溪流"}},[e._v("#")]),e._v(" 3.3.卡夫卡溪流")]),e._v(" "),t("p",[e._v("Spring 对于 Apache,Kafka 提供了一种工厂 Bean 来创建"),t("code",[e._v("StreamsBuilder")]),e._v("对象并管理其流的生命周期。 Spring 引导自动配置所需的"),t("code",[e._v("KafkaStreamsConfiguration")]),e._v(" Bean,只要"),t("code",[e._v("kafka-streams")]),e._v("在 Classpath 上并且 Kafka 流是由"),t("code",[e._v("@EnableKafkaStreams")]),e._v("注释启用的。")]),e._v(" "),t("p",[e._v("启用 Kafka 流意味着必须设置应用程序 ID 和引导程序服务器。前者可以使用"),t("code",[e._v("spring.kafka.streams.application-id")]),e._v("进行配置,如果没有设置,则默认为"),t("code",[e._v("spring.application.name")]),e._v("。后者可以全局设置,也可以只针对流专门重写。")]),e._v(" "),t("p",[e._v("使用专用属性可以获得几个附加属性;可以使用"),t("code",[e._v("spring.kafka.streams.properties")]),e._v("名称空间设置其他任意 Kafka 属性。有关更多信息,请参见"),t("RouterLink",{attrs:{to:"/spring-boot/features.html#messaging.kafka.additional-properties"}},[e._v("features.html")]),e._v("。")],1),e._v(" "),t("p",[e._v("要使用工厂 Bean,将"),t("code",[e._v("StreamsBuilder")]),e._v("连接到你的"),t("code",[e._v("@Bean")]),e._v("中,如以下示例所示:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('import org.apache.kafka.common.serialization.Serdes;\nimport org.apache.kafka.streams.KeyValue;\nimport org.apache.kafka.streams.StreamsBuilder;\nimport org.apache.kafka.streams.kstream.KStream;\nimport org.apache.kafka.streams.kstream.Produced;\n\nimport org.springframework.context.annotation.Bean;\nimport org.springframework.context.annotation.Configuration;\nimport org.springframework.kafka.annotation.EnableKafkaStreams;\nimport org.springframework.kafka.support.serializer.JsonSerde;\n\n@Configuration(proxyBeanMethods = false)\n@EnableKafkaStreams\npublic class MyKafkaStreamsConfiguration {\n\n @Bean\n public KStream kStream(StreamsBuilder streamsBuilder) {\n KStream stream = streamsBuilder.stream("ks1In");\n stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));\n return stream;\n }\n\n private KeyValue uppercaseValue(Integer key, String value) {\n return new KeyValue<>(key, value.toUpperCase());\n }\n\n}\n\n')])])]),t("p",[e._v("默认情况下,由它创建的"),t("code",[e._v("StreamBuilder")]),e._v("对象管理的流是自动启动的。你可以使用"),t("code",[e._v("spring.kafka.streams.auto-startup")]),e._v("属性自定义此行为。")]),e._v(" "),t("h3",{attrs:{id:"_3-4-其他卡夫卡物业"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_3-4-其他卡夫卡物业"}},[e._v("#")]),e._v(" 3.4.其他卡夫卡物业")]),e._v(" "),t("p",[e._v("Auto Configuration 支持的属性显示在附录的"),t("RouterLink",{attrs:{to:"/spring-boot/application-properties.html#appendix.application-properties.integration"}},[e._v("“集成属性”")]),e._v("部分中。请注意,在大多数情况下,这些属性(连字符或 CamelCase)直接映射到 Apache Kafka 点线属性。有关详细信息,请参见 Apache Kafka 文档。")],1),e._v(" "),t("p",[e._v("这些属性中的前几个应用于所有组件(生产者、消费者、管理员和流),但如果你希望使用不同的值,则可以在组件级别指定这些属性。Apache Kafka 将属性的重要性指定为高、中或低。 Spring 引导自动配置支持所有高重要性属性、一些选定的中低属性以及不具有默认值的任何属性。")]),e._v(" "),t("p",[e._v("只有 Kafka 支持的属性的一个子集可以直接通过"),t("code",[e._v("Kafka属性")]),e._v("类获得。如果你希望为生产者或消费者配置不受直接支持的附加属性,请使用以下属性:")]),e._v(" "),t("p",[e._v("属性")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring.kafka.properties[prop.one]=first\nspring.kafka.admin.properties[prop.two]=second\nspring.kafka.consumer.properties[prop.three]=third\nspring.kafka.producer.properties[prop.four]=fourth\nspring.kafka.streams.properties[prop.five]=fifth\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('spring:\n kafka:\n properties:\n "[prop.one]": "first"\n admin:\n properties:\n "[prop.two]": "second"\n consumer:\n properties:\n "[prop.three]": "third"\n producer:\n properties:\n "[prop.four]": "fourth"\n streams:\n properties:\n "[prop.five]": "fifth"\n')])])]),t("p",[e._v("这会将常见的"),t("code",[e._v("prop.one")]),e._v("Kafka 属性设置为"),t("code",[e._v("first")]),e._v("(适用于生产者、消费者和管理员),将"),t("code",[e._v("prop.two")]),e._v("管理属性设置为"),t("code",[e._v("second")]),e._v(",将"),t("code",[e._v("prop.three")]),e._v("消费者属性设置为"),t("code",[e._v("prop.four")]),e._v("生产者属性设置为"),t("code",[e._v("fourth")]),e._v(",将"),t("code",[e._v("prop.five")]),e._v("流属性设置为"),t("code",[e._v("fifth")]),e._v("。")]),e._v(" "),t("p",[e._v("还可以将 Spring kafka"),t("code",[e._v("JsonDeserializer")]),e._v("配置如下:")]),e._v(" "),t("p",[e._v("属性")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer\nspring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice\nspring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('spring:\n kafka:\n consumer:\n value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"\n properties:\n "[spring.json.value.default.type]": "com.example.Invoice"\n "[spring.json.trusted.packages]": "com.example.main,com.example.another"\n')])])]),t("p",[e._v("类似地,你可以禁用"),t("code",[e._v("JsonSerializer")]),e._v("在标题中发送类型信息的默认行为:")]),e._v(" "),t("p",[e._v("属性")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer\nspring.kafka.producer.properties[spring.json.add.type.headers]=false\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('spring:\n kafka:\n producer:\n value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"\n properties:\n "[spring.json.add.type.headers]": false\n')])])]),t("table",[t("thead",[t("tr",[t("th"),e._v(" "),t("th",[e._v("以这种方式设置的属性覆盖 Spring 引导明确支持的任何配置项。")])])]),e._v(" "),t("tbody")]),e._v(" "),t("h3",{attrs:{id:"_3-5-嵌入式-kafka-测试"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_3-5-嵌入式-kafka-测试"}},[e._v("#")]),e._v(" 3.5.嵌入式 Kafka 测试")]),e._v(" "),t("p",[e._v("Spring 对于 Apache Kafka 提供了一种方便的方式来测试带有嵌入式 Apache Kafka 代理的项目。要使用此功能,请在"),t("code",[e._v("spring-kafka-test")]),e._v("模块中用"),t("code",[e._v("@EmbeddedKafka")]),e._v("注释一个测试类。有关更多信息,请参见 Apache Kafka"),t("a",{attrs:{href:"https://docs.spring.io/spring-kafka/docs/2.8.3/reference/html/#embedded-kafka-annotation",target:"_blank",rel:"noopener noreferrer"}},[e._v("参考手册"),t("OutboundLink")],1),e._v("的 Spring。")]),e._v(" "),t("p",[e._v("要使 Spring 引导自动配置与前面提到的嵌入式 Apache Kafka 代理一起工作,你需要将嵌入式代理地址(由"),t("code",[e._v("EmbeddedKafkaBroker")]),e._v("填充)的系统属性重新映射到 Apache Kafka 的 Spring 引导配置属性中。有几种方法可以做到这一点:")]),e._v(" "),t("ul",[t("li",[e._v("提供一个系统属性,将嵌入式代理地址映射到测试类中的"),t("code",[e._v("spring.kafka.bootstrap-servers")]),e._v(":")])]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('static {\n System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");\n}\n\n')])])]),t("ul",[t("li",[e._v("在"),t("code",[e._v("@EmbeddedKafka")]),e._v("注释上配置属性名:")])]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('import org.springframework.boot.test.context.SpringBootTest;\nimport org.springframework.kafka.test.context.EmbeddedKafka;\n\n@SpringBootTest\n@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")\nclass MyTest {\n\n // ...\n\n}\n\n')])])]),t("ul",[t("li",[e._v("在配置属性中使用占位符:")])]),e._v(" "),t("p",[e._v("属性")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('spring:\n kafka:\n bootstrap-servers: "${spring.embedded.kafka.brokers}"\n')])])]),t("h2",{attrs:{id:"_4-rsocket"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_4-rsocket"}},[e._v("#")]),e._v(" 4. RSocket")]),e._v(" "),t("p",[t("a",{attrs:{href:"https://rsocket.io",target:"_blank",rel:"noopener noreferrer"}},[e._v("RSocket"),t("OutboundLink")],1),e._v("是一种用于字节流传输的二进制协议。它支持通过在单个连接上传递异步消息的对称交互模型。")]),e._v(" "),t("p",[e._v("Spring 框架的"),t("code",[e._v("spring-messaging")]),e._v("模块在客户端和服务器端为 RSocket 请求者和响应者提供支持。有关更多详细信息,请参见 Spring 框架引用的"),t("a",{attrs:{href:"https://docs.spring.io/spring-framework/docs/5.3.16/reference/html/web-reactive.html#rsocket-spring",target:"_blank",rel:"noopener noreferrer"}},[e._v("RSocket 部分"),t("OutboundLink")],1),e._v(",包括 RSocket 协议的概述。")]),e._v(" "),t("h3",{attrs:{id:"_4-1-rsocket-策略自动配置"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_4-1-rsocket-策略自动配置"}},[e._v("#")]),e._v(" 4.1.RSocket 策略自动配置")]),e._v(" "),t("p",[e._v("Spring 引导自动配置"),t("code",[e._v("RSocketStrategies")]),e._v(" Bean,该配置提供了编码和解码 RSocket 有效负载所需的所有基础设施。默认情况下,自动配置将尝试配置以下内容(按顺序):")]),e._v(" "),t("ol",[t("li",[t("p",[t("a",{attrs:{href:"https://cbor.io/",target:"_blank",rel:"noopener noreferrer"}},[e._v("CBOR"),t("OutboundLink")],1),e._v("带 Jackson 的编解码器")])]),e._v(" "),t("li",[t("p",[e._v("具有 Jackson 的 JSON 编解码器")])])]),e._v(" "),t("p",[t("code",[e._v("spring-boot-starter-rsocket")]),e._v("starter 提供了这两个依赖项。请参阅"),t("RouterLink",{attrs:{to:"/spring-boot/features.html#features.json.jackson"}},[e._v("Jackson 支助科")]),e._v("以了解更多有关定制可能性的信息。")],1),e._v(" "),t("p",[e._v("开发人员可以通过创建实现"),t("code",[e._v("RSocketStrategiesCustomizer")]),e._v("接口的 bean 来定制"),t("code",[e._v("RSocketStrategies")]),e._v("组件。请注意,它们的"),t("code",[e._v("@Order")]),e._v("很重要,因为它决定了编解码器的顺序。")]),e._v(" "),t("h3",{attrs:{id:"_4-2-rsocket-服务器自动配置"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_4-2-rsocket-服务器自动配置"}},[e._v("#")]),e._v(" 4.2.RSocket 服务器自动配置")]),e._v(" "),t("p",[e._v("Spring 启动提供了 RSocket 服务器的自动配置。所需的依赖关系由"),t("code",[e._v("spring-boot-starter-rsocket")]),e._v("提供。")]),e._v(" "),t("p",[e._v("Spring 启动允许在 WebSocket 上从 WebFlux 服务器公开 RSocket,或者支持独立的 RSocket 服务器。这取决于应用程序的类型及其配置。")]),e._v(" "),t("p",[e._v("对于 WebFlux 应用程序(类型为"),t("code",[e._v("WebApplicationType.REACTIVE")]),e._v("),只有在以下属性匹配的情况下,才会将 RSocket 服务器插入到 Web 服务器中:")]),e._v(" "),t("p",[e._v("Properties")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring.rsocket.server.mapping-path=/rsocket\nspring.rsocket.server.transport=websocket\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('spring:\n rsocket:\n server:\n mapping-path: "/rsocket"\n transport: "websocket"\n')])])]),t("table",[t("thead",[t("tr",[t("th"),e._v(" "),t("th",[e._v("将 RSocket 插入到 Web 服务器中只受 Reactor Netty 的支持,因为 RSocket 本身是用该库构建的。")])])]),e._v(" "),t("tbody")]),e._v(" "),t("p",[e._v("或者,RSocket TCP 或 WebSocket 服务器作为独立的嵌入式服务器启动。除了依赖关系需求外,唯一需要的配置是为该服务器定义一个端口:")]),e._v(" "),t("p",[e._v("Properties")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring.rsocket.server.port=9898\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring:\n rsocket:\n server:\n port: 9898\n")])])]),t("h3",{attrs:{id:"_4-3-spring-消息-rsocket-支持"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_4-3-spring-消息-rsocket-支持"}},[e._v("#")]),e._v(" 4.3. Spring 消息 RSocket 支持")]),e._v(" "),t("p",[e._v("Spring 启动将自动为 RSocket 配置 Spring 消息传递基础结构。")]),e._v(" "),t("p",[e._v("这意味着 Spring 引导将创建一个"),t("code",[e._v("RSocketMessageHandler")]),e._v(" Bean,它将处理对应用程序的 RSocket 请求。")]),e._v(" "),t("h3",{attrs:{id:"_4-4-使用-rsocketrequester-调用-rsocket-服务"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_4-4-使用-rsocketrequester-调用-rsocket-服务"}},[e._v("#")]),e._v(" 4.4.使用 RSocketRequester 调用 RSocket 服务")]),e._v(" "),t("p",[e._v("一旦在服务器和客户端之间建立了"),t("code",[e._v("RSocket")]),e._v("通道,任何一方都可以向另一方发送或接收请求。")]),e._v(" "),t("p",[e._v("作为服务器,你可以在 RSocket"),t("code",[e._v("@Controller")]),e._v("的任何处理程序方法上插入"),t("code",[e._v("RSocketRequester")]),e._v("实例。作为客户机,你需要首先配置和建立一个 RSocket 连接。 Spring 对于这种情况,引导自动配置带有预期编解码器的"),t("code",[e._v("RSocketRequester.Builder")]),e._v(",并应用任何"),t("code",[e._v("RSocketConnectorConfigurer")]),e._v(" Bean。")]),e._v(" "),t("p",[t("code",[e._v("RSocketRequester.Builder")]),e._v("实例是一个原型 Bean,这意味着每个注入点都将为你提供一个新实例。这是有目的的,因为这个构建器是有状态的,你不应该使用相同的实例创建具有不同设置的请求者。")]),e._v(" "),t("p",[e._v("下面的代码展示了一个典型的示例:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('import reactor.core.publisher.Mono;\n\nimport org.springframework.messaging.rsocket.RSocketRequester;\nimport org.springframework.stereotype.Service;\n\n@Service\npublic class MyService {\n\n private final RSocketRequester rsocketRequester;\n\n public MyService(RSocketRequester.Builder rsocketRequesterBuilder) {\n this.rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898);\n }\n\n public Mono someRSocketCall(String name) {\n return this.rsocketRequester.route("user").data(name).retrieveMono(User.class);\n }\n\n}\n\n')])])]),t("h2",{attrs:{id:"_5-spring-整合"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_5-spring-整合"}},[e._v("#")]),e._v(" 5. Spring 整合")]),e._v(" "),t("p",[e._v("Spring Boot 为使用"),t("a",{attrs:{href:"https://spring.io/projects/spring-integration",target:"_blank",rel:"noopener noreferrer"}},[e._v("Spring Integration"),t("OutboundLink")],1),e._v("提供了几种便利,包括"),t("code",[e._v("spring-boot-starter-integration")]),e._v("“starter”。 Spring 集成在消息传递和其他传输(例如 HTTP、TCP 和其他传输)上提供了抽象。如果 Spring 集成在你的 Classpath 上可用,则通过"),t("code",[e._v("@EnableIntegration")]),e._v("注释对其进行初始化。")]),e._v(" "),t("p",[e._v("Spring 集成轮询逻辑依赖于[自动配置的"),t("code",[e._v("TaskScheduler")]),e._v("](features.html#features.task-execution-and-schedule)。可以使用"),t("code",[e._v("spring.integration.poller.*")]),e._v("配置属性定制默认的"),t("code",[e._v("PollerMetadata")]),e._v("(每秒轮询无界的消息数量)。")]),e._v(" "),t("p",[e._v("Spring 引导还配置了由存在额外的 Spring 集成模块触发的一些特征。如果"),t("code",[e._v("spring-integration-jmx")]),e._v("也在 Classpath 上,则消息处理统计信息将在 JMX 上发布。如果"),t("code",[e._v("spring-integration-jdbc")]),e._v("可用,则可以在启动时创建默认的数据库模式,如下列所示:")]),e._v(" "),t("p",[e._v("Properties")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("spring.integration.jdbc.initialize-schema=always\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('spring:\n integration:\n jdbc:\n initialize-schema: "always"\n')])])]),t("p",[e._v("如果"),t("code",[e._v("spring-integration-rsocket")]),e._v("可用,开发人员可以使用"),t("code",[e._v('"spring.rsocket.server.*"')]),e._v("属性配置 RSocket 服务器,并让它使用"),t("code",[e._v("IntegrationRSocketEndpoint")]),e._v("或"),t("code",[e._v("RSocketOutboundGateway")]),e._v("组件来处理传入的 RSocket 消息。这个基础结构可以处理 Spring 集成 RSocket 通道适配器和"),t("code",[e._v("@MessageMapping")]),e._v("处理程序(给定"),t("code",[e._v('"spring.integration.rsocket.server.message-mapping-enabled"')]),e._v("是已配置的)。")]),e._v(" "),t("p",[e._v("Spring 启动还可以使用配置属性自动配置"),t("code",[e._v("ClientRSocketConnector")]),e._v(":")]),e._v(" "),t("p",[e._v("Properties")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("# Connecting to a RSocket server over TCP\nspring.integration.rsocket.client.host=example.org\nspring.integration.rsocket.client.port=9898\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('# Connecting to a RSocket server over TCP\nspring:\n integration:\n rsocket:\n client:\n host: "example.org"\n port: 9898\n')])])]),t("p",[e._v("Properties")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("# Connecting to a RSocket Server over WebSocket\nspring.integration.rsocket.client.uri=ws://example.org\n")])])]),t("p",[e._v("Yaml")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('# Connecting to a RSocket Server over WebSocket\nspring:\n integration:\n rsocket:\n client:\n uri: "ws://example.org"\n')])])]),t("p",[e._v("参见["),t("code",[e._v("IntegrationAutoConfiguration")]),e._v("](https://github.com/ Spring-projects/ Spring-boot/tree/v2.6.4/ Spring-boot-project/ Spring-boot-autofconfigure/SRC/main/java/org/org/SpringFramework/Boot/autoframework/autoframework/autoframework/AutoConfigation/AutoConfiguration/justion.java)和[<"),t("code",[e._v("IntegrationProperties")]),e._v("](https:///github.com/[[ Spring-projects/ Spring-")]),e._v(" "),t("h2",{attrs:{id:"_6-接下来要读什么"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#_6-接下来要读什么"}},[e._v("#")]),e._v(" 6. 接下来要读什么?")]),e._v(" "),t("p",[e._v("下一节介绍如何在应用程序中启用"),t("RouterLink",{attrs:{to:"/spring-boot/io.html#io"}},[e._v("IO 功能")]),e._v("。你可以在本节中阅读有关"),t("RouterLink",{attrs:{to:"/spring-boot/io.html#io.caching"}},[e._v("caching")]),e._v("、"),t("RouterLink",{attrs:{to:"/spring-boot/io.html#io.email"}},[e._v("mail")]),e._v("、"),t("RouterLink",{attrs:{to:"/spring-boot/io.html#io.validation"}},[e._v("validation")]),e._v("、"),t("RouterLink",{attrs:{to:"/spring-boot/io.html#io.rest-client"}},[e._v("REST 客户")]),e._v("等内容。")],1)])}),[],!1,null,null,null);a.default=n.exports}}]);