# [](#webflux)WebFlux Support

The WebFlux Spring Integration module (`spring-integration-webflux`) allows for the execution of HTTP requests and the processing of inbound HTTP requests in a reactive manner.

You need to include this dependency in your project:

Maven

```
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-webflux</artifactId>
    <version>5.5.9</version>
</dependency>
```

Gradle

```
compile "org.springframework.integration:spring-integration-webflux:5.5.9"
```

The `io.projectreactor.netty:reactor-netty` dependency must also be included when the server configuration is not Servlet-based.

The WebFlux support consists of the following gateway implementations: `WebFluxInboundEndpoint` and `WebFluxRequestExecutingMessageHandler`.
The support is fully based on the Spring [WebFlux](https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#spring-webflux) and [Project Reactor](https://projectreactor.io/) foundations.
See [HTTP Support](./http.html#http) for more information, since many options are shared between the reactive and regular HTTP components.

### [](#webflux-namespace)WebFlux Namespace Support

Spring Integration provides a `webflux` namespace and the corresponding schema definition.
To include it in your configuration, add the following namespace declaration in your application context configuration file:

```
...
```

### [](#webflux-inbound)WebFlux Inbound Components

Starting with version 5.0, the `WebFluxInboundEndpoint` implementation of `WebHandler` is provided.
This component is similar to the MVC-based `HttpRequestHandlingEndpointSupport`, with which it shares some common options through the newly extracted `BaseHttpInboundEndpoint`.
It is used in the Spring WebFlux reactive environment (instead of MVC).
The following example shows a simple implementation of a WebFlux endpoint:

Java DSL

```
@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
    return IntegrationFlows
        .from(WebFlux.inboundChannelAdapter("/reactivePost")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
            .statusCodeFunction(m -> HttpStatus.ACCEPTED))
        .channel(c -> c.queue("storeChannel"))
        .get();
}
```

Kotlin DSL

```
@Bean
fun inboundChannelAdapterFlow() =
    integrationFlow(
        WebFlux.inboundChannelAdapter("/reactivePost")
            .apply {
                requestMapping { m -> m.methods(HttpMethod.POST) }
                requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
                statusCodeFunction { m -> HttpStatus.ACCEPTED }
            }
    ) {
        channel { queue("storeChannel") }
    }
```

Java

```
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {

    @Bean
    public WebFluxInboundEndpoint simpleInboundEndpoint() {
        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setPathPatterns("/test");
        endpoint.setRequestMapping(requestMapping);
        endpoint.setRequestChannelName("serviceChannel");
        return endpoint;
    }

    @ServiceActivator(inputChannel = "serviceChannel")
    String service() {
        return "It works!";
    }

}
```

XML

```
```

The configuration is similar to that of the `HttpRequestHandlingEndpointSupport` (mentioned prior to the example), except that we use `@EnableWebFlux` to add the WebFlux infrastructure to our integration application.
Also, the `WebFluxInboundEndpoint` performs `sendAndReceive` operations to the downstream flow by using the back-pressure, on-demand capabilities provided by the reactive HTTP server implementation.
| |The reply part is non-blocking as well and is based on the internal `FutureReplyChannel`, which is flat-mapped to a reply `Mono` for on-demand resolution.|
|---|---|

You can configure the `WebFluxInboundEndpoint` with a custom `ServerCodecConfigurer`, a `RequestedContentTypeResolver`, and even a `ReactiveAdapterRegistry`.
The latter provides a mechanism you can use to return a reply as any reactive type: Reactor `Flux`, RxJava `Observable`, `Flowable`, and others.
This way, we can implement [Server Sent Events](https://en.wikipedia.org/wiki/Server-sent_events) scenarios with Spring Integration components, as the following example shows:

Java DSL

```
@Bean
public IntegrationFlow sseFlow() {
    return IntegrationFlows
            .from(WebFlux.inboundGateway("/sse")
                    .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            .handle((p, h) -> Flux.just("foo", "bar", "baz"))
            .get();
}
```

Kotlin DSL

```
@Bean
fun sseFlow() =
    integrationFlow(
        WebFlux.inboundGateway("/sse")
            .requestMapping { m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE) }
    ) {
        // The request payload and headers are not used; the reply is a fixed Flux.
        handle<Any> { _, _ -> Flux.just("foo", "bar", "baz") }
    }
```

Java

```
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
    WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
    RequestMapping requestMapping = new RequestMapping();
    requestMapping.setPathPatterns("/sse");
    requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
    endpoint.setRequestMapping(requestMapping);
    endpoint.setRequestChannelName("requests");
    return endpoint;
}
```

XML

```
```

See [Request Mapping Support](./http.html#http-request-mapping) and [Cross-origin Resource Sharing (CORS) Support](./http.html#http-cors) for more possible configuration options.
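The custom `ServerCodecConfigurer` mentioned above can be supplied through the Java DSL. The following is a minimal configuration sketch rather than a recommended setup: the `/upload` path, the 1 MB buffer limit, and the class and bean names are illustrative assumptions.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.webflux.dsl.WebFlux;

@Configuration
public class CodecLimitConfiguration {

    @Bean
    public IntegrationFlow uploadFlow() {
        // Hypothetical limit: let the default codecs buffer up to 1 MB per request body.
        ServerCodecConfigurer codecConfigurer = ServerCodecConfigurer.create();
        codecConfigurer.defaultCodecs().maxInMemorySize(1024 * 1024);

        return IntegrationFlows
                .from(WebFlux.inboundGateway("/upload") // hypothetical path
                        .requestMapping(m -> m.methods(HttpMethod.POST))
                        .requestPayloadType(String.class)
                        .codecConfigurer(codecConfigurer))
                // The result of the last step becomes the HTTP reply.
                .<String, String>transform(String::toUpperCase)
                .get();
    }

}
```

A `RequestedContentTypeResolver` or a `ReactiveAdapterRegistry` can be set on the same spec in a similar way.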
When the request body is empty or `payloadExpression` returns `null`, the request parameters (`MultiValueMap`) are used as the `payload` of the target message to process.

#### [](#webflux-validation)Payload Validation

Starting with version 5.2, the `WebFluxInboundEndpoint` can be configured with a `Validator`.
Unlike the MVC validation in the [HTTP Support](./http.html#http-validation), it is used to validate elements in the `Publisher` to which a request has been converted by the `HttpMessageReader`, before the fallback and `payloadExpression` functions are applied.
The framework cannot assume how complex the `Publisher` object may be after the final payload is built.
If validation must be restricted to exactly the final payload (or its `Publisher` elements), perform the validation downstream instead of in the WebFlux endpoint.
See more information in the Spring WebFlux [documentation](https://docs.spring.io/spring/docs/5.1.8.RELEASE/spring-framework-reference/web-reactive.html#webflux-fn-handler-validation).
An invalid payload is rejected with an `IntegrationWebExchangeBindException` (a `WebExchangeBindException` extension), which contains all the validation `Errors`.
See the Spring Framework [Reference Manual](https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#validation) for more about validation.

### [](#webflux-outbound)WebFlux Outbound Components

The `WebFluxRequestExecutingMessageHandler` (starting with version 5.0) implementation is similar to `HttpRequestExecutingMessageHandler`.
It uses a `WebClient` from the Spring Framework WebFlux module.
To configure it, define a bean similar to the following:

Java DSL

```
@Bean
public IntegrationFlow outboundReactive() {
    return f -> f
        .handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                        .queryParams(m.getPayload())
                        .build()
                        .toUri())
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String.class));
}
```

Kotlin DSL

```
@Bean
fun outboundReactive() =
    integrationFlow {
        handle(
            WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                    .queryParams(m.getPayload())
                    .build()
                    .toUri()
            })
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String::class.java)
        )
    }
```

Java

```
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
    WebFluxRequestExecutingMessageHandler handler =
        new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
    handler.setHttpMethod(HttpMethod.POST);
    handler.setExpectedResponseType(String.class);
    return handler;
}
```

XML

```
```

The `WebClient` `exchange()` operation returns a `Mono<ClientResponse>`, which is mapped (by using several `Mono.map()` steps) to an `AbstractIntegrationMessageBuilder` as the output from the `WebFluxRequestExecutingMessageHandler`.
Together with the `ReactiveChannel` as an `outputChannel`, the `Mono` evaluation is deferred until a downstream subscription is made.
Otherwise, it is treated as `async` mode, and the `Mono` response is adapted to a `SettableListenableFuture` for an asynchronous reply from the `WebFluxRequestExecutingMessageHandler`.
The target payload of the output message depends on the `WebFluxRequestExecutingMessageHandler` configuration.
The `setExpectedResponseType(Class<?>)` or `setExpectedResponseTypeExpression(Expression)` setting identifies the target type of the response body element conversion.
If `replyPayloadToFlux` is set to `true`, the response body is converted to a `Flux` with the provided `expectedResponseType` for each element, and this `Flux` is sent as the payload downstream.
Afterwards, you can use a [splitter](./splitter.html#splitter) to iterate over this `Flux` in a reactive manner.

In addition, a `BodyExtractor` can be injected into the `WebFluxRequestExecutingMessageHandler` instead of the `expectedResponseType` and `replyPayloadToFlux` properties.
It can be used for low-level access to the `ClientHttpResponse` and for more control over body and HTTP headers conversion.
Spring Integration provides `ClientHttpResponseBodyExtractor` as an identity function that passes the whole `ClientHttpResponse` downstream, where any other custom logic can be applied.

Starting with version 5.2, the `WebFluxRequestExecutingMessageHandler` supports reactive `Publisher`, `Resource`, and `MultiValueMap` types as the request message payload.
A corresponding `BodyInserter` is used internally to populate the `WebClient.RequestBodySpec`.
When the payload is a reactive `Publisher`, a configured `publisherElementType` or `publisherElementTypeExpression` can be used to determine the publisher’s element type.
The expression must resolve to a `Class`, a `String` (which is resolved to the target `Class`), or a `ParameterizedTypeReference`.

Starting with version 5.5, the `WebFluxRequestExecutingMessageHandler` exposes an `extractResponseBody` flag (which is `true` by default) to return just the response body, or to return the whole `ResponseEntity` as the reply message payload, independently of the provided `expectedResponseType` or `replyPayloadToFlux`.
If a body is not present in the `ResponseEntity`, this flag is ignored and the whole `ResponseEntity` is returned.

See [HTTP Outbound Components](./http.html#http-outbound) for more possible configuration options.
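The `replyPayloadToFlux` option combined with a splitter, as described above, might be wired as follows. This is a configuration sketch under assumptions: the `/items` URL, the class and bean names, and the final printing step are hypothetical and serve only to show where the `Flux` payload is split.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.webflux.dsl.WebFlux;

@Configuration
public class FluxReplyConfiguration {

    @Bean
    public IntegrationFlow fluxReplyFlow() {
        return f -> f
                // The reply payload is a Flux of String elements instead of a single value.
                .handle(WebFlux.outboundGateway("http://localhost:8080/items") // hypothetical URL
                        .httpMethod(HttpMethod.GET)
                        .expectedResponseType(String.class)
                        .replyPayloadToFlux(true))
                // The splitter emits one message per Flux element.
                .split()
                // A FluxMessageChannel keeps the downstream consumption reactive.
                .channel(MessageChannels.flux())
                // Hypothetical consumer: print each element's payload.
                .handle(m -> System.out.println(m.getPayload()));
    }

}
```

Messages sent to the flow's input channel (`fluxReplyFlow.input`, derived from the assumed bean name) trigger the HTTP request. With the reactive channel after the splitter, the elements are emitted as the downstream subscriber requests them.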
### [](#webflux-header-mapping)WebFlux Header Mappings

Since WebFlux components are fully based on the HTTP protocol, there is no difference in the HTTP headers mapping.
See [HTTP Header Mappings](./http.html#http-header-mapping) for more possible options and components to use for mapping headers.
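Because the header mapping options are shared with the regular HTTP components, they can be applied to a WebFlux outbound gateway through the same DSL methods. The following sketch assumes hypothetical header name patterns (`X-Correlation-*`, `X-Rate-Limit-*`) and a hypothetical endpoint URL; see the linked HTTP section for the exact pattern semantics and defaults.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.webflux.dsl.WebFlux;

@Configuration
public class HeaderMappingConfiguration {

    @Bean
    public IntegrationFlow headerMappingFlow() {
        return f -> f
                .handle(WebFlux.outboundGateway("http://localhost:8080/foo") // hypothetical URL
                        .httpMethod(HttpMethod.POST)
                        .expectedResponseType(String.class)
                        // Message headers matching this pattern are copied to the HTTP request.
                        .mappedRequestHeaders("X-Correlation-*")
                        // HTTP response headers matching this pattern are copied to the reply message.
                        .mappedResponseHeaders("X-Rate-Limit-*"));
    }

}
```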