# 消息路由 ## 消息路由 本章介绍了使用 Spring 集成路由消息的细节。 ### 路由器 本节介绍路由器的工作方式。它包括以下主题: * [Overview](#router-overview) * [通用路由器参数](#router-common-parameters) * [路由器实现](#router-implementations) * [配置通用路由器](#router-namespace) * [Routers and the Spring Expression Language (SpEL)](#router-spel) * [动态路由器](#dynamic-routers) #### 概述 路由器是许多消息传递体系结构中的关键元素。它们消耗来自消息通道的消息,并根据一组条件将每个消耗的消息转发到一个或多个不同的消息通道。 Spring 集成提供了以下路由器: * [有效载荷型路由器](#router-implementations-payloadtyperouter) * [头值路由器](#router-implementations-headervaluerouter) * [收件人列表路由器](#router-implementations-recipientlistrouter) * [XPath 路由器(XML 模块的一部分)](./xml.html#xml-xpath-routing) * [错误消息异常类型路由器](#router-implementations-exception-router) * [(通用)路由器](#router-namespace) 路由器实现共享许多配置参数。然而,路由器之间存在某些差异。此外,配置参数的可用性取决于路由器是在链内使用还是在链外使用。为了提供一个快速的概述,所有可用的属性都在下面的两个表中列出。 下表显示了链外路由器可用的配置参数: | Attribute |路由器| header value router | xpath router | payload type router | recipient list route | exception type router | |----------------------|--------------------------------|--------------------------------|--------------------------------|--------------------------------|--------------------------------|--------------------------------| | apply-sequence |![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)| |default-output-channel|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)| | resolution-required |![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)| | ignore-send-failures |![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)| | timeout |![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)| | id |![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)| | auto-startup |![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)| | input-channel |![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)| | order |![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)| | method |![tickmark](images/tickmark.png)| | | | | | | ref |![tickmark](images/tickmark.png)| | | | | | | expression |![tickmark](images/tickmark.png)| | | | | | | header-name | |![tickmark](images/tickmark.png)| | | | | | evaluate-as-string | | |![tickmark](images/tickmark.png)| | | | | xpath-expression-ref | | |![tickmark](images/tickmark.png)| | | | | converter | | |![tickmark](images/tickmark.png)| | | | 下表显示了链内路由器可用的配置参数: | Attribute |路由器| header value router | xpath router | payload type router | recipient list router | exception type router | |----------------------|--------------------------------|--------------------------------|--------------------------------|--------------------------------|--------------------------------|--------------------------------| | apply-sequence |![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)| |default-output-channel|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)| | resolution-required |![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)| | ignore-send-failures |![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)| | timeout |![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)|![tickmark](images/tickmark.png)| | id | | | | | | | | auto-startup | | | | | | | | input-channel | | | | | | | | order | | | | | | | | method |![tickmark](images/tickmark.png)| | | | | | | ref |![tickmark](images/tickmark.png)| | | | | | | expression |![tickmark](images/tickmark.png)| | | | | | | header-name | |![tickmark](images/tickmark.png)| | | | | | evaluate-as-string | | |![tickmark](images/tickmark.png)| | | | | xpath-expression-ref | | |![tickmark](images/tickmark.png)| | | | | converter | | |![tickmark](images/tickmark.png)| | | | | |在 Spring Integration2.1 中,路由器参数已经在所有路由器实现中得到了更多的标准化。因此,一些小的更改可能会破坏较旧的 Spring 基于 Integration 的应用程序。,自 Spring Integration2.1 以来,将`ignore-channel-name-resolution-failures`属性删除,以利于将其行为与`resolution-required`属性合并,
此外,`resolution-required`属性现在默认为`true`,
在进行这些更改之前,`resolution-required`属性默认为`false`,当没有解析通道且没有设置`default-output-channel`时,导致消息被静默删除。
新行为需要至少一个解析通道,并且默认情况下,如果没有确定通道(或者发送尝试未成功),则抛出`MessageDeliveryException`。
如果你确实希望静默地删除消息,则可以设置`default-output-channel="nullChannel"`。| |---|| #### 常见路由器参数 本节描述所有路由器参数的公共参数(在本章前面的两个表中勾选了它们的所有方框的参数)。 ##### 链条的内部和外部 以下参数对链内和链外的所有路由器都有效。 `apply-sequence` 此属性指定是否应将序列号和大小标题添加到每个消息中。这个可选属性默认为`false`。 `default-output-channel` 如果设置了此属性,则该属性提供了对通道的引用,如果通道解析无法返回任何通道,则应在该通道中发送消息。如果没有提供默认的输出通道,路由器将抛出一个异常。如果你想静默地删除这些消息,请将默认的输出通道属性值设置为`nullChannel`。 | |如果`resolution-required`是`false`且信道未解析,则仅向`default-output-channel`发送消息。| |---|---------------------------------------------------------------------------------------------------------------------------| `resolution-required` 此属性指定是否必须始终成功地将通道名解析为已存在的通道实例。如果设置为`true`,则在无法解析通道时会引发`MessagingException`。将此属性设置为`false`将导致忽略任何不可溶解的通道。这个可选属性默认为`true`。 | |如果指定,当`resolution-required`是`false`且信道未解析时,消息仅被发送到`default-output-channel`。| |---|--------------------------------------------------------------------------------------------------------------------------------------------| `ignore-send-failures` 如果设置为`true`,则忽略发送到消息通道的失败。如果设置为`false`,则会抛出一个`MessageDeliveryException`,并且,如果路由器解析了多个通道,则后续的任何通道都不会接收到该消息。 此属性的确切行为取决于消息发送到的`Channel`的类型。例如,当使用直接通道(单线程)时,发送失败可能是由组件在更远的下游抛出的异常引起的。然而,当将消息发送到一个简单的队列通道(异步)时,引发异常的可能性非常小。 | |虽然大多数路由器会路由到单个通道,但它们可以返回多个通道名,例如,
`recipient-list-router`就是这样做的,
如果在只路由到单个通道的路由器上将此属性设置为`true`,则任何导致的异常都会被吞没,这通常没有什么意义。在这种情况下,最好是在流入口点的错误流中捕获异常。因此,当路由器实现返回多个通道名时,将`ignore-send-failures`属性设置为`true`通常更有意义,因为在发生故障的通道之后的其他通道仍将接收消息。| |---|| 此属性默认为`false`。 `timeout` `timeout`属性指定向目标消息通道发送消息时等待的最大时间(以毫秒为单位)。默认情况下,发送操作会无限期地阻塞。 ##### 顶层(在链外) 以下参数仅对链外的所有顶级路由器有效。 `id` 标识底层的 Spring Bean 定义,在路由器的情况下,它是`EventDrivenConsumer`或`PollingConsumer`的实例,这取决于路由器的`input-channel`分别是`SubscribableChannel`或`PollableChannel`。这是一个可选属性。 `auto-startup` 这个“生命周期”属性表示是否应该在应用程序上下文的启动期间启动这个组件。这个可选属性默认为`true`。 `input-channel` 此端点的接收消息通道。 `order` 此属性定义了当此端点作为订阅服务器连接到信道时调用的顺序。当该通道使用故障转移调度策略时,这一点尤其重要。当这个端点本身是具有队列的通道的轮询消费者时,它没有任何作用。 #### 路由器实现 由于基于内容的路由通常需要一些特定于领域的逻辑,因此大多数用例都需要 Spring 集成的选项,可以通过使用 XML 名称空间支持或注释来将任务委托给 POJO。这两个问题都将在后面讨论。然而,我们首先介绍了几个满足常见需求的实现。 ##### `PayloadTypeRouter` a`PayloadTypeRouter`将消息发送到由有效负载类型映射定义的通道,如下例所示: ``` ``` Spring Integration(参见`[Namespace Support](./configuration.html#configuration-namespace)`)提供的命名空间也支持`PayloadTypeRouter`的配置,该命名空间通过将``配置及其相应的实现(通过使用``元素定义)合并为一个更简洁的配置元素,基本上简化了配置。下面的示例显示了一个`PayloadTypeRouter`配置,该配置与上面的配置等价,但使用了名称空间支持: ``` ``` 下面的示例展示了在 Java 中配置的等效路由器: ``` @ServiceActivator(inputChannel = "routingChannel") @Bean public PayloadTypeRouter router() { PayloadTypeRouter router = new PayloadTypeRouter(); router.setChannelMapping(String.class.getName(), "stringChannel"); router.setChannelMapping(Integer.class.getName(), "integerChannel"); return router; } ``` 当使用 Java DSL 时,有两个选项。 首先,你可以定义路由器对象,如前面的示例所示: ``` @Bean public IntegrationFlow routerFlow1() { return IntegrationFlows.from("routingChannel") .route(router()) .get(); } public PayloadTypeRouter router() { PayloadTypeRouter router = new PayloadTypeRouter(); router.setChannelMapping(String.class.getName(), "stringChannel"); router.setChannelMapping(Integer.class.getName(), "integerChannel"); return router; } ``` 注意,路由器可以是,但不一定是`@Bean`。如果它不是`@Bean`,则流对其进行注册。 其次,你可以在 DSL 流本身中定义路由功能,如下例所示: ``` @Bean public IntegrationFlow routerFlow2() { return IntegrationFlows.from("routingChannel") .>route(Object::getClass, m -> m .channelMapping(String.class, "stringChannel") .channelMapping(Integer.class, "integerChannel")) .get(); } ``` ##### `HeaderValueRouter` a`HeaderValueRouter`基于单独的头部值映射向通道发送消息。当创建`HeaderValueRouter`时,将使用要计算的标头的名称对其进行初始化。页眉的值可以是以下两点中的一点: * 任意值 * 频道名称 如果它是一个任意值,则需要将这些标头值与通道名进行额外的映射。否则,不需要额外的配置。 Spring 集成提供了一个简单的基于名称空间的 XML 配置来配置`HeaderValueRouter`。下面的示例演示了在需要将头数据值映射到通道时`HeaderValueRouter`的配置: ``` ``` 在解析过程中,在前面的示例中定义的路由器可能会遇到信道解析失败,从而导致异常。如果你希望抑制此类异常并将未解决的消息发送到默认输出通道(用`default-output-channel`属性标识),请将`resolution-required`设置为`false`。 通常,消息头的值未显式映射到通道的消息被发送到`default-output-channel`。但是,当标头值被映射到通道名但通道不能解析时,将`resolution-required`属性设置为`false`会导致将此类消息路由到`default-output-channel`。 | |在 Spring Integration2.1 时,属性从`ignore-channel-name-resolution-failures`更改为`resolution-required`。
属性`resolution-required`默认为`true`。| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 下面的示例展示了在 Java 中配置的等效路由器: ``` @ServiceActivator(inputChannel = "routingChannel") @Bean public HeaderValueRouter router() { HeaderValueRouter router = new HeaderValueRouter("testHeader"); router.setChannelMapping("someHeaderValue", "channelA"); router.setChannelMapping("someOtherHeaderValue", "channelB"); return router; } ``` 当使用 Java DSL 时,有两个选项。首先,你可以定义路由器对象,如前面的示例所示: ``` @Bean public IntegrationFlow routerFlow1() { return IntegrationFlows.from("routingChannel") .route(router()) .get(); } public HeaderValueRouter router() { HeaderValueRouter router = new HeaderValueRouter("testHeader"); router.setChannelMapping("someHeaderValue", "channelA"); router.setChannelMapping("someOtherHeaderValue", "channelB"); return router; } ``` 注意,路由器可以是,但不一定是`@Bean`。如果它不是`@Bean`,则流对其进行注册。 其次,你可以在 DSL 流本身中定义路由功能,如下例所示: ``` @Bean public IntegrationFlow routerFlow2() { return IntegrationFlows.from("routingChannel") .route(Message.class, m -> m.getHeaders().get("testHeader", String.class), m -> m .channelMapping("someHeaderValue", "channelA") .channelMapping("someOtherHeaderValue", "channelB"), e -> e.id("headerValueRouter")) .get(); } ``` 不需要将头文件值映射到通道名的配置,因为头文件值本身表示通道名。下面的示例展示了一个路由器,它不需要将标头值映射到通道名: ``` ``` | |自 Spring Integration2.1 以来,解析通道的行为更加明确。例如,如果省略
属性,则路由器无法解析至少一个有效通道,并且通过将`resolution-required`设置为`false`,可以忽略任何通道名称解析失败,然后抛出一个`MessageDeliveryException`。

基本上,默认情况下,路由器必须能够将消息成功路由到至少一个通道。
如果你真的想要删除消息,还必须将`default-output-channel`设置为`nullChannel`。| |---|| ##### `RecipientListRouter` `RecipientListRouter`将接收到的每个消息发送到静态定义的消息通道列表。下面的示例创建了`RecipientListRouter`: ``` ``` Spring 集成还为`RecipientListRouter`配置(参见[命名空间支持](./configuration.html#configuration-namespace))提供了名称空间支持,如下例所示: ``` ``` 下面的示例展示了在 Java 中配置的等效路由器: ``` @ServiceActivator(inputChannel = "routingChannel") @Bean public RecipientListRouter router() { RecipientListRouter router = new RecipientListRouter(); router.setSendTimeout(1_234L); router.setIgnoreSendFailures(true); router.setApplySequence(true); router.addRecipient("channel1"); router.addRecipient("channel2"); router.addRecipient("channel3"); return router; } ``` 下面的示例展示了使用 Java DSL 配置的等效路由器: ``` @Bean public IntegrationFlow routerFlow() { return IntegrationFlows.from("routingChannel") .routeToRecipients(r -> r .applySequence(true) .ignoreSendFailures(true) .recipient("channel1") .recipient("channel2") .recipient("channel3") .sendTimeout(1_234L)) .get(); } ``` | |这里的“apply-sequence”标志与 publish-subscribe-channel 具有相同的效果,并且与 publish-subscribe-channel 一样,在`recipient-list-router`上默认禁用它。
有关更多信息,请参见[`PublishSubscribeChannel`configuration](./channel.html#channel-configuration-pubsubchannel)。| |---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 在配置`RecipientListRouter`时,另一个方便的选择是使用 Spring 表达式语言支持作为单个收件人通道的选择器。这样做类似于在“链”的开头使用一个过滤器来充当“选择性消费者”。然而,在这种情况下,所有这些都被非常简洁地组合到路由器的配置中,如下例所示: ``` ``` 在前面的配置中,对由`selector-expression`属性标识的 SPEL 表达式进行求值,以确定该收件人是否应包括在给定输入消息的收件人列表中。表达式的求值结果必须是布尔值。如果未定义此属性,则通道始终位于收件人列表中。 ##### `RecipientListRouterManagement` 从版本 4.1 开始,`RecipientListRouter`提供了几个操作来在运行时动态地操作收件人。这些管理操作由`RecipientListRouterManagement`通过`@ManagedResource`注释表示。可以通过使用[控制总线](./control-bus.html#control-bus)以及使用 JMX 来获得它们,如下例所示: ``` ``` ``` messagingTemplate.convertAndSend(controlBus, "@'simpleRouter.handler'.addRecipient('channel2')"); ``` 从应用程序启动`simpleRouter`时,只有一个`channel1`收件人。但是在`addRecipient`命令之后,将添加`channel2`收件人。这是一个“注册对消息的一部分感兴趣的东西”用例,当我们可能在某个时间段对来自路由器的消息感兴趣时,所以我们订阅`recipient-list-router`,并在某个时刻决定取消订阅。 由于``的运行时管理操作,从一开始就可以在没有任何``的情况下对其进行配置。在这种情况下,当消息没有匹配的收件人时,`RecipientListRouter`的行为是相同的。如果`defaultOutputChannel`被配置,消息将被发送到那里。否则将抛出`MessageDeliveryException`。 ##### XPath 路由器 XPath 路由器是 XML 模块的一部分。见[使用 XPath 路由 XML 消息](./xml.html#xml-xpath-routing)。 ##### 路由和错误处理 Spring 集成还提供了一种特殊的基于类型的路由器,称为`ErrorMessageExceptionTypeRouter`,用于路由错误消息(定义为其`payload`是`Throwable`实例的消息)。`ErrorMessageExceptionTypeRouter`类似于`PayloadTypeRouter`。事实上,它们几乎完全相同。唯一的区别是,虽然`PayloadTypeRouter`导航有效负载实例的实例层次结构(例如,`payload.getClass().getSuperclass()`)以找到最特定的类型和通道映射,但`ErrorMessageExceptionTypeRouter`导航“异常原因”的层次结构(例如,`payload.getCause()`)以找到最特定的`Throwable`类型或通道映射,并使用`mappingClass.isInstance(cause)`将`cause`匹配到类或任何超类。 | |在这种情况下,通道映射顺序很重要。
因此,如果需要获得`IllegalArgumentException`的映射,而不是`RuntimeException`的映射,则必须首先在路由器上配置最后一个。| |---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | |自版本 4.3 起,`ErrorMessageExceptionTypeRouter`在初始化阶段加载所有映射类,以使`ClassNotFoundException`具有抗故障能力。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------| 下面的示例显示了`ErrorMessageExceptionTypeRouter`的示例配置: ``` ``` #### 配置通用路由器 Spring 集成提供了一种通用的路由器。你可以将其用于通用路由(与 Spring Integration 提供的其他路由器相反,每个路由器都具有某种形式的专门化)。 ##### 使用 XML 配置基于内容的路由器 `router`元素提供了一种将路由器连接到输入通道的方法,并且还接受可选的`default-output-channel`属性。`ref`属性引用了自定义路由器实现的 Bean 名称(它必须扩展`AbstractMessageRouter`)。下面的示例展示了三种通用路由器: ``` ``` 或者,`ref`可能指向包含`@Router`注释的 POJO(稍后显示),或者你可以将`ref`与显式方法名结合起来。指定方法将应用与本文后面的`@Router`注释部分中描述的行为相同的行为。下面的示例定义了一个路由器,该路由器在其`ref`属性中指向 POJO: ``` ``` 如果在其他``定义中引用了自定义路由器实现,我们通常建议使用`ref`属性。但是,如果自定义路由器实现的范围应该是``的单个定义,则可以提供内部 Bean 定义,如下例所示: ``` ``` | |不允许在相同的``配置中同时使用`ref`属性和内部处理程序定义。
这样做会创建一个模棱两可的条件并抛出一个异常。| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | |如果`ref`属性引用扩展`AbstractMessageProducingHandler`的 Bean Bean(例如由框架本身提供的路由器),则对配置进行优化以直接引用路由器。,在这种情况下,
,每个`ref`属性必须引用单独的 Bean 实例(或`prototype`-作用域 Bean)或使用内部的``配置类型。
但是,只有当你在路由器 XML 定义中没有提供任何特定于路由器的属性时,这种优化才适用。
如果你无意中从多个 bean 引用了相同的消息处理程序,则会出现配置异常。| |---|| 下面的示例展示了在 Java 中配置的等效路由器: ``` @Bean @Router(inputChannel = "routingChannel") public AbstractMessageRouter myCustomRouter() { return new AbstractMessageRouter() { @Override protected Collection determineTargetChannels(Message message) { return // determine channel(s) for message } }; } ``` 下面的示例展示了使用 Java DSL 配置的等效路由器: ``` @Bean public IntegrationFlow routerFlow() { return IntegrationFlows.from("routingChannel") .route(myCustomRouter()) .get(); } public AbstractMessageRouter myCustomRouter() { return new AbstractMessageRouter() { @Override protected Collection determineTargetChannels(Message message) { return // determine channel(s) for message } }; } ``` 或者,你也可以对来自消息有效负载的数据进行路由,如下例所示: ``` @Bean public IntegrationFlow routerFlow() { return IntegrationFlows.from("routingChannel") .route(String.class, p -> p.contains("foo") ? "fooChannel" : "barChannel") .get(); } ``` #### 路由器与 Spring 表达式语言 有时,路由逻辑可能很简单,为它编写一个单独的类并将其配置为 Bean 可能看起来有些过头。在 Spring Integration2.0 中,我们提供了一种替代方案,允许你使用 SPEL 来实现以前需要自定义 POJO 路由器的简单计算。 | |有关 Spring 表达式语言的更多信息,请参见[relevant chapter in the Spring Framework Reference Guide](https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#expressions)。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 通常,计算一个 SPEL 表达式,并将其结果映射到一个通道,如下例所示: ``` ``` 下面的示例展示了在 Java 中配置的等效路由器: ``` @Router(inputChannel = "routingChannel") @Bean public ExpressionEvaluatingRouter router() { ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter("payload.paymentType"); router.setChannelMapping("CASH", "cashPaymentChannel"); router.setChannelMapping("CREDIT", "authorizePaymentChannel"); router.setChannelMapping("DEBIT", "authorizePaymentChannel"); return router; } ``` 下面的示例显示了在 Java DSL 中配置的等效路由器: ``` @Bean public IntegrationFlow routerFlow() { return IntegrationFlows.from("routingChannel") .route("payload.paymentType", r -> r .channelMapping("CASH", "cashPaymentChannel") .channelMapping("CREDIT", "authorizePaymentChannel") .channelMapping("DEBIT", "authorizePaymentChannel")) .get(); } ``` 为了进一步简化,SPEL 表达式可以求值为通道名,如下所示: ``` ``` 在前面的配置中,结果通道是由 SPEL 表达式计算的,该表达式将`payload`的值与字面意义`String`的“通道”连接起来。 SPEL 用于配置路由器的另一个优点是,表达式可以返回`Collection`,从而有效地使每个``成为收件人列表路由器。每当表达式返回多个通道值时,消息就会被转发到每个通道。下面的示例展示了这样的表达式: ``` ``` 在上面的配置中,如果消息包含一个名为“channels”的消息头,并且该消息头的值是通道名的`List`,则消息将被发送到列表中的每个通道。当你需要选择多个通道时,你可能还会发现集合投影和集合选择表达式非常有用。有关更多信息,请参见: * [集合投影](https://docs.spring.io/spring-framework/docs/current/spring-framework-reference/core.html#expressions-collection-projection) * [收藏选择](https://docs.spring.io/spring-framework/docs/current/spring-framework-reference/core.html#expressions-collection-selection) ##### 配置带有注释的路由器 当使用`@Router`注释一个方法时,该方法可以返回`MessageChannel`或`String`类型。在后一种情况下,端点解析通道名,就像解析默认输出通道一样。此外,该方法可以返回单个值,也可以返回集合。如果返回了集合,则将回复消息发送到多个通道。总而言之,以下方法签名都是有效的: ``` @Router public MessageChannel route(Message message) {...} @Router public List route(Message message) {...} @Router public String route(Foo payload) {...} @Router public List route(Foo payload) {...} ``` 除了基于有效负载的路由之外,还可以基于消息头中可用的元数据作为属性或属性来路由消息。在这种情况下,用`@Router`注释的方法可以包括一个用`@Header`注释的参数,该参数映射到一个标头值,如下例所示,并在[注释支持](./configuration.html#annotations)中进行了说明: ``` @Router public List route(@Header("orderStatus") OrderStatus status) ``` | |关于基于 XML 的消息的路由,包括 XPath 支持,请参见[XML 支持-处理 XML 有效负载](./xml.html#xml)。| |---|--------------------------------------------------------------------------------------------------------------------------| 有关路由器配置的更多信息,请参见 Java DSL 章节中的[消息路由器](./dsl.html#java-dsl-routers)。 #### 动态路由器 Spring 集成为常见的基于内容的路由用例提供了相当多的不同的路由器配置,以及将定制路由器实现为 POJO 的选项。例如,`PayloadTypeRouter`提供了一种简单的方式来配置路由器,该路由器基于传入消息的有效负载类型来计算信道,而`HeaderValueRouter`通过评估特定消息头的值来配置路由器来计算信道,从而提供了相同的便利。也有基于表达式的路由器,其中信道是基于对表达式的求值来确定的。所有这些类型的路由器都表现出一些动态特性。 然而,这些路由器都需要静态配置。即使在基于表达式的路由器的情况下,表达式本身也被定义为路由器配置的一部分,这意味着在相同的值上操作的相同表达式总是导致相同的信道的计算。这在大多数情况下是可以接受的,因为这样的路线是明确定义的,因此是可预测的。但是,有时我们需要动态地更改路由器配置,以便消息流可以路由到不同的通道。 例如,你可能希望关闭系统的某些部分以进行维护,并临时将消息重新路由到不同的消息流。作为另一个示例,你可能希望通过添加另一个路由来处理更具体类型的`java.lang.Number`(在`PayloadTypeRouter`的情况下)来为消息流引入更多粒度。 不幸的是,使用静态路由器配置来实现这两个目标中的任何一个,你将不得不关闭整个应用程序,更改路由器的配置(更改路由),并将应用程序恢复。这显然不是任何人想要的解决方案。 [动态路由器](https://www.enterpriseintegrationpatterns.com/DynamicRouter.html)模式描述了一种机制,通过这种机制,你可以动态地更改或配置路由器,而不会降低系统或单个路由器的性能。 在深入了解 Spring 集成如何支持动态路由的细节之前,我们需要考虑路由器的典型流程: 1. 计算一个通道标识符,这是路由器在接收到消息后计算的值。通常,它是实际`MessageChannel`的字符串或实例。 2. 将通道标识符解析为通道名。我们将在本节的后面部分描述这个过程的细节。 3. 将通道名称解析为实际的`MessageChannel` 如果步骤 1 产生`MessageChannel`的实际实例,那么在动态路由方面就不能做太多的事情,因为`MessageChannel`是任何路由器工作的最终产品。但是,如果第一步产生的信道标识符不是`MessageChannel`的实例,那么你有很多可能的方法来影响导出`MessageChannel`的过程。考虑以下有效负载类型路由器的示例: ``` ``` 在有效负载类型路由器的上下文中,前面提到的三个步骤将实现如下: 1. 计算一个通道标识符,它是有效负载类型的完全限定名称(例如,`java.lang.String`)。 2. 将通道标识符解析为通道名,其中上一步的结果用于从`mapping`元素中定义的有效负载类型映射中选择适当的值。 3. 将通道名称解析为`MessageChannel`的实际实例,作为对应用程序上下文中的 Bean 的引用(希望是由上一步的结果标识的`MessageChannel`)。 换句话说,每一步都为下一步提供信息,直到流程完成。 现在考虑一个标头值路由器的示例: ``` ``` 现在,我们可以考虑这三个步骤是如何为头值路由器工作的: 1. 计算信道标识符,该标识符是由`header-name`属性标识的报头的值。 2. 将通道标识符 A 解析为通道名,其中上一步的结果用于从`mapping`元素中定义的一般映射中选择适当的值。 3. 将通道名称解析为`MessageChannel`的实际实例,作为对应用程序上下文中的 Bean 的引用(希望是由上一步的结果标识的`MessageChannel`)。 两种不同的路由器类型的前两种配置看起来几乎相同。但是,如果你查看`HeaderValueRouter`的替代配置,我们可以清楚地看到不存在`mapping`子元素,如下面的清单所示: ``` ``` 然而,该配置仍然是完全有效的。因此,自然的问题是,第二步的映射是什么? 第二步现在是可选的。如果`mapping`未被定义,那么在第一步中计算的信道标识符的值被自动地处理为`channel name`,这现在被解析为实际的`MessageChannel`,就像在第三步中那样。这也意味着,第二步是向路由器提供动态特性的关键步骤之一,因为它引入了一个过程,可以让你更改通道标识符解析为通道名的方式,从而影响从初始信道标识符确定`MessageChannel`的最终实例的过程。 例如,在前面的配置中,假设`testHeader`值是’Kermit’,它现在是一个通道标识符(第一步)。由于在此路由器中没有映射,因此将此通道标识符解析为通道名(第二步)是不可能的,并且此通道标识符现在被视为通道名。然而,如果有一个映射,但有一个不同的值呢?最终的结果仍然是相同的,因为,如果无法通过将通道标识符解析为通道名称的过程来确定新值,则通道标识符将成为通道名称。 剩下的就是第三步,将通道名解析为由该名称标识的`MessageChannel`的实际实例。这基本上涉及 Bean 查找所提供的名称。现在,所有包含头-值对`testHeader=kermit`的消息都将路由到一个`MessageChannel`,其 Bean 名称(其`id`)是“kermit”。 但是,如果你想将这些消息路由到“辛普森”频道呢?显然,更改静态配置是可行的,但这样做还需要降低系统的性能。但是,如果可以访问通道标识符映射,则可以引入一个新的映射,其中头-值对现在是`kermit=simpson`,从而让第二步将“Kermit”视为通道标识符,同时将其解析为“Simpson”作为通道名。 这显然也适用于`PayloadTypeRouter`,你现在可以在其中重新映射或删除特定的有效负载类型映射。实际上,它适用于所有其他路由器,包括基于表达式的路由器,因为它们的计算值现在有机会通过第二步来解析到实际的`channel name`。 任何属于`AbstractMappingMessageRouter`(包括大多数框架定义的路由器)的子类的路由器都是动态路由器,因为`channelMapping`是在`AbstractMappingMessageRouter`级别上定义的。该映射的 setter 方法与“setChannelMapping”和“removeChannelMapping”方法一起作为公共方法公开。只要有对路由器本身的引用,就可以在运行时更改、添加和删除路由器映射。这也意味着你可以通过 JMX(参见[JMX 支持](./jmx.html#jmx))或 Spring 集成控制总线(参见[控制总线](./control-bus.html#control-bus))功能公开这些相同的配置选项。 | |返回到通道键,因为通道名是灵活和方便的,
但是,如果你不信任消息创建者,恶意参与者(了解系统)可能会创建一条消息,并将其路由到一个意外的通道。,例如,因此,你可能希望禁用此功能(将`channelKeyFallback`属性设置为`false`),并在需要时更改映射。| |---|| ##### 使用控制总线管理路由器映射 Spring 管理路由器映射的一种方法是通过[控制总线](https://www.enterpriseintegrationpatterns.com/ControlBus.html)模式,该模式公开了一个控制通道,你可以向其发送控制消息以管理和监视集成组件,包括路由器。 | |有关控制总线的更多信息,请参见[控制总线](./control-bus.html#control-bus)。| |---|----------------------------------------------------------------------------------------------| 通常,你会发送一条控制消息,要求在特定的托管组件(例如路由器)上调用特定的操作。以下托管操作(方法)是更改路由器解析过程所特有的: * `public void setChannelMapping(String key, String channelName)`:允许你在`channel identifier`和`channel name`之间添加新的映射或修改现有的映射 * `public void removeChannelMapping(String key)`:允许你删除特定的通道映射,从而断开`channel identifier`和`channel name`之间的关系 请注意,这些方法可以用于简单的更改(例如更新单个路由或添加或删除路由)。但是,如果你想要删除一条路由并添加另一条路由,那么更新就不是原子的了。这意味着路由表在更新之间可能处于不确定状态。从版本 4.0 开始,你现在可以使用控制总线自动更新整个路由表。下面的方法可以让你这样做: * `public MapgetChannelMappings()`:返回当前映射。 * `public void replaceChannelMappings(Properties channelMappings)`:更新映射。请注意,`channelMappings`参数是一个`Properties`对象。这种安排允许控制总线命令使用内置的`StringToPropertiesConverter`,如下例所示: ``` "@'router.handler'.replaceChannelMappings('foo=qux \n baz=bar')" ``` 请注意,每个映射都用一个换行符分隔(`\n`)。对于映射的编程更改,出于类型安全考虑,我们建议你使用`setChannelMappings`方法。`replaceChannelMappings`忽略不是`String`对象的键或值。 ##### 使用 JMX 管理路由器映射 你还可以使用 Spring 的 JMX 支持来公开一个路由器实例,然后使用你最喜欢的 JMX 客户机(例如,JConsole)来管理那些用于更改路由器配置的操作(方法)。 | |有关 Spring Integration 的 JMX 支持的更多信息,请参见[JMX 支持](./jmx.html#jmx)。| |---|-----------------------------------------------------------------------------------------------| ##### 路由条 Spring 从版本 4.1 开始,集成提供了[布线滑移](https://www.enterpriseintegrationpatterns.com/RoutingTable.html)Enterprise 集成模式的实现。它被实现为`routingSlip`消息头,当没有为端点指定`outputChannel`时,它用于在`AbstractMessageProducingHandler`实例中确定下一个通道。这种模式在复杂、动态的情况下很有用,因为很难配置多个路由器来确定消息流。当消息到达没有`output-channel`的端点时,将查询`routingSlip`以确定将消息发送到的下一个通道。当路由滑移耗尽时,恢复正常的`replyChannel`处理。 路由表的配置以`HeaderEnricher`选项的形式呈现——一个分号分隔的路由表,其中包含`path`条目,如下例所示: ``` channel1 request.headers[myRoutingSlipChannel] ``` 前面的例子有: * 配置``,以演示路由条`path`中的条目可以指定为可解析密钥。 * ````子元素用于将`RoutingSlipHeaderValueMessageProcessor`填充到`HeaderEnricher`处理程序。 * `RoutingSlipHeaderValueMessageProcessor`接受一个`String`已解析路由滑移`path`项的数组,并返回(从`processMessage()`)一个`singletonMap`,并将`path`作为`0`作为初始`routingSlipIndex`。 路由表`path`条目可以包含`MessageChannel` Bean 名称、`RoutingSlipRouteStrategy` Bean 名称和 Spring 表达式。`RoutingSlipHeaderValueMessageProcessor`在第一次`processMessage`调用时,对照`BeanFactory`条目检查每个路由单`path`条目。它将条目(在应用程序上下文中不是 Bean 名称)转换为`ExpressionEvaluatingRoutingSlipRouteStrategy`实例。`RoutingSlipRouteStrategy`条目被多次调用,直到它们返回 null 或空的`String`。 由于在`getOutputChannel`过程中涉及到路由滑移,因此我们有一个请求-回复上下文。引入了`RoutingSlipRouteStrategy`来确定下一个`outputChannel`,它使用`requestMessage`和`reply`对象。此策略的实现应该在应用程序上下文中注册为 Bean,并且其 Bean 名称在路由单`path`中使用。提供了`ExpressionEvaluatingRoutingSlipRouteStrategy`实现。它接受一个 SPEL 表达式,并且内部的`ExpressionEvaluatingRoutingSlipRouteStrategy.RequestAndReply`对象被用作计算上下文的根对象。这是为了避免为每个`ExpressionEvaluatingRoutingSlipRouteStrategy.getNextPath()`调用创建`EvaluationContext`的开销。它是一个简单的 Java Bean,具有两个属性:`Message request`和`Object reply`。有了这个表达式实现,我们可以通过使用 SPEL(例如,`@routingSlipRoutingPojo.get(request, reply)`和`request.headers[myRoutingSlipChannel]`)来指定路由滑移`path`项,并避免为`RoutingSlipRouteStrategy`定义 Bean。 | |`requestMessage`参数总是`Message`。
根据上下文,应答对象可以是`Message`、`AbstractIntegrationMessageBuilder`或任意应用程序域对象(例如,当它由服务激活器调用的 POJO 方法返回时),在前两种情况下,
,当使用 SPEL(或 Java 实现)时,通常的`Message`属性(`payload`和`headers`)是可用的。
对于任意域对象,这些属性是不可用的,
由于这个原因,如果使用结果来确定下一条路径,那么在将路由滑与 POJO 方法结合使用时要小心。| |---|| | |如果在分布式环境中涉及路由滑移,我们建议不要对路由滑移`path`使用内联表达式,
此建议适用于分布式环境,例如跨 JVM 应用程序,通过消息代理使用`request-reply`(例如[AMQP 支持](./amqp.html#amqp)或[JMS 支持](./jms.html#jms)),或者在集成流中使用持久的`MessageStore`([消息存储](./message-store.html#message-store))。
框架使用`RoutingSlipHeaderValueMessageProcessor`将它们转换为`ExpressionEvaluatingRoutingSlipRouteStrategy`对象,并且它们在`routingSlip`消息头中使用。
因为这个类不是`Serializable`(不能,因为它依赖于`BeanFactory`),所以整个`Message`变得不可序列化,并且,在任何分布式操作中,我们最终得到一个`NotSerializableException`。,
来克服这个限制,将`ExpressionEvaluatingRoutingSlipRouteStrategy` Bean 与所需的 SPEL 注册并在路由单`path`配置中使用其 Bean 名称。| |---|| 对于 Java 配置,你可以将`RoutingSlipHeaderValueMessageProcessor`实例添加到`HeaderEnricher` Bean 定义中,如下例所示: ``` @Bean @Transformer(inputChannel = "routingSlipHeaderChannel") public HeaderEnricher headerEnricher() { return new HeaderEnricher(Collections.singletonMap(IntegrationMessageHeaderAccessor.ROUTING_SLIP, new RoutingSlipHeaderValueMessageProcessor("myRoutePath1", "@routingSlipRoutingPojo.get(request, reply)", "routingSlipRoutingStrategy", "request.headers[myRoutingSlipChannel]", "finishChannel"))); } ``` 当一个端点产生一个答复并且没有`outputChannel`被定义时,路由滑移算法的工作方式如下: * `routingSlipIndex`用于从路由表`path`列表中获取一个值。 * 如果来自`routingSlipIndex`的值是`String`,则用于从`BeanFactory`获得 Bean。 * 如果返回的 Bean 是`MessageChannel`的实例,则将其用作下一个`outputChannel`,并且在回复消息头中增加`routingSlipIndex`项(路由条`path`项保持不变)。 * 如果返回的 Bean 是`RoutingSlipRouteStrategy`的实例,而其`getNextPath`不返回空的`String`,则该结果被用作下一个`outputChannel`的 Bean 名称。`routingSlipIndex`保持不变。 * 如果`RoutingSlipRouteStrategy.getNextPath`返回一个空的`String`或`null`,则`routingSlipIndex`被递增,并且`getOutputChannelFromRoutingSlip`被递归地调用用于下一个路由滑移`path`项。 * 如果下一个路由条`path`条目不是`String`,则它必须是`RoutingSlipRouteStrategy`的实例。 * 当`routingSlipIndex`超过路由单`path`列表的大小时,算法将移动到标准`replyChannel`报头的默认行为。 #### Process Manager Enterprise 集成模式 Enterprise 集成模式包括[过程管理器](https://www.enterpriseintegrationpatterns.com/ProcessManager.html)模式。现在,你可以通过使用封装在路由条中的`RoutingSlipRouteStrategy`中的自定义流程管理器逻辑轻松地实现此模式。除了 Bean 名称之外,`RoutingSlipRouteStrategy`还可以返回任何`MessageChannel`对象,并且不要求该`MessageChannel`实例在应用程序上下文中是 Bean。这样,当无法预测应该使用哪个信道时,我们可以提供强大的动态路由逻辑。可以在`RoutingSlipRouteStrategy`中创建`MessageChannel`并返回。对于这样的情况,`FixedSubscriberChannel`与`MessageHandler`相关联的实现是一个很好的组合。例如,你可以路由到[反应流](https://projectreactor.io/docs/core/release/reference/#getting-started),如下例所示: ``` @Bean public PollableChannel resultsChannel() { return new QueueChannel(); } @Bean public RoutingSlipRouteStrategy routeStrategy() { return (requestMessage, reply) -> requestMessage.getPayload() instanceof String ? new FixedSubscriberChannel(m -> Mono.just((String) m.getPayload()) .map(String::toUpperCase) .subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v))) : new FixedSubscriberChannel(m -> Mono.just((Integer) m.getPayload()) .map(v -> v * 2) .subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v))); } ``` ### 过滤器 消息筛选器用于根据某些条件(例如消息头值或消息内容本身)来决定是否应该传递`Message`。因此,消息过滤器类似于路由器,但对于从过滤器的输入通道接收的每个消息,相同的消息可能会也可能不会发送到过滤器的输出通道。与路由器不同,它不会决定将消息发送到哪个消息通道,而只会决定是否发送消息。 | |正如我们在本节后面所描述的,该过滤器还支持丢弃通道。
在某些情况下,它可以扮演非常简单的路由器(或“交换机”)的角色,基于布尔条件。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 在 Spring 集成中,可以将消息过滤器配置为消息端点,该消息端点将委托给`MessageSelector`接口的实现。这个界面本身非常简单,如下所示: ``` public interface MessageSelector { boolean accept(Message message); } ``` `MessageFilter`构造函数接受选择器实例,如下例所示: ``` MessageFilter filter = new MessageFilter(someSelector); ``` 结合名称空间和 SPEL,你可以用很少的 Java 代码配置强大的过滤器。 #### 使用 XML 配置过滤器 可以使用``元素来创建消息选择端点。除了`input-channel`和`output-channel`属性外,它还需要一个`ref`属性。`ref`可以指向`MessageSelector`实现,如下例所示: ``` ``` 或者,你可以添加`method`属性。在这种情况下,`ref`属性可以引用任何对象。引用的方法可能期望入站消息的`Message`类型或有效负载类型。方法必须返回一个布尔值。如果方法返回“true”,则消息将被发送到输出通道。下面的示例展示了如何配置使用`method`属性的筛选器: ``` ``` 如果选择器或经过调整的 POJO 方法返回`false`,那么一些设置将控制对拒绝消息的处理。默认情况下(如果按照前面示例中的配置),被拒绝的消息将被静默删除。如果拒绝反而会导致错误条件,请将`throw-exception-on-rejection`属性设置为`true`,如下例所示: ``` ``` 如果希望将被拒绝的消息路由到特定的通道,请将该引用提供为`discard-channel`,如下例所示: ``` ``` 另见[建议过滤器](./handler-advice.html#advising-filters)。 | |消息过滤器通常与发布-订阅通道结合使用。
许多过滤器端点可能被订阅到相同的通道,它们决定是否将消息传递到下一个端点,它可以是任何受支持的类型(例如服务激活器)。
这为使用具有单个点对点输入通道和多个输出通道的消息路由器这一更积极主动的方法提供了一种被动的替代方法。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 如果在其他``定义中引用了自定义过滤器实现,则建议使用`ref`属性。但是,如果自定义过滤器实现的作用域是单个``元素,则应该提供内部 Bean 定义,如下例所示: ``` ``` | |不允许在同一个``配置中同时使用`ref`属性和内部处理程序定义,因为它会创建一个模棱两可的条件并抛出一个异常。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | |如果`ref`属性引用扩展`MessageFilter`的 Bean(例如框架本身提供的过滤器),则通过将输出通道直接注入过滤器 Bean 来优化配置。,在这种情况下,
,每个`ref`必须是一个单独的 Bean 实例(或者`prototype`-作用域 Bean),或者使用内部的``配置类型。
但是,只有当你在 Filter XML 定义中没有提供任何特定于过滤器的属性时,这种优化才会应用。
如果你无意中从多个 bean 引用了相同的消息处理程序,则会出现配置异常。| |---|| 随着 SPEL 支持的引入, Spring Integration 向过滤器元素添加了`expression`属性。对于简单的过滤器,它可以完全避免使用 Java,如下例所示: ``` ``` 作为表达式属性的值传递的字符串被计算为 SPEL 表达式,其消息在计算上下文中可用。如果必须在应用程序上下文的作用域中包括表达式的结果,则可以使用`#{}`表示法,如[SPEL 参考文献](https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#expressions-beandef)中所定义的那样,如下例所示: ``` ``` 如果表达式本身需要是动态的,那么可以使用“表达式”子元素。这提供了一个间接级别,用于通过其键从`ExpressionSource`解析表达式。这是一个可以直接实现的策略接口,或者可以依赖 Spring Integration 中可用的版本,该版本从“资源包”加载表达式,并可以在给定的秒数之后检查是否进行了修改。下面的配置示例演示了所有这些,如果底层文件已被修改,表达式可以在一分钟内重新加载: ``` ``` 如果`ExpressionSource` Bean 名为`expressionSource`,则不需要在``元素上提供` source`属性。然而,在前面的示例中,我们展示了它的完整性。 “config/integration/expressions.properties”文件(或任何带有区域设置扩展名的特定版本,以加载资源包的典型方式进行解析)可以包含一个键/值对,如下例所示: ``` filterPatterns.example=payload > 100 ``` | |所有这些使用`expression`作为属性或子元素的示例也可以应用于 Transformer、Router、Splitter、Service-Activator 和 Header-Enrich 元素中,
给定组件类型的语义和角色将影响对评估结果的解释,与方法调用的返回值将被解释的方式相同。
例如,表达式可以返回将被路由器组件视为消息通道名的字符串。
但是,在 Spring 集成范围内的所有核心 EIP 组件中,根据作为根对象的消息计算表达式并解析 Bean 名称(如果前缀为’@’)的底层功能是一致的。| |---|| #### 配置带有注释的过滤器 下面的示例展示了如何通过使用注释来配置过滤器: ``` public class PetFilter { ... @Filter (1) public boolean dogsOnly(String input) { ... } } ``` |**1**|表示此方法将用作筛选器的注释。
如果要将此类用作筛选器,则必须指定它。| |-----|--------------------------------------------------------------------------------------------------------------------------------------| XML 元素提供的所有配置选项也可用于`@Filter`注释。 过滤器可以从 XML 显式引用,或者,如果`@MessageEndpoint`注释是在类上定义的,则可以通过 Classpath 扫描自动检测。 另见[使用注释为端点提供建议](./handler-advice.html#advising-with-annotations)。 ### splitter 拆分器是一个组件,其作用是将消息划分为多个部分,并将生成的消息发送给要独立处理的部分。通常情况下,他们是包括聚合器的管道中的上游生产者。 #### 编程模型 用于执行拆分的 API 由一个基类组成,`AbstractMessageSplitter`。它是一个`MessageHandler`实现,它封装了拆分器常见的功能,例如在生成的消息上填充适当的消息头(`CORRELATION_ID`、`SEQUENCE_SIZE`和`SEQUENCE_NUMBER`)。这种填充可以跟踪消息及其处理结果(在典型的场景中,这些头被复制到由各种转换端点产生的消息)。然后可以使用这些值,例如,由[组合消息处理器](https://www.enterpriseintegrationpatterns.com/DistributionAggregate.html)使用。 下面的示例显示了`AbstractMessageSplitter`的摘录: ``` public abstract class AbstractMessageSplitter extends AbstractReplyProducingMessageConsumer { ... protected abstract Object splitMessage(Message message); } ``` 要在应用程序中实现特定的拆分器,可以扩展`AbstractMessageSplitter`并实现`splitMessage`方法,该方法包含用于拆分消息的逻辑。返回值可以是以下几种值之一: * 一个`Collection`或一个消息数组或一个遍历消息的`Iterable`(或`Iterator`)。在这种情况下,消息作为消息发送(在填充`CORRELATION_ID`、`SEQUENCE_SIZE`和`SEQUENCE_NUMBER`之后)。使用这种方法可以提供更多的控制——例如,作为拆分过程的一部分,填充自定义消息头。 * 一个`Collection`或一个非消息对象数组,或一个在非消息对象上迭代的`Iterable`(或`Iterator`)。它的工作原理与前一种情况类似,只是每个集合元素被用作消息有效负载。使用这种方法,你可以在不需要考虑消息传递系统的情况下专注域对象,并生成更易于测试的代码。 * `Message`或非消息对象(但不是集合或数组)。它的工作原理与之前的情况类似,只是发送了一条消息。 在 Spring 集成中,任何 POJO 都可以实现分割算法,前提是它定义了一个接受单个参数并具有返回值的方法。在这种情况下,方法的返回值被解释为如前所述。输入参数可以是`Message`,也可以是简单的 POJO。在后一种情况下,拆分器接收传入消息的有效负载。我们推荐这种方法,因为它将代码与 Spring Integration API 分离,并且通常更容易测试。 ##### 迭代器 从版本 4.1 开始,`AbstractMessageSplitter`支持`Iterator`类型来分割`value`。注意,在`Iterator`(或`Iterable`)的情况下,我们无法访问底层项的数量,并且`SEQUENCE_SIZE`头被设置为`0`。这意味着``的`SequenceSizeReleaseStrategy`的默认`SequenceSizeReleaseStrategy`将不工作,并且来自`splitter`的`CORRELATION_ID`的组将不会被释放;它将保持为`incomplete`。在这种情况下,你应该使用适当的自定义`ReleaseStrategy`,或者将`send-partial-result-on-expiry`与`group-timeout`或`MessageGroupStoreReaper`一起使用。 从版本 5.0 开始,`AbstractMessageSplitter`提供了`protected obtainSizeIfPossible()`方法,以允许在可能的情况下确定`Iterable`和`Iterator`对象的大小。例如`XPathMessageSplitter`可以确定底层`NodeList`对象的大小。从版本 5.0.9 开始,该方法还适当地返回`com.fasterxml.jackson.core.TreeNode`的大小。 `Iterator`对象可以避免在拆分前在内存中构建整个集合的需要。例如,当底层项使用迭代或流从某些外部系统(例如数据库或 FTP`MGET`)填充时。 ##### 流和通量 从版本 5.0 开始,`AbstractMessageSplitter`支持用于分割`value`的 Java`Stream`和反应流`Publisher`类型。在这种情况下,目标`Iterator`是建立在它们的迭代功能之上的。 此外,如果分离器的输出通道是`ReactiveStreamsSubscribableChannel`的实例,则`AbstractMessageSplitter`产生一个`Flux`的结果,而不是一个`Iterator`,并且输出通道订阅了这个`Flux`,用于根据下游流需求进行基于背压的分离。 从版本 5.2 开始,Splitter 支持一个`discardChannel`选项,用于发送那些请求消息,对于这些请求消息,Split 函数返回了一个空的容器(集合、数组、流、`Flux`等)。在这种情况下,没有要迭代的项用于发送到`outputChannel`。将`null`分割结果保留为流指示器的结束。 #### 使用 XML 配置拆分器 可以通过 XML 将拆分器配置如下: ``` (6) ``` |**1**|分离器的 ID 是可选的。| |-----|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |**2**|在应用程序上下文中定义的对 Bean 的引用。该 Bean 必须实现分割逻辑,如在前面的部分中所描述的那样。可选的。如果没有提供对 Bean 的引用,假定到达`input-channel`上的消息的有效负载是`java.util.Collection`的实现,并且将缺省分割逻辑应用于集合,将每个单独的元素合并到消息中并将其发送到`output-channel`。| |**3**|实现分割逻辑的方法(在 Bean 上定义)。
可选的。| |**4**|分配器的输入通道。
需要。| |**5**|分割器将分割传入消息的结果发送到的通道。
可选的(因为传入消息可以自己指定一个回复通道)。| |**6**|在空拆分结果的情况下将请求消息发送到的通道。
可选的(在`null`结果的情况下将停止)。| 如果可以在其他``定义中引用自定义拆分器实现,则建议使用`ref`属性。但是,如果自定义拆分器处理程序实现的范围应为``的单个定义,则可以配置内部 Bean 定义,如下例所示: ``` ``` | |不允许在同一个``配置中同时使用`ref`属性和内部处理程序定义,因为这会创建一个模棱两可的条件并导致引发异常。| |---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | |如果`ref`属性引用扩展`AbstractMessageProducingHandler`的 Bean(例如框架本身提供的分割器),则通过将输出通道直接注入处理程序来优化配置。,在这种情况下,
,每个`ref`必须是一个单独的 Bean 实例(或`prototype`-作用域 Bean),或者使用内部``配置类型。
但是,只有当你在 Splitter XML 定义中没有提供任何特定于 Splitter 的属性时,此优化才会应用。
如果你无意中从多个 bean 引用了相同的消息处理程序,则会出现配置异常。| |---|| #### 配置带有注释的拆分器 `@Splitter`注释适用于期望`Message`类型或消息有效负载类型的方法,并且方法的返回值应该是任何类型的`Collection`。如果返回的值不是实际的`Message`对象,则将每个项包装在`Message`中,作为`Message`的有效负载。每个结果`Message`都被发送到用于定义`@Splitter`的端点的指定输出通道。 下面的示例展示了如何使用`@Splitter`注释来配置拆分器: ``` @Splitter List extractItems(Order order) { return order.getItems() } ``` 另见[使用注释为端点提供建议](./handler-advice.html#advising-with-annotations),[Splitters](./dsl.html#java-dsl-splitters)和[文件拆分器](./file.html#file-splitter)。 ### 聚合器 聚合器基本上是拆分器的镜像,是一种消息处理程序,它接收多个消息并将它们合并为一个消息。实际上,聚合器通常是包含拆分器的管道中的下游消费者。 从技术上讲,聚合器比拆分器更复杂,因为它是有状态的。它必须保存要聚合的消息,并确定何时可以聚合完整的消息组。为了做到这一点,它需要`MessageStore`。 #### 功能 聚合器通过关联和存储一组相关消息来组合一组相关消息,直到该组被认为是完整的。此时,聚合器通过处理整个组创建一个消息,并将聚合的消息作为输出发送。 实现聚合器需要提供执行聚合的逻辑(即,创建来自多个消息的单个消息)。两个相关的概念是相关性和释放性。 相关性决定了如何为聚合而对消息进行分组。在 Spring 集成中,默认情况下,关联是基于`IntegrationMessageHeaderAccessor.CORRELATION_ID`消息头完成的。具有相同`IntegrationMessageHeaderAccessor.CORRELATION_ID`的消息被分组在一起。但是,你可以自定义相关策略,以允许使用其他方式指定如何将消息组合在一起。要做到这一点,你可以实现`CorrelationStrategy`(将在本章后面介绍)。 要确定准备好处理一组消息的时间点,请参考`ReleaseStrategy`。当序列中包含的所有消息都存在时,聚合器的默认发布策略将基于`IntegrationMessageHeaderAccessor.SEQUENCE_SIZE`头发布一个组。你可以通过提供对自定义`ReleaseStrategy`实现的引用来覆盖此默认策略。 #### 编程模型 聚合 API 由多个类组成: * 接口`MessageGroupProcessor`,及其子类:`MethodInvokingAggregatingMessageGroupProcessor`和`ExpressionEvaluatingMessageGroupProcessor` * `ReleaseStrategy`接口及其默认实现:`SimpleSequenceSizeReleaseStrategy` * `CorrelationStrategy`接口及其默认实现:`HeaderAttributeCorrelationStrategy` ##### `AggregatingMessageHandler` `AggregatingMessageHandler`(`AbstractCorrelatingMessageHandler`的子类)是`MessageHandler`实现,封装了聚合器(和其他相关用例)的公共功能,如下所示: * 将消息关联到要聚合的组中 * 在`MessageStore`中维护这些消息,直到可以释放该组 * 决定何时释放该团体 * 将已发布的组聚合为一条消息 * 识别并响应过期的组 将决定如何将消息组合在一起的责任委托给`CorrelationStrategy`实例。决定消息组是否可以释放的责任被委托给`ReleaseStrategy`实例。 下面的清单显示了基本`AbstractAggregatingMessageGroupProcessor`的一个简短的亮点(实现`aggregatePayloads`方法的责任留给了开发人员): ``` public abstract class AbstractAggregatingMessageGroupProcessor implements MessageGroupProcessor { protected Map aggregateHeaders(MessageGroup group) { // default implementation exists } protected abstract Object aggregatePayloads(MessageGroup group, Map defaultHeaders); } ``` 参见`DefaultAggregatingMessageGroupProcessor`,`ExpressionEvaluatingMessageGroupProcessor`和`MethodInvokingMessageGroupProcessor`作为`AbstractAggregatingMessageGroupProcessor`的开箱即用实现。 从版本 5.2 开始,`Function>`策略可用于`AbstractAggregatingMessageGroupProcessor`合并和计算输出消息的(聚合)头。`DefaultAggregateHeadersFunction`实现是可用的,其逻辑返回所有在组中没有冲突的头;在组中的一个或多个消息上没有头不被认为是冲突。相互冲突的标题被省略。与新引入的`DelegatingMessageGroupProcessor`一起,该函数用于任意(非-`AbstractAggregatingMessageGroupProcessor`)`MessageGroupProcessor`实现。本质上,框架将提供的函数注入到`AbstractAggregatingMessageGroupProcessor`实例中,并将所有其他实现封装到`DelegatingMessageGroupProcessor`中。在逻辑上`AbstractAggregatingMessageGroupProcessor`和`DelegatingMessageGroupProcessor`之间的区别是,后者在调用委托策略之前不会提前计算标题,并且如果委托返回`Message`或`AbstractIntegrationMessageBuilder`,则不会调用函数。在这种情况下,框架假定目标实现已经注意到产生了一组适当的头,填充到返回的结果中。对于 XML 配置,`Function>`策略作为`headers-function`引用属性可用,对于 Java DSL,作为`AggregatorSpec.headersFunction()`选项可用,对于普通 Java 配置,作为`AggregatorFactoryBean.setHeadersFunction()`选项可用。 `CorrelationStrategy`由`AbstractCorrelatingMessageHandler`拥有,并具有基于`IntegrationMessageHeaderAccessor.CORRELATION_ID`消息头的默认值,如下例所示: ``` public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) { ... this.correlationStrategy = correlationStrategy == null ? new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy; this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy; ... } ``` 至于消息组的实际处理,默认的实现是`DefaultAggregatingMessageGroupProcessor`。它创建一个`Message`,其有效载荷是给定组所接收的有效载荷的`List`。这对于带有拆分器、发布-订阅通道或上游收件人列表路由器的简单分散-收集实现非常有效。 | |在这种类型的场景中,当使用发布-订阅通道或收件人列表路由器时,请确保启用`apply-sequence`标志。
这样做会添加必要的标题:`CORRELATION_ID`,`SEQUENCE_NUMBER`,和`SEQUENCE_SIZE`。
在 Spring 集成中,默认情况下,分离器的行为是启用的,但是,它不能用于发布-订阅通道或收件人列表路由器,因为这些组件可以在不需要这些头的各种上下文中使用。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 在为应用程序实现特定的聚合器策略时,可以扩展`AbstractAggregatingMessageGroupProcessor`并实现`aggregatePayloads`方法。然而,有更好的解决方案(与 API 的耦合较少)来实现聚合逻辑,它可以通过 XML 或注释进行配置。 通常,任何 POJO 都可以实现聚集算法,如果它提供了一个方法,该方法接受一个`java.util.List`作为参数(也支持参数化的列表)。调用此方法聚合消息,如下所示: * 如果参数是`java.util.Collection`,并且参数类型 T 可分配给`Message`,则将为聚合而积累的整个消息列表发送到聚合器。 * 如果参数是非参数化的`java.util.Collection`,或者参数类型不能分配给`Message`,则该方法接收累积消息的有效负载。 * 如果返回类型不能分配给`Message`,则将其视为框架自动创建的`Message`的有效负载。 | |为了简化代码并促进诸如低耦合、可测试性等最佳实践,实现聚合逻辑的首选方法是通过 POJO 并使用 XML 或注释支持在应用程序中配置它。| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 从版本 5.3 开始,在处理消息组之后,`AbstractCorrelatingMessageHandler`针对具有几个嵌套级别的正确的拆分器聚合器场景执行`MessageBuilder.popSequenceDetails()`消息头修改。只有在消息组发布结果不是消息集合的情况下,才会执行此操作。在这种情况下,目标`MessageGroupProcessor`在构建这些消息时负责执行`MessageBuilder.popSequenceDetails()`调用。 如果`MessageGroupProcessor`返回一个`Message`,则只有当`sequenceDetails`与组中的第一条消息匹配时,才会在输出消息上执行一个`MessageBuilder.popSequenceDetails()`。(以前只有在从`MessageGroupProcessor`返回了一个普通有效载荷或`AbstractIntegrationMessageBuilder`时才会这样做。 该功能可以通过一个新的`popSequence``boolean`属性来控制,因此在某些情况下,当标准分割器尚未填充相关细节时,可以禁用`MessageBuilder.popSequenceDetails()`。这个属性基本上撤消了最近的上游`applySequence = true`在`AbstractMessageSplitter`中所做的事情。有关更多信息,请参见[Splitter](./splitter.html#splitter)。 | |`SimpleMessageGroup.getMessages()`方法返回一个`unmodifiableCollection`。
因此,如果你的聚合 POJO 方法有一个`Collection`参数,那么传入的参数就是`Collection`实例,并且,当你对聚合器使用`SimpleMessageStore`时,原始的`Collection`在发布组之后被清除。
因此,POJO 中的`Collection`变量也被清除,如果它被传递出聚合器。
如果你希望简单地按原样发布该集合以进行进一步处理,则必须构建一个新的`Collection`(例如,`new ArrayList(messages)`)。
从版本 4.3 开始,框架不再将消息复制到新的集合,以避免创建不希望的额外对象。| |---|| 如果`MessageGroupProcessor`的`processMessageGroup`方法返回一个集合,则它必须是`Message`对象的集合。在这种情况下,消息是单独发布的。在版本 4.2 之前,不可能通过使用 XML 配置提供`MessageGroupProcessor`。只有 POJO 方法可以用于聚合。现在,如果框架检测到引用的(或内部的) Bean 实现了`MessageProcessor`,则将其用作聚合器的输出处理器。 如果你希望释放来自自定义`MessageGroupProcessor`的对象集合作为消息的有效负载,那么你的类应该扩展`AbstractAggregatingMessageGroupProcessor`并实现`aggregatePayloads()`。 此外,从版本 4.2 开始,还提供了`SimpleMessageGroupProcessor`。它返回来自该组的消息集合,正如前面所指出的,该集合将导致单独发送已发布的消息。 这使得聚合器作为消息屏障工作,在此过程中,到达的消息将被保存,直到发布策略触发,并且将组作为单个消息的序列发布。 ##### `ReleaseStrategy` `ReleaseStrategy`接口定义如下: ``` public interface ReleaseStrategy { boolean canRelease(MessageGroup group); } ``` 通常,任何 POJO 都可以实现完成决策逻辑,如果它提供了一个方法,该方法接受一个`java.util.List`作为参数(也支持参数化的列表)并返回一个布尔值。此方法在每条新消息到达后调用,以决定该组是否完成,如下所示: * 如果参数是`java.util.List`,并且参数类型`T`可分配给`Message`,则将在组中积累的整个消息列表发送到方法。 * 如果参数是非参数化的`java.util.List`,或者参数类型不能分配给`Message`,则该方法接收累积消息的有效负载。 * 如果消息组已准备好进行聚合,则该方法必须返回`true`,否则为 false。 下面的示例显示了如何对`Message`类型的`List`使用`@ReleaseStrategy`注释: ``` public class MyReleaseStrategy { @ReleaseStrategy public boolean canMessagesBeReleased(List>) {...} } ``` 下面的示例显示了如何对`String`类型的`List`使用`@ReleaseStrategy`注释: ``` public class MyReleaseStrategy { @ReleaseStrategy public boolean canMessagesBeReleased(List) {...} } ``` 基于前面两个示例中的签名,基于 POJO 的发布策略将传递一个`Collection`的尚未发布的消息(如果你需要访问整个`Message`)或`Collection`的有效负载对象(如果类型参数不是`Message`)。这满足了大多数用例。但是,如果出于某种原因,需要访问完整的`MessageGroup`,则应该提供`ReleaseStrategy`接口的实现。 | |在处理可能较大的组时,你应该了解如何调用这些方法,因为在组被释放之前,释放策略可能会被多次调用,
最有效的是`ReleaseStrategy`的实现,因为聚合器可以直接调用它。
效率第二高的是带有`Collection>`参数类型的 POJO 方法。
效率最低的是带有`Collection`类型的 POJO 方法。
框架必须将组中的消息中的有效负载复制到一个新的集合中。(并且可能在每次调用释放策略时尝试将有效负载转换为`Something`)。
使用`Collection`可以避免转换,但是仍然需要创建新的`Collection`。

由于这些原因,对于大型组,我们建议你实现`ReleaseStrategy`。| |---|| 当组被释放以进行聚合时,其所有尚未释放的消息都将被处理并从组中删除。如果该组也是完备的(也就是说,如果来自序列的所有消息都已到达,或者如果没有定义序列),则该组被标记为完备的。该组的任何新消息都将发送到丢弃通道(如果已定义)。将`expire-groups-upon-completion`设置为`true`(默认值为`false`)会删除整个组,并且任何新消息(具有与删除的组相同的相关 ID)都会形成新的组。可以通过使用`MessageGroupStoreReaper`和`send-partial-result-on-expiry`一起设置为`true`来释放部分序列。 | |为了便于丢弃延迟到达的消息,聚合器必须在组被释放后保持该组的状态。
这最终可能导致内存不足的情况,
以避免此类情况,你应该考虑配置`MessageGroupStoreReaper`以删除组元数据。
一旦到达某个点,过期消息就不会到达,过期参数应该设置为过期组,
用于配置收割者的信息,参见[聚合器中的管理状态:`MessageGroupStore`]。| |---|| Spring 集成为`ReleaseStrategy`提供了一种实现方式:`SimpleSequenceSizeReleaseStrategy`。该实现会参考每个到达消息的`SEQUENCE_NUMBER`和`SEQUENCE_SIZE`头,以决定消息组何时完成并准备好进行聚合。如前所述,这也是默认的策略。 | |在版本 5.0 之前,默认的发布策略是`SequenceSizeReleaseStrategy`,这在大型组中表现不佳。
使用该策略,会检测并拒绝重复的序列号。
此操作可能很昂贵。| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 如果你要聚合大的群组,你不需要释放部分群组,也不需要检测/拒绝重复的序列,考虑使用`SimpleSequenceSizeReleaseStrategy`代替,它对这些用例来说效率要高得多,并且是自*版本 5.0*未指定部分分组发布时的默认设置。 ##### 聚合大型群组 4.3 版本将`SimpleMessageGroup`中的消息的默认`Collection`更改为`HashSet`(以前是`BlockingQueue`)。当从大的组中删除单个消息(需要 O(n)线性扫描)时,这是昂贵的。尽管散列集通常可以更快地删除,但对于大消息来说,它可能会很昂贵,因为散列集必须在插入和删除两个部分上计算。如果你的消息散列成本很高,请考虑使用其他类型的集合。正如[使用`MessageGroupFactory`](./message-store.html#message-group-factory)中所讨论的,提供了一个`SimpleMessageGroupFactory`,以便你可以选择最适合你的需要的`Collection`。你还可以提供你自己的工厂实现来创建一些其他`Collection>`。 下面的示例展示了如何使用前面的实现和`SimpleSequenceSizeReleaseStrategy`配置聚合器: ``` ``` | |如果过滤器端点涉及聚合器的上游流,序列大小发布策略(固定的或基于`sequenceSize`报头)不会达到其目的,因为来自序列的一些消息可能会被过滤器丢弃,
在这种情况下,建议选择另一种`ReleaseStrategy`,或者使用从丢弃子流发送的补偿消息,在其内容中携带一些信息,以便在自定义的完整组函数中跳过。
有关更多信息,请参见[Filter](./filter.html#filter)。| |---|| ##### 相关策略 `CorrelationStrategy`接口定义如下: ``` public interface CorrelationStrategy { Object getCorrelationKey(Message message); } ``` 该方法返回一个`Object`,表示用于将消息与消息组关联的相关键。对于`equals()`和`hashCode()`的实现,键必须满足用于`Map`中的键的条件。 通常,任何 POJO 都可以实现相关逻辑,并且将消息映射到方法的参数(或多个参数)的规则与`ServiceActivator`的规则相同(包括对`@Header`注释的支持)。方法必须返回一个值,并且该值不能是`null`。 Spring 集成为`CorrelationStrategy`提供了一种实现方式:`HeaderAttributeCorrelationStrategy`。此实现返回一个消息头的值(其名称由构造函数参数指定)作为相关键。默认情况下,相关策略是一个`HeaderAttributeCorrelationStrategy`,它返回`CORRELATION_ID`header 属性的值。如果你有一个自定义的头名称,那么你可以在`HeaderAttributeCorrelationStrategy`的实例上配置它,并将其作为聚合器的相关策略的参考。 ##### 锁定注册表 对组的更改是线程安全的。因此,当你并发地为相同的相关 ID 发送消息时,聚合器中只会处理其中的一个消息,从而使其有效地成为**每个消息组的单线程**。a`LockRegistry`用于获得解析的相关 ID 的锁。默认情况下(内存中)使用`DefaultLockRegistry`。要在使用共享`MessageGroupStore`的服务器上同步更新,必须配置共享锁注册中心。 ##### 避免死锁 如上所述,当消息组发生突变(添加或释放消息)时,将保持锁定。 考虑以下流程: ``` ...->aggregator1-> ... ->aggregator2-> ... ``` 如果有多个线程,**聚合器共享一个共同的锁注册中心**,则可能出现死锁。这将导致线程挂起,`jstack `可能会呈现如下结果: ``` Found one Java-level deadlock: ============================= "t2": waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync), which is held by "t1" "t1": waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync), which is held by "t2" ``` 有几种方法可以避免这个问题: * 确保每个聚合器都有自己的锁注册表(这可以是跨应用程序实例的共享注册表,但流中的两个或多个聚合器必须各自具有不同的注册表) * 使用`ExecutorChannel`或`QueueChannel`作为聚合器的输出通道,以便下游流在新线程上运行 * 从版本 5.1.1 开始,将`releaseLockBeforeSend`聚合器属性设置为`true` | |如果由于某种原因,单个聚合器的输出最终被路由回相同的聚合器,也可能导致此问题。
当然,上面的第一个解决方案在这种情况下不适用。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| #### 在 Java DSL 中配置聚合器 有关如何在 Java DSL 中配置聚合器,请参见[聚合器和重排序器](./dsl.html#java-dsl-aggregators)。 ##### 使用 XML 配置聚合器 Spring 集成支持通过``元素配置带有 XML 的聚合器。下面的示例展示了聚合器的示例: ``` (24) (25) (26) ``` |**1** |聚合器的 ID 是可选的。| |------|| |**2** |生命周期属性表示是否应该在应用程序上下文启动期间启动聚合器。
可选(默认值为“true”)。| |**3** |聚合器接收消息的通道。
需要。| |**4** |聚合器向其发送聚合结果的通道。
可选的(因为收到的消息本身可以在“replyChannel”消息头中指定一个回复通道)。| |**5** |聚合器向其发送超时消息的通道(如果`send-partial-result-on-expiry`是`false`)。
可选。| |**6** |对`MessageGroupStore`的引用,用于在相关键下存储组消息,直到它们完成。
可选。
默认情况下,它是一个挥发性内存存储。
有关更多信息,请参见[消息存储](./message-store.html#message-store)。| |**7** |当多个句柄订阅相同的`DirectChannel`(用于负载平衡目的)时,此聚合器的顺序可选。| |**8** |表示过期的消息应该被聚合起来。并在其包含的`MessageGroup`过期后发送到’output-channel’或’replychannel’(见[`MessageGroupStore.expireMessageGroups(long)`](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/store/messagrestore.html#ExpireMessageGroups-long))。`MessageGroup`到期的一种方法是通过配置.896r=“>过期,你也可以选择过期。但是通过调用>>>>>>通过控制总线操作,或者,如果你有对`MessageGroupStore`实例的引用,则通过调用`expireMessageGroups(timeout)`。
本身,这个属性不起任何作用。
它只是一个指示,指示是否丢弃或发送到输出或回复通道中仍然在`MessageGroup`中的任何消息它即将过期。
可选(默认值为`false`)。
注意:这个属性可以更正确地被称为`send-partial-result-on-timeout`,因为如果`expire-groups-upon-timeout`设置为`false`,则该组实际上可能不会过期。| |**9** |当向`output-channel`或`discard-channel`发送回复`Message`时,等待的超时间隔将默认为
,这将导致无限期地阻塞。
只有在输出通道具有某些“发送”限制的情况下,才会应用该超时间隔,例如带有固定’capacity’的`QueueChannel`。
在这种情况下,抛出一个`MessageDeliveryException`。
对于`AbstractSubscribableChannel`实现,
被忽略。
对于`group-timeout(-expression)`,来自计划过期任务的`MessageDeliveryException`将导致该任务被重新安排。。| |**10**|对实现消息相关性(分组)算法的 Bean 的引用。
Bean 可以是`CorrelationStrategy`接口的实现。
在后一种情况下,`correlation-strategy-method`属性也必须被定义。
可选(默认情况下,聚合器使用`IntegrationMessageHeaderAccessor.CORRELATION_ID`头)。| |**11**|在 Bean 上定义的一个方法由`correlation-strategy`引用。
它实现了相关决策算法。
可选的,带有限制(`correlation-strategy`必须存在)。| |**12**|表示相关策略的 SPEL 表达式。
示例:`"headers['something']"`。
只允许`correlation-strategy`或`correlation-strategy-expression`中的一个。| |**13**|在应用程序上下文中定义了对 Bean 的引用。该 Bean 必须实现聚合逻辑,如前面所述。可选的(默认情况下,聚合消息的列表成为输出消息的有效负载)。| |**14**|在 Bean 上定义的一种方法由`ref`属性引用。
它实现了消息聚合算法。
可选(它取决于`ref`属性正在被定义)。| |**15**|对实现发布策略的 Bean 的引用。
Bean 可以是`ReleaseStrategy`接口的实现或 POJO。
在后一种情况下,还必须定义`release-strategy-method`属性。
可选(默认情况下,聚合器使用`IntegrationMessageHeaderAccessor.SEQUENCE_SIZE`header 属性)。| |**16**|在 Bean 上定义了由`release-strategy`属性引用的方法。
它实现了完成决策算法。
可选的,带有限制(`release-strategy`必须存在)。| |**17**|表示发布策略的 SPEL 表达式。
表达式的根对象是`MessageGroup`。
示例:`"size() == 5"`。
只允许`release-strategy`或`release-strategy-expression`中的一个。| |**18**|当设置为`true`(默认值为`false`)时,完成的组将从消息存储中删除,让具有相同相关性的后续消息形成新的组。
默认的行为是将具有与完成的组相同相关性的消息发送到
。| |**19**|仅当`MessageGroupStoreReaper`被配置为``中的`MessageStore`时才适用。默认情况下,当
被配置为使部分组过期时,空组也会被删除。
空组在正常释放一个组之后存在。
空组允许检测和丢弃延迟到达的消息。
如果你希望在更长的时间安排下使空组过期,而不是使部分组过期,设置此属性。
空组不会从`MessageStore`中删除,直到它们至少在此毫秒内未被修改为止,
注意,空组的实际过期时间也会受到收割者的`timeout`属性的影响,它可能是这个值加上超时。| |**20**|一个对`org.springframework.integration.util.LockRegistry` Bean 的引用。
它用于基于`groupId`获得一个`Lock`用于在`MessageGroup`上的并发操作。
默认情况下,使用一个内部`DefaultLockRegistry`。使用一个分布式`LockRegistry`,如
,确保只有一个聚合器实例可以同时在一个组上操作。
有关更多信息,请参见[Redis 锁注册表](./redis.html#redis-lock-registry),[Gemfire Lock 注册表](./gemfire.html#gemfire-lock-registry)和[动物园管理员锁定注册表](./zookeeper.html#zk-lock-registry)。| |**21**|当`ReleaseStrategy`未在当前消息到达时发布组时,强制`MessageGroup`完成的超时(以毫秒为单位)。
此属性提供了一个内置的基于时间的发布策略对于聚合器,当需要发出部分结果(或放弃该组)时如果`MessageGroup`的新消息在从最后一条消息到达时算起的超时内没有到达。
要设置一个超时,该超时从创建`MessageGroup`时开始计算,请参见`group-timeout-expression`信息。
当新消息到达聚合器时,如果`ReleaseStrategy`返回`false`(意为不发布)和`groupTimeout > 0`,则取消其
,一个新的任务被安排在组中过期。
我们不建议将此属性设置为零(或负值)。
这样做会有效地禁用聚合器,因为每个消息组都会立即完成。`Advice`但是,你可以,使用表达式有条件地将其设置为零(或负值)。
有关信息,请参见`group-timeout-expression`。
完成过程中采取的操作取决于`ReleaseStrategy`和`send-partial-group-on-expiry`属性。[聚合器和组超时](#agg-and-group-to)有关更多信息,请参见
与“group-timeout-expression”属性互斥。| |**22**|将`#root`计算为`groupTimeout`并将`MessageGroup`作为计算上下文对象的 spel 表达式。
用于将`MessageGroup`调度为强制完成。
如果表达式计算为`null`,则不计划完成。
如果计算为零,组立即在当前线程上完成。
实际上,这提供了一个动态的`group-timeout`属性。
作为示例,如果你希望在创建组的时间已经过去 10 秒后强制完成`MessageGroup`,则可以考虑使用以下 SPEL 表达式:`timestamp + 10000 - T(System).currentTimeMillis()`其中`timestamp`由`MessageGroup.getTimestamp()`提供由于`MessageGroup`这里是`#root`求值上下文对象。
但是请记住,组创建时间可能与第一次到达消息的时间不同,这取决于其他组过期属性的配置。
有关更多信息,请参见`group-timeout`。
与’group-timeout’属性互斥。| |**23**|当由于超时(或通过`MessageGroupStoreReaper`)而完成一个组时,默认情况下,该组已过期(完全删除)。
延迟到达的消息将启动一个新的组。
你可以将其设置为`false`以完成该组,但其元数据保持不变稍后可以使用
和`empty-group-min-timeout`属性一起使用空组过期。
它默认为“true”。| |**24**|一个`TaskScheduler` Bean 引用,如果`MessageGroup`内的`MessageGroup`没有新消息到达,则将`MessageGroup`调度为强制完成
,如果没有提供,使用在`ApplicationContext`(`ThreadPoolTaskScheduler`)中注册的默认调度器(`taskScheduler`)。
如果未指定`group-timeout`或`group-timeout-expression`,则该属性不适用。| |**25**|由于版本 4.1.
,它允许为`forceComplete`操作启动事务。
它是从`group-timeout(-expression)`或由`MessageGroupStoreReaper`发起的,并且不应用于正常的`add`、`release`和`discard`操作。
只允许此子元素或
。| |**26**|由于*版本 4.1*.
它允许为`forceComplete`操作配置任何`Advice`。
它是从`group-timeout(-expression)`或由发起的,并且不应用于正常的,`release`,并且`discard`操作。
只允许这个子元素或`expireGroupsUponCompletion`。
还可以在这里通过使用 Spring `tx`命名空间来配置事务`Advice`。| | |Expiring Groups

There are two attributes related to expiring (completely removing) groups.
When a group is expired, there is no record of it, and, if a new message arrives with the same correlation, a new group is started.
When a group is completed (without expiry), the empty group remains and late-arriving messages are discarded.
Empty groups can be removed later by using a `MessageGroupStoreReaper` in combination with the `empty-group-min-timeout` attribute.

`expire-groups-upon-completion` relates to “normal” completion when the `ReleaseStrategy` releases the group.
This defaults to `false`.

If a group is not completed normally but is released or discarded because of a timeout, the group is normally expired.
Since version 4.1, you can control this behavior by using `expire-groups-upon-timeout`.
It defaults to `true` for backwards compatibility.

| |When a group is timed out, the `ReleaseStrategy` is given one more opportunity to release the group.
If it does so and `expire-groups-upon-timeout` is false, expiration is controlled by `expire-groups-upon-completion`.
If the group is not released by the release strategy during timeout, then the expiration is controlled by the `expire-groups-upon-timeout`.
Timed-out groups are either discarded or a partial release occurs (based on `send-partial-result-on-expiry`).|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

自版本 5.0 以来,空组也被安排在`empty-group-min-timeout`之后被删除。
如果`expireGroupsUponCompletion == false`和`minimumTimeoutForEmptyGroups > 0`,则在正常或部分序列释放发生时将排除组的任务进行调度。

从版本 5.4 开始,可以将聚合器(和 ReSequencer)配置为使孤立的组(在持久消息存储中的组,否则可能不会被释放)过期。
`expireTimeout`(如果大于`0`)表示存储中比这个值更早的组应该被清除,
在启动时调用`purgeOrphanedGroups()`方法,以及提供的`expireDuration`,在计划的任务中定期执行。
此方法是也可以在任何时候调用外部。
根据上面提到的提供的到期选项,将到期逻辑完全委托给`forceComplete(MessageGroup)`功能。
这样的周期性清除功能是有用的当需要从那些不再使用常规消息到达逻辑发布的旧组中清理消息存储时,
在大多数情况下,这发生在应用程序重新启动之后,当使用持久的消息组存储时,
的功能类似于带有计划任务的`group-timeout-expression`,但是提供了一种方便的方式来处理特定组件中的旧组,当使用组超时而不是收割者时。
`MessageGroupStore`必须专门为当前相关端点提供
,否则一个聚合器可能会从另一个聚合器中清除组。
有了聚合器,使用此技术过期的组将被丢弃或作为部分组释放,取决于`expireGroupsUponCompletion`属性。| |---|| | |当一个组被超时时时,`ReleaseStrategy`被给予一个更多的机会来释放该组。
如果它这样做并且`expire-groups-upon-timeout`是假的,则过期由`expire-groups-upon-completion`控制。
如果该组在超时期间没有通过释放策略被释放,然后过期由`expire-groups-upon-timeout`控制。
超时组要么被丢弃,要么发生部分释放(基于`send-partial-result-on-expiry`)。| 如果在其他``定义中可以引用自定义聚合器处理程序实现,我们通常建议使用`ref`属性。但是,如果自定义聚合器实现仅由``的单个定义使用,则可以使用内部 Bean 定义(从版本 1.0.3 开始)在``元素中配置聚合 POJO,如下例所示: ``` ``` | |在相同的``配置中使用`ref`属性和内部 Bean 定义是不允许的,因为它创建了一个模棱两可的条件。``在这种情况下,将抛出一个异常。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 下面的示例展示了聚合器 Bean 的一个实现: ``` public class PojoAggregator { public Long add(List results) { long total = 0l; for (long partialResult: results) { total += partialResult; } return total; } } ``` 对于上述示例,完成策略 Bean 的实现方式可能如下: ``` public class PojoReleaseStrategy { ... public boolean canRelease(List numbers) { int sum = 0; for (long number: numbers) { sum += number; } return sum >= maxValue; } } ``` | |Bean 只要这样做是有意义的,发布策略方法和聚合器方法就可以合并为一个单独的方法。| |---|---------------------------------------------------------------------------------------------------------------------------| Bean 用于上述示例的关联策略的实现方式可以如下: ``` public class PojoCorrelationStrategy { ... public Long groupNumbersByLastDigit(Long number) { return number % 10; } } ``` 在前面的示例中,聚合器将按照某种标准(在这种情况下,是除以 10 后的剩余部分)对数字进行分组,并保持分组,直到有效负载提供的数字之和超过某个值。 | |只要这样做是有意义的,释放策略方法、相关策略方法和聚合器方法就可以合并为一个 Bean。
(实际上,它们中的所有或任意两个都可以合并。)| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ###### 聚合器和 Spring 表达式语言 Spring Integration2.0 以来,你可以使用[SpEL](https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#expressions)处理各种策略(相关、发布和聚合),如果这样的发布策略背后的逻辑相对简单,我们推荐使用这种策略。假设你有一个设计用于接收对象数组的遗留组件。我们知道,默认的发布策略在`List`中组装了所有聚合的消息。现在我们有两个问题。首先,我们需要从列表中提取单个消息。其次,我们需要提取每个消息的有效负载并组装对象数组。下面的示例解决了这两个问题: ``` public String[] processRelease(List> messages){ List stringList = new ArrayList(); for (Message message : messages) { stringList.add(message.getPayload()); } return stringList.toArray(new String[]{}); } ``` 然而,对于 SPEL,这样的需求实际上可以通过单行表达式相对容易地处理,从而避免编写自定义类并将其配置为 Bean。下面的示例展示了如何做到这一点: ``` ``` 在前面的配置中,我们使用[集合投影](https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#expressions)表达式从列表中所有消息的有效负载中组装一个新的集合,然后将其转换为一个数组,从而获得与早期 Java 代码相同的结果。 在处理自定义发布和相关策略时,可以应用相同的基于表达式的方法。 与在`AbstractCorrelatingMessageHandler`属性中为自定义`CorrelationStrategy`定义 Bean 不同,你可以将简单的相关逻辑实现为 SPEL 表达式,并在`correlation-strategy-expression`属性中对其进行配置,如下例所示: ``` correlation-strategy-expression="payload.person.id" ``` 在前面的示例中,我们假设有效负载具有`person`属性和`id`属性,该属性将用于关联消息。 同样,对于`ReleaseStrategy`,你可以将你的发布逻辑实现为 SPEL 表达式,并在`MessageGroupStoreReaper`属性中对其进行配置。求值上下文的根对象是`MessageGroup`本身。可以通过在表达式中使用组的`message`属性来引用消息的`List`。 | |在版本 5.0 之前的版本中,根对象是`Message`的集合,如前面的示例所示:| |---|--------------------------------------------------------------------------------------------------------------------| ``` release-strategy-expression="!messages.?[payload==5].empty" ``` 在前面的示例中,SPEL 求值上下文的根对象是`MessageGroup`本身,并且你正在声明,一旦在这个组中出现有效负载`5`的消息,就应该释放该组。 ###### 聚合器和组超时 从版本 4.0 开始,引入了两个新的互斥属性:`group-timeout`和`group-timeout-expression`。见[使用 XML 配置聚合器](#aggregator-xml)。在某些情况下,如果`ReleaseStrategy`在当前消息到达时未释放,则可能需要在超时后发出聚合器结果(或丢弃该组)。为此,`groupTimeout`选项允许强制完成调度`MessageGroup`,如下例所示: ``` ``` 在此示例中,如果聚合器按照`release-strategy-expression`定义的顺序接收最后一条消息,则正常的发布是可能的。如果该特定消息未到达,`groupTimeout`将强制该组在 10 秒后完成,只要该组至少包含两条消息。 强迫分组完成的结果取决于`ReleaseStrategy`和`send-partial-result-on-expiry`。首先,再次咨询发布策略,以确定是否要进行正常发布。虽然组没有改变,但`ReleaseStrategy`可以决定在此时释放该组。如果发布策略仍然没有发布组,则该策略已过期。如果`send-partial-result-on-expiry`是`true`,则(部分)`MessageGroup`中的现有消息将作为普通聚合器的回复消息发布到`output-channel`。否则,它就会被抛弃。 在`groupTimeout`行为和`MessageGroupStoreReaper`行为之间存在差异(参见[使用 XML 配置聚合器](#aggregator-xml))。收割者周期性地对`MessageGroupStore`中的所有`MessageGroup`s 启动强制完成。如果在`groupTimeout`期间没有收到新消息,则`groupTimeout`对每个`MessageGroup`单独执行。同样,Reaper 可以用于删除空组(如果`expire-groups-upon-completion`为假,则保留空组以丢弃过期消息)。 从版本 5.5 开始,`groupTimeoutExpression`可以计算为`java.util.Date`实例。这在以下情况下很有用:基于组创建时间(`MessageGroup.getTimestamp()`)确定计划的任务时刻,而不是当`groupTimeoutExpression`被计算为`long`时计算当前消息到达: ``` group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null" ``` ##### 配置带有注释的聚合器 下面的示例展示了一个配置了注释的聚合器: ``` public class Waiter { ... @Aggregator (1) public Delivery aggregatingMethod(List items) { ... } @ReleaseStrategy (2) public boolean releaseChecker(List> messages) { ... } @CorrelationStrategy (3) public String correlateBy(OrderItem item) { ... } } ``` |**1**|指示此方法应用作聚合器的注释。
如果将此类用作聚合器,则必须指定它。| |-----|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |**2**|表示此方法被用作聚合器的发布策略的注释。
如果不存在于任何方法上,则聚合器使用`SimpleSequenceSizeReleaseStrategy`。| |**3**|表示此方法应用作聚合器的相关策略的注释。
如果没有指示相关策略,则聚合器使用基于`HeaderAttributeCorrelationStrategy`的`HeaderAttributeCorrelationStrategy`。| XML 元素提供的所有配置选项也可用于`@Aggregator`注释。 可以从 XML 显式引用聚合器,或者,如果在类上定义了`@MessageEndpoint`,则可以通过 Classpath 扫描自动检测聚合器。 聚合器组件的注释配置(`@Aggregator`和其他)仅涵盖简单的用例,其中大多数默认选项就足够了。如果在使用注释配置时需要更多地控制这些选项,请考虑使用`@Bean`的`AggregatingMessageHandler`定义,并将其`@Bean`方法标记为`@ServiceActivator`,如下例所示: ``` @ServiceActivator(inputChannel = "aggregatorChannel") @Bean public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) { AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(), jdbcMessageGroupStore); aggregator.setOutputChannel(resultsChannel()); aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L)); aggregator.setTaskScheduler(this.taskScheduler); return aggregator; } ``` 有关更多信息,请参见[程序设计模型](#aggregator-api)和[关于`@Bean`方法的注释]。 | |从版本 4.2 开始,`AggregatorFactoryBean`可用于简化`AggregatingMessageHandler`的 Java 配置。| |---|---------------------------------------------------------------------------------------------------------------------------------------| #### 在聚合器中管理状态:`MessageGroupStore` 聚合器(以及 Spring Integration 中的一些其他模式)是一种有状态的模式,该模式要求基于一组在一段时间内到达的消息做出决策,所有这些消息都具有相同的相关键。在有状态模式(例如`ReleaseStrategy`)中,接口的设计是由以下原则驱动的:组件(无论是由框架还是由用户定义的)应该能够保持无状态。所有状态都由`MessageGroup`承载,其管理被委托给`MessageGroupStore`。`MessageGroupStore`接口定义如下: ``` public interface MessageGroupStore { int getMessageCountForAllMessageGroups(); int getMarkedMessageCountForAllMessageGroups(); int getMessageGroupCount(); MessageGroup getMessageGroup(Object groupId); MessageGroup addMessageToGroup(Object groupId, Message message); MessageGroup markMessageGroup(MessageGroup group); MessageGroup removeMessageFromGroup(Object key, Message messageToRemove); MessageGroup markMessageFromGroup(Object key, Message messageToMark); void removeMessageGroup(Object groupId); void registerMessageGroupExpiryCallback(MessageGroupCallback callback); int expireMessageGroups(long timeout); } ``` 有关更多信息,请参见[Javadoc](https://docs.spring.io/spring-integration/api/org/springframework/integration/store/MessageGroupStore.html)。 在等待触发发布策略时,`MessageGroupStore`在`MessageGroups`中积累状态信息,而该事件可能永远不会发生。因此,为了防止陈旧的消息持续存在,并为 Volatile Stores 提供一个钩子,用于在应用程序关闭时进行清理,`MessageGroupStore`允许你注册回调,以便在它们过期时应用到它的`MessageGroups`。该界面非常简单,如下表所示: ``` public interface MessageGroupCallback { void execute(MessageGroupStore messageGroupStore, MessageGroup group); } ``` 回调可以直接访问存储和消息组,这样它就可以管理持久状态(例如,通过从存储中完全删除组)。 `MessageGroupStore`维护这些回调的列表,并按需将其应用于所有时间戳早于作为参数提供的时间的消息(请参见前面描述的`AbstractCorrelatingMessageHandler`和`expireMessageGroups(..)`方法)。 | |在不同的聚合器组件中,不要使用相同的`MessageGroupStore`实例,当你打算依赖`MessageGroupStore`功能时。
每个`AbstractCorrelatingMessageHandler`都基于`forceComplete()`回调来注册自己的`MessageGroupCallback`。
这样,每个过期的组都可能被错误的聚合器完成或丢弃。,
从 5.0.10 版本开始,在`MessageGroupStore`中,从`AbstractCorrelatingMessageHandler`中使用`UniqueExpiryCallback`用于注册回调。
`MessageGroupStore`中的`MessageGroupStore`,检查该类的实例是否存在,并用适当的消息记录错误如果一个已经存在于回调集中。
这样,框架不允许在不同的聚合器/重序器中使用`MessageGroupStore`实例,以避免上述的副作用,即过期的组不是由特定的相关处理程序创建的。| |---|| 你可以使用超时值调用`expireMessageGroups`方法。任何大于当前时间减去该值的消息都将过期并应用回调。因此,是存储的用户定义了消息组“过期”的含义。 Spring 为方便用户,集成以`MessageGroupStoreReaper`的形式为消息过期提供了包装,如下例所示: ``` ``` 收割者是 a`Runnable`。在前面的示例中,消息组存储的 expire 方法每十秒被调用一次。超时本身是 30 秒。 | |重要的是要理解`MessageGroupStoreReaper`的“timeout”属性是一个近似值,并且受任务计划程序的速率的影响,因为该属性仅在`MessageGroupStoreReaper`任务的下一个计划执行时检查。
例如,如果超时时间设置为 10 分钟,但`MessageGroupStoreReaper`任务被安排每小时运行一次,并且`MessageGroupStoreReaper`任务的最后一次执行发生在超时前一分钟,则`MessageGroup`在接下来的 59 分钟内不会过期。,因此,我们建议将速率设置为至少等于超时的值或更短。| |---|| 除了 Reaper 之外,当应用程序在`AbstractCorrelatingMessageHandler`中的生命周期回调中关闭时,还会调用到期回调。 `AbstractCorrelatingMessageHandler`注册了自己的到期回调,这是聚合器的 XML 配置中带有布尔标志`send-partial-result-on-expiry`的链接。如果将标志设置为`true`,则在调用到期回调时,可以将尚未发布的组中的任何未标记消息发送到输出通道。 | |由于`MessageGroupStoreReaper`是从计划的任务调用的,并且可能导致向下游集成流产生消息(取决于`sendPartialResultOnExpiry`选项),建议通过`errorChannel`向处理程序异常提供带有`MessagePublishingErrorHandler`的自定义`TaskScheduler`,
相同的逻辑也适用于组超时功能,它也依赖于`TaskScheduler`。
有关更多信息,请参见`AbstractCorrelatingMessageHandler`。| |---|| | |当共享的`MessageStore`用于不同的相关端点时,必须配置适当的`CorrelationStrategy`以确保组 ID 的唯一性。
否则,当一个相关端点从其他端点释放或过期消息时,可能会发生意外行为。
具有相同相关键的消息存储在相同的消息组中。

某些`MessageStore`实现允许使用相同的物理资源,通过对数据进行分区。例如,
,`JdbcMessageStore`具有`region`属性,而`MongoDbMessageStore`具有`collectionName`属性。`MessageGroup`
有关`MessageStore`接口及其实现的更多信息,请参见[消息存储](./message-store.html#message-store)。| |---|| #### 通量聚合器 在版本 5.2 中,引入了`FluxAggregatorMessageHandler`组件。它是基于项目反应器`Flux.groupBy()`和`Flux.window()`的运营商。传入消息被发送到由该组件的构造函数中的`Flux.create()`发起的`FluxSink`中。如果不提供`outputChannel`或者它不是`ReactiveStreamsSubscribableChannel`的实例,则订阅主`Flux`是从`Lifecycle.start()`实现完成的。否则它将被推迟到由`ReactiveStreamsSubscribableChannel`实现所完成的订阅。消息由`Flux.groupBy()`分组,使用用于组键的`CorrelationStrategy`。默认情况下,查询消息的`IntegrationMessageHeaderAccessor.CORRELATION_ID`头。 默认情况下,每个关闭的窗口都会以`Flux`的有效载荷释放要产生的消息。这条消息包含了窗口中第一条消息的所有标题。这`Flux`在输出消息中的有效负载必须被订阅并向下游处理。这样的逻辑可以被`FluxAggregatorMessageHandler`的`setCombineFunction(Function>, Mono>>)`配置选项定制(或取代)。例如,如果我们希望在最终消息中有`List`的有效负载,那么我们可以这样配置`Flux.collectList()`: ``` fluxAggregatorMessageHandler.setCombineFunction( (messageFlux) -> messageFlux .map(Message::getPayload) .collectList() .map(GenericMessage::new)); ``` 在`FluxAggregatorMessageHandler`中有几个选项可用于选择适当的窗口策略: * `setBoundaryTrigger(Predicate>)`-被传播到`Flux.windowUntil()`运算符。有关更多信息,请参见其 Javadocs。优先于所有其他窗口选项。 * `setWindowSize(int)`和`setWindowSizeFunction(Function, Integer>)`-被传播到`Flux.window(int)`或`windowTimeout(int, Duration)`。默认情况下,窗口大小是从组中的第一条消息及其`IntegrationMessageHeaderAccessor.SEQUENCE_SIZE`头计算出来的。 * `setWindowTimespan(Duration)`-被传播到`Flux.window(Duration)`或`windowTimeout(int, Duration)`取决于窗口大小配置。 * `setWindowConfigurer(Function>, Flux>>>)`-一个函数,用于将转换应用到分组通量中,以实现任何未包含在公开选项中的自定义窗口操作。 由于该组件是`MessageHandler`实现,因此它可以简单地用作`@Bean`定义以及`@ServiceActivator`消息注释。对于 Java DSL,可以使用`.handle()`EIP-method。下面的示例演示了如何在运行时注册`IntegrationFlow`,以及如何将`FluxAggregatorMessageHandler`与上游拆分器关联: ``` IntegrationFlow fluxFlow = (flow) -> flow .split() .channel(MessageChannels.flux()) .handle(new FluxAggregatorMessageHandler()); IntegrationFlowContext.IntegrationFlowRegistration registration = this.integrationFlowContext.registration(fluxFlow) .register(); @SuppressWarnings("unchecked") Flux> window = registration.getMessagingTemplate() .convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class); ``` #### 消息组上的`FluxAggregatorMessageHandler`条件 从版本 5.5 开始,`AbstractCorrelatingMessageHandler`(包括其 Java&XML DSL)公开了`groupConditionSupplier`实现的`groupConditionSupplier`选项。此函数用于添加到组中的每个消息,并将结果条件语句存储到组中以供将来考虑。`ReleaseStrategy`可能会参考这个条件,而不是对组中的所有消息进行迭代。有关更多信息,请参见`GroupConditionProvider`Javadocs 和[消息组条件](./message-store.html#message-group-condition)。 另见[文件聚合器](./file.html#file-aggregator)。 ### 重序器 重测序器与聚合器相关,但具有不同的目的。当聚合器合并消息时,重排序程序在不更改消息的情况下传递消息。 #### 功能 重序器的工作方式与聚合器类似,它使用`CORRELATION_ID`将消息分组存储。不同之处在于,重排序程序不会以任何方式处理消息。相反,它以`SEQUENCE_NUMBER`标头值的顺序释放它们。 关于这一点,你可以 OPT 一次发布所有消息(在整个序列之后,根据`false`和其他可能性),或者一旦有效的序列可用。(我们将在本章后面介绍我们所说的“有效序列”的含义。 | |重序器的目的是对具有较小间隙的相对较短的消息序列进行重新排序。
如果你有大量具有许多间隙的不相交序列,那么你可能会遇到性能问题。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| #### 配置重排序程序 有关在 Java DSL 中配置重排序程序,请参见[聚合器和重排序器](./dsl.html#java-dsl-aggregators)。 配置重排序程序只需要在 XML 中包含适当的元素。 下面的示例展示了一个重排序程序配置: ``` (19) expire-group-upon-timeout="false" /> (20) ``` |**1** |重新排序器的 ID 是可选的。| |------|| |**2** |重新排序程序的输入通道。
需要。| |**3** |重新排序程序向其发送重新排序消息的通道。
可选的。| |**4** |重序器向其发送超时消息的通道(如果`send-partial-result-on-timeout`设置为`false`)。
可选。| |**5** |是在有序序列可用时立即发送,还是仅在整个消息组到达后才发送。
可选。
(默认值为`false`。)| |**6** |对`MessageGroupStore`的引用,该引用可用于在相关键下存储组消息,直到它们完成。
可选的。
(默认情况下是易失性内存存储。)| |**7** |是否应该在该组到期后将已排序的组发送出去(即使某些消息丢失了)。
可选的。
(默认为 false。)
请参阅[aggregator 中的管理状态:`MessageGroupStore`]。| |**8** |向`output-channel`或`discard-channel`发送回复`Message`时要等待的超时间隔。
默认为`-1`,这将无限期地阻塞。
只有当输出通道具有某些“发送”限制时,才会应用该超时,例如具有固定’容量的`QueueChannel`。在这种情况下,
,抛出了`MessageDeliveryException`。
对于`AbstractSubscribableChannel`实现,忽略`send-timeout`。
对于`JdbcMessageStore`,来自计划过期任务的`MessageDeliveryException`导致重新安排此任务。
可选。| |**9** |对实现消息相关性(分组)算法的 Bean 的引用。
Bean 可以是`CorrelationStrategy`接口的实现或 POJO。
在后一种情况下,还必须定义`correlation-strategy-method`属性。
可选.
(默认情况下,聚合器使用`IntegrationMessageHeaderAccessor.CORRELATION_ID`标题。)| |**10**|在 Bean 上定义的一种方法是由`correlation-strategy`引用的,并且它实现了相关决策算法。
可选的,带有限制(需要`correlation-strategy`才能存在)。| |**11**|表示相关策略的 SPEL 表达式。
示例:`"headers['something']"`。
只允许`correlation-strategy`或`correlation-strategy-expression`中的一个。| |**12**|对实现发布策略的 Bean 的引用。
Bean 可以是`ReleaseStrategy`接口的实现或 POJO。
在后一种情况下,`release-strategy-method`属性也必须定义。
可选(默认情况下,聚合器将使用`IntegrationMessageHeaderAccessor.SEQUENCE_SIZE`header 属性)。| |**13**|在由`release-strategy`引用的 Bean 上定义的一种方法,并且它实现了完成决策算法。
可选的,带有限制(需要`release-strategy`才能存在)。| |**14**|表示发布策略的 SPEL 表达式。
表达式的根对象是`MessageGroup`。
示例:`"size() == 5"`。
只允许`release-strategy`或`release-strategy-expression`中的一个。| |**15**|仅当`MessageGroupStoreReaper`被配置为```MessageStore`时才适用。默认情况下,当
被配置为使部分组过期时,空组也会被删除。
空组在正常释放一个组之后存在。
这是为了能够检测和丢弃延迟到达的消息。
如果你希望在比过期部分组更长的时间表上过期空组,设置此属性。
空组不会从`MessageStore`中删除,直到它们至少在这个毫秒数内未被修改为止,
注意,空组的实际过期时间也会受到收割者超时属性的影响,它可能是这个值加上超时。| |**16**|见[使用 XML 配置聚合器](./aggregator.html#aggregator-xml)。| |**17**|见[使用 XML 配置聚合器](./aggregator.html#aggregator-xml)。| |**18**|见[使用 XML 配置聚合器](./aggregator.html#aggregator-xml)。| |**19**|见[使用 XML 配置聚合器](./aggregator.html#aggregator-xml)。| |**20**|默认情况下,当一个组由于超时(或通过`MessageGroupStoreReaper`)而完成时,将保留空组的元数据。
延迟到达的消息将被立即丢弃。
将此设置为`true`以完全删除该组。
然后,到达较晚的消息会启动一个新组,并且直到该组再次超时才会被丢弃。
由于序列范围中的“漏洞”,新组永远不会正常释放这导致了超时。
空组可以在以后通过使用`MessageGroupStoreReaper`和`empty-group-min-timeout`属性一起过期(完全删除)。
从版本 5.0 开始,空组也计划在`empty-group-min-timeout`经过之后删除。
默认为“false”。| 有关更多信息,请参见[聚合器到期的组](./aggregator.html#aggregator-expiring-groups)。 | |由于在 爪哇 类中没有针对重序列器实现的自定义行为,因此没有对它的注释支持。| |---|----------------------------------------------------------------------------------------------------------------------------| ### 消息处理程序链 `MessageHandlerChain`是`MessageHandler`的一种实现,它可以被配置为单个消息端点,同时实际上将其委托给一系列其他处理程序,例如过滤器、转换器、拆分器等。当多个处理程序需要以固定的线性级数连接时,这可能会导致更简单的配置。例如,在提供变压器之前提供其他组件是相当常见的。类似地,当你在链中的其他组件之前提供一个过滤器时,实质上创建了[选择性消费者](https://www.enterpriseintegrationpatterns.com/MessageSelector.html)。在这两种情况下,链只需要一个`input-channel`和一个`output-channel`,从而无需为每个单独的组件定义通道。 | |`MessageHandlerChain`主要是为 XML 配置而设计的。
对于 Java DSL,`IntegrationFlow`定义可以被视为链组件,但它与下面这一章中描述的概念和原则无关。
参见[Java DSL](./dsl.html#java-dsl)获取更多信息。| |---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | |Spring Integration 的`Filter`提供了一个布尔属性:`throwExceptionOnRejection`。
当你在相同的点对点通道上提供具有不同接受条件的多个选择性消费者时,你应该将该值设置为“true”(默认值为`false`),以便调度器知道消息已被拒绝,因此,尝试将消息传递给其他订阅服务器。
如果没有引发异常,在调度器看来,消息已被成功传递,即使过滤器已删除消息以阻止进一步处理,
如果你确实想“删除”这些消息,过滤器的“丢弃通道”可能会很有用,因为它确实为你提供了一个机会来对丢弃的消息执行某些操作(例如将其发送到 JMS 队列或将其写入日志)。| |---|| 处理程序链简化了配置,同时在内部保持了组件之间相同程度的松耦合,并且如果在某个时刻需要非线性配置,则修改配置是很简单的。 在内部,链被扩展为列出的端点的线性设置,由匿名通道分隔。在链中没有考虑到应答通道头。只有在调用最后一个处理程序之后,才会将结果消息转发到应答通道或链的输出通道。由于这种设置,除最后一个以外的所有处理程序都必须实现`MessageProducer`接口(它提供了一个“setoutputchannel()”方法)。如果设置了`MessageHandlerChain`上的`outputChannel`,则最后一个处理程序只需要一个输出通道。 | |与其他端点一样,`output-channel`是可选的。
如果在链尾有一个回复消息,则输出通道优先。
但是,如果它不可用,则链处理程序将检查入站消息上的回复通道头作为后备。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 在大多数情况下,你不需要自己实现`MessageHandler`。下一节将重点介绍对 Chain 元素的命名空间支持。 Spring 大多数集成端点,例如服务激活器和转换器,适合在`MessageHandlerChain`内使用。 #### 配置链 ``元素提供了一个`input-channel`属性。如果链中的最后一个元素能够生成回复消息(可选的),那么它还支持`output-channel`属性。然后,子元素是过滤器、变压器、分离器和服务激活器。最后一个元素也可以是路由器或出站通道适配器。下面的示例展示了一个链定义: ``` ``` 前面示例中使用的``元素在消息上设置了一个名为`thing1`的消息头,其值为`thing2`。Header Enricher 是`Transformer`的专门化,它只涉及标头值。你可以通过实现一个`MessageHandler`来获得相同的结果,它对头进行了修改,并将其连接为 Bean,但是 header-enricher 是一个更简单的选项。 可以将``配置为消息流的最后一个“闭盒”消费者。对于这个解决方案,可以将其放在 \的末尾,有些 \,如下例所示: ``` ``` | |不允许的属性和元素

某些属性,例如`order`和`input-channel`不允许在链内使用的组件上指定。
对于 poller 子元素也是如此。
对于 Spring 集成核心组件,
,但是,对于非核心组件或你自己的定制组件,这些约束是由 XML 名称空间解析器强制执行的,不是通过 XML 模式。

这些 XML 名称空间解析器约束是在 Spring Integration2.2 中添加的。
如果尝试使用不允许的属性和元素,XML 名称空间解析器将抛出`BeanDefinitionParsingException`。| |---|| #### 使用’id’属性 从 Spring Integration3.0 开始,如果给一个 chain 元素一个`id`属性,则该元素的 Bean 名称是链的`id`和元素本身的`id`的组合。没有`id`属性的元素不被注册为 bean,但是每个元素都被赋予了一个`componentName`,其中包含了链`id`。考虑以下示例: ``` ``` 在前面的示例中: * 根元素的``有一个’SomethingChain’的`id`。因此,`AbstractEndpoint`实现(`PollingConsumer`或`EventDrivenConsumer`,取决于`input-channel`类型) Bean 将该值作为其 Bean 名称。 * `MessageHandlerChain` Bean 获得一个 Bean 别名(’somethingchain.handler’),该别名允许从`BeanFactory`直接访问该别名 Bean。 * ``不是一个成熟的消息传递端点(它不是`PollingConsumer`或`EventDrivenConsumer`)。它是`MessageHandler`中的``。在本例中,用`BeanFactory`注册的 Bean 名称是’somethingchain$child.somethingservice.handler’。 * 这个`ServiceActivatingHandler`的`componentName`接受相同的值,但没有后缀“.handler”。它变成了“SomethingChain$Child.SomethingService”。 * 最后的``子组件``不具有`id`属性。其`componentName`基于其在``中的位置。在本例中,它是“SomethingChain$Child#1”。(名称的最后一个元素是链中的顺序,以“#0”开头)。注意,此转换器未在应用程序上下文中注册为 Bean,因此它不会获得`beanName`。但是,它的`componentName`的值对于日志记录和其他目的是有用的。 ``元素的`id`属性使它们符合[JMX export](./jmx.html#jmx-mbean-exporter)的条件,并且它们可以在[消息历史](./message-history.html#message-history)中进行跟踪。正如前面讨论的那样,你可以通过使用适当的 Bean 名称从`BeanFactory`访问它们。 | |在``元素上提供显式的`id`属性是有用的,以简化日志中的子组件的标识,并提供从`BeanFactory`等对它们的访问。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| #### 从链内调用链子 有时,你需要从一个链中对另一个链进行嵌套调用,然后返回并继续在原来的链中执行。要实现这一点,你可以通过包含一个 \元素来使用消息传递网关,如下例所示: ``` ``` 在前面的示例中,`nested-chain-a`是在`main-chain`处理结束时由在那里配置的’gateway’元素调用的。而在`nested-chain-a`中,对`nested-chain-b`的调用是在 header 充实之后进行的。然后流返回以在`nested-chain-b`中完成执行。最后,该流返回`main-chain`。当在链中定义``元素的嵌套版本时,它不需要`service-interface`属性。相反,它以当前状态接收消息,并将其放置在`request-channel`属性中定义的通道上。当由该网关发起的下游流完成时,一个`Message`被返回到网关,并继续其在当前链中的行程。 ### 分散收集 从版本 4.1 开始, Spring 集成提供了[分散收集](https://www.enterpriseintegrationpatterns.com/BroadcastAggregate.html)Enterprise 集成模式的实现。它是一个复合端点,其目标是向收件人发送消息并汇总结果。正如[*Enterprise 整合模式 *](https://www.enterpriseintegrationpatterns.com/)中所指出的,它是“最佳报价”等场景的组件,在这种情况下,我们需要从几个供应商那里请求信息,并决定哪个供应商为我们提供所请求的项目的最佳术语。 以前,模式可以通过使用离散组件来配置。这种增强带来了更方便的配置。 `ScatterGatherHandler`是一个请求-回复端点,它结合了`PublishSubscribeChannel`(或`RecipientListRouter`)和`AggregatingMessageHandler`。请求消息被发送到`scatter`通道,而`ScatterGatherHandler`等待聚合器发送给`outputChannel`的回复。 #### 功能 `Scatter-Gather`模式提出了两种情况:“拍卖”和“分销”。在这两种情况下,`aggregation`函数是相同的,并且提供了`AggregatingMessageHandler`的所有可用选项。(实际上,`ScatterGatherHandler`只需要一个`AggregatingMessageHandler`作为构造函数参数。)有关更多信息,请参见[Aggregator](./aggregator.html#aggregator)。 ##### 拍卖 拍卖`Scatter-Gather`变体对请求消息使用“发布-订阅”逻辑,其中“分散”通道是带有`PublishSubscribeChannel`的`apply-sequence="true"`。然而,这个通道可以是任何`MessageChannel`实现(就像`request-channel`中的`ContentEnricher`—参见[内容更丰富](./content-enrichment.html#content-enricher)中的`request-channel`一样)。但是,在这种情况下,你应该为`aggregation`函数创建自己的自定义`correlationStrategy`。 ##### 分布 该发行版`Scatter-Gather`变体基于`RecipientListRouter`(参见[`RecipientListRouter`](./router.html#router-implementations-reciscuentlistrouter)),具有`RecipientListRouter`的所有可用选项。这是第二个`ScatterGatherHandler`构造函数参数。如果你希望仅依赖`correlationStrategy`的`recipient-list-router`和`aggregator`的默认`correlationStrategy`,则应该指定`apply-sequence="true"`。否则,你应该为`aggregator`提供一个自定义`correlationStrategy`。与`PublishSubscribeChannel`变体(拍卖变体)不同,具有`recipient-list-router``selector`选项可以基于消息过滤目标供应商。使用`apply-sequence="true"`,将提供默认的`sequenceSize`,并且`aggregator`可以正确地释放该组。分配选项与拍卖选项是互斥的。 对于拍卖和分发变量,请求(散点)消息中都添加了`gatherResultChannel`头,以等待来自`aggregator`的回复消息。 默认情况下,所有供应商都应该将其结果发送到`replyChannel`头(通常通过省略最终端点的`output-channel`)。但是,还提供了`gatherChannel`选项,允许供应商将其答复发送到该聚合通道。 #### 配置分散收集端点 下面的示例显示了`Scatter-Gather` Bean 定义的 Java 配置: ``` @Bean public MessageHandler distributor() { RecipientListRouter router = new RecipientListRouter(); router.setApplySequence(true); router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(), distributionChannel3())); return router; } @Bean public MessageHandler gatherer() { return new AggregatingMessageHandler( new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"), new SimpleMessageStore(), new HeaderAttributeCorrelationStrategy( IntegrationMessageHeaderAccessor.CORRELATION_ID), new ExpressionEvaluatingReleaseStrategy("size() == 2")); } @Bean @ServiceActivator(inputChannel = "distributionChannel") public MessageHandler scatterGatherDistribution() { ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer()); handler.setOutputChannel(output()); return handler; } ``` 在前面的示例中,我们将`RecipientListRouter``distributor` Bean 配置为`applySequence="true"`和收件人通道列表。下一个 Bean 是`AggregatingMessageHandler`。最后,我们将这两个 bean 注入`ScatterGatherHandler` Bean 定义中,并将其标记为`@ServiceActivator`,以将分散收集组件连接到积分流中。 下面的示例展示了如何通过使用 XML 名称空间来配置``端点: ``` (11) (12) (13) ``` |**1** |端点的 ID。
`ScatterGatherHandler` Bean 注册的别名为`id + '.handler'`。
`RecipientListRouter` Bean 注册的别名为`id + '.scatterer'``AggregatingMessageHandler` Bean 的别名为`id + '.gatherer'`gt=“gt=”1564“/>”r=“1565./>(可选值<>>>>><<>>>><<<>>>>>>><<<>>>>>>>>>>| |------|| |**2** |另外,`ScatterGatherHandler`还实现了`Lifecycle`,并开始和停止`gatherEndpoint`,这是在提供`gather-channel`的情况下在内部创建的。
可选的。
(默认值是`true`。)| |**3** |接收请求消息的通道在`ScatterGatherHandler`中对它们进行处理。
是必需的。| |**4** |将聚合结果发送到`ScatterGatherHandler`的通道。
可选。
(传入消息可以在`replyChannel`消息头中指定自己的回复通道)。| |**5** |用于拍卖场景的向其发送分散消息的通道。
可选的。
与``子元素互斥。| |**6** |用于接收来自每个供应商的聚合回复的通道。
它在分散消息中用作`replyChannel`头。
可选的。
默认情况下,`FixedSubscriberChannel`被创建。| |**7** |当订阅了多个相同的处理程序`DirectChannel`(用于负载平衡目的)时,此组件的顺序相同。
可选。| |**8** |指定应该启动和停止端点的阶段,
启动顺序从最低到最高,关闭顺序从最高到最低,
默认情况下,该值为`Integer.MAX_VALUE`,这意味着这个容器开始得越晚,停止得越快。
可选的。| |**9** |当向`output-channel`发送回复`Message`时等待的超时间隔。
默认情况下,发送块为一秒。
只有在输出通道有一些“发送”限制的情况下才适用,例如,a`QueueChannel`具有已满的固定“容量”的
在这种情况下,将抛出一个`MessageDeliveryException`。
对于`send-timeout`实现,
被忽略。
对于`group-timeout(-expression)`,来自计划过期任务的`MessageDeliveryException`将导致该任务被重新安排。默认情况下,它将无限期地等待。如果回复超时,将返回
’null。
可选。
它默认为`-1`,这意味着无限期地等待。| |**11**|指定 Scatter-Gather 是否必须返回一个非空值。
该值默认为`true`。
因此,当底层聚合器在`gather-timeout`之后返回一个空值时,将抛出一个
。`null`注意,如果`null`是一种可能性,应该指定`gather-timeout`以避免无限期等待。| |**12**|将``选项.
选项.
与`scatter-channel`属性互斥。| |**13**|需要``选项。
选项。| #### 错误处理 由于 Scatter-Gather 是一个多请求-应答组件,错误处理有一些额外的复杂性。在某些情况下,如果`ReleaseStrategy`允许进程以比请求更少的回复来完成任务,那么最好只捕获并忽略下游异常。在其他情况下,当发生错误时,应该考虑从子流返回“补偿消息”之类的内容。 每个异步子流都应该配置一个`errorChannel`头,以便从`MessagePublishingErrorHandler`发送正确的错误消息。否则,将使用常见的错误处理逻辑将错误发送到全局`errorChannel`。有关异步错误处理的更多信息,请参见[错误处理](./error-handling.html#error-handling)。 同步流可以使用`ExpressionEvaluatingRequestHandlerAdvice`忽略异常或返回补偿消息。当异常从一个子流被抛到`ScatterGatherHandler`时,它只是被重新抛到上游。这样,所有其他子流将不起任何作用,它们的答复将在`ScatterGatherHandler`中被忽略。这有时可能是一种预期的行为,但在大多数情况下,最好是在不影响所有其他子流和收集器中的期望的情况下处理特定子流中的错误。 从版本 5.1.3 开始,`ScatterGatherHandler`提供了`errorChannelName`选项。它是填充到`errorChannel`报头的散点消息,并且是在异步错误发生时使用的,或者可以在常规的同步子流中使用,用于直接发送错误消息。 下面的示例配置通过返回一条补偿消息来演示异步错误处理: ``` @Bean public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) { return f -> f .scatterGather( scatterer -> scatterer .applySequence(true) .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1")) .recipientFlow(f2 -> f2 .channel(c -> c.executor(taskExecutor)) .transform(p -> { throw new RuntimeException("Sub-flow#2"); })), null, s -> s.errorChannel("scatterGatherErrorChannel")); } @ServiceActivator(inputChannel = "scatterGatherErrorChannel") public Message processAsyncScatterError(MessagingException payload) { return MessageBuilder.withPayload(payload.getCause().getCause()) .copyHeaders(payload.getFailedMessage().getHeaders()) .build(); } ``` 要产生正确的回复,我们必须从`MessagingException`的`failedMessage`中复制标题(包括`replyChannel`和`errorChannel`),该标题已由`MessagePublishingErrorHandler`发送到`scatterGatherErrorChannel`。通过这种方式,将目标异常返回给`ScatterGatherHandler`的收集者,以完成对消息组的回复。这样的异常`payload`可以在`MessageGroupProcessor`的采集器的过滤掉或以其他方式向下游处理后,在散集端点之后。 | |在将分散的结果发送给收集者之前,`ScatterGatherHandler`将恢复请求消息头,如果有的话,包括回复和错误通道,
这样,来自`AggregatingMessageHandler`的错误将传播给调用者,即使在分散的接收者子流中应用了异步关闭。
要成功地进行操作,必须将`gatherResultChannel`,`originalReplyChannel`和`originalErrorChannel`标题从分散的接收者子流中传输回答复。,在这种情况下,
是合理的,必须为`gatherTimeout`配置有限的`ScatterGatherHandler`。
否则,默认情况下,将永远阻止它等待收集者的回复。| |---|| ### 螺纹屏障 有时,我们需要挂起消息流线程,直到发生其他异步事件。例如,考虑一个向 RabbitMQ 发布消息的 HTTP 请求。在 RabbitMQ 代理发出消息已被接收的确认消息之前,我们可能不希望回复用户。 在版本 4.2 中, Spring Integration 为此目的引入了``组件。标的`MessageHandler`是`BarrierMessageHandler`。该类还实现`MessageTriggerAction`,其中传递给`trigger()`方法的消息在`handleRequestMessage()`方法中释放相应的线程(如果存在)。 挂起的线程和触发器线程通过调用消息上的`CorrelationStrategy`来进行关联。当消息被发送到`input-channel`时,线程将被挂起长达`requestTimeout`毫秒,等待相应的触发消息。默认的相关策略使用`IntegrationMessageHeaderAccessor.CORRELATION_ID`头。当触发消息以相同的相关性到达时,线程将被释放。在发布后发送到`output-channel`的消息是通过使用`MessageGroupProcessor`构建的。默认情况下,消息是两个有效负载的`Collection`,并且通过使用`DefaultAggregatingMessageGroupProcessor`合并头。 | |如果`trigger()`方法首先被调用(或者在主线程超时之后),它将被挂起长达`triggerTimeout`,等待挂起的消息到达。
如果不想挂起触发器线程,考虑切换到`TaskExecutor`,这样它的线程就被挂起了。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | |在以前的 5.4 版本中,对于请求和触发器消息只有一个`timeout`选项,但是在某些使用情况下,对于这些操作最好有不同的超时时间。因此引入了
和`triggerTimeout`选项。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| `requires-reply`属性确定如果挂起的线程在触发器消息到达之前超时,将采取的操作。默认情况下,它是`false`,这意味着端点返回`null`,流结束,线程返回给调用方。当`true`时,抛出一个`ReplyRequiredException`。 可以通过编程调用`trigger()`方法(通过使用名称`barrier.handler`获得 Bean 引用——其中`barrier`是屏障端点的 Bean 名称)。或者,你可以配置``来触发发布。 | |相同的相关性只能挂起一个线程。
相同的相关性可以被多次使用,但只能同时使用一次。
如果第二个线程到达时具有相同的相关性,则会引发异常。| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 下面的示例展示了如何使用自定义标头进行关联: Java ``` @ServiceActivator(inputChannel="in") @Bean public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) { BarrierMessageHandler barrier = new BarrierMessageHandler(10000); barrier.setOutputChannel(out()); barrier.setDiscardChannel(lateTriggerChannel); return barrier; } @ServiceActivator (inputChannel="release") @Bean public MessageHandler releaser(MessageTriggerAction barrier) { return barrier::trigger(message); } ``` XML ``` ``` 根据哪个消息先到达,向`in`发送消息的线程或向`release`发送消息的线程最多要等待 10 秒,直到另一条消息到达。当消息被释放时,`out`通道被发送一条消息,该消息结合了调用自定义`MessageGroupProcessor` Bean 的结果,名为`myOutputProcessor`。如果主线程超时且触发器较晚到达,则可以配置一个丢弃通道,将较晚的触发器发送到该通道。 有关此组件的示例,请参见[屏障样本应用](https://github.com/spring-projects/spring-integration-samples/tree/main/basic/barrier)。