# AMQP 支持
## AMQP 支持
Spring 集成提供了用于通过使用高级消息队列协议接收和发送消息的通道适配器。
你需要在项目中包含此依赖项:
Maven
```
org.springframework.integration
spring-integration-amqp
5.5.9
```
Gradle
```
compile "org.springframework.integration:spring-integration-amqp:5.5.9"
```
可提供以下适配器:
* [入站通道适配器](#amqp-inbound-channel-adapter)
* [入站网关](#amqp-inbound-gateway)
* [出站通道适配器](#amqp-outbound-channel-adapter)
* [出站网关](#amqp-outbound-gateway)
* [异步出站网关](#amqp-async-outbound-gateway)
Spring 集成还提供了由 AMQP 交换和队列支持的点对点消息通道和发布-订阅消息通道。
为了提供 AMQP 支持, Spring 集成依赖于([Spring AMQP](https://projects.spring.io/spring-amqp)),它将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。 Spring AMQP 提供了与([Spring JMS](https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#jms))类似的语义。
Spring 鉴于所提供的 AMQP 通道适配器仅用于单向消息传递(发送或接收), Spring 集成还提供了用于请求-应答操作的入站和出站 AMQP 网关。
小贴士:你应该熟悉[reference documentation of the Spring AMQP project](https://docs.spring.io/spring-amqp/reference/html/)。它提供了关于 Spring 与 AMQP 的集成的更深入的信息,特别是与 RabbitMQ 的集成。
### 入站通道适配器
下面的清单显示了 AMQP 入站通道适配器的可能配置选项:
爪哇 DSL
```
@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "aName"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
```
爪哇
```
@Bean
public MessageChannel amqpInputChannel() {
return new DirectChannel();
}
@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
@Qualifier("amqpInputChannel") MessageChannel channel) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(channel);
return adapter;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("aName");
container.setConcurrentConsumers(2);
// ...
return container;
}
@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
```
XML
```
(27)
```
|**1** |此适配器的唯一 ID。
可选的。|
|------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2** |转换后的消息应发送到的消息通道。
需要。|
|**3** |AMQP 队列的名称(用逗号分隔的列表),应该从这些队列中消费消息。
required。|
|**4** |`MessageListenerContainer`的确认模式,
当设置为`MANUAL`时,传递标记和通道在消息头`amqp_deliveryTag`和`amqp_channel`中提供,分别。
用户应用程序负责确认。`NONE`表示没有确认(`autoAck`)。`AUTO`表示适配器的容器在下游流完成时进行确认。
可选(默认为自动)。
参见[入站端点确认模式](#amqp-inbound-ack)。|
|**5** |额外的 AOP 建议来处理与此入站通道适配器相关的横切行为。
可选的。|
|**6** |标志,指示该组件创建的通道是事务性的。
如果为真,它会告诉框架使用事务性通道,并根据结果以提交或回滚结束所有操作(发送或接收),但有一个异常,表示回滚。
可选(默认为假)。|
|**7** |指定要创建的并发消费者的数量。
默认值是`1`。
我们建议增加并发消费者的数量,以按比例分配来自队列的消息的消耗,
但是,请注意,一旦注册了多个消费者,任何订购保证都会丢失。,通常情况下,
,对于低容量队列使用一个使用者。
设置了“每个队列的使用者”时不允许使用。
可选。|
|**8** |Bean 参考 RabbitMQ.可选(默认为)。|
|**9** |错误消息应发送到的消息通道。
可选。|
|**10**|侦听器通道是否暴露于已注册的`ChannelAwareMessageListener`.
可选(默认为 true)。|
|**11**|在接收 AMQP 消息时使用的对`AmqpHeaderMapper`的引用。
可选的。
默认情况下,只有标准的 AMQP 属性(例如`contentType`)被复制到 Spring Integration`MessageHeaders`。
AMQP`MessageProperties`中的任何用户定义的标题都不会通过默认的`DefaultAmqpHeaderMapper`复制到消息中。如果提供了“request-header-names”,则不允许
。|
|**12**|要从 AMQP 请求映射到`MessageHeaders`中的 AMQP 头的名称的逗号分隔列表。
只能提供如果没有提供“header-mapper”引用。
此列表中的值也可以是与 header 名称匹配的简单模式(例如“\*”或“thing1\*,thing2”或“\*something”)。|
|**13**|引用`AbstractMessageListenerContainer`用于接收 AMQP 消息。
如果提供了此属性,则不应提供与侦听器容器配置相关的其他属性。
换句话说,通过设置此引用,你必须对侦听器容器配置承担全部责任。
唯一的例外是`MessageListener`本身。
因为这实际上是这个通道适配器实现的核心责任,所以引用的侦听器容器不能已经拥有自己的`MessageListener`。
可选的。|
|**14**|在接收 AMQP 消息时使用的`MessageConverter`。
可选。|
|**15**|在接收 AMQP 消息时使用的`MessagePropertiesConverter`。
可选。|
|**16**|指定底层`AbstractMessageListenerContainer`应该启动和停止的阶段,
启动顺序从最低到最高,关闭顺序与此相反,
默认情况下,该值为`Integer.MAX_VALUE`,这意味着这个容器开始得越晚,停止得越快。
可选的。|
|**17**|告诉 AMQP 代理在一个请求中要向每个使用者发送多少消息。
通常,你可以将这个值设置得很高,以提高吞吐量。
它应该大于或等于事务大小(请参见`tx-size`属性,
可选(默认为`1`)。|
|**18**|接收以毫秒为单位的超时。
可选(默认为`1000`)。|
|**19**|指定底层`AbstractMessageListenerContainer`的恢复尝试之间的间隔(以毫秒为单位)。
可选(默认为`5000`)。|
|**20**|如果“true”且代理上没有队列可用,则容器在启动时抛出一个致命的异常,如果在容器运行时删除了队列(在进行了三次被动声明队列的尝试之后),则停止。,容器不抛出异常并进入恢复模式,尝试根据`recovery-interval`重新启动。
可选(默认为`true`)。|
|**21**|在底层`AbstractMessageListenerContainer`停止之后和强制关闭 AMQP 连接之前等待工作人员的时间(以毫秒为单位)。
如果有工作人员在关闭信号出现时处于活动状态,只要他们能够在此超时范围内完成处理,他们就被允许完成处理。
否则,连接已关闭,消息仍未确认(如果通道是事务性的)。
可选(默认为`5000`)。|
|**22**|默认情况下,底层`AbstractMessageListenerContainer`使用`SimpleAsyncTaskExecutor`实现,该实现为每个任务启动一个新线程,并异步运行它,默认情况下,注意,此实现不重用线程。
考虑使用线程池`TaskExecutor`实现作为替代。
可选(默认为`SimpleAsyncTaskExecutor`)。|
|**23**|默认情况下,底层`AbstractMessageListenerContainer`创建了`DefaultTransactionAttribute`的新实例(它采用 EJB 方法回滚运行时,但不检查异常)。
可选(默认为`DefaultTransactionAttribute`)。|
|**24**|设置对底层`AbstractMessageListenerContainer`上的外部`PlatformTransactionManager`的引用。
事务管理器与`channel-transacted`属性一起工作。
如果在框架发送或接收消息时已经有事务在进行中并且`channelTransacted`标志是`true`,消息传递事务的提交或回滚将被推迟到当前事务结束时。
如果`channelTransacted`标志是`false`,则消息传递操作不应用事务语义(它是自动标记的)。
以获取更多信息,见[Transactions with Spring AMQP](https://docs.spring.io/spring-amqp/reference/html/%255Freference.html#%5Ftransactions)。
可选。|
|**25**|告诉`SimpleMessageListenerContainer`在单个事务中要处理多少消息(如果通道是事务性的)。
对于最佳结果,它应该小于或等于`prefetch-count`中设置的值。
当设置了“customers-per-queue”时不允许。
可选(默认为`1`)。|
|**26**|指示底层侦听器容器应该是`DirectMessageListenerContainer`,而不是默认的`SimpleMessageListenerContainer`。
有关更多信息,请参见[Spring AMQP Reference Manual](https://docs.spring.io/spring-amqp/reference/html/)。|
|**27**|当容器的`consumerBatchEnabled`是`true`时,确定适配器如何在消息有效负载中呈现一批消息。
当设置为`MESSAGES`(默认)时,有效负载是`List>`,其中每个消息都有从传入的 AMQP`Message`映射的头,并且有效负载是转换后的`body`。
当设置为`EXTRACT_PAYLOADS`时,有效负载是`List>`,其中元素是从 AMQP`Message`体转换而来的。`EXTRACT_PAYLOADS_WITH_HEADERS`类似于`EXTRACT_PAYLOADS`,但是,除此之外,每个消息的头从`MessageProperties`映射到相应索引处的`List