# 消息传递端点 ## 消息传递端点 ### 消息端点 本章的第一部分介绍了一些背景理论,并揭示了驱动 Spring 集成的各种消息传递组件的底层 API。如果你想真正了解幕后的情况,这些信息可能会很有帮助。但是,如果你想要启动并运行各种元素的基于名称空间的简化配置,现在可以跳过[端点命名空间支持](#endpoint-namespace)。 正如在概述中提到的,消息端点负责将各种消息传递组件连接到通道。在接下来的几章中,我们将介绍使用消息的许多不同组件。其中一些还能够发送回复消息。发送消息非常简单。如前面[消息通道](./channel.html#channel)中所示,可以将消息发送到消息通道。然而,接收要复杂一些。主要原因是有两类消费者:[民意测验消费者](https://www.enterpriseintegrationpatterns.com/PollingConsumer.html)和[事件驱动的消费者](https://www.enterpriseintegrationpatterns.com/EventDrivenConsumer.html)。 在这两种情况中,事件驱动的消费者要简单得多。不需要管理和调度单独的 Poller 线程,它们本质上是具有回调方法的侦听器。当连接到 Spring 集成的可订阅消息通道之一时,这个简单的选项非常有效。然而,当连接到一个缓冲的、可匹配的消息通道时,一些组件必须调度和管理轮询线程。 Spring 集成提供了两种不同的端点实现,以适应这两种类型的消费者。因此,消费者本身只需要实现回调接口。当需要轮询时,端点充当使用者实例的容器。其好处类似于使用容器来托管消息驱动的 bean,但是,由于这些使用者是在`ApplicationContext`中运行的 Spring 管理对象,因此它更类似于 Spring 自己的`MessageListener`容器。 #### 消息处理程序 Spring 集成的`MessageHandler`接口由框架内的许多组件实现。换句话说,这不是公共 API 的一部分,并且你通常不会直接实现`MessageHandler`。然而,消息使用者使用它来实际处理所使用的消息,因此了解此策略接口确实有助于理解使用者的整体角色。接口定义如下: ``` public interface MessageHandler { void handleMessage(Message message); } ``` 尽管它很简单,但是这个接口为下面几章中涉及的大多数组件(路由器、转换器、分发器、聚合器、服务激活器和其他组件)提供了基础。这些组件各自对它们处理的消息执行非常不同的功能,但是实际接收消息的要求是相同的,并且轮询和事件驱动行为之间的选择也是相同的。 Spring 集成提供了两个端点实现,它们承载这些基于回调的处理程序,并让它们连接到消息通道。 #### 事件驱动的消费者 因为它是两个中比较简单的,所以我们首先讨论事件驱动的消费者端点。你可能还记得`SubscribableChannel`接口提供了一个`subscribe()`方法,并且该方法接受一个`MessageHandler`参数(如[`SubscribableChannel`](./channel.html#channel-interfaces-subscribablechannel)所示)。下面的清单显示了`subscribe`方法的定义: ``` subscribableChannel.subscribe(messageHandler); ``` 由于订阅了一个通道的处理程序不必主动轮询该通道,因此这是一个事件驱动的消费者, Spring 集成提供的实现接受一个`SubscribableChannel`和一个`MessageHandler`,如下例所示: ``` SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class); EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler); ``` #### 轮询消费者 Spring 集成还提供了一个`PollingConsumer`,并且可以以相同的方式进行实例化,只是通道必须实现`PollableChannel`,如下例所示: ``` PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class); PollingConsumer consumer = new PollingConsumer(channel, exampleHandler); ``` | |有关轮询消费者的更多信息,请参见[Poller](./polling-consumer.html#polling-consumer)和[通道适配器](./channel-adapter.html#channel-adapter)。| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------| 轮询使用者还有许多其他配置选项。例如,触发器是一个必需的属性。下面的示例展示了如何设置触发器: ``` PollingConsumer consumer = new PollingConsumer(channel, handler); consumer.setTrigger(new PeriodicTrigger(30, TimeUnit.SECONDS)); ``` `PeriodicTrigger`通常定义为一个简单的间隔(以毫秒为单位),但也支持`initialDelay`属性和布尔`fixedRate`属性(默认为`false`——即没有固定的延迟)。以下示例设置了这两个属性: ``` PeriodicTrigger trigger = new PeriodicTrigger(1000); trigger.setInitialDelay(5000); trigger.setFixedRate(true); ``` 在前面的示例中,三个设置的结果是一个触发器,该触发器等待五秒钟,然后每秒触发一次。 `CronTrigger`需要一个有效的 CRON 表达式。有关详细信息,请参见[爪哇doc](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/scheduling/support/CronTrigger.html)。下面的示例设置了一个新的`CronTrigger`: ``` CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI"); ``` 在前面的示例中定义的触发器的结果是,从星期一到星期五,每十秒钟触发一次触发器。 除了触发器,你还可以指定另外两个与轮询相关的配置属性:`maxMessagesPerPoll`和`receiveTimeout`。下面的示例展示了如何设置这两个属性: ``` PollingConsumer consumer = new PollingConsumer(channel, handler); consumer.setMaxMessagesPerPoll(10); consumer.setReceiveTimeout(5000); ``` `maxMessagesPerPoll`属性指定在给定的轮询操作中要接收的消息的最大数量。这意味着,Poller 将继续调用`receive()`而不等待,直到返回`null`或达到最大值为止。例如,如果一个 Poller 有一个 10 秒的间隔触发器,并且`maxMessagesPerPoll`设置为`25`,并且它正在轮询一个在其队列中有 100 条消息的通道,那么所有 100 条消息都可以在 40 秒内检索到。它抓取 25 个,等待 10 秒,抓取下一个 25,以此类推。如果`maxMessagesPerPoll`被配置为负值,则在一个轮询周期内调用`MessageSource.receive()`,直到返回`null`。从版本 5.5 开始,`0`值具有特殊的含义-完全跳过`MessageSource.receive()`调用,这可能被视为暂停此轮询端点,直到`maxMessagesPerPoll`在以后的时间更改为 n 个非零值,例如通过控制总线。 `receiveTimeout`属性指定当调用器调用 receive 操作时,如果没有可用消息,那么调用器应该等待的时间。例如,考虑两个表面上看起来相似但实际上完全不同的选项:第一个选项的间隔触发时间为 5 秒,接收超时时间为 50 毫秒,而第二个选项的间隔触发时间为 50 毫秒,接收超时时间为 5 秒。第一个接收消息的时间可能比它到达频道的时间晚 4950 毫秒(如果该消息是在其一个轮询调用返回后立即到达的)。另一方面,第二种配置对消息的漏失永远不会超过 50 毫秒。不同之处在于,第二个选项需要等待线程。然而,结果是,它可以更快地响应到达的消息。这种技术被称为“长轮询”,可以用来模拟被轮询数据源上的事件驱动行为。 轮询消费者也可以委托给 Spring `TaskExecutor`,如下例所示: ``` PollingConsumer consumer = new PollingConsumer(channel, handler); TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class); consumer.setTaskExecutor(taskExecutor); ``` 此外,`PollingConsumer`具有一个名为`adviceChain`的属性。此属性允许你指定一个`List`的 AOP 建议,用于处理包括事务在内的其他交叉问题。这些建议是围绕`doPoll()`方法应用的。有关更多的深入信息,请参见[端点命名空间支持](#endpoint-namespace)下的 AOP 建议链和事务支持部分。 前面的示例显示了依赖项查找。然而,请记住,这些消费者通常被配置为 Spring Bean 定义。实际上, Spring 集成还提供了一个名为`FactoryBean`的`ConsumerEndpointFactoryBean`的`ConsumerEndpointFactoryBean`,它基于通道的类型创建适当的消费者类型。此外, Spring 集成具有完整的 XML 命名空间支持,以进一步隐藏这些细节。基于名称空间的配置在本指南中介绍了每个组件类型。 | |许多`MessageHandler`实现都可以生成回复消息。
如前所述,与接收消息相比,发送消息是微不足道的。
然而,何时以及发送多少回复消息取决于处理程序类型。例如,
,聚合器等待大量消息到达,并且通常被配置为拆分器的下游使用者,拆分器可以为其处理的每条消息生成多个答复。
当使用名称空间配置时,你不需要严格地了解所有细节。,但是,
,仍然值得知道的是,这些组件中的几个共享一个公共基类,即`AbstractReplyProducingMessageHandler`,并且它提供了一个`setOutputChannel(..)`方法。| |---|| #### 端点命名空间支持 在本参考手册中,你可以找到端点元素的特定配置示例,例如路由器、转换器、服务激活器等等。其中大多数支持`input-channel`属性,许多支持`output-channel`属性。在被解析之后,这些端点元素产生`PollingConsumer`或`EventDrivenConsumer`的实例,这取决于所引用的`input-channel`的类型:`PollableChannel`或`SubscribableChannel`,分别。当通道是可匹配的时,轮询行为基于端点元素的`poller`子元素及其属性。 下面的清单列出了`poller`的所有可用配置选项: ``` (12) (13) (14) ``` |**1** |提供通过使用 CRON 表达式配置 Pollers 的能力。
底层实现使用`org.springframework.scheduling.support.CronTrigger`。
如果设置了此属性,则必须指定以下所有属性:`fixed-delay`、`trigger`、`fixed-rate`和`ref`。| |------|| |**2** |通过将此属性设置为`true`,你可以精确地定义一个全局默认 poller。
引发了异常如果在应用程序上下文中定义了一个以上的默认 Poller。
连接到`PollableChannel`(`PollingConsumer`)或任何没有显式配置 Poller 的`SourcePollingChannelAdapter`的端点,则使用全局默认 Poller。
它默认为`false`。
可选的。| |**3** |标识如果此调用器的调用发生故障时发送错误消息的通道。
要完全抑制异常,可以提供对`nullChannel`的引用。
可选的。| |**4** |固定延迟触发器在覆盖项下使用`PeriodicTrigger`。
如果不使用`time-unit`属性,则指定的值以毫秒为单位表示。
如果设置了此属性,则必须指定以下所有属性:`fixed-rate`、`trigger`、`cron`和`ref`。| |**5** |固定费率触发器在覆盖项下使用`PeriodicTrigger`。
如果不使用`time-unit`属性,则指定的值以毫秒为单位表示。
如果设置了该属性,则不需要指定以下属性:`fixed-delay`,`trigger`,`cron`,以及`ref`。| |**6** |引用 poller 的底层 Bean-定义的 ID 类型为`org.springframework.integration.scheduling.PollerMetadata`。
对于顶级 poller 元素,`id`属性是必需的,除非它是默认的 poller(`default="true"`)。| |**7** |有关更多信息,请参见[配置入站通道适配器](./channel-adapter.html#channel-adapter-namespace-inbound)。
如果未指定,默认值取决于上下文。
如果使用`PollingConsumer`,则此属性默认为`-1`。
但是,如果使用`max-messages-per-poll`,则`max-messages-per-poll`属性默认为`1`。如果未指定,它的默认值为 1000(毫秒)。
可选。| |**9** |Bean 引用另一个顶级 poller。
`ref`属性不能出现在顶级`poller`元素上。
但是,如果设置了此属性,则不能指定以下属性:`fixed-rate`,`cron`,以及`fixed-delay`。| |**10**|提供引用自定义任务执行器的能力。
有关更多信息,请参见[TaskExecutor 支持](#taskexecutor-support)。
可选。| |**11**|此属性指定底层`java.util.concurrent.TimeUnit`上的`java.util.concurrent.TimeUnit`enum 值。
因此,此属性只能与`fixed-delay`或`fixed-rate`属性结合使用。
如果与`cron`或`trigger`引用属性结合使用,它会导致失败。
`PeriodicTrigger`所支持的最小粒度是毫秒。
因此,唯一可用的选项是毫秒和秒。
如果不提供该值,则任何`fixed-delay`或`fixed-rate`值都被解释为毫秒。
基本上,这个枚举为基于秒的间隔触发值提供了便利。
对于每小时、每天和每月的设置,我们建议使用`cron`触发器。| |**12**|引用实现`org.springframework.scheduling.Trigger`接口的任何 Spring-配置的 Bean。
但是,如果设置了此属性,则必须指定以下任何属性:`fixed-delay`,`fixed-rate`,`cron`和`ref`可选的。| |**13**|允许指定额外的 AOP 建议来处理额外的横切问题。
有关更多信息,请参见[事务支持](#transaction-support)。
可选。| |**14**|可以使 poller 成为事务性的。
有关更多信息,请参见[AOP Advice chains](#aop-advice-chains)。
可选。| ##### 示例 具有 1 秒间隔的简单的基于间隔的 Poller 可以配置如下: ``` ``` 作为使用`fixed-rate`属性的替代方法,你还可以使用`fixed-delay`属性。 对于基于 CRON 表达式的 poller,请使用`cron`属性,如下例所示: ``` ``` 如果输入通道是`PollableChannel`,则需要进行 poller 配置。具体地说,正如前面提到的,`trigger`是`PollingConsumer`类的一个必需属性。因此,如果省略用于轮询消费者端点配置的`poller`子元素,可能会引发异常。如果你试图在连接到不可搜索通道的元素上配置一个 Poller,也可能引发异常。 也可以创建顶级 Pollers,在这种情况下,只需要一个`ref`属性,如下例所示: ``` ``` | |`ref`属性仅在内部 poller 定义上允许。
在顶级 poller 上定义此属性会导致在初始化应用程序上下文期间引发配置异常。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ###### 全局默认 Poller 为了进一步简化配置,你可以定义一个全局默认 Poller。XML DSL 中的单个顶级 Poller 组件可能将`default`属性设置为`true`。对于 爪哇 配置,在这种情况下必须声明带有`PollerMetadata.DEFAULT_POLLER`名称的`PollerMetadata` Bean。在这种情况下,任何具有`PollableChannel`作为其输入通道的端点,都是在相同的`ApplicationContext`中定义的,并且没有显式配置`poller`的端点使用该默认值。下面的示例展示了这样的 Poller 和使用它的 Transformer: Java DSL ``` @Bean(name = PollerMetadata.DEFAULT_POLLER) public PollerMetadata defaultPoller() { PollerMetadata pollerMetadata = new PollerMetadata(); pollerMetadata.setMaxMessagesPerPoll(5); pollerMetadata.setTrigger(new PeriodicTrigger(3000)); return pollerMetadata; } // No 'poller' attribute because there is a default global poller @Bean public IntegrationFlow transformFlow(MyTransformer transformer) { return IntegrationFlows.from(MessageChannels.queue("pollable")) .transform(transformer) // No 'poller' attribute because there is a default global poller .channel("output") .get(); } ``` Java ``` @Bean(PollerMetadata.DEFAULT_POLLER) public PollerMetadata defaultPoller() { PollerMetadata pollerMetadata = new PollerMetadata(); pollerMetadata.setMaxMessagesPerPoll(5); pollerMetadata.setTrigger(new PeriodicTrigger(3000)); return pollerMetadata; } @Bean public QueueChannel pollable() { return new QueueChannel(); } // No 'poller' attribute because there is a default global poller @Transformer(inputChannel = "pollable", outputChannel = "output") public Object transform(Object payload) { ... } ``` Kotlin DSL ``` @Bean(PollerMetadata.DEFAULT_POLLER) fun defaultPoller() = PollerMetadata() .also { it.maxMessagesPerPoll = 5 it.trigger = PeriodicTrigger(3000) } @Bean fun convertFlow() = integrationFlow(MessageChannels.queue("pollable")) { transform(transformer) // No 'poller' attribute because there is a default global poller channel("output") } ``` XML ``` ``` ###### 事务支持 Spring 集成还为 Pollers 提供事务支持,以便每个接收和转发操作可以作为工作的原子单元来执行。要为 Poller 配置事务,请添加``子元素。下面的示例显示了可用的属性: ``` ``` 有关更多信息,请参见[Poller 事务支持](./transactions.html#transaction-poller)。 ##### AOP 建议链 由于 Spring 事务支持依赖于用`TransactionInterceptor`( AOP 通知)处理由 Poller 发起的消息流的事务行为的代理机制,因此有时必须提供额外的建议来处理与 Poller 相关的其他交叉行为。为此,`poller`定义了一个`advice-chain`元素,该元素允许你在实现`MethodInterceptor`接口的类中添加更多的建议。下面的示例展示了如何为`poller`定义`advice-chain`: ``` ``` 有关如何实现`MethodInterceptor`接口的更多信息,请参见[AOP sections of the Spring Framework Reference Guide](https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#aop-api)。也可以在没有任何事务配置的 Poller 上应用一个建议链,从而增强由 Poller 发起的消息流的行为。 | |当使用建议链时,不能指定``子元素。
相反,声明一个`` Bean 并将其添加到``中。
有关完整的配置详细信息,请参见[Poller 事务支持](./transactions.html#transaction-poller)。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ###### TaskExecutor 支持 轮询线程可以由 Spring 的`TaskExecutor`抽象的任何实例执行。这使端点或一组端点能够并发。截至 Spring 3.0,核心 Spring 框架有一个`task`名称空间,其``元素支持创建一个简单的线程池执行器。该元素接受用于公共并发设置的属性,例如池大小和队列容量。配置线程池执行器可以使端点在负载下的执行方式发生很大的变化。这些设置可用于每个端点,因为端点的性能是需要考虑的主要因素之一(另一个主要因素是端点订阅的通道上的预期卷)。要为配置了 XML 名称空间支持的轮询端点启用并发,请在其``元素上提供`task-executor`引用,然后提供一个或多个属性,如以下示例所示: ``` ``` 如果你不提供任务执行器,则会在调用者的线程中调用使用者的处理程序。请注意,调用者通常是默认的`TaskScheduler`(参见[配置任务调度程序](./configuration.html#namespace-taskscheduler))。还应该记住,通过指定 Bean 名称,`task-executor`属性可以提供对 Spring 的`TaskExecutor`接口的任何实现的引用。前面显示的`executor`元素是为了方便而提供的。 正如前面在[轮询消费者的后台部分](#endpoint-pollingconsumer)中提到的,你还可以以这样一种方式配置轮询使用者,以便模拟事件驱动的行为。通过长时间的接收超时和较短的触发间隔,你可以确保对到达的消息做出非常及时的反应,即使是在经过调查的消息源上。请注意,这仅适用于具有超时阻塞等待调用的源。例如,文件 poller 不会阻塞。每个`receive()`调用都会立即返回,并且要么包含新文件,要么不包含新文件。因此,即使 poller 包含一个长`receive-timeout`,在这种情况下也不会使用该值。另一方面,当使用 Spring 集成自己的基于队列的通道时,超时值确实有机会参与。下面的示例展示了轮询消费者如何几乎在瞬间接收消息: ``` ``` 使用这种方法不会带来太多的开销,因为在内部,它只不过是一个定时等待线程,它所需的 CPU 资源使用量几乎不像(例如)颠簸的无限 while 循环那样多。 #### 在运行时更改轮询速率 当配置带有`fixed-delay`或`fixed-rate`属性的 Poller 时,默认实现使用`PeriodicTrigger`实例。`PeriodicTrigger`是核心 Spring 框架的一部分。它只接受作为构造函数参数的时间间隔。因此,它不能在运行时进行更改。 但是,你可以定义自己的`org.springframework.scheduling.Trigger`接口的实现。你甚至可以使用`PeriodicTrigger`作为起点。然后,你可以为间隔(周期)添加一个 setter,或者你甚至可以在触发器本身中嵌入你自己的节流逻辑。对`nextExecutionTime`的每次调用都使用`period`属性来安排下一次投票。要在 Pollers 中使用此自定义触发器,请在应用程序上下文中声明 Bean 自定义触发器的定义,并使用`trigger`属性将依赖项注入到 Poller 配置中,该属性引用自定义触发器 Bean 实例。现在可以获得对触发器的引用 Bean 并更改轮询之间的轮询间隔。 有关示例,请参见[Spring Integration Samples](https://github.com/SpringSource/spring-integration-samples/tree/main/intermediate)项目。它包含一个名为`dynamic-poller`的示例,该示例使用自定义触发器,并演示了在运行时更改轮询间隔的能力。 该示例提供了一个自定义触发器,该触发器实现了[`org.springframework.scheduling.Trigger`](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/org/springframework/schooling/trigger.html)接口。该示例的触发器基于 Spring 的[`PeriodicTrigger`](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/org/springframework/schooling/support/perioditragger.html)实现。但是,自定义触发器的字段不是最终的,属性具有显式的 getter 和 setter,允许你在运行时动态地更改轮询周期。 | |不过,需要注意的是,由于触发器方法是`nextExecutionTime()`,因此基于现有配置,对动态触发器的任何更改直到下一次轮询才生效。
在当前配置的下一次执行时间之前,不可能强制触发。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| #### 有效载荷类型转换 在整个参考手册中,你还可以看到各种端点的特定配置和实现示例,这些端点接受消息或任何任意的`Object`作为输入参数。在`Object`的情况下,这样的参数被映射到消息有效负载或有效负载或报头的一部分(当使用 Spring 表达式语言时)。然而,端点方法的输入参数的类型有时与有效负载或其部分的类型不匹配。在这个场景中,我们需要执行类型转换。 Spring 集成提供了一种方便的方式来注册类型转换器(通过使用 Spring )在其自身实例内的名为的转换服务 Bean。 Bean 是在通过使用 Spring 集成基础设施定义第一转换器时自动创建的。要注册转换器,可以实现`org.springframework.core.convert.converter.Converter`、`org.springframework.core.convert.converter.GenericConverter`或`org.springframework.core.convert.converter.ConverterFactory`。 `Converter`实现是最简单的,可以从一种类型转换为另一种类型。对于更复杂的操作,例如转换为类层次结构,你可以实现`GenericConverter`,也可以实现`ConditionalConverter`。这使你能够完全访问`from`和`to`类型描述符,从而实现复杂的转换。例如,如果你有一个名为`Something`的抽象类,它是转换的目标(参数类型、通道数据类型等等),那么你有两个具体的实现,分别称为`Thing1`和`Thing`,并且你希望根据输入类型转换为一个或另一个,`GenericConverter`将是一个很好的匹配。有关更多信息,请参见这些接口的 Javadoc: * [org.springframework.core.convert.converter.converter.converter.converter](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/core/convert/converter/Converter.html) * [org.springframework.core.convert.converter.genericconverter.genericconverter](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/core/convert/converter/package-summary.html) * [org.springframework.core.convert.converter.converterfactory.converterfactory](https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/core/convert/converter/ConverterFactory.html) 当你实现了转换器之后,你可以使用方便的名称空间支持对其进行注册,如下例所示: ``` ``` 或者,你可以使用内部 Bean,如下例所示: ``` ``` 从 Spring Integration4.0 开始,你可以使用注释来创建前面的配置,如下例所示: ``` @Component @IntegrationConverter public class TestConverter implements Converter { public Number convert(Boolean source) { return source ? 1 : 0; } } ``` 或者,你可以使用`@Configuration`注释,如下例所示: ``` @Configuration @EnableIntegration public class ContextConfiguration { @Bean @IntegrationConverter public SerializingConverter serializingConverter() { return new SerializingConverter(); } } ``` | |在配置应用程序上下文时, Spring 框架允许你添加`conversionService` Bean(参见[配置转换服务](https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#core-convert-Spring-config)章)。
当需要时,该服务用于在 Bean 创建和配置期间执行适当的转换,相比之下,

,`integrationConversionService`用于运行时转换。
这些用途是完全不同的。
转换器是用于连接 Bean 构造函数参数和属性时使用的转换器,如果在运行时用于 Spring 针对数据类型通道、有效负载类型转换器中的消息的集成表达式计算,则可能会产生意想不到的结果,以此类推。

但是,如果你确实希望使用 Spring `conversionService`作为 Spring 集成`integrationConversionService`,则可以在应用程序上下文中配置一个别名,如以下示例所示:


在这种情况下,
由`conversionService`提供的转换器可用于 Spring 集成运行时转换。| |---|| #### 内容类型转换 从版本 5.0 开始,默认情况下,方法调用机制基于`org.springframework.messaging.handler.invocation.InvocableHandlerMethod`基础架构。它的`HandlerMethodArgumentResolver`实现(例如`PayloadArgumentResolver`和`MessageMethodArgumentResolver`)可以使用`MessageConverter`抽象来将传入的`payload`转换为目标方法参数类型。该转换可以基于`contentType`消息头。为此, Spring Integration 提供了`ConfigurableCompositeMessageConverter`,它将委托给要调用的已注册转换器的列表,直到其中一个转换器返回非空结果。默认情况下,此转换器提供(以严格的顺序): 1. [`MappingJackson2MessageConverter`](https://DOCS. Spring.io/ Spring-framework/DOCS/current/javadoc-api/org/springframework/jms/support/converter/mappingJackson2messageconverter.html)如果 Jackson 处理器存在于 Classpath 上 2. [`ByteArrayMessageConverter`](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/org/SpringFramework/Messaging/Converter/BytearrayMessageConverter.html) 3. [`ObjectStringMessageConverter`](https://DOCS. Spring.io/ Spring-integration/DOCS/current/api//org/springframework/integration/support/converter/objectstringmessageConverter.html) 4. [`GenericMessageConverter`](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/org/SpringFramework/Messaging/Converter/GenericMessageConverter.html) 有关其目的和用于转换的适当的`contentType`值的更多信息,请参见 Javadoc(在前面的列表中链接)。使用`ConfigurableCompositeMessageConverter`是因为它可以与任何其他`MessageConverter`实现一起提供,包括或排除前面提到的默认转换器。 Bean 还可以在应用程序上下文中将其注册为适当的,覆盖默认的转换器,如以下示例所示: ``` @Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME) public ConfigurableCompositeMessageConverter compositeMessageConverter() { List converters = Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()), new JavaSerializationMessageConverter()); return new ConfigurableCompositeMessageConverter(converters); } ``` 这两个新的转换器是注册在复合之前的默认值。你也可以不使用`ConfigurableCompositeMessageConverter`,而是通过使用名称注册 Bean `integrationArgumentResolverMessageConverter`(通过设置`IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME`属性)来提供你自己的`MessageConverter`。 | |当使用 SPEL 方法调用时,基于`MessageConverter`(包括`contentType`header)的转换是不可用的。
在这种情况下,只有在[有效载荷类型转换](#payload-type-conversion)中提到的常规类到类的转换是可用的。| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| #### 异步轮询 如果希望轮询是异步的,则 Poller 可以选择指定一个`task-executor`属性,该属性指向任何`TaskExecutor` Bean 的现有实例( Spring 3.0 通过`task`命名空间提供了一个方便的命名空间配置)。但是,在使用`TaskExecutor`配置 Poller 时,你必须了解某些事情。 问题在于存在两种配置,poller 和`TaskExecutor`。他们必须互相配合。否则,你可能最终会创建一个人工内存泄漏。 考虑以下配置: ``` ``` 前面的配置演示了一个过时的配置。 默认情况下,任务执行器有一个无界的任务队列。即使所有线程都被阻塞,Poller 仍会继续调度新任务,等待新消息到达或超时过期。考虑到有 20 个线程以 5 秒的超时时间执行任务,它们的执行速度为每秒 4 个线程。然而,新任务的调度速度为每秒 20 次,因此任务执行器中的内部队列以每秒 16 次的速度增长(而进程是空闲的),因此我们存在内存泄漏。 处理此问题的方法之一是设置任务执行器的`queue-capacity`属性。即使是 0 也是一个合理的值。你还可以通过设置任务执行器的`rejection-policy`属性(例如,设置为`DISCARD`)来指定如何处理不能排队的消息来管理它。换句话说,在配置`TaskExecutor`时,你必须了解某些细节。有关此主题的更多详细信息,请参见 Spring 参考手册中的[“任务执行和调度”](https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#scheduling)。 #### 端点内部 bean 许多端点是复合 bean。这包括所有的消费者和所有的入站通道适配器。消费者(民意测验或事件驱动)委托给`MessageHandler`。经过轮询的适配器通过委托给`MessageSource`来获取消息。 Bean 通常情况下,获得对委托的引用是有用的,可能用于在运行时更改配置或进行测试。这些 bean 可以从具有众所周知的名称的`ApplicationContext`中获得。`MessageHandler`实例使用与`someConsumer.handler`相似的 Bean ID 在应用程序上下文中注册,(其中’Consumer’是端点的`id`属性的值),`MessageSource`实例使用与`somePolledAdapter.source`相似的 Bean ID 注册,其中“somepolledapter”是适配器的 ID。 前面的只适用于框架组件本身。你可以使用内部 Bean 定义,如下例所示: ``` ``` Bean 被视为与任何内部 Bean 声明的一样,并且不与应用上下文注册。如果你希望以某种其他方式访问此 Bean,请在顶层用`id`声明它,并使用`ref`属性。有关更多信息,请参见[Spring Documentation](https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#beans-inner-beans)。 ### 端点角色 从版本 4.2 开始,可以将端点分配给角色。角色让端点作为一个组开始和停止。这在使用领导选举时特别有用,在这种情况下,可以分别在授予或撤销领导时启动或停止一组端点。为此,框架在应用程序上下文中以`IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER`的名称注册了`SmartLifecycleRoleController` Bean。每当需要控制生命周期时,这 Bean 可以被注入或`@Autowired`: ``` ``` 你可以使用 XML、Java 配置或编程方式将端点分配给角色。下面的示例展示了如何使用 XML 配置端点角色: ``` ``` 下面的示例展示了如何为用 Java 创建的 Bean 配置端点角色: ``` @Bean @ServiceActivator(inputChannel = "sendAsyncChannel", autoStartup="false") @Role("cluster") public MessageHandler sendAsyncHandler() { return // some MessageHandler } ``` 下面的示例展示了如何在 Java 中的方法上配置端点角色: ``` @Payload("#args[0].toLowerCase()") @Role("cluster") public String handle(String payload) { return payload.toUpperCase(); } ``` 下面的示例展示了如何使用 Java 中的`SmartLifecycleRoleController`配置端点角色: ``` @Autowired private SmartLifecycleRoleController roleController; ... this.roleController.addSmartLifeCycleToRole("cluster", someEndpoint); ... ``` 下面的示例展示了如何在 Java 中使用`IntegrationFlow`配置端点角色: ``` IntegrationFlow flow -> flow .handle(..., e -> e.role("cluster")); ``` 每一个都将端点添加到`cluster`角色。 调用`roleController.startLifecyclesInRole("cluster")`和相应的`stop…​`方法来启动和停止端点。 | |任何实现`SmartLifecycle`的对象都可以通过编程方式添加——而不仅仅是端点。| |---|-----------------------------------------------------------------------------------------------| `SmartLifecycleRoleController`实现了`ApplicationListener`,并且在授予或撤销领导权限时(当某些 Bean 发布`OnGrantedEvent`或`OnRevokedEvent`时),它会自动启动和停止其配置的`SmartLifecycle`对象。 | |当使用 leadership election 来启动和停止组件时,将`auto-startup`XML 属性(`autoStartup` Bean 属性)设置为`false`非常重要,这样应用程序上下文在上下文初始化期间就不会启动组件。| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 从版本 4.3.8 开始,`SmartLifecycleRoleController`提供了几种状态方法: ``` public Collection getRoles() (1) public boolean allEndpointsRunning(String role) (2) public boolean noEndpointsRunning(String role) (3) public Map getEndpointsRunningStatus(String role) (4) ``` |**1**|返回被管理角色的列表。| |-----|----------------------------------------------------------------------------------------------------| |**2**|如果角色中的所有端点都在运行,则返回`true`。| |**3**|如果角色中的端点都没有运行,则返回`true`。| |**4**|返回`component name : running status`的映射。
组件名称通常是 Bean 名称。| ### 领导力事件处理 端点组可以根据分别授予或撤销的领导才能来启动和停止。这在群集场景中很有用,在群集场景中,共享资源必须仅由单个实例使用。这方面的一个例子是一个文件入站通道适配器,它正在轮询一个共享目录。(见[读取文件](./file.html#file-reading))。 为了参与领导者选举并在当选领导者、领导者被撤销或未能获得成为领导者的资源时获得通知,应用程序在应用程序上下文中创建了一个称为“领导者发起者”的组件。通常,leader 发起者是`SmartLifecycle`,因此它在上下文启动时启动(可选地),然后在 leadership 更改时发布通知。你还可以通过将`publishFailedEvents`设置为`true`(从版本 5.0 开始)来接收失败通知,以便在发生失败时采取特定操作。按照惯例,你应该提供一个`Candidate`来接收回调。你还可以通过框架提供的`Context`对象撤销领导地位。你的代码还可以侦听`o.s.i.leader.event.AbstractLeaderEvent`实例(`OnGrantedEvent`和`OnRevokedEvent`的超类)并做出相应的响应(例如,通过使用`SmartLifecycleRoleController`)。这些事件包含对`Context`对象的引用。下面的清单显示了`Context`接口的定义: ``` public interface Context { boolean isLeader(); void yield(); String getRole(); } ``` 从版本 5.0.6 开始,上下文提供了对候选人角色的引用。 Spring 集成提供了一种基于`LockRegistry`抽象的 leader 启动器的基本实现。要使用它,你需要创建一个实例作为 Bean,如下例所示: ``` @Bean public LockRegistryLeaderInitiator leaderInitiator(LockRegistry locks) { return new LockRegistryLeaderInitiator(locks); } ``` 如果正确地实现了锁注册表,则最多只有一个领导者。如果 Lock Registry 还提供了锁,当异常过期或中断时抛出异常(理想情况下,`InterruptedException`),则无引导期的持续时间可以短于锁实现中固有的延迟所允许的时间。默认情况下,`busyWaitMillis`属性会增加一些额外的延迟,以防止在(更常见的)锁不完善且只有在再次尝试获得锁时才知道锁已过期的情况下出现 CPU 短缺。 有关领导选举和使用 ZooKeeper 的活动的更多信息,请参见[动物园管理员领导事件处理](./zookeeper.html#zk-leadership)。 ### 消息传递网关 网关隐藏了 Spring 集成提供的消息传递 API。它让应用程序的业务逻辑不了解 Spring Integration API。通过使用通用网关,你的代码只与一个简单的接口交互。 #### 输入`GatewayProxyFactoryBean` 如前所述,不依赖 Spring 集成 API(包括 Gateway 类)将是很好的。出于这个原因, Spring 集成提供了`GatewayProxyFactoryBean`,它为任何接口生成代理,并在内部调用下面所示的网关方法。通过使用依赖注入,你可以将接口公开给你的业务方法。 下面的示例显示了可用于与 Spring 集成交互的接口: ``` package org.cafeteria; public interface Cafe { void placeOrder(Order order); } ``` #### 网关 XML 命名空间支持 还提供了名称空间支持。它允许你将接口配置为服务,如下例所示: ``` ``` 在定义了这种配置之后,`cafeService`现在可以被注入到其他 bean 中,并且调用`Cafe`接口的代理实例上的方法的代码不知道 Spring 集成 API。一般的方法类似于 Spring 远程处理(RMI、Httpinvoker 等)。参见[“Samples”](./samples.html#samples)附录中使用`gateway`元素的示例(在 Cafe 演示中)。 前面配置中的默认值应用于网关接口上的所有方法。如果未指定应答超时,则调用线程将无限期地等待应答。见[未收到响应时的网关行为](#gateway-no-response)。 对于单个方法,可以重写默认值。见[带有注释和 XML 的网关配置](#gateway-configuration-annotations)。 #### 设置默认回复通道 通常,你不需要指定`default-reply-channel`,因为网关会自动创建一个临时的匿名回复通道,在该通道中监听回复。然而,有些情况下可能会提示你定义`default-reply-channel`(或者`reply-channel`具有适配器网关,例如 HTTP、JMS 和其他)。 对于一些背景,我们简要地讨论了网关的一些内部工作。网关创建一个临时的点对点应答通道。它是匿名的,并以`replyChannel`的名称添加到消息头中。当提供显式`default-reply-channel`(带有远程适配器网关的`reply-channel`)时,你可以指向一个发布-订阅通道,之所以这样命名,是因为可以向它添加多个订阅服务器。 Spring 在内部,集成在临时`replyChannel`和显式定义的`default-reply-channel`之间创建了一个桥。 假设你希望你的回复不仅发送到网关,还发送到其他一些消费者。在这种情况下,你想要两件事: * 你可以订阅的指定频道 * 该频道为发布-订阅-频道 网关使用的默认策略不能满足这些需求,因为添加到报头的应答通道是匿名的和点对点的。这意味着没有其他订阅者可以获得它的句柄,并且,即使可以,该通道也具有点对点行为,使得只有一个订阅者将获得该消息。通过定义`default-reply-channel`,你可以指向你选择的通道。在这种情况下,这是一个`publish-subscribe-channel`。网关创建了一个桥,从它到临时的匿名回复通道,该通道存储在头文件中。 你可能还希望显式地提供一个响应通道,用于通过拦截器进行监视或审核(例如,[wiretap](./channel.html#channel-wiretap))。要配置通道拦截器,你需要一个命名通道。 | |从版本 5.4 开始,当网关方法返回类型是`void`时,如果没有明确提供这样的头,则框架将`replyChannel`头填充为`nullChannel` Bean 引用。
这允许丢弃来自下游流的任何可能的答复,从而满足单向网关契约。| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| #### 带注释和 XML 的网关配置 考虑下面的示例,该示例通过添加`@Gateway`注释,扩展了前面的`Cafe`接口示例: ``` public interface Cafe { @Gateway(requestChannel="orders") void placeOrder(Order order); } ``` `@Header`注释允许你添加被解释为消息头的值,如下例所示: ``` public interface FileWriter { @Gateway(requestChannel="filesOut") void write(byte[] content, @Header(FileHeaders.FILENAME) String filename); } ``` 如果你更喜欢使用 XML 方法来配置网关方法,那么可以在网关配置中添加`method`元素,如下例所示: ``` ``` 你还可以使用 XML 为每个方法调用提供单独的头。如果你想要设置的标头本质上是静态的,并且你不想通过使用`@Header`注释将它们嵌入到网关的方法签名中,那么这可能是有用的。例如,在 Loan Broker 示例中,我们希望根据发起的请求类型(单个报价或所有报价)来影响如何进行贷款报价的聚合。通过评估调用了哪个网关方法来确定请求的类型,尽管这是可能的,但这将违反 Concerns 分离范式(该方法是一个 Java 工件)。然而,在消息传递体系结构中,在消息头中表达你的意图(元信息)是很自然的。下面的示例展示了如何为两个方法中的每一个添加不同的消息头: ``` ``` 在前面的示例中,根据网关的方法,为“response\_type”头设置了不同的值。 | |例如,如果在``中指定`requestChannel`以及在`@Gateway`注释中指定`requestChannel`,则注释值获胜。| |---|--------------------------------------------------------------------------------------------------------------------------------------| | |如果在 XML 中指定了无参数网关,并且接口方法同时具有`@Payload`和`@Gateway`注释(在`payloadExpression`元素中带有`payload-expression`或`payload-expression`元素),则`@Payload`值将被忽略。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ##### 表达式和“全局”头 `
`元素支持`expression`作为`value`的替代项。计算 SPEL 表达式以确定标头的值。从版本 5.2 开始,计算上下文的`#root`对象是带有`getMethod()`和`getArgs()`访问器的`MethodArgsHolder`对象。 从版本 5.2 开始,这两个表达式求值上下文变量已被弃用: * \#args:包含方法参数的`Object[]` * \#GatewayMethod:表示调用的`service-interface`中的方法的对象(派生自`java.reflect.Method`)。包含此变量的头可以在流的后面使用(例如,用于路由)。例如,如果你希望对简单的方法名进行路由,则可以添加一个带有以下表达式的头:`#gatewayMethod.name`。 | |`java.reflect.Method`不可序列化。如果你以后序列化消息,则带有`method`表达式的头将丢失。
因此,在这些情况下,你可能希望使用`method.name`或`method.toString()`。`service-interface`方法提供了该方法的`String`表示,包括参数和返回类型。| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 从版本 3.0 开始,``元素可以被定义为向网关产生的所有消息添加头,而不管调用的方法是什么。为方法定义的特定标头优先于缺省标头。这里为方法定义的特定标头覆盖了服务接口中的任何`@Header`注释。但是,缺省标头不会覆盖服务接口中的任何`@Header`注释。 网关现在还支持`default-payload-expression`,它适用于所有方法(除非重写)。 #### 将方法参数映射到消息 使用上一节中的配置技术,可以控制如何将方法参数映射到消息元素(有效负载和标题)。当不使用显式配置时,将使用某些约定来执行映射。在某些情况下,这些约定不能确定哪个参数是有效负载,哪个参数应该映射到头。考虑以下示例: ``` public String send1(Object thing1, Map thing2); public String send2(Map thing1, Map thing2); ``` 在第一种情况下,惯例是将第一个参数映射到有效负载(只要它不是`Map`),并且第二个参数的内容成为标题。 在第二种情况下(或者当参数`thing1`的参数是`Map`时的第一种情况),框架无法确定哪个参数应该是有效负载。因此,映射失败。这通常可以使用`payload-expression`、`@Payload`注释或`@Headers`注释来解决。 或者(每当约定失效时),你可以承担将方法调用映射到消息的全部责任。为此,实现`MethodArgsMessageMapper`,并通过使用`mapper`属性将其提供给``。映射器映射了`MethodArgsHolder`,这是一个简单的类,它包装`java.reflect.Method`实例和包含参数的`Object[]`实例。当提供自定义映射器时,在网关上不允许`default-payload-expression`属性和``元素。类似地,`payload-expression`属性和`
`元素在任何``元素上都是不允许的。 ##### 映射方法参数 以下示例展示了如何将方法参数映射到消息,并展示了无效配置的一些示例: ``` public interface MyGateway { void payloadAndHeaderMapWithoutAnnotations(String s, Map map); void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map map); void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y); void mapOnly(Map map); // the payload is the map and no custom headers are added void twoMapsAndOneAnnotatedWithPayload(@Payload Map payload, Map headers); @Payload("#args[0] + #args[1] + '!'") void payloadAnnotationAtMethodLevel(String a, String b); @Payload("@someBean.exclaim(#args[0])") void payloadAnnotationAtMethodLevelUsingBeanResolver(String s); void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s); void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); // (1) // invalid void twoMapsWithoutAnnotations(Map m1, Map m2); // invalid void twoPayloads(@Payload String s1, @Payload String s2); // invalid void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s); // invalid void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map map); } ``` |**1**|请注意,在本例中,spel 变量`#this`引用了参数——在本例中,是`s`的值。| |-----|----------------------------------------------------------------------------------------------------------------| 等效的 XML 看起来有些不同,因为方法参数没有`#this`上下文。但是,表达式可以通过使用`#args`变量来引用方法参数,如下例所示: ``` ``` #### `@MessagingGateway`注解 从版本 4.0 开始,网关服务接口可以用`@MessagingGateway`注释来标记,而不需要定义用于配置的``XML 元素。下面的两个示例比较了配置相同网关的两种方法: ``` ``` ``` @MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC", defaultHeaders = @GatewayHeader(name = "calledMethod", expression="#gatewayMethod.name")) public interface TestGateway { @Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200) String echo(String payload); @Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2")) String echoUpperCase(String payload); String echoViaDefault(String payload); } ``` | |与 XML 版本类似, Spring 集成在组件扫描期间发现这些注释时,将使用其消息传递基础设施创建`proxy`实现,
以执行此扫描并在应用程序上下文中注册`BeanDefinition`,将`@IntegrationComponentScan`注释添加到`@Configuration`类中。
标准`@ComponentScan`基础设施不处理接口。
因此,我们引入了自定义`@IntegrationComponentScan`逻辑,以细化接口上的`@MessagingGateway`注释,并为它们注册`GatewayProxyFactoryBean`实例。
另请参见[注释支持](./configuration.html#annotations)。| |---|| 与`@MessagingGateway`注释一起,你可以使用`@Profile`注释标记服务接口,以避免创建 Bean,如果这样的配置文件不是活动的。 | |如果没有 XML 配置,则在至少一个`@Configuration`类上需要`@EnableIntegration`注释。
参见[configuration and`@EnableIntegration`](./overview.html#configuration-enable-integration)以获取更多信息。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| #### 调用无参数方法 在没有任何参数的网关接口上调用方法时,默认的行为是从`PollableChannel`接收`Message`。 然而,有时你可能希望触发无参数方法,以便你可以与不需要用户提供的参数的其他下游组件进行交互,例如触发无参数 SQL 调用或存储过程。 要实现发送和接收语义,你必须提供一个有效负载。要生成有效负载,不需要接口上的方法参数。你可以在`method`元素上使用`@Payload`注释或在 XML 中使用`payload-expression`属性。下面的列表包括几个有效载荷可能是什么的例子: * 字面意义的字符串 * \#gatewaymethod.name * 新建 java.util.date() * @somebean.somemethod()的返回值 下面的示例展示了如何使用`@Payload`注释: ``` public interface Cafe { @Payload("new java.util.Date()") List retrieveOpenOrders(); } ``` 你也可以使用`@Gateway`注释。 ``` public interface Cafe { @Gateway(payloadExpression = "new java.util.Date()") List retrieveOpenOrders(); } ``` | |如果两个注释都存在(并且提供了`payloadExpression`),则`@Gateway`获胜。| |---|-------------------------------------------------------------------------------------------| 另见[带有注释和 XML 的网关配置](#gateway-configuration-annotations)。 如果一个方法没有参数,也没有返回值,但确实包含有效负载表达式,那么它将被视为只发送操作。 #### 调用`default`方法 网关代理的接口也可以有`default`方法,并且从版本 5.3 开始,该框架将`DefaultMethodInvokingMethodInterceptor`注入到代理中,以便使用`default`方法而不是代理方法来调用`java.lang.invoke.MethodHandle`方法。来自 JDK 的接口,例如`java.util.function.Function`,仍然可以用于网关代理,但是它们的`default`方法不能被调用,因为内部的 Java 安全原因导致针对 JDK 类的`MethodHandles.Lookup`实例化。这些方法还可以使用显式的`@Gateway`方法上的注释,或`proxyDefaultMethods`上的注释或`proxyDefaultMethods`上的注释或`@MessagingGateway`上的注释或``XML 组件来代理(失去它们的实现逻辑,同时,恢复以前的网关代理行为)。 #### 错误处理 网关调用可能会导致错误。默认情况下,下游发生的任何错误都会在网关的方法调用中“按原样”重新抛出。例如,考虑以下简单的流程: ``` gateway -> service-activator ``` 如果服务激活器调用的服务抛出一个`MyException`(例如),框架将其包装为`MessagingException`,并在`failedMessage`属性中将传递的消息附加到服务激活器。因此,框架执行的任何日志记录都具有故障的全部上下文。默认情况下,当异常被网关捕获时,`MyException`将被打开并抛给调用方。可以在网关方法声明中配置`throws`子句,以匹配原因链中的特定异常类型。例如,如果你希望捕获整个`MessagingException`带有所有消息传递信息的下行错误的原因,则应该具有类似于以下的网关方法: ``` public interface MyGateway { void performProcess() throws MessagingException; } ``` 由于我们鼓励 POJO 编程,因此你可能不希望将调用方暴露于消息传递基础结构中。 如果你的网关方法没有`throws`子句,则网关遍历原因树,查找不是`RuntimeException`的`RuntimeException`。如果没有找到,则框架抛出`MessagingException`。如果前面的讨论中的`MyException`的原因是`SomeOtherException`,并且你的方法`throws SomeOtherException`,那么网关将进一步展开该原因,并将其抛给调用者。 当网关声明为 no`service-interface`时,将使用内部框架接口`RequestReplyExchanger`。 考虑以下示例: ``` public interface RequestReplyExchanger { Message exchange(Message request) throws MessagingException; } ``` 在版本 5.0 之前,这个`exchange`方法没有`throws`子句,因此,异常被打开。如果你使用此接口并希望恢复以前的打开行为,请使用自定义的`service-interface`,或者自己访问`cause`中的`MessagingException`。 但是,你可能希望记录错误,而不是传播错误,或者你可能希望将异常视为有效的答复(通过将其映射到符合调用者理解的某些“错误消息”契约的消息)。为了实现这一点,网关通过包括对`error-channel`属性的支持,提供了对专门用于错误的消息通道的支持。在下面的示例中,“Transformer”从`Exception`创建一个答复`Message`: ``` ``` `exceptionTransformer`可以是一个简单的 POJO,它知道如何创建预期的错误响应对象。这将成为发送回调用者的有效负载。如果有必要,你可以在这样的“错误流程”中做更多复杂的事情。它可能涉及路由器(包括 Spring Integration 的`ErrorMessageExceptionTypeRouter`)、过滤器等等。然而,在大多数情况下,一个简单的“变压器”就足够了。 或者,你可能希望只记录异常(或将异常异步发送到某个地方)。如果你提供了单向流,那么将不会向呼叫者发送任何内容。如果希望完全抑制异常,可以提供对全局`nullChannel`方法的引用(本质上是`/dev/null`方法)。最后,正如上面提到的,如果没有`error-channel`被定义,那么异常将照常传播。 当你使用`@MessagingGateway`注释(参见`[`@messaginggateway` Annotation](#messaging-gateway-annotation)`)时,你可以使用`errorChannel`属性。 从版本 5.0 开始,当你使用带有`void`返回类型(单向流)的网关方法时,`error-channel`引用(如果提供)将在每个发送消息的标准`errorChannel`头中填充。该特性允许基于标准`ExecutorChannel`配置(或`QueueChannel`)的下游异步流覆盖默认的全局`errorChannel`异常发送行为。以前,你必须手动指定带有`@GatewayHeader`注释或`
`元素的`errorChannel`头。对于具有异步流的`void`方法,忽略了`error-channel`属性。相反,错误消息被发送到默认的`errorChannel`。 | |通过简单的 POJI 网关公开消息传递系统会带来好处,但“隐藏”底层消息传递系统的现实确实是要付出代价的,因此,你应该考虑某些事情。
我们希望我们的 Java 方法能够尽快返回,而不是在调用者等待它返回时无限期地挂起(是否无效,返回值,或抛出异常)。
当常规方法被用作消息传递系统前面的代理时,我们必须考虑到底层消息传递的潜在异步性质。
这意味着可能存在这样一种可能性,即网关发起的消息可能会被过滤器丢弃,而永远不会到达组件。这是产生回复的原因。
某些服务激活器方法可能会导致异常,从而不提供回复(因为我们不生成空消息)。
换句话说,多个场景可能会导致回复消息永远不会出现。
这在消息传递系统中是非常自然的。
但是,考虑一下网关方法的含义。网关的方法输入参数被合并到一条消息中,并向下游发送。
回复消息将被转换为网关方法的返回值,
因此,你可能需要确保,对于每个网关调用,总是有一条回复消息。,否则,你的网关方法可能永远不会返回并无限期地挂起。
处理这种情况的一种方法是使用异步网关(在本节稍后进行说明)。
另一种处理方法是显式地设置`reply-timeout`属性。
,网关的挂起时间不会超过`reply-timeout`指定的时间,如果超时过了,则返回’null’。最后,你可能需要考虑在服务激活器上设置下游标志,例如’requires-reply’,或者在过滤器上设置’throw-exceptions-on-reference’。本章的最后一节将对这些选项进行更详细的讨论。| |---|| | |如果下游流返回一个`ErrorMessage`,则其`payload`(a`Throwable`)被视为一个常规的下游错误。
如果配置了一个`error-channel`,则将其发送到错误流。
否则,有效负载将被抛给网关的调用者。,如果`error-channel`上的错误流返回`ErrorMessage`,它的有效负载被抛给调用者。
这同样适用于任何具有`Throwable`有效负载的消息。
当你需要将`Exception`直接传播给调用者时,这在异步情况下很有用。,
这样做,你可以返回一个`Exception`(作为来自某个服务的`reply`),也可以抛出它,
通常情况下,即使使用异步流,框架负责将下游流引发的异常传播回网关。
[TCP 客户机-服务器多路复用](https://github.com/spring-projects/spring-integration-samples/tree/main/intermediate/tcp-client-server-multiplex)示例演示了这两种技术都可以将异常返回给调用者。
它通过使用`aggregator`和`group-timeout`(参见[聚合器和组超时](./aggregator.html#agg-and-group-to))以及在 discard 流上的`MessagingTimeoutException`答复来模拟套接字 IO 错误到等待线程。| |---|| #### 网关超时 网关有两个超时属性:`requestTimeout`和`replyTimeout`。请求超时仅在通道可以阻塞(例如,有界`QueueChannel`已满)的情况下才适用。`replyTimeout`值是网关等待回复或返回`null`的时间。它默认值为无穷大。 对于网关(`defaultRequestTimeout`和`defaultReplyTimeout`)或`MessagingGateway`接口注释上的所有方法,超时都可以设置为默认值。单个方法可以覆盖这些默认值(在``子元素中)或在`@Gateway`注释上。 从版本 5.0 开始,超时可以定义为表达式,如下例所示: ``` @Gateway(payloadExpression = "#args[0]", requestChannel = "someChannel", requestTimeoutExpression = "#args[1]", replyTimeoutExpression = "#args[2]") String lateReply(String payload, long requestTimeout, long replyTimeout); ``` 评估上下文有一个`BeanResolver`(使用`@someBean`引用其他 bean),并且`#args`数组变量是可用的。 在使用 XML 进行配置时,超时属性可以是长值或 SPEL 表达式,如下例所示: ``` ``` #### 异步网关 作为一种模式,消息传递网关提供了一种很好的方式来隐藏特定于消息传递的代码,同时仍然公开消息传递系统的全部功能。由于[前面描述的](#gateway-proxy),`GatewayProxyFactoryBean`提供了一种方便的方式,可以通过服务接口公开代理,从而使你能够基于 POJO 访问消息传递系统(基于你自己的域中的对象、原语/字符串或其他对象)。但是,当网关通过返回值的简单 POJO 方法公开时,这意味着对于每个请求消息(在调用方法时生成),必须有一个答复消息(在方法返回时生成)。由于消息传递系统是自然异步的,因此你可能不能总是保证“对于每个请求,总是会有一个答复”的契约。 Spring Integration2.0 引入了对异步网关的支持,它提供了一种方便的方式来初始化流,当你可能不知道是否需要回复或回复需要多长时间才能到达时。 为了处理这些类型的场景, Spring 集成使用`java.util.concurrent.Future`实例来支持异步网关。 从 XML 配置来看,没有任何变化,你仍然以与定义常规网关相同的方式定义异步网关,如下例所示: ``` ``` 然而,网关接口(一种服务接口)有一点不同,如下所示: ``` public interface MathServiceGateway { Future multiplyByTwo(int i); } ``` 如前面的示例所示,网关方法的返回类型是`Future`。当`GatewayProxyFactoryBean`看到网关方法的返回类型是`Future`时,它通过使用`AsyncTaskExecutor`立即切换到异步模式。这就是差异的程度。对这种方法的调用总是以`Future`实例立即返回。然后,你可以按照自己的速度与`Future`进行交互,以获得结果、取消等等。同样,与使用`Future`实例的任何其他情况一样,调用`get()`可能会显示超时、执行异常等。下面的示例展示了如何使用从异步网关返回的`Future`: ``` MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class); Future result = mathService.multiplyByTwo(number); // do something else here since the reply might take a moment int finalResult = result.get(1000, TimeUnit.SECONDS); ``` 有关更详细的示例,请参见 Spring 集成示例中的[异步网关](https://github.com/spring-projects/spring-integration-samples/tree/main/intermediate/async-gateway)示例。 ##### `ListenableFuture` 从版本 4.1 开始,异步网关方法还可以返回`ListenableFuture`(在 Spring Framework4.0 中引入)。这些返回类型允许你提供回调,在结果可用(或发生异常)时调用该回调。当网关检测到此返回类型并且[任务执行者](#gateway-asynctaskexecutor)是`AsyncListenableTaskExecutor`时,将调用执行器的`submitListenable()`方法。下面的示例展示了如何使用`ListenableFuture`: ``` ListenableFuture result = this.asyncGateway.async("something"); result.addCallback(new ListenableFutureCallback() { @Override public void onSuccess(String result) { ... } @Override public void onFailure(Throwable t) { ... } }); ``` ##### `AsyncTaskExecutor` 默认情况下,`GatewayProxyFactoryBean`在为返回类型为`Future`的任何网关方法提交内部`AsyncInvocationTask`实例时使用`org.springframework.core.task.SimpleAsyncTaskExecutor`。但是,``元素的配置中的`async-executor`属性允许你提供对 Spring 应用程序上下文中可用的`java.util.concurrent.Executor`的任何实现的引用。 (默认)`SimpleAsyncTaskExecutor`同时支持`Future`和`ListenableFuture`返回类型,分别返回`FutureTask`或`ListenableFutureTask`。参见[`CompletableFuture`]。尽管有一个默认的执行器,但提供一个外部的执行器通常是有用的,这样你就可以在日志中标识它的线程(当使用 XML 时,线程名称是基于执行器的 Bean 名称),如下例所示: ``` @Bean public AsyncTaskExecutor exec() { SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor(); simpleAsyncTaskExecutor.setThreadNamePrefix("exec-"); return simpleAsyncTaskExecutor; } @MessagingGateway(asyncExecutor = "exec") public interface ExecGateway { @Gateway(requestChannel = "gatewayChannel") Future doAsync(String foo); } ``` 如果你希望返回一个不同的`Future`实现,那么你可以提供一个自定义执行器,或者完全禁用该执行器,并从下游流返回回复消息有效负载中的`Future`。要禁用执行器,请在`GatewayProxyFactoryBean`中将其设置为`null`(通过使用`setAsyncTaskExecutor(null)`)。当使用 XML 配置网关时,使用`async-executor=""`。当使用`@MessagingGateway`注释进行配置时,请使用类似于以下代码的代码: ``` @MessagingGateway(asyncExecutor = AnnotationConstants.NULL) public interface NoExecGateway { @Gateway(requestChannel = "gatewayChannel") Future doAsync(String foo); } ``` | |如果返回类型是特定的具体`Future`实现或配置的执行器不支持的其他子接口,则流在调用方的线程上运行,流必须在应答消息有效负载中返回所需的类型。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ##### `CompletableFuture` 从版本 4.2 开始,网关方法现在可以返回`CompletableFuture`。返回此类型时有两种操作模式: * 当提供了异步执行器并且返回类型正好是`CompletableFuture`(不是子类)时,框架在执行器上运行任务,并立即将`CompletableFuture`返回给调用者。`CompletableFuture.supplyAsync(Supplier supplier, Executor executor)`用于创建 future。 * 当异步执行器显式地设置为`null`并且返回类型是`CompletableFuture`或者返回类型是`CompletableFuture`的子类时,将在调用者的线程上调用该流。在这种情况下,预计下游流将返回适当类型的`CompletableFuture`。 ###### 使用场景 在下面的场景中,调用者线程立即返回一个`CompletableFuture`,当下游流回应到网关(使用`Invoice`对象)时,这个过程就完成了。 ``` CompletableFuture order(Order order); ``` ``` ``` 在下面的场景中,当下游流将调用者线程作为对网关的响应的有效负载提供时,调用者线程返回`CompletableFuture`。当发票准备好时,其他一些过程必须完成。 ``` CompletableFuture order(Order order); ``` ``` ``` 在下面的场景中,当下游流将调用者线程作为对网关的响应的有效负载提供时,调用者线程返回`CompletableFuture`。当发票准备好时,其他一些过程必须完成。如果启用了`DEBUG`日志记录,则会发出一个日志条目,表示异步执行器不能用于此场景。 ``` MyCompletableFuture order(Order order); ``` ``` ``` `CompletableFuture`实例可用于对回复执行额外的操作,如下例所示: ``` CompletableFuture process(String data); ... CompletableFuture result = process("foo") .thenApply(t -> t.toUpperCase()); ... String out = result.get(10, TimeUnit.SECONDS); ``` ##### 反应堆`Mono` 从版本 5.0 开始,`GatewayProxyFactoryBean`允许使用带有网关接口方法的[项目反应堆](https://projectreactor.io/),使用[`Mono`](https://github.com/reactor/reactor-core)返回类型。内部`AsyncInvocationTask`包装在`Mono.fromCallable()`中。 可以使用`Mono`稍后检索结果(类似于`Future`),或者你可以在结果返回到网关时通过调用你的`Consumer`与 Dispatcher 一起使用它。 | |`Mono`不会被框架立即刷新。
因此,在网关方法返回之前,底层消息流不会启动(就像`Future``Executor`任务一样)。
当订阅`Mono`时,该流开始。
或者,当`subscribe()`与整个`Flux`相关时,`Mono`(作为一个“可组合”)可能是反应器流的一部分。
下面的示例展示了如何使用 Project Reactor 创建网关:| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ``` @MessagingGateway public static interface TestGateway { @Gateway(requestChannel = "promiseChannel") Mono multiply(Integer value); } ... @ServiceActivator(inputChannel = "promiseChannel") public Integer multiply(Integer value) { return value * 2; } ... Flux.just("1", "2", "3", "4", "5") .map(Integer::parseInt) .flatMap(this.testGateway::multiply) .collectList() .subscribe(integers -> ...); ``` 另一个使用 Project Reactor 的示例是一个简单的回调场景,如下例所示: ``` Mono mono = service.process(myOrder); mono.subscribe(invoice -> handleInvoice(invoice)); ``` 调用线程继续,当流完成时调用`handleInvoice()`。 ##### 下游流返回异步类型 正如上面的`ListenableFuture`小节中提到的,如果你希望某些下游组件返回带有异步负载的消息(`Future`,`Mono`,以及其他),则必须显式地将异步执行器设置为`null`(或者在使用 XML 配置时`""`)。然后在调用者线程上调用该流,然后可以检索结果。 ##### `void`返回类型 与前面提到的返回类型不同,当方法返回类型是`void`时,框架不能隐式地确定你希望下游流异步运行,而调用者线程立即返回。在这种情况下,你必须用`@Async`注释接口方法,如下例所示: ``` @MessagingGateway public interface MyGateway { @Gateway(requestChannel = "sendAsyncChannel") @Async void sendAsync(String payload); } ``` 与`Future`返回类型不同,如果流引发了某些异常,则无法通知调用方,除非某些自定义`TaskExecutor`(例如`ErrorHandlingTaskExecutor`)与`@Async`注释相关联。 #### 未收到响应时的网关行为 由于[早些时候解释过](#gateway-proxy),网关提供了一种通过 POJO 方法调用与消息传递系统进行交互的便捷方式。但是,通常期望总是返回(即使有异常)的典型方法调用可能并不总是将一对一映射到消息交换(例如,回复消息可能不会到达——相当于方法不返回)。 本节的其余部分涵盖了各种场景,以及如何使网关的行为更具可预测性。可以对某些属性进行配置,以使同步网关行为更可预测,但其中一些属性可能并不总是如你所期望的那样工作。其中一个是`reply-timeout`(在方法级别或`default-reply-timeout`在网关级别)。我们检查`reply-timeout`属性,以查看它如何能够和不能在各种场景中影响同步网关的行为。我们研究了单线程场景(下游的所有组件都通过直接通道连接)和多线程场景(例如,在下游的某个地方,你可能有一个打破单线程边界的 pollable 或 executor 通道)。 ##### 长期运行的流程下游 单线程同步网关 如果下游组件仍在运行(可能是由于无限循环或服务速度较慢),则设置`reply-timeout`没有任何作用,并且网关方法调用直到下游服务退出(通过返回或抛出异常)才返回。 同步网关,多线程 如果多线程消息流中的下游组件仍在运行(可能是由于无限循环或服务速度较慢),那么设置`reply-timeout`会产生一种效果,即允许网关方法调用在超时后返回,因为`GatewayProxyFactoryBean`在回复通道上轮询,等待消息,直到超时结束。但是,如果在产生实际的回复之前已经达到超时,则可能导致网关方法返回“null”。你应该理解,在网关方法调用返回之后,回复消息(如果产生)将被发送到一个回复通道,因此你必须意识到这一点,并在设计流程时将其考虑在内。 ##### 下游组件返回’null’ 同步网关——单线程 如果组件下游返回’null’,并且没有配置`reply-timeout`,则网关方法调用将无限期地挂起,除非已经配置了`reply-timeout`或者在可能返回’null’的下游组件(例如,服务激活器)上设置了`requires-reply`属性。在这种情况下,将抛出一个异常并将其传播到网关。 同步网关——多线程 行为与前一例相同。 ##### 下游组件返回签名为“void”,而网关方法签名为非 void 同步网关——单线程 如果组件下游返回“void”,并且没有配置`reply-timeout`,则网关方法调用将无限期地挂起,除非配置了`reply-timeout`。 同步网关——多线程 行为与前一例相同。 ##### 下游组件导致运行时异常 同步网关——单线程 如果组件下游抛出一个运行时异常,则该异常将通过一条错误消息传播回网关并重新抛出。 同步网关——多线程 行为与前一例相同。 | |你应该理解,默认情况下,`reply-timeout`是无界的。,因此,如果不显式地设置`reply-timeout`,则网关方法调用可能会无限期地挂起,
,为了确保你分析了你的流,并且如果存在发生这些场景中的一种远程可能性,你应该将`reply-timeout`属性设置为“’safe’”值,
甚至更好,你可以将下游组件的`requires-reply`属性设置为“true”,以确保及时响应,当下游组件内部返回 null 时,抛出异常就会产生异常。
但是你也应该意识到在某些情况下(参见[第一个](#long-running-process-downstream)),`reply-timeout`并没有帮助,
这意味着分析消息流并决定何时使用同步网关而不是异步网关也很重要,
as[前面描述的](#async-gateway),后一种情况是定义返回`Future`实例的网关方法,
然后保证接收到该返回值,并且对调用结果有更细粒度的控制。
同样,在处理路由器时,你应该记住,将`resolution-required`属性设置为“true”会导致路由器在无法解析特定通道时引发异常,
同样,在处理过滤器时,你可以设置`throw-exception-on-rejection`属性,在这两种情况下,
都是这样,生成的流的行为就像它包含一个具有“requires-reply”属性的服务激活器。
换句话说,它有助于确保网关方法调用的及时响应。| |---|| | |`reply-timeout`对于``元素是无界的(由`GatewayProxyFactoryBean`创建)。
用于外部集成的入站网关(WS、HTTP 等)与这些网关共享许多特性和属性。
但是,对于那些入站网关,默认的`reply-timeout`是 1000 毫秒(一秒)。
如果对另一个线程进行了下游异步切换,则可能需要增加此属性,以便在网关超时之前为流留出足够的时间来完成。| |---|| | |你应该理解,定时器是在线程返回网关时启动的——也就是说,当流完成或将消息传递给另一个线程时,
此时,调用线程开始等待回复。,
如果流是完全同步的,这个回复是立即可用的。
对于异步流,线程等待的时间最多到这个时候。| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 有关通过`IntegrationFlows`定义网关的选项,请参见 Java DSL 章节中的[`IntegrationFlow`as gateway](./dsl.html#integration-flow-as-gateway)。 ### 服务激活器 服务激活器是用于将任何 Spring 管理的对象连接到输入通道的端点类型,以便它可以扮演服务的角色。如果服务产生输出,它也可以连接到一个输出通道。或者,输出产生服务可以位于处理管道或消息流的末端,在这种情况下,可以使用入站消息的`replyChannel`头。如果没有定义输出通道,这是默认的行为。与这里描述的大多数配置选项一样,相同的行为实际上也适用于大多数其他组件。 #### 配置服务激活器 要创建服务激活器,请使用带有“input-channel”和“ref”属性的“service-activator”元素,如下例所示: ``` ``` 前面的配置从`exampleHandler`中选择满足消息传递要求之一的所有方法,如下所示: * 用`@ServiceActivator`注释 * is`public` * 如果`requiresReply == true`不返回`void` 在运行时调用的目标方法是通过它们的`payload`类型为每个请求消息选择的,或者作为对`Message`类型的回退,如果这样的方法存在于目标类中的话。 从版本 5.0 开始,一个服务方法可以用`@org.springframework.integration.annotation.Default`标记,作为所有不匹配情况的后备。当使用[内容类型转换](./endpoint.html#content-type-conversion)并在转换后调用目标方法时,这可能是有用的。 要委托给任何对象的显式定义的方法,你可以添加`method`属性,如下例所示: ``` ``` 在这两种情况下,当服务方法返回一个非空值时,端点尝试将应答消息发送到适当的应答通道。要确定应答通道,首先要检查端点配置中是否提供了`output-channel`,如下例所示: ``` ``` 如果方法返回一个结果,并且没有`output-channel`被定义,那么框架将检查请求消息的`replyChannel`标头值。如果该值是可用的,那么它将检查其类型。如果是`MessageChannel`,则将回复消息发送到该通道。如果是`String`,则端点尝试将通道名称解析为通道实例。如果通道不能解析,则抛出一个`DestinationResolutionException`。如果它能被解决,消息就会被发送到那里。如果请求消息没有`replyChannel`头,并且`reply`对象是`Message`,则其`replyChannel`头将用于查询目标目的地。这是 Spring 集成中用于请求-回复消息传递的技术,也是返回地址模式的示例。 如果你的方法返回一个结果,并且希望丢弃它并结束流,那么你应该配置`output-channel`以发送到`NullChannel`。为了方便起见,该框架注册了一个名为`nullChannel`的框架。有关更多信息,请参见[特殊频道](./channel.html#channel-special-channels)。 服务激活器是不需要生成回复消息的组件之一。如果你的方法返回`null`或具有`void`返回类型,则服务激活器在方法调用后退出,不带任何信号。这种行为可以通过`AbstractReplyProducingMessageHandler.requiresReply`选项进行控制,当使用 XML 名称空间进行配置时,该选项也会以`requires-reply`的形式公开。如果将标记设置为`true`,并且方法返回 null,则抛出一个`ReplyRequiredException`。 服务方法中的参数可以是消息,也可以是任意类型。如果是后者,则假定它是一个消息负载,该负载从消息中提取并注入到服务方法中。我们通常推荐这种方法,因为在使用 Spring 集成时,它遵循并促进了 POJO 模型。参数也可以有`@Header`或`@Headers`注释,如[注释支持](./configuration.html#annotations)中所述。 | |服务方法不需要有任何参数,这意味着你可以实现事件风格的服务激活器(在这里你所关心的只是对服务方法的调用)不要担心消息的内容。
将其视为空的 JMS 消息。
这种实现的一个示例用例是一个简单的计数器或消息监视器,它存储在输入通道上。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 从版本 4.1 开始,该框架将消息属性(`payload`和`headers`)正确地转换为 Java8`Optional`POJO 方法参数,如下例所示: ``` public class MyBean { public String computeValue(Optional payload, @Header(value="foo", required=false) String foo1, @Header(value="foo") Optional foo2) { if (payload.isPresent()) { String value = payload.get(); ... } else { ... } } } ``` 如果自定义服务激活器处理程序实现可以在其他``定义中重用,我们通常建议使用`ref`属性。然而,如果自定义服务激活器处理程序实现仅在``的单个定义内使用,则可以提供内部 Bean 定义,如以下示例所示: ``` ``` | |不允许在同一个``配置中同时使用`ref`属性和内部处理程序定义,因为它会创建一个模棱两可的条件并导致抛出异常。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | |如果`ref`属性引用扩展`AbstractMessageProducingHandler`的 Bean(例如框架本身提供的处理程序),则通过将输出通道直接注入处理程序来优化配置。,在这种情况下,
,每个`ref`必须是单独的 Bean 实例(或`prototype`-作用域 Bean)或使用内部的``配置类型。
如果无意中从多个 bean 引用了相同的消息处理程序,则会出现配置异常。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ##### 服务激活器和 Spring 表达式语言 Spring 集成 2.0 以来,服务激活器还可以受益于[SpEL](https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#expressions)。 例如,可以调用任何 Bean 方法,而不指向`ref`属性中的 Bean,或者将其作为内部 Bean 定义包括在内,如下所示: ``` ``` 在前面的配置中,我们使用 spel 的`@beanId`表示法,并调用一个与消息有效负载兼容的类型的方法,而不是通过使用`ref`或作为内部 Bean 注入“AccountService”。我们还传递一个标头值。可以针对消息中的任何内容计算任何有效的 SPEL 表达式。对于简单的场景,如果所有逻辑都可以封装在这样的表达式中,那么你的服务激活器就不需要引用 A Bean,如下例所示: ``` ``` 在前面的配置中,我们的服务逻辑是将有效载荷值乘以 2。SPEL 让我们相对容易地处理它。 有关配置服务激活器的更多信息,请参见 Java DSL 章节中的[服务激活器和`.handle()`方法](./dsl.html#java-dsl-handle)。 #### 异步服务激活器 服务激活器由调用线程调用。如果输入通道是`SubscribableChannel`或`PollableChannel`的 poller 线程,则这是一个上游线程。如果服务返回一个`ListenableFuture`,那么默认的操作是将其作为消息的有效负载发送到输出(或回复)通道。从版本 4.3 开始,你现在可以将`async`属性设置为`true`(在使用 Java 配置时使用`Object`)。如果当`async`属性被设置为`true`时,服务返回一个`ListenableFuture`,则调用线程将立即被释放,并在完成将来的线程(从你的服务中)上发送答复消息。这对于使用`PollableChannel`的长时间运行的服务特别有利,因为 Poller 线程被释放以在框架内执行其他服务。 如果服务使用`Exception`完成 future,则发生正常的错误处理。如果存在`ErrorMessage`消息头,则将其发送到`errorChannel`消息头。否则,将把`ErrorMessage`发送到默认的`errorChannel`(如果可用)。 #### 服务激活器和方法返回类型 服务方法可以返回成为回复消息有效负载的任何类型。在这种情况下,将创建一个新的`Message`对象,并复制来自请求消息的所有标题。当交互是基于 POJO 方法调用时,对于大多数 Spring 集成`MessageHandler`实现,这以相同的方式工作。 也可以从该方法返回一个完整的`Message`对象。但是请记住,与[变形金刚](./transformer.html#transformer)不同的是,对于服务激活器,如果返回的消息中不存在标题,则将通过从请求消息中复制标题来修改此消息。因此,如果你的方法参数是`Message`,并且你在服务方法中复制了一些(但不是全部)现有的头,那么它们将在回复消息中重新出现。从回复消息中删除头不是服务激活器的责任,并且,遵循松耦合原则,最好在集成流中添加`HeaderFilter`。或者,可以使用 Transformer 代替服务激活器,但是,在这种情况下,当返回完整的`Message`时,该方法完全负责消息,包括复制请求消息头(如果需要)。你必须确保重要的框架标题(例如`replyChannel`,`errorChannel`)如果存在,就必须保留。 ### 延迟器 延迟器是一个简单的端点,它允许消息流延迟一定的时间间隔。当消息延迟时,原始发件人不会阻塞。相反,延迟消息被调度为一个`org.springframework.scheduling.TaskScheduler`的实例,以便在延迟通过后将其发送到输出通道。这种方法即使对于相当长的延迟也是可伸缩的,因为它不会导致大量的发件人线程被阻塞。相反,在典型的情况下,线程池用于实际执行消息的释放。本节包含几个配置延迟器的示例。 #### 配置延迟器 ``元素用于延迟两个消息通道之间的消息流。与其他端点一样,你可以提供“输入通道”和“输出通道”属性,但延迟器也具有“默认延迟”和“表达式”属性(以及“表达式”元素),它们决定每条消息应该延迟的毫秒数。以下示例将所有消息延迟 3 秒钟: ``` ``` 如果需要确定每条消息的延迟,还可以使用“expression”属性提供 SPEL 表达式,如下所示: Java DSL ``` @Bean public IntegrationFlow flow() { return IntegrationFlows.from("input") .delay("delayer.messageGroupId", d -> d .defaultDelay(3_000L) .delayExpression("headers['delay']")) .channel("output") .get(); } ``` Kotlin DSL ``` @Bean fun flow() = integrationFlow("input") { delay("delayer.messageGroupId") { defaultDelay(3000L) delayExpression("headers['delay']") } channel("output") } ``` Java ``` @ServiceActivator(inputChannel = "input") @Bean public DelayHandler delayer() { DelayHandler handler = new DelayHandler("delayer.messageGroupId"); handler.setDefaultDelay(3_000L); handler.setDelayExpressionString("headers['delay']"); handler.setOutputChannelName("output"); return handler; } ``` XML ``` ``` 在前面的示例中,只有当表达式对给定的入站消息计算为 null 时,才会出现三秒的延迟。如果只想对表达式求值结果有效的消息应用延迟,可以使用`0`(默认值)的“默认延迟”。对于延迟`0`(或更短)的任何消息,该消息将立即在调用线程上发送。 | |XML 解析器使用消息组 ID`.messageGroupId`。| |---|----------------------------------------------------------------------| | |延迟处理程序支持表示以毫秒为单位的间隔的表达式求值结果(其`Object`方法产生的值可以解析为`Long`)以及表示绝对时间的`java.util.Date`实例。,在第一种情况下,
,毫秒是从当前时间开始计算的(例如,
的值`5000`将使消息从延迟者接收到的时间起至少延迟 5 秒)。
使用`Date`实例,直到`Date`对象表示的时间,消息才会被释放。,
等于非正延迟或过去日期的值不会导致延迟。相反,
,它被直接发送到原始发送者线程上的输出通道。
如果表达式求值结果不是`Date`且不能解析为`Long`,则应用默认延迟(如果有的话—默认为`0`)。| |---|| | |表达式求值可能由于各种原因抛出求值异常,包括无效的表达式或其他条件。
默认情况下,这样的异常将被忽略(尽管在调试级别上进行了记录),并且延迟器将回落到默认的延迟(如果有)。
你可以通过设置`ignore-expression-failures`属性来修改此行为。
默认情况下,此属性被设置为`true`,延迟器行为与前面描述的一样。
但是,如果你不希望忽略表达式求值异常并将它们抛给延迟者的调用者,请将`ignore-expression-failures`属性设置为`false`。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | |在前面的例子中,将延迟表达式指定为`headers['delay']`。
这是 SPEL`Indexer`语法,用于访问`Map`元素(`MessageHeaders`实现
)。
它调用:`headers.get("delay")`。对于不包含’的简单映射元素名称(
),也可以使用 SPEL“dot accessor”语法,如果前面显示的头表达式可以指定为`headers.delay`。
但是,如果缺少头表达式,则会获得不同的结果。
在第一种情况下,表达式的计算结果为`null`。
第二种结果类似于以下结果:
```
org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8):
Field or property 'delay' cannot be found on object of type 'org.springframework.messaging.MessageHeaders'
```
因此,
如果存在省略标题的可能性,并且你希望返回到默认的延迟,那么使用 Indexer 语法而不是 Dot Property Accessor 语法通常会更有效(建议使用),因为检测空比捕获异常更快。| |---|| 延迟器将委托给 Spring 的`TaskScheduler`抽象的实例。延迟器使用的默认调度程序是由 Spring Integration 在启动时提供的`ThreadPoolTaskScheduler`实例。见[配置任务调度程序](./configuration.html#namespace-taskscheduler)。如果你想委托给一个不同的计划程序,你可以通过 delayer 元素的’scheduler’属性提供一个引用,如下例所示: ``` ``` | |如果你配置了一个外部`ThreadPoolTaskScheduler`,则可以在此属性上设置`waitForTasksToCompleteOnShutdown = true`。,
它允许在应用程序关机时成功完成已经处于执行状态的“延迟”任务(释放消息),
在 Spring 集成 2.2 之前,此属性在``元素上可用,因为`ThreadPoolTaskScheduler`可以在后台创建自己的调度程序。
自 2.2 以来,延迟器需要一个外部调度程序实例,并且`waitForTasksToCompleteOnShutdown`已被删除。
你应该使用调度程序自己的配置。| |---|| | |`ThreadPoolTaskScheduler`具有一个属性`Map`,该属性可以与`org.springframework.util.ErrorHandler`的一些实现一起注入。
该处理程序允许从发送延迟消息的调度任务的线程处理`Exception`,默认情况下,它使用`org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler`,你可以在日志中看到堆栈跟踪,
你可能需要考虑使用`org.springframework.integration.channel.MessagePublishingErrorHandler`,它将`ErrorMessage`发送到`error-channel`,要么从失败消息的消息头,要么进入默认的`error-channel`。
此错误处理是在事务回滚(如果存在)后执行的。
参见[发布失败](#delayer-release-failures)。| |---|| #### 延迟器和消息存储 `DelayHandler`将延迟消息保存到提供的`MessageStore`中的消息组中。(“groupid”基于``元素所需的“id”属性。)在`DelayHandler`将消息发送到`output-channel`之前,调度任务将从`MessageStore`中删除一条延迟消息。如果提供的`MessageStore`是持久性的(例如`JdbcMessageStore`),则提供了在应用程序关闭时不丢失消息的能力。在应用程序启动之后,`ErrorMessage`从其消息组中的[表达式评估建议](#expression-advice)中读取消息,并根据消息的原始到达时间(如果延迟是数值的话)重新安排它们的延迟。对于延迟报头为`Date`的消息,在重新调度时使用`Date`。如果延迟消息在`TaskScheduler`中的停留时间超过其“延迟”,则在启动后立即发送该消息。 ``可以用两个相互排斥的元素中的任意一个来丰富:``和``。这些 AOP 通知中的`List`应用于代理的内部`DelayHandler.ReleaseMessageHandler`,该代理负责在延迟之后在调度任务的`Thread`上释放消息。例如,当下游消息流抛出一个异常并且回滚`ReleaseMessageHandler`的事务时,可以使用它。在这种情况下,延迟消息保持在持久的`MessageStore`中。你可以在``中使用任何自定义的`org.aopalliance.aop.Advice`实现。``元素定义了一个简单的建议链,该建议链仅包含事务性建议。下面的示例显示了`advice-chain`中的``: ``` ``` `DelayHandler`可以导出为带有托管操作(`MBean`和`reschedulePersistedMessages`)的 JMX`MBean`,这允许在运行时重新安排延迟的持久消息——例如,如果`TaskScheduler`先前已被停止。可以通过`Control Bus`命令调用这些操作,如下例所示: ``` Message delayerReschedulingMessage = MessageBuilder.withPayload("@'delayer.handler'.reschedulePersistedMessages()").build(); controlBusChannel.send(delayerReschedulingMessage); ``` | |有关消息存储、JMX 和控制总线的更多信息,请参见`MessageStore`。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------| 从版本 5.3.7 开始,如果将消息存储到`MessageStore`时事务处于活动状态,则在`TransactionSynchronization.afterCommit()`回调中调度发布任务。这对于防止竞争情况是必要的,在这种情况下,调度的发布可以在事务提交之前运行,并且找不到消息。在这种情况下,消息将在延迟之后发布,或者在事务提交之后发布,以较晚者为准。 #### 发行失败 从版本 5.0.8 开始,DeLayer 上有两个新属性: * `maxAttempts`(默认 5) * (默认 1 秒) 当消息被释放时,如果下游流失败,将在`retryDelay`之后尝试释放。如果达到`maxAttempts`,则该消息将被丢弃(除非该发布是事务性的,在这种情况下,该消息将保留在存储区中,但将不再计划发布,直到重新启动应用程序,或者调用`reschedulePersistedMessages()`方法,如上所述)。 此外,你还可以配置`delayedMessageErrorChannel`;当发布失败时,一个`ErrorMessage`被发送到该通道,异常作为有效负载,并具有`originalMessage`属性。`ErrorMessage`包含一个包含当前计数的头`IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT`。 如果错误流使用错误消息并正常退出,则不会采取进一步的操作;如果发布是事务性的,则事务将提交,消息将从存储中删除。如果错误流抛出一个异常,则释放将被重试到`maxAttempts`,正如上面讨论的那样。 ### 脚本支持 Spring Integration2.1 增加了对[面向 Java 规范的 JSR223 脚本](https://www.jcp.org/en/jsr/detail?id=223)的支持,在 Java 版本 6 中引入。它允许你使用以任何受支持的语言(包括 Ruby、JRuby、Groovy 和 Kotlin)编写的脚本来为各种集成组件提供逻辑,类似于 Spring 表达式语言在 Spring 集成中的使用方式。有关 JSR223 的更多信息,请参见[文件](https://docs.oracle.com/javase/8/docs/technotes/guides/scripting/prog_guide/api.html)。 | |从 Java11 开始,Nashorn JavaScript 引擎已被弃用,可能会在 Java15 中删除。
建议从现在开始重新考虑使用其他脚本语言。| |---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 你需要在项目中包含此依赖项: Maven ``` org.springframework.integration spring-integration-scripting 5.5.9 ``` Gradle ``` compile "org.springframework.integration:spring-integration-scripting:5.5.9" ``` 此外,还需要添加一个脚本引擎实现,例如 JRuby、Jython。 从版本 5.2 开始, Spring 集成提供了 Kotlin JSR223 支持。你需要将这些依赖项添加到项目中以使其正常工作: Maven ``` org.jetbrains.kotlin kotlin-script-util runtime org.jetbrains.kotlin kotlin-compiler-embeddable runtime org.jetbrains.kotlin kotlin-scripting-compiler-embeddable runtime ``` Gradle ``` runtime 'org.jetbrains.kotlin:kotlin-script-util' runtime 'org.jetbrains.kotlin:kotlin-compiler-embeddable' runtime 'org.jetbrains.kotlin:kotlin-scripting-compiler-embeddable' ``` 所提供的`kotlin`所选择的`kotlin`语言指示器或脚本文件带有`.kts`扩展名。 为了使用 JVM 脚本语言,必须在类路径中包含该语言的 JSR223 实现。[Groovy](https://groovy-lang.org/)和[JRuby](https://www.jruby.org)项目在其标准发行版中提供了 JSR233 支持。 | |第三方开发了各种 JSR223 语言实现。
特定实现与 Spring 集成的兼容性取决于它与规范的一致性以及实现方对规范的解释。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | |如果你计划使用 Groovy 作为脚本语言,我们建议你使用[Spring-Integration’s Groovy Support](./groovy.html#groovy),因为它提供了 Groovy 特有的其他功能。
但是,这一部分也是相关的。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| #### 脚本配置 根据集成需求的复杂性,脚本可以作为 XML 配置中的 CDATA 内联提供,也可以作为对包含该脚本的 Spring 资源的引用提供。 Spring 为了启用脚本支持,集成定义了一个`headers`,它将消息有效负载绑定到一个名为`payload`的变量,并将消息头绑定到一个`headers`变量,这两个变量都可以在脚本执行上下文中访问。你所需要做的就是编写一个使用这些变量的脚本。以下两个示例展示了创建筛选器的示例配置: Java DSL ``` @Bean public IntegrationFlow scriptFilter() { return f -> f.filter(Scripts.processor("some/path/to/ruby/script/RubyFilterTests.rb")); } ... @Bean public Resource scriptResource() { return new ByteArrayResource("headers.type == 'good'".getBytes()); } @Bean public IntegrationFlow scriptFilter() { return f -> f.filter(Scripts.processor(scriptResource()).lang("groovy")); } ``` XML ``` ``` 如前面的示例所示,该脚本可以内联地包含,也可以通过引用资源位置(通过使用`location`属性)来包含。此外,`lang`属性对应于语言名称(或其 JSR223 别名)。 Spring 其他支持脚本的集成端点元素包括`router`、`service-activator`、`Lifecycle`和`splitter`。在每种情况下,脚本配置都将与上述相同(除了 Endpoint 元素)。 脚本支持的另一个有用的特性是无需重新启动应用程序上下文就可以更新(重新加载)脚本。要这样做,请在`script`元素上指定`refresh-check-delay`属性,如下例所示: Java DSL ``` Scripts.processor(...).refreshCheckDelay(5000) } ``` XML ``` ``` 在前面的示例中,每 5 秒检查一次脚本位置的更新。如果脚本已更新,则自更新后 5 秒内发生的任何调用都会导致新脚本的运行。 考虑以下示例: Java DSL ``` Scripts.processor(...).refreshCheckDelay(0) } ``` XML ``` ``` 在前面的示例中,一旦发生这样的修改,就会用任何脚本修改来更新上下文,从而为“实时”配置提供了一种简单的机制。任何负值都意味着在初始化应用程序上下文后不会重新加载脚本。这是默认的行为。下面的示例显示了一个永远不会更新的脚本: Java DSL ``` Scripts.processor(...).refreshCheckDelay(-1) } ``` XML ``` ``` | |无法重新加载内联脚本。| |---|-----------------------------------| ##### 脚本变量绑定 需要变量绑定来使脚本能够引用外部提供给脚本执行上下文的变量。默认情况下,`headers`和`headers`用作绑定变量。可以使用``元素(或`ScriptSpec.variables()`选项)将其他变量绑定到脚本,如下例所示: Java DSL ``` Scripts.processor("foo/bar/MyScript.py") .variables(Map.of("var1", "thing1", "var2", "thing2", "date", date)) } ``` XML ``` ``` 如前面的示例所示,可以将脚本变量绑定到标量值或 Spring Bean 引用。注意,`payload`和`headers`仍然作为绑定变量包括在内。 Spring Integration3.0 中,除了`variable`元素外,还引入了`variables`属性。此属性和`variable`元素并不互斥,你可以将它们合并到一个`script`组件中。然而,变量必须是唯一的,无论它们在哪里定义。此外,自 Spring Integration3.0 以来,内联脚本也允许变量绑定,如下例所示: ``` ``` 前面的示例显示了一个内联脚本、一个`variable`元素和一个`variables`属性的组合。`variables`属性包含一个逗号分隔的值,其中每个段包含变量及其值的一个’=’分隔对。变量名可以后缀为`-ref`,就像前面示例中的`date-ref`变量一样。这意味着绑定变量的名称为`date`,但该值是对应用程序上下文中`dateBean` Bean 的引用。这在使用属性占位符配置或命令行参数时可能很有用。 如果你需要更多地控制变量的生成方式,那么你可以实现自己的 Java 类,它使用`ScriptVariableGenerator`策略,该策略由以下接口定义: ``` public interface ScriptVariableGenerator { Map generateScriptVariables(Message message); } ``` 这个接口要求你实现`generateScriptVariables(Message)`方法。消息参数允许你访问消息有效负载和消息头中的任何可用数据,返回值是绑定变量的`Map`。每当执行消息的脚本时,都会调用此方法。下面的示例展示了如何提供`ScriptVariableGenerator`的实现,并使用`script-variable-generator`属性对其进行引用: Java DSL ``` Scripts.processor("foo/bar/MyScript.groovy") .variableGenerator(new foo.bar.MyScriptVariableGenerator()) } ``` XML ``` ``` 如果不提供`script-variable-generator`,则脚本组件使用`DefaultScriptVariableGenerator`,它将所提供的任何``元素与`payload`和`headers`中的`Message`变量合并到其`generateScriptVariables(Message)`方法中。 | |不能同时提供`script-variable-generator`属性和``元素。
它们是互斥的。| |---|-------------------------------------------------------------------------------------------------------------------------------| ### Groovy 支持 在 Spring Integration2.0 中,我们添加了 Groovy 支持,允许你使用 Groovy 脚本语言为各种集成组件提供逻辑——类似于 Spring Expression 语言在路由、转换和其他集成问题中所支持的方式。有关 Groovy 的更多信息,请参见 Groovy 文档,你可以在[项目网站](https://groovy-lang.org/)上找到该文档。 你需要在项目中包含此依赖项: Maven ``` org.springframework.integration spring-integration-groovy 5.5.9 ``` Gradle ``` compile "org.springframework.integration:spring-integration-groovy:5.5.9" ``` #### Groovy 配置 在 Spring Integration2.1 中,Groovy 支持的配置名称空间是 Spring Integration 的脚本支持的扩展,并共享[脚本支持](./scripting.html#scripting)部分中详细描述的核心配置和行为。尽管 Groovy 脚本很好地得到了通用脚本支持,但 Groovy 支持提供了`Groovy`配置名称空间,它由 Spring 框架的`org.springframework.scripting.groovy.GroovyScriptFactory`和相关组件支持,为使用 Groovy 提供了扩展功能。下面的清单显示了两个示例配置: 例1.过滤器 ``` ``` 正如前面的示例所示,该配置看起来与常规脚本支持配置相同。唯一的区别是使用 Groovy 命名空间,如`int-groovy`命名空间前缀所示。还要注意,`