# AMQP 支持 ## AMQP 支持 Spring 集成提供了用于通过使用高级消息队列协议接收和发送消息的通道适配器。 你需要在项目中包含此依赖项: Maven ``` org.springframework.integration spring-integration-amqp 5.5.9 ``` Gradle ``` compile "org.springframework.integration:spring-integration-amqp:5.5.9" ``` 可提供以下适配器: * [入站通道适配器](#amqp-inbound-channel-adapter) * [入站网关](#amqp-inbound-gateway) * [出站通道适配器](#amqp-outbound-channel-adapter) * [出站网关](#amqp-outbound-gateway) * [异步出站网关](#amqp-async-outbound-gateway) Spring 集成还提供了由 AMQP 交换和队列支持的点对点消息通道和发布-订阅消息通道。 为了提供 AMQP 支持, Spring 集成依赖于([Spring AMQP](https://projects.spring.io/spring-amqp)),它将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。 Spring AMQP 提供了与([Spring JMS](https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#jms))类似的语义。 Spring 鉴于所提供的 AMQP 通道适配器仅用于单向消息传递(发送或接收), Spring 集成还提供了用于请求-应答操作的入站和出站 AMQP 网关。 小贴士:你应该熟悉[reference documentation of the Spring AMQP project](https://docs.spring.io/spring-amqp/reference/html/)。它提供了关于 Spring 与 AMQP 的集成的更深入的信息,特别是与 RabbitMQ 的集成。 ### 入站通道适配器 下面的清单显示了 AMQP 入站通道适配器的可能配置选项: 爪哇 DSL ``` @Bean public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) { return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "aName")) .handle(m -> System.out.println(m.getPayload())) .get(); } ``` 爪哇 ``` @Bean public MessageChannel amqpInputChannel() { return new DirectChannel(); } @Bean public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer, @Qualifier("amqpInputChannel") MessageChannel channel) { AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(channel); return adapter; } @Bean public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueueNames("aName"); container.setConcurrentConsumers(2); // ... return container; } @Bean @ServiceActivator(inputChannel = "amqpInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message message) throws MessagingException { System.out.println(message.getPayload()); } }; } ``` XML ``` (27) ``` |**1** |此适配器的唯一 ID。
可选的。| |------|| |**2** |转换后的消息应发送到的消息通道。
需要。| |**3** |AMQP 队列的名称(用逗号分隔的列表),应该从这些队列中消费消息。
required。| |**4** |`MessageListenerContainer`的确认模式,
当设置为`MANUAL`时,传递标记和通道在消息头`amqp_deliveryTag`和`amqp_channel`中提供,分别。
用户应用程序负责确认。`NONE`表示没有确认(`autoAck`)。`AUTO`表示适配器的容器在下游流完成时进行确认。
可选(默认为自动)。
参见[入站端点确认模式](#amqp-inbound-ack)。| |**5** |额外的 AOP 建议来处理与此入站通道适配器相关的横切行为。
可选的。| |**6** |标志,指示该组件创建的通道是事务性的。
如果为真,它会告诉框架使用事务性通道,并根据结果以提交或回滚结束所有操作(发送或接收),但有一个异常,表示回滚。
可选(默认为假)。| |**7** |指定要创建的并发消费者的数量。
默认值是`1`。
我们建议增加并发消费者的数量,以按比例分配来自队列的消息的消耗,
但是,请注意,一旦注册了多个消费者,任何订购保证都会丢失。,通常情况下,
,对于低容量队列使用一个使用者。
设置了“每个队列的使用者”时不允许使用。
可选。| |**8** |Bean 参考 RabbitMQ.可选(默认为)。| |**9** |错误消息应发送到的消息通道。
可选。| |**10**|侦听器通道是否暴露于已注册的`ChannelAwareMessageListener`.
可选(默认为 true)。| |**11**|在接收 AMQP 消息时使用的对`AmqpHeaderMapper`的引用。
可选的。
默认情况下,只有标准的 AMQP 属性(例如`contentType`)被复制到 Spring Integration`MessageHeaders`。
AMQP`MessageProperties`中的任何用户定义的标题都不会通过默认的`DefaultAmqpHeaderMapper`复制到消息中。如果提供了“request-header-names”,则不允许
。| |**12**|要从 AMQP 请求映射到`MessageHeaders`中的 AMQP 头的名称的逗号分隔列表。
只能提供如果没有提供“header-mapper”引用。
此列表中的值也可以是与 header 名称匹配的简单模式(例如“\*”或“thing1\*,thing2”或“\*something”)。| |**13**|引用`AbstractMessageListenerContainer`用于接收 AMQP 消息。
如果提供了此属性,则不应提供与侦听器容器配置相关的其他属性。
换句话说,通过设置此引用,你必须对侦听器容器配置承担全部责任。
唯一的例外是`MessageListener`本身。
因为这实际上是这个通道适配器实现的核心责任,所以引用的侦听器容器不能已经拥有自己的`MessageListener`。
可选的。| |**14**|在接收 AMQP 消息时使用的`MessageConverter`。
可选。| |**15**|在接收 AMQP 消息时使用的`MessagePropertiesConverter`。
可选。| |**16**|指定底层`AbstractMessageListenerContainer`应该启动和停止的阶段,
启动顺序从最低到最高,关闭顺序与此相反,
默认情况下,该值为`Integer.MAX_VALUE`,这意味着这个容器开始得越晚,停止得越快。
可选的。| |**17**|告诉 AMQP 代理在一个请求中要向每个使用者发送多少消息。
通常,你可以将这个值设置得很高,以提高吞吐量。
它应该大于或等于事务大小(请参见`tx-size`属性,
可选(默认为`1`)。| |**18**|接收以毫秒为单位的超时。
可选(默认为`1000`)。| |**19**|指定底层`AbstractMessageListenerContainer`的恢复尝试之间的间隔(以毫秒为单位)。
可选(默认为`5000`)。| |**20**|如果“true”且代理上没有队列可用,则容器在启动时抛出一个致命的异常,如果在容器运行时删除了队列(在进行了三次被动声明队列的尝试之后),则停止。,容器不抛出异常并进入恢复模式,尝试根据`recovery-interval`重新启动。
可选(默认为`true`)。| |**21**|在底层`AbstractMessageListenerContainer`停止之后和强制关闭 AMQP 连接之前等待工作人员的时间(以毫秒为单位)。
如果有工作人员在关闭信号出现时处于活动状态,只要他们能够在此超时范围内完成处理,他们就被允许完成处理。
否则,连接已关闭,消息仍未确认(如果通道是事务性的)。
可选(默认为`5000`)。| |**22**|默认情况下,底层`AbstractMessageListenerContainer`使用`SimpleAsyncTaskExecutor`实现,该实现为每个任务启动一个新线程,并异步运行它,默认情况下,注意,此实现不重用线程。
考虑使用线程池`TaskExecutor`实现作为替代。
可选(默认为`SimpleAsyncTaskExecutor`)。| |**23**|默认情况下,底层`AbstractMessageListenerContainer`创建了`DefaultTransactionAttribute`的新实例(它采用 EJB 方法回滚运行时,但不检查异常)。
可选(默认为`DefaultTransactionAttribute`)。| |**24**|设置对底层`AbstractMessageListenerContainer`上的外部`PlatformTransactionManager`的引用。
事务管理器与`channel-transacted`属性一起工作。
如果在框架发送或接收消息时已经有事务在进行中并且`channelTransacted`标志是`true`,消息传递事务的提交或回滚将被推迟到当前事务结束时。
如果`channelTransacted`标志是`false`,则消息传递操作不应用事务语义(它是自动标记的)。
以获取更多信息,见[Transactions with Spring AMQP](https://docs.spring.io/spring-amqp/reference/html/%255Freference.html#%5Ftransactions)。
可选。| |**25**|告诉`SimpleMessageListenerContainer`在单个事务中要处理多少消息(如果通道是事务性的)。
对于最佳结果,它应该小于或等于`prefetch-count`中设置的值。
当设置了“customers-per-queue”时不允许。
可选(默认为`1`)。| |**26**|指示底层侦听器容器应该是`DirectMessageListenerContainer`,而不是默认的`SimpleMessageListenerContainer`。
有关更多信息,请参见[Spring AMQP Reference Manual](https://docs.spring.io/spring-amqp/reference/html/)。| |**27**|当容器的`consumerBatchEnabled`是`true`时,确定适配器如何在消息有效负载中呈现一批消息。
当设置为`MESSAGES`(默认)时,有效负载是`List>`,其中每个消息都有从传入的 AMQP`Message`映射的头,并且有效负载是转换后的`body`。
当设置为`EXTRACT_PAYLOADS`时,有效负载是`List`,其中元素是从 AMQP`Message`体转换而来的。`EXTRACT_PAYLOADS_WITH_HEADERS`类似于`EXTRACT_PAYLOADS`,但是,除此之外,每个消息的头从`MessageProperties`映射到相应索引处的`List`;头名称为`AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS`。| | |容器

注意,在使用 XML 配置外部容器时,不能使用 Spring AMQP 名称空间来定义容器。
这是因为名称空间至少需要一个``元素。,
在这种环境中,侦听器是适配器的内部。,
由于这个原因,你必须使用正常的 Spring ``定义来定义容器,如下例所示:

```
class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">




```| |---|| | |尽管 Spring 集成 JMS 和 AMQP 支持是相似的,但存在重要的差异。
JMS 入站通道适配器使用的是`JmsDestinationPollingSource`下的 covers,并且期望配置一个 poller。
AMQP 入站通道适配器使用的是`AbstractMessageListenerContainer`,并且是消息驱动的。,在这方面,
,它更类似于 JMS 消息驱动的通道适配器。| |---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 从版本 5.5 开始,`AmqpInboundChannelAdapter`可以配置`org.springframework.amqp.rabbit.retry.MessageRecoverer`策略,当内部调用重试操作时,该策略在`RecoveryCallback`中使用。有关更多信息,请参见`setMessageRecoverer()`爪哇docs。 #### 批处理消息 有关批处理消息的更多信息,请参见[the Spring AMQP Documentation](https://docs.spring.io/spring-amqp/docs/current/reference/html/#template-batching)。 要生成具有 Spring 集成的批处理消息,只需使用`BatchingRabbitTemplate`配置出站端点。 默认情况下,接收批处理消息时,侦听器容器提取每个片段消息,适配器将为每个片段生成`Message`。从版本 5.2 开始,如果容器的`deBatchingEnabled`属性设置为`false`,则由适配器执行去批处理,并产生一个`Message>`,其有效负载是片段有效负载的列表(如果合适的话,在转换之后)。 默认的`BatchingStrategy`是`SimpleBatchingStrategy`,但这可以在适配器上被重写。 | |当重试操作需要恢复时,必须在批处理中使用`org.springframework.amqp.rabbit.retry.MessageBatchRecoverer`。| |---|-------------------------------------------------------------------------------------------------------------------------------------------| ### 已调查的入站通道适配器 #### 概述 版本 5.0.1 引入了一个轮询通道适配器,允许你按需获取单个消息——例如,使用`MessageSourcePollingTemplate`或 Poller。有关更多信息,请参见[延迟确认可收集消息源](./polling-consumer.html#deferred-acks-message-source)。 它目前不支持 XML 配置。 下面的示例展示了如何配置`AmqpMessageSource`: 爪哇 DSL ``` @Bean public IntegrationFlow flow() { return IntegrationFlows.from(Amqp.inboundPolledAdapter(connectionFactory(), DSL_QUEUE), e -> e.poller(Pollers.fixedDelay(1_000)).autoStartup(false)) .handle(p -> { ... }) .get(); } ``` 爪哇 ``` @Bean public AmqpMessageSource source(ConnectionFactory connectionFactory) { return new AmqpMessageSource(connectionFactory, "someQueue"); } ``` 有关配置属性,请参见[爪哇doc](https://docs.spring.io/spring-integration/api/org/springframework/integration/amqp/inbound/AmqpMessageSource.html)。 XML ``` This adapter currently does not have XML configuration support. ``` #### 批处理消息 见[批处理消息](#amqp-debatching)。 对于轮询的适配器,不存在侦听器容器,批处理的消息总是会被删除(如果`BatchingStrategy`支持这样做的话)。 ### 入站网关 入站网关支持入站通道适配器上的所有属性(除了“通道”被“请求通道”代替),以及一些附加属性。下面的清单显示了可用的属性: Java DSL ``` @Bean // return the upper cased payload public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) { return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, "foo")) .transform(String.class, String::toUpperCase) .get(); } ``` Java ``` @Bean public MessageChannel amqpInputChannel() { return new DirectChannel(); } @Bean public AmqpInboundGateway inbound(SimpleMessageListenerContainer listenerContainer, @Qualifier("amqpInputChannel") MessageChannel channel) { AmqpInboundGateway gateway = new AmqpInboundGateway(listenerContainer); gateway.setRequestChannel(channel); gateway.setDefaultReplyTo("bar"); return gateway; } @Bean public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueueNames("foo"); container.setConcurrentConsumers(2); // ... return container; } @Bean @ServiceActivator(inputChannel = "amqpInputChannel") public MessageHandler handler() { return new AbstractReplyProducingMessageHandler() { @Override protected Object handleRequestMessage(Message requestMessage) { return "reply to " + requestMessage.getPayload(); } }; } ``` XML ``` (9) ``` |**1**|此适配器的唯一 ID。
可选的。| |-----|| |**2**|将转换后的消息发送到的消息通道。
需要。| |**3**|在接收 AMQP 消息时使用的对`AmqpHeaderMapper`的引用。
可选的。
默认情况下,只有标准的 AMQP 属性(例如`contentType`)才会被复制到 Spring Integration`MessageHeaders`。
AMQP`MessageProperties`中的任何用户定义的头文件都不会被默认的`DefaultAmqpHeaderMapper`复制到或复制到 AMQP 消息中。如果提供了“request-header-names”或“reply-header-names”,则不允许
。| |**4**|要从 AMQP 请求映射到`MessageHeaders`的 AMQP 头的名称的逗号分隔列表。
只有在不提供“header-mapper”引用的情况下,才能提供此属性。
此列表中的值也可以是要与头名称匹配的简单模式(例如`"*"`或`"thing1*, thing2"`或`"*thing1"`)。| |**5**|将`MessageHeaders`的名称以逗号分隔的列表映射到 AMQP 回复消息的 AMQP 消息属性中。
所有标准标题(例如`contentType`)都映射到 AMQP 消息属性,当用户定义的头被映射到’headers’属性时。
只有在不提供’header-mapper’引用的情况下才能提供此属性。
此列表中的值也可以是简单的模式,以便与头名称进行匹配(例如,`"*"`或`"foo*, bar"`或`"*foo"`)。| |**6**|需要回复消息的消息通道。
可选。| |**7**|设置底层`o.s.i.core.MessagingTemplate`上的`receiveTimeout`,用于接收来自回复通道的消息。
如果未指定,此属性默认为`1000`(1 秒)。
仅当容器线程在发送回复之前发送到另一个线程时才适用。| |**8**|自定义的`AmqpTemplate` Bean 引用(对发送的回复消息有更多的控制)。
可以提供`RabbitTemplate`的替代实现。| |**9**|`replyTo``o.s.amqp.core.Address`当`requestMessage`没有`replyTo`属性时使用的
如果未指定此选项,则不提供`amqp-template`属性,在请求消息中不存在`replyTo`属性,并抛出
an`IllegalStateException`,因为无法路由答复。
如果未指定此选项,并提供了外部`amqp-template`,没有引发异常。
你必须指定此选项,或者在该模板上配置默认的`exchange`和`routingKey`,
如果你预计请求消息中不存在`replyTo`属性的情况。| 请参阅[入站通道适配器](#amqp-inbound-channel-adapter)中关于配置`listener-container`属性的注释。 从版本 5.5 开始,`AmqpInboundChannelAdapter`可以配置`org.springframework.amqp.rabbit.retry.MessageRecoverer`策略,当内部调用重试操作时,该策略在`RecoveryCallback`中使用。有关更多信息,请参见`setMessageRecoverer()`Javadocs。 #### 批处理消息 见[批处理消息](#amqp-debatching)。 ### 入站端点确认模式 默认情况下,入站端点使用`AUTO`确认模式,这意味着当下游集成流完成(或者通过使用`QueueChannel`或`ExecutorChannel`将消息传递给另一个线程)时,容器会自动确认消息。将模式设置为`NONE`将配置消费者,使得完全不使用确认(代理在消息发送后立即自动确认消息)。将模式设置为`MANUAL`,可以让用户代码在处理过程中的另一个点确认消息。为了支持这一点,在此模式下,端点在`amqp_channel`和`amqp_deliveryTag`标题中分别提供`Channel`和`deliveryTag`。 你可以在`Channel`上执行任何有效的 Rabbit 命令,但通常只使用`basicAck`和`basicNack`(或`basicReject`)。为了不干扰容器的操作,你不应该保留对通道的引用,并且仅在当前消息的上下文中使用它。 | |由于`Channel`是对“live”对象的引用,因此不能序列化它,并且如果消息被持久化,它就会丢失。| |---|---------------------------------------------------------------------------------------------------------------------| 下面的示例展示了如何使用`MANUAL`确认: ``` @ServiceActivator(inputChannel = "foo", outputChannel = "bar") public Object handle(@Payload String payload, @Header(AmqpHeaders.CHANNEL) Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception { // Do some processing if (allOK) { channel.basicAck(deliveryTag, false); // perhaps do some more processing } else { channel.basicNack(deliveryTag, false, true); } return someResultForDownStreamProcessing; } ``` ### 出站端点 以下出站端点有许多类似的配置选项。从版本 5.2 开始,添加了`confirm-timeout`。通常,当启用了 Publisher Confirms 时,代理将快速返回一个 ACK(或 NACK),该 ACK 将被发送到相应的通道。如果在接收到确认之前关闭了通道,则 Spring AMQP 框架将合成 NACK。“丢失”ACK 永远不会发生,但是,如果你设置了此属性,则端点将定期检查它们,并在时间过去而未收到确认的情况下合成 NACK。 ### 出站通道适配器 下面的示例显示了 AMQP 出站通道适配器的可用属性: Java DSL ``` @Bean public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate, MessageChannel amqpOutboundChannel) { return IntegrationFlows.from(amqpOutboundChannel) .handle(Amqp.outboundAdapter(amqpTemplate) .routingKey("queue1")) // default exchange - route to queue 'queue1' .get(); } ``` Java ``` @Bean @ServiceActivator(inputChannel = "amqpOutboundChannel") public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) { AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate); outbound.setRoutingKey("queue1"); // default exchange - route to queue 'queue1' return outbound; } @Bean public MessageChannel amqpOutboundChannel() { return new DirectChannel(); } ``` XML ``` (20) ``` |**1** |此适配器的唯一 ID。
可选。| |------|| |**2** |消息通道,其中的消息应该被发送到有他们转换和发布到一个 AMQP 交换。
需要。| |**3** |Bean 参考配置的 AMQP 模板.
可选(默认为`amqpTemplate`)。| |**4** |将消息发送到的 AMQP 交换的名称。
如果不提供,则将消息发送到默认的无名称交换。
与“exchange-name-expression”互斥。
可选。| |**5** |一种 SPEL 表达式,用于确定将消息作为根对象发送到的 AMQP 交换的名称。
如果不提供,则将消息发送到默认的无名称交换。
与“exchange-name”互斥。
可选。| |**6** |当注册了多个使用者时,此使用者的顺序,从而启用负载平衡和故障转移。
可选(默认为`Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]`)。| |**7** |发送消息时使用的固定路由键。
默认情况下,这是一个空的`String`。
与“routing-key-expression”互斥。
可选。| |**8** |一种 SPEL 表达式,用于确定发送消息时要使用的路由密钥,并将消息作为根对象(例如,“payload.key”)。
默认情况下,这是一个空的`String`。
与“routing-key”互斥。
可选。| |**9** |消息的默认传递模式:`PERSISTENT`或`NON_PERSISTENT`。
如果`header-mapper`设置了传递模式,则重写
如果存在 Spring 集成消息头`amqp_deliveryMode`,`DefaultHeaderMapper`设置该值。
如果没有提供此属性,而 header mapper 没有设置该属性,则默认值取决于`RabbitTemplate`所使用的底层 Spring amqp`MessagePropertiesConverter`。
如果根本没有进行自定义,则默认值是`PERSISTENT`。
可选的。| |**10**|定义相关数据的表达式。
当提供时,这将配置底层 AMQP 模板以接收发布者确认。
需要专用的`RabbitTemplate`和`CachingConnectionFactory`,其`publisherConfirms`属性设置为`true`,当接收到发布者确认并提供相关数据时,
,它被写入`confirm-ack-channel`或`confirm-nack-channel`,这取决于确认类型。
确认的有效负载是相关数据,正如这个表达式所定义的。
消息有一个’amqp\_publishconfirm’头,它被设置为`true`(`ack`)或`false`(`nack`)。
示例:`headers['myCorrelationData']`而`payload`.
版本 4.1 引入了`amqp_publishConfirmNackCause`消息头。
它包含用于发布商确认的“nack”的`cause`。
从版本 4.2 开始,如果表达式解析为`Message`实例(例如`#this`),在`ack`/`nack`通道上发出的消息是基于该消息,并添加了附加的报头。
以前,无论类型如何,都以相关数据作为其有效负载来创建新的消息。
还请参见[发布服务器确认和返回的替代机制](#alternative-confirms-returns)。
可选。| |**11**|positive(`ack`)publisher 确认发送到的通道。
有效负载是由`confirm-correlation-expression`定义的相关数据。
如果表达式是`#root`或`#this`,则消息是从原始消息生成的,将`amqp_publishConfirm`标头设置为`true`。
还请参见[发布服务器确认和返回的替代机制](#alternative-confirms-returns)。
可选(默认值为`nullChannel`)。| |**12**|负(`nack`)发布者确认发送到的通道。
有效负载是由`confirm-correlation-expression`定义的相关数据(如果没有`ErrorMessageStrategy`配置)。
如果表达式是`#root`或`#this`,则消息是从原始消息构建的,将`amqp_publishConfirm`标头设置为`false`。
当存在`ErrorMessageStrategy`时,消息是带有`ErrorMessage`有效载荷的
消息。[发布服务器确认和返回的替代机制](#alternative-confirms-returns)还请参见
可选(默认值为`nullChannel`)。| |**13**|设置好后,如果在毫秒内没有收到发布者确认,则适配器将合成一个否定确认。
Pending 确认每检查 50% 的值,因此,一个 NACK 的实际发送时间将在这个值的 1 倍到 1.5 倍之间。
还请参见[发布服务器确认和返回的替代机制](#alternative-confirms-returns)。
默认为零(不会生成 NACK)。| |**14**|当设置为 true 时,调用线程将阻塞,等待发布者确认。
这需要为确认和`confirm-correlation-expression`配置一个`MessageHandlingException`。
线程将阻塞长达`confirm-timeout`(默认情况下为 5 秒)。如果发生超时,
,将抛出`MessageTimeoutException`。
如果启用返回并返回消息,或者在等待确认时发生任何其他异常,将抛出`MessageHandlingException`,并附带相应的消息。| |**15**|返回的消息被发送到的通道。
当提供时,底层的 AMQP 模板被配置为将不可交付的消息返回给适配器。
当没有`ErrorMessageStrategy`配置时,消息是根据从 AMQP 接收到的数据构造的,具有以下附加头:`amqp_returnReplyCode`,`amqp_returnReplyText`,`amqp_returnExchange`,`amqp_returnRoutingKey`。
当存在`ErrorMessageStrategy`时,消息是带有`ReturnedAmqpMessageException`有效载荷的`ErrorMessage`。
另请参见[发布服务器确认和返回的替代机制](#alternative-confirms-returns)。
可选。| |**16**|对`ErrorMessageStrategy`实现的引用,用于在发送返回的或负面确认的消息时构建`ErrorMessage`实例。| |**17**|在默认情况下,只有标准的 AMQP 属性(例如`contentType`)被复制到 Spring 集成`MessageHeaders`。
任何用户定义的标题都不会通过默认的`DefaultAmqpHeaderMapper`复制到消息。
如果提供’请求-header-names’,则不允许
可选的。| |**18**|如果提供了“header-mapper”引用,则不允许
将 AMQP 头的名称从`MessageHeaders`映射到 AMQP 消息的逗号分隔的列表。
此列表中的值也可以是与头名称匹配的简单模式(例如`"*"`或`"thing1*, thing2"`或`"*thing1"`)。| |**19**|当设置为`false`时,端点将尝试在应用程序上下文初始化期间连接到代理。,
这允许“快速失败”地检测错误的配置,但如果代理关闭,也会导致初始化失败,
当`true`(默认值)时,当发送第一条消息时,连接被建立(如果它不存在,因为其他组件已经建立了它)。| |**20**|当设置为`true`时,类型为`Iterable>`的有效载荷将在单个`RabbitTemplate`调用的范围内作为离散消息在同一通道上发送。
需要`RabbitTemplate`。
时`wait-for-confirms`为真,`RabbitTemplate.waitForConfirmsOrDie()`是在消息发送后调用的。
使用事务模板,发送将在新事务或已经启动的事务(如果存在)中执行。| | |return-channel

使用`return-channel`需要一个`RabbitTemplate`,其`mandatory`属性设置为`true`,而`CachingConnectionFactory`属性设置为`true`。
当使用带有返回的多个出站端点时,每个端点都需要一个单独的`RabbitTemplate`。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ### 出站网关 下面的清单显示了 AMQP 出站网关的可能属性: Java DSL ``` @Bean public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) { return f -> f.handle(Amqp.outboundGateway(amqpTemplate) .routingKey("foo")) // default exchange - route to queue 'foo' .get(); } @MessagingGateway(defaultRequestChannel = "amqpOutbound.input") public interface MyGateway { String sendToRabbit(String data); } ``` Java ``` @Bean @ServiceActivator(inputChannel = "amqpOutboundChannel") public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) { AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate); outbound.setExpectReply(true); outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo' return outbound; } @Bean public MessageChannel amqpOutboundChannel() { return new DirectChannel(); } @MessagingGateway(defaultRequestChannel = "amqpOutboundChannel") public interface MyGateway { String sendToRabbit(String data); } ``` XML ``` (19) ``` |**1** |此适配器的唯一 ID。
可选。| |------|| |**2** |将消息发送到的消息通道将其转换并发布到 AMQP Exchange。
required。| |**3** |Bean 参考配置的 AMQP 模板.
可选(默认为`amqpTemplate`)。| |**4** |将消息发送到的 AMQP Exchange 的名称。
如果不提供,则将消息发送到默认的 CXChange。
与“exchange-name-expression”互斥。
可选。| |**5** |一种 SPEL 表达式,用于确定 AMQP 交换的名称,将消息作为根对象发送到该消息。
如果不提供,则将消息发送到默认的无名称交换。
与“exchange-name”互斥。
可选。| |**6** |当注册了多个使用者时,此使用者的顺序,从而启用负载平衡和故障转移。
可选(默认为`Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]`)。| |**7** |从 AMQP 队列接收并转换后,应将回复发送到的消息通道。
可选。| |**8** |网关在将回复消息发送到`reply-channel`时等待的时间。
只有当`reply-channel`可以阻塞时,才适用于此。例如,容量限制为当前已满的`QueueChannel`。
默认值为无穷大。| |**9** |当`true`时,如果在`AmqpTemplate’s `replyTimeout` property.
Defaults to `true` 内没有收到回复消息,网关将抛出一个异常。| |**10**|在发送消息时使用的`routing-key`。
默认情况下,这是一个空的`String`。
与’routing-key-expression’互斥。
可选。| |**11**|一种 SPEL 表达式,用于确定发送消息时要使用的`routing-key`,并将消息作为根对象(例如,“payload.key”)。
默认情况下,这是一个空的`String`。
与“routing-key”互斥。
可选的。| |**12**|消息的默认传递模式:`PERSISTENT`或`NON_PERSISTENT`。
如果`header-mapper`设置了传递模式,则重写
如果存在 Spring 集成消息头`amqp_deliveryMode`,`DefaultHeaderMapper`设置该值。
如果没有提供此属性,而 header mapper 没有设置该属性,则默认值取决于`RabbitTemplate`所使用的底层 Spring amqp`MessagePropertiesConverter`。
如果根本没有进行自定义,则默认值是`PERSISTENT`。
可选的。| |**13**|由于版本 4.2.
定义相关数据的表达式。
当提供时,这将配置底层 AMQP 模板以接收发布者确认。
需要一个专用的`RabbitTemplate`和一个`CachingConnectionFactory`,将`publisherConfirms`属性设置为`true`,当接收到发布者确认并提供相关数据时,
,它被写入`confirm-ack-channel`或`confirm-nack-channel`,这取决于确认类型。
确认的有效负载是相关数据,正如这个表达式所定义的。
消息的头’amqp\_publishconfirm’设置为`true`(`ack`)或`false`(`nack`)。
用于`nack`确认, Spring 集成提供了一个额外的头`amqp_publishConfirmNackCause`。
示例:`headers['myCorrelationData']`和`payload`。
如果表达式解析为`Message`实例(例如`#this`),则在
通道上发出的消息
是基于该消息的,并添加了额外的头`nack`。无论类型如何,都将创建一个新消息,并将相关数据作为其有效负载。
还请参见[发布服务器确认和返回的替代机制](#alternative-confirms-returns)。
可选。| |**14**|正的(`ack`)发布者确认发送到的通道。
有效负载是由`confirm-correlation-expression`定义的相关数据。
如果表达式是`#root`或`#this`,则消息是根据原始消息构建的,将`amqp_publishConfirm`标头设置为`true`。
还请参见[发布服务器确认和返回的替代机制](#alternative-confirms-returns)。
可选(默认值为`nullChannel`)。| |**15**|负(`nack`)发布者确认发送到的通道。
有效负载是由`confirm-correlation-expression`定义的相关数据(如果没有`ErrorMessageStrategy`配置)。
如果表达式是`#root`或`#this`,则消息是根据原始消息构建的,当`amqp_publishConfirm`标头设置为`false`.
时,当存在`ErrorMessageStrategy`时,消息是带有`NackedAmqpMessageException`有效载荷的
消息。[发布服务器确认和返回的替代机制](#alternative-confirms-returns)另请参见[发布服务器确认和返回的替代机制](#alternative-confirms-returns)。
可选(默认值为`nullChannel`)。| |**16**|设置好后,如果在毫秒内没有收到发布者确认,网关将合成一个否定确认,
Pending 确认每检查 50% 的值,因此,一个 NACK 的实际发送时间将在这个值的 1 倍到 1.5 倍之间。
默认为零(不会生成 NACK)。| |**17**|返回的消息被发送到的通道。
当提供时,底层的 AMQP 模板被配置为将不可交付的消息返回给适配器。
当没有`ErrorMessageStrategy`配置时,消息是根据从 AMQP 接收到的数据构造的,具有以下附加头:`amqp_returnReplyCode`,`amqp_returnReplyText`,`amqp_returnExchange`,和`amqp_returnRoutingKey`。
当存在`ErrorMessageStrategy`时,消息是带有`ErrorMessage`有效载荷的
。也参见[发布服务器确认和返回的替代机制](#alternative-confirms-returns)。
可选的。| |**18**|对`ErrorMessageStrategy`实现的引用,用于在发送返回的或负面确认的消息时构建`ErrorMessage`实例。| |**19**|当设置为`false`时,端点将尝试在应用程序上下文初始化期间连接到代理。,
如果代理停机,通过记录错误消息,可以“快速失败”地检测错误配置,
当`true`(默认值),当发送第一条消息时,连接被建立(如果它不存在,因为其他组件已经建立了它)。| | |return-channel

使用`return-channel`需要一个`RabbitTemplate`,其`mandatory`属性设置为`true`,而`CachingConnectionFactory`属性设置为`true`。
当使用多个带返回的出站端点时,每个端点都需要一个单独的`RabbitTemplate`。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | |底层的`AmqpTemplate`具有五秒的默认`replyTimeout`。
如果你需要更长的超时,则必须在`template`上配置它。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------| 请注意,出站适配器和出站网关配置之间的唯一区别是`expectReply`属性的设置。 ### 异步出站网关 在上一节中讨论的网关是同步的,因为发送线程被挂起,直到收到答复(或发生超时)。 Spring 集成版本 4.3 增加了一个异步网关,它使用 Spring AMQP 中的`AsyncRabbitTemplate`。当发送消息时,线程在发送操作完成后立即返回,当收到消息时,响应将在模板的侦听器容器线程上发送。当在 Poller 线程上调用网关时,这可能是有用的。该线程已被释放,并可用于框架中的其他任务。 下面的清单显示了 AMQP 异步出站网关的可能配置选项: Java DSL ``` @Configuration public class AmqpAsyncApplication { @Bean public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) { return f -> f .handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate) .routingKey("queue1")); // default exchange - route to queue 'queue1' } @MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input") public interface MyGateway { String sendToRabbit(String data); } } ``` Java ``` @Configuration public class AmqpAsyncConfig { @Bean @ServiceActivator(inputChannel = "amqpOutboundChannel") public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) { AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate); outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo' return outbound; } @Bean public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate, SimpleMessageListenerContainer replyContainer) { return new AsyncRabbitTemplate(rabbitTemplate, replyContainer); } @Bean public SimpleMessageListenerContainer replyContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf); container.setQueueNames("asyncRQ1"); return container; } @Bean public MessageChannel amqpOutboundChannel() { return new DirectChannel(); } } ``` XML ``` (18) ``` |**1** |此适配器的唯一 ID。
可选的。| |------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |**2** |消息通道中的消息应该被发送到,以便将它们转换并发布到 AMQP 交换。
需要。| |**3** |Bean 参考配置的`AsyncRabbitTemplate`.
可选的(它默认为`asyncRabbitTemplate`)。| |**4** |将消息发送到哪个 AMQP Exchange 的名称。
如果不提供,则将消息发送到缺省的、无名称的 Exchange。
与“exchange-name-expression”互斥。
可选。| |**5** |一种 SPEL 表达式,用于确定将消息作为根对象发送到的 AMQP 交换的名称。
如果不提供,则将消息发送到默认的无名称交换。
与“exchange-name”互斥。
可选。| |**6** |当注册了多个使用者时,此使用者的顺序,从而启用负载平衡和故障转移。
可选(它默认为`Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]`)。| |**7** |从 AMQP 队列接收并转换后,应将回复发送到的消息通道。
可选。| |**8** |网关在将回复消息发送到`reply-channel`时等待的时间。
只有当`reply-channel`可以阻塞时,才适用于此,例如容量限制为当前已满的`QueueChannel`。
默认值为无穷大。| |**9** |当`AsyncRabbitTemplate’s `receiveTimeout` property and this setting is `true`, the gateway sends an error message to the inbound message’s `errorchannel` header.
When no reply message is received within the `AsyncrabbitTemplate 的`receiveTimeout`属性中没有接收到回复消息,并且此设置为`false`,网关将向默认的`errorChannel`发送一条错误消息。
默认为`true`。| |**10**|发送消息时使用的路由键。
默认情况下,这是一个空的`String`。
与“routing-key-expression”互斥。
可选。| |**11**|一种 SPEL 表达式,用于确定发送消息时使用的路由键,
将消息作为根对象(例如,“payload.key”)。
默认情况下,这是一个空的`String`。
与“routing-key”互斥。
可选。| |**12**|消息的默认传递模式:`PERSISTENT`或`NON_PERSISTENT`。
如果`header-mapper`设置了传递模式,则重写
如果存在 Spring 集成消息头(`amqp_deliveryMode`),`DefaultHeaderMapper`设置该值。
如果未提供此属性,而 header mapper 未对其进行设置,则默认值取决于`RabbitTemplate`所使用的底层 Spring amqp`MessagePropertiesConverter`。
如果未进行自定义,则默认值为`PERSISTENT`。
可选。| |**13**|定义相关数据的表达式。
当提供时,这将配置底层 AMQP 模板以接收发布者确认。
需要专用的`RabbitTemplate`和`CachingConnectionFactory`,其`publisherConfirms`属性设置为`true`。
当接收到发布者确认并提供相关数据时,确认被写入`confirm-ack-channel`或`confirm-nack-channel`,这取决于确认类型。
确认的有效载荷是由该表达式定义的相关数据,并且消息的“AMQP\_PublishConference”头文件设置为`true`(`ack`)或`false`(`nack`)。
对于`nack`实例,提供了一个额外的头文件。
示例:,`headers['myCorrelationData']`如果表达式,则将这个实例分解为“<>,如”gt=“658”。在`ack`/`nack`通道上发出的消息是基于该消息,并添加了附加的头。
还请参见[发布服务器确认和返回的替代机制](#alternative-confirms-returns)。
可选。| |**14**|正的(`ack`)发布者确认被发送到的通道。
有效负载是由`confirm-correlation-expression`定义的相关数据。
要求底层`enableConfirms`将其`enableConfirms`属性设置为
还请参见
<676"/>r=”可选的(默认值是“gt r=”gt r=“<677"/>)。| |**15**|由于版本 4.2.
向其发送阴性(`nack`)发布者确认信息的通道。
有效载荷是由`confirm-correlation-expression`定义的相关数据。`AsyncRabbitTemplate`要求底层`enableConfirms`将其`enableConfirms`属性设置为
。还请参见<692"/>r=“/>r=”693"/>(默认值是可选的。| |**16**|设置好后,如果在毫秒内没有收到发布者确认,网关将合成一个否定确认,
Pending 确认每检查 50% 的值,因此,一个 NACK 的实际发送时间将在这个值的 1 倍到 1.5 倍之间。
还请参见[发布服务器确认和返回的替代机制](#alternative-confirms-returns)。
默认为零(不会生成 NACK)。| |**17**|返回的消息被发送到的通道。
当提供时,底层的 AMQP 模板被配置为将不可交付的消息返回到网关。
消息是根据从 AMQP 接收的数据构造的,具有以下附加头:`amqp_returnReplyCode`,`amqp_returnReplyText`,`amqp_returnExchange`,而`amqp_returnRoutingKey`.
要求底层`AsyncRabbitTemplate`将其`mandatory`属性设置为`true`。
还请参见[发布服务器确认和返回的替代机制](#alternative-confirms-returns)。
可选。| |**18**|当设置为`false`时,端点将在应用程序上下文初始化期间尝试连接到代理,
这样做可以通过记录代理停机时的错误消息来“快速失败”地检测错误配置,
当`true`(默认值)时,在发送第一条消息时建立连接(如果它不存在,因为其他组件建立了
它)。| 有关更多信息,请参见[异步服务激活器](./service-activator.html#async-service-activator)。 | |RabbitTemplate

当你使用确认和返回时,我们建议将连接到`RabbitTemplate`的`AsyncRabbitTemplate`中的
专用。否则,可能会遇到意想不到的副作用。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ### 发布者确认和返回的替代机制 当连接工厂被配置为 Publisher 确认和返回时,上面的小节讨论了用于异步接收确认和返回的消息通道的配置。从版本 5.4 开始,有一个额外的机制,通常更容易使用。 在这种情况下,不要配置`confirm-correlation-expression`或确认和返回通道。相反,在`AmqpHeaders.PUBLISH_CONFIRM_CORRELATION`头中添加一个`CorrelationData`实例;然后,你可以通过在已发送消息的`CorrelationData`实例中检查将来的状态来等待结果。在将来完成之前,将始终填充`returnedMessage`字段(如果返回了消息)。 ``` CorrelationData corr = new CorrelationData("someId"); // <--- Unique "id" is required for returns someFlow.getInputChannel().send(MessageBuilder.withPayload("test") .setHeader("rk", "someKeyThatWontRoute") .setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr) .build()); ... try { Confirm Confirm = corr.getFuture().get(10, TimeUnit.SECONDS); Message returned = corr.getReturnedMessage(); if (returned !- null) { // message could not be routed } } catch { ... } ``` 为了提高性能,你可能希望发送多条消息,然后等待确认,而不是一次发送一条消息。返回的消息是转换后的原始消息;你可以使用所需的任何附加数据进行子类`CorrelationData`。 ### 入站消息转换 到达通道适配器或网关的入站消息将使用消息转换器转换为`spring-messaging``Message`有效负载。默认情况下,使用`SimpleMessageConverter`,它处理 Java 序列化和文本。默认情况下,标头使用`DefaultHeaderMapper.inboundMapper()`进行映射。如果发生了转换错误,并且没有定义错误通道,那么异常将被抛到容器中,并由侦听器容器的错误处理程序处理。默认的错误处理程序将转换错误视为致命错误,并且消息将被拒绝(如果队列是这样配置的,则将其路由到死信交换)。如果定义了错误通道,则`ErrorMessage`有效负载是一个`ListenerExecutionFailedException`,带有属性`failedMessage`(无法转换的 Spring AMQP 消息)和`cause`。如果容器`AcknowledgeMode`是`AUTO`(默认值),并且错误流在不抛出异常的情况下消耗错误,则将确认原始消息。如果错误流抛出一个异常,那么异常类型将与容器的错误处理程序一起,决定是否重新请求消息。如果容器配置为`AcknowledgeMode.MANUAL`,则有效负载是带有附加属性`channel`和`deliveryTag`的`ManualAckListenerExecutionFailedException`。这使得错误流能够为消息调用`basicAck`或`basicNack`(或`basicReject`)来控制其配置。 ### 出站消息转换 Spring AMQP1.4 引入了`ContentTypeDelegatingMessageConverter`,其中实际的转换器是基于传入的内容类型选择消息属性的。这可以由入站端点使用。 从 Spring 集成版本 4.3 开始,你也可以在出站端点上使用`ContentTypeDelegatingMessageConverter`,并使用`contentType`头指定使用哪个转换器。 下面的示例配置了`ContentTypeDelegatingMessageConverter`,默认的转换器是`SimpleMessageConverter`(用于处理 Java 序列化和纯文本),以及一个 JSON 转换器: ``` ``` 将消息发送到`ctRequestChannel`,并将`contentType`头设置为`application/json`,将导致 JSON 转换器被选中。 这适用于出站通道适配器和网关。 | |从版本 5.0 开始,添加到出站消息的`MessageProperties`的标题永远不会被映射的标题覆盖(默认情况下)。
以前,只有当消息转换器是`ContentTypeDelegatingMessageConverter`(在这种情况下,头是首先映射的,以便可以选择适当的转换器)。,对于其他转换器,例如`SimpleMessageConverter`,映射的标头覆盖了转换器添加的任何标头。
当出站消息有一些剩余的`contentType`标头(可能来自入站通道适配器)时,这会导致问题。并且正确的出站`contentType`被错误地覆盖。
解决方法是在将消息发送到出站端点之前使用头部过滤器删除头部。

但是,在某些情况下,需要使用以前的行为—例如,当`String`包含 JSON 的有效负载时,`SimpleMessageConverter`不知道内容,并将`contentType`消息属性设置为`text/plain`,但你的应用程序想要重写通过将发送到出站端点的消息的`contentType`头设置为`application/json`。
`ObjectToJsonTransformer`确实做到了这(默认情况下)。

现在在出站通道适配器和网关上(以及在 AMQP 支持的通道上)有一个名为`headersMappedLast`的属性。
将其设置为`true`,以恢复覆盖转换器添加的属性的行为。

从 5.1.9 版本开始,类似的`replyHeadersMappedLast`是为`AmqpInboundGateway`提供的,当我们生成一个答复并想要覆盖由转换器填充的标题时。
有关更多信息,请参见其 Javadocs。| |---|| ### 出站用户 ID Spring AMQP 版本 1.6 引入了一种机制,以允许用于出站消息的默认用户 ID 的规范。始终可以设置`AmqpHeaders.USER_ID`标头,它现在优先于默认值。这对消息接收者可能很有用。对于入站消息,如果消息发布者设置了属性,则该属性在`AmqpHeaders.RECEIVED_USER_ID`标头中可用。注意 RabbitMQ[验证用户 ID 是连接的实际用户 ID,或者该连接允许模拟](https://www.rabbitmq.com/validated-user-id.html)。 要为出站消息配置默认的用户 ID,请在`RabbitTemplate`上对其进行配置,并将出站适配器或网关配置为使用该模板。类似地,要在回复中设置用户 ID 属性,请在入站网关中插入一个适当配置的模板。有关更多信息,请参见[Spring AMQP documentation](https://docs.spring.io/spring-amqp/reference/html/_reference.html#template-user-id)。 ### 延迟消息交换 Spring AMQP 支持[RabbitMQ 延迟消息交换插件](https://docs.spring.io/spring-amqp/reference/html/#delayed-message-exchange)。对于入站消息,`x-delay`标头映射到`AmqpHeaders.RECEIVED_DELAY`标头。设置`AMQPHeaders.DELAY`报头会在出站消息中设置相应的`x-delay`报头。你还可以在出站端点上指定`delay`和`delayExpression`属性(当使用 XML 配置时,`delay-expression`)。这些属性优先于`AmqpHeaders.DELAY`标头。 ### AMQP 支持的消息通道 有两个消息通道实现可用。一种是点对点,另一种是发布订阅。这两个通道都为底层`AmqpTemplate`和`SimpleMessageListenerContainer`提供了广泛的配置属性(如本章前面所示的通道适配器和网关)。然而,我们在这里展示的示例具有最小的配置。探索 XML 模式以查看可用的属性。 点对点通道可能看起来像以下示例: ``` ``` 在封面下,前面的示例将导致声明一个名为`Queue`的`si.p2pChannel`,并将此通道发送到该`Queue`(从技术上讲,通过发送到与该`Queue`的名称匹配的路由密钥直接交换)。此通道还在`Queue`上注册消费者。如果你希望通道是“pollable”而不是消息驱动的,请提供值为`message-driven`的`false`的`message-driven`标志,如下例所示: ``` ``` 发布-订阅频道可能如下所示: ``` ``` 在封面下,前面的示例将导致声明一个名为`si.fanout.pubSubChannel`的扇出交换,并且此通道将发送到该扇出交换。该通道还声明了一个名为独占的、自动删除的非持久性`Queue`的服务器,并将其绑定到 FanOut Exchange,同时在该`Queue`上注册一个消费者以接收消息。对于发布-订阅-通道,没有“pollable”选项。它必须是消息驱动的。 从版本 4.1 开始,AMQP 支持的消息通道(与`channel-transacted`结合使用)支持`template-channel-transacted`,以分离`transactional`的`transactional`配置和`RabbitTemplate`。请注意,以前,`channel-transacted`默认为`true`。现在,默认情况下,对于`AbstractMessageListenerContainer`,它是`false`。 在版本 4.3 之前,AMQP 支持的通道只支持带有`Serializable`有效负载和报头的消息。整个消息被转换(序列化)并发送到 RabbitMQ。现在,你可以将`extract-payload`属性(或者在使用 Java 配置时`setExtractPayload()`)设置为`true`。当此标志`true`时,将以与使用通道适配器类似的方式转换消息有效负载并映射消息头。这种安排使 AMQP 支持的通道可用于不可序列化的有效负载(可能与另一个消息转换器一起使用,例如`Jackson2JsonMessageConverter`)。有关默认映射标头的更多信息,请参见[AMQP 消息头](#amqp-message-headers)。你可以通过提供使用`outbound-header-mapper`和`inbound-header-mapper`属性的定制映射器来修改映射。你现在还可以指定`default-delivery-mode`,它用于在没有`amqp_deliveryMode`报头时设置交付模式。默认情况下, Spring AMQP`MessageProperties`使用`PERSISTENT`交付模式。 | |与其他支持持久性的通道一样,支持 AMQP 的通道旨在提供消息持久性,以避免消息丢失。
它们不打算将工作分发给其他对等应用程序。
为此,请使用通道适配器。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | |从版本 5.0 开始,poller 通道现在为指定的`receiveTimeout`阻塞 poller 线程(默认值为 1 秒)。
以前,与其他`PollableChannel`实现不同,如果没有可用的消息,线程将立即返回到调度程序,与接收超时无关。
拦截比使用`basicGet()`检索消息(没有超时)要贵一些,因为必须创建一个使用者来接收每个消息。
要恢复以前的行为,请将 poller 的`receiveTimeout`设置为 0。| |---|| #### 使用 Java 配置进行配置 下面的示例展示了如何使用 Java 配置来配置通道: ``` @Bean public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) { AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(); factoryBean.setConnectionFactory(connectionFactory); factoryBean.setQueueName("foo"); factoryBean.setPubSub(false); return factoryBean; } @Bean public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) { AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true); factoryBean.setConnectionFactory(connectionFactory); factoryBean.setQueueName("bar"); factoryBean.setPubSub(false); return factoryBean; } @Bean public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) { AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true); factoryBean.setConnectionFactory(connectionFactory); factoryBean.setQueueName("baz"); factoryBean.setPubSub(false); return factoryBean; } ``` #### 使用 Java DSL 进行配置 下面的示例展示了如何使用 Java DSL 配置通道: ``` @Bean public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) { return IntegrationFlows.from(...) ... .channel(Amqp.pollableChannel(connectionFactory) .queueName("foo")) ... .get(); } @Bean public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) { return IntegrationFlows.from(...) ... .channel(Amqp.channel(connectionFactory) .queueName("bar")) ... .get(); } @Bean public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) { return IntegrationFlows.from(...) ... .channel(Amqp.publishSubscribeChannel(connectionFactory) .queueName("baz")) ... .get(); } ``` ### AMQP 消息头 #### 概述 Spring 集成 AMQP 适配器自动映射所有 AMQP 属性和标头。(这是从 4.3-以前,只有标准标题被映射的变化)。默认情况下,这些属性通过使用[`DefaultAmqpHeaderMapper`](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/amqp/support/defaultamqpheadermapper.html)来复制到 Spring Integration`MessageHeaders`。 你可以传入你自己的特定于 AMQP 的头映射器的实现,因为适配器具有支持这样做的属性。 AMQP[`MessageProperties`](https://DOCS. Spring.io/ Spring-amqp/api/org/springframework/amqp/core/messageproperties.html)中的任何用户定义的头都被复制到或从 AMQP 消息复制,除非`requestHeaderNames`或`replyHeaderNames`属性的`DefaultAmqpHeaderMapper`明确否定。默认情况下,对于出站映射器,不映射`x-*`标头。关于原因,请参见本节后面出现的[caution](#header-copy-caution)。 要覆盖默认值并恢复到 pre-4.3 行为,请在属性中使用`STANDARD_REQUEST_HEADERS`和`STANDARD_REPLY_HEADERS`。 | |在映射用户定义的标头时,值还可以包含要匹配的简单通配符模式(例如`thing*`或`*thing`)。
`*`匹配所有标头。| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------| 从版本 4.1 开始,`AbstractHeaderMapper`(a`DefaultAmqpHeaderMapper`超类)允许将`NON_STANDARD_HEADERS`令牌配置为`requestHeaderNames`和`replyHeaderNames`属性(除了现有的`STANDARD_REQUEST_HEADERS`和`STANDARD_REPLY_HEADERS`),以映射所有用户定义的标题。 `org.springframework.amqp.support.AmqpHeaders`类标识了`DefaultAmqpHeaderMapper`所使用的默认标头: * `amqp_appId` * `amqp_clusterId` * `amqp_contentEncoding` * `amqp_contentLength` * `content-type`(参见[the`contentType`header](#amqp-content-type)) * `amqp_correlationId` * `amqp_delay` * `amqp_deliveryMode` * `amqp_deliveryTag` * `amqp_expiration` * `amqp_messageCount` * `amqp_messageId` * `amqp_receivedDelay` * `amqp_receivedDeliveryMode` * `amqp_receivedExchange` * `amqp_receivedRoutingKey` * `amqp_redelivered` * `amqp_replyTo` * `amqp_timestamp` * `amqp_type` * `amqp_userId` * `amqp_publishConfirm` * `amqp_publishConfirmNackCause` * `amqp_returnReplyCode` * `amqp_returnReplyText` * `amqp_returnExchange` * `amqp_returnRoutingKey` * `amqp_channel` * `amqp_consumerTag` * `amqp_consumerQueue` | |正如在本节前面提到的,使用`*`的标头映射模式是复制所有标头的一种常见方法,
但是,这可能会有一些意想不到的副作用,因为某些 RabbitMQ 专有属性/标头也会被复制。
例如,当你使用[federation](https://www.rabbitmq.com/federated-exchanges.html)时,接收到的消息可能有一个名为`x-received-from`的属性,其中包含发送消息的节点,
如果使用通配符`*`进行入站网关上的请求和回复头映射,则会复制此头,这可能会导致联合出现一些问题,
此回复消息可能会被联合回发送代理,后者可能会认为消息正在循环,因此会静默地删除它,
如果你希望使用通配符标头映射的便利,你可能需要在下游流中过滤掉一些头。
例如,为了避免将`x-received-from`头复制回答复,你可以在将答复发送到 AMQP 入站网关之前使用``。
或者,你可以显式地列出实际需要映射的那些属性,而不是使用通配符。
由于这些原因,对于入站消息,映射器(默认情况下)不会映射任何`x-*`标头。
它也不会将标头映射到`amqp_deliveryMode`标头,为了避免该报头从入站消息传播到出站消息。
相反,此报头被映射到`amqp_receivedDeliveryMode`,这在输出上没有映射。| |---|| 从版本 4.3 开始,头映射中的模式可以通过使用`!`在模式之前被否定。被否定的模式获得优先权,所以像`STANDARD_REQUEST_HEADERS,thing1,ba*,!thing2,!thing3,qux,!thing1`这样的列表不映射`thing1`(nor`thing2`nor`thing3`)。标准标题加上`bad`和`qux`被映射。例如,当 JSON 反序列化逻辑在下游的接收器以不同的方式完成时,否定技术对于不映射 JSON 类型报头的传入消息是有用的。为此,应该为入站通道适配器/网关的头映射器配置`!json_*`模式。 | |如果你有一个以`!`开头的用户定义标头,你确实希望对其进行映射,那么你需要将其转换为`\`,如下所示:`STANDARD_REQUEST_HEADERS,\!myBangHeader`。
现在映射了名为`!myBangHeader`的标头。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | |从版本 5.1 开始,`DefaultAmqpHeaderMapper`将分别回到映射`MessageHeaders.ID`和`MessageHeaders.TIMESTAMP`到`MessageProperties.messageId`和`MessageProperties.timestamp`,如果出站消息上不存在相应的`amqp_messageId`或`amqp_timestamp`标头。
入站属性将像以前一样映射到`amqp_*`标头。
当消息消费者使用有状态重试时,填充`messageId`属性是有用的。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| #### 标题`contentType` 与其他头不同,`AmqpHeaders.CONTENT_TYPE`不带`amqp_`前缀;这允许在不同的技术之间透明地传递 ContentType 头。例如,发送到 RabbitMQ 队列的入站 HTTP 消息。 将`contentType`头映射到 Spring AMQP 的`MessageProperties.contentType`属性,然后将其映射到 RabbitMQ 的`content_type`属性。 在版本 5.1 之前,这个头也被映射为`MessageProperties.headers`映射中的一个条目;这是不正确的,而且,该值可能是错误的,因为底层的 Spring AMQP 消息转换器可能已经更改了内容类型。这样的更改将反映在第一类`content_type`属性中,但不会反映在 RabbitMQ 头文件映射中。入站映射忽略了 headers 映射值。`contentType`不再映射到 headers 映射中的条目。 ### 严格的消息排序 本节描述了入站和出站消息的消息排序。 ####
入站 如果需要对入站消息进行严格排序,则必须将入站侦听器容器的`prefetchCount`属性配置为`1`。这是因为,如果一条消息失败并被重新传递,它将在已有的预取消息之后到达。自 Spring AMQP 版本 2.0 以来,`prefetchCount`默认为`250`,以提高性能。严格的订货要求是以性能下降为代价的。 #### 出站 考虑以下集成流程: ``` @Bean public IntegrationFlow flow(RabbitTemplate template) { return IntegrationFlows.from(Gateway.class) .split(s -> s.delimiters(",")) .transform(String::toUpperCase) .handle(Amqp.outboundAdapter(template).routingKey("rk")) .get(); } ``` 假设我们将消息`A`、`B`和`C`发送到网关。虽然消息`A`、`B`、`C`很可能是按顺序发送的,但不能保证。这是因为模板为每次发送操作从缓存中“借用”一个通道,并且不能保证每个消息都使用相同的通道。一种解决方案是在拆分器之前启动事务,但是事务在 RabbitMQ 中是昂贵的,并且可以将性能降低数百倍。 为了以更有效的方式解决这个问题,从版本 5.1 开始, Spring 集成提供了`BoundRabbitChannelAdvice`这是一个`HandleMessageAdvice`。见[处理消息建议](./handler-advice.html#handle-message-advice)。当在拆分器之前应用时,它确保所有下游操作都在相同的通道上执行,并且可以选择地等待,直到接收到所有已发送消息的发布者确认(如果连接工厂被配置为确认)。下面的示例展示了如何使用`BoundRabbitChannelAdvice`: ``` @Bean public IntegrationFlow flow(RabbitTemplate template) { return IntegrationFlows.from(Gateway.class) .split(s -> s.delimiters(",") .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10)))) .transform(String::toUpperCase) .handle(Amqp.outboundAdapter(template).routingKey("rk")) .get(); } ``` 注意,在通知和出站适配器中使用了相同的`RabbitTemplate`(它实现了`RabbitOperations`)。该建议在模板的`invoke`方法中运行下游流,以便所有操作都在同一个通道上运行。如果提供了可选的超时,当流完成时,通知调用`waitForConfirmsOrDie`方法,如果在指定的时间内没有收到确认,该方法将抛出一个异常。 | |在下游流中不能有线程切换(`QueueChannel`,`ExecutorChannel`,以及其他)。| |---|--------------------------------------------------------------------------------------------------------| ### AMQP 样本 要对 AMQP 适配器进行实验,请查看 Spring Integration Samples Git Repository 中可用的示例,网址为[https://github.com/SpringSource/spring-integration-samples](https://github.com/spring-projects/spring-integration-samples) 目前,一个示例通过使用出站通道适配器和入站通道适配器来演示 Spring 集成 AMQP 适配器的基本功能。作为样例中的 AMQP 代理实现使用[RabbitMQ](https://www.rabbitmq.com/)。 | |为了运行示例,你需要一个正在运行的 RabbitMQ 实例。
只有基本默认值的本地安装就足够了。
有关 RabbitMQ 安装过程的详细信息,请参见[https://www.rabbitmq.com/install.html](https://www.rabbitmq.com/install.html)| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 示例应用程序启动后,在命令提示符中输入一些文本,然后将包含所输入文本的消息发送到 AMQP 队列。作为回报, Spring Integration 将检索该消息并将其打印到控制台。 下面的图像说明了 Spring 在此示例中使用的集成组件的基本集合。 ![spring integration amqp sample graph](https://docs.spring.io/spring-integration/docs/current/reference/html/images/spring-integration-amqp-sample-graph.png) 图1.AMQP 样本的 Spring 积分图