# 核心消息传递 ## 消息传递通道 ### 消息通道 虽然`Message`在封装数据方面发挥着关键作用,但将消息生产者与消息消费者分离开来的是`MessageChannel`。 #### MessageChannel 接口 Spring 集成的顶级`MessageChannel`接口定义如下: ``` public interface MessageChannel { boolean send(Message message); boolean send(Message message, long timeout); } ``` 发送消息时,如果消息发送成功,则返回值为`true`。如果发送调用超时或被中断,它将返回`false`。 ##### `PollableChannel` 由于消息通道可能会或可能不会缓冲消息(如[Spring Integration Overview](./overview.html#overview)中所讨论的),因此两个子接口定义了缓冲和非缓冲通道行为。下面的清单显示了`PollableChannel`接口的定义: ``` public interface PollableChannel extends MessageChannel { Message receive(); Message receive(long timeout); } ``` 与发送方法一样,当接收到消息时,在超时或中断的情况下,返回值为 null。 ##### `SubscribableChannel` `SubscribableChannel`基本接口是通过将消息直接发送到其订阅的`MessageHandler`实例的通道来实现的。因此,它们不提供用于轮询的接收方法。相反,他们定义了管理这些订阅者的方法。下面的清单显示了`SubscribableChannel`接口的定义: ``` public interface SubscribableChannel extends MessageChannel { boolean subscribe(MessageHandler handler); boolean unsubscribe(MessageHandler handler); } ``` #### 消息通道实现 Spring 集成提供了几种不同的消息通道实现方式。下面几节简要介绍每一种方法。 ##### `PublishSubscribeChannel` `PublishSubscribeChannel`实现将发送给它的任何`Message`广播给其所有订阅的处理程序。这通常用于发送事件消息,其主要作用是通知(与文档消息相反,文档消息通常由单个处理程序处理)。请注意,`PublishSubscribeChannel`仅用于发送。由于它在调用`send(Message)`方法时直接向订阅者广播,因此消费者无法轮询消息(它没有实现`PollableChannel`,因此没有`receive()`方法)。相反,任何订阅服务器本身必须是`MessageHandler`,并且依次调用订阅服务器的`handleMessage(Message)`方法。 在版本 3.0 之前,在没有订阅服务器的`PublishSubscribeChannel`上调用`send`方法返回`false`。当与`MessagingTemplate`一起使用时,将抛出`MessageDeliveryException`。从版本 3.0 开始,行为发生了变化,如果至少存在最小的订阅服务器(并成功处理消息),则`send`总是被认为是成功的。可以通过设置`minSubscribers`属性来修改此行为,该属性的默认值为`0`。 | |如果使用`TaskExecutor`,则仅使用存在正确数量的订阅服务器来进行此确定,因为消息的实际处理是异步执行的。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ##### `QueueChannel` `QueueChannel`实现封装了一个队列。与`PublishSubscribeChannel`不同,`QueueChannel`具有点对点语义。换句话说,即使该通道有多个消费者,其中只有一个应该接收到发送到该通道的任何`Message`。它提供了一个默认的无参数构造函数(提供了一个基本上是无界的`Integer.MAX_VALUE`的容量),以及一个接受队列容量的构造函数,如下面的清单所示: ``` public QueueChannel(int capacity) ``` 未达到容量限制的通道将消息存储在其内部队列中,并且`send(Message)`方法立即返回,即使没有接收者准备好处理该消息。如果队列已达到容量,发送方将阻塞该队列,直到该队列中有可用的空间为止。或者,如果使用具有附加超时参数的 send 方法,则队列将阻塞,直到可用的房间或超时周期(以先发生的为准)过去为止。类似地,如果队列上有消息可用,则`receive()`调用将立即返回,但是,如果队列是空的,则接收调用可能会阻塞,直到消息可用或超时(如果提供了超时)为止。在这两种情况下,都可以通过传递 0 的超时值来强制立即返回,而不管队列的状态如何。但是,请注意,这将无限期地调用`send()`和`receive()`参数块的版本。 ##### `PriorityChannel` 虽然`QueueChannel`强制执行先进先出排序,但`PriorityChannel`是一种替代实现,它允许基于优先级在通道内对消息进行排序。默认情况下,优先级由每个消息中的`priority`头决定。但是,对于自定义优先级确定逻辑,可以向`Comparator>`构造函数提供类型`PriorityChannel`的比较器。 ##### `RendezvousChannel` `RendezvousChannel`启用了一个“直接切换”场景,其中,发送方会阻塞,直到另一方调用该通道的`receive()`方法。另一方屏蔽,直到发送方发送消息。在内部,这种实现与`QueueChannel`非常相似,只是它使用了`SynchronousQueue`(`BlockingQueue`的零容量实现)。这在发送方和接收方在不同线程中操作的情况下很好地工作,但是在队列中异步丢弃消息是不合适的。换句话说,对于`RendezvousChannel`,发送者知道某些接收者已经接受了该消息,而对于`QueueChannel`,该消息将被存储到内部队列中,并且可能永远不会被接收。 | |请记住,所有这些基于队列的通道在默认情况下仅在内存中存储消息。
当需要持久性时,你可以在“queue”元素中提供一个“message-store”属性来引用一个持久的`MessageStore`实现,或者你可以用一个持久代理支持的本地通道替换该本地通道,如 JMS 支持的通道或通道适配器,
后一种选项允许你利用任何 JMS 提供者的消息持久性实现,如[JMS 支持](./jms.html#jms)中所讨论的,
但是,当不需要在队列中进行缓冲时,最简单的方法是依赖`DirectChannel`,在下一节中讨论。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| `RendezvousChannel`对于实现请求-回复操作也很有用。发送方可以创建`RendezvousChannel`的临时匿名实例,然后在构建`Message`时将其设置为“replychannel”头。在发送`Message`之后,发送者可以立即调用`receive`(可选地提供超时值),以便在等待答复`Message`时阻塞。这与 Spring 集成的许多请求-应答组件内部使用的实现非常相似。 ##### `DirectChannel` `DirectChannel`具有点对点语义,但在其他方面比前面描述的任何基于队列的通道实现更类似于`PublishSubscribeChannel`。它实现了`SubscribableChannel`接口,而不是`PollableChannel`接口,因此它直接将消息分派给订阅服务器。然而,作为点对点信道,它与`PublishSubscribeChannel`的不同之处在于,它将每个`Message`发送到一个订阅的`MessageHandler`。 除了是最简单的点对点通道选项外,它最重要的特性之一是,它使单个线程能够在通道的“两边”执行操作。例如,如果处理程序订阅了`DirectChannel`,那么在`send()`方法调用返回之前,向该通道发送`Message`将触发该处理程序的`handleMessage(Message)`方法的调用。 提供具有这种行为的通道实现的主要动机是支持必须跨越通道的事务,同时仍然受益于通道提供的抽象和松耦合。如果发送调用是在事务的范围内调用的,那么处理程序调用的结果(例如,更新数据库记录)将在确定该事务的最终结果(提交或回滚)中起到一定的作用。 | |由于`DirectChannel`是最简单的选项,并且不会增加调度和管理 Poller 线程所需的任何额外开销,因此它是 Spring 集成中的默认通道类型,
的一般思想是为应用程序定义通道,考虑其中哪些需要提供缓冲或限制输入,并将其修改为基于队列的`PollableChannels`,
同样,如果一个频道需要广播消息,它不应该是`DirectChannel`,而应该是`PublishSubscribeChannel`,
,稍后,我们展示了如何配置这些通道中的每一个。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| `DirectChannel`在内部委托给消息调度器以调用其订阅的消息处理程序,并且该调度器可以具有由`load-balancer`或`load-balancer-ref`属性(互斥)公开的负载平衡策略。消息调度器使用负载平衡策略来帮助确定当多个消息处理程序订阅同一通道时消息如何在消息处理程序之间分配。为了方便起见,`load-balancer`属性公开指向`LoadBalancingStrategy`的预先存在的实现的值的枚举。唯一可用的值是`round-robin`(旋转中处理程序之间的负载平衡)和`none`(对于希望显式禁用负载平衡的情况)。在将来的版本中可能会添加其他策略实现。然而,自版本 3.0 以来,你可以提供自己的`LoadBalancingStrategy`实现,并通过使用`load-balancer-ref`属性注入它,该属性应该指向实现`LoadBalancingStrategy`的 Bean,如下例所示: a`FixedSubscriberChannel`是一个`SubscribableChannel`只支持一个不能取消订阅的`MessageHandler`订阅服务器的`SubscribableChannel`。当不涉及其他订阅服务器且不需要通道拦截器时,这对于高吞吐量性能用例非常有用。 ``` ``` 注意,`load-balancer`和`load-balancer-ref`属性是互斥的。 负载平衡还与布尔`failover`属性一起工作。如果`failover`值为真(默认值),则当前面的处理程序抛出异常时,Dispatcher 将返回到任何后续的处理程序(根据需要)。顺序是由在处理程序本身上定义的可选订单值确定的,或者,如果不存在这样的值,则由处理程序订阅的顺序确定。 如果在特定的情况下,要求调度器总是尝试调用第一个处理程序,然后在每次发生错误时以相同的固定顺序返回,则不应提供负载平衡策略。换句话说,即使没有启用负载平衡,Dispatcher 仍然支持`failover`布尔属性。然而,在没有负载平衡的情况下,处理程序的调用总是根据它们的顺序从第一个开始。例如,当对小学、中学、大学等有一个明确的定义时,这种方法很有效。当使用名称空间支持时,任一端点上的`order`属性决定顺序。 | |请记住,负载平衡和`failover`仅在通道具有多个订阅消息处理程序时才适用。
在使用名称空间支持时,这意味着多个端点共享在`input-channel`属性中定义的相同通道引用。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 从版本 5.2 开始,当`failover`为真时,当前处理程序的失败以及失败的消息将分别记录在`debug`或`info`(如果已分别配置)下。 ##### `ExecutorChannel` `ExecutorChannel`是一个点对点通道,支持与`DirectChannel`相同的 Dispatcher 配置(负载平衡策略和`failover`布尔属性)。这两种调度通道类型之间的主要区别是,`ExecutorChannel`委托给`TaskExecutor`的一个实例来执行调度。这意味着发送方法通常不会阻塞,但也意味着处理程序调用可能不会发生在发送方的线程中。因此,它不支持跨越发送者和接收处理程序的事务。 | |例如,当使用带有抑制客户机的拒绝策略(例如`ThreadPoolExecutor.CallerRunsPolicy`)的`TaskExecutor`时,发送方有时会阻塞.,在线程池达到最大容量且执行者的工作队列已满的任何时候,发送者的线程都可以执行该方法。
由于这种情况只会以不可预测的方式发生,因此不应依赖它进行事务。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ##### `FluxMessageChannel` `FluxMessageChannel`是`org.reactivestreams.Publisher`实现的`org.reactivestreams.Publisher`,用于将消息发送到内部`reactor.core.publisher.Flux`,以供下游的响应订阅者按需消费。该通道实现既不是`SubscribableChannel`,也不是`PollableChannel`,因此只有`org.reactivestreams.Subscriber`实例可以用于从该通道消耗具有背压特性的反应流。另一方面,`FluxMessageChannel`实现了`ReactiveStreamsSubscribableChannel`及其`subscribeTo(Publisher>)`契约,允许接收来自反应源发布者的事件,将反应流连接到集成流。为了在整个集成流程中实现完全无反应的行为,必须在流程中的所有端点之间放置这样的通道。 有关与反应流的交互的更多信息,请参见[反应流支持](./reactive-streams.html#reactive-streams)。 ##### 作用域信道 Spring Integration1.0 提供了`ThreadLocalChannel`的实现,但自 2.0 起该实现已被移除。现在,处理相同需求的更一般的方法是将`scope`属性添加到通道中。属性的值可以是上下文中可用的作用域的名称。例如,在 Web 环境中,某些作用域是可用的,任何自定义作用域实现都可以在上下文中注册。下面的示例显示了应用于通道的线程本地作用域,包括作用域本身的注册: ``` ``` 在前面的示例中定义的通道也在内部委托给一个队列,但是该通道绑定到当前线程,因此队列的内容也是类似地绑定的。这样,发送到该通道的线程以后可以接收这些相同的消息,但没有其他线程能够访问它们。虽然很少需要线程范围的通道,但在`DirectChannel`实例被用来强制执行单个线程操作但任何回复消息都应该发送到“终端”通道的情况下,它们可能是有用的。如果该终端通道是线程范围的,则原始发送线程可以从终端通道收集其回复。 现在,由于任何通道都可以被作用域定义,所以除了线程本地之外,你还可以定义自己的作用域。 #### 信道拦截器 消息传递体系结构的优势之一是能够提供公共行为,并以非侵入性的方式捕获有关通过系统的消息的有意义的信息。由于`Message`实例被发送到`MessageChannel`实例并从`MessageChannel`实例接收,这些通道提供了截获发送和接收操作的机会。下面的清单中显示了`ChannelInterceptor`策略接口,它为每个操作提供了方法: ``` public interface ChannelInterceptor { Message preSend(Message message, MessageChannel channel); void postSend(Message message, MessageChannel channel, boolean sent); void afterSendCompletion(Message message, MessageChannel channel, boolean sent, Exception ex); boolean preReceive(MessageChannel channel); Message postReceive(Message message, MessageChannel channel); void afterReceiveCompletion(Message message, MessageChannel channel, Exception ex); } ``` 在实现接口之后,使用通道注册拦截器只需要进行以下调用: ``` channel.addInterceptor(someChannelInterceptor); ``` 返回`Message`实例的方法可以用于转换`Message`,也可以返回’null’以阻止进一步的处理(当然,任何方法都可以抛出`RuntimeException`)。同样,`preReceive`方法可以返回`false`以阻止接收操作继续进行。 | |请记住,`receive()`调用仅与`PollableChannels`相关,
实际上,`SubscribableChannel`接口甚至没有定义`receive()`方法。
的原因是,当`Message`被发送到`SubscribableChannel`时,它被直接发送到零个或多个订阅者,这取决于信道的类型(例如,
a`PublishSubscribeChannel`发送到其所有订阅者)。因此,只有当拦截器应用到
时,才调用`preReceive(…​)`、`afterReceiveCompletion(…​)`拦截器方法。| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| Spring 集成还提供了[Wire Tap](https://www.enterpriseintegrationpatterns.com/WireTap.html)模式的实现方式。这是一个简单的拦截器,它在不改变现有流的情况下将`Message`发送到另一个信道。它对于调试和监视非常有用。一个例子如[Wire Tap](#channel-wiretap)所示。 因为很少需要实现所有的拦截器方法,所以接口提供了 no-op 方法(返回`void`方法没有代码,`Message`-返回方法返回`Message`为-is,而`boolean`方法返回`true`)。 | |拦截器方法的调用顺序取决于信道的类型。
如前所述,基于队列的信道是接收方法首先被拦截的唯一信道,
此外,发送和接收拦截之间的关系取决于单独的发送者和接收者线程的时间安排。
例如,如果接收者在等待消息时已经被阻止,则顺序可以如下:`preSend`,`preReceive`,`postReceive`,`postSend`但是,
,如果接收方在发送方在该通道上放置了消息并已返回之后进行轮询,则顺序如下:`preSend`,`postSend`(some-time-elapses),`preReceive`,`postReceive`。
在这种情况下经过的时间取决于许多因素,因此通常是不可预测的(实际上,接收可能永远不会发生)。
队列的类型也起到一定的作用(例如,会合与优先级)。,简而言之,
,除了`preSend`在`postSend`和`preReceive`在`postReceive`之前这一事实外,你不能依赖该顺序。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 从 Spring Framework4.1 和 Spring Integration4.1 开始,`ChannelInterceptor`提供了新的方法:`afterSendCompletion()`和`afterReceiveCompletion()`。它们是在`send()' and 'receive()`调用后调用的,而不考虑引发的允许资源清理的任何异常。请注意,通道在`ChannelInterceptor`列表中以与初始值`preSend()`和`preReceive()`调用相反的顺序调用这些方法。 从版本 5.1 开始,全局信道拦截器现在应用于动态注册的信道-例如,当使用 爪哇 DSL 时,通过使用`beanFactory.initializeBean()`或`IntegrationFlowContext`初始化的 bean。以前,在刷新应用程序上下文后创建 bean 时,不会应用拦截器。 此外,从版本 5.1 开始,当没有接收到消息时,将不再调用`ChannelInterceptor.postReceive()`;不再需要检查`null``Message`。以前,这种方法被称为。如果你的拦截器依赖于前面的行为,那么可以实现`afterReceiveCompleted()`,因为无论是否接收到消息,该方法都会被调用。 | |从版本 5.2 开始, Spring 消息传递模块中的`ChannelInterceptorAware`被弃用,而支持`InterceptableChannel`,它现在对其进行了扩展,以实现向后兼容。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| #### `MessagingTemplate` Spring 当引入端点及其各种配置选项时,集成为消息传递组件提供了一个基础,该组件允许从消息传递系统非侵入性地调用你的应用程序代码。但是,有时需要从应用程序代码中调用消息传递系统。 Spring 在实现这样的用例时,为了方便起见,集成提供了一个`MessagingTemplate`,它支持跨消息通道的各种操作,包括请求和应答场景。例如,可以发送请求并等待答复,如下所示: ``` MessagingTemplate template = new MessagingTemplate(); Message reply = template.sendAndReceive(someChannel, new GenericMessage("test")); ``` 在前面的示例中,模板将在内部创建一个临时匿名通道。模板上也可以设置“sendtimeout”和“receiveTimeout”属性,还支持其他交换类型。下面的清单显示了这些方法的签名: ``` public boolean send(final MessageChannel channel, final Message message) { ... } public Message sendAndReceive(final MessageChannel channel, final Message request) { ... } public Message receive(final PollableChannel channel) { ... } ``` | |在[Enter the`GatewayProxyFactoryBean`](./gateway.html#gateway-proxy)中描述了一种侵入性较小的方法,它允许你调用带有有效负载或头值的简单接口,而不是`Message`实例。| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| #### 配置消息通道 要创建消息通道实例,可以使用``元素表示 XML,也可以使用`DirectChannel`实例表示 爪哇 配置,如下所示: 爪哇 ``` @Bean public MessageChannel exampleChannel() { return new DirectChannel(); } ``` XML ``` ``` 当使用不带任何子元素的``元素时,它将创建一个`DirectChannel`实例(a`SubscribableChannel`)。 要创建发布-订阅通道,请使用``元素(在 爪哇 中是`PublishSubscribeChannel`),如下所示: 爪哇 ``` @Bean public MessageChannel exampleChannel() { return new PublishSubscribeChannel(); } ``` XML ``` ``` 你可以替代地提供各种``子元素来创建任何可对应信道类型(如[消息通道实现](#channel-implementations)中所描述的)。下面的部分展示了每种通道类型的示例。 ##### `DirectChannel`配置 如前所述,`DirectChannel`是默认类型。下面的清单显示了谁来定义一个: 爪哇 ``` @Bean public MessageChannel directChannel() { return new DirectChannel(); } ``` XML ``` ``` 默认通道具有循环负载均衡器,并且还启用了故障转移(有关更多详细信息,请参见[`DirectChannel`](#channel-implementations-directchannel))。要禁用其中的一个或两个,请添加``子元素(`LoadBalancingStrategy``DirectChannel`的构造函数)并按以下方式配置属性: 爪哇 ``` @Bean public MessageChannel failFastChannel() { DirectChannel channel = new DirectChannel(); channel.setFailover(false); return channel; } @Bean public MessageChannel failFastChannel() { return new DirectChannel(null); } ``` XML ``` ``` ##### 数据类型通道配置 有时,使用者只能处理特定类型的有效负载,这迫使你确保输入消息的有效负载类型。首先想到的可能是使用消息过滤器。然而,消息过滤器所能做的就是过滤掉不符合使用者需求的消息。另一种方法是使用基于内容的路由器,将具有不兼容数据类型的消息路由到特定的转换器,以强制转换和转换到所需的数据类型。这可能行得通,但要实现同样的事情,一种更简单的方法是应用[数据类型通道](https://www.enterpriseintegrationpatterns.com/DatatypeChannel.html)模式。对于每个特定的有效负载数据类型,可以使用单独的数据类型通道。 要创建只接受包含特定有效负载类型的消息的数据类型通道,请在通道元素的`datatype`属性中提供数据类型的完全限定类名称,如下例所示: 爪哇 ``` @Bean public MessageChannel numberChannel() { DirectChannel channel = new DirectChannel(); channel.setDatatypes(Number.class); return channel; } ``` XML ``` ``` 请注意,类型检查传递的是可分配给通道数据类型的任何类型。换句话说,前面示例中的`numberChannel`将接受有效负载为`java.lang.Integer`或`java.lang.Double`的消息。可以以逗号分隔的列表的形式提供多个类型,如下例所示: 爪哇 ``` @Bean public MessageChannel numberChannel() { DirectChannel channel = new DirectChannel(); channel.setDatatypes(String.class, Number.class); return channel; } ``` XML ``` ``` 因此,前面示例中的“numberchannel”仅接受数据类型为`java.lang.Number`的消息。但是,如果消息的有效负载不是所需的类型,会发生什么情况?这取决于你是否定义了一个名为`integrationConversionService`的 Bean,它是 Spring 的[转换服务](https://docs.spring.io/spring/docs/current/spring-framework-reference/html/validation.html#core-convert-ConversionService-API)的实例。如果不是,则将立即抛出`Exception`。但是,如果你已经定义了`integrationConversionService` Bean,那么将使用它来尝试将消息的有效负载转换为可接受类型。 你甚至可以注册自定义转换器。例如,假设你向上面配置的“numberchannel”发送了一条带有`String`有效负载的消息。你可以按以下方式处理此消息: ``` MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class); inChannel.send(new GenericMessage("5")); ``` 通常情况下,这将是一个完全合法的操作。然而,由于我们使用了数据类型通道,这样的操作的结果将产生类似于以下的异常: ``` Exception in thread "main" org.springframework.integration.MessageDeliveryException: Channel 'numberChannel' expected one of the following datataypes [class java.lang.Number], but received [class java.lang.String] … ``` 出现异常是因为我们要求有效负载类型为`Number`,但是我们发送了`String`。因此,我们需要将`String`转换为`Number`。为此,我们可以实现类似于以下示例的转换器: ``` public static class StringToIntegerConverter implements Converter { public Integer convert(String source) { return Integer.parseInt(source); } } ``` 然后,我们可以将其注册为集成转换服务的转换器,如下例所示: 爪哇 ``` @Bean @IntegrationConverter public StringToIntegerConverter strToInt { return new StringToIntegerConverter(); } ``` XML ``` ``` 或者在`StringToIntegerConverter`类中,当它被标记为用于自动扫描的`@Component`注释时。 解析“转换器”元素时,如果一个元素尚未定义,它将创建`integrationConversionService` Bean。有了该转换器,`send`操作现在就会成功,因为数据类型通道使用该转换器将`String`有效负载转换为`Integer`。 有关有效载荷类型转换的更多信息,请参见[有效载荷类型转换](./endpoint.html#payload-type-conversion)。 从版本 4.0 开始,`integrationConversionService`由`DefaultDatatypeChannelMessageConverter`调用,后者在应用程序上下文中查找转换服务。要使用不同的转换技术,可以在通道上指定`message-converter`属性。这必须是对`MessageConverter`实现的引用。只使用`fromMessage`方法。它为转换器提供了对消息头的访问(如果转换可能需要来自消息头的信息,例如`content-type`)。该方法只能返回转换后的有效载荷或完整的`Message`对象。如果是后者,转换器必须小心地从入站消息中复制所有的头。 或者,你可以声明一个 ID 为`datatypeChannelMessageConverter`的``类型的`MessageConverter`,并且该转换器被所有具有`datatype`的通道使用。 ##### `QueueChannel`配置 要创建`QueueChannel`,请使用``子元素。你可以按以下方式指定频道的容量: 爪哇 ``` @Bean public PollableChannel queueChannel() { return new QueueChannel(25); } ``` XML ``` ``` | |如果你没有为这个``子元素上的“capacity”属性提供一个值,则生成的队列是无界的。
为了避免内存耗尽等问题,我们强烈建议你为有界队列设置一个显式的值。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ###### 持久`QueueChannel`配置 由于`QueueChannel`提供了缓冲消息的功能,但在默认情况下仅在内存中进行缓冲,因此它还引入了一种可能性,即在系统故障的情况下,消息可能会丢失。为了减轻这种风险,`QueueChannel`策略接口的持久实现可以支持`MessageGroupStore`策略。有关`MessageGroupStore`和`MessageStore`的更多详细信息,请参见[消息存储](./message-store.html#message-store)。 | |当使用`message-store`属性时,不允许使用`capacity`属性。| |---|-----------------------------------------------------------------------------------| 当`QueueChannel`接收到`Message`时,它将消息添加到消息存储中。当从`QueueChannel`对`Message`进行轮询时,它将从消息存储中删除。 默认情况下,`QueueChannel`将其消息存储在内存队列中,这可能导致前面提到的消息丢失场景。然而, Spring 集成提供了持久性存储,例如`JdbcChannelMessageStore`。 通过添加`message-store`属性,可以为任何`QueueChannel`配置消息存储,如下例所示: ``` ``` (关于 爪哇/ Kotlin 配置选项,请参见下面的示例) Spring 集成 JDBC 模块还为许多流行的数据库提供了模式数据定义语言(DDL)。这些模式位于该模块(`spring-integration-jdbc`)的 org.springframework.integration.jdbc.store.channel 包中。 | |一个重要的特性是,对于任何事务持久存储(例如`JdbcChannelMessageStore`),只要 poller 配置了一个事务,从存储中删除的消息只有在事务成功完成的情况下才能被永久删除,
否则事务将回滚,而`Message`并没有丢失。| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 随着越来越多的与“NoSQL”数据存储相关的 Spring 项目开始为这些存储提供底层支持,消息存储的许多其他实现都是可用的。如果找不到满足你特定需求的接口,你还可以提供你自己的`MessageGroupStore`接口实现。 自版本 4.0 以来,如果可能的话,我们建议将`QueueChannel`实例配置为使用`ChannelMessageStore`。与一般的消息存储相比,这些通常针对这种使用进行了优化。如果`ChannelMessageStore`是`ChannelPriorityMessageStore`,则在优先顺序内以 FIFO 方式接收消息。优先级的概念由消息存储实现确定。例如,下面的示例显示了[MongoDB 通道消息存储](./mongodb.html#mongodb-priority-channel-message-store)的 爪哇 配置: 爪哇 ``` @Bean public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) { MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory); store.setPriorityEnabled(true); return store; } @Bean public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) { return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue")); } ``` 爪哇 DSL ``` @Bean public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) { return IntegrationFlows.from((Channels c) -> c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup")) .... .get(); } ``` Kotlin DSL ``` @Bean fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) = integrationFlow { channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") } } ``` | |注意`MessageGroupQueue`class.
这是一个`BlockingQueue`实现来使用`MessageGroupStore`的操作。| |---|---------------------------------------------------------------------------------------------------------------------------------------| 定制`QueueChannel`环境的另一个选项是由``子元素的`ref`属性或其特定的构造函数提供的。此属性提供对任何`java.util.Queue`实现的引用。例如,分布式的 Hazelcast[`IQueue`](https://hazelcast.com/use-cases/imdg/imdg-messaging/)可以配置如下: ``` @Bean public HazelcastInstance hazelcastInstance() { return Hazelcast.newHazelcastInstance(new Config() .setProperty("hazelcast.logging.type", "log4j")); } @Bean public PollableChannel distributedQueue() { return new QueueChannel(hazelcastInstance() .getQueue("springIntegrationQueue")); } ``` ##### `PublishSubscribeChannel`配置 要创建`PublishSubscribeChannel`,请使用 \元素。当使用这个元素时,你还可以指定用于发布消息的`task-executor`(如果没有指定消息,它将在发送方的线程中发布),如下所示: 爪哇 ``` @Bean public MessageChannel pubsubChannel() { return new PublishSubscribeChannel(someExecutor()); } ``` XML ``` ``` 如果在`PublishSubscribeChannel`的下游提供一个重序列器或聚合器,则可以将通道上的’apply-sequence’属性设置为`true`。这样做表明通道在传递消息之前应该设置`sequence-size`和`sequence-number`消息头以及相关 ID。例如,如果有五个订阅服务器,`sequence-size`将被设置为`5`,并且消息将具有`sequence-number`标头值,范围从`1`到`5`。 除了`Executor`之外,还可以配置`ErrorHandler`。默认情况下,`PublishSubscribeChannel`使用`MessagePublishingErrorHandler`实现从`errorChannel`头发送一个错误到`MessageChannel`或全局`errorChannel`实例。如果不配置`Executor`,则忽略`ErrorHandler`,并将异常直接抛出到调用者的线程。 如果在`PublishSubscribeChannel`的下游提供`Resequencer`或`Aggregator`,则可以将通道上的’apply-sequence’属性设置为`true`。这样做表明,在传递消息之前,通道应该设置序列大小和序列号消息头以及相关 ID。例如,如果有五个订阅服务器,序列大小将设置为`5`,并且消息将具有从`1`到`5`的序列号标头值。 下面的示例展示了如何将`apply-sequence`标头设置为`true`: 爪哇 ``` @Bean public MessageChannel pubsubChannel() { PublishSubscribeChannel channel = new PublishSubscribeChannel(); channel.setApplySequence(false); return channel; } ``` XML ``` ``` | |默认情况下,`apply-sequence`值是`false`,这样,发布-订阅通道就可以向多个出站通道发送完全相同的消息实例。,
自 Spring 集成以来,当标记设置为`true`时,强制执行有效负载和头引用的不可变性,该通道创建新的`Message`实例,其有效负载引用相同,但标头值不同。| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 从版本 5.4.3 开始,`PublishSubscribeChannel`还可以配置其`requireSubscribers`选项的`requireSubscribers`,以指示此通道在没有订阅服务器时不会静默忽略消息。当没有订阅者时,将抛出带有`MessageDispatchingException`消息的`Dispatcher has no subscribers`消息,并将此选项设置为`true`。 ##### `ExecutorChannel` 要创建`ExecutorChannel`,请添加带有`task-executor`属性的``子元素。该属性的值可以引用上下文中的任何`TaskExecutor`。例如,这样做可以配置线程池,用于将消息分配给订阅的处理程序。如前所述,这样做会破坏发送方和接收方之间的单线程执行上下文,从而处理程序的调用不会共享任何活动事务上下文(即,处理程序可能抛出`Exception`,但`send`调用已成功返回)。下面的示例展示了如何使用`dispatcher`元素并在`task-executor`属性中指定一个执行器: 爪哇 ``` @Bean public MessageChannel executorChannel() { return new ExecutorChannel(someExecutor()); } ``` XML ``` ``` | |`load-balancer`和`failover`选项在 \子元素上也是可用的,如前面在[`DirectChannel`配置](#channel-configuration-directchannel)中所述。
同样的默认值也适用,因此,
,通道具有循环负载平衡策略,启用了故障转移,除非为其中一个或两个属性提供了显式配置,如以下示例所示:

```



```| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ##### `PriorityChannel`配置 要创建`PriorityChannel`,请使用``子元素,如下例所示: 爪哇 ``` @Bean public PollableChannel priorityChannel() { return new PriorityChannel(20); } ``` XML ``` ``` 默认情况下,通道查询消息的`priority`头。但是,你可以提供一个自定义的`Comparator`引用。另外,请注意`PriorityChannel`(与其他类型一样)确实支持`datatype`属性。与`QueueChannel`一样,它也支持`capacity`属性。下面的示例演示了所有这些: Java ``` @Bean public PollableChannel priorityChannel() { PriorityChannel channel = new PriorityChannel(20, widgetComparator()); channel.setDatatypes(example.Widget.class); return channel; } ``` XML ``` ``` 自版本 4.0 以来,`priority-channel`子元素支持`message-store`选项(在这种情况下不允许`comparator`和`capacity`)。消息存储必须是`PriorityCapableChannelMessageStore`。`PriorityCapableChannelMessageStore`的实现方式目前提供给`Redis`、`JDBC`和`MongoDB`。有关更多信息,请参见[`QueueChannel`Configuration]和[消息存储](./message-store.html#message-store)。你可以在[支持消息通道](./jdbc.html#jdbc-message-store-channels)中找到示例配置。 ##### `RendezvousChannel`配置 当 queue 子元素是``时,将创建`RendezvousChannel`。它没有为前面描述的那些提供任何额外的配置选项,并且它的队列不接受任何容量值,因为它是一个零容量直接切换队列。下面的示例展示了如何声明`RendezvousChannel`: Java ``` @Bean public PollableChannel rendezvousChannel() { return new RendezvousChannel(); } ``` XML ``` ``` ##### 作用域信道配置 任何通道都可以配置`scope`属性,如下例所示: ``` ``` ##### 信道拦截器配置 消息通道也可以具有拦截器,如[信道拦截器](#channel-interceptors)中所述。可以将``子元素添加到``(或更具体的元素类型)中。你可以提供`ref`属性来引用实现`ChannelInterceptor`接口的任何 Spring 托管对象,如下例所示: ``` ``` 通常,我们建议在单独的位置定义拦截器实现,因为它们通常提供可以跨多个通道重用的公共行为。 ##### 全局信道拦截器配置 通道拦截器为每个通道应用横切行为提供了一种简洁的方法。如果应该在多个信道上应用相同的行为,那么为每个信道配置相同的拦截器集合将不是最有效的方法。 Spring 为了避免重复配置,同时也使拦截器能够应用于多个信道,集成提供了全局拦截器。考虑以下两个例子: ``` ``` ``` ``` 每个``元素都允许你定义一个全局拦截器,它应用于所有与`pattern`属性定义的任何模式匹配的通道上。在前一种情况下,全局拦截器应用于“Thing1”通道和所有其他以“Thing2”或“Input”开头的通道,但不应用于以“Thing3”开头的通道(自版本 5.0 以来)。 | |将此语法添加到模式中会导致一个可能的(尽管可能不太可能)问题,
如果你有一个名为`!thing1`的 Bean 模式,并且你在通道拦截器的`!thing1`模式中包含了一个`pattern`模式,它不再匹配。
模式现在匹配所有未命名`thing1`的 bean。
在这种情况下,你可以在带有`\`的模式中转义`!`。
模式`\!thing1`匹配名为`!thing1`的 Bean。| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| Order 属性允许你在给定信道上有多个拦截器时管理此拦截器被注入的位置。例如,通道“InputChannel”可以在本地配置单独的拦截器(见下文),如下例所示: ``` ``` 一个合理的问题是“相对于本地配置的其他拦截器或通过其他全球拦截器定义配置的其他拦截器,如何注入一个全球拦截器?”当前的实现提供了一种定义拦截器执行顺序的简单机制。`order`属性中的正数确保在任何现有拦截器之后注入拦截器,而负数确保在现有拦截器之前注入拦截器。这意味着,在前面的示例中,全局拦截器是在本地配置的’wire-tap’拦截器之后注入的(因为其`order`大于`0`)。如果存在另一个匹配`pattern`的全局拦截器,其顺序将通过比较两个拦截器的`order`属性的值来确定。要在现有拦截器之前注入一个全局拦截器,请使用`order`属性的负值。 | |注意,`order`和`pattern`属性都是可选的。
`order`的默认值将为 0,而`pattern`的默认值为’\*’(以匹配所有通道)。| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ##### 丝锥 正如前面提到的, Spring 集成提供了一种简单的线分接拦截器。你可以在``元素内的任何通道上配置线控分接。这样做对于调试特别有用,并且可以与 Spring Integration 的日志通道适配器结合使用,如下所示: ``` ``` | |“logging-channel-adapter”还接受一个“expression”属性,这样你就可以根据“payload”和“headers”变量计算 SPEL 表达式。,
或者,要记录完整的消息`toString()`结果,请为“log-full-message”属性提供一个`true`的值,默认情况下,
,它是`false`,这样只记录有效负载。
将其设置为`true`,可以记录除有效负载之外的所有头。
“表达式”选项提供了最大的灵活性(例如,`expression="payload.user.name"`)。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 关于线导和其他类似组件([消息发布配置](./message-publishing.html#message-publishing-config))的一个常见误解是,它们本质上是自动异步的。默认情况下,线接作为一个组件不会被异步调用。 Spring 相反,集成关注于配置异步行为的单一统一方法:消息通道。使消息流的某些部分同步或异步的是在该流中配置的消息通道的类型。这是消息通道抽象的主要好处之一。从框架的一开始,我们就一直强调消息通道作为框架的一流公民的需求和价值。它不只是 EIP 模式的内在的、隐式的实现。它作为可配置组件完全公开给最终用户。因此,线抽头组件只负责执行以下任务: * 通过点击一个通道(例如`channelA`)来拦截消息流。 * 抓住每条消息 * 将消息发送到另一个通道(例如,`channelB`) 它本质上是桥模式的一种变体,但它被封装在一个通道定义中(因此更容易启用和禁用,而不会破坏流)。此外,与 Bridge 不同的是,它基本上是分叉另一个消息流。这个流是同步的还是异步的?答案取决于“ChannelB”是什么类型的消息通道。我们有以下几种选择:直接通道、可检索通道和执行器通道。后两种方法打破了线程边界,使得在这些通道上的通信是异步的,因为将消息从该通道发送到其订阅的处理程序的任务发生在不同的线程上,而不是用于将消息发送到该通道的线程上。这就是什么将使你的线接流同步或异步。它与框架中的其他组件(例如 Message Publisher)保持一致,并通过避免你提前担心(除了编写线程安全代码)特定代码块应该实现为同步还是异步,从而增加了一定程度的一致性和简单性。两段代码(例如,组件 A 和组件 B)在消息通道上的实际连接使它们的协作是同步的或异步的。将来,你甚至可能想要从同步转换为异步,而 Message Channel 允许你在不接触代码的情况下快速地执行此操作。 关于线接的最后一点是,尽管上面提供了默认情况下不是异步的理由,但你应该记住,通常希望尽快传递消息。因此,使用异步通道选项作为线控分路器的出站通道将是非常常见的。然而,在默认情况下,我们并不强制执行异步行为。如果我们这样做的话,有许多用例将会被打破,包括你可能不希望打破事务边界。也许你出于审计目的而使用了线接模式,并且你确实希望在原始事务中发送审计消息。例如,你可以将线接连接到一个 JMS 出站通道适配器。这样,你就能两全其美:1)JMS 消息的发送可能发生在事务中,而 2)它仍然是一个“触发并忘记”操作,从而防止主消息流中出现任何明显的延迟。 | |从 4.0 版本开始,当拦截器(例如[`WireTap`类](https://DOCS. Spring.io/autorepo/DOCS/ Spring-integration/current/api/org/springframework/integration/channel/interceptor/wiretap.html)引用信道时,避免循环引用是很重要的。
你需要从当前拦截器拦截的信道中排除此类信道。
如果你有自定义的
,可以使用适当的模式或编程地完成此操作。如果你有一个自定义的你还可以在拦截器方法中添加运行时保护,以确保信道不是拦截器引用的信道。
`WireTap`使用了这两种技术。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 从版本 4.3 开始,`WireTap`有额外的构造函数,它们接受`channelName`而不是`MessageChannel`实例。这对于 Java 配置和使用通道自动创建逻辑时都很方便。目标`MessageChannel` Bean 是从提供的`channelName`以后在与拦截器的第一次交互中解析出来的。 | |通道解析需要`BeanFactory`,因此有线分接实例必须是 Spring 管理的 Bean。| |---|----------------------------------------------------------------------------------------------------| 这种后期绑定方法还允许使用 Java DSL 配置简化典型的接线模式,如下例所示: ``` @Bean public PollableChannel myChannel() { return MessageChannels.queue() .wireTap("loggingFlow.input") .get(); } @Bean public IntegrationFlow loggingFlow() { return f -> f.log(); } ``` ##### 条件式电线分接 通过使用`selector`或`selector-expression`属性,可以使电线分接成为有条件的。`selector`引用了`MessageSelector` Bean,这可以在运行时确定消息是否应该转到 TAP 通道。类似地,`selector-expression`是一个执行相同目的的布尔 SPEL 表达式:如果表达式的计算结果为`true`,则消息将被发送到 TAP 通道。 ##### 全局线接接头配置 作为[全局信道拦截器配置](#global-channel-configuration-interceptors)的特殊情况,可以配置全局线接。为此,配置一个顶级`wire-tap`元素。现在,除了正常的`wire-tap`名称空间支持外,`pattern`和`order`属性也得到了支持,并且它们的工作方式与`channel-interceptor`完全相同。下面的示例展示了如何配置全局线接: Java ``` @Bean @GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3) public WireTap wireTap(MessageChannel wiretapChannel) { return new WireTap(wiretapChannel); } ``` XML ``` ``` | |全局线接提供了一种方便的方式,可以在不修改现有通道配置的情况下,在外部配置单通道线接。
要这样做,请将`pattern`属性设置为目标通道名。
例如,你可以使用此技术来配置测试用例,以验证通道上的消息。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| #### 特殊频道 默认情况下,在应用程序上下文中定义了两个特殊通道:`errorChannel`和`nullChannel`。“nullchannel”(`NullChannel`的实例)的作用类似于`/dev/null`,记录在`DEBUG`级别发送到它的任何消息并立即返回。对`org.reactivestreams.Publisher`发送的消息的有效负载应用了特殊处理:它在此信道中被立即订阅,以启动反应流处理,尽管数据被丢弃。从反应流处理中抛出的错误(参见`Subscriber.onError(Throwable)`)将记录在警告级别下,以进行可能的调查。如果需要对这样的错误做任何事情,则`[ReactiveRequestHandlerAdvice](./handler-advice.html#reactive-advice)`具有`Mono.doOnError()`自定义的消息处理程序可以应用于产生`Mono`的消息处理程序,对此`nullChannel`进行回复。当你遇到不关心的通道解析错误时,你可以将受影响组件的`output-channel`属性设置为“nullchannel”(名称“nullchannel”在应用程序上下文中保留)。 “errorchannel”在内部用于发送错误消息,并且可能会被自定义配置覆盖。这在[错误处理](./error-handling.html#error-handling)中有更详细的讨论。 有关消息通道和拦截器的更多信息,请参见 Java DSL 章节中的[消息通道](./dsl.html#java-dsl-channels)。 ### poller 本节描述了 Spring 集成中轮询的工作方式。 #### 轮询消费者 当消息端点(通道适配器)连接到通道并进行实例化时,它们会产生以下实例之一: * [`PollingConsumer`](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/endpoint/pollingconsumer.html) * [`EventDrivenConsumer`](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/endpoint/eventdrivenconsumer.html) 实际的实现取决于这些端点连接到的通道类型。与实现[`org.springframework.messaging.SubscribableChannel`](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/index.html?org/springframework/messing/subscribablechannel.html)接口的通道适配器产生`EventDrivenConsumer`的实例。另一方面,与实现了[`org.springframework.messaging.PollableChannel`](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/index.html?org/springframework/messaging/pollablechannel.html)接口的通道适配器(例如`QueueChannel`)产生了`PollingConsumer`的实例。 轮询消费者让 Spring 集成组件主动轮询消息,而不是以事件驱动的方式处理消息。 在许多消息传递场景中,它们代表了一个关键的交叉问题。在 Spring 集成中,轮询消费者是基于具有相同名称的模式,这在 Gregor Hohpe 和 Bobby Woolf 的书*Enterprise 整合模式*中进行了描述。你可以在[Book 的网站](https://www.enterpriseintegrationpatterns.com/PollingConsumer.html)上找到模式的描述。 #### 可选消息源 Spring 集成提供了轮询消费者模式的第二种变化。当使用入站通道适配器时,这些适配器通常用`SourcePollingChannelAdapter`包装。例如,当从远程 FTP 服务器位置检索消息时,[FTP 入站通道适配器](./ftp.html#ftp-inbound)中描述的适配器配置了一个 poller,以定期检索消息。因此,当组件配置为 Poller 时,生成的实例属于以下类型之一: * [`PollingConsumer`](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/endpoint/pollingconsumer.html) * [`SourcePollingChannelAdapter`](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/endpoint/sourcepollingchanneladapter.html) 这意味着 Poller 在入站和出站消息传递场景中都会使用。以下是使用 Poller 的一些用例: * 轮询某些外部系统,例如 FTP 服务器、数据库和 Web 服务 * 轮询内部(可匹配)消息通道 * 轮询内部服务(例如在 Java 类上重复执行方法) | |AOP 建议类可以应用于 Pollers,在`advice-chain`中,例如事务通知来启动事务。,
从版本 4.1 开始,提供了`PollSkipAdvice`。
投票者使用触发器来确定下一次投票的时间。
`PollSkipAdvice`可以用来抑制(跳过)投票,这可能是因为存在一些下游条件,这将阻止消息被处理。
使用此建议,你必须为它提供一个`PollSkipStrategy`的实现。
从版本 4.2.5 开始,提供了一个`SimplePollSkipStrategy`,
要使用它,你可以将一个实例作为 Bean 添加到应用程序上下文中,将其注入到`PollSkipAdvice`中,并将其添加到 Poller 的建议链中。
要跳过轮询,请调用`skipPolls()`。
恢复轮询,请调用`reset()`。
4.2 版本在该区域增加了更多的灵活性。
参见[消息源的条件 Poller](#conditional-pollers)。| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 本章仅对轮询消费者以及他们如何适应消息通道(参见[消息通道](./channel.html#channel))和通道适配器(参见[通道适配器](./channel-adapter.html#channel-adapter))的概念进行了高级概述。有关一般消息传递端点和特别是轮询消费者的更多信息,请参见[消息端点](./endpoint.html#endpoint)。 #### 延迟确认可收集消息源 从版本 5.0.1 开始,某些模块提供`MessageSource`实现,支持延迟确认,直到下游流完成(或将消息传递给另一个线程)。这目前仅限于`AmqpMessageSource`和`KafkaMessageSource`。 有了这些消息源,`IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK`报头(参见[`MessageHeaderAccessor`api](./message.html#message-header-accessor))被添加到消息中。当与可收集的消息源一起使用时,头的值是`AcknowledgmentCallback`的实例,如下例所示: ``` @FunctionalInterface public interface AcknowledgmentCallback { void acknowledge(Status status); boolean isAcknowledged(); void noAutoAck(); default boolean isAutoAck(); enum Status { /** * Mark the message as accepted. */ ACCEPT, /** * Mark the message as rejected. */ REJECT, /** * Reject the message and requeue so that it will be redelivered. */ REQUEUE } } ``` 并非所有消息源(例如,a`KafkaMessageSource`)都支持`REJECT`状态。其处理方法与`ACCEPT`相同。 应用程序可以随时确认消息,如下例所示: ``` Message received = source.receive(); ... StaticMessageHeaderAccessor.getAcknowledgmentCallback(received) .acknowledge(Status.ACCEPT); ``` 如果`MessageSource`被连接到`SourcePollingChannelAdapter`,那么当下游流程完成后,Poller 线程返回适配器时,适配器将检查确认是否已经被确认,如果没有,将其状态设置为`ACCEPT`it(如果流抛出异常,则设置`REJECT`)。状态值是在[`AcknowledgmentCallback.Status`枚举](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/support/concredgmentcallback.status.html)中定义的。 Spring 集成提供了`MessageSourcePollingTemplate`来执行对`MessageSource`的临时轮询。这也需要在`MessageHandler`回调返回(或抛出异常)时在`AcknowledgmentCallback`上设置`ACCEPT`或`REJECT`。下面的示例展示了如何使用`MessageSourcePollingTemplate`进行轮询: ``` MessageSourcePollingTemplate template = new MessageSourcePollingTemplate(this.source); template.poll(h -> { ... }); ``` 在这两种情况下(`SourcePollingChannelAdapter`和`MessageSourcePollingTemplate`),都可以通过在回调中调用`noAutoAck()`来禁用 auto ack/nack。如果你将消息传递给另一个线程并希望稍后确认,那么你可能会这样做。并不是所有的实现都支持这一点(例如,Apache Kafka 不支持这一点,因为偏移提交必须在同一个线程上执行)。 #### 消息源的条件 Pollers 本节介绍如何使用条件 Pollers。 ##### 背景 `Advice`对象,在 poller 上的`advice-chain`中,为整个轮询任务(消息检索和处理)提供建议。这些“周围建议”方法无法访问投票的任何上下文——只有投票本身。这对于需求很好,比如执行事务任务或由于某些外部条件而跳过轮询,正如前面讨论的那样。如果我们希望根据轮询的`receive`部分的结果采取某些操作,或者如果我们希望根据条件调整 Poller,该怎么办?在这些情况下, Spring 集成提供了“智能”轮询。 ##### “智能”民调 5.3 版引入了`ReceiveMessageAdvice`接口。(在`MessageSourceMutator`中,`AbstractMessageSourceAdvice`已被弃用,而支持`default`方法。)在`advice-chain`中实现此接口的任何`Advice`对象仅应用于接收操作-`MessageSource.receive()`和`PollableChannel.receive(timeout)`。因此,它们只能应用于`SourcePollingChannelAdapter`或`PollingConsumer`。这样的类实现了以下方法: * `beforeReceive(Object source)`在`Object.receive()`方法之前调用该方法。它允许你检查和重新配置源代码。返回`false`将取消此轮询(类似于前面提到的`PollSkipAdvice`)。 * `Message afterReceive(Message result, Object source)`该方法是在`receive()`方法之后调用的。同样,你可以重新配置源代码或采取任何操作(可能取决于结果,如果源代码没有创建消息,结果可能是`null`)。你甚至可以返回一条不同的消息。 | |线程安全

如果一个建议会使结果发生变异,那么你不应该使用`TaskExecutor`来配置 poller。
如果一个建议会使源发生变异,那么这种变异就不是线程安全的,可能会导致意外的结果,特别是对于高频的 poller,
如果你需要同时处理轮询结果,考虑使用下游`ExecutorChannel`,而不是向 poller 添加执行器。| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | |建议链排序

你应该了解在初始化期间如何处理建议链,`Advice`不实现`ReceiveMessageAdvice`的对象将应用于整个轮询过程,并且所有这些对象都将首先被调用,在任何`ReceiveMessageAdvice`.
之前,`ReceiveMessageAdvice`对象是按照围绕源`receive()`方法的顺序被调用的。
如果你有,例如,`Advice`对象`a, b, c, d`,其中`b`和`d`是,则这些对象也按照以下顺序应用:`a, c, b, d`。如果源已经是`Proxy`,则在任何现有的`Advice`对象之后调用`ReceiveMessageAdvice`。
如果你希望更改顺序,则必须自己连接代理。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ##### `SimpleActiveIdleReceiveMessageAdvice` (以前的`SimpleActiveIdleMessageSourceAdvice`仅用于`MessageSource`已不推荐。)此建议是`ReceiveMessageAdvice`的一个简单实现。当与`DynamicPeriodicTrigger`结合使用时,它会调整轮询频率,这取决于上一次轮询是否导致了消息。Poller 还必须具有对相同`DynamicPeriodicTrigger`的引用。 | |重要提示:异步切换

`SimpleActiveIdleReceiveMessageAdvice`基于`receive()`结果修改触发器。
只有在 poller 线程上调用通知时,此操作才有效。
它不工作如果 poller 有`task-executor`.
来使用此建议,其中你希望在轮询结果之后使用异步操作,请稍后进行异步切换,也许可以使用`ExecutorChannel`。| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ##### `CompoundTriggerAdvice` 此建议允许根据投票是否返回消息来选择两个触发器中的一个。考虑一个使用`CronTrigger`的 poller。`CronTrigger`实例是不可变的,因此一旦构造它们,就不能对它们进行更改。考虑一个用例,我们希望使用 CRON 表达式每小时触发一次轮询,但是如果没有收到消息,则每分钟轮询一次,并且在检索到消息时,恢复为使用 CRON 表达式。 建议(和 Poller)为此目的使用`CompoundTrigger`。触发器的`primary`触发器可以是`CronTrigger`。当通知检测到没有收到消息时,它会将辅助触发器添加到`CompoundTrigger`。当`CompoundTrigger`实例的`nextExecutionTime`方法被调用时,如果存在,它将委托给辅助触发器。否则,它将委托给主触发器。 Poller 还必须具有对相同`CompoundTrigger`的引用。 下面的示例显示了每小时 CRON 表达式的配置,并将其退回到每分钟: ``` ``` | |重要提示:异步切换

`CompoundTriggerAdvice`基于`receive()`结果修改触发器。
只有在 poller 线程上调用通知时,此操作才有效。
它不工作如果 poller 有`task-executor`。
使用此建议,其中你希望在轮询结果之后使用异步操作,请稍后进行异步切换,也许可以使用`ExecutorChannel`。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ##### MessageSource-仅限建议 有些建议可能只适用于`MessageSource.receive()`,而对于`PollableChannel`则没有意义。为此,仍然存在`MessageSourceMutator`接口(`ReceiveMessageAdvice`的扩展)。对于`default`方法,它完全替换了已经废弃的`AbstractMessageSourceAdvice`方法,并且应该在只期望`MessageSource`代理的实现中使用。有关更多信息,请参见[入站通道适配器:轮询多个服务器和目录](./ftp.html#ftp-rotating-server-advice)。 ### 通道适配器 通道适配器是一个消息端点,它允许将单个发送方或接收方连接到消息通道。 Spring 集成提供了许多适配器以支持各种传输,例如 JMS、文件、HTTP、Web 服务、邮件等。本参考指南的下几章将讨论每个适配器。然而,这一章的重点是简单但灵活的方法-调用通道适配器支持。这里既有入站适配器,也有出站适配器,每个适配器都可以配置为使用核心名称空间中提供的 XML 元素。这些提供了一种扩展 Spring 集成的简单方法,只要你有一个可以作为源或目标调用的方法。 #### 配置入站通道适配器 一个`inbound-channel-adapter`元素(在 Java 配置中是`SourcePollingChannelAdapter`)可以在 Spring 管理的对象上调用任何方法,并在将方法的输出转换为`Message`之后,将一个非空返回值发送到`MessageChannel`。当适配器的订阅被激活时,Poller 尝试从源接收消息。根据提供的配置,用`TaskScheduler`调度 Poller。要为单个通道适配器配置轮询间隔或 CRON 表达式,可以提供具有调度属性之一的“poller”元素,例如“fixed-rate”或“CRON”。下面的示例定义了两个`inbound-channel-adapter`实例: Java DSL ``` @Bean public IntegrationFlow source1() { return IntegrationFlows.from(() -> new GenericMessage<>(...), e -> e.poller(p -> p.fixedRate(5000))) ... .get(); } @Bean public IntegrationFlow source2() { return IntegrationFlows.from(() -> new GenericMessage<>(...), e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI"))) ... .get(); } ``` Java ``` public class SourceService { @InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000")) Object method1() { ... } @InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI")) Object method2() { ... } } ``` Kotlin DSL ``` @Bean fun messageSourceFlow() = integrationFlow( { GenericMessage<>(...) }, { poller { it.fixedRate(5000) } }) { ... } ``` XML ``` ``` 另见[通道适配器表达式和脚本](#channel-adapter-expressions-and-scripts)。 | |如果没有提供 Poller,则必须在上下文中注册一个默认 Poller。
有关更多详细信息,请参见[端点命名空间支持](./endpoint.html#endpoint-namespace)。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | |重要:poller 配置

所有`inbound-channel-adapter`类型都支持`SourcePollingChannelAdapter`,这意味着它们包含一个 poller 配置,该配置轮询`MessageSource`(以调用产生该值的自定义方法)这将成为基于 poller 中指定的配置的`Message`有效负载)。
下面的示例显示了两个 poller 的配置:


在第一个配置中,轮询任务每轮询一次被调用,并且在每个任务(轮询)期间,根据`max-messages-per-poll`属性值调用该方法(该方法将产生消息)一次,
在第二个配置中,轮询任务将在每个轮询中调用 10 次,或者直到它返回’null’为止,因此,在每次轮询以一秒的间隔进行时,每次轮询可能会产生 10 条消息。
但是,如果配置看起来像以下示例,会发生什么情况:

```

```
请注意,没有`max-messages-per-poll`指定。
,我们将在后面介绍,在`PollingConsumer`(例如,`service-activator`,`filter`,`router`,以及其他)中,相同的 poller 配置将对`-1`的`max-messages-per-poll`具有默认值,这意味着“不停地执行轮询任务,除非轮询方法返回 null(可能是因为`QueueChannel`中没有更多的消息)”,然后睡眠一秒。,

但是,在`SourcePollingChannelAdapter`中,有点不同。
`max-messages-per-poll`的默认值是`1`,除非你显式地将其设置为负值(例如`-1`)。
这可以确保 poller 可以对生命周期事件(例如开始和停止)做出反应,并防止它可能在无限循环中旋转如果`MessageSource`的自定义方法的实现可能永远不会返回 null,并且碰巧是不可中断的。

但是,如果你确信你的方法可以返回 null,并且你需要对每个轮询中可用的尽可能多的源进行轮询,你应该显式地将`max-messages-per-poll`设置为负值,如下例所示:

```

```
从版本 5.5 开始,
的`0`值具有特殊的含义-完全跳过`MessageSource.receive()`调用,这可能被认为是对此入站通道适配器的暂停,直到`maxMessagesPerPoll`在稍后的时间被更改为非零值,例如通过控制总线。

还请参见[全局默认 Poller](./endpoint.html#global-default-poller)以获取更多信息。| |---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| #### 配置出站通道适配器 一个`outbound-channel-adapter`元素(用于 Java 配置的`@ServiceActivator`)也可以将`MessageChannel`连接到任何 POJO 使用者方法,该方法应该使用发送到该通道的消息的有效负载来调用。下面的示例展示了如何定义出站通道适配器: Java DSL ``` @Bean public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) { return f -> f .handle(myPojo, "handle"); } ``` Java ``` public class MyPojo { @ServiceActivator(channel = "channel1") void handle(Object payload) { ... } } ``` Kotlin DSL ``` @Bean fun outboundChannelAdapterFlow(myPojo: MyPojo) = integrationFlow { handle(myPojo, "handle") } ``` XML ``` ``` 如果要调整的通道是`PollableChannel`,则必须提供一个 poller 子元素(`@Poller`上的`@Poller`子注释),如下例所示: Java ``` public class MyPojo { @ServiceActivator(channel = "channel1", poller = @Poller(fixedRate = "3000")) void handle(Object payload) { ... } } ``` XML ``` ``` 如果 POJO 使用者实现可以在其他``定义中重用,则应该使用`ref`属性。然而,如果消费者实现仅由``的单个定义引用,则可以将其定义为内部 Bean,如下例所示: ``` ``` | |不允许在同一个``配置中同时使用`ref`属性和内部处理程序定义,因为它会创建一个不明确的条件。
这样的配置会导致引发异常。| |---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 可以在没有`channel`引用的情况下创建任何通道适配器,在这种情况下,它会隐式地创建`DirectChannel`的实例。创建的通道名称与``或``元素的`id`属性匹配。因此,如果不提供`channel`,则需要`id`。 #### 通道适配器表达式和脚本 与许多其他 Spring 集成组件一样,``和``也提供了对 SPEL 表达式求值的支持。要使用 SPEL,在“expression”属性中提供表达式字符串,而不是提供用于 Bean 上的方法调用的“ref”和“method”属性。当计算一个表达式时,它遵循与方法调用相同的契约,其中:当计算结果为非空值时,``的表达式会生成消息,而``的表达式必须等同于无效返回方法调用。 从 Spring Integration3.0 开始,``还可以配置一个 SPEL``(甚至还可以配置一个`