# Reactive Streams Support

Spring Integration provides support for [Reactive Streams](https://www.reactive-streams.org/) interaction in several places of the framework and from different aspects. We will discuss most of them here, with appropriate links to the target chapters for details whenever necessary.

### Preface

To recap, Spring Integration extends the Spring programming model to support the well-known Enterprise Integration Patterns. Spring Integration enables lightweight messaging within Spring-based applications and supports integration with external systems via declarative adapters. Spring Integration’s primary goal is to provide a simple model for building enterprise integration solutions while maintaining the separation of concerns that is essential for producing maintainable, testable code. This goal is achieved in the target application using first-class citizens like `message`, `channel` and `endpoint`, which allow us to build an integration flow (pipeline), where (in most cases) one endpoint produces messages into a channel to be consumed by another endpoint. This way we distinguish an integration interaction model from the target business logic. The crucial part here is the channel in between: the flow behavior depends on its implementation, leaving the endpoints untouched.

Reactive Streams, on the other hand, is a standard for asynchronous stream processing with non-blocking back pressure. The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary – like passing elements on to another thread or thread-pool – while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, back pressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded. The intention of a Reactive Streams implementation, such as [Project Reactor](https://projectreactor.io/), is to preserve these benefits and characteristics across the whole processing graph of a stream application. The ultimate goal of Reactive Streams libraries is to provide types, a set of operators and a supporting API for a target application in as transparent and smooth a manner as possible with the available programming language structures, but the final solution is not as imperative as it would be with a normal function chain invocation. It is divided into two phases: definition and execution, which happens some time later during subscription to the final reactive publisher. Demand for data is pushed from the bottom of the definition to the top, applying back-pressure as needed - we request only as many events as we can handle at the moment. The reactive application looks like a `"stream"`, or, as we are used to saying in Spring Integration terms, a `"flow"`. In fact, since Java 9, the Reactive Streams SPI is available in the `java.util.concurrent.Flow` class.

From here it may look like Spring Integration flows are a good fit for writing Reactive Streams applications when we apply some reactive framework operators on endpoints, but in fact the problem is much broader and we need to keep in mind that not all endpoints (e.g. `JdbcMessageHandler`) can be processed in a reactive stream transparently. Of course, the main goal for Reactive Streams support in Spring Integration is to allow the whole process to be fully reactive, on-demand initiated and back-pressure ready.
This is not going to be possible until the target protocols and systems for channel adapters provide a Reactive Streams interaction model. In the sections below, we describe which components and approaches are provided in Spring Integration for developing reactive applications while preserving integration flow structures.

| |All the Reactive Streams interaction in Spring Integration is implemented with [Project Reactor](https://projectreactor.io/) types, such as `Mono` and `Flux`.|
|---|---|

### Messaging Gateway

The simplest point of interaction with Reactive Streams is a `@MessagingGateway` where we just make the return type of the gateway method a `Mono<?>` - and the whole integration flow behind the gateway method call is performed when a subscription happens on the returned `Mono` instance. See [Reactor `Mono`](./gateway.html#reactor-mono) for more information. A similar `Mono`-reply approach is used internally in the framework for inbound gateways which are fully based on Reactive Streams compatible protocols (see [Reactive Channel Adapters](#reactive-channel-adapters) below for more information). The send-and-receive operation is wrapped into a `Mono.defer()` with a reply evaluation chained from the `replyChannel` header whenever it is available. This way, an inbound component for the particular reactive protocol (e.g. Netty) becomes a subscriber and initiator for the reactive flow performed in Spring Integration. If the request payload is a reactive type, it is better to handle it within a reactive stream definition, deferring the processing to the initiator's subscription. For this purpose, a handler method must return a reactive type as well. See the next section for more information.

### Reactive Reply Payload

When a reply-producing `MessageHandler` returns a reactive type payload for a reply message, it is processed in an asynchronous manner with a regular `MessageChannel` implementation provided for the `outputChannel`, and flattened with an on-demand subscription when the output channel is a `ReactiveStreamsSubscribableChannel` implementation, e.g. a `FluxMessageChannel`. With a standard imperative `MessageChannel` use-case, and if a reply payload is a **multi-value** publisher (see `ReactiveAdapter.isMultiValue()` for more information), it is wrapped into a `Mono.just()`. As a result, the `Mono` has to be subscribed explicitly downstream or flattened by a `FluxMessageChannel` downstream. With a `ReactiveStreamsSubscribableChannel` for the `outputChannel`, there is no need to be concerned about the return type and subscription; everything is processed smoothly by the framework internally.

See [Asynchronous Service Activator](./service-activator.html#async-service-activator) for more information.
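To illustrate these two points, here is a minimal sketch (the gateway interface, the service and the channel names below are assumptions for illustration, not framework API): a gateway method returns a `Mono`, and a downstream service activator produces a reactive reply payload.

```java
@MessagingGateway
public interface SearchGateway {

    // Nothing happens until the returned Mono is subscribed to;
    // only then is the flow behind 'searchChannel' performed.
    @Gateway(requestChannel = "searchChannel")
    Mono<String> search(String query);

}

@Component
public class SearchService {

    // The Mono reply payload is handled asynchronously by the framework and is
    // flattened on demand when the output channel is a FluxMessageChannel.
    @ServiceActivator(inputChannel = "searchChannel")
    public Mono<String> handle(String query) {
        return Mono.fromSupplier(() -> "Result for: " + query);
    }

}
```

Calling `search("data")` only composes the `Mono`; the integration flow runs when that `Mono` is subscribed to.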
### `FluxMessageChannel` and `ReactiveStreamsConsumer`

The `FluxMessageChannel` is a combined implementation of `MessageChannel` and `Publisher<Message<?>>`. A `Flux`, as a hot source, is created internally for sinking incoming messages from the `send()` implementation. The `Publisher.subscribe()` implementation is delegated to that internal `Flux`. Also, for on-demand upstream consumption, the `FluxMessageChannel` provides an implementation of the `ReactiveStreamsSubscribableChannel` contract. Any upstream `Publisher` (see Source Polling Channel Adapter and splitter below, for example) provided for this channel is auto-subscribed when a subscription to this channel is ready. Events from these delegating publishers are sunk into the internal `Flux` mentioned above.

A consumer for the `FluxMessageChannel` must be an `org.reactivestreams.Subscriber` instance in order to honor the Reactive Streams contract. Fortunately, all of the `MessageHandler` implementations in Spring Integration also implement a `CoreSubscriber` from Project Reactor. And, thanks to a `ReactiveStreamsConsumer` implementation in between, the whole integration flow configuration is left transparent for target developers. In this case, the flow behavior is changed from an imperative push model to a reactive pull model. A `ReactiveStreamsConsumer` can also be used to turn any `MessageChannel` into a reactive source using `IntegrationReactiveUtils`, making an integration flow partially reactive.

See [`FluxMessageChannel`](./channel.html#flux-message-channel) for more information.

Starting with version 5.5, the `ConsumerEndpointSpec` introduces a `reactive()` option to make the endpoint in the flow act as a `ReactiveStreamsConsumer`, independently of the input channel. An optional `Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>` can be provided to customize a source `Flux` from the input channel via the `Flux.transform()` operation, e.g. with `publishOn()`, `doOnNext()`, `retry()`, etc. This functionality is represented as a `@Reactive` sub-annotation on all the messaging annotations (`@ServiceActivator`, `@Splitter`, etc.) via their `reactive()` attribute.

### Source Polling Channel Adapter

Usually, the `SourcePollingChannelAdapter` relies on a task that is initiated by the `TaskScheduler`. A polling trigger is built from the provided options and is used to periodically schedule a task to poll a target source of data or events. When the `outputChannel` is a `ReactiveStreamsSubscribableChannel`, the same `Trigger` is used to determine the next time for execution, but, instead of scheduling tasks, the `SourcePollingChannelAdapter` creates a `Flux<Message<?>>` based on `Flux.generate()` for the `nextExecutionTime` values and `Mono.delay()` for the duration from the previous step. A `Flux.flatMapMany()` is then used to poll up to `maxMessagesPerPoll` messages and sink them into an output `Flux`. This generator `Flux` is subscribed by the provided `ReactiveStreamsSubscribableChannel`, honoring back-pressure downstream. Starting with version 5.5, when `maxMessagesPerPoll == 0`, the source is not called at all, and `flatMapMany()` is completed immediately via a `Mono.empty()` result until `maxMessagesPerPoll` is changed to a non-zero value at a later time, e.g. via a Control Bus. This way, any `MessageSource` implementation can be turned into a reactive hot source.

See [Polling Consumer](./polling-consumer.html#polling-consumer) for more information.
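For example, a polled `MessageSource` can be attached to a `FluxMessageChannel` with the Java DSL, so that polling is driven by downstream demand rather than by scheduled tasks. This is a minimal sketch; the `MessageSource` bean and the channel names are assumptions for illustration:

```java
@Bean
public IntegrationFlow reactivePollingFlow(MessageSource<String> someMessageSource) {
    return IntegrationFlows
            .from(someMessageSource, e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(1))))
            // A FluxMessageChannel is a ReactiveStreamsSubscribableChannel, so the
            // SourcePollingChannelAdapter produces a Flux<Message<?>> honoring back-pressure.
            .channel(MessageChannels.flux())
            .<String, String>transform(String::toUpperCase)
            .channel("processedChannel")
            .get();
}
```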
### Event-Driven Channel Adapter

`MessageProducerSupport` is the base class for event-driven channel adapters and, typically, its `sendMessage(Message<?>)` is used as a listener callback in the producing driver API. This callback can also be easily plugged into the `doOnNext()` Reactor operator when a message producer implementation builds a `Flux` of messages instead of listener-based functionality. In fact, this is done in the framework when the `outputChannel` of the message producer is not a `ReactiveStreamsSubscribableChannel`. However, for improved end-user experience, and to allow more back-pressure ready functionality, the `MessageProducerSupport` provides a `subscribeToPublisher(Publisher<? extends Message<?>>)` API to be used in the target implementation when a `Publisher<Message<?>>` is the source of data from the target system. Typically, it is used from the `doStart()` implementation when the target driver API is called for a `Publisher` of source data. It is recommended to combine a reactive `MessageProducerSupport` implementation with a `FluxMessageChannel` as the `outputChannel` for on-demand subscription and event consumption downstream. The channel adapter goes to a stopped state when the subscription to the `Publisher` is cancelled. Calling `stop()` on such a channel adapter completes the producing from the source `Publisher`. The channel adapter can be restarted with automatic subscription to a newly created source `Publisher`.

### Message Source to Reactive Streams

Starting with version 5.3, a `ReactiveMessageSourceProducer` is provided. It is a combination of a provided `MessageSource` and event-driven production into the configured `outputChannel`. Internally, it wraps the `MessageSource` into a repeatedly resubscribed `Mono` producing a `Flux<Message<?>>` to be subscribed in the `subscribeToPublisher(Publisher<? extends Message<?>>)` mentioned above. The subscription for this `Mono` is done using `Schedulers.boundedElastic()` to avoid possible blocking in the target `MessageSource`. When the message source returns `null` (no data to pull), the `Mono` is turned into a `repeatWhenEmpty()` state with a `delay` for a subsequent re-subscription based on an `IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY` `Duration` entry from the subscriber context. By default it is 1 second. If the `MessageSource` produces messages with `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK` information in the headers, it is acknowledged (if necessary) in the `doOnSuccess()` of the original `Mono` and rejected in the `doOnError()` if the downstream flow throws a `MessagingException` with the failed message to reject. This `ReactiveMessageSourceProducer` can be used for any use-case where a polling channel adapter’s features should be turned into a reactive, on-demand solution for any existing `MessageSource` implementation.

### Splitter and Aggregator

When an `AbstractMessageSplitter` gets a `Publisher` for its logic, the process naturally goes over the items in the `Publisher` to map them into messages for sending to the `outputChannel`. If this channel is a `ReactiveStreamsSubscribableChannel`, the `Flux` wrapper for the `Publisher` is subscribed on demand from that channel and this splitter behavior looks more like a `flatMap` Reactor operator, where we map an incoming event into a multi-value output `Publisher`. It makes most sense when the whole integration flow is built with a `FluxMessageChannel` before and after the splitter, aligning the Spring Integration configuration with Reactive Streams requirements and its operators for event processing. With a regular channel, a `Publisher` is converted into an `Iterable` for standard iterate-and-produce splitting logic.

A `FluxAggregatorMessageHandler` is another sample of a specific Reactive Streams logic implementation which can be treated as a `"reactive operator"` in terms of Project Reactor. It is based on the `Flux.groupBy()` and `Flux.window()` (or `buffer()`) operators. The incoming messages are sunk into a `Flux.create()` initiated when the `FluxAggregatorMessageHandler` is created, making it a hot source. This `Flux` is subscribed to by a `ReactiveStreamsSubscribableChannel` on demand, or directly in the `FluxAggregatorMessageHandler.start()` when the `outputChannel` is not reactive. This `MessageHandler` shows its power when the whole integration flow is built with a `FluxMessageChannel` before and after this component, making the whole logic back-pressure ready.

See [Stream and Flux Splitting](./splitter.html#split-stream-and-flux) and [Flux Aggregator](./aggregator.html#flux-aggregator) for more information.
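For example, a `FluxAggregatorMessageHandler` can be registered as a regular service activator; the window size and the channel names below are assumptions for the sketch:

```java
@Bean
@ServiceActivator(inputChannel = "aggregatorInputChannel")
public FluxAggregatorMessageHandler fluxAggregator() {
    FluxAggregatorMessageHandler handler = new FluxAggregatorMessageHandler();
    // Close a window every 10 messages; Flux.window(10) is applied to the grouped Flux internally.
    handler.setWindowSize(10);
    handler.setOutputChannelName("aggregatorOutputChannel");
    return handler;
}
```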
### Java DSL

An `IntegrationFlow` in the Java DSL can start from any `Publisher` instance (see `IntegrationFlows.from(Publisher<Message<?>>)`). Also, with an `IntegrationFlowBuilder.toReactivePublisher()` operator, the `IntegrationFlow` can be turned into a reactive hot source. A `FluxMessageChannel` is used internally in both cases; it can subscribe to an inbound `Publisher` according to its `ReactiveStreamsSubscribableChannel` contract, and it is a `Publisher<Message<?>>` by itself for downstream subscribers. With a dynamic `IntegrationFlow` registration, we can implement powerful logic combining Reactive Streams with this integration flow bridging to/from a `Publisher`.

Starting with version 5.5.6, a `toReactivePublisher(boolean autoStartOnSubscribe)` operator variant is present to control the lifecycle of the whole `IntegrationFlow` behind the returned `Publisher<Message<?>>`. Typically, the subscription to and consumption from the reactive publisher happens in a later runtime phase, not during reactive stream composition, or even `ApplicationContext` startup. To avoid boilerplate code for lifecycle management of the `IntegrationFlow` at the `Publisher<Message<?>>` subscription point, and for better end-user experience, this new operator with the `autoStartOnSubscribe` flag has been introduced. It marks (if `true`) the `IntegrationFlow` and its components with `autoStartup = false`, so the `ApplicationContext` does not initiate production and consumption of messages in the flow automatically. Instead, the `start()` for the `IntegrationFlow` is initiated from the internal `Flux.doOnSubscribe()`. Independently of the `autoStartOnSubscribe` value, the flow is stopped from `Flux.doOnCancel()` and `Flux.doOnTerminate()` - it does not make sense to produce messages if there is nothing to consume them.

For the exact opposite use-case, when the `IntegrationFlow` should call a reactive stream and continue after its completion, a `fluxTransform()` operator is provided in the `IntegrationFlowDefinition`. The flow at this point is turned into a `FluxMessageChannel` which is propagated into the provided `fluxFunction`, performed in the `Flux.transform()` operator. A result of the function is wrapped into a `Mono<Message<?>>` for flat-mapping into an output `Flux` which is subscribed by another `FluxMessageChannel` for the downstream flow.

See [Java DSL Chapter](./dsl.html#java-dsl) for more information.
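The following sketch shows both directions (all bean and channel names here are assumptions): exposing an `IntegrationFlow` as a `Publisher` whose lifecycle is controlled by the subscription, and starting a flow from an external `Publisher`:

```java
@Bean
public Publisher<Message<String>> eventsPublisher() {
    // With 'autoStartOnSubscribe = true' the flow is started only when the
    // returned Publisher is subscribed to, and stopped on cancel/terminate.
    return IntegrationFlows.from("eventsChannel")
            .<String, String>transform(String::toUpperCase)
            .toReactivePublisher(true);
}

@Bean
public IntegrationFlow flowFromPublisher(Publisher<Message<String>> externalPublisher) {
    // The provided Publisher is subscribed to on demand by an internal FluxMessageChannel.
    return IntegrationFlows.from(externalPublisher)
            .channel("processingChannel")
            .get();
}
```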
### `ReactiveMessageHandler`

Starting with version 5.3, the `ReactiveMessageHandler` is supported natively in the framework. This type of message handler is designed for reactive clients which return a reactive type for on-demand subscription for low-level operation execution and do not provide any reply data to continue a reactive stream composition. When a `ReactiveMessageHandler` is used in an imperative integration flow, the `handleMessage()` result is subscribed immediately after return, because there is no reactive streams composition in such a flow to honor back-pressure. In this case the framework wraps this `ReactiveMessageHandler` into a `ReactiveMessageHandlerAdapter` - a plain implementation of `MessageHandler`. However, when a `ReactiveStreamsConsumer` is involved in the flow (e.g. when the channel to consume from is a `FluxMessageChannel`), such a `ReactiveMessageHandler` is composed into the whole reactive stream with a `flatMap()` Reactor operator to honor back-pressure during consumption.

One of the out-of-the-box `ReactiveMessageHandler` implementations is the `ReactiveMongoDbStoringMessageHandler` for an Outbound Channel Adapter. See [MongoDB Reactive Channel Adapters](./mongodb.html#mongodb-reactive-channel-adapters) for more information.

### Reactive Channel Adapters

When the target protocol for integration provides a Reactive Streams solution, it becomes straightforward to implement channel adapters in Spring Integration.

An inbound, event-driven channel adapter implementation is about wrapping a request (if necessary) into a deferred `Mono` or `Flux` and performing a send (and producing a reply, if any) only when the protocol component initiates a subscription to the `Mono` returned from the listener method. This way we have a reactive stream solution encapsulated exactly in this component. Of course, the downstream integration flow subscribed on the output channel should honor the Reactive Streams specification and be performed in an on-demand, back-pressure ready manner. This is not always available by the nature (or the current implementation) of the `MessageHandler` processors used in the integration flow. This limitation can be handled using thread pools and queues or a `FluxMessageChannel` (see above) before and after integration endpoints when there is no reactive implementation.

A reactive outbound channel adapter implementation is about the initiation (or continuation) of a reactive stream for interaction with an external system according to the provided reactive API for the target protocol. An inbound payload could be a reactive type per se, or an event of the whole integration flow which is a part of the reactive stream on top. A returned reactive type can be subscribed to immediately if we are in a one-way, fire-and-forget scenario, or it is propagated downstream (request-reply scenarios) for the further integration flow or for an explicit subscription in the target business logic, still preserving Reactive Streams semantics downstream.

Currently, Spring Integration provides channel adapter (or gateway) implementations for [WebFlux](./webflux.html#webflux), [RSocket](./rsocket.html#rsocket), [MongoDb](./mongodb.html#mongodb) and [R2DBC](./r2dbc.html#r2dbc). The [Redis Stream Channel Adapters](./redis.html#redis-stream-outbound) are also reactive and use `ReactiveStreamOperations` from Spring Data. Also, an [Apache Cassandra Extension](https://github.com/spring-projects/spring-integration-extensions/tree/main/spring-integration-cassandra) provides a `MessageHandler` implementation for the Cassandra reactive driver. More reactive channel adapters are coming, for example for Apache Kafka in [Kafka](./kafka.html#kafka), based on the `ReactiveKafkaProducerTemplate` and `ReactiveKafkaConsumerTemplate` from [Spring for Apache Kafka](https://spring.io/projects/spring-kafka), etc. For many other non-reactive channel adapters, thread pools are recommended to avoid blocking during reactive stream processing.
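For instance, the `reactive()` endpoint option described earlier can shift a blocking handler onto a dedicated scheduler so that a reactive upstream is not blocked. This is only a sketch; the `jdbcMessageHandler` bean and the channel name are assumptions:

```java
@Bean
public IntegrationFlow blockingStoreFlow(MessageHandler jdbcMessageHandler) {
    return IntegrationFlows.from("storeChannel")
            // Turn the endpoint into a ReactiveStreamsConsumer and move the blocking
            // handler onto a bounded elastic scheduler via Flux.publishOn().
            .handle(jdbcMessageHandler,
                    e -> e.reactive(flux -> flux.publishOn(Schedulers.boundedElastic())))
            .get();
}
```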