reactive-streams.md 17.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
# 反应流支持

## 反应流支持

Spring 集成提供了对[反应流](https://www.reactive-streams.org/)在框架的某些地方和从不同方面的交互的支持。我们将在这里讨论其中的大部分内容,并在必要时提供与目标章节的适当链接,以获取详细信息。

### 序言

概括地说, Spring 集成扩展了 Spring 编程模型,以支持众所周知的 Enterprise 集成模式。 Spring 集成支持基于 Spring 的应用程序内的轻量级消息传递,并支持通过声明性适配器与外部系统集成。 Spring 集成的主要目标是提供一个简单的模型,用于构建 Enterprise 集成解决方案,同时保持关注点的分离,这对于生成可维护的、可测试的代码是必不可少的。在目标应用程序中,使用像`message``channel``endpoint`这样的第一类公民来实现这个目标,这允许我们构建一个集成流(管道),其中(在大多数情况下)一个端点将消息产生到另一个端点消费的通道中。通过这种方式,我们将集成交互模型与目标业务逻辑区分开来。这里的关键部分是介于两者之间的通道:流的行为取决于它的实现,而不影响端点。

另一方面,对于具有非阻塞背压的异步流处理,反应流是一种标准。反应流的主要目标是控制跨异步边界的流数据交换——比如将元素传递到另一个线程或线程池——同时确保接收方不会被迫缓冲任意数量的数据。换句话说,背压是这个模型的一个组成部分,以便允许在线程之间进行中介的队列是有界的。诸如[项目反应堆](https://projectreactor.io/)之类的反应流实现的目的是在流应用程序的整个处理图中保留这些优点和特性。反应流库的最终目标是以透明和平滑的方式为目标应用程序提供类型、操作符集合和支持 API,这是可用的编程语言结构所可能达到的,但是,最终的解决方案并不像普通的函数链调用那样势在必行。它被分为几个阶段:定义和执行,这发生在订阅最终的反应性发布者的一段时间之后,对数据的需求从定义的底部推到顶部,根据需要施加反压-我们请求尽可能多的事件,我们现在可以处理。反应性应用程序看起来像`"stream"`,或者像我们在 Spring 积分术语中习惯的那样-`"flow"`。实际上,自 Java9 以来的反应流 SPI 是在`java.util.concurrent.Flow`类中呈现的。

从这里看, Spring 当我们在端点上应用一些反应性框架操作符时,集成流确实非常适合编写反应性流应用程序,但实际上问题要广泛得多,我们需要记住,并不是所有的端点(例如`JdbcMessageHandler`)都可以透明地在反应流中进行处理。当然, Spring 集成中的反应性流支持的主要目标是允许整个过程完全具有反应性,按需启动并准备好反压力。这是不可能的,直到目标协议和系统为信道适配器提供了一个反应流交互模型。在下面的部分中,我们将描述 Spring 集成中提供了哪些组件和方法,用于开发保持集成流结构的反应性应用程序。

|   |Spring 集成中的所有反应流实现了与类型的交互,例如和。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------|

### 消息传递网关

与反应流交互的最简单的点是`@MessagingGateway`,其中我们将网关方法的返回类型设置为`Mono<?>`-当订阅发生在返回的`Mono`实例上时,将执行网关方法调用背后的整个集成流。有关更多信息,请参见[reactor`Mono`]。对于完全基于反应流兼容协议的入站网关,框架内部使用了类似的`Mono`-reply 方法(有关更多信息,请参见下面的[无功通道适配器](#reactive-channel-adapters))。发送和接收操作被包装到`Mono.deffer()`中,并在任何可用的情况下链接来自`replyChannel`头的应答评估。这样,用于特定的无反应协议(例如 NETTY)的入站组件将被用作在 Spring 集成上执行的无反应流的订阅者和发起者。如果请求有效负载是一种反应性类型,那么最好使用反应性流定义来处理它,从而将进程推迟到发起者订阅。为此目的,处理程序方法还必须返回一个反应式类型。有关更多信息,请参见下一节。

### 无功回复有效载荷

当产生`MessageHandler`的应答返回用于应答消息的反应型有效负载时,将使用为`outputChannel`实现提供的常规`MessageChannel`实现以异步方式对其进行处理,并且当输出通道是`ReactiveStreamsSubscribableChannel`实现时,使用按需订阅进行平坦化处理,例如`FluxMessageChannel`。使用标准的命令式`MessageChannel`用例,并且如果一个回复有效负载是**多值**发布者(有关更多信息,请参见`ReactiveAdapter.isMultiValue()`),则将其包装为`Mono.just()`。这样做的结果是,`Mono`必须在下游显式地订阅或在下游由`FluxMessageChannel`压平。对于`ReactiveStreamsSubscribableChannel``outputChannel`,无需担心返回类型和订阅;所有内容都由框架内部顺利处理。

有关更多信息,请参见[异步服务激活器](./service-activator.html#async-service-activator)

### `FluxMessageChannel`和`ReactiveStreamsConsumer`

`FluxMessageChannel``MessageChannel``Publisher<Message<?>>`的组合实现。内部创建了一个`Flux`作为热源,用于接收来自`send()`实现的传入消息。将`Publisher.subscribe()`实现委托给内部`Flux`。此外,对于按需的上游消费,`FluxMessageChannel`提供了`ReactiveStreamsSubscribableChannel`契约的实现。为该通道提供的任何上游`Publisher`(例如,请参见下面的源轮询通道适配器和拆分器)在为该通道准备好订阅时都是自动订阅的。来自此委托发布者的事件被沉入上面提到的内部`Flux`中。

`FluxMessageChannel`的使用者必须是一个`org.reactivestreams.Subscriber`实例,以遵守反应流契约。幸运的是, Spring 集成中的所有`MessageHandler`实现还实现了来自 Project Reactor 的`CoreSubscriber`。而且,由于中间有`ReactiveStreamsConsumer`实现,整个集成流配置对目标开发人员来说是透明的。在这种情况下,流的行为被改变,从一个命令推动模型到一个反应拉动模型。也可以使用`ReactiveStreamsConsumer`将任何`MessageChannel`转换为使用`IntegrationReactiveUtils`的反应源,从而使积分流部分地具有活性。

有关更多信息,请参见[`FluxMessageChannel`]。

从版本 5.5 开始,`ConsumerEndpointSpec`引入了`reactive()`选项,使流中的端点独立于输入通道,成为`ReactiveStreamsConsumer`。可以提供可选的`Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>`以通过`Flux.transform()`操作从输入通道自定义源`Flux`,例如使用`publishOn()``doOnNext()``retry()`等。该功能通过其`reactive()`属性表示为所有消息注释(`@ServiceActivator``@Splitter`等)的`@Reactive`子注释。

### 源轮询通道适配器

通常,`SourcePollingChannelAdapter`依赖于由`TaskScheduler`发起的任务。轮询触发器由提供的选项构建,用于定期调度任务,以轮询目标数据源或事件。当`outputChannel``ReactiveStreamsSubscribableChannel`时,使用相同的`Trigger`来确定下一次执行的时间,但不是调度任务,`SourcePollingChannelAdapter`基于`Flux.generate()`值和`Mono.delay()`值的`Flux<Message<?>>`在上一步的持续时间内创建`Flux<Message<?>>`。然后使用`Flux.flatMapMany()`轮询`maxMessagesPerPoll`,并将它们沉入输出`Flux`。这个生成器`Flux`是由所提供的`ReactiveStreamsSubscribableChannel`所订阅的,以满足下游的背压。从版本 5.5 开始,当`maxMessagesPerPoll == 0`时,根本不调用源代码,并且`flatMapMany()`通过`Mono.empty()`结果立即完成,直到`maxMessagesPerPoll`在稍后的时间被更改为非零值,例如通过控制总线。这样,任何`MessageSource`实现都可以转变为反应性热源。

有关更多信息,请参见[轮询消费者](./polling-consumer.html#polling-consumer)

### 事件驱动通道适配器

`MessageProducerSupport`是事件驱动通道适配器的基类,通常,其`sendMessage(Message<?>)`在产生驱动程序 API 中用作侦听器回调。当消息生成器实现构建消息的`Flux`而不是基于侦听器的功能时,也可以很容易地将此回调插入`doOnNext()`反应器操作符。实际上,当消息生成器的`outputChannel`不是`ReactiveStreamsSubscribableChannel`时,就可以在框架中完成此操作。然而,为了改善最终用户的体验,并允许更多的背压准备功能,`MessageProducerSupport`提供了一个`subscribeToPublisher(Publisher<? extends Message<?>>)`API 来在目标实现中使用,当一个`Publisher<Message<?>>>`是来自目标系统的数据源时。通常,当为`Publisher`的源数据调用目标驱动程序 API 时,它是从`doStart()`实现的。建议将反应性`MessageProducerSupport`实现与`FluxMessageChannel`合并为`outputChannel`,用于按需订阅和下游事件消费。当取消对`Publisher`的订阅时,通道适配器进入停止状态。在这样的通道适配器上调用`stop()`完成从源`Publisher`的生成。可以通过自动订阅新创建的源`Publisher`来重新启动通道适配器。

### 消息源到反应流

从版本 5.3 开始,提供了`ReactiveMessageSourceProducer`。它是将提供的`MessageSource`和事件驱动的生产组合到配置的`outputChannel`中。在内部,它将`MessageSource`封装到重复重新订阅的`Mono`中,生成要在上面提到的`subscribeToPublisher(Publisher<? extends Message<?>>)`中订阅的`Flux<Message<?>>`。此`Mono`的订阅使用`Schedulers.boundedElastic()`完成,以避免在目标`MessageSource`中可能出现的阻塞。当消息源返回`null`(无数据可拉)时,将`Mono`转换为`repeatWhenEmpty()`状态,并带有`delay`,以便基于订阅服务器上下文中的`IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY`条目进行后续的重新订阅。默认情况下是 1 秒。如果`MessageSource`在头中产生带有`IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK`信息的消息,则在原始`Mono``doOnSuccess()`中进行确认(如果需要的话),并且在`doOnError()`中进行拒绝,如果下游流抛出带有拒绝消息的`MessagingException`。当轮询通道适配器的功能应该转换为针对任何现有`MessageSource<?>`实现的反应式、随需应变的解决方案时,此`ReactiveMessageSourceProducer`可用于任何用例。

### 拆分器和聚合器

`AbstractMessageSplitter`得到其逻辑的`Publisher`时,该过程自然地对`Publisher`中的项进行遍历,将它们映射为消息,以发送到`outputChannel`。如果此通道是`ReactiveStreamsSubscribableChannel`,则根据需要从该通道订阅`Publisher``Flux`包装器,当我们将传入事件映射到多值输出`Publisher`时,此拆分器行为看起来更像是`flatMap`反应器操作符。当整个集成流在分配器前后用`FluxMessageChannel`构建时,这是最有意义的, Spring 将集成配置与用于事件处理的反应流要求及其操作符对齐。对于常规通道,将`Publisher`转换为`Iterable`,用于标准的迭代和产生分裂逻辑。

a`FluxAggregatorMessageHandler`是具体的反应流逻辑实现的另一个示例,可以将其作为 a`"reactive operator"`的工程反应器来处理。它是基于`Flux.groupBy()``Flux.window()`(或`buffer()`)的运算符。当创建`FluxAggregatorMessageHandler`时,传入的消息会沉入`Flux.create()`中,使其成为热源。这`Flux`是由`ReactiveStreamsSubscribableChannel`按需订阅的,或者直接在`FluxAggregatorMessageHandler.start()`中当`outputChannel`是不反应的。这个`MessageHandler`有它的力量,当整个集成流程在这个组件之前和之后构建一个`FluxMessageChannel`时,使得整个逻辑背压准备就绪。

有关更多信息,请参见[水流和通量分裂](./splitter.html#split-stream-and-flux)[通量聚合器](./aggregator.html#flux-aggregator)

### Java DSL

Java DSL 中的`IntegrationFlow`可以从任何`Publisher`实例开始(参见`IntegrationFlows.from(Publisher<Message<T>>)`)。此外,使用`IntegrationFlowBuilder.toReactivePublisher()`操作符,`IntegrationFlow`可以将其转换为反应性热源。a`FluxMessageChannel`在这两种情况下都在内部使用;它可以根据其`ReactiveStreamsSubscribableChannel`契约订阅入站`Publisher`,并且它本身对于下游订阅者是`Publisher<Message<?>>`。通过动态的`IntegrationFlow`注册,我们可以实现一种强大的逻辑,将反应流与此集成流桥接到/从`Publisher`

从版本 5.5.6 开始,存在一个`toReactivePublisher(boolean autoStartOnSubscribe)`操作符变体来控制返回的`IntegrationFlow`后面的整个`IntegrationFlow`的生命周期。通常,来自 Active Publisher 的订阅和消耗发生在较晚的运行时阶段,而不是在 Active Stream 复合期间,甚至`ApplicationContext`启动期间。为了避免在`Publisher<Message<?>>`订阅点的`IntegrationFlow`的生命周期管理的样板代码,以及为了更好的最终用户体验,引入了带有`autoStartOnSubscribe`标志的新操作符。它标记(如果`true``IntegrationFlow`及其组件的`autoStartup = false`,因此`ApplicationContext`不会自动启动流中消息的生成和消耗。相反,`start()``IntegrationFlow`是从内部`Flux.doOnSubscribe()`发起的。与`autoStartOnSubscribe`值无关,从`Flux.doOnCancel()``Flux.doOnTerminate()`停止流--如果没有什么可使用的消息,产生消息是没有意义的。

对于完全相反的用例,当`IntegrationFlow`应该调用一个反应流并在完成后继续时,`fluxTransform()`运算符在`IntegrationFlowDefinition`中提供。在这一点上的流被转换为`FluxMessageChannel`,它被传播到所提供的`fluxFunction`中,在`Flux.transform()`操作符中执行。函数的一个结果被包装到`Mono<Message<?>>`中,用于将其平铺映射到输出`Flux`中,该输出由另一个`FluxMessageChannel`订阅,用于下游流。

有关更多信息,请参见[Java DSL 章节](./dsl.html#java-dsl)

### `ReactiveMessageHandler`

从版本 5.3 开始,`ReactiveMessageHandler`在框架中得到了原生支持。这种类型的消息处理程序是为反应性客户端设计的,这些客户端返回一种反应性类型,用于按需订阅以执行低级操作,并且不提供任何响应数据以继续进行反应性流组合。当一个`ReactiveMessageHandler`在命令式积分流中使用时,`handleMessage()`在返回后立即导致订阅,仅仅是因为在这样的合成流中没有反应流来兑现背压。在这种情况下,框架将`ReactiveMessageHandler`封装为`ReactiveMessageHandlerAdapter`-`MessageHandler`的一个简单实现。然而,当`ReactiveStreamsConsumer`是涉及到流(例如当信道消耗是`FluxMessageChannel`时)时,这样的`ReactiveMessageHandler`是由整个反应流组成的带有`flatMap()`的反应器操作人员在消耗过程中遵守背压。

其中一个开箱即用的`ReactiveMessageHandler`实现是用于出站通道适配器的`ReactiveMongoDbStoringMessageHandler`。有关更多信息,请参见[MongoDB 反应式通道适配器](./mongodb.html#mongodb-reactive-channel-adapters)

### 无功通道适配器

当用于集成的目标协议提供了反应流解决方案时,在 Spring 集成中实现通道适配器变得非常简单。

入站的、事件驱动的通道适配器实现是关于将一个请求(如果需要的话)包装到一个延迟的`Mono``Flux`中,并仅在协议组件发起对从侦听器方法返回的`Mono`的订阅时才执行发送(如果有的话,则产生答复)。这样,我们就有了一个完全封装在这个组件中的反应流解决方案。当然,在输出通道上订阅的下游集成流应该遵守反应流规范,并以按需、背压就绪的方式执行。这并不总是由在集成流程中使用的`MessageHandler`处理器的性质(或当前实现)所提供的。当没有反应性实现时,可以使用线程池和队列或`FluxMessageChannel`(见上文)在集成端点之前和之后处理此限制。

一种反应性出站通道适配器的实现是关于根据针对目标协议提供的反应性 API 与外部系统进行交互的反应性流的发起(或延续)。入站有效负载本身可以是一种反应性类型,或者作为整个集成流的一个事件,它是顶部的反应性流的一部分。如果我们在单向的、fire-and-forget 场景中,或者在下游传播(请求-应答场景)以获取进一步的集成流,或者在目标业务逻辑中使用显式的订阅,则可以立即订阅返回的反应性类型,但下游仍然保留反应性流的语义。

当前 Spring 集成为[WebFlux](./webflux.html#webflux)[RSocket](./rsocket.html#rsocket)[MongoDb](./mongodb.html#mongodb)[R2DBC](./r2dbc.html#r2dbc)提供了通道适配器(或网关)实现。[Redis 流通道适配器](./redis.html#redis-stream-outbound)也是活性的,并使用 Spring 数据中的`ReactiveStreamOperations`。另外,[Apache Cassandra 扩展](https://github.com/spring-projects/spring-integration-extensions/tree/main/spring-integration-cassandra)还为 Cassandra 反应驱动程序提供了`MessageHandler`实现。更多的反应性通道适配器正在出现,例如,对于[Kafka](./kafka.html#kafka)中的 Apache Kafka,基于`ReactiveKafkaProducerTemplate``ReactiveKafkaConsumerTemplate`中的[Spring for Apache Kafka](https://spring.io/projects/spring-kafka)等。对于许多其他的无反应通道适配器,线程池是推荐的,以避免在无反应的流处理过程中的阻塞。