listen(String in) {
...
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(MessageHeaders.CONTENT_TYPE, "application/xml")
.build();
}
```
此内容类型将在`MessageProperties`中传递给转换器。默认情况下,为了向后兼容,转换器设置的任何内容类型属性在转换后都将被这个值覆盖。如果你希望重写该行为,还可以将`AmqpHeaders.CONTENT_TYPE_CONVERTER_WINS`设置为`true`,并且转换器设置的任何值都将被保留。
###### 多方法侦听器
从版本 1.5.0 开始,你可以在类级别上指定`@RabbitListener`注释。与新的`@RabbitHandler`注释一起,这允许单个侦听器根据传入消息的有效负载类型调用不同的方法。这一点最好用一个例子来描述:
```
@RabbitListener(id="multi", queues = "someQueue")
@SendTo("my.reply.queue")
public class MultiListenerBean {
@RabbitHandler
public String thing2(Thing2 thing2) {
...
}
@RabbitHandler
public String cat(Cat cat) {
...
}
@RabbitHandler
public String hat(@Header("amqp_receivedRoutingKey") String rk, @Payload Hat hat) {
...
}
@RabbitHandler(isDefault = true)
public String defaultMethod(Object object) {
...
}
}
```
在这种情况下,如果转换的有效负载是`Thing2`、`Cat`或`Hat`,则会调用单独的`@RabbitHandler`方法。你应该理解,系统必须能够基于有效负载类型来识别唯一的方法。将检查类型是否可分配给没有注释或使用`@Payload`注释的单个参数。请注意,同样的方法签名也适用,如方法级别`@RabbitListener`([前面描述的](#message-listener-adapter))中所讨论的那样。
从版本 2.0.3 开始,可以将`@RabbitHandler`方法指定为默认方法,如果其他方法不匹配,则调用该方法。最多只能指定一种方法。
| |`@RabbitHandler`仅用于处理转换后的消息负载,如果希望接收未转换的 RAW`Message`对象,则必须在方法上使用`@RabbitListener`,而不是在类上。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
###### `@Repeatable` `@RabbitListener`
从版本 1.6 开始,`@RabbitListener`注释被标记为`@Repeatable`。这意味着注释可以多次出现在相同的注释元素(方法或类)上。在这种情况下,将为每个注释创建一个单独的侦听器容器,每个注释都调用相同的侦听器`@Bean`。可重复的注释可以用于 Java8 或更高版本。
###### 代理`@RabbitListener`和泛型
如果你的服务旨在被代理(例如,在`@Transactional`的情况下),那么当接口具有通用参数时,你应该记住一些考虑因素。考虑以下示例:
```
interface TxService {
String handle(P payload, String header);
}
static class TxServiceImpl implements TxService {
@Override
@RabbitListener(...)
public String handle(Thing thing, String rk) {
...
}
}
```
对于通用接口和特定实现,你将被迫切换到 CGLIB 目标类代理,因为接口`handle`方法的实际实现是一种桥接方法。在事务管理的情况下,通过使用一个注释选项来配置 CGLIB 的使用:`@EnableTransactionManagement(proxyTargetClass = true)`。在这种情况下,所有注释都必须在实现中的目标方法上声明,如下例所示:
```
static class TxServiceImpl implements TxService {
@Override
@Transactional
@RabbitListener(...)
public String handle(@Payload Foo foo, @Header("amqp_receivedRoutingKey") String rk) {
...
}
}
```
###### 处理异常
默认情况下,如果一个带注释的侦听器方法抛出一个异常,它将被抛出到容器,消息将被重新请求并重新交付、丢弃或路由到死信交换,这取决于容器和代理配置。任何东西都不会退还给寄件人。
从版本 2.0 开始,`@RabbitListener`注释有两个新属性:`errorHandler`和`returnExceptions`。
默认情况下,不会对它们进行配置。
你可以使用`errorHandler`来提供`RabbitListenerErrorHandler`实现的 Bean 名称。此功能接口有一种方法,如下所示:
```
@FunctionalInterface
public interface RabbitListenerErrorHandler {
Object handleError(Message amqpMessage, org.springframework.messaging.Message> message,
ListenerExecutionFailedException exception) throws Exception;
}
```
如你所见,你可以访问从容器接收的原始消息、消息转换器产生的 Spring-messaging`Message>`对象,以及侦听器抛出的异常(包装在`ListenerExecutionFailedException`中)。错误处理程序可以返回一些结果(作为回复发送),也可以抛出原始异常或新的异常(根据`returnExceptions`设置,将其抛出到容器或返回给发送方)。
当`true`时,`returnExceptions`属性将导致异常返回给发送方。异常包装在`RemoteInvocationResult`对象中。在发送端,有一个可用的`RemoteInvocationAwareMessageConverterAdapter`,如果将其配置为`RabbitTemplate`,则会重新抛出服务器端异常,并包装在`AmqpRemoteException`中。服务器异常的堆栈跟踪是通过合并服务器和客户端堆栈跟踪来合成的。
| |这种机制通常仅对默认的`SimpleMessageConverter`有效,后者使用 Java 序列化。
异常通常不是“Jackson 友好的”,并且不能序列化到 JSON。
如果使用 JSON,请考虑在抛出异常时使用`errorHandler`返回一些其他 Jackson 友好的`Error`对象。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| |在版本 2.1 中,此接口从包`o.s.amqp.rabbit.listener`移动到`o.s.amqp.rabbit.listener.api`。|
|---|---------------------------------------------------------------------------------------------------------------|
从版本 2.1.7 开始,`Channel`在消息消息头中可用;这允许你在使用`AcknowledgeMode.MANUAL`时对失败的消息进行 ACK 或 NACK:
```
public Object handleError(Message amqpMessage, org.springframework.messaging.Message> message,
ListenerExecutionFailedException exception) {
...
message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class)
.basicReject(message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class),
true);
}
```
从版本 2.2.18 开始,如果抛出消息转换异常,将调用错误处理程序,在`message`参数中使用`null`。这允许应用程序向调用者发送一些结果,表明收到了格式错误的消息。以前,此类错误是由容器抛出和处理的。
###### 容器管理
为注释创建的容器未在应用程序上下文中注册。你可以通过在`RabbitListenerEndpointRegistry` Bean 上调用`getListenerContainers()`来获得所有容器的集合。然后可以迭代这个集合,例如,停止或启动所有容器,或者调用注册表本身上的`Lifecycle`方法,这些方法将调用每个容器上的操作。
你还可以通过使用其`id`,使用`getListenerContainer(String id)`,获得对单个容器的引用——例如,对于上面的代码片段创建的容器,`registry.getListenerContainer("multi")`。
从版本 1.5.2 开始,你可以使用`getListenerContainerIds()`获得已注册容器的`id`值。
从版本 1.5 开始,你现在可以将`group`分配到`RabbitListener`端点上的容器。这提供了一种机制来获取对容器子集的引用。添加一个`group`属性会导致类型`Collection`的 Bean 被注册到带有组名的上下文中。
##### @rabbitlistener with batching
当接收到[a batch](#template-batching)的消息时,解批处理通常由容器执行,侦听器在一次调用一条消息。从版本 2.2 开始,你可以将侦听器容器工厂和侦听器配置为在一个调用中接收整个批处理,只需设置工厂的`batchListener`属性,并将方法有效负载参数设置为`List`:
```
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setBatchListener(true);
return factory;
}
@RabbitListener(queues = "batch.1")
public void listen1(List in) {
...
}
// or
@RabbitListener(queues = "batch.2")
public void listen2(List> in) {
...
}
```
将`batchListener`属性设置为 true 会自动关闭工厂创建的容器中的`deBatchingEnabled`容器属性(除非`consumerBatchEnabled`是`true`-见下文)。实际上,将 debatching 从容器移动到侦听器适配器,并且适配器将创建传递给侦听器的列表。
启用批处理的工厂不能与[多方法监听器](#annotation-method-selection)一起使用。
也从版本 2.2 开始。当一次只接收一条批处理消息时,最后一条消息包含一个布尔头,该头设置为`true`。这个头可以通过添加`@Header(AmqpHeaders.LAST_IN_BATCH)`BooleanLastMessageProperties.islastinBatch()AmQpHeaders 来获得。batch_size` 是用每个消息片段中批处理的大小填充的。
此外,在`SimpleMessageListenerContainer`中添加了一个新的属性`consumerBatchEnabled`。如果这是真的,容器将创建一批消息,最多为`batchSize`;如果`receiveTimeout`在没有新消息到达的情况下经过,则传递部分批消息。如果收到了生产者创建的批处理,则将其删除并添加到消费者侧批处理中;因此,实际交付的消息数量可能超过`batchSize`,表示从代理收到的消息的数量。`deBatchingEnabled`当`consumerBatchEnabled`为真时必须为真;容器工厂将强制执行此要求。
```
@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConsumerTagStrategy(consumerTagStrategy());
factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
return factory;
}
```
当使用`consumerBatchEnabled`与`@RabbitListener`时:
```
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List amqpMessages) {
this.amqpMessagesReceived = amqpMessages;
this.batch1Latch.countDown();
}
@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List> messages) {
this.messagingMessagesReceived = messages;
this.batch2Latch.countDown();
}
@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List strings) {
this.batch3Strings = strings;
this.batch3Latch.countDown();
}
```
* 第一个是用 RAW 调用的,未转换的`org.springframework.amqp.core.Message`s 接收到的。
* 第二个是用`org.springframework.messaging.Message>`s 调用的,带有转换的有效负载和映射的头/属性。
* 第三种是使用转换后的有效负载进行调用,不访问 headers/properteis。
还可以添加`Channel`参数,该参数在使用`MANUAL`ACK 模式时经常使用。这在第三个示例中不是很有用,因为你无法访问`delivery_tag`属性。
##### 使用容器工厂
引入了监听器容器工厂来支持`@RabbitListener`并使用`RabbitListenerEndpointRegistry`注册容器,如[程序化端点注册](#async-annotation-driven-registration)中所讨论的那样。
从版本 2.1 开始,它们可以用于创建任何侦听器容器——甚至是没有侦听器的容器(例如在 Spring 集成中使用的容器)。当然,必须在容器启动之前添加一个侦听器。
创建这样的容器有两种方法:
* 使用 simplerabbitlistenerendPoint
* 创建后添加监听器
下面的示例展示了如何使用`SimpleRabbitListenerEndpoint`创建侦听器容器:
```
@Bean
public SimpleMessageListenerContainer factoryCreatedContainerSimpleListener(
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory) {
SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setQueueNames("queue.1");
endpoint.setMessageListener(message -> {
...
});
return rabbitListenerContainerFactory.createListenerContainer(endpoint);
}
```
下面的示例展示了如何在创建后添加侦听器:
```
@Bean
public SimpleMessageListenerContainer factoryCreatedContainerNoListener(
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory) {
SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer();
container.setMessageListener(message -> {
...
});
container.setQueueNames("test.no.listener.yet");
return container;
}
```
在这两种情况下,侦听器也可以是`ChannelAwareMessageListener`,因为它现在是`MessageListener`的子接口。
如果你希望创建具有类似属性的多个容器,或者使用预先配置的容器工厂(例如 Spring Boot Auto Configuration 提供的容器工厂),那么这些技术是有用的。
| |以这种方式创建的容器是正常的`@Bean`实例,并且不在`RabbitListenerEndpointRegistry`中注册。|
|---|------------------------------------------------------------------------------------------------------------------------|
##### 异步`@RabbitListener`返回类型
从版本 2.1 开始,`@RabbitListener`(和`@RabbitHandler`)方法可以用异步返回类型`ListenableFuture>`和`Mono>`来指定,让回复被异步发送。
| |侦听器容器工厂必须配置`AcknowledgeMode.MANUAL`,这样使用者线程就不会对消息进行 ACK;相反,异步完成将在异步操作完成时对消息进行 ACK 或 NACK,
当异步结果以错误完成时,消息是否被重新请求取决于抛出的异常类型、容器配置和容器错误处理程序,
默认情况下,消息将被重新请求,除非容器的`defaultRequeueRejected`属性被设置为`false`(默认情况下是`true`)。
如果异步结果是用`AmqpRejectAndDontRequeueException`完成的,则消息将不会被重新请求。,
如果容器的`defaultRequeueRejected`属性是`false`,你可以通过将 Future 的异常设置为`ImmediateRequeueException`来重写它,并且消息将被重新请求,
如果侦听器方法中发生了某些异常,从而阻止了异步结果对象的创建,你必须捕获该异常并返回一个适当的返回对象,该对象将导致消息被确认或重新请求。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
从版本 2.2.21、2.3.13、2.4.1 开始,当检测到异步返回类型时,`AcknowledgeMode`将自动设置`MANUAL`。此外,带有致命异常的传入消息将被单独地负面确认,以前任何先前未确认的消息也被负面确认。
##### 线程和异步消费者
异步消费者涉及许多不同的线程。
在`SimpleMessageListenerContainer`中配置的`TaskExecutor`中的线程用于在`RabbitMQ Client`传递新消息时调用`MessageListener`。如果未配置,则使用`SimpleAsyncTaskExecutor`。如果使用池执行程序,则需要确保池大小足以处理配置的并发性。使用`DirectMessageListenerContainer`,在`RabbitMQ Client`线程上直接调用`MessageListener`。在这种情况下,`taskExecutor`用于监视消费者的任务。
| |当使用默认`SimpleAsyncTaskExecutor`时,对于侦听器所调用的线程,侦听器容器`beanName`在`threadNamePrefix`中使用。
这对于日志分析很有用。
我们通常建议始终在日志附录配置中包括线程名称。
当`TaskExecutor`通过容器上的`taskExecutor`属性特别提供时,它被原封不动地使用,
建议你使用类似的技术来命名由自定义`TaskExecutor` Bean 定义创建的线程,以帮助在日志消息中进行线程标识。|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
在创建连接时,在`CachingConnectionFactory`中配置的`Executor`被传递到`RabbitMQ Client`中,其线程用于将新消息传递到侦听器容器。如果未对此进行配置,则客户机将使用一个内部线程池执行器,每个连接的池大小(在编写时)为`Runtime.getRuntime().availableProcessors() * 2`。
如果你有大量的工厂或者正在使用`CacheMode.CONNECTION`,那么你可能希望考虑使用带有足够线程的共享`ThreadPoolTaskExecutor`来满足你的工作负载。
| |使用`DirectMessageListenerContainer`,你需要确保连接工厂配置了一个任务执行器,该执行器具有足够的线程来支持跨使用该工厂的所有侦听器容器的所需并发性。
缺省池大小(在编写本文时)为`Runtime.getRuntime().availableProcessors() * 2`。|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
`RabbitMQ client`使用`ThreadFactory`为低级 I/O 操作创建线程。要修改这个工厂,你需要配置底层的 RabbitMQ`ConnectionFactory`,如[配置底层客户机连接工厂](#connection-factory)中所讨论的那样。
##### 选择容器
2.0 版引入了`DirectMessageListenerContainer`。在此之前,只有`SimpleMessageListenerContainer`可用。SMLC 为每个使用者使用一个内部队列和一个专用线程。如果一个容器被配置为监听多个队列,那么将使用同一个使用者线程来处理所有队列。并发性由`concurrentConsumers`和其他属性控制。当消息从 RabbitMQ 客户机到达时,客户机线程通过队列将消息传递给消费者线程。之所以需要这种架构,是因为在 RabbitMQ 客户端的早期版本中,不可能实现多个并发交付。较新版本的客户机具有修订后的线程模型,现在可以支持并发。这允许引入 DMLC,现在在 RabbitMQ 客户端线程上直接调用侦听器。因此,它的架构实际上比 SMLC“更简单”。然而,这种方法有一些局限性,并且 SMLC 的某些功能在 DMLC 中不可用。此外,并发性由`consumersPerQueue`(以及客户库的线程池)控制。`concurrentConsumers`和相关属性在此容器中不可用。
以下特性可用于 SMLC,但不适用于 DMLC:
* `batchSize`:使用 SMLC,你可以将其设置为控制事务中传递的消息数量或减少 ACK 的数量,但它可能会导致失败后重复传递的数量增加。(DMLC 确实有`messagesPerAck`,你可以使用它来减少 ACK,与`batchSize`和 SMLC 相同,但它不能用于事务——每个消息都在单独的事务中传递和 ACK’d)。
* `consumerBatchEnabled`:在消费者中启用离散消息的批处理;有关更多信息,请参见[消息侦听器容器配置](#containerAttributes)。
* `maxConcurrentConsumers`和使用者缩放间隔或触发器——DMLC 中没有自动缩放。但是,它确实允许你以编程方式更改`consumersPerQueue`属性,并对消费者进行相应的调整。
然而,与 SMLC 相比,DMLC 有以下好处:
* 在运行时添加和删除队列更有效。使用 SMLC,整个使用者线程将被重新启动(所有使用者将被取消并重新创建)。使用 DMLC,不受影响的消费者不会被取消。
* 避免了 RabbitMQ 客户端线程和使用者线程之间的上下文切换。
* 线程在消费者之间共享,而不是在 SMLC 中为每个消费者拥有一个专用线程。但是,请参见[线程和异步消费者](#threading)中有关连接工厂配置的重要注释。
有关将哪些配置属性应用于每个容器的信息,请参见[消息侦听器容器配置](#containerAttributes)。
##### 检测空闲异步消费者
尽管效率很高,但异步用户的一个问题是检测它们何时空闲——如果一段时间内没有消息到达,用户可能希望采取一些措施。
从版本 1.6 开始,现在可以将侦听器容器配置为在一段时间之后没有消息传递的情况下发布`ListenerContainerIdleEvent`。当容器处于空闲状态时,每`idleEventInterval`毫秒就会发布一个事件。
要配置此功能,请在容器上设置`idleEventInterval`。下面的示例展示了如何在 XML 和 Java 中这样做(对于`SimpleMessageListenerContainer`和`SimpleRabbitListenerContainerFactory`):
```
```
```
@Bean
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
...
container.setIdleEventInterval(60000L);
...
return container;
}
```
```
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setIdleEventInterval(60000L);
...
return factory;
}
```
在每种情况下,当容器处于空闲状态时,每分钟都会发布一次事件。
###### 事件消耗
你可以通过实现`ApplicationListener`来捕获空闲事件——它可以是一个普通的侦听器,也可以是一个窄到只接收这个特定事件的侦听器。还可以使用 Spring Framework4.2 中介绍的`@EventListener`。
下面的示例将`@RabbitListener`和`@EventListener`合并为一个类。你需要理解,应用程序侦听器获取所有容器的事件,因此,如果你想根据哪个容器空闲来采取特定的操作,你可能需要检查侦听器 ID。你也可以为此目的使用`@EventListener``condition`。
这些事件有四个属性:
* `source`:侦听器容器实例
* `id`:侦听器 ID(或容器 Bean 名称)
* `idleTime`:事件发布时容器处于空闲状态的时间
* `queueNames`:容器监听的队列的名称
下面的示例展示了如何同时使用`@RabbitListener`和`@EventListener`注释来创建侦听器:
```
public class Listener {
@RabbitListener(id="someId", queues="#{queue.name}")
public String listen(String foo) {
return foo.toUpperCase();
}
@EventListener(condition = "event.listenerId == 'someId'")
public void onApplicationEvent(ListenerContainerIdleEvent event) {
...
}
}
```
| |事件侦听器看到所有容器的事件。
因此,在前面的示例中,我们根据侦听器 ID 缩小接收到的事件的范围。|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------|
| |如果希望使用 IDLE 事件停止 Lister 容器,则不应在调用侦听器的线程上调用`container.stop()`,
这样做总是会导致延迟和不必要的日志消息。,相反,
,你应该将事件传递给一个不同的线程,然后该线程可以停止容器。|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
##### 监视侦听器性能
从版本 2.2 开始,如果在类路径上检测到`Micrometer`,并且在应用程序上下文中存在`MeterRegistry`,则侦听器容器将自动为侦听器创建和更新微米计`Timer`s。可以通过将容器属性`micrometerEnabled`设置为`false`来禁用计时器。
两个计时器被维护-一个用于对听者的成功调用,另一个用于失败调用。对于一个简单的`MessageListener`,每个配置的队列都有一对计时器。
计时器名为`spring.rabbitmq.listener`,并具有以下标记:
* `listenerId`:(侦听器 ID 或容器 Bean 名称)
* `queue`:(当`consumerBatchEnabled`为`true`时,一个简单侦听器或配置的队列名称列表的队列名称是`true`-因为批处理可能包含来自多个队列的消息)
* `result`:`success`或`failure`
* `exception`:`none`或`ListenerExecutionFailedException`
可以使用`micrometerTags`容器属性添加其他标记。
#### 4.1.7.容器和以代理命名的队列
虽然使用`AnonymousQueue`实例作为自动删除队列是可取的,但从版本 2.1 开始,你可以使用带有侦听器容器的代理命名队列。下面的示例展示了如何做到这一点:
```
@Bean
public Queue queue() {
return new Queue("", false, true, true);
}
@Bean
public SimpleMessageListenerContainer container() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf());
container.setQueues(queue());
container.setMessageListener(m -> {
...
});
container.setMissingQueuesFatal(false);
return container;
}
```
请注意名称的空`String`。当`RabbitAdmin`声明队列时,它使用代理返回的名称更新`Queue.actualName`属性。在配置容器以使其工作时,必须使用`setQueues()`,以便容器可以在运行时访问声明的名称。仅仅设置名字是不够的。
| |在容器运行时,不能将以代理命名的队列添加到容器中。|
|---|----------------------------------------------------------------------------|
| |当一个连接被重置并建立了一个新的连接时,新的队列将获得一个新的名称,
由于在重新启动的容器和重新声明的队列之间存在竞争条件,因此将容器的`missingQueuesFatal`属性设置为`false`非常重要,因为容器最初可能会尝试重新连接到旧队列。|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
#### 4.1.8.消息转换器
`AmqpTemplate`还定义了用于发送和接收委托给`MessageConverter`的消息的几种方法。`MessageConverter`为每个方向提供了一个方法:一个用于转换**to**a`Message`,另一个用于转换**来自**a`Message`。请注意,当转换为`Message`时,除了对象之外,还可以提供属性。`object`参数通常对应于消息体。下面的清单显示了`MessageConverter`接口定义:
```
public interface MessageConverter {
Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException;
Object fromMessage(Message message) throws MessageConversionException;
}
```
在`AmqpTemplate`上的相关`Message`-发送方法比我们前面讨论的方法更简单,因为它们不需要`Message`实例。相反,`MessageConverter`负责“创建”每个`Message`,方法是将提供的对象转换为`Message`主体的字节数组,然后添加任何提供的`MessageProperties`。下面的清单显示了各种方法的定义:
```
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message)
throws AmqpException;
void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
throws AmqpException;
void convertAndSend(String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
```
在接收端,只有两种方法:一种接受队列名称,另一种依赖于模板的“queue”属性已设置。下面的清单显示了这两种方法的定义:
```
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
```
| |在[异步消费者](#async-consumer)中提到的`MessageListenerAdapter`也使用了`MessageConverter`。|
|---|------------------------------------------------------------------------------------------------------------------|
##### `SimpleMessageConverter`
`MessageConverter`策略的默认实现称为`SimpleMessageConverter`。这是`RabbitTemplate`的实例所使用的转换器,如果你没有显式地配置替代选项。它处理基于文本的内容、序列化的 Java 对象和字节数组。
###### 从`Message`转换
如果输入`Message`的内容类型以“text”开头(例如,“text/plain”),则它还会检查 Content-Encoding 属性,以确定将`Message`Body 字节数组转换为 Java`String`时要使用的字符集。如果输入`Message`上没有设置内容编码属性,则默认情况下它使用 UTF-8 字符集。如果需要覆盖该默认设置,可以配置`SimpleMessageConverter`的实例,设置其`defaultCharset`属性,并将其注入`RabbitTemplate`实例。
如果输入`Message`的 Content-Type 属性值设置为“application/x-java-serialized-object”,则`SimpleMessageConverter`将尝试将字节数组反序列化为 Java 对象。虽然这对于简单的原型设计可能很有用,但我们不建议依赖 Java 序列化,因为它会导致生产者和消费者之间的紧密耦合。当然,它也排除了任何一方使用非 Java 系统的可能性。由于 AMQP 是一种线级协议,因此很不幸的是,在这样的限制下,它将失去很多这种优势。在接下来的两节中,我们将探索在不依赖 Java 序列化的情况下传递富领域对象内容的一些替代方案。
对于所有其他内容类型,`SimpleMessageConverter`直接以字节数组的形式返回`Message`正文内容。
有关重要信息,请参见[Java 反序列化](#java-deserialization)。
###### 转换为`Message`
当从任意 Java 对象转换为`Message`时,`SimpleMessageConverter`同样处理字节数组、字符串和可序列化实例。它将这些转换为字节(在字节数组的情况下,没有任何可转换的内容),并且相应地 SESContent-Type 属性。如果要转换的`Object`与这些类型之一不匹配,则`Message`正文为空。
##### `SerializerMessageConverter`
这种转换器类似于`SimpleMessageConverter`,只是它可以配置与其它 Spring 框架`Serializer`和`Deserializer`实现的`application/x-java-serialized-object`转换。
有关重要信息,请参见[Java 反序列化](#java-deserialization)。
##### Jackson2jsonmessageconverter
本节介绍使用`Jackson2JsonMessageConverter`转换到`Message`和从`Message`转换的情况。它有以下几个部分:
* [转换为`Message`](#Jackson2jsonMessageConverter-to-Message)
* [转换自`Message`](#Jackson2jsonmessageconverter-from-message)
###### 转换为`Message`
正如上一节中提到的,通常不建议依赖 Java 序列化。JSON(JavaScript Object Notation,JavaScript Object Notation,JavaScript Object Notation)是一种比较常见的替代方法,它在不同的语言和平台上更灵活、更可移植。转换器可以在任何`RabbitTemplate`实例上配置,以覆盖其对`SimpleMessageConverter`默认值的使用。`Jackson2JsonMessageConverter`使用`com.fasterxml.jackson`2.x 库。下面的示例配置`Jackson2JsonMessageConverter`:
```
```
如上面所示,`Jackson2JsonMessageConverter`默认使用`DefaultClassMapper`。类型信息被添加到(并从)`MessageProperties`。如果入站消息在`MessageProperties`中不包含类型信息,但你知道期望的类型,则可以使用`defaultType`属性配置静态类型,如下例所示:
```
```
此外,你还可以从`*TypeId*`标头中的值提供自定义映射。下面的示例展示了如何做到这一点:
```
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
jsonConverter.setClassMapper(classMapper());
return jsonConverter;
}
@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Map> idClassMapping = new HashMap<>();
idClassMapping.put("thing1", Thing1.class);
idClassMapping.put("thing2", Thing2.class);
classMapper.setIdClassMapping(idClassMapping);
return classMapper;
}
```
现在,如果发送系统将头设置为`thing1`,转换器将创建一个`Thing1`对象,依此类推。有关从非 Spring 应用程序转换消息的完整讨论,请参见[Receiving JSON from Non-Spring Applications](#spring-rabbit-json)示例应用程序。
###### 从`Message`转换
根据发送系统添加到头部的类型信息,将入站消息转换为对象。
在 1.6 之前的版本中,如果不存在类型信息,则转换将失败。从版本 1.6 开始,如果缺少类型信息,转换器将使用 Jackson 默认值(通常是映射)来转换 JSON。
此外,从版本 1.6 开始,当使用`@RabbitListener`注释(在方法上)时,推断的类型信息将添加到`MessageProperties`中。这使得转换器可以转换为目标方法的参数类型。这仅在存在一个没有注释的参数或一个带有`@Payload`注释的参数时才适用。在分析过程中忽略类型`Message`的参数。
| |默认情况下,推断的类型信息将覆盖由发送系统创建的入站`*TypeId*`和相关标题
。
这将使接收系统自动转换为不同的域对象。
这仅适用于此。如果参数类型是具体的(不是抽象的或接口的),或者它来自`java.util`包。
在所有其他情况下,使用`*TypeId*`和相关的标题。
在某些情况下,你可能希望重写缺省行为,并始终使用`*TypeId*`信息。,例如,
,假设你有一个`@RabbitListener`,它接受一个`Thing1`参数,但是消息包含一个`Thing2`,该
是`Thing1`的子类(这是具体的)。
推断出的类型将是不正确的。
来处理这种情况,将`TYPE_ID`上的`TypePrecedence`属性设置为`TYPE_ID`,而不是默认`INFERRED`的
。
(该属性实际上位于转换器的`DefaultJackson2JavaTypeMapper`上,但转换器
上提供了一个设置器,以方便使用。)
如果你注入一个自定义的类型映射器,你应该在映射器上设置属性。|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| |当从`Message`转换时,传入的`MessageProperties.getContentType()`必须是符合 JSON 的(`contentType.contains("json")`用于检查)。
从版本 2.2 开始,如果没有`contentType`属性,则假定`application/json`,或者它具有默认值`application/octet-stream`。
以恢复到以前的行为(返回一个未转换的`byte[]`),将转换器的`assumeSupportedContentType`属性设置为`false`。
如果不支持内容类型,则生成`WARN`日志消息`Could not convert incoming message with content-type […]`,是发出的,并且`message.getBody()`按原样返回-作为`byte[]`。
因此,为了满足`Jackson2JsonMessageConverter`在消费者方面的要求,生产者必须添加`contentType`消息属性——例如,作为`application/json`或`text/x-json`,或者通过使用`Jackson2JsonMessageConverter`,它会自动设置标题。
下面的清单显示了许多转换器调用:|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
```
@RabbitListener
public void thing1(Thing1 thing1) {...}
@RabbitListener
public void thing1(@Payload Thing1 thing1, @Header("amqp_consumerQueue") String queue) {...}
@RabbitListener
public void thing1(Thing1 thing1, o.s.amqp.core.Message message) {...}
@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message message) {...}
@RabbitListener
public void thing1(Thing1 thing1, String bar) {...}
@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message> message) {...}
```
在前面清单中的前四种情况中,转换器尝试转换为`Thing1`类型。第五个示例是无效的,因为我们无法确定哪个参数应该接收消息负载。在第六个示例中,由于泛型类型是`WildcardType`,所以 Jackson 默认应用。
但是,你可以创建一个自定义转换器,并使用`targetMethod`消息属性来决定将 JSON 转换为哪种类型。
| |只有在方法级别声明`@RabbitListener`注释时,才能实现这种类型推断。
对于类级别`@RabbitListener`,转换后的类型用于选择调用哪个`@RabbitHandler`方法。
由于这个原因,基础设施提供了`targetObject`属性,你可以在自定义的
转换器中使用它来确定类型。|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| |从版本 1.6.11 开始,`Jackson2JsonMessageConverter`,因此,`DefaultJackson2JavaTypeMapper`(`DefaultClassMapper`)提供`trustedPackages`选项,以克服[序列化小工具](https://pivotal.io/security/cve-2017-4995)漏洞。
默认情况下,对于向后兼容,`Jackson2JsonMessageConverter`信任所有包——也就是说,它使用`*`作为选项。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
###### 反序列化抽象类
在版本 2.2.8 之前,如果`@RabbitListener`的推断类型是抽象类(包括接口),则转换器将返回到头部中查找类型信息,如果存在,则使用该信息;如果不存在,则尝试创建抽象类。当使用自定义反序列化器来处理抽象类的自定义`ObjectMapper`时,但传入的消息具有无效的类型标头,这会导致一个问题。
从版本 2.2.8 开始,默认情况下保留以前的行为。如果你有这样的自定义`ObjectMapper`,并且希望忽略类型头,并且总是使用推断类型进行转换,那么将`alwaysConvertToInferredType`设置为`true`。这是向后兼容性所必需的,并且可以避免尝试转换失败时的开销(使用标准`ObjectMapper`)。
###### 使用 Spring 数据投影接口
从版本 2.2 开始,你可以将 JSON 转换为 Spring 数据投影接口,而不是具体的类型。这允许对数据进行非常有选择性的、低耦合的绑定,包括从 JSON 文档中的多个位置查找值。例如,以下接口可以定义为消息有效负载类型:
```
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
```
```
@RabbitListener(queues = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
```
默认情况下,访问器方法将用于在接收的 JSON 文档中查找属性名称 AS 字段。`@JsonPath`表达式允许定制值查找,甚至可以定义多个 JSON 路径表达式,从多个位置查找值,直到表达式返回实际值。
要启用此功能,请将消息转换器上的`useProjectionForInterfaces`设置为`true`。你还必须将`spring-data:spring-data-commons`和`com.jayway.jsonpath:json-path`添加到类路径。
当用作`@RabbitListener`方法的参数时,接口类型将作为正常类型自动传递给转换器。
###### 从`Message`转换为`RabbitTemplate`
如前所述,类型信息在消息头中传递,以在从消息转换时协助转换器。这在大多数情况下都行得通。然而,当使用泛型类型时,它只能转换简单的对象和已知的“容器”对象(列表、数组和映射)。从版本 2.0 开始,`Jackson2JsonMessageConverter`实现了`SmartMessageConverter`,这使得它可以与新的`RabbitTemplate`方法一起使用,该方法接受`ParameterizedTypeReference`参数。这允许转换复杂的泛型类型,如下例所示:
```
Thing1> thing1 =
rabbitTemplate.receiveAndConvert(new ParameterizedTypeReference>>() { });
```
| |从版本 2.1 开始,`AbstractJsonMessageConverter`类已被删除。
它不再是`Jackson2JsonMessageConverter`的基类。
它已被`AbstractJackson2MessageConverter`取代。|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
##### `MarshallingMessageConverter`
还有一个选择是`MarshallingMessageConverter`。它委托给 Spring OXM 库的`Marshaller`和`Unmarshaller`策略接口的实现。你可以阅读有关该库的更多信息[here](https://docs.spring.io/spring/docs/current/spring-framework-reference/html/oxm.html)。在配置方面,最常见的是仅提供构造函数参数,因为`Marshaller`的大多数实现也实现`Unmarshaller`。下面的示例展示了如何配置`MarshallingMessageConverter`:
```
```
##### `Jackson2XmlMessageConverter`
这个类是在版本 2.1 中引入的,可以用于将消息从 XML 转换为 XML。
`Jackson2XmlMessageConverter`和`Jackson2JsonMessageConverter`都具有相同的基类:`AbstractJackson2MessageConverter`。
| |引入`AbstractJackson2MessageConverter`类来替换已删除的类:`AbstractJsonMessageConverter`。|
|---|----------------------------------------------------------------------------------------------------------------------|
`Jackson2XmlMessageConverter`使用`com.fasterxml.jackson`2.x 库。
你可以用与`Jackson2JsonMessageConverter`相同的方式使用它,只是它支持 XML 而不是 JSON。下面的示例配置`Jackson2JsonMessageConverter`:
```
```
有关更多信息,请参见[Jackson2JSONMessageConverter](#json-message-converter)。
| |从版本 2.2 开始,如果不存在`contentType`属性,或者它具有默认值`application/octet-stream`,则假定`application/xml`。
恢复到以前的行为(返回未转换的`byte[]`),将转换器的`assumeSupportedContentType`属性设置为`false`。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
##### `ContentTypeDelegatingMessageConverter`
这个类是在版本 1.4.2 中引入的,允许基于`MessageProperties`中的 Content type 属性将任务委托给特定的`MessageConverter`。默认情况下,如果没有`contentType`属性,或者有一个值与所有配置的转换器都不匹配,那么它将委托给`SimpleMessageConverter`。下面的示例配置`ContentTypeDelegatingMessageConverter`:
```
```
##### Java 反序列化
本节介绍如何反序列化 Java 对象。
| |当从不受信任的源反序列化 Java 对象时,可能存在一个漏洞,
如果你接受来自不受信任的源的消息,并且`content-type`的值为`application/x-java-serialized-object`,你应该
考虑配置哪些包和类被允许进行反序列化。
这适用于`SimpleMessageConverter`和`SerializerMessageConverter`,当它被配置为隐式或通过配置使用`DefaultDeserializer`时。
默认情况下,允许的列表为空,这意味着所有的类都是反序列化的。
你可以设置一个模式列表,例如`thing1.`**,`thing1.thing2.Cat`或`.MySafeClass`。
在找到匹配之前,将按顺序检查模式。
如果不匹配,则将不匹配,抛出一个`SecurityException`。
可以在这些转换器上使用`allowedListPatterns`属性设置模式。|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
##### 消息属性转换器
`MessagePropertiesConverter`策略接口用于在兔子客户端`BasicProperties`和 Spring AMQP`MessageProperties`之间进行转换。默认的实现(`DefaultMessagePropertiesConverter`)对于大多数目的来说通常是足够的,但是如果需要,你可以实现自己的实现。当大小不大于`1024`字节时,默认属性转换器将类型`BasicProperties`的元素转换为`String`实例。较大的`LongString`实例未被转换(请参见下一段)。可以使用构造函数参数重写此限制。
从版本 1.6 开始,长于长字符串限制(默认值:1024)的头现在由`DefaultMessagePropertiesConverter`默认设置为`LongString`实例。你可以通过`getBytes[]`、`toString()`或`getStream()`方法访问内容。
以前,`DefaultMessagePropertiesConverter`将这样的标题“转换”为`DataInputStream`(实际上,它只引用了`LongString`实例的`DataInputStream`)。在输出时,这个头不会被转换(除了转换为字符串——例如,通过在流上调用`toString()`,`[[email protected]](/cdn-cgi/l/email-protection)`)。
大型传入`LongString`头现在也可以在输出上正确地“转换”(默认情况下)。
提供了一个新的构造函数,使你可以将转换器配置为像以前一样工作。下面的清单显示了该方法的 Javadoc 注释和声明:
```
/**
* Construct an instance where LongStrings will be returned
* unconverted or as a java.io.DataInputStream when longer than this limit.
* Use this constructor with 'true' to restore pre-1.6 behavior.
* @param longStringLimit the limit.
* @param convertLongLongStrings LongString when false,
* DataInputStream when true.
* @since 1.6
*/
public DefaultMessagePropertiesConverter(int longStringLimit, boolean convertLongLongStrings) { ... }
```
同样从版本 1.6 开始,在`MessageProperties`中添加了一个名为`correlationIdString`的新属性。以前,当从 RabbitMQ 客户端使用的`BasicProperties`转换到和转换时,执行一个不必要的`byte[] <→ String`转换,因为`MessageProperties.correlationId`是一个`byte[]`,但是`BasicProperties`使用一个`String`。(最终,RabbitMQ 客户机使用 UTF-8 将`String`转换为字节以放入协议消息)。
为了提供最大的向后兼容性,在`DefaultMessagePropertiesConverter`中添加了一个名为`correlationIdPolicy`的新属性。这需要一个`DefaultMessagePropertiesConverter.CorrelationIdPolicy`枚举参数。默认情况下,它被设置为`BYTES`,它复制了之前的行为。
对于入站消息:
* `STRING`:仅映射`correlationIdString`属性
* `BYTES`:仅映射`correlationId`属性
* `BOTH`:两个属性都映射了
对于出站消息:
* `STRING`:仅映射`correlationIdString`属性
* `BYTES`:仅映射`correlationId`属性
* `BOTH`:这两个属性都被考虑,`String`属性优先。
同样从版本 1.6 开始,入站`deliveryMode`属性不再映射到`MessageProperties.deliveryMode`。它被映射到`MessageProperties.receivedDeliveryMode`。此外,入站`userId`属性不再映射到`MessageProperties.userId`。它被映射到`MessageProperties.receivedUserId`。如果出站消息使用相同的`MessageProperties`对象,则这些更改是为了避免这些属性的意外传播。
从版本 2.2 开始,`DefaultMessagePropertiesConverter`使用`getName()`而不是`toString()`转换类型为`Class>`的任何自定义标头;这避免了使用`toString()`表示来解析类名称的应用程序。对于滚动升级,你可能需要更改你的消费者来理解这两种格式,直到所有的生产者都升级了。
#### 4.1.9.修改消息-压缩和更多
存在一些扩展点。它们允许你对消息执行一些处理,可以在消息发送到 RabbitMQ 之前,也可以在收到消息之后立即进行处理。
正如可以在[消息转换器](#message-converters)中看到的那样,这样的一个扩展点是在`AmqpTemplate``convertAndReceive`操作中,其中可以提供一个`MessagePostProcessor`。例如,在你的 POJO 被转换之后,`MessagePostProcessor`允许你在`Message`上设置自定义标题或属性。
从版本 1.4.2 开始,在`RabbitTemplate`-`setBeforePublishPostProcessors()`和`setAfterReceivePostProcessors()`中添加了额外的扩展点。第一个使后处理器能够在发送到 RabbitMQ 之前立即运行。在使用批处理时(参见[Batching](#template-batching)),这是在组装批处理之后和发送批处理之前调用的。第二个是在接收到消息后立即调用的。
这些扩展点用于诸如压缩的特征,并且为此目的,提供了几个`MessagePostProcessor`实现。`GZipPostProcessor`、`ZipPostProcessor`和`DeflaterPostProcessor`在发送消息之前压缩消息,并且`GUnzipPostProcessor`、`UnzipPostProcessor`和`InflaterPostProcessor`解压接收到的消息。
| |从版本 2.1.5 开始,`GZipPostProcessor`可以配置`copyProperties = true`选项,以复制原始消息属性,默认情况下,由于性能原因,这些属性可以重用,并使用压缩内容编码和可选的`MessageProperties.SPRING_AUTO_DECOMPRESS`报头进行修改。
如果保留对原始出站消息的引用,其属性也会发生变化。
因此,如果你的应用程序使用这些消息后处理程序保留出站消息的副本,考虑打开`copyProperties`选项。|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| |从版本 2.2.12 开始,你可以配置压缩后置处理器在内容编码元素之间使用的分隔符。
在版本 2.2.11 和之前,这是硬编码为`:`,现在默认设置为`, `。
解压程序将同时使用这两个分隔符。
但是,如果你使用 2.3 或更高版本发布消息,而使用 2.2.11 或更高版本,则必须将压缩器上的`encodingDelimiter`属性设置为`:`。
当你的用户升级到 2.2.11 或更高版本时,你可以恢复到`, `的默认值。|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
类似地,`SimpleMessageListenerContainer`也有一个`setAfterReceivePostProcessors()`方法,允许在容器接收到消息后执行解压缩。
从版本 2.1.4 开始,`addBeforePublishPostProcessors()`和`addAfterReceivePostProcessors()`已添加到`RabbitTemplate`中,以允许将新的后处理程序分别附加到发布前和接收后处理程序的列表中。还提供了删除后置处理器的方法。类似地,`AbstractMessageListenerContainer`还添加了`addAfterReceivePostProcessors()`和`removeAfterReceivePostProcessor()`方法。有关更多详细信息,请参见`RabbitTemplate`和`AbstractMessageListenerContainer`的 javadoc。
#### 4.1.10.请求/回复消息
`AmqpTemplate`还提供了各种`sendAndReceive`方法,这些方法接受前面描述的用于单向发送操作的相同参数选项(`exchange`,`routingKey`和`Message`)。这些方法在请求-回复场景中非常有用,因为它们在发送之前处理必要的`reply-to`属性的配置,并且可以在内部为此目的创建的独占队列中侦听应答消息。
在将`MessageConverter`应用于请求和答复时,也可以使用类似的请求-答复方法。这些方法被命名为`convertSendAndReceive`。有关更多详细信息,请参见`AmqpTemplate`的[javadoc](https://DOCS. Spring.io/ Spring-amqp/DOCS/latest-ga/api/org/springframework/amqp/core/amqptemplate.html)。
从版本 1.5.0 开始,每个`sendAndReceive`方法变体都有一个重载版本,它接受`CorrelationData`。与正确配置的连接工厂一起,这将为发送端的操作启用发行者确认的接收。有关更多信息,请参见[相关发布者确认并返回](#template-confirms)和[javadoc for`RabbitOperations`](https://DOCS. Spring.io/ Spring-amqp/DOCS/latest-ga/api/org/springframework/amqp/rabbit/core/rabbitoperations.html)。
从版本 2.0 开始,这些方法有一些变体(`convertSendAndReceiveAsType`),它们接受一个额外的`ParameterizedTypeReference`参数来转换复杂的返回类型。模板必须配置为`SmartMessageConverter`。有关更多信息,请参见[从`Message`转换为`RabbitTemplate`]。
从版本 2.1 开始,你可以使用`noLocalReplyConsumer`选项配置`RabbitTemplate`,以控制用于回复消费者的`noLocal`标志。默认情况下,这是`false`。
##### 回复超时
默认情况下,发送和接收方法在 5 秒后超时并返回 null。你可以通过设置`replyTimeout`属性来修改此行为。从版本 1.5 开始,如果你将`mandatory`属性设置为`true`(或者对于特定的消息,`mandatory-expression`计算为`true`),如果无法将消息传递到队列,则将抛出`AmqpMessageReturnedException`。此异常具有`returnedMessage`、`replyCode`和`replyText`属性,以及用于发送的`exchange`和`routingKey`属性。
| |此功能使用 Publisher Returns。
你可以通过在`CachingConnectionFactory`上将`true`设置为`true`(参见[发布者确认并返回](#cf-pub-conf-ret))来启用它。
此外,你还必须没有用`ReturnCallback`注册你自己的`ReturnCallback`。|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
从版本 2.1.2 开始,添加了一个`replyTimedOut`方法,让子类被告知超时,以便它们可以清理任何保留的状态。
从版本 2.0.11 和 2.1.3 开始,当你使用默认的`DirectReplyToMessageListenerContainer`时,你可以通过设置模板的`replyErrorHandler`属性来添加错误处理程序。对于任何失败的交付,例如在没有相关标头的情况下收到的延迟回复和消息,都会调用此错误处理程序。传入的异常是`ListenerExecutionFailedException`,它具有`failedMessage`属性。
##### RabbitMQ 直接回复-回复
| |从 3.4.0 版本开始,RabbitMQ 服务器支持[直接回复](https://www.rabbitmq.com/direct-reply-to.html)。
这消除了固定回复队列的主要原因(以避免需要创建临时队列)对于每个请求)。
以 Spring AMQP 版本 1.4.1 开始的直接回复默认情况下使用(如果服务器支持的话),而不是创建临时回复队列。
当没有`replyQueue`被提供时(或者它的名称设置为`amq.rabbitmq.reply-to`),`RabbitTemplate`自动检测是否支持直接回复,并使用它或退回到使用临时回复队列。
当使用直接回复时,不需要`reply-listener`,也不应进行配置。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
响应侦听器仍然支持命名队列(`amq.rabbitmq.reply-to`除外),允许控制响应并发等。
从版本 1.6 开始,如果你希望对每个回复使用一个临时的、排他的、自动删除队列,请将`useTemporaryReplyQueues`属性设置为`true`。如果设置`replyAddress`,则忽略此属性。
你可以通过子类化`RabbitTemplate`并重写`useDirectReplyTo()`来检查不同的条件,从而更改决定是否使用直接回复的条件。该方法仅在发送第一个请求时调用一次。
在版本 2.0 之前,`RabbitTemplate`为每个请求创建一个新的使用者,并在收到答复(或超时)时取消该使用者。现在,模板使用`DirectReplyToMessageListenerContainer`代替,让消费者被重用。该模板仍然负责将回复进行关联,因此不存在将延迟的回复发送给其他发件人的风险。如果要恢复到以前的行为,请将`useDirectReplyToContainer`(使用 XML 配置时`direct-reply-to-container`)属性设置为 false。
`AsyncRabbitTemplate`没有这样的选项。当使用直接回复时,它总是使用`DirectReplyToContainer`作为回复。
从版本 2.3.7 开始,模板有一个新的属性`useChannelForCorrelation`。当这是`true`时,服务器不必将相关 ID 从请求消息头复制到回复消息。相反,用于发送请求的通道用于将答复与请求关联起来。
##### 与回复队列的消息相关性
当使用固定的应答队列(`amq.rabbitmq.reply-to`除外)时,必须提供相关数据,以便能够将应答与请求关联起来。见[RabbitMQ 远程过程调用](https://www.rabbitmq.com/tutorials/tutorial-six-java.html)。默认情况下,标准`correlationId`属性用于保存相关数据。但是,如果希望使用自定义属性来保存相关数据,则可以在 \上设置`correlation-key`属性。显式地将属性设置为`correlationId`与省略该属性相同。对于相关数据,客户机和服务器必须使用相同的报头。
| |Spring AMQP 版本 1.1 对此数据使用了一个名为`spring_reply_correlation`的自定义属性。
如果你希望用当前版本恢复到此行为(也许是为了与使用 1.1 的另一个应用程序保持兼容性),则必须将该属性设置为`spring_reply_correlation`。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
默认情况下,模板会生成自己的相关 ID(忽略用户提供的任何值)。如果你希望使用自己的相关 ID,请将`RabbitTemplate`实例的`userCorrelationId`属性设置为`true`。
| |相关 ID 必须是唯一的,以避免对请求返回错误答复的可能性。|
|---|---------------------------------------------------------------------------------------------------------|
##### 回复侦听器容器
当使用 3.4.0 之前的 RabbitMQ 版本时,将为每个回复使用一个新的临时队列。但是,可以在模板上配置单个回复队列,这样效率更高,还可以让你在该队列上设置参数。但是,在这种情况下,你还必须提供一个 \子元素。这个元素为应答队列提供了一个侦听器容器,模板就是侦听器。在 \上允许的所有[消息侦听器容器配置](#containerAttributes)属性在元素上都是允许的,但`connection-factory`和`message-converter`除外,它们是从模板的配置中继承而来的。
| |如果你运行应用程序的多个实例或使用多个`RabbitTemplate`实例,则**MUST**对每个实例使用唯一的回复队列,
RabbitMQ 无法从队列中选择消息,因此,如果它们都使用相同的队列,每个实例都会竞争回复,而不一定会收到自己的回复。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
下面的示例定义了一个带有连接工厂的 Rabbit 模板:
```
```
虽然容器和模板共享一个连接工厂,但它们不共享一个通道。因此,请求和响应不是在同一个事务(如果是事务的话)中执行的。
| |在 1.5.0 版本之前,`reply-address`属性是不可用的,
回复总是通过使用默认的 Exchange 和`reply-queue`名称作为路由密钥来路由的,
仍然是默认的,但是你现在可以指定新的`reply-address`属性。
`reply-address`中的`reply-address`可以包含一个具有`/`表单的地址,并且将回复路由到指定的交换并被路由到与路由密钥绑定的队列。
`reply-address`的优先权高于`reply-queue`。
当仅使用`reply-address`时,``必须配置为单独的``组件。
`reply-address`和`reply-queue`(或`queues`属性在``上)必须在逻辑上引用相同的队列。|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
在这种配置下,使用`SimpleListenerContainer`来接收回复,而`RabbitTemplate`是`MessageListener`。当使用``名称空间元素定义模板时,如前面的示例所示,解析器将模板中的容器和线定义为侦听器。
| |当模板不使用固定的`replyQueue`(或正在使用直接回复—参见[RabbitMQ 直接回复](#direct-reply-to))时,不需要侦听器容器。
直接`reply-to`是使用 RabbitMQ3.4.0 或更高版本时的首选机制。|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
如果你将`RabbitTemplate`定义为``,或者使用`@Configuration`类将其定义为`@Bean`,或者当你以编程方式创建模板时,你需要自己定义并连接应答侦听器容器。如果没有做到这一点,模板将永远不会收到回复,最终会超时并返回 null 作为对`sendAndReceive`方法的调用的回复。
从版本 1.5 开始,`RabbitTemplate`检测是否已将其配置为`MessageListener`以接收回复。如果不是,则尝试用`IllegalStateException`发送和接收带有回复地址的消息失败(因为这些回复永远不会收到)。
此外,如果使用了简单的`replyAddress`(队列名称),则应答侦听器容器将验证它正在侦听具有相同名称的队列。如果回复地址是 Exchange 和 Routing Key,并且写入了调试日志消息,则无法执行此检查。
| |在连接应答侦听器和模板时,重要的是要确保模板的`replyAddress`和容器的`queues`(或`queueNames`)属性指向相同的队列。
模板将应答地址插入到出站消息`replyTo`属性中。|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
下面的清单展示了如何手动连接 bean 的示例:
```
```
```
@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyAddress(replyQueue().getName());
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}
@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}
```
`RabbitTemplate`中显示了一个完整的[这个测试用例](https://github.com/spring-projects/spring-amqp/tree/main/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/JavaConfigFixedReplyQueueTests.java)示例,该示例连接了一个固定的应答队列,以及一个处理请求并返回应答的“远程”侦听器容器。
| |当回复超时(`replyTimeout`)时,`sendAndReceive()`方法返回 null。|
|---|--------------------------------------------------------------------------------------|
在版本 1.3.6 之前,对超时邮件的延迟回复只会被记录。现在,如果收到了一个延迟的回复,它将被拒绝(模板抛出一个`AmqpRejectAndDontRequeueException`)。如果回复队列被配置为将被拒绝的消息发送到死信交换,则可以检索该回复以供以后进行分析。要做到这一点,需要将一个队列与配置的死信交换绑定,其路由密钥与回复队列的名称相等。
有关配置死字的更多信息,请参见[RabbitMQ 死信文档](https://www.rabbitmq.com/dlx.html)。你还可以查看`FixedReplyQueueDeadLetterTests`测试用例。
##### 异步兔子模板
1.6 版引入了`AsyncRabbitTemplate`。它具有与[`AmqpTemplate`](#amqp-template)上的方法类似的`sendAndReceive`(和`convertSendAndReceive`)方法。然而,它们返回的不是阻塞,而是`ListenableFuture`。
`sendAndReceive`方法返回一个`RabbitMessageFuture`。`convertSendAndReceive`方法返回一个`RabbitConverterFuture`。
你可以稍后通过在 Future 上调用`get()`来同步检索结果,也可以注册一个与结果异步调用的回调。下面的清单显示了这两种方法:
```
@Autowired
private AsyncRabbitTemplate template;
...
public void doSomeWorkAndGetResultLater() {
...
ListenableFuture future = this.template.convertSendAndReceive("foo");
// do some more work
String reply = null;
try {
reply = future.get();
}
catch (ExecutionException e) {
...
}
...
}
public void doSomeWorkAndGetResultAsync() {
...
RabbitConverterFuture future = this.template.convertSendAndReceive("foo");
future.addCallback(new ListenableFutureCallback() {
@Override
public void onSuccess(String result) {
...
}
@Override
public void onFailure(Throwable ex) {
...
}
});
...
}
```
如果设置了`mandatory`并且消息无法传递,则 Future 抛出一个`ExecutionException`,原因为`AmqpMessageReturnedException`,该原因封装了返回的消息和有关返回的信息。
如果设置了`enableConfirms`,则 Future 有一个名为`confirm`的属性,它本身是一个`ListenableFuture`,带有`true`,表示成功发布。如果确认的 future 是`false`,则`RabbitFuture`还具有一个名为`nackCause`的属性,如果可用,则该属性包含失败的原因。
| |如果在回复之后收到了发布者确认,则该发布者确认将被丢弃,因为该回复意味着成功发布。|
|---|-------------------------------------------------------------------------------------------------------------------|
你可以将模板上的`receiveTimeout`属性设置为超时回复(默认设置为`30000`-30 秒)。如果发生超时,则使用`AmqpReplyTimeoutException`完成 future。
模板实现`SmartLifecycle`。在存在挂起的回复时停止模板,将导致挂起的`Future`实例被取消。
从版本 2.0 开始,异步模板现在支持[直接回复](https://www.rabbitmq.com/direct-reply-to.html),而不是配置的回复队列。要启用此功能,请使用以下构造函数之一:
```
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
public AsyncRabbitTemplate(RabbitTemplate template)
```
参见[RabbitMQ 直接回复](#direct-reply-to),以使用直接回复与同步`RabbitTemplate`。
版本 2.0 引入了这些方法的变体(`convertSendAndReceiveAsType`),这些方法接受一个额外的`ParameterizedTypeReference`参数来转换复杂的返回类型。你必须使用`SmartMessageConverter`配置底层`RabbitTemplate`。有关更多信息,请参见[从`Message`转换为`RabbitTemplate`]。
##### Spring 带有 AMQP 的远程控制
| |该功能已被弃用,将在 3.0 中删除。
它已被[处理异常](#annotation-error-handling)取代了很长一段时间,`returnExceptions`被设置为 true,并在发送端配置了`RemoteInvocationAwareMessageConverterAdapter`。
有关更多信息,请参见[处理异常](#annotation-error-handling)。|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
Spring 框架具有一般的远程处理能力,允许使用各种传输的[远程过程调用](https://docs.spring.io/spring/docs/current/spring-framework-reference/html/remoting.html)。 Spring-AMQP 支持类似的机制,在客户端上是`AmqpProxyFactoryBean`,在服务器上是`AmqpInvokerServiceExporter`。这为 AMQP 提供了 RPC。在客户端,使用`RabbitTemplate`作为对[earlier](#reply-listener)的描述。在服务器端,调用者(配置为`MessageListener`)接收消息,调用配置的服务,并使用入站消息的`replyTo`信息返回回复。
你可以将客户端工厂 Bean 注入任何 Bean(通过使用其`serviceInterface`)。然后,客户机可以调用代理上的方法,从而在 AMQP 上进行远程执行。
| |对于默认的`MessageConverter`实例,方法参数和返回的值必须是`Serializable`的实例。|
|---|----------------------------------------------------------------------------------------------------------------------------|
在服务器端,`AmqpInvokerServiceExporter`同时具有`AmqpTemplate`和`MessageConverter`属性。目前,没有使用模板的`MessageConverter`。如果需要提供自定义消息转换器,则应该通过设置`messageConverter`属性来提供它。在客户端,你可以向`AmqpTemplate`添加自定义消息转换器,该消息转换器通过使用其`amqpTemplate`属性提供给`AmqpProxyFactoryBean`。
下面的清单显示了示例客户机和服务器配置:
```
```
```
```
| |`AmqpInvokerServiceExporter`只能处理格式正确的消息,例如从`AmqpProxyFactoryBean`发送的消息。
如果收到无法解释的消息,将发送序列化的`RuntimeException`作为回复。
如果消息没有`replyToAddress`属性,如果没有配置死信交换,则消息将被拒绝并永久丢失。|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| |默认情况下,如果无法传递请求消息,则调用线程最终超时,并抛出一个`RemoteProxyFailureException`。
默认情况下,超时为 5 秒。
你可以通过在`RabbitTemplate`上设置`replyTimeout`属性来修改持续时间,
从 1.5 版本开始,通过将`mandatory`属性设置为`true`并在连接工厂上启用返回(参见[发布者确认并返回](#cf-pub-conf-ret)),调用线程抛出一个`AmqpMessageReturnedException`。
查看[回复超时](#reply-timeout)以获取更多信息。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
#### 4.1.11.配置代理
AMQP 规范描述了如何使用该协议在代理上配置队列、交换和绑定。在`org.springframework.amqp.core`包中的`AmqpAdmin`接口中存在这些操作(可从 0.8 规范和更高版本移植)。该类的 RabbitMQ 实现`RabbitAdmin`位于`org.springframework.amqp.rabbit.core`包中。
`AmqpAdmin`接口基于使用 Spring AMQP 域抽象,如以下清单所示:
```
public interface AmqpAdmin {
// Exchange Operations
void declareExchange(Exchange exchange);
void deleteExchange(String exchangeName);
// Queue Operations
Queue declareQueue();
String declareQueue(Queue queue);
void deleteQueue(String queueName);
void deleteQueue(String queueName, boolean unused, boolean empty);
void purgeQueue(String queueName, boolean noWait);
// Binding Operations
void declareBinding(Binding binding);
void removeBinding(Binding binding);
Properties getQueueProperties(String queueName);
}
```
另见[作用域操作](#scoped-operations)。
`getQueueProperties()`方法返回一些关于队列的有限信息(消息计数和消费者计数)。返回的属性的键在`RabbitTemplate`(`QUEUE_NAME`、`QUEUE_MESSAGE_COUNT`和`QUEUE_CONSUMER_COUNT`)中作为常量可用。[RabbitMQ REST API](#management-rest-api)在`QueueInfo`对象中提供了更多的信息。
no-arg`declareQueue()`方法在代理上定义了一个具有自动生成的名称的队列。这个自动生成队列的附加属性是`exclusive=true`、`autoDelete=true`和`durable=false`。
`declareQueue(Queue queue)`方法接受一个`Queue`对象,并返回声明的队列的名称。如果所提供的`name`的`Queue`属性是空的`String`,则代理将使用生成的名称声明队列。该名称将返回给调用者。该名称也被添加到`Queue`的`actualName`属性中。你只能通过直接调用`RabbitAdmin`以编程方式使用此功能。当在应用程序上下文中以声明方式定义队列时,管理员使用自动声明时,可以将 name 属性设置为`""`(空字符串)。然后,代理创建名称。从版本 2.1 开始,侦听器容器可以使用这种类型的队列。有关更多信息,请参见[容器和以代理命名的队列](#containers-and-broker-named-queues)。
这与`AnonymousQueue`相反,该框架生成唯一的(`UUID`)名称,并将`durable`设置为`false`和`exclusive`,`autoDelete`设置为`true`。带有空(或缺少)``属性的`name`总是创建`AnonymousQueue`。
参见[`AnonymousQueue`](#anonymous-queue)以了解为什么`AnonymousQueue`比代理生成的队列名称更受欢迎,以及如何控制名称的格式。从版本 2.1 开始,默认情况下,匿名队列的声明参数`Queue.X_QUEUE_LEADER_LOCATOR`设置为`client-local`。这确保了队列是在应用程序连接的节点上声明的。声明式队列必须具有固定的名称,因为它们可能在上下文的其他地方被引用——例如在以下示例中显示的侦听器中:
```
```
见[交换、队列和绑定的自动声明](#automatic-declaration)。
这个接口的 RabbitMQ 实现是`RabbitAdmin`,当使用 Spring XML 进行配置时,它类似于以下示例:
```
```
当`CachingConnectionFactory`缓存模式是`CHANNEL`(默认值)时,`RabbitAdmin`实现对在同一`ApplicationContext`中声明的队列、交换和绑定进行自动延迟声明。一旦向代理打开`Connection`,就会声明这些组件。有一些名称空间特性使其非常方便——例如,在 Stocks 示例应用程序中,我们有以下内容:
```
```
在前面的示例中,我们使用匿名队列(实际上,在内部,只使用由框架(而不是由代理)生成的名称的队列),并通过 ID 引用它们。我们还可以使用显式名称来声明队列,这些名称也可以作为上下文中其 Bean 定义的标识符。下面的示例使用显式名称配置队列:
```
```
| |你可以同时提供`id`和`name`属性。
这允许你引用队列(例如,
它还允许标准 Spring 特性(例如用于队列名称的属性占位符和 SPEL 表达式)。
当你使用名称作为 Bean 标识符时,这些特性是不可用的。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
队列可以配置额外的参数——例如,`x-message-ttl`。当你使用名称空间支持时,它们以参数-名称/参数-值对的`Map`形式提供,这是通过使用``元素定义的。下面的示例展示了如何做到这一点:
```
```
默认情况下,参数被假定为字符串。对于其他类型的参数,你必须提供该类型。下面的示例展示了如何指定类型:
```
```
当提供混合类型的参数时,你必须为每个条目元素提供类型。下面的示例展示了如何做到这一点:
```
100
```
在 Spring Framework3.2 及更高版本中,可以更简洁地声明这一点,如下所示:
```
```
在使用 Java 配置时,`Queue.X_QUEUE_LEADER_LOCATOR`类上的`setLeaderLocator()`方法支持`Queue`参数作为第一类属性。从版本 2.1 开始,匿名队列的声明默认设置为`client-local`。这确保了队列是在应用程序连接到的节点上声明的。
| |RabbitMQ 代理不允许声明具有不匹配参数的队列。
例如,如果`queue`已经存在一个不带`time to live`参数的`queue`队列,并且尝试使用(例如)`key="x-message-ttl" value="100"`声明它,则抛出一个异常。|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
默认情况下,当发生异常时,`RabbitAdmin`立即停止处理所有声明。这可能会导致下游问题,例如由于未声明另一个队列(在错误队列之后定义),侦听器容器无法初始化。
可以通过在`RabbitAdmin`实例上将`ignore-declaration-exceptions`属性设置为`true`来修改此行为。此选项指示`RabbitAdmin`记录异常并继续声明其他元素。当使用 Java 配置`RabbitAdmin`时,此属性称为`ignoreDeclarationExceptions`。这是一个适用于所有元素的全局设置。队列、交换和绑定具有类似的属性,仅适用于这些元素。
在版本 1.6 之前,此属性仅在通道上发生`IOException`时才生效,例如当前属性与期望属性之间存在不匹配时。现在,此属性对任何异常都生效,包括`TimeoutException`和其他异常。
此外,任何声明异常都会导致`DeclarationExceptionEvent`的发布,这是一个`ApplicationEvent`,可以由上下文中的任何`ApplicationListener`使用。该事件包含对管理、正在声明的元素和`Throwable`的引用。
##### headers exchange
从版本 1.3 开始,你可以将`HeadersExchange`配置为在多个头上匹配。你还可以指定是否必须匹配任何或所有标题。下面的示例展示了如何做到这一点:
```
```
从版本 1.6 开始,你可以使用`internal`标志(默认为`false`)配置`Exchanges`,并且通过`RabbitAdmin`在代理上正确配置这样的`Exchange`(如果在应用程序上下文中存在一个)。如果用于交换的`internal`标志是`true`,则 RabbitMQ 不允许客户机使用该交换。这对于死信交换或交换到交换绑定非常有用,在这种情况下,你不希望发布者直接使用该交换。
要查看如何使用 Java 来配置 AMQP 基础架构,请查看股票示例应用程序,其中有`@Configuration`类`AbstractStockRabbitConfiguration`,它依次具有`RabbitClientConfiguration`和`RabbitServerConfiguration`子类。下面的清单显示了`AbstractStockRabbitConfiguration`的代码:
```
@Configuration
public abstract class AbstractStockAppRabbitConfiguration {
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jsonMessageConverter());
configureRabbitTemplate(template);
return template;
}
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public TopicExchange marketDataExchange() {
return new TopicExchange("app.stock.marketdata");
}
// additional code omitted for brevity
}
```
在股票应用程序中,通过使用以下`@Configuration`类来配置服务器:
```
@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration {
@Bean
public Queue stockRequestQueue() {
return new Queue("app.stock.request");
}
}
```
这是`@Configuration`类的整个继承链的结束。最终结果是在应用程序启动时向代理声明`TopicExchange`和`Queue`。在服务器配置中,没有将`TopicExchange`绑定到队列,这是在客户机应用程序中完成的。但是,股票请求队列会自动绑定到 AMQP 默认交换。此行为由规范定义。
客户机`@Configuration`类更有趣一些。其声明如下:
```
@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
/**
* Binds to the market data exchange.
* Interested in any stock quotes
* that match its routing key.
*/
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
// additional code omitted for brevity
}
```
客户机通过`AmqpAdmin`上的`declareQueue()`方法声明另一个队列。它使用一种路由模式将队列绑定到市场数据交换,该路由模式在属性文件中具体化。
##### 用于队列和交换的 Builder API
版本 1.6 引入了一个方便的 Fluent API,用于在使用 Java 配置时配置`Queue`和`Exchange`对象。下面的示例展示了如何使用它:
```
@Bean
public Queue queue() {
return QueueBuilder.nonDurable("foo")
.autoDelete()
.exclusive()
.withArgument("foo", "bar")
.build();
}
@Bean
public Exchange exchange() {
return ExchangeBuilder.directExchange("foo")
.autoDelete()
.internal()
.withArgument("foo", "bar")
.build();
}
```
参见[`org.springframework.amqp.core.QueueBuilder`](https://DOCS. Spring.io/ Spring-amqp/DOCS/latest-ga/api/org/springframework/amqp/core/queuebuilder.html)和[`org.springframework.amqp.core.ExchangeBuilder`(https://DOCS. Spring.io/ Spring-amqp/DOCS/latest-ga/api/org/springframf/amqp/core/exchangebuilder.html)以获取更多信息。
从版本 2.0 开始,`ExchangeBuilder`现在默认情况下创建持久交换,以与单个`AbstractExchange`类上的简单构造函数保持一致。要与构建器进行非持久交换,在调用`.build()`之前使用`.durable(false)`。不再提供不带参数的`durable()`方法。
2.2 版引入了 Fluent API,以添加“众所周知的”交换和队列参数。
```
@Bean
public Queue allArgs1() {
return QueueBuilder.nonDurable("all.args.1")
.ttl(1000)
.expires(200_000)
.maxLength(42)
.maxLengthBytes(10_000)
.overflow(Overflow.rejectPublish)
.deadLetterExchange("dlx")
.deadLetterRoutingKey("dlrk")
.maxPriority(4)
.lazy()
.leaderLocator(LeaderLocator.minLeaders)
.singleActiveConsumer()
.build();
}
@Bean
public DirectExchange ex() {
return ExchangeBuilder.directExchange("ex.with.alternate")
.durable(true)
.alternate("alternate")
.build();
}
```
##### 声明交换、队列和绑定的集合
你可以在`Declarable`对象(`Queue`,`Exchange`,和`Binding`)的集合中包装`Declarables`对象。`RabbitAdmin`在应用程序上下文中检测此类 bean(以及离散`Declarable`bean),并在每次建立连接时(最初和连接失败后)在代理上声明所包含的对象。下面的示例展示了如何做到这一点:
```
@Configuration
public static class Config {
@Bean
public CachingConnectionFactory cf() {
return new CachingConnectionFactory("localhost");
}
@Bean
public RabbitAdmin admin(ConnectionFactory cf) {
return new RabbitAdmin(cf);
}
@Bean
public DirectExchange e1() {
return new DirectExchange("e1", false, true);
}
@Bean
public Queue q1() {
return new Queue("q1", false, false, true);
}
@Bean
public Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
}
@Bean
public Declarables es() {
return new Declarables(
new DirectExchange("e2", false, true),
new DirectExchange("e3", false, true));
}
@Bean
public Declarables qs() {
return new Declarables(
new Queue("q2", false, false, true),
new Queue("q3", false, false, true));
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Declarables prototypes() {
return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
}
@Bean
public Declarables bs() {
return new Declarables(
new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
}
@Bean
public Declarables ds() {
return new Declarables(
new DirectExchange("e4", false, true),
new Queue("q4", false, false, true),
new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
}
}
```
| |在 2.1 之前的版本中,你可以通过定义类型为`Collection`的 bean 来声明多个`Declarable`实例,
在某些情况下,这可能会导致不良的副作用,因为管理员必须迭代所有`Collection>`bean。,
现在禁用此功能,以支持`Declarables`,正如前面在这一节中所讨论的,
你可以通过将`RabbitAdmin`属性设置为`declareCollections`来恢复到以前的行为。|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
版本 2.2 将`getDeclarablesByType`方法添加到`Declarables`中;这可以作为一种方便,例如,在声明侦听器容器 Bean 时使用。
```
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
Declarables mixedDeclarables, MessageListener listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
container.setMessageListener(listener);
return container;
}
```
##### 条件声明
默认情况下,所有队列、交换和绑定都由应用程序上下文中的所有`RabbitAdmin`实例声明(假设它们具有`auto-startup="true"`)。
从版本 2.1.9 开始,`RabbitAdmin`有一个新的属性`explicitDeclarationsOnly`(默认情况下是`false`);当将其设置为`true`时,管理员将只声明显式配置为由该管理员声明的 bean。
| |从 1.2 版本开始,你可以有条件地声明这些元素。
当应用程序连接到多个代理并且需要指定应该用哪些代理声明特定元素时,这一点特别有用。|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
表示这些元素的类实现`Declarable`,它有两个方法:`shouldDeclare()`和`getDeclaringAdmins()`。`RabbitAdmin`使用这些方法来确定特定实例是否应该实际处理其`Connection`上的声明。
这些属性可以作为名称空间中的属性使用,如以下示例所示:
```
```
| |默认情况下,`auto-declare`属性是`true`,并且,如果`declared-by`没有提供(或者是空的),那么所有`RabbitAdmin`实例都会声明对象(只要管理员的`auto-startup`属性是`true`,默认的,并且管理员的`explicit-declarations-only`属性是假的)。|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
类似地,你可以使用基于 Java 的`@Configuration`来实现相同的效果。在下面的示例中,组件由`admin1`声明,但不是由`admin2`声明:
```
@Bean
public RabbitAdmin admin1() {
return new RabbitAdmin(cf1());
}
@Bean
public RabbitAdmin admin2() {
return new RabbitAdmin(cf2());
}
@Bean
public Queue queue() {
Queue queue = new Queue("foo");
queue.setAdminsThatShouldDeclare(admin1());
return queue;
}
@Bean
public Exchange exchange() {
DirectExchange exchange = new DirectExchange("bar");
exchange.setAdminsThatShouldDeclare(admin1());
return exchange;
}
@Bean
public Binding binding() {
Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
binding.setAdminsThatShouldDeclare(admin1());
return binding;
}
```
##### 关于`id`和`name`属性的注释
``和``元素上的`name`属性反映了代理中实体的名称。对于队列,如果省略`name`,则创建一个匿名队列(参见[`AnonymousQueue`](#anonymous-queue))。
在 2.0 之前的版本中,`name`也被注册为 Bean 名称别名(类似于`name`上的``元素)。
这造成了两个问题:
* 它阻止了使用相同名称的队列和交换的声明。
* 如果别名包含 SPEL 表达式(`#{…}`),则该别名将不会解析。
从版本 2.0 开始,如果你声明其中一个元素,同时带有`id`*和*一个`name`属性,则该名称将不再声明为 Bean name 别名。如果你希望声明一个队列并使用相同的`name`进行交换,则必须提供一个`id`。
如果元素仅具有`name`属性,则不会发生更改。 Bean 仍然可以被`name`引用——例如,在绑定声明中。但是,如果名称包含 SPEL,则仍然不能引用它——你必须提供`id`以供引用。
##### `AnonymousQueue`
通常,当你需要一个唯一命名的、排他的、自动删除的队列时,我们建议你使用`AnonymousQueue`而不是代理定义的队列名称(使用`""`作为`Queue`名称,使代理生成队列名称)。
这是因为:
1. 队列实际上是在建立到代理的连接时声明的。这是在豆子被创建并连接在一起之后很久的事情。使用队列的 bean 需要知道它的名称。实际上,在启动应用程序时,代理程序甚至可能不在运行。
2. 如果由于某种原因丢失了与代理的连接,则管理员将重新声明同名的`AnonymousQueue`。如果我们使用代理声明的队列,队列名称将会更改。
你可以控制`AnonymousQueue`实例使用的队列名称的格式。
默认情况下,队列名称的前缀是`spring.gen-`,后面跟着`UUID`的 base64 表示形式——例如:`spring.gen-MRBv9sqISkuCiPfOYfpo4g`。
可以在构造函数参数中提供`AnonymousQueue.NamingStrategy`实现。下面的示例展示了如何做到这一点:
```
@Bean
public Queue anon1() {
return new AnonymousQueue();
}
@Bean
public Queue anon2() {
return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}
@Bean
public Queue anon3() {
return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}
```
第一个 Bean 生成一个以`spring.gen-`为前缀的队列名称,后面跟着`UUID`的 base64 表示——例如:`spring.gen-MRBv9sqISkuCiPfOYfpo4g`。第二 Bean 生成以`something-`为前缀的队列名称,后跟`UUID`的 base64 表示。第三个 Bean 只使用 UUID(不使用 base64 转换)生成一个名称——例如,`f20c818a-006b-4416-bf91-643590fedb0e`。
Base64 编码使用了 RFC4648 中的“URL 和文件名安全字母表”。删除尾随填充字符(`=`)。
你可以提供自己的命名策略,从而可以在队列名称中包括其他信息(例如应用程序名称或客户端主机)。
在使用 XML 配置时,可以指定命名策略。对于实现`AnonymousQueue.NamingStrategy`的 Bean 引用,在``元素上存在`naming-strategy`属性。以下示例展示了如何以各种方式指定命名策略:
```
```
第一个示例创建了`spring.gen-MRBv9sqISkuCiPfOYfpo4g`之类的名称。第二个示例使用 UUID 的字符串表示形式创建名称。第三个示例创建了`custom.gen-MRBv9sqISkuCiPfOYfpo4g`之类的名称。
你还可以提供自己的命名策略 Bean。
从版本 2.1 开始,默认情况下,匿名队列的声明参数`Queue.X_QUEUE_LEADER_LOCATOR`设置为`client-local`。这确保了队列是在应用程序连接的节点上声明的。在构造实例之后,你可以通过调用`queue.setLeaderLocator(null)`来恢复到以前的行为。
##### 恢复自动删除声明
通常,`RabbitAdmin`(s)只恢复在应用程序上下文中声明为 bean 的队列/交换/绑定;如果任何此类声明是自动删除的,则如果连接丢失,代理将删除它们。当重新建立连接时,管理员将重新声明这些实体。通常,通过调用`admin.declareQueue(…)`、`admin.declareExchange(…)`和`admin.declareBinding(…)`创建的实体将不会被恢复。
从版本 2.4 开始,管理人员有一个新的属性`redeclareManualDeclarations`;如果为真,管理人员将恢复这些实体以及应用程序上下文中的 bean。
如果调用`deleteQueue(…)`、`deleteExchange(…)`或`removeBinding(…)`,则不会执行单个声明的恢复。删除队列和交换时,将从可恢复实体中删除相关的绑定。
最后,调用`resetAllManualDeclarations()`将阻止恢复任何先前声明的实体。
#### 4.1.12.代理事件监听器
当启用[事件交换插件 Name](https://www.rabbitmq.com/event-exchange.html)时,如果将类型`BrokerEventListener`的 Bean 添加到应用程序上下文中,则它将所选的代理事件发布为`BrokerEvent`实例,该实例可以通过正常的 Spring `ApplicationListener`或`@EventListener`方法来使用。事件由代理发布到主题交换`amq.rabbitmq.event`,每个事件类型都有不同的路由密钥。侦听器使用事件键,用于将`AnonymousQueue`绑定到交换,以便侦听器仅接收选定的事件。由于这是一个主题交换,所以可以使用通配符(以及显式地请求特定事件),如下例所示:
```
@Bean
public BrokerEventListener eventListener() {
return new BrokerEventListener(connectionFactory(), "user.deleted", "channel.#", "queue.#");
}
```
通过使用普通 Spring 技术,可以进一步缩小单个事件侦听器中接收到的事件的范围,如下例所示:
```
@EventListener(condition = "event.eventType == 'queue.created'")
public void listener(BrokerEvent event) {
...
}
```
#### 4.1.13.延迟的消息交换
版本 1.6 引入了对[延迟消息交换插件 Name](https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/)的支持
| |该插件目前被标记为试验性的,但已经有一年多的时间可以使用了(在撰写本文时),
如果需要对该插件进行更改,我们计划尽快添加对此类更改的支持,由于这个原因,此功能已在 RabbitMQ3.6.0 和版本 0.0.1 的插件中进行了测试。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
要使用`RabbitAdmin`来声明一个 Exchange 为延迟,可以将 Exchange Bean 上的`delayed`属性设置为`true`。`RabbitAdmin`使用交换类型(`Direct`,`Fanout`,以此类推)来设置`x-delayed-type`参数,并用类型`x-delayed-message`声明交换。
当使用 XML 配置 Exchange bean 时,`delayed`属性(默认:`false`)也可用。下面的示例展示了如何使用它:
```
```
要发送延迟消息,可以通过`MessageProperties`设置`x-delay`头,如下例所示:
```
MessageProperties properties = new MessageProperties();
properties.setDelay(15000);
template.send(exchange, routingKey,
MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build());
```
```
rabbitTemplate.convertAndSend(exchange, routingKey, "foo", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(15000);
return message;
}
});
```
要检查消息是否延迟,请在`MessageProperties`上使用`getReceivedDelay()`方法。它是一个单独的属性,以避免意外传播到由输入消息生成的输出消息。
#### 4.1.14.RabbitMQ REST API
启用管理插件后,RabbitMQ 服务器公开一个 REST API 来监视和配置代理。a[API 的 Java 绑定](https://github.com/rabbitmq/hop)现已提供。`com.rabbitmq.http.client.Client`是一个标准的、直接的 API,因此是阻塞的 API。它是基于[Spring Web](https://docs.spring.io/spring/docs/current/spring-framework-reference/web.html#spring-web)模块及其`RestTemplate`实现的。另一方面,`com.rabbitmq.http.client.ReactorNettyClient`是一个基于[反应堆网状结构](https://projectreactor.io/docs/netty/release/reference/docs/index.html)项目的反应式、非阻塞实现。
跳依赖项(`com.rabbitmq:http-client`)现在也是`optional`。
有关更多信息,请访问他们的 Javadoc。
#### 4.1.15.异常处理
使用 RabbitMQ Java 客户机的许多操作都可以抛出检查过的异常。例如,在很多情况下`IOException`实例可能会被抛出。`RabbitTemplate`、`SimpleMessageListenerContainer`和其他 Spring AMQP 组件捕获这些异常,并将它们转换为`AmqpException`层次结构中的一个异常。这些在’org.springframework.amqp’包中定义,而`AmqpException`是层次结构的基础。
当侦听器抛出异常时,它被包装在`ListenerExecutionFailedException`中。通常情况下,代理会拒绝并重新请求消息。将`defaultRequeueRejected`设置为`false`会导致消息被丢弃(或路由到死信交换)。如[消息侦听器和异步情况](#async-listeners)中所讨论的,侦听器可以抛出一个`AmqpRejectAndDontRequeueException`(或`ImmediateRequeueAmqpException`)来有条件地控制此行为。
但是,有一类错误是侦听器无法控制行为的。当遇到无法转换的消息(例如,无效的`content_encoding`报头)时,在消息到达用户代码之前会抛出一些异常。将`defaultRequeueRejected`设置为`true`(默认)(或抛出`ImmediateRequeueAmqpException`),这样的消息将被一遍又一遍地重新传递。在版本 1.3.2 之前,用户需要编写一个自定义`ErrorHandler`,如[异常处理](#exception-handling)中所讨论的,以避免这种情况。
从版本 1.3.2 开始,默认的`ErrorHandler`现在是一个`ConditionalRejectingErrorHandler`,它拒绝(并且不请求)带有不可恢复错误的失败消息。具体地说,它拒绝具有以下错误的失败消息:
* `o.s.amqp…MessageConversionException`:可以在使用`MessageConverter`转换传入消息有效负载时抛出。
* `o.s.messaging…MessageConversionException`:如果在映射到`@RabbitListener`方法时需要额外的转换,则可以由转换服务抛出。
* `o.s.messaging…MethodArgumentNotValidException`:如果在侦听器中使用验证(例如,`@Valid`)并且验证失败,则可以抛出。
* `o.s.messaging…MethodArgumentTypeMismatchException`:如果入站消息被转换为针对目标方法不正确的类型,则可以抛出该消息。例如,参数被声明为`Message`,但是`Message`被接收。
* `java.lang.NoSuchMethodException`:在版本 1.6.3 中添加。
* `java.lang.ClassCastException`:在版本 1.6.3 中添加。
你可以使用`FatalExceptionStrategy`配置此错误处理程序的实例,以便用户可以为条件消息拒绝提供自己的规则——例如,来自 Spring Retry([消息侦听器和异步情况](#async-listeners))的`BinaryExceptionClassifier`的委托实现。此外,`ListenerExecutionFailedException`现在有一个`failedMessage`属性,你可以在决策中使用它。如果`FatalExceptionStrategy.isFatal()`方法返回`true`,则错误处理程序抛出一个`AmqpRejectAndDontRequeueException`。当异常被确定为致命异常时,默认`FatalExceptionStrategy`会记录一条警告消息。
自版本 1.6.3 以来,将用户异常添加到致命异常列表的一种方便的方法是子类`ConditionalRejectingErrorHandler.DefaultExceptionStrategy`并覆盖`isUserCauseFatal(Throwable cause)`方法,以返回`true`的致命异常。
处理 DLQ 消息的一种常见模式是在这些消息上设置`time-to-live`以及附加的 DLQ 配置,以便这些消息过期并路由回主队列进行重试。这种技术的问题在于,导致致命异常的消息会永远循环。从版本 2.1 开始,`ConditionalRejectingErrorHandler`检测消息上的`x-death`头,该头将导致抛出一个致命的异常。该消息已被记录并丢弃。通过将`ConditionalRejectingErrorHandler`上的`discardFatalsWithXDeath`属性设置为`false`,可以恢复到以前的行为。
| |从版本 2.1.9 开始,具有这些致命异常的消息将被拒绝,并且默认情况下不会重新请求,即使容器确认模式是手动的。
这些异常通常发生在调用侦听器之前,因此侦听器没有机会对消息进行 ACK 或 NACK,因此消息仍处于未 ACKED 状态。
以恢复到先前的行为,将`ConditionalRejectingErrorHandler`上的`rejectManual`属性设置为`false`。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
#### 4.1.16.交易
Spring Rabbit Framework 具有对同步和异步用例中的自动事务管理的支持,其具有许多不同的语义,这些语义可以通过声明方式进行选择,这是 Spring 事务的现有用户所熟悉的。这使得许多即使不是最常见的消息传递模式也很容易实现。
有两种方法可以向框架发出所需的事务语义的信号。在`RabbitTemplate`和`SimpleMessageListenerContainer`中,都有一个标志`channelTransacted`,如果`true`,它告诉框架使用事务通道,并以提交或回滚(取决于结果)结束所有操作(发送或接收),并发出回滚的异常信号。另一个信号是提供具有 Spring 的`PlatformTransactionManager`实现之一的外部事务作为正在进行的操作的上下文。如果在框架发送或接收消息时已经有一个事务在进行中,并且`channelTransacted`标志是`true`,则消息事务的提交或回滚将推迟到当前事务结束时进行。如果`channelTransacted`标志是`false`,则消息传递操作不会应用事务语义(它是自动 ACKED 的)。
`channelTransacted`标志是一个配置时间设置。在创建 AMQP 组件时(通常是在应用程序启动时),对它进行一次声明和处理。外部事务在原则上是更动态的,因为系统在运行时响应当前线程状态。然而,在实践中,当事务以声明方式分层到应用程序上时,它通常也是一个配置设置。
对于使用`RabbitTemplate`的同步用例,外部事务由调用方提供,可以是声明式的,也可以是命令式的(通常的 Spring 事务模型)。下面的示例展示了一种声明式方法(通常更受欢迎,因为它是非侵入性的),其中模板已配置为`channelTransacted=true`:
```
@Transactional
public void doSomething() {
String incoming = rabbitTemplate.receiveAndConvert();
// do some more database processing...
String outgoing = processInDatabaseAndExtractReply(incoming);
rabbitTemplate.convertAndSend(outgoing);
}
```
在前面的示例中,在标记为`@Transactional`的方法中,作为消息体接收、转换和发送`String`有效负载。如果数据库处理出现异常而失败,则传入消息将返回给代理,而传出消息将不会发送。这适用于事务性方法链中带有`RabbitTemplate`的任何操作(例如,除非直接对`Channel`进行操作以尽早提交事务)。
对于带有`SimpleMessageListenerContainer`的异步用例,如果需要一个外部事务,则容器在设置侦听器时必须请求它。为了表示需要外部事务,用户在配置容器时向容器提供`PlatformTransactionManager`的实现。下面的示例展示了如何做到这一点:
```
@Configuration
public class ExampleExternalTransactionAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
}
```
在前面的示例中,事务管理器被添加为从另一个 Bean 定义注入的依赖项(未示出),并且`channelTransacted`标志也被设置为`true`。其结果是,如果侦听器发生异常而失败,事务将被回滚,消息也将返回给代理。值得注意的是,如果事务未能提交(例如,由于数据库约束错误或连接问题),AMQP 事务也将回滚,并将消息返回给代理。这有时被称为“尽最大努力 1 阶段提交”,是可靠消息传递的一个非常强大的模式。如果在前面的示例中将`channelTransacted`标志设置为`false`(默认值),则仍将为侦听器提供外部事务,但是所有消息传递操作都将被自动 ACK,因此其效果是即使在业务操作的回滚时也提交消息传递操作。
##### 条件回滚
在 1.6.6 版本之前,在使用外部事务管理器(例如 JDBC)时,向容器的`transactionAttribute`添加回滚规则不会产生任何效果。异常总是回滚事务。
此外,当在容器的建议链中使用[交易建议](https://docs.spring.io/spring-framework/docs/current/spring-framework-reference/html/transaction.html#transaction-declarative)时,条件回滚不是很有用,因为所有侦听器异常都包装在`ListenerExecutionFailedException`中。
第一个问题已经得到纠正,规则现在得到了适当的应用。而且,现在提供了`ListenerFailedRuleBasedTransactionAttribute`。它是`RuleBasedTransactionAttribute`的一个子类,唯一的区别是它知道`ListenerExecutionFailedException`,并将这种异常的原因用于规则。这个事务属性可以直接在容器中使用,也可以通过事务通知使用。
下面的示例使用了这个规则:
```
@Bean
public AbstractMessageListenerContainer container() {
...
container.setTransactionManager(transactionManager);
RuleBasedTransactionAttribute transactionAttribute =
new ListenerFailedRuleBasedTransactionAttribute();
transactionAttribute.setRollbackRules(Collections.singletonList(
new NoRollbackRuleAttribute(DontRollBackException.class)));
container.setTransactionAttribute(transactionAttribute);
...
}
```
##### 关于回滚接收消息的说明
AMQP 事务只适用于发送给代理的消息和 ACK。因此,当对 Spring 事务进行回滚并且已经接收到消息时, Spring AMQP 不仅必须回滚该事务,而且还必须手动拒绝该消息(有点像 nack,但这不是规范所称的那样)。对消息拒绝所采取的操作独立于事务,并且依赖于`defaultRequeueRejected`属性(默认值:`true`)。有关拒绝失败消息的更多信息,请参见[消息侦听器和异步情况](#async-listeners)。
有关 RabbitMQ 事务及其限制的更多信息,请参见[RabbitMQ 代理语义](https://www.rabbitmq.com/semantics.html)。
| |在 RabbitMQ2.7.0 之前,这样的消息(以及在通道关闭或中止时未加控制的消息)会被发送到 Rabbit Broker 上的队列的后面。
自 2.7.0 以来,被拒绝的消息会被发送到队列的前面,其方式与 JMS 回滚消息类似。|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| |以前,在本地事务回滚和提供`TransactionManager`时,对事务回滚的消息请求是不一致的。
在前一种情况下,应用正常的请求逻辑(`AmqpRejectAndDontRequeueException`或`defaultRequeueRejected=false`)(参见[消息侦听器和异步情况](#async-listeners))。
与事务管理器,从版本 2.0 开始,该行为是一致的,并且在这两种情况下都应用了正常的请求逻辑。
要恢复到以前的行为,你可以将容器的`alwaysRequeueWithTxManagerRollback`属性设置为`true`。
参见[消息侦听器容器配置](#containerAttributes)。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
##### 使用`RabbitTransactionManager`
[RabbitTransactionManager](https://docs.spring.io/spring-amqp/docs/latest_ga/api/org/springframework/amqp/rabbit/transaction/RabbitTransactionManager.html)是在外部事务中执行 Rabbit 操作并与之同步的一种替代方法。此事务管理器是[`PlatformTransactionManager`](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/org/springframework/transactionmanager.html)接口的实现,应该与单个 Rabbit`ConnectionFactory`一起使用。
| |这种策略不能提供 XA 事务——例如,为了在消息传递和数据库访问之间共享事务。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------|
需要应用程序代码来通过`ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean)`检索事务性 Rabbit 资源,而不是随后创建通道的标准`Connection.createChannel()`调用。当使用 Spring AMQP 的[RabbitTemplate](https://docs.spring.io/spring-amqp/docs/latest_ga/api/org/springframework/amqp/rabbit/core/RabbitTemplate.html)时,它将自动检测线程绑定通道并自动参与其事务。
使用 Java 配置,你可以通过使用以下 Bean 设置一个新的 RabbitTransActionManager:
```
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(connectionFactory);
}
```
如果你更喜欢 XML 配置,那么可以在 XML 应用程序上下文文件中声明以下内容 Bean:
```
```
##### 事务同步
将 RabbitMQ 事务与其他事务(例如 DBMS)同步提供了“Best Effort One Phase Commit”语义。在事务同步的完成后阶段,RabbitMQ 事务可能无法提交。这是由`spring-tx`基础架构作为错误记录的,但不会向调用代码抛出异常。从版本 2.3.10 开始,你可以在事务在处理该事务的同一线程上提交后调用`ConnectionUtils.checkAfterCompletion()`。如果没有发生异常,它将简单地返回;否则它将抛出一个`AfterCompletionFailedException`,该属性将具有表示完成的同步状态的属性。
通过调用`ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true)`来启用此功能;这是一个全局标志,适用于所有线程。
#### 4.1.17.消息侦听器容器配置
对于配置与事务和服务质量相关的`SimpleMessageListenerContainer`和`DirectMessageListenerContainer`有相当多的选项,其中一些选项相互交互。适用于 SMLC、DMLC 或`StreamListenerContainer`(见[使用 RabbitMQ 流插件](#stream-support))的属性由相应列中的复选标记指示。有关帮助你决定哪个容器适合你的应用程序的信息,请参见[选择容器](#choose-container)。
下表显示了使用名称空间配置``时的容器属性名称及其等效属性名称(在括号中)。该元素上的`type`属性可以是`simple`(默认)或`direct`,以分别指定`SMLC`或`DMLC`。名称空间不公开某些属性。这些由属性的`N/A`表示。
| Property
(Attribute) |说明| SMLC | DMLC | StLC |
|-----------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------|--------------------------------|--------------------------------|
| | |
| | |
| | |
| | |
| | |
| | |
| |在版本 1.6 之前,如果上下文中有一个以上的管理员,容器将随机选择一个。
如果没有管理员,它将在内部创建一个。
在这两种情况下,这都可能导致意外的结果。
从版本 1.6 开始,对于`autoDeclare`来说,上下文中必须正好有一个`RabbitAdmin`,或者必须使用`rabbitAdmin`属性在容器上配置对特定实例的引用。| | | |
| |
| | | |
| | |
| | |
| | | |
| | | |
| | |
| | | |
| | | |
| | | |
| |
| | | |
| | |
| | |
| | |
| | |
| | | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| (group) |这仅在使用名称空间时可用。
当指定时,类型`Collection`的 Bean 被注册为该名称,并且每个
元素的
容器被添加到集合中。
例如,这允许,通过迭代集合来启动和停止容器组。
如果多个``元素具有相同的组值,集合形式中的容器
是如此指定的所有容器的集合。|![tickmark](https://docs.spring.io/spring-amqp/docs/current/reference/html/images/tickmark.png)|![tickmark](https://docs.spring.io/spring-amqp/docs/current/reference/html/images/tickmark.png)| |
| | |
| | |
| | | |
| | |
| | |
| |如果在初始启动期间代理不可用,则容器将启动,并在建立连接时检查条件。| | | |
| |检查是针对上下文中的所有队列进行的,而不仅仅是特定侦听器被配置使用的队列,
如果你希望将检查仅限于容器使用的那些队列,那么你应该为容器配置一个单独的`RabbitAdmin`,并使用`rabbitAdmin`属性提供对它的引用。
有关更多信息,请参见[有条件声明](#conditional-declaration)。| | | |
| |在 Bean 中为`@RabbitListener`启动容器时,禁用不匹配的队列参数检测这被标记为`@Lazy`。
这是为了避免潜在的死锁,这可能会将此类容器的启动延迟长达 60 秒。
使用 lazy Listener bean 的应用程序应该在获得对 lazy 的引用之前检查队列参数 Bean。| | | |
| | |
| |在 Bean 中为`@RabbitListener`启动容器时,将禁用丢失的队列检测这被标记为`@Lazy`。
这是为了避免潜在的死锁,这可能会将此类容器的启动延迟长达 60 秒。
使用 Lazy Listener Bean 的应用程序应该在获得对 Lazy 的引用之前检查队列 Bean。| | | |
| | |
| | |
| | |
|| |
| | |
| |在某些情况下,预取取值应该
较低,例如,对于较大的消息,尤其是在处理速度较慢的情况下(消息可能会将
累加到客户端进程中的大量内存中),如果需要严格的消息排序
(在这种情况下,预取值应该设置为 1),
还可以使用低容量消息传递和多个消费者(包括单个侦听器容器实例中的并发性),你可能希望减少预取,以使消息在消费者之间的分布更加均匀。| | | |
| | |
| | | |
| | |
| | |
| | | |
| | |
| | | |
| | |
| | | |
| |
| | |
| | |
| | |
#### 4.1.18.监听器并发
##### SimpleMessageListenerContainer
默认情况下,侦听器容器启动一个从队列接收消息的使用者。
在检查上一节中的表时,你可以看到许多控制并发性的属性和属性。最简单的是`concurrentConsumers`,它创建了并发处理消息的(固定的)消费者数量。
在 1.3.0 版本之前,这是唯一可用的设置,容器必须停止并重新启动才能更改设置。
自版本 1.3.0 以来,你现在可以动态调整`concurrentConsumers`属性。如果在容器运行时对其进行了更改,则会根据需要添加或删除消费者,以适应新的设置。
此外,还添加了一个名为`maxConcurrentConsumers`的新属性,并且容器根据工作负载动态地调整并发。这与四个附加属性一起工作:`consecutiveActiveTrigger`、`startConsumerMinInterval`、`consecutiveIdleTrigger`和`stopConsumerMinInterval`。在默认设置下,增加消费者的算法工作如下:
如果`maxConcurrentConsumers`尚未到达,并且一个现有的使用者连续十个周期处于活动状态,并且自上一个使用者启动以来至少已经过了 10 秒,则启动一个新的使用者。如果使用者在`batchSize`\*`receiveTimeout`毫秒内至少接收到一条消息,则被认为是活动的。
在默认设置下,减少消费者的算法工作如下:
如果有超过`concurrentConsumers`的运行并且一个消费者检测到连续十个超时(空闲)并且最后一个消费者在至少 60 秒前被停止,则消费者被停止。超时取决于`receiveTimeout`和`batchSize`属性。如果使用者在`batchSize`\*`receiveTimeout`毫秒内没有收到消息,则被认为是空闲的。因此,使用默认的超时(一秒)和`batchSize`的四个超时,在 40 秒的空闲时间(四个超时对应一个空闲检测)后考虑停止消费者。
| |实际上,只有当整个容器闲置一段时间时,才可以停止使用消费者。
这是因为代理在所有活动消费者之间共享其工作。|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
无论配置队列的数量如何,每个使用者都使用一个通道。
从版本 2.0 开始,`concurrentConsumers`和`maxConcurrentConsumers`属性可以设置为`concurrency`属性——例如,`2-4`。
##### 使用`DirectMessageListenerContainer`
在这个容器中,并发是基于配置的队列和`consumersPerQueue`。每个队列的每个使用者都使用一个单独的通道,并且并发性由 Rabbit 客户端库控制。默认情况下,在编写时,它使用`DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2`线程池。
你可以配置`taskExecutor`以提供所需的最大并发性。
#### 4.1.19.独家消费者
从版本 1.3 开始,你可以使用一个独占使用者来配置侦听器容器。这可以防止其他容器从队列中消费,直到当前使用者被取消为止。这样的容器的并发性必须是`1`。
当使用独占消费者时,其他容器尝试根据`recoveryInterval`属性从队列中消费,如果尝试失败,则记录`WARN`消息。
#### 4.1.20.监听器容器队列
版本 1.3 为处理侦听器容器中的多个队列引入了许多改进。
容器必须被配置为至少监听一个队列。以前也是这样,但现在可以在运行时添加和删除队列。当处理了任何预取的消息时,容器回收(取消和重新创建)消费者。对于`addQueues`、`addQueueNames`、`removeQueues`和`removeQueueNames`方法,请参见[Javadoc](https://docs.spring.io/spring-amqp/docs/latest-ga/api/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.html)。在删除队列时,必须保留至少一个队列。
现在,如果消费者的队列中有任何一个可用,他就会启动。以前,如果有任何队列不可用,容器就会停止。现在,只有在没有队列可用的情况下才会出现这种情况。如果不是所有的队列都是可用的,那么容器将尝试每隔 60 秒被动地声明(并消耗)丢失的队列。
此外,如果使用者从代理接收到取消(例如,如果队列被删除),则使用者将尝试恢复,并且恢复的使用者将继续处理来自任何其他配置队列的消息。以前,一个队列上的取消会取消整个消费者,最终,由于缺少队列,容器将停止。
如果希望永久删除队列,则应在删除到队列之前或之后更新容器,以避免将来尝试使用它。
#### 4.1.21.弹性:从错误和代理失败中恢复
Spring AMQP 提供的一些关键(也是最流行的)高级特性与在协议错误或代理失败的情况下的恢复和自动重新连接有关。我们已经在本指南中看到了所有相关的组件,但是在这里将它们集合在一起并单独列出特性和恢复场景应该会有所帮助。
主要的重新连接功能由`CachingConnectionFactory`本身启用。使用`RabbitAdmin`自动声明功能通常也是有益的。此外,如果你关心保证的交付,你可能还需要在`RabbitTemplate`和`SimpleMessageListenerContainer`中使用`channelTransacted`标志,在`AcknowledgeMode.AUTO`中使用`AcknowledgeMode.AUTO`(如果你自己进行 ACK,则使用手动)标志。
##### 交换、队列和绑定的自动声明
`RabbitAdmin`组件可以在启动时声明交换、队列和绑定。它通过`ConnectionListener`懒洋洋地做到了这一点。因此,如果代理在启动时不存在,这并不重要。第一次使用`Connection`(例如,通过发送消息)时,侦听器将触发并应用管理功能。在侦听器中执行自动声明的另一个好处是,如果由于任何原因(例如,代理死亡,网络故障和其他原因)而丢失连接,则在重新建立连接时再次应用这些声明。
| |以这种方式声明的队列必须具有固定的名称——要么是显式声明的,要么是由`AnonymousQueue`实例的框架生成的。
匿名队列是不可持久的、排他的和自动删除的。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| |只有当`CachingConnectionFactory`缓存模式为`CHANNEL`(默认)时,才会执行自动声明。
存在此限制,因为独占和自动删除队列绑定到连接。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
从版本 2.2.2 开始,`RabbitAdmin`将检测类型为`DeclarableCustomizer`的 bean,并在实际处理声明之前应用该函数。这是有用的,例如,设置一个新的参数(属性)之前,它在框架内有第一类支持。
```
@Bean
public DeclarableCustomizer customizer() {
return dec -> {
if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
dec.addArgument("some.new.queue.argument", true);
}
return dec;
};
}
```
对于不提供直接访问`Declarable` Bean 定义的项目,它也很有用。
另见[RabbitMQ 自动连接/拓扑恢复](#auto-recovery)。
##### 同步操作失败和重试选项
如果在使用`RabbitTemplate`(例如)时,在同步序列中丢失了与代理的连接, Spring AMQP 将抛出一个`AmqpException`(通常但并非总是`AmqpIOException`)。我们不会试图掩盖存在问题的事实,因此你必须能够捕捉并响应异常。如果你怀疑连接丢失(而且这不是你的错误),最简单的方法是再次尝试该操作。你可以手动完成此操作,也可以使用 Spring Retry 来处理重试(强制地或声明地)。
Spring 重试提供了两个 AOP 拦截器和很大的灵活性来指定重试的参数(尝试次数、异常类型、退避算法和其他)。 Spring AMQP 还提供了一些方便的工厂 bean,用于为 AMQP 用例以方便的形式创建 Spring 重试拦截器,其具有可用于实现自定义恢复逻辑的强类型回调接口。有关更多详细信息,请参见`StatefulRetryOperationsInterceptor`和`StatelessRetryOperationsInterceptor`的 Javadoc 和属性。如果没有事务或者在重试回调中启动了事务,则无状态重试是合适的。请注意,无状态重试比有状态重试更容易配置和分析,但是如果有一个正在进行的事务必须回滚或者肯定要回滚,那么它通常是不合适的。事务中间的断开连接应该具有与回滚相同的效果。因此,对于在堆栈更高的位置启动事务的重新连接,有状态重试通常是最佳选择。有状态重试需要一种机制来唯一地标识消息。最简单的方法是让发送方在`MessageId`消息属性中放置一个唯一的值。所提供的消息转换器提供了这样做的选项:你可以将`createMessageIds`设置为`true`。否则,可以将`MessageKeyGenerator`实现注入拦截器。密钥生成器必须为每条消息返回唯一的密钥。在版本 2.0 之前的版本中,提供了`MissingMessageIdAdvice`。它允许不带`messageId`属性的消息只重试一次(忽略重试设置)。不再提供此建议,因为与`spring-retry`版本 1.2 一起,其功能内置在拦截器和消息侦听器容器中。
| |对于向后兼容性,缺省情况下(在一次重试之后),带有空消息 ID 的消息被认为对使用者是致命的(使用者被停止),
以复制`MissingMessageIdAdvice`提供的功能,你可以在侦听器容器上将`statefulRetryFatalWithNullMessageId`属性设置为`false`。
通过该设置,使用者将继续运行并拒绝消息(在一次重试之后)。
它将被丢弃或路由到死信队列(如果配置了死信队列)。|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
从版本 1.3 开始,提供了一个 Builder API,以通过使用 Java(在`@Configuration`类中)来帮助组装这些拦截器。下面的示例展示了如何做到这一点:
```
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}
```
只能以这种方式配置重试功能的一个子集。更高级的功能将需要将`RetryTemplate`配置为 Spring Bean。有关可用策略及其配置的完整信息,请参见[Spring Retry Javadoc](https://docs.spring.io/spring-retry/docs/api/current/)。
##### 使用批处理侦听器重试
不建议使用批处理侦听器配置重试,除非批处理是由生成器在单个记录中创建的。有关消费者和生产者创建的批的信息,请参见[批处理消息](#de-batching)。对于消费者创建的批处理,框架不知道批处理中的哪条消息导致了故障,因此不可能在重试结束后进行恢复。使用生产者创建的批处理,由于只有一条消息实际失败,因此可以恢复整个消息。应用程序可能想要通知自定义恢复程序在批处理中发生故障的位置,可能是通过设置抛出异常的索引属性。
批处理侦听器的重试恢复程序必须实现`MessageBatchRecoverer`。
##### 消息侦听器和异步情况
如果`MessageListener`由于业务异常而失败,则异常由消息侦听器容器处理,然后返回到侦听另一条消息。如果故障是由已删除的连接(而不是业务异常)引起的,则必须取消并重新启动为侦听器收集消息的使用者。`SimpleMessageListenerContainer`无缝地处理此问题,并留下一个日志来表示侦听器正在重新启动。事实上,它无休止地循环,试图重新启动消费者。只有当消费者表现得非常糟糕时,它才会放弃。一个副作用是,如果代理在容器启动时关闭,它会一直尝试,直到可以建立连接为止。
与协议错误和断开的连接相反,业务异常处理可能需要更多的考虑和一些自定义配置,尤其是在使用事务或容器 ACK 的情况下。在 2.8.x 之前,RabbitMQ 没有对死信行为的定义。因此,在默认情况下,由于业务异常而被拒绝或回滚的消息可以无休止地重新交付。要限制客户机的再交付次数,一个选择是侦听器的建议链中的`StatefulRetryOperationsInterceptor`。拦截器可以有一个实现自定义死信操作的恢复回调——任何适合你的特定环境的操作。
另一种选择是将容器的`defaultRequeueRejected`属性设置为`false`。这将导致丢弃所有失败的消息。当使用 RabbitMQ2.8.x 或更高版本时,这也有利于将消息传递给死信交换。
或者,你可以抛出`AmqpRejectAndDontRequeueException`。无论`defaultRequeueRejected`属性的设置如何,这样做都可以防止消息请求。
从版本 2.1 开始,引入了一个`ImmediateRequeueAmqpException`来执行完全相反的逻辑:无论`defaultRequeueRejected`属性的设置如何,消息都将被重新请求。
通常,这两种技术的组合被使用。你可以在建议链中使用`StatefulRetryOperationsInterceptor`,并使用`MessageRecoverer`抛出`AmqpRejectAndDontRequeueException`。当所有重试都已用尽时,将调用`MessageRecover`。`RejectAndDontRequeueRecoverer`就是这么做的。默认的`MessageRecoverer`消耗错误消息并发出`WARN`消息。
从版本 1.3 开始,提供了一个新的`RepublishMessageRecoverer`,允许在重试结束后发布失败的消息。
当回收者使用最后一个异常时,该消息将被 ACK’d,并且不会被发送到死信交换(如果有的话)。
| |当`RepublishMessageRecoverer`在消费者侧使用时,接收到的消息在`receivedDeliveryMode`消息属性中具有`deliveryMode`。
在这种情况下,`deliveryMode`是`null`。
这意味着在代理上具有`NON_PERSISTENT`交付模式,
从版本 2.0 开始,你可以将`RepublishMessageRecoverer`的`deliveryMode`配置为设置到消息中以重新发布,如果它是`null`。
默认情况下,它使用`MessageProperties`默认值-`MessageDeliveryMode.PERSISTENT`。|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
下面的示例展示了如何将`RepublishMessageRecoverer`设置为回收器:
```
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
.build();
}
```
`RepublishMessageRecoverer`在消息头中发布带有附加信息的消息,例如异常消息、堆栈跟踪、原始交换和路由密钥。可以通过创建一个子类并覆盖`additionalHeaders()`来添加额外的标题。`deliveryMode`(或任何其他属性)也可以在`additionalHeaders()`中进行更改,如下例所示:
```
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {
protected Map extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
message.getMessageProperties()
.setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
return null;
}
};
```
从版本 2.0.5 开始,如果堆栈跟踪太大,它可能会被截断;这是因为所有的标头都必须适合于单个框架。默认情况下,如果堆栈跟踪将导致其他头的可用字节数少于 20,000(“headroom”),那么它将被截断。如果你需要更多或更少的空间来放置其他头文件,可以通过设置 recoverer 的`frameMaxHeadroom`属性来对此进行调整。从版本 2.1.13、2.2.3 开始,异常消息将包含在此计算中,并且使用以下算法将堆栈跟踪的数量最大化:
* 如果单独的堆栈跟踪将超过限制,则异常消息头将被截断为 97 字节加上`…`,并且堆栈跟踪也将被截断。
* 如果堆栈跟踪很小,消息将被截断(加上`…`)以适应可用字节(但堆栈跟踪本身中的消息被截断为 97 字节加上`…`)。
每当发生任何类型的截断时,将记录原始异常以保留完整的信息。
从版本 2.3.3 开始,提供了一个新的子类`RepublishMessageRecovererWithConfirms`;这支持两种类型的 Publisher 确认,并将在返回之前等待确认(或者如果未确认或消息返回,则抛出异常)。
如果确认类型是`CORRELATED`,则子类还将检测是否返回了消息并抛出`AmqpMessageReturnedException`;如果发布是否定的,则将抛出`AmqpNackReceivedException`。
如果确认类型是`SIMPLE`,则子类将调用通道上的`waitForConfirmsOrDie`方法。
有关确认和返回的更多信息,请参见[发布者确认并返回](#cf-pub-conf-ret)。
从版本 2.1 开始,将添加`ImmediateRequeueMessageRecoverer`以抛出`ImmediateRequeueAmqpException`,该命令通知侦听器容器重新请求当前失败的消息。
##### Spring 重试的异常分类
Spring 重试在确定哪些异常可以调用重试方面具有很大的灵活性。对于所有异常,默认配置都会重试。考虑到用户异常包装在`ListenerExecutionFailedException`中,我们需要确保分类检查异常原因。默认分类器只查看顶层异常。
由于 Spring 重试 1.0.3,`BinaryExceptionClassifier`具有一个名为`traverseCauses`的属性(默认:`false`)。当`true`时,它遍历异常原因,直到找到匹配的原因或没有原因为止。
要使用此分类器进行重试,你可以使用一个`SimpleRetryPolicy`,该构造函数创建了最大尝试次数,`Exception`实例的`Map`和布尔(`traverseCauses`),并将此策略注入`RetryTemplate`。
#### 4.1.22.多个代理(或集群)支持
在单个应用程序与多个代理或代理集群之间进行通信时,版本 2.3 增加了更多的便利。在消费者方面,主要的好处是基础设施可以自动将自动声明的队列与适当的代理关联起来。
用一个例子最好地说明了这一点:
```
@SpringBootApplication(exclude = RabbitAutoConfiguration.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
CachingConnectionFactory cf1() {
return new CachingConnectionFactory("localhost");
}
@Bean
CachingConnectionFactory cf2() {
return new CachingConnectionFactory("otherHost");
}
@Bean
CachingConnectionFactory cf3() {
return new CachingConnectionFactory("thirdHost");
}
@Bean
SimpleRoutingConnectionFactory rcf(CachingConnectionFactory cf1,
CachingConnectionFactory cf2, CachingConnectionFactory cf3) {
SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
rcf.setDefaultTargetConnectionFactory(cf1);
rcf.setTargetConnectionFactories(Map.of("one", cf1, "two", cf2, "three", cf3));
return rcf;
}
@Bean("factory1-admin")
RabbitAdmin admin1(CachingConnectionFactory cf1) {
return new RabbitAdmin(cf1);
}
@Bean("factory2-admin")
RabbitAdmin admin2(CachingConnectionFactory cf2) {
return new RabbitAdmin(cf2);
}
@Bean("factory3-admin")
RabbitAdmin admin3(CachingConnectionFactory cf3) {
return new RabbitAdmin(cf3);
}
@Bean
public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
@Bean
public RabbitListenerAnnotationBeanPostProcessor postProcessor(RabbitListenerEndpointRegistry registry) {
MultiRabbitListenerAnnotationBeanPostProcessor postProcessor
= new MultiRabbitListenerAnnotationBeanPostProcessor();
postProcessor.setEndpointRegistry(registry);
postProcessor.setContainerFactoryBeanName("defaultContainerFactory");
return postProcessor;
}
@Bean
public SimpleRabbitListenerContainerFactory factory1(CachingConnectionFactory cf1) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf1);
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory factory2(CachingConnectionFactory cf2) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf2);
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory factory3(CachingConnectionFactory cf3) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf3);
return factory;
}
@Bean
RabbitTemplate template(RoutingConnectionFactory rcf) {
return new RabbitTemplate(rcf);
}
@Bean
ConnectionFactoryContextWrapper wrapper(SimpleRoutingConnectionFactory rcf) {
return new ConnectionFactoryContextWrapper(rcf);
}
}
@Component
class Listeners {
@RabbitListener(queuesToDeclare = @Queue("q1"), containerFactory = "factory1")
public void listen1(String in) {
}
@RabbitListener(queuesToDeclare = @Queue("q2"), containerFactory = "factory2")
public void listen2(String in) {
}
@RabbitListener(queuesToDeclare = @Queue("q3"), containerFactory = "factory3")
public void listen3(String in) {
}
}
```
正如你所看到的,我们已经声明了 3 组基础设施(连接工厂、管理员、容器工厂)。如前所述,`@RabbitListener`可以定义使用哪个容器工厂;在这种情况下,它们还使用`queuesToDeclare`,如果队列不存在,则该队列将在代理上声明。通过使用约定`-admin`命名`RabbitAdmin`bean,基础结构能够确定哪个管理员应该声明队列。这也将与`bindings = @QueueBinding(…)`一起工作,其中交换和绑定也将被声明。它将不能与`queues`一起工作,因为这期望队列已经存在。
在生产者方面,提供了一个方便的`ConnectionFactoryContextWrapper`类,以使使用`RoutingConnectionFactory`(参见[路由连接工厂](#routing-connection-factory))变得更简单。
正如你在上面看到的,一个`SimpleRoutingConnectionFactory` Bean 已经添加了路由密钥`one`,`two`和`three`。还有一个`RabbitTemplate`使用了那个工厂。下面是一个使用该模板和包装器路由到代理集群之一的示例。
```
@Bean
public ApplicationRunner runner(RabbitTemplate template, ConnectionFactoryContextWrapper wrapper) {
return args -> {
wrapper.run("one", () -> template.convertAndSend("q1", "toCluster1"));
wrapper.run("two", () -> template.convertAndSend("q2", "toCluster2"));
wrapper.run("three", () -> template.convertAndSend("q3", "toCluster3"));
};
}
```
#### 4.1.23.调试
Spring AMQP 提供了广泛的日志记录,特别是在`DEBUG`级别。
如果希望监视应用程序和代理之间的 AMQP 协议,则可以使用 Wireshark 之类的工具,该工具具有一个插件来解码该协议。或者,RabbitMQ Java 客户机提供了一个非常有用的类`Tracer`。当作为`main`运行时,默认情况下,它会监听端口 5673 并连接到 LocalHost 上的端口 5672.你可以运行它并更改连接工厂配置以连接到 LocalHost 上的端口 5673.它在控制台上显示已解码的协议。有关更多信息,请参见`Tracer`Javadoc。
### 4.2.使用 RabbitMQ 流插件
版本 2.4 为[RabbitMQ 流插件](https://rabbitmq.com/stream.html)引入了对[RabbitMQ 流插件 Java 客户端](https://github.com/rabbitmq/rabbitmq-stream-java-client)的初始支持。
* `RabbitStreamTemplate`
* `StreamListenerContainer`
#### 4.2.1.发送消息
`RabbitStreamTemplate`提供了`RabbitTemplate`功能的一个子集。
例 1.RabbitStreamOperations
```
public interface RabbitStreamOperations extends AutoCloseable {
ListenableFuture send(Message message);
ListenableFuture convertAndSend(Object message);
ListenableFuture convertAndSend(Object message, @Nullable MessagePostProcessor mpp);
ListenableFuture send(com.rabbitmq.stream.Message message);
MessageBuilder messageBuilder();
MessageConverter messageConverter();
StreamMessageConverter streamMessageConverter();
@Override
void close() throws AmqpException;
}
```
`RabbitStreamTemplate`实现具有以下构造函数和属性:
例 2.RabbitStreamTemplate
```
public RabbitStreamTemplate(Environment environment, String streamName) {
}
public void setMessageConverter(MessageConverter messageConverter) {
}
public void setStreamConverter(StreamMessageConverter streamConverter) {
}
public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}
```
`MessageConverter`在`convertAndSend`方法中用于将对象转换为 Spring amqp`Message`。
`StreamMessageConverter`用于将 Spring AMQP`Message`转换为本机流`Message`。
你还可以直接发送本机流`Message`s;使用`messageBuilder()`方法证明对`Producer`的消息生成器的访问。
`ProducerCustomizer`提供了一种机制,可以在生成生产者之前对其进行定制。
请参阅关于自定义[Java 客户端文档](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/)和`Producer`的[Java 客户端文档](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/)。
#### 4.2.2.接收消息
异步消息接收由`StreamListenerContainer`提供(当使用`@RabbitListener`时,`StreamRabbitListenerContainerFactory`)。
侦听器容器需要`Environment`以及一个流名。
你可以使用经典的`MessageListener`接收 Spring AMQP`Message`s,也可以使用新的接口接收本机流`Message`s:
```
public interface StreamMessageListener extends MessageListener {
void onStreamMessage(Message message, Context context);
}
```
有关受支持的属性的信息,请参见[消息侦听器容器配置](#containerAttributes)。
与模板类似,容器具有`ConsumerCustomizer`属性。
请参阅关于自定义[Java 客户端文档](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/)和`Consumer`的[Java 客户端文档](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/)。
当使用`@RabbitListener`时,配置一个`StreamRabbitListenerContainerFactory`;此时,大多数`@RabbitListener`属性(`concurrency`等)被忽略。只支持`id`、`queues`、`autoStartup`和`containerFactory`。此外,`queues`只能包含一个流名。
#### 4.2.3.例子
```
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
template.setProducerCustomizer((name, builder) -> builder.name("test"));
return template;
}
@Bean
RabbitListenerContainerFactory rabbitListenerContainerFactory(Environment env) {
return new StreamRabbitListenerContainerFactory(env);
}
@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
...
}
@Bean
RabbitListenerContainerFactory nativeFactory(Environment env) {
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
factory.setNativeListener(true);
factory.setConsumerCustomizer((id, builder) -> {
builder.name("myConsumer")
.offset(OffsetSpecification.first())
.manualTrackingStrategy();
});
return factory;
}
@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
...
context.storeOffset();
}
```
### 4.3.日志记录子系统 AMQP 附录
该框架为一些流行的日志记录子系统提供了日志附录:
* 注销(自 Spring AMQP 版本 1.4 起)
* log4j2(自 Spring AMQP 版本 1.6 起)
通过使用日志记录子系统的常规机制来配置附录,可用的属性在下面的部分中指定。
#### 4.3.1.共同属性
以下属性可与所有附录一起使用:
| Property | Default |说明|
|-------------------------------------------|-------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| ```
exchangeName
``` | ```
logs
``` |要向其发布日志事件的交易所的名称。|
| ```
exchangeType
``` | ```
topic
``` |将日志事件发布到其中的交换类型—仅当 Appender 声明交换时才需要。
请参见`declareExchange`。|
| ```
routingKeyPattern
``` | ```
%c.%p
``` |用于生成路由密钥的记录子系统模式格式.|
| ```
applicationId
``` | ```
``` |应用程序 ID——如果模式包含`%X{applicationId}`,则将其添加到路由键中。|
| ```
senderPoolSize
``` | ```
2
``` |用于发布日志事件的线程数。|
| ```
maxSenderRetries
``` | ```
30
``` |如果代理不可用或存在其他错误,则重试发送消息的次数。
重试延迟如下:`N ^ log(N)`,其中`N`是重试编号。|
| ```
addresses
``` | ```
``` |以下列形式的以逗号分隔的代理地址列表:`host:port[,host:port]*`-覆盖`host`和`port`。|
| ```
host
``` | ```
localhost
``` |连接到哪个主机的 RabbitMQ。|
| ```
port
``` | ```
5672
``` |连接到的 RabbitMQ 端口。|
| ```
virtualHost
``` | ```
/
``` |连接到的 RabbitMQ 虚拟主机。|
| ```
username
``` | ```
guest
``` |RabbitMQ 用户连接时使用.|
| ```
password
``` | ```
guest
``` |此用户的 RabbitMQ 密码。|
| ```
useSsl
``` | ```
false
``` |是否将 SSL 用于 RabbitMQ 连接。
参见[`RabbitConnectionFactoryBean`并配置 SSL]|
| ```
verifyHostname
``` | ```
true
``` |启用 TLS 连接的服务器主机名验证。
参见[`RabbitConnectionFactoryBean`并配置 SSL]|
| ```
sslAlgorithm
``` | ```
null
``` |使用的 SSL 算法。|
| ```
sslPropertiesLocation
``` | ```
null
``` |SSL 属性文件的位置。|
| ```
keyStore
``` | ```
null
``` |密钥存储库的位置。|
| ```
keyStorePassphrase
``` | ```
null
``` |密钥库的密码。|
| ```
keyStoreType
``` | ```
JKS
``` |keystore 类型。|
| ```
trustStore
``` | ```
null
``` |信任库的位置。|
| ```
trustStorePassphrase
``` | ```
null
``` |信任存储库的密码。|
| ```
trustStoreType
``` | ```
JKS
``` |信任库类型。|
| ```
saslConfig
``` |```
null (RabbitMQ client default applies)
```|`saslConfig`-关于有效值,请参见`RabbitUtils.stringToSaslConfig`的 Javadoc。|
| ```
ContentType
``` | ```
text/plain
``` |日志消息的`content-type`属性。|
| ```
contentEncoding
``` | ```
``` |`content-encoding`日志消息的属性。|
| ```
declareExchange
``` | ```
false
``` |是否在此附录启动时声明已配置的交换。
另请参见`durable`和`autoDelete`。|
| ```
durable
``` | ```
true
``` |当`declareExchange`为`true`时,持久标志被设置为该值。|
| ```
autoDelete
``` | ```
false
``` |当`declareExchange`为`true`时,自动删除标志被设置为该值。|
| ```
charset
``` | ```
null
``` |将`String`转换为`byte[]`时使用的字符集。
默认:null(使用的是系统默认字符集)。
如果当前平台不支持字符集,我们将退回使用系统字符集。|
| ```
deliveryMode
``` | ```
PERSISTENT
``` |`PERSISTENT`或`NON_PERSISTENT`,以确定 RabbitMQ 是否应该持久化消息。|
| ```
generateId
``` | ```
false
``` |用于确定`messageId`属性是否设置为唯一值。|
|```
clientConnectionProperties
```| ```
null
``` |用于 RabbitMQ 连接的自定义客户端属性的`key:value`对的逗号分隔列表。|
| ```
addMdcAsHeaders
``` | ```
true
``` |在引入此属性之前,MDC 属性总是被添加到 RabbitMQ 消息头中。
它可能会导致大 MDC 的问题,因为 RabbitMQ 对所有头都有有限的缓冲区大小而且这个缓冲区很小。
引入这个属性是为了避免在大 MDC 的情况下出现问题。
默认情况下,这个值设置为`true`,用于向后兼容。
`false`将序列化 MDC 关闭到头中,
请注意,默认情况下,`JsonLayout`将 MDC 添加到消息中。|
#### 4.3.2.log4j2 附录
下面的示例展示了如何配置 log4j2Appender:
```
...
```
| |从 1.6.10 和 1.7.3 版本开始,默认情况下,log4j2Appender 将消息发布到调用线程上的 RabbitMQ。
这是因为 log4j2 默认情况下不会创建线程安全事件。
如果代理关闭,则使用`maxSenderRetries`进行重试,重试之间没有延迟。
如果你希望恢复以前在单独的线程上发布消息的行为(`senderPoolSize`),则可以将`async`属性设置为`true`,但是,
,你还需要配置 log4j2 来使用`DefaultLogEventFactory`而不是`ReusableLogEventFactory`。
这样做的一种方法是设置系统属性`-Dlog4j2.enable.threadlocals=false`。
如果你使用异步发布与`ReusableLogEventFactory`,由于相声,事件很有可能被破坏。|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
#### 4.3.3.回录附录
下面的示例展示了如何配置一个 Logback Appender:
```
%n ]]>
foo:5672,bar:5672
36
false
myApplication
%property{applicationId}.%c.%p
true
UTF-8
false
NON_PERSISTENT
true
false
```
从版本 1.7.1 开始,logback`AmqpAppender`提供了一个`includeCallerData`选项,默认情况下是`false`。提取调用者数据可能会非常昂贵,因为日志事件必须创建一个可丢弃的数据,并对其进行检查以确定调用位置。因此,默认情况下,当事件被添加到事件队列时,不会提取与事件相关的调用方数据。通过将`includeCallerData`属性设置为`true`,可以将 Appender 配置为包括调用方数据。
从版本 2.0.0 开始,Logback`AmqpAppender`使用`encoder`选项支持[翻录编码器](https://logback.qos.ch/manual/encoders.html)。`encoder`和`layout`选项是互斥的。
#### 4.3.4.自定义消息
默认情况下,AMQP Appenders 填充以下消息属性:
* `deliveryMode`
* contentType
* `contentEncoding`,如果已配置
* `messageId`,如果`generateId`已配置
* 日志事件的`timestamp`
* `appId`,如果配置了 ApplicationID
此外,它们还用以下值填充标题:
* 日志事件的`categoryName`
* 日志事件的级别
* `thread`:发生日志事件的线程的名称
* 日志事件调用的堆栈跟踪的位置
* 所有 MDC 属性的副本(除非`addMdcAsHeaders`被设置为`false`)
每个附录都可以进行子类,这样你就可以在发布之前修改消息了。下面的示例展示了如何自定义日志消息:
```
public class MyEnhancedAppender extends AmqpAppender {
@Override
public Message postProcessMessageBeforeSend(Message message, Event event) {
message.getMessageProperties().setHeader("foo", "bar");
return message;
}
}
```
从 2.2.4 开始,log4j2`AmqpAppender`可以使用`@PluginBuilderFactory`进行扩展,也可以使用`AmqpAppender.Builder`进行扩展。
```
@Plugin(name = "MyEnhancedAppender", category = "Core", elementType = "appender", printObject = true)
public class MyEnhancedAppender extends AmqpAppender {
public MyEnhancedAppender(String name, Filter filter, Layout extends Serializable> layout,
boolean ignoreExceptions, AmqpManager manager, BlockingQueue eventQueue, String foo, String bar) {
super(name, filter, layout, ignoreExceptions, manager, eventQueue);
@Override
public Message postProcessMessageBeforeSend(Message message, Event event) {
message.getMessageProperties().setHeader("foo", "bar");
return message;
}
@PluginBuilderFactory
public static Builder newBuilder() {
return new Builder();
}
protected static class Builder extends AmqpAppender.Builder {
@Override
protected AmqpAppender buildInstance(String name, Filter filter, Layout extends Serializable> layout,
boolean ignoreExceptions, AmqpManager manager, BlockingQueue eventQueue) {
return new MyEnhancedAppender(name, filter, layout, ignoreExceptions, manager, eventQueue);
}
}
}
```
#### 4.3.5.自定义客户端属性
你可以通过添加字符串属性或更复杂的属性来添加自定义客户机属性。
##### 简单字符串属性
每个 Appender 都支持向 RabbitMQ 连接添加客户端属性。
下面的示例展示了如何为回登添加自定义客户机属性:
```
...
thing1:thing2,cat:hat
...
```
示例 3.log4j2
```
...
```
这些属性是用逗号分隔的`key:value`对列表。键和值不能包含逗号或冒号。
当查看连接时,这些属性会出现在 RabbitMQ 管理 UI 上。
##### 回传的高级技术
你可以对 Logback Appender 进行子类。这样做可以让你在建立连接之前修改客户机连接属性。下面的示例展示了如何做到这一点:
```
public class MyEnhancedAppender extends AmqpAppender {
private String thing1;
@Override
protected void updateConnectionClientProperties(Map clientProperties) {
clientProperties.put("thing1", this.thing1);
}
public void setThing1(String thing1) {
this.thing1 = thing1;
}
}
```
然后可以将`thing2`添加到 logback.xml。
对于字符串属性(如前面示例中所示的那些),可以使用前面的技术。子类允许添加更丰富的属性(例如添加`Map`或数字属性)。
#### 4.3.6.提供自定义队列实现
`AmqpAppenders`使用`BlockingQueue`将日志事件异步发布到 RabbitMQ。默认情况下,使用`LinkedBlockingQueue`。但是,你可以提供任何类型的自定义`BlockingQueue`实现。
下面的示例展示了如何对注销执行此操作:
```
public class MyEnhancedAppender extends AmqpAppender {
@Override
protected BlockingQueue createEventQueue() {
return new ArrayBlockingQueue();
}
}
```
log4j2Appender 支持使用[`BlockingQueueFactory`](https://logging. Apache.org/log4j/2.x/manual/appenders.html#BlockingQueueFactory),如下例所示:
```
...
```
### 4.4.示例应用程序
项目包括两个示例应用程序。第一个是一个简单的“Hello World”示例,演示了同步和异步消息接收。它为获得对基本组件的理解提供了一个很好的起点。第二个示例基于股票交易用例,以演示在现实世界的应用程序中常见的交互类型。在这一章中,我们提供了每个示例的快速演练,以便你能够关注最重要的组件。这些示例都是基于 Maven 的,因此你应该能够将它们直接导入到任何 Maven 可感知的 IDE 中(例如[SpringSource 工具套件](https://www.springsource.org/sts))。
#### 4.4.1.《Hello World》样本
“Hello World”示例演示了同步和异步消息接收。你可以将`spring-rabbit-helloworld`示例导入到 IDE 中,然后按照下面的讨论进行操作。
##### 同步示例
在`src/main/java`目录中,导航到`org.springframework.amqp.helloworld`包。打开`HelloWorldConfiguration`类,注意它包含类级的`@Configuration`注释,并注意方法级的一些`@Bean`注释。这是 Spring 基于 Java 的配置的一个示例。你可以阅读有关[here](https://docs.spring.io/spring/docs/current/spring-framework-reference/html/beans.html#beans-java)的更多信息。
下面的清单显示了如何创建连接工厂:
```
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
```
配置还包含`RabbitAdmin`的实例,默认情况下,该实例查找类型为 exchange、queue 或 binding 的任何 bean,然后在代理上声明它们。实际上,在`HelloWorldConfiguration`中生成的`helloWorldQueue` Bean 是一个示例,因为它是`Queue`的一个实例。
下面的清单显示了`helloWorldQueue` Bean 的定义:
```
@Bean
public Queue helloWorldQueue() {
return new Queue(this.helloWorldQueueName);
}
```
回顾`rabbitTemplate` Bean 配置,可以看到它的名称`helloWorldQueue`设置为其`queue`属性(用于接收消息)和其`routingKey`属性(用于发送消息)。
既然我们已经研究了配置,我们就可以查看实际使用这些组件的代码了。首先,从同一个包中打开`Producer`类。它包含一个`main()`方法,其中创建了 Spring `ApplicationContext`。
下面的清单显示了`main`方法:
```
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Sent: Hello World");
}
```
在前面的示例中,检索`AmqpTemplate` Bean 并用于发送`Message`。由于客户端代码应该尽可能依赖于接口,所以类型是`AmqpTemplate`而不是`RabbitTemplate`。尽管在`HelloWorldConfiguration`中创建的 Bean 是`RabbitTemplate`的一个实例,但依赖于接口意味着此代码更可移植(你可以独立于代码来更改配置)。由于调用了`convertAndSend()`方法,模板将委托给它的`MessageConverter`实例。在这种情况下,它使用默认的`SimpleMessageConverter`,但是可以向`rabbitTemplate` Bean 提供不同的实现,如`HelloWorldConfiguration`中所定义的。
现在打开`Consumer`类。它实际上共享相同的配置基类,这意味着它共享`rabbitTemplate` Bean。这就是为什么我们将模板配置为`routingKey`(用于发送)和`queue`(用于接收)。正如我们在[`AmqpTemplate`]中描述的那样,你可以将’RoutingKey’参数传递给 send 方法,将’Queue’参数传递给 receive 方法。`Consumer`代码基本上是生产者的镜像,调用`receiveAndConvert()`而不是`convertAndSend()`。
下面的清单显示了`Consumer`的主要方法:
```
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}
```
如果运行`Producer`,然后运行`Consumer`,则应该在控制台输出中看到`Received: Hello World`。
##### 异步示例
[同步示例](#hello-world-sync)浏览了同步 Hello World 示例。这一部分描述了一个稍微更高级但功能更强大的选项。通过一些修改,Hello World 示例可以提供异步接收的示例,也称为消息驱动 POJO。实际上,有一个子包提供了以下内容:`org.springframework.amqp.samples.helloworld.async`。
同样,我们从发送方开始。打开`ProducerConfiguration`类,注意它创建了一个`connectionFactory`和一个`rabbitTemplate` Bean。这一次,由于配置是专门用于消息发送端的,因此我们甚至不需要任何队列定义,并且`RabbitTemplate`仅具有’RoutingKey’属性集。记住,消息是发送到交易所的,而不是直接发送到队列的。AMQP 默认交换是一种没有名称的直接交换。所有队列都绑定到该默认交换,并以它们的名称作为路由密钥。这就是为什么我们只需要在这里提供路由密钥。
下面的清单显示了`rabbitTemplate`的定义:
```
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
return template;
}
```
由于此示例演示了异步消息接收,所以生产端被设计为连续发送消息(如果它是一个类似于同步版本的每次执行消息的模型,那么它实际上就不是一个消息驱动的消费者了)。负责持续发送消息的组件被定义为`ProducerConfiguration`中的内部类。它被配置为每三秒运行一次。
下面的清单显示了该组件:
```
static class ScheduledProducer {
@Autowired
private volatile RabbitTemplate rabbitTemplate;
private final AtomicInteger counter = new AtomicInteger();
@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
}
}
```
你不需要了解所有的细节,因为真正的重点应该是接收端(我们接下来将讨论这一点)。但是,如果你还不熟悉 Spring 任务调度支持,则可以了解更多[here](https://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-annotation-support)。简而言之,`postProcessor` Bean 中的`ProducerConfiguration`使用调度程序注册任务。
现在我们可以转向接受方了。为了强调消息驱动的 POJO 行为,我们从对消息做出反应的组件开始。这个类被称为`HelloWorldHandler`,如以下清单所示:
```
public class HelloWorldHandler {
public void handleMessage(String text) {
System.out.println("Received: " + text);
}
}
```
那门课很难上。它不扩展任何基类,不实现任何接口,甚至不包含任何导入。它正被 Spring AMQP`MessageListenerAdapter`接口“适配”到`MessageListener`接口。然后可以在`SimpleMessageListenerContainer`上配置该适配器。对于这个示例,容器是在`ConsumerConfiguration`类中创建的。你可以看到 POJO 包装在适配器那里。
下面的清单显示了`listenerContainer`是如何定义的:
```
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueName(this.helloWorldQueueName);
container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
return container;
}
```
`SimpleMessageListenerContainer`是一个 Spring 生命周期组件,默认情况下,它会自动启动。如果你在`Consumer`类中查找,你可以看到它的`main()`方法仅由一行引导程序组成,用于创建`ApplicationContext`。生产者的`main()`方法也是一行引导程序,因为其方法被注释为`@Scheduled`的组件也会自动启动。你可以以任何顺序启动`Producer`和`Consumer`,并且你应该每三秒钟就会看到正在发送和接收的消息。
#### 4.4.2.股票交易
股票交易示例演示了比[Hello World 样本](#hello-world-sample)更高级的消息传递场景。然而,配置是非常相似的,如果有一点更多的参与。由于我们详细介绍了 Hello World 配置,在此,我们将重点讨论使这个示例有所不同的原因。有一个服务器将市场数据(股票行情)推送到主题交换。然后,客户端可以通过绑定具有路由模式的队列来订阅市场数据提要(例如,`app.stock.quotes.nasdaq.*`)。这个演示的另一个主要功能是由客户端发起并由服务器处理的请求-回复“股票交易”交互。这涉及一个私有`replyTo`队列,该队列由客户端在订单请求消息本身内发送。
服务器的核心配置在`RabbitServerConfiguration`包中的`org.springframework.amqp.rabbit.stocks.config.server`类中。它扩展了`AbstractStockAppRabbitConfiguration`。这里定义了服务器和客户机的公共资源,包括市场数据主题交换(其名称为“app.stock.marketdata”)和服务器为股票交易公开的队列(其名称为“app.stock.request”)。在该公共配置文件中,你还可以看到在`Jackson2JsonMessageConverter`上配置了`RabbitTemplate`。
特定于服务器的配置由两个部分组成。首先,它在`RabbitTemplate`上配置市场数据交换,这样它就不需要在每次调用发送`Message`时都提供该交换名称。它在基本配置类中定义的抽象回调方法中执行此操作。下面的列表显示了该方法:
```
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}
```
其次,声明股票请求队列。在这种情况下,它不需要任何显式绑定,因为它绑定到默认的无名称交换,并以自己的名称作为路由密钥。如前所述,AMQP 规范定义了这种行为。下面的清单显示了`stockRequestQueue` Bean 的定义:
```
@Bean
public Queue stockRequestQueue() {
return new Queue(STOCK_REQUEST_QUEUE_NAME);
}
```
现在你已经看到了服务器的 AMQP 资源的配置,请导航到`org.springframework.amqp.rabbit.stocks`目录下的`src/test/java`包。在这里,你可以看到实际的`Server`类,它提供了`main()`方法。它基于`server-bootstrap.xml`配置文件创建`ApplicationContext`。在这里,你可以看到发布虚拟市场数据的计划任务。这种配置依赖于 Spring 的`task`名称空间支持。引导程序配置文件还导入了其他一些文件。最有趣的是`server-messaging.xml`,它直接位于`src/main/resources`之下。在这里,你可以看到负责处理股票交易请求的`messageListenerContainer` Bean。最后,看看`serverHandler` Bean 中定义的`server-handlers.xml`(也在“SRC/main/resources”中)。 Bean 是`ServerHandler`类的一个实例,并且是消息驱动 POJO 的一个很好的示例,该 POJO 也可以发送回复消息。请注意,它本身并不耦合到框架或任何 AMQP 概念。它接受`TradeRequest`并返回`TradeResponse`。下面的清单显示了`handleMessage`方法的定义:
```
public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}
```
现在,我们已经了解了服务器最重要的配置和代码,我们可以转到客户机。最好的起点可能是`RabbitClientConfiguration`包中的`org.springframework.amqp.rabbit.stocks.config.client`。注意,它声明了两个队列,但没有提供显式的名称。下面的清单显示了这两个队列的 Bean 定义:
```
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
@Bean
public Queue traderJoeQueue() {
return amqpAdmin().declareQueue();
}
```
这些都是私有队列,并且会自动生成唯一的名称。客户端使用第一个生成的队列绑定到服务器公开的市场数据交换。回想一下,在 AMQP 中,消费者与队列交互,而生产者与交换器交互。队列与交易所的“绑定”是告诉代理将消息从给定的交易所传递(或路由)到队列的方法。由于市场数据交换是一个主题交换,因此绑定可以用路由模式表示。`RabbitClientConfiguration`使用`Binding`对象执行此操作,并且该对象是使用`BindingBuilder`Fluent API 生成的。下面的清单显示了`Binding`:
```
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
```
请注意,实际值已在一个属性文件中外部化(`client.properties``src/main/resources`),并且我们使用 Spring 的`@Value`注释来注入该值。这通常是个好主意。否则,该值将在类中进行硬编码,并且在不重新编译的情况下不可修改。在这种情况下,在更改用于绑定的路由模式时,运行客户机的多个版本要容易得多。我们现在可以试试。
首先运行`org.springframework.amqp.rabbit.stocks.Server`,然后运行`org.springframework.amqp.rabbit.stocks.Client`。你应该会看到`NASDAQ`股票的虚拟报价,因为在 client.properties 中与“stocks.quote.pattern”键关联的当前值是“app.stock.quotes.nasdaq.**”。现在,在保持现有`Server`和`Client`运行的同时,将该属性值更改为’app.stock.quotes.nyse.**’,并启动第二个`Client`实例。你应该看到,第一个客户端仍然接收纳斯达克报价,而第二个客户端接收纽交所报价。相反,你可以改变模式,以获得所有的股票,甚至一个单独的股票代码。
我们探索的最后一个功能是从客户的角度进行请求-回复交互。回想一下,我们已经看到了接受`ServerHandler`对象并返回`TradeResponse`对象的`ServerHandler`对象。在`Client`包中,`RabbitStockServiceGateway`侧的对应代码是`RabbitStockServiceGateway`。它委托给`RabbitTemplate`以发送消息。下面的清单显示了`send`方法:
```
public void send(TradeRequest tradeRequest) {
getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
try {
message.getMessageProperties().setCorrelationId(
UUID.randomUUID().toString().getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
throw new AmqpException(e);
}
return message;
}
});
}
```
请注意,在发送消息之前,它设置了`replyTo`地址。它提供了由`traderJoeQueue` Bean 定义生成的队列(如前面所示)。下面的清单显示了`@Bean`类本身的`StockServiceGateway`定义:
```
@Bean
public StockServiceGateway stockServiceGateway() {
RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
gateway.setRabbitTemplate(rabbitTemplate());
gateway.setDefaultReplyToQueue(traderJoeQueue());
return gateway;
}
```
如果你不再运行服务器和客户机,请立即启动它们。尝试发送格式为“100tckr”的请求。在模拟“处理”请求的短暂人为延迟之后,你应该会看到一个确认消息出现在客户机上。
#### 4.4.3.从非 Spring 份申请中接收 JSON
Spring 应用程序,在发送 JSON 时,将`*TypeId*`头设置为完全限定的类名,以协助接收应用程序将 JSON 转换回 Java 对象。
`spring-rabbit-json`示例探索了几种从非 Spring 应用程序转换 JSON 的技术。
另见[Jackson2JSONMessageConverter](#json-message-converter)以及`DefaultClassMapper`的[javadoc](https://DOCS. Spring.io/ Spring-amqp/DOCS/current/api/index.html?org/springframework/amqp/support/converter/defaultclassmapper.html)。
### 4.5.测试支持
为异步应用程序编写集成一定比测试更简单的应用程序更复杂。当`@RabbitListener`注释之类的抽象出现在图片中时,这就变得更加复杂了。问题是如何验证在发送消息后,侦听器是否如预期的那样接收到了消息。
框架本身有许多单元和集成测试。一些使用模拟,而另一些则使用与实时 RabbitMQ 代理的集成测试。你可以参考这些测试来获得测试场景的一些想法。
Spring AMQP 版本 1.6 引入了`spring-rabbit-test`JAR,它为测试这些更复杂的场景中的一些提供了支持。预计该项目将随着时间的推移而扩展,但我们需要社区反馈来为帮助测试所需的功能提供建议。请使用[JIRA](https://jira.spring.io/browse/AMQP)或[GitHub 问题](https://github.com/spring-projects/spring-amqp/issues)提供此类反馈。
#### 4.5.1.@SpringRabbitTest
使用此注释将基础设施 bean 添加到 Spring test`ApplicationContext`。在使用`@SpringBootTest`时,这是不必要的,因为 Spring boot 的自动配置将添加 bean。
已注册的 bean 有:
* `CachingConnectionFactory`(`autoConnectionFactory`)。如果存在`@RabbitEnabled`,则使用其连接工厂。
* `RabbitTemplate`(`autoRabbitTemplate`)
* `RabbitAdmin`(`autoRabbitAdmin`)
* `RabbitListenerContainerFactory`(`autoContainerFactory`)
此外,还添加了与`@EnableRabbit`相关的 bean(以支持`@RabbitListener`)。
例 4.JUnit5 示例
```
@SpringJunitConfig
@SpringRabbitTest
public class MyRabbitTests {
@Autowired
private RabbitTemplate template;
@Autowired
private RabbitAdmin admin;
@Autowired
private RabbitListenerEndpointRegistry registry;
@Test
void test() {
...
}
@Configuration
public static class Config {
...
}
}
```
使用 JUnit4,将`@SpringJunitConfig`替换为`@RunWith(SpringRunnner.class)`。
#### 4.5.2.mockito`Answer>`实现
目前有两个`Answer>`实现来帮助测试。
第一个是`LatchCountDownAndCallRealMethodAnswer`,它提供一个`Answer`,返回`null`并对锁存器进行倒数。下面的示例展示了如何使用`LatchCountDownAndCallRealMethodAnswer`:
```
LatchCountDownAndCallRealMethodAnswer answer = this.harness.getLatchAnswerFor("myListener", 2);
doAnswer(answer)
.when(listener).foo(anyString(), anyString());
...
assertThat(answer.await(10)).isTrue();
```
第二,`LambdaAnswer`提供了一种可选地调用实际方法的机制,并提供了一种返回自定义结果的机会,该结果基于`InvocationOnMock`和结果(如果有的话)。
考虑以下 POJO:
```
public class Thing {
public String thing(String thing) {
return thing.toUpperCase();
}
}
```
下面的类测试`Thing`POJO:
```
Thing thing = spy(new Thing());
doAnswer(new LambdaAnswer(true, (i, r) -> r + r))
.when(thing).thing(anyString());
assertEquals("THINGTHING", thing.thing("thing"));
doAnswer(new LambdaAnswer(true, (i, r) -> r + i.getArguments()[0]))
.when(thing).thing(anyString());
assertEquals("THINGthing", thing.thing("thing"));
doAnswer(new LambdaAnswer(false, (i, r) ->
"" + i.getArguments()[0] + i.getArguments()[0])).when(thing).thing(anyString());
assertEquals("thingthing", thing.thing("thing"));
```
从版本 2.2.3 开始,答案将捕获由 Test 下的方法引发的任何异常。使用`answer.getExceptions()`获取对它们的引用。
当与[`@RabbitListenerTest`和`RabbitListenerTestHarness`]结合使用时,使用`harness.getLambdaAnswerFor("listenerId", true, …)`可以为侦听器获得正确构造的答案。
#### 4.5.3.`@RabbitListenerTest`和`RabbitListenerTestHarness`
将一个`@Configuration`类注释为`@RabbitListenerTest`,会导致框架将标准`RabbitListenerAnnotationBeanPostProcessor`替换为一个名为`RabbitListenerTestHarness`的子类(它还通过`@EnableRabbit`启用`@RabbitListener`检测)。
`RabbitListenerTestHarness`通过两种方式增强了侦听器。首先,它将侦听器封装在`Mockito Spy`中,从而实现正常的`Mockito`存根和验证操作。它还可以向侦听器添加`Advice`,从而能够访问参数、结果和引发的任何异常。你可以通过`@RabbitListenerTest`上的属性来控制其中哪些(或两个)被启用。提供后者是为了访问关于调用的较低级别的数据。它还支持在调用异步监听器之前阻塞测试线程。
| |`final``@RabbitListener`方法不能被监视或通知。
此外,只有具有`id`属性的侦听器才能被监视或通知。|
|---|--------------------------------------------------------------------------------------------------------------------------------------|
举几个例子。
下面的示例使用 SPY:
```
@Configuration
@RabbitListenerTest
public class Config {
@Bean
public Listener listener() {
return new Listener();
}
...
}
public class Listener {
@RabbitListener(id="foo", queues="#{queue1.name}")
public String foo(String foo) {
return foo.toUpperCase();
}
@RabbitListener(id="bar", queues="#{queue2.name}")
public void foo(@Payload String foo, @Header("amqp_receivedRoutingKey") String rk) {
...
}
}
public class MyTests {
@Autowired
private RabbitListenerTestHarness harness; (1)
@Test
public void testTwoWay() throws Exception {
assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo"));
Listener listener = this.harness.getSpy("foo"); (2)
assertNotNull(listener);
verify(listener).foo("foo");
}
@Test
public void testOneWay() throws Exception {
Listener listener = this.harness.getSpy("bar");
assertNotNull(listener);
LatchCountDownAndCallRealMethodAnswer answer = this.harness.getLatchAnswerFor("bar", 2); (3)
doAnswer(answer).when(listener).foo(anyString(), anyString()); (4)
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "bar");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "baz");
assertTrue(answer.await(10));
verify(listener).foo("bar", this.queue2.getName());
verify(listener).foo("baz", this.queue2.getName());
}
}
```
|**1**|将线束注入到测试用例中,这样我们就可以访问间谍。|
|-----|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2**|获取对 SPY 的引用,这样我们就可以验证它是否如预期的那样被调用了。
由于这是一个发送和接收操作,因此不需要挂起测试线程,因为它已经
挂起在`RabbitTemplate`中等待回复。|
|**3**|在这种情况下,我们只使用一个发送操作,所以我们需要一个锁存器来等待对容器线程上的侦听器
的异步调用。
我们使用[Answer\\>](#mockito-answer)实现之一来帮助实现这一点。
重要:由于侦听器被监视的方式,使用`harness.getLatchAnswerFor()`为间谍获得正确配置的答案是很重要的。|
|**4**|将 SPY 配置为调用`Answer`。|
下面的示例使用了捕获建议:
```
@Configuration
@ComponentScan
@RabbitListenerTest(spy = false, capture = true)
public class Config {
}
@Service
public class Listener {
private boolean failed;
@RabbitListener(id="foo", queues="#{queue1.name}")
public String foo(String foo) {
return foo.toUpperCase();
}
@RabbitListener(id="bar", queues="#{queue2.name}")
public void foo(@Payload String foo, @Header("amqp_receivedRoutingKey") String rk) {
if (!failed && foo.equals("ex")) {
failed = true;
throw new RuntimeException(foo);
}
failed = false;
}
}
public class MyTests {
@Autowired
private RabbitListenerTestHarness harness; (1)
@Test
public void testTwoWay() throws Exception {
assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo"));
InvocationData invocationData =
this.harness.getNextInvocationDataFor("foo", 0, TimeUnit.SECONDS); (2)
assertThat(invocationData.getArguments()[0], equalTo("foo")); (3)
assertThat((String) invocationData.getResult(), equalTo("FOO"));
}
@Test
public void testOneWay() throws Exception {
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "bar");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "baz");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "ex");
InvocationData invocationData =
this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS); (4)
Object[] args = invocationData.getArguments();
assertThat((String) args[0], equalTo("bar"));
assertThat((String) args[1], equalTo(queue2.getName()));
invocationData = this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
args = invocationData.getArguments();
assertThat((String) args[0], equalTo("baz"));
invocationData = this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
args = invocationData.getArguments();
assertThat((String) args[0], equalTo("ex"));
assertEquals("ex", invocationData.getThrowable().getMessage()); (5)
}
}
```
|**1**|将线束注入到测试用例中,这样我们就可以访问间谍。|
|-----|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2**|使用`harness.getNextInvocationDataFor()`来检索调用数据-在这种情况下,因为这是一个请求/回复
场景,因此不需要等待任何时间,因为测试线程被挂起在`RabbitTemplate`中等待
结果。|
|**3**|然后,我们可以验证该论证和结果是否如预期的那样。|
|**4**|这一次我们需要一些时间来等待数据,因为这是容器线程上的异步操作,我们需要
来挂起测试线程。|
|**5**|当侦听器抛出异常时,它在调用数据的`throwable`属性中可用。|
| |当将自定义`Answer>`s 与线束一起使用时,为了正确地进行操作,这样的答案应该是子类`ForwardsInvocation`并从线束中获得实际的侦听器(而不是间谍)(`getDelegate("myListener")`)并调用
。
参见所提供的[mockito`Answer>`实现](#mockito-answer)源代码的示例。|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
#### 4.5.4.使用`TestRabbitTemplate`
`TestRabbitTemplate`用于执行一些基本的集成测试,而不需要代理。在测试用例中将其添加为`@Bean`时,它会发现上下文中的所有侦听器容器,无论是声明为`@Bean`还是``还是使用`@RabbitListener`注释。它目前仅支持按队列名进行路由。模板从容器中提取消息侦听器,并直接在测试线程上调用它。返回回复的侦听器支持请求-回复消息(`sendAndReceive`methods)。
下面的测试用例使用了模板:
```
@RunWith(SpringRunner.class)
public class TestRabbitTemplateTests {
@Autowired
private TestRabbitTemplate template;
@Autowired
private Config config;
@Test
public void testSimpleSends() {
this.template.convertAndSend("foo", "hello1");
assertThat(this.config.fooIn, equalTo("foo:hello1"));
this.template.convertAndSend("bar", "hello2");
assertThat(this.config.barIn, equalTo("bar:hello2"));
assertThat(this.config.smlc1In, equalTo("smlc1:"));
this.template.convertAndSend("foo", "hello3");
assertThat(this.config.fooIn, equalTo("foo:hello1"));
this.template.convertAndSend("bar", "hello4");
assertThat(this.config.barIn, equalTo("bar:hello2"));
assertThat(this.config.smlc1In, equalTo("smlc1:hello3hello4"));
this.template.setBroadcast(true);
this.template.convertAndSend("foo", "hello5");
assertThat(this.config.fooIn, equalTo("foo:hello1foo:hello5"));
this.template.convertAndSend("bar", "hello6");
assertThat(this.config.barIn, equalTo("bar:hello2bar:hello6"));
assertThat(this.config.smlc1In, equalTo("smlc1:hello3hello4hello5hello6"));
}
@Test
public void testSendAndReceive() {
assertThat(this.template.convertSendAndReceive("baz", "hello"), equalTo("baz:hello"));
}
```
```
@Configuration
@EnableRabbit
public static class Config {
public String fooIn = "";
public String barIn = "";
public String smlc1In = "smlc1:";
@Bean
public TestRabbitTemplate template() throws IOException {
return new TestRabbitTemplate(connectionFactory());
}
@Bean
public ConnectionFactory connectionFactory() throws IOException {
ConnectionFactory factory = mock(ConnectionFactory.class);
Connection connection = mock(Connection.class);
Channel channel = mock(Channel.class);
willReturn(connection).given(factory).createConnection();
willReturn(channel).given(connection).createChannel(anyBoolean());
given(channel.isOpen()).willReturn(true);
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() throws IOException {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
return factory;
}
@RabbitListener(queues = "foo")
public void foo(String in) {
this.fooIn += "foo:" + in;
}
@RabbitListener(queues = "bar")
public void bar(String in) {
this.barIn += "bar:" + in;
}
@RabbitListener(queues = "baz")
public String baz(String in) {
return "baz:" + in;
}
@Bean
public SimpleMessageListenerContainer smlc1() throws IOException {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueueNames("foo", "bar");
container.setMessageListener(new MessageListenerAdapter(new Object() {
@SuppressWarnings("unused")
public void handleMessage(String in) {
smlc1In += in;
}
}));
return container;
}
}
}
```
#### 4.5.5.JUnit4`@Rules`
Spring AMQP1.7 版及以后版本提供了一个名为`spring-rabbit-junit`的附加 JAR。此 JAR 包含两个实用程序`@Rule`实例,用于运行 JUnit4 测试时使用。关于 JUnit5 测试,请参见[JUnit5 条件](#junit5-conditions)。
##### 使用`BrokerRunning`
`BrokerRunning`提供了一种机制,当代理不运行时(默认情况下,在`localhost`上),让测试成功。
它还具有用于初始化和清空队列以及删除队列和交换的实用程序方法。
下面的示例展示了它的用法:
```
@ClassRule
public static BrokerRunning brokerRunning = BrokerRunning.isRunningWithEmptyQueues("foo", "bar");
@AfterClass
public static void tearDown() {
brokerRunning.removeTestQueues("some.other.queue.too") // removes foo, bar as well
}
```
有几个`isRunning…`静态方法,例如`isBrokerAndManagementRunning()`,它验证代理是否启用了管理插件。
###### 配置规则
有时,如果没有代理,你希望测试失败,比如夜间的 CI 构建。要在运行时禁用该规则,请将一个名为`RABBITMQ_SERVER_REQUIRED`的环境变量设置为`true`。
你可以使用 setter 或环境变量重写代理属性,例如 hostname:
下面的示例展示了如何使用 setter 重写属性:
```
@ClassRule
public static BrokerRunning brokerRunning = BrokerRunning.isRunningWithEmptyQueues("foo", "bar");
static {
brokerRunning.setHostName("10.0.0.1")
}
@AfterClass
public static void tearDown() {
brokerRunning.removeTestQueues("some.other.queue.too") // removes foo, bar as well
}
```
你还可以通过设置以下环境变量来覆盖属性:
```
public static final String BROKER_ADMIN_URI = "RABBITMQ_TEST_ADMIN_URI";
public static final String BROKER_HOSTNAME = "RABBITMQ_TEST_HOSTNAME";
public static final String BROKER_PORT = "RABBITMQ_TEST_PORT";
public static final String BROKER_USER = "RABBITMQ_TEST_USER";
public static final String BROKER_PW = "RABBITMQ_TEST_PASSWORD";
public static final String BROKER_ADMIN_USER = "RABBITMQ_TEST_ADMIN_USER";
public static final String BROKER_ADMIN_PW = "RABBITMQ_TEST_ADMIN_PASSWORD";
```
这些环境变量覆盖了默认设置(对于 AMQP 是`localhost:5672`,对于 Management REST API 是`[localhost:15672/api/](http://localhost:15672/api/)`)。
更改主机名会同时影响`amqp`和`management`RESTAPI 连接(除非显式设置了管理 URI)。
`BrokerRunning`还提供了一个名为`static`的`static`方法,该方法允许你在包含这些变量的映射中进行传递。它们覆盖系统环境变量。如果你希望对多个测试套件中的测试使用不同的配置,这可能会很有用。重要事项:在调用创建规则实例的任何`isRunning()`静态方法之前,必须调用该方法。变量值应用于此调用后创建的所有实例。调用`clearEnvironmentVariableOverrides()`重置规则以使用默认值(包括任何实际的环境变量)。
在你的测试用例中,你可以在创建连接工厂时使用`brokerRunning`;`getConnectionFactory()`返回规则的 RabbitMQ`ConnectionFactory`。下面的示例展示了如何做到这一点:
```
@Bean
public CachingConnectionFactory rabbitConnectionFactory() {
return new CachingConnectionFactory(brokerRunning.getConnectionFactory());
}
```
##### 使用`LongRunningIntegrationTest`
`LongRunningIntegrationTest`是一条禁用长时间运行测试的规则。你可能希望在开发人员系统上使用该规则,但要确保在例如 Nightly CI 构建上禁用该规则。
下面的示例展示了它的用法:
```
@Rule
public LongRunningIntegrationTest longTests = new LongRunningIntegrationTest();
```
要在运行时禁用该规则,请将一个名为`RUN_LONG_INTEGRATION_TESTS`的环境变量设置为`true`。
#### 4.5.6.JUnit5 条件
2.0.2 版引入了对 JUnit5 的支持。
##### 使用`@RabbitAvailable`注释
这个类级注释类似于在[JUnit4`@Rules`]中讨论的`BrokerRunning``@Rule`(#JUnit-rules)。它由`RabbitAvailableCondition`处理。
注释有三个属性:
* `queues`:在每次测试之前声明(和清除)并在所有测试完成后删除的队列数组。
* `management`:如果你的测试还需要在代理上安装管理插件,则将其设置为`true`。
* `purgeAfterEach`:(自版本 2.2)当`true`(默认)时,`queues`将在测试之间被清除。
它用于检查代理是否可用,如果不可用,则跳过测试。正如[配置规则](#brokerRunning-configure)中所讨论的,如果`RABBITMQ_SERVER_REQUIRED`没有代理,则称为`true`的环境变量会导致测试快速失败。可以通过使用[配置规则](#brokerRunning-configure)中讨论的环境变量来配置条件。
此外,`RabbitAvailableCondition`支持参数化测试构造函数和方法的参数解析。支持两种参数类型:
* `BrokerRunningSupport`:实例(在 2.2 之前,这是一个 JUnit4`BrokerRunning`实例)
* `ConnectionFactory`:`BrokerRunningSupport`实例的 RabbitMQ 连接工厂
下面的示例展示了这两个方面:
```
@RabbitAvailable(queues = "rabbitAvailableTests.queue")
public class RabbitAvailableCTORInjectionTests {
private final ConnectionFactory connectionFactory;
public RabbitAvailableCTORInjectionTests(BrokerRunningSupport brokerRunning) {
this.connectionFactory = brokerRunning.getConnectionFactory();
}
@Test
public void test(ConnectionFactory cf) throws Exception {
assertSame(cf, this.connectionFactory);
Connection conn = this.connectionFactory.newConnection();
Channel channel = conn.createChannel();
DeclareOk declareOk = channel.queueDeclarePassive("rabbitAvailableTests.queue");
assertEquals(0, declareOk.getConsumerCount());
channel.close();
conn.close();
}
}
```
前面的测试是在框架本身中进行的,并验证参数注入和条件是否正确地创建了队列。
一个实际的用户测试可能如下:
```
@RabbitAvailable(queues = "rabbitAvailableTests.queue")
public class RabbitAvailableCTORInjectionTests {
private final CachingConnectionFactory connectionFactory;
public RabbitAvailableCTORInjectionTests(BrokerRunningSupport brokerRunning) {
this.connectionFactory =
new CachingConnectionFactory(brokerRunning.getConnectionFactory());
}
@Test
public void test() throws Exception {
RabbitTemplate template = new RabbitTemplate(this.connectionFactory);
...
}
}
```
在测试类中使用 Spring 注释应用程序上下文时,可以通过一个名为`RabbitAvailableCondition.getBrokerRunning()`的静态方法获得对条件的连接工厂的引用。
| |从版本 2.2 开始,`getBrokerRunning()`返回一个`BrokerRunningSupport`对象;以前,返回的是 JUnit4`BrokerRunnning`实例。
新类的 API 与`BrokerRunning`相同。|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
下面的测试来自该框架,并演示了它的用法:
```
@RabbitAvailable(queues = {
RabbitTemplateMPPIntegrationTests.QUEUE,
RabbitTemplateMPPIntegrationTests.REPLIES })
@SpringJUnitConfig
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
public class RabbitTemplateMPPIntegrationTests {
public static final String QUEUE = "mpp.tests";
public static final String REPLIES = "mpp.tests.replies";
@Autowired
private RabbitTemplate template;
@Autowired
private Config config;
@Test
public void test() {
...
}
@Configuration
@EnableRabbit
public static class Config {
@Bean
public CachingConnectionFactory cf() {
return new CachingConnectionFactory(RabbitAvailableCondition
.getBrokerRunning()
.getConnectionFactory());
}
@Bean
public RabbitTemplate template() {
...
}
@Bean
public SimpleRabbitListenerContainerFactory
rabbitListenerContainerFactory() {
...
}
@RabbitListener(queues = QUEUE)
public byte[] foo(byte[] in) {
return in;
}
}
}
```
##### 使用`@LongRunning`注释
与`LongRunningIntegrationTest`JUnit4`@Rule`类似,除非将环境变量(或系统属性)设置为`true`,否则此注释将导致跳过测试。下面的示例展示了如何使用它:
```
@RabbitAvailable(queues = SimpleMessageListenerContainerLongTests.QUEUE)
@LongRunning
public class SimpleMessageListenerContainerLongTests {
public static final String QUEUE = "SimpleMessageListenerContainerLongTests.queue";
...
}
```
默认情况下,变量是`RUN_LONG_INTEGRATION_TESTS`,但你可以在注释的`value`属性中指定变量名。
## 5. Spring 整合-参考
参考文档的这一部分快速介绍了 Spring 集成项目中的 AMQP 支持。
### 5.1. Spring 集成 AMQP 支持
这一简短的章节涵盖了 Spring 集成和 Spring AMQP 项目之间的关系。
#### 5.1.1.导言
[Spring Integration](https://www.springsource.org/spring-integration)项目包括基于 Spring AMQP 项目的 AMQP 通道适配器和网关。这些适配器是在 Spring 集成项目中开发和发布的。在 Spring 集成中,“通道适配器”是单向的,而“网关”是双向的。我们提供了入站通道适配器、出站通道适配器、入站网关和出站网关。
由于 AMQP 适配器是 Spring 集成发行版的一部分,所以文档可以作为 Spring 集成发行版的一部分使用。我们在这里提供了主要功能的简要概述。有关更多详细信息,请参见[Spring Integration Reference Guide](https://docs.spring.io/spring-integration/reference/htmlsingle/)。
#### 5.1.2.入站通道适配器
要从队列接收 AMQP 消息,可以配置``。下面的示例展示了如何配置入站通道适配器:
```
```
#### 5.1.3.出站通道适配器
要将 AMQP 消息发送到 Exchange,可以配置``。你可以在 Exchange 名称之外提供一个“路由密钥”。下面的示例展示了如何定义出站通道适配器:
```
```
#### 5.1.4.入站网关
要从队列接收 AMQP 消息并响应其应答地址,可以配置``。下面的示例展示了如何定义入站网关:
```
```
#### 5.1.5.出站网关
要将 AMQP 消息发送到 Exchange 并从远程客户端接收回响应,可以配置``。你可以在 Exchange 名称之外提供一个“路由密钥”。下面的示例展示了如何定义出站网关:
```
```
## 6.其他资源
除了这个参考文档,还有许多其他资源可以帮助你了解 AMQP。
### 6.1.进一步阅读
对于那些不熟悉 AMQP 的人来说,[规格](https://www.amqp.org/resources/download)实际上是很有可读性的。当然,它是权威的信息来源, Spring AMQP 代码对于任何熟悉该规范的人都应该很容易理解。我们目前对 RabbitMQ 支持的实现基于他们的 2.8.x 版本,它正式支持 AMQP0.8 和 0.9.1.我们建议阅读 0.9.1 文档。
在 RabbitMQ[开始](https://www.rabbitmq.com/how.html)页面上有很多很棒的文章、演示文稿和博客。由于这是 Spring AMQP 当前唯一受支持的实现,因此我们还建议将其作为所有与代理相关的关注的一般起点。
## 附录 A:变更历史
这一节描述了在版本发生变化时所做的更改。
### a.1.当前版本
见[What’s New](#whats-new)。
### a.2.以前的版本
#### a.2.1.自 2.2 以来 2.3 的变化
本部分描述了版本 2.2 和版本 2.3 之间的更改。有关以前版本的更改,请参见[变更历史](#change-history)。
##### 连接工厂变更
现在提供了两个额外的连接工厂。有关更多信息,请参见[选择连接工厂](#choosing-factory)。
##### `@RabbitListener`变化
现在可以指定回复内容类型。有关更多信息,请参见[回复 ContentType](#reply-content-type)。
##### 消息转换器更改
如果`ObjectMapper`配置了自定义反序列化器,则`Jackson2JMessageConverter`s 现在可以反序列化抽象类(包括接口)。有关更多信息,请参见[反序列化抽象类](#jackson-abstract)。
##### 测试更改
提供了一个新的注释`@SpringRabbitTest`,用于在不使用`SpringBootTest`时自动配置一些基础设施 bean。有关更多信息,请参见[@SpringRabbitTest](#spring-rabbit-test)。
##### RabbitTemplate 更改
模板的`ReturnCallback`已被重构为`ReturnsCallback`,以便在 lambda 表达式中更简单地使用。有关更多信息,请参见[相关发布者确认并返回](#template-confirms)。
当使用 Returns 和相关确认时,`CorrelationData`现在需要唯一的`id`属性。有关更多信息,请参见[相关发布者确认并返回](#template-confirms)。
当使用直接回复时,你现在可以配置模板,这样服务器就不需要返回与回复相关的数据。有关更多信息,请参见[RabbitMQ 直接回复](#direct-reply-to)。
##### 侦听器容器更改
现在可以使用一个新的侦听器容器属性`consumeDelay`;当使用[RabbitMQ 分片插件](https://github.com/rabbitmq/rabbitmq-sharding)时,该属性很有帮助。
默认的`JavaLangErrorHandler`现在调用`System.exit(99)`。要恢复到以前的行为(什么都不做),请添加一个 no-ophandler。
容器现在支持`globalQos`属性,以便在全局范围内为通道而不是为通道上的每个消费者应用`prefetchCount`。
有关更多信息,请参见[消息侦听器容器配置](#containerAttributes)。
##### MessagePostProcessor 更改
压缩`MessagePostProcessor`s 现在使用逗号来分隔多个内容编码,而不是冒号。解压器可以处理这两种格式,但是,如果你使用这个版本生成的消息被 2.2.12 之前的版本使用,那么你应该将压缩器配置为使用旧的分隔符。有关更多信息,请参见[修改消息-压缩和更多](#post-processing)中的重要注释。
##### 多个代理支持改进
有关更多信息,请参见[多个代理(或集群)支持](#multi-rabbit)。
##### RepublishMessageRecoverer 更改
不提供此 recoverer 的支持发布服务器确认的新子类。有关更多信息,请参见[消息侦听器和异步情况](#async-listeners)。
#### a.2.2.自 2.1 以来 2.2 的变化
本部分描述了版本 2.1 和版本 2.2 之间的更改。
##### 软件包更改
以下类/接口已从`org.springframework.amqp.rabbit.core.support`移至`org.springframework.amqp.rabbit.batch`:
* `BatchingStrategy`
* `MessageBatch`
* `SimpleBatchingStrategy`
此外,`ListenerExecutionFailedException`已从`org.springframework.amqp.rabbit.listener.exception`移至`org.springframework.amqp.rabbit.support`。
##### 依赖项更改
JUnit(4)现在是一个可选的依赖项,并且将不再以传递依赖项的形式出现。
`spring-rabbit-junit`模块现在是**编译**模块中的`spring-rabbit-test`依赖项,以便在仅使用单个`spring-rabbit-test`时获得更好的目标应用程序开发体验,我们获得了 AMQP 组件的完整测试实用程序堆栈。
##### “breaking”API 变更
JUnit(5)`RabbitAvailableCondition.getBrokerRunning()`现在返回一个`BrokerRunningSupport`实例,而不是一个`BrokerRunning`实例,后者取决于 JUnit4.它有相同的 API,所以只需要更改任何引用的类名。有关更多信息,请参见[JUnit5 条件](#junit5-conditions)。
##### ListenerContainer 更改
默认情况下,即使确认模式是手动的,具有致命异常的消息现在也会被拒绝,并且不会重新请求。有关更多信息,请参见[异常处理](#exception-handling)。
监听器的性能现在可以使用微米计`Timer`s 进行监视。有关更多信息,请参见[监视监听器性能](#micrometer)。
##### @RabbitListener 更改
现在可以在每个侦听器上配置`executor`,覆盖工厂配置,以更容易地识别与侦听器关联的线程。现在可以使用注释的`ackMode`属性覆盖容器工厂的`acknowledgeMode`属性。有关更多信息,请参见[覆盖集装箱工厂的属性](#listener-property-overrides)。
当使用[batching](#receiving-batch)时,`@RabbitListener`方法现在可以在一个调用中接收一批完整的消息,而不是一次获得它们。
当一次只接收一条批处理消息时,最后一条消息的`isLastInBatch`消息属性设置为 true。
此外,收到的批处理消息现在包含`amqp_batchSize`头。
侦听器也可以使用在`SimpleMessageListenerContainer`中创建的批处理,即使该批处理不是由生成器创建的。有关更多信息,请参见[选择容器](#choose-container)。
Spring 数据投影接口现在由`Jackson2JsonMessageConverter`支持。有关更多信息,请参见[Using Spring Data Projection Interfaces](#data-projection)。
如果不存在`Jackson2JsonMessageConverter`属性,或者它是默认的(`application/octet-string`),则`Jackson2JsonMessageConverter`现在假定内容是 JSON。有关更多信息,请参见[converting from a`Message`](#Jackson2jsonMessageConverter-from-message)。
类似地,如果不存在`contentType`属性,或者它是默认的(`application/octet-string`),则`Jackson2XmlMessageConverter`现在假定内容是 XML。有关更多信息,请参见[`Jackson2XmlMessageConverter`](#Jackson2xml)。
当`@RabbitListener`方法返回结果时, Bean 和`Method`现在在回复消息属性中可用。这允许`beforeSendReplyMessagePostProcessor`的配置,例如,在回复中设置一个头,以指示在服务器上调用了哪个方法。有关更多信息,请参见[回复管理](#async-annotation-driven-reply)。
现在可以配置`ReplyPostProcessor`,以便在发送回复消息之前对其进行修改。有关更多信息,请参见[回复管理](#async-annotation-driven-reply)。
##### AMQP 日志附录更改
log4j 和 logback`AmqpAppender`s 现在支持`verifyHostname`SSL 选项。
现在还可以将这些附录配置为不将 MDC 条目添加为标题。引入了`addMdcAsHeaders`布尔选项来配置这样的行为。
Appenders 现在支持`SaslConfig`属性。
有关更多信息,请参见[日志记录子系统 AMQP 附录](#logging)。
##### MessageListenerAdapter 更改
`MessageListenerAdapter`现在提供了一个新的`buildListenerArguments(Object, Channel, Message)`方法来构建一个参数数组,这些参数将被传递到目标侦听器中,而一个旧的参数将被弃用。有关更多信息,请参见[`MessageListenerAdapter`]。
##### 交换/队列声明更改
用于创建`ExchangeBuilder`和`QueueBuilder`用于由`Exchange`声明的`Queue`对象的 fluent API 现在支持“众所周知”的参数。有关更多信息,请参见[用于队列和交换的 Builder API](#builder-api)。
`RabbitAdmin`有一个新的属性`explicitDeclarationsOnly`。有关更多信息,请参见[有条件声明](#conditional-declaration)。
##### 连接工厂变更
`CachingConnectionFactory`有一个新的属性`shuffleAddresses`。当提供代理节点地址列表时,将在创建连接之前对列表进行调整,以便尝试连接的顺序是随机的。有关更多信息,请参见[连接到集群](#cluster)。
当使用 Publisher 确认和返回时,回调现在在连接工厂的`executor`上调用。如果你在回调中执行 Rabbit 操作,这就避免了`amqp-clients`库中可能出现的死锁。有关更多信息,请参见[相关发布者确认并返回](#template-confirms)。
此外,现在使用`ConfirmType`枚举来指定发布服务器确认类型,而不是使用两个互斥的 setter 方法。
当启用 SSL 时,`RabbitConnectionFactoryBean`现在默认使用 TLS1.2.有关更多信息,请参见[`RabbitConnectionFactoryBean`和配置 SSL]。
##### 新的 MessagePostProcessor 类
当消息内容编码设置为`deflate`时,分别添加了类`DeflaterPostProcessor`和`InflaterPostProcessor`以支持压缩和解压。
##### 其他更改
`Declarables`对象(用于声明多个队列、交换、绑定)现在为每个类型都有一个过滤 getter。有关更多信息,请参见[声明交换、队列和绑定的集合](#collection-declaration)。
现在可以在`RabbitAdmin`处理其声明之前自定义每个`Declarable` Bean。有关更多信息,请参见[交换、队列和绑定的自动声明](#automatic-declaration)。
`singleActiveConsumer()`已被添加到`QueueBuilder`以设置`x-single-active-consumer`队列参数。有关更多信息,请参见[用于队列和交换的 Builder API](#builder-api)。
类型为`Class>`的出站标头现在使用`getName()`而不是`toString()`进行映射。有关更多信息,请参见[消息属性转换器](#message-properties-converters)。
现在支持恢复失败的生产者创建的批处理。有关更多信息,请参见[使用批处理侦听器重试](#batch-retry)。
#### a.2.3.自 2.0 以来 2.1 的变化
##### AMQP 客户库
Spring AMQP 现在使用由 RabbitMQ 团队提供的`amqp-client`库的 5.4.x 版本。默认情况下,此客户端配置了自动恢复功能。见[RabbitMQ 自动连接/拓扑恢复](#auto-recovery)。
| |从版本 4.0 开始,客户端默认支持自动恢复。
虽然与此功能兼容, Spring AMQP 有自己的恢复机制,并且客户端恢复功能通常不需要。
我们建议禁用`amqp-client`自动恢复,为了避免在代理可用但连接尚未恢复时获得`AutoRecoverConnectionNotCurrentlyOpenException`实例。
从 1.7.1 版本开始, Spring AMQP 禁用它,除非你显式地创建自己的 RabbitMQ 连接工厂并将其提供给`CachingConnectionFactory`。
RabbitMQ`ConnectionFactory`由`RabbitConnectionFactoryBean`创建的实例还具有默认禁用的选项。|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
##### 软件包更改
某些类已转移到不同的包。大多数是内部类,不会影响用户应用程序。两个例外是`ChannelAwareMessageListener`和`RabbitListenerErrorHandler`。这些接口现在在`org.springframework.amqp.rabbit.listener.api`中。
##### Publisher 确认更改
当存在未完成的确认时,启用了发布者确认的通道不会返回到缓存中。有关更多信息,请参见[相关发布者确认并返回](#template-confirms)。
##### 监听器容器工厂改进
现在,你可以使用侦听器容器工厂来创建任何侦听器容器,而不仅仅是用于`@RabbitListener`注释或`@RabbitListenerEndpointRegistry`注释的侦听器容器。有关更多信息,请参见[使用集装箱工厂](#using-container-factories)。
`ChannelAwareMessageListener`现在继承自`MessageListener`。
##### Broker 事件侦听器
引入`BrokerEventListener`以将选定的代理事件发布为`ApplicationEvent`实例。有关更多信息,请参见[代理事件监听器](#broker-events)。
##### RabbitAdmin 更改
`RabbitAdmin`发现类型为`Declarables`的 bean(这是`Declarable`-`Queue`,`Exchange`和`Binding`对象的容器)并在代理上声明所包含的对象。不鼓励用户使用声明`>`(和其他)的旧机制,而应该使用`Declarables`bean。默认情况下,旧机制将被禁用。有关更多信息,请参见[声明交换、队列和绑定的集合](#collection-declaration)。
`AnonymousQueue`实例现在声明为`x-queue-master-locator`,默认设置为`client-local`,以确保队列是在应用程序所连接的节点上创建的。有关更多信息,请参见[配置代理](#broker-configuration)。
##### RabbitTemplate 更改
现在,你可以使用`noLocalReplyConsumer`选项配置`RabbitTemplate`,以便在`sendAndReceive()`操作中控制用于回复消费者的`noLocal`标志。有关更多信息,请参见[请求/回复消息](#request-reply)。
`CorrelationData`对于发布者确认现在有一个`ListenableFuture`,你可以使用它来获得确认,而不是使用回调。当启用返回和确认时,相关数据(如果提供)将填充返回的消息。有关更多信息,请参见[相关发布者确认并返回](#template-confirms)。
现在提供了一个名为`replyTimedOut`的方法,用于通知子类答复已超时,从而允许进行任何状态清理。有关更多信息,请参见[回复超时](#reply-timeout)。
现在,你可以指定一个`ErrorHandler`,当发送回复时发生异常(例如,延迟回复)时,在使用`DirectReplyToMessageListenerContainer`(默认)请求/回复时要调用的`ErrorHandler`。参见`setReplyErrorHandler`上的`RabbitTemplate`。(也是从 2.0.11 开始)。
##### 消息转换
我们引入了一个新的`Jackson2XmlMessageConverter`,以支持将消息从 XML 格式转换为 XML 格式。有关更多信息,请参见[`Jackson2XmlMessageConverter`](#Jackson2xml)。
##### 管理 REST API
现在不赞成`RabbitManagementTemplate`,而赞成直接使用`com.rabbitmq.http.client.Client`(或`com.rabbitmq.http.client.ReactorNettyClient`)。有关更多信息,请参见[RabbitMQ REST API](#management-rest-api)。
##### `@RabbitListener`变化
现在可以将侦听器容器工厂配置为`RetryTemplate`,也可以在发送回复时使用`RecoveryCallback`。有关更多信息,请参见[启用监听器端点注释](#async-annotation-driven-enable)。
##### 异步`@RabbitListener`返回
`@RabbitListener`方法现在可以返回`ListenableFuture>`或`Mono>`。有关更多信息,请参见[异步`@RabbitListener`返回类型]。
##### 连接工厂 Bean 变更
默认情况下,`RabbitConnectionFactoryBean`现在调用`enableHostnameVerification()`。要恢复到以前的行为,请将`enableHostnameVerification`属性设置为`false`。
##### 连接工厂变更
现在,`CachingConnectionFactory`无条件地禁用底层 RabbitMQ`ConnectionFactory`中的自动恢复,即使构造函数中提供了预先配置的实例。虽然已经采取措施使 Spring AMQP 与自动恢复兼容,但在仍然存在问题的情况下出现了某些情况。 Spring AMQP 自 1.0.0 以来已经有了自己的恢复机制,并且不需要使用由客户端提供的恢复。虽然仍然有可能在`CachingConnectionFactory`后启用该功能(使用`cachingConnectionFactory.getRabbitConnectionFactory()``.setAutomaticRecoveryEnabled()`),**我们强烈建议你不要这样做。**被构造。如果在直接使用客户端工厂(而不是使用 Spring AMQP 组件)时需要自动恢复连接,我们建议你使用单独的 RabbitMQ。
##### 侦听器容器更改
如果存在`x-death`报头,那么默认的`ConditionalRejectingErrorHandler`现在将完全丢弃导致致命错误的消息。有关更多信息,请参见[异常处理](#exception-handling)。
##### 立即请求
引入了一个新的`ImmediateRequeueAmqpException`来通知侦听器容器消息必须重新排队。要使用此功能,需要添加一个新的`ImmediateRequeueMessageRecoverer`实现。
有关更多信息,请参见[消息侦听器和异步情况](#async-listeners)。
#### a.2.4.自 1.7 以来 2.0 的变化
##### 使用`CachingConnectionFactory`
从版本 2.0.2 开始,你可以将`RabbitTemplate`配置为使用与侦听器容器使用的不同的连接。这一变化避免了当生产商因任何原因而受阻时,消费者陷入僵局。有关更多信息,请参见[使用单独的连接](#separate-connection)。
##### AMQP 客户库
Spring AMQP 现在使用由 RabbitMQ 团队提供的`amqp-client`库的新的 5.0.x 版本。默认情况下,此客户端配置了自动恢复。见[RabbitMQ 自动连接/拓扑恢复](#auto-recovery)。
| |从版本 4.0 开始,客户端默认启用自动恢复。
虽然兼容此功能, Spring AMQP 有自己的恢复机制,并且客户端恢复功能一般不需要。
我们建议你禁用`amqp-client`自动恢复,为了避免在代理可用但连接尚未恢复时获得`AutoRecoverConnectionNotCurrentlyOpenException`实例。
从 1.7.1 版本开始, Spring AMQP 禁用它,除非你显式地创建自己的 RabbitMQ 连接工厂并将其提供给`CachingConnectionFactory`。
RabbitMQ`ConnectionFactory`由`RabbitConnectionFactoryBean`创建的实例还具有默认禁用的选项。|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
##### 一般变化
现在`ExchangeBuilder`默认情况下构建持久交换。在`@QeueueBinding`中使用的`@Exchange`注释也默认声明持久交换。在`@RabbitListener`中使用的`@Queue`注释默认情况下声明持久队列(如果命名)和非持久队列(如果匿名)。有关更多信息,请参见[用于队列和交换的 Builder API](#builder-api)和[注释驱动的监听器端点](#async-annotation-driven)。
##### 删除类
`UniquelyNameQueue`不再提供。使用唯一的名称创建持久的非自动删除队列是不常见的。这个类已被删除。如果你需要它的功能,请使用`new Queue(UUID.randomUUID().toString())`。
##### 新建侦听器容器
已在现有的`SimpleMessageListenerContainer`旁边添加了`DirectMessageListenerContainer`。有关选择使用哪个容器以及如何配置它们的信息,请参见[选择容器](#choose-container)和[消息侦听器容器配置](#containerAttributes)。
##### log4j 附录
由于 log4j 的报废,此附录不再可用。有关可用日志附录的信息,请参见[日志记录子系统 AMQP 附录](#logging)。
##### `RabbitTemplate`变更
| |以前,如果一个非事务性事务`RabbitTemplate`运行在事务性侦听器容器线程上,那么它就参与了一个现有事务。,
这是一个严重的错误,
但是,用户可能依赖于这种行为,
从 1.6.2 版本开始,你必须在模板上设置`channelTransacted`布尔,才能让它参与容器事务。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
`RabbitTemplate`现在使用`DirectReplyToMessageListenerContainer`(默认情况下),而不是为每个请求创建一个新的使用者。有关更多信息,请参见[RabbitMQ 直接回复](#direct-reply-to)。
`AsyncRabbitTemplate`现在支持直接回复。有关更多信息,请参见[异步兔子模板](#async-template)。
`RabbitTemplate`和`AsyncRabbitTemplate`现在有`receiveAndConvert`和`convertSendAndReceiveAsType`方法,它们接受一个`ParameterizedTypeReference`参数,让调用者指定将结果转换为哪个类型。这对于复杂类型或在消息头中未传递类型信息的情况特别有用。它需要一个`SmartMessageConverter`,如`Jackson2JsonMessageConverter`。有关更多信息,请参见[接收消息](#receiving-messages),[请求/回复消息](#request-reply),[异步兔子模板](#async-template),以及[从`Message`与`RabbitTemplate`转换](#json-complex)。
现在可以使用`RabbitTemplate`在专用通道上执行多个操作。有关更多信息,请参见[作用域操作](#scoped-operations)。
##### 侦听器适配器
一个方便的`FunctionalInterface`可用于使用带有`MessageListenerAdapter`的 lambdas。有关更多信息,请参见[`MessageListenerAdapter`](#message-listener-adapter)。
##### 侦听器容器更改
###### 预取默认值
预取默认值过去是 1,这可能导致有效消费者的利用率不足。默认的预取值现在是 250,这应该会使消费者在大多数常见的情况下都很忙,从而提高吞吐量。
| |在某些情况下,预取取值应该
较低,例如,对于较大的消息,尤其是在处理速度较慢的情况下(消息可能会将
累加到客户端进程中的大量内存中),如果需要严格的消息排序
(在这种情况下,预取值应该设置为 1),
还可以使用低容量消息传递和多个消费者(包括单个侦听器容器实例中的并发性),你可能希望减少预取,以使消息在消费者之间的分布更加均匀。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
有关 Prefetch 的更多背景信息,请参见这篇关于[RabbitMQ 中的消费者利用](https://www.rabbitmq.com/blog/2014/04/14/finding-bottlenecks-with-rabbitmq-3-3/)的文章和这篇关于[排队论](https://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/)的文章。
###### 消息计数
以前,对于容器发出的消息,`MessageProperties.getMessageCount()`返回`0`。此属性仅当你使用`basicGet`(例如,从`RabbitTemplate.receive()`方法)时才适用,并且现在对于容器消息初始化为`null`。
###### 事务回滚行为
无论是否配置了事务管理器,事务回滚上的消息重新队列现在都是一致的。有关更多信息,请参见[关于回滚收到的消息的说明](#transaction-rollback)。
###### 关机行为
如果容器线程不响应`shutdownTimeout`内的关机,则默认情况下将强制关闭通道。有关更多信息,请参见[消息侦听器容器配置](#containerAttributes)。
###### 接收消息后处理程序
如果`afterReceiveMessagePostProcessors`属性中的`MessagePostProcessor`返回`null`,则该消息将被丢弃(并在适当的情况下进行确认)。
##### 连接工厂变更
连接和通道侦听器接口现在提供了一种机制来获取有关异常的信息。有关更多信息,请参见[连接和通道侦听器](#connection-channel-listeners)和[发布是异步的——如何检测成功和失败](#publishing-is-async)。
现在提供了一个新的`ConnectionNameStrategy`,用于从`AbstractConnectionFactory`填充目标 RabbitMQ 连接的特定于应用程序的标识。有关更多信息,请参见[连接和资源管理](#connections)。
##### 重试更改
不再提供`MissingMessageIdAdvice`。它的功能现在是内置的。有关更多信息,请参见[同步操作中的故障和重试选项](#retry)。
##### 匿名队列命名
默认情况下,`AnonymousQueues`现在使用默认的`Base64UrlNamingStrategy`来命名,而不是简单的`UUID`字符串。有关更多信息,请参见[`AnonymousQueue`]。
##### `@RabbitListener`变更
现在可以在`@RabbitListener`注释中提供简单的队列声明(仅绑定到默认的交换)。有关更多信息,请参见[注释驱动的监听器端点](#async-annotation-driven)。
你现在可以配置`@RabbitListener`注释,以便将任何异常返回给发送者。你还可以配置`RabbitListenerErrorHandler`来处理异常。有关更多信息,请参见[处理异常](#annotation-error-handling)。
使用`@QueueBinding`注释时,现在可以将队列与多个路由键绑定。另外`@QueueBinding.exchange()`现在支持自定义交换类型,并默认声明持久交换。
现在,你可以在注释级设置侦听器容器的`concurrency`,而不必为不同的并发设置配置不同的容器工厂。
现在可以在注释级设置侦听器容器的`autoStartup`属性,从而覆盖容器工厂中的默认设置。
你现在可以在`RabbitListener`容器工厂中设置接收之后和发送之前的`MessagePostProcessor`实例。
有关更多信息,请参见[注释驱动的监听器端点](#async-annotation-driven)。
从版本 2.0.3 开始,类级`@RabbitHandler`上的一个`@RabbitListener`注释可以被指定为默认值。有关更多信息,请参见[多方法侦听器](#annotation-method-selection)。
##### 容器条件回滚
当使用外部事务管理器(例如 JDBC)时,当你为容器提供事务属性时,现在支持基于规则的回滚。现在,当你使用事务建议时,它也更加灵活。有关更多信息,请参见[条件回滚](#conditional-rollback)。
##### 删除 Jackson1.x 支持
在以前的版本中不推荐的 Jackson`1.x`转换器和相关组件现在已被删除。你可以使用基于 Jackson2.x 的类似组件。有关更多信息,请参见[Jackson2JSONMessageConverter](#json-message-converter)。
##### JSON 消息转换器
当将入站 JSON 消息的`*TypeId*`设置为`Hashtable`时,现在默认的转换类型是`LinkedHashMap`。在此之前,它是`Hashtable`。要恢复到`Hashtable`,可以在`DefaultClassMapper`上使用`setDefaultMapType`。
##### XML 解析器
在解析`Queue`和`Exchange`XML 组件时,如果存在`id`属性,则解析器不再将`name`属性值注册为 Bean 别名。有关更多信息,请参见[关于`id`和`name`属性的注释]。
##### 阻塞连接
现在可以将`com.rabbitmq.client.BlockedListener`注入`org.springframework.amqp.rabbit.connection.Connection`对象。此外,当连接被代理阻塞或解除锁定时,`ConnectionBlockedEvent`和`ConnectionUnblockedEvent`事件由`ConnectionFactory`发出。
有关更多信息,请参见[连接和资源管理](#connections)。
#### a.2.5.1.7 自 1.6 以来的变化
##### AMQP 客户库
Spring AMQP 现在使用由 RabbitMQ 团队提供的`amqp-client`库的新的 4.0.x 版本。默认情况下,此客户端配置了自动恢复功能。见[RabbitMQ 自动连接/拓扑恢复](#auto-recovery)。
| |4.0.x 客户端默认支持自动恢复。
虽然与此功能兼容, Spring AMQP 有自己的恢复机制,并且客户端恢复功能通常不需要。
我们建议禁用`amqp-client`自动恢复,为了避免在代理可用但连接尚未恢复时获得`AutoRecoverConnectionNotCurrentlyOpenException`实例。
从 1.7.1 版本开始, Spring AMQP 禁用它,除非你显式地创建自己的 RabbitMQ 连接工厂,并将其提供给`CachingConnectionFactory`。
RabbitMQ`ConnectionFactory`由`RabbitConnectionFactoryBean`创建的实例还具有默认禁用的选项。|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
##### log4j2 升级
最小的 log4j2 版本(对于`AmqpAppender`)现在是`2.7`。该框架不再与以前的版本兼容。有关更多信息,请参见[日志记录子系统 AMQP 附录](#logging)。
##### 翻录附录
默认情况下,此附录不再捕获调用方数据(方法、行号)。你可以通过设置`includeCallerData`配置选项重新启用它。有关可用日志附录的信息,请参见[日志记录子系统 AMQP 附录](#logging)。
##### Spring 重试升级
重试的最低版本现在是`1.2`。该框架不再与以前的版本兼容。
###### 关机行为
你现在可以将`forceCloseChannel`设置为`true`,这样,如果容器线程在`shutdownTimeout`内没有响应关机,则将强制关闭通道,从而导致任何未加锁的消息重新排队。有关更多信息,请参见[消息侦听器容器配置](#containerAttributes)。
##### fasterxml Jackson 升级
Jackson 的最低版本现在是`2.8`。该框架不再与以前的版本兼容。
##### JUnit`@Rules`
以前由框架内部使用的规则现在可以在一个名为`spring-rabbit-junit`的单独 JAR 中使用。有关更多信息,请参见[JUnit4`@Rules`]。
##### 容器条件回滚
当你使用外部事务管理器(例如 JDBC)时,当你为容器提供事务属性时,现在支持基于规则的回滚。现在,当你使用事务建议时,它也更加灵活。
##### 连接命名策略
现在提供了一个新的`ConnectionNameStrategy`,用于从`AbstractConnectionFactory`填充目标 RabbitMQ 连接的特定于应用程序的标识。有关更多信息,请参见[连接和资源管理](#connections)。
##### 侦听器容器更改
###### 事务回滚行为
现在,无论是否配置了事务管理器,都可以将事务回滚上的消息重新排队配置为一致。有关更多信息,请参见[关于回滚收到的消息的说明](#transaction-rollback)。
#### a.2.6.早期版本
有关以前版本的更改,请参见[以前的版本](#previous-whats-new)。
#### a.2.7.自 1.5 以来 1.6 的变化
##### 测试支持
现在提供了一个新的测试支持库。有关更多信息,请参见[测试支持](#testing)。
##### 建设者
现在可以使用为配置`Queue`和`Exchange`对象提供流畅 API 的构建器。有关更多信息,请参见[用于队列和交换的 Builder API](#builder-api)。
##### 名称空间变更
###### 连接工厂
现在可以将`thread-factory`添加到连接工厂 Bean 声明中——例如,用于命名由`amqp-client`库创建的线程。有关更多信息,请参见[连接和资源管理](#connections)。
当你使用`CacheMode.CONNECTION`时,现在可以限制允许的连接总数。有关更多信息,请参见[连接和资源管理](#connections)。
###### 队列定义
现在可以为匿名队列提供命名策略。有关更多信息,请参见[`AnonymousQueue`]。
##### 侦听器容器更改
###### 空闲消息侦听器检测
现在可以将侦听器容器配置为在空闲时发布`ApplicationEvent`实例。有关更多信息,请参见[检测空闲异步消费者](#idle-containers)。
###### 不匹配队列检测
默认情况下,当侦听器容器启动时,如果检测到具有不匹配属性或参数的队列,则容器会记录异常,但会继续侦听。容器现在具有一个名为`mismatchedQueuesFatal`的属性,如果在启动过程中检测到问题,该属性将阻止容器(和上下文)启动。如果稍后检测到问题,例如从连接失败中恢复后,它还会停止容器。有关更多信息,请参见[消息侦听器容器配置](#containerAttributes)。
###### 侦听器容器日志记录
现在,侦听器容器将其`beanName`提供给内部`SimpleAsyncTaskExecutor`作为`threadNamePrefix`。它对日志分析很有用。
###### 默认错误处理程序
默认的错误处理程序(`ConditionalRejectingErrorHandler`)现在认为不可恢复的`@RabbitListener`异常是致命的。有关更多信息,请参见[异常处理](#exception-handling)。
##### `AutoDeclare`和`RabbitAdmin`实例
有关在应用程序上下文中使用`RabbitAdmin`实例对该选项语义的一些更改,请参见[消息侦听器容器配置](#containerAttributes)(`autoDeclare`)。
##### `AmqpTemplate`:超时接收
已经为`AmqpTemplate`及其`RabbitTemplate`实现引入了许多带有`timeout`的新`receive()`方法。有关更多信息,请参见[轮询消费者](#polling-consumer)。
##### 使用`AsyncRabbitTemplate`
引入了一个新的`AsyncRabbitTemplate`。此模板提供了许多发送和接收方法,其中返回值为`ListenableFuture`,以后可以使用它同步或异步地获得结果。有关更多信息,请参见[异步兔子模板](#async-template)。
##### `RabbitTemplate`变化
1.4.1 引入了在代理支持[直接回复](https://www.rabbitmq.com/direct-reply-to.html)时使用[直接回复](https://www.rabbitmq.com/direct-reply-to.html)的能力。这比为每个回复使用临时队列更有效。此版本允许你通过将`useTemporaryReplyQueues`属性设置为`true`来覆盖此默认行为并使用临时队列。有关更多信息,请参见[RabbitMQ 直接回复](#direct-reply-to)。
`RabbitTemplate`现在支持`user-id-expression`(在使用 Java 配置时`userIdExpression`)。有关更多信息,请参见[经过验证的用户 ID RabbitMQ 文档](https://www.rabbitmq.com/validated-user-id.html)和[已验证的用户 ID](#template-user-id)。
##### 消息属性
###### 使用`CorrelationId`
消息属性`correlationId`现在可以是`String`。有关更多信息,请参见[消息属性转换器](#message-properties-converters)。
###### 长字符串标题
以前,`DefaultMessagePropertiesConverter`将比长字符串限制(缺省 1024)更长的头转换为`DataInputStream`(实际上,它引用了`LongString`实例的`DataInputStream`)。在输出时,这个头没有被转换(除了转换为字符串——例如,通过在流上调用`toString()`,`[[email protected]](/cdn-cgi/l/email-protection)`)。
在此版本中,长`LongString`实例现在默认保留为`LongString`实例。你可以使用`getBytes[]`、`toString()`或`getStream()`方法访问内容。大量输入的`LongString`现在也可以在输出上正确地“转换”。
有关更多信息,请参见[消息属性转换器](#message-properties-converters)。
###### 入站交付模式
`deliveryMode`属性不再映射到`MessageProperties.deliveryMode`。如果使用相同的`MessageProperties`对象发送出站消息,则此更改将避免意外传播。相反,入站`deliveryMode`标头被映射到`MessageProperties.receivedDeliveryMode`。
有关更多信息,请参见[消息属性转换器](#message-properties-converters)。
当使用带注释的端点时,标题提供在名为`AmqpHeaders.RECEIVED_DELIVERY_MODE`的标题中。
有关更多信息,请参见[带注释的端点方法签名](#async-annotation-driven-enable-signature)。
###### 入站用户 ID
`user_id`属性不再映射到`MessageProperties.userId`。如果使用相同的`MessageProperties`对象发送出站消息,则此更改将避免意外传播。相反,入站`userId`头被映射到`MessageProperties.receivedUserId`。
有关更多信息,请参见[消息属性转换器](#message-properties-converters)。
当你使用带注释的端点时,标题将在名为`AmqpHeaders.RECEIVED_USER_ID`的标题中提供。
有关更多信息,请参见[带注释的端点方法签名](#async-annotation-driven-enable-signature)。
##### `RabbitAdmin`变化
###### 宣告失败
以前,`ignoreDeclarationFailures`标志仅对通道上的`IOException`生效(例如不匹配的参数)。它现在对任何例外情况生效(例如`TimeoutException`)。此外,现在只要声明失败,就会发布`DeclarationExceptionEvent`。`RabbitAdmin`Last declaration 事件也可以作为属性`lastDeclarationExceptionEvent`使用。有关更多信息,请参见[配置代理](#broker-configuration)。
##### `@RabbitListener`变更
###### 每个容器有多个 Bean
当你使用 Java8 或更高版本时,你现在可以将多个`@RabbitListener`注释添加到`@Bean`类或它们的方法中。当使用 Java7 或更早版本时,你可以使用`@RabbitListeners`容器注释来提供相同的功能。有关更多信息,请参见[`@Repeatable``@RabbitListener`]。
###### `@SendTo`spel 表达式
`@SendTo`用于路由不带`replyTo`属性的回复,现在可以根据请求/回复计算 SPEL 表达式。有关更多信息,请参见[回复管理](#async-annotation-driven-reply)。
###### `@QueueBinding`改进
现在可以在`@QueueBinding`注释中指定队列、交换和绑定的参数。现在`@QueueBinding`支持报头交换。有关更多信息,请参见[注释驱动的监听器端点](#async-annotation-driven)。
##### 延迟消息交换
Spring AMQP 现在拥有对 RabbitMQ 延迟消息交换插件的一流支持。有关更多信息,请参见[延迟的消息交换](#delayed-message-exchange)。
##### 交易所内部标志
任何`Exchange`定义现在都可以标记为`internal`,并且`RabbitAdmin`在声明交易所时将该值传递给代理。有关更多信息,请参见[配置代理](#broker-configuration)。
##### `CachingConnectionFactory`变化
###### `CachingConnectionFactory`缓存统计信息
`CachingConnectionFactory`现在在运行时和 JMX 上提供缓存属性。有关更多信息,请参见[运行时缓存属性](#runtime-cache-properties)。
###### 访问底层 RabbitMQ 连接工厂
添加了一个新的 getter 以提供对底层工厂的访问。例如,你可以使用此 getter 添加自定义连接属性。有关更多信息,请参见[添加自定义客户端连接属性](#custom-client-props)。
###### 通道缓存
默认通道缓存大小已从 1 增加到 25.有关更多信息,请参见[连接和资源管理](#connections)。
此外,`SimpleMessageListenerContainer`不再调整缓存大小,使其至少与`concurrentConsumers`的数量一样大——这是多余的,因为容器使用者通道永远不会被缓存。
##### 使用`RabbitConnectionFactoryBean`
工厂 Bean 现在公开一个属性,以便将客户机连接属性添加到由结果工厂创建的连接中。
##### Java 反序列化
现在,你可以在使用 Java 反序列化时配置一个允许类的“允许列表”。如果你接受来自不受信任的源的带有序列化 Java 对象的消息,那么你应该考虑创建一个允许的列表。有关更多信息,请参见[Java 反序列化](#java-deserialization)。
##### JSON `MessageConverter`
对 JSON 消息转换器的改进现在允许使用消息头中没有类型信息的消息。有关更多信息,请参见[带注释方法的消息转换](#async-annotation-conversion)和[Jackson2JSONMessageConverter](#json-message-converter)。
##### 日志附录
###### log4j2
添加了一个 log4j2Appender,现在可以将 Appenders 配置为`addresses`属性,以连接到代理群集。
###### 客户端连接属性
现在可以将自定义客户端连接属性添加到 RabbitMQ 连接中。
有关更多信息,请参见[日志记录子系统 AMQP 附录](#logging)。
#### a.2.8.1.5 自 1.4 以来的变化
##### `spring-erlang`不再支持
`spring-erlang`JAR 不再包含在分发版中。用[RabbitMQ REST API](#management-rest-api)代替。
##### `CachingConnectionFactory`变化
###### 中的空地址属性`CachingConnectionFactory`
以前,如果连接工厂配置了主机和端口,但也为`addresses`提供了一个空字符串,则会忽略主机和端口。现在,一个空的`addresses`字符串被处理为与`null`相同的字符串,并且使用了主机和端口。
###### URI 构造函数
`CachingConnectionFactory`有一个附加的构造函数,带有`URI`参数,用于配置代理连接。
###### 连接重置
添加了一个名为`resetConnection()`的新方法,让用户重置连接(或连接)。例如,你可以使用它在失败转移到次要代理之后重新连接到主代理。这**是吗?**会影响进程内操作。现有的`destroy()`方法做的完全相同,但是新方法的名称不那么令人生畏。
##### 控制容器队列声明行为的属性
当监听器容器使用者启动时,他们尝试被动地声明队列,以确保它们在代理上可用。在此之前,如果这些声明失败(例如,因为队列不存在),或者当一个 HA 队列被移动时,重试逻辑被固定为以五秒为间隔的三次重试尝试。如果队列仍然不存在,则行为由`missingQueuesFatal`属性控制(默认:`true`)。此外,对于配置为监听多个队列的容器,如果只有一个队列子集可用,那么使用者将在 60 秒的固定时间间隔内重试丢失的队列。
`declarationRetries`、`failedDeclarationRetryInterval`和`retryDeclarationInterval`属性现在是可配置的。有关更多信息,请参见[消息侦听器容器配置](#containerAttributes)。
##### 类包更改
将`RabbitGatewaySupport`类从`o.s.amqp.rabbit.core.support`移动到`o.s.amqp.rabbit.core`。
##### `DefaultMessagePropertiesConverter`变化
你现在可以配置`DefaultMessagePropertiesConverter`以确定将`LongString`转换为`String`而不是转换为`DataInputStream`的最大长度。转换器有一个替代的构造函数,它将值作为一个限制。以前,这个限制是以`1024`字节进行硬编码的。(也可在 1.4.4 中获得)。
##### `@RabbitListener`改进
###### `@QueueBinding`表示`@RabbitListener`
`bindings`属性已被添加到`@RabbitListener`注释中,作为与`queues`属性的互斥,以允许`queue`的规范,其`exchange`和`binding`用于代理上的`RabbitAdmin`声明。
###### spel in`@SendTo`
`@RabbitListener`的默认回复地址(`@SendTo`)现在可以是 SPEL 表达式。
###### 通过属性获得多个队列名称
现在可以使用 SPEL 和属性占位符的组合来为侦听器指定多个队列。
有关更多信息,请参见[注释驱动的监听器端点](#async-annotation-driven)。
##### 自动交换、队列和绑定声明
现在可以声明定义这些实体集合的 bean,并且`RabbitAdmin`将内容添加到它在建立连接时声明的实体列表中。有关更多信息,请参见[声明交换、队列和绑定的集合](#collection-declaration)。
##### `RabbitTemplate`变更
###### `reply-address`已添加
已将`reply-address`属性添加到``组件中,作为替代`reply-queue`。有关更多信息,请参见[请求/回复消息](#request-reply)。(也可以在 1.4.4 中作为`RabbitTemplate`上的 setter)。
###### 阻塞`receive`方法
`RabbitTemplate`现在支持`receive`和`convertAndReceive`方法中的阻塞。有关更多信息,请参见[轮询消费者](#polling-consumer)。
###### 强制使用`sendAndReceive`方法
当使用`mandatory`和`convertSendAndReceive`方法设置`mandatory`标志时,如果无法传递请求消息,则调用线程将抛出`AmqpMessageReturnedException`。有关更多信息,请参见[回复超时](#reply-timeout)。
###### 不正确的应答侦听器配置
当使用命名的应答队列时,框架将尝试验证应答侦听器容器的正确配置。
有关更多信息,请参见[回复监听器容器](#reply-listener)。
##### `RabbitManagementTemplate`已添加
引入了`RabbitManagementTemplate`,通过使用其[管理插件 Name](https://www.rabbitmq.com/management.html)提供的 REST API 来监视和配置 RabbitMQ 代理。有关更多信息,请参见[RabbitMQ REST API](#management-rest-api)。
##### 侦听器容器 Bean 名称
| |``元素上的`id`属性已被删除。
从这个版本开始,单独使用``子元素上的`id`来命名为每个侦听器元素创建的侦听器容器 Bean。将应用
正常的 Spring Bean 名称重载。
如果稍后的``与现有的 Bean 相同的`id`进行解析,新的定义覆盖了现有的定义。
以前, Bean 名称是由`id`和``元素的``属性组成的。
当迁移到此版本时,如果你的`id`元素上有`id`属性,删除它们并在子元素``上设置`id`。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
但是,为了支持作为一个组开始和停止容器,添加了一个新的`group`属性。定义此属性时,由此元素创建的容器将添加到具有此名称的 Bean,类型为`Collection`。你可以迭代这个组来启动和停止容器。
##### class-level`@RabbitListener`
现在可以在类级别应用`@RabbitListener`注释。与新的`@RabbitHandler`方法注释一起,这允许你基于有效负载类型选择处理程序方法。有关更多信息,请参见[多方法侦听器](#annotation-method-selection)。
##### `SimpleMessageListenerContainer`:Backoff 支持
现在可以为`SimpleMessageListenerContainer`启动恢复提供`BackOff`实例。有关更多信息,请参见[消息侦听器容器配置](#containerAttributes)。
##### 通道关闭日志记录
介绍了一种控制信道关闭日志级别的机制。见[记录通道关闭事件](#channel-close-logging)。
##### 应用程序事件
现在,当使用者失败时,`SimpleMessageListenerContainer`将发出应用程序事件。有关更多信息,请参见[消费者活动](#consumer-events)。
##### 消费者标签配置
以前,异步消费者的消费者标记是由代理生成的。通过这个版本,现在可以向侦听器容器提供命名策略。见[消费者标签](#consumerTags)。
##### 使用`MessageListenerAdapter`
`MessageListenerAdapter`现在支持队列名称(或使用者标记)到方法名称的映射,以根据接收消息的队列确定调用哪个委托方法。
##### `LocalizedQueueConnectionFactory`已添加
`LocalizedQueueConnectionFactory`是一个新的连接工厂,它连接到集群中实际驻留镜像队列的节点。
参见[Queue Affinity 和`LocalizedQueueConnectionFactory`]。
##### 匿名队列命名
从版本 1.5.3 开始,你现在可以控制`AnonymousQueue`名称的生成方式。有关更多信息,请参见[`AnonymousQueue`]。
#### a.2.9.自 1.3 以来 1.4 的变化
##### `@RabbitListener`注释
POJO 侦听器可以使用`@RabbitListener`进行注释,也可以使用`@EnableRabbit`或``。 Spring 此功能需要框架 4.1.有关更多信息,请参见[注释驱动的监听器端点](#async-annotation-driven)。
##### `RabbitMessagingTemplate`已添加
一个新的`RabbitMessagingTemplate`允许你通过使用`spring-messaging``Message`实例与 RabbitMQ 进行交互。在内部,它使用`RabbitTemplate`,你可以将其配置为常规配置。 Spring 此功能需要框架 4.1.有关更多信息,请参见[消息传递集成](#template-messaging)。
##### 侦听器容器`missingQueuesFatal`属性
1.3.5 在`SimpleMessageListenerContainer`上引入了`missingQueuesFatal`属性。这现在在侦听器容器名称空间元素上可用。见[消息侦听器容器配置](#containerAttributes)。
##### RabbitTemplate`ConfirmCallback`接口
这个接口上的`confirm`方法有一个额外的参数,称为`cause`。当可用时,此参数包含否定确认的原因。见[相关发布者确认并返回](#template-confirms)。
##### `RabbitConnectionFactoryBean`已添加
`RabbitConnectionFactoryBean`创建由`CachingConnectionFactory`使用的底层 RabbitMQ`ConnectionFactory`。这允许使用 Spring 的依赖注入配置 SSL 选项。见[配置底层客户机连接工厂](#connection-factory)。
##### 使用`CachingConnectionFactory`
现在,`CachingConnectionFactory`允许将`connectionTimeout`设置为名称空间中的一个属性或属性。它在底层 RabbitMQ`ConnectionFactory`上设置属性。见[配置底层客户机连接工厂](#connection-factory)。
##### 日志附录
已引入了注销`org.springframework.amqp.rabbit.logback.AmqpAppender`。它提供了类似于`org.springframework.amqp.rabbit.log4j.AmqpAppender`的选项。有关更多信息,请参见这些类的 Javadoc。
log4j`AmqpAppender`现在支持`deliveryMode`属性(`PERSISTENT`或`NON_PERSISTENT`,缺省:`PERSISTENT`)。以前,所有的 log4j 消息都是`PERSISTENT`。
Appender 还支持在发送前修改`Message`——例如,允许添加自定义标头。子类应该覆盖`postProcessMessageBeforeSend()`。
##### 侦听器队列
现在,默认情况下,侦听器容器会在启动过程中重新声明任何丢失的队列。已向``添加了一个新的`auto-declare`属性,以防止这些重新声明。参见[`auto-delete`队列]。
##### `RabbitTemplate`:`mandatory`和`connectionFactorySelector`表达式
而`mandatoryExpression`,`sendConnectionFactorySelectorExpression`和`receiveConnectionFactorySelectorExpression`spel 表达式`s properties have been added to `RabbitTemplate`. The `mandatoryexpression` is used to evaluate a `强制` boolean value against each request message when a `returncall` is in use. See [Correlated Publisher Confirms and Returns](#template-confirms). The `sendConnectionygt=“factorexpression<4214"/>factortortoryr=“actorpressionmentfactortortorypresslausion`的``现在支持解析``子元素。现在,你可以使用`key/value`属性对配置``中的``(以便在单个标头上进行匹配)或使用``子元素(允许在多个标头上进行匹配)。这些选择是相互排斥的。见[头交换](#headers-exchange)。
##### 路由连接工厂
引入了一个新的`SimpleRoutingConnectionFactory`。它允许配置`ConnectionFactories`映射,以确定在运行时使用的目标`ConnectionFactory`。见[路由连接工厂](#routing-connection-factory)。
##### `MessageBuilder`和`MessagePropertiesBuilder`
现在提供了用于构建消息或消息属性的“Fluent API”。见[Message Builder API](#message-builder)。
##### `RetryInterceptorBuilder`变化
现在提供了用于构建侦听器容器重试拦截器的“Fluent API”。见[同步操作中的故障和重试选项](#retry)。
##### `RepublishMessageRecoverer`已添加
提供了这个新的`MessageRecoverer`,以允许在重试用完时将失败的消息发布到另一个队列(包括消息头中的堆栈跟踪信息)。见[消息侦听器和异步情况](#async-listeners)。
##### 默认错误处理程序(自 1.3.2 起)
已将默认的`ConditionalRejectingErrorHandler`添加到侦听器容器中。此错误处理程序检测到致命的消息转换问题,并指示容器拒绝该消息,以防止代理继续重新交付不可转换的消息。见[异常处理](#exception-handling)。
##### 侦听器容器“missingqueuesfatal”属性(自 1.3.5)
`SimpleMessageListenerContainer`现在有一个名为`missingQueuesFatal`的属性(默认值:`true`)。以前,排不上队总是致命的。见[消息侦听器容器配置](#containerAttributes)。
#### a.2.11.自 1.1 以来对 1.2 的更改
##### RabbitMQ 版本
Spring AMQP 现在默认使用 RabbitMQ3.1.x(但保留了与早期版本的兼容性)。对于 RabbitMQ3.1.x——联邦交换和`RabbitTemplate`上的`immediate`属性不再支持的特性,已经添加了某些异议。
##### 兔子管理员
`RabbitAdmin`现在提供了一个选项,可以在声明失败时让 Exchange、Queue 和 binding 声明继续。以前,所有的声明都是在失败时停止的。通过设置`ignore-declaration-exceptions`,将记录此类异常(在`WARN`级别),但会继续进行进一步的声明。这可能有用的一个例子是,当队列声明失败时,原因是一个稍微不同的`ttl`设置,该设置通常会阻止其他声明继续进行。
`RabbitAdmin`现在提供了一个名为`getQueueProperties()`的附加方法。你可以使用它来确定代理上是否存在队列(对于不存在的队列,返回`null`)。此外,它返回队列中当前消息的数量以及当前消费者的数量。
##### 兔子模板
以前,当`…sendAndReceive()`方法与固定的应答队列一起使用时,将使用两个自定义标头来进行相关数据以及保留和恢复应答队列信息。在此版本中,默认情况下使用标准消息属性(`correlationId`),尽管你可以指定要使用的自定义属性。此外,嵌套的`replyTo`信息现在被保留在模板内部,而不是使用自定义标头。
`immediate`属性已弃用。在使用 RabbitMQ3.0.x 或更高版本时,不能设置此属性。
##### JSON 消息转换器
现在提供了一个 Jackson2.x`MessageConverter`,以及使用 Jackson1.x 的现有转换器。
##### 队列和其他项的自动声明
以前,在声明队列、交换和绑定时,你无法定义声明使用哪个连接工厂。每个`RabbitAdmin`通过使用其连接声明所有组件。
从这个版本开始,你现在可以将声明限制为特定的`RabbitAdmin`实例。见[有条件声明](#conditional-declaration)。
##### AMQP Remoting
现在提供了用于使用 Spring 远程技术的设施,使用 AMQP 作为 RPC 调用的传输。有关更多信息,请参见[Spring Remoting with AMQP](#remoting)
##### 请求心跳
一些用户要求在 Spring AMQP`CachingConnectionFactory`上公开底层客户机连接工厂的`requestedHeartBeats`属性。这是现在可用的。在此之前,有必要将 AMQP 客户端工厂配置为单独的 Bean,并在`CachingConnectionFactory`中提供对它的引用。
#### a.2.12.自 1.0 以来对 1.1 的更改
##### 一般
Spring-AMQP 现在是用 Gradle 构建的。
添加对 Publisher 确认和返回的支持。
添加对 HA 队列和代理故障转移的支持。
增加了对死信交换和死信队列的支持。
##### AMQP log4j Appender
添加一个选项,以支持将消息 ID 添加到已记录的消息中。
添加一个选项,允许在将`String`转换为`byte[]`时使用`Charset`名称的规范。