# WebFlux Support
## WebFlux Support
The WebFlux Spring Integration module (`spring-integration-webflux`) allows for the execution of HTTP requests and the processing of inbound HTTP requests in a reactive manner.
You need to include this dependency into your project:
Maven
```
org.springframework.integration
spring-integration-webflux
5.5.9
```
Gradle
```
compile "org.springframework.integration:spring-integration-webflux:5.5.9"
```
The `io.projectreactor.netty:reactor-netty` dependency must be included in case of non-Servlet-based server configuration.
The WebFlux support consists of the following gateway implementations: `WebFluxInboundEndpoint` and `WebFluxRequestExecutingMessageHandler`.
The support is fully based on the Spring [WebFlux](https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#spring-webflux) and [Project Reactor](https://projectreactor.io/) foundations.
See [HTTP Support](./http.html#http) for more information, since many options are shared between reactive and regular HTTP components.
### WebFlux Namespace Support
Spring Integration provides a `webflux` namespace and the corresponding schema definition.
To include it in your configuration, add the following namespace declaration in your application context configuration file:
```
...
```
### WebFlux Inbound Components
Starting with version 5.0, the `WebFluxInboundEndpoint` implementation of `WebHandler` is provided.
This component is similar to the MVC-based `HttpRequestHandlingEndpointSupport`, with which it shares some common options through the newly extracted `BaseHttpInboundEndpoint`.
It is used in the Spring WebFlux reactive environment (instead of MVC).
The following example shows a simple implementation of a WebFlux endpoint:
Java DSL
```
@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
return IntegrationFlows
.from(WebFlux.inboundChannelAdapter("/reactivePost")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
.statusCodeFunction(m -> HttpStatus.ACCEPTED))
.channel(c -> c.queue("storeChannel"))
.get();
}
```
Kotlin DSL
```
@Bean
fun inboundChannelAdapterFlow() =
integrationFlow(
WebFlux.inboundChannelAdapter("/reactivePost")
.apply {
requestMapping { m -> m.methods(HttpMethod.POST) }
requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
statusCodeFunction { m -> HttpStatus.ACCEPTED }
})
{
channel { queue("storeChannel") }
}
```
Java
```
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {
@Bean
public WebFluxInboundEndpoint simpleInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/test");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("serviceChannel");
return endpoint;
}
@ServiceActivator(inputChannel = "serviceChannel")
String service() {
return "It works!";
}
}
```
XML
```
```
The configuration is similar to the `HttpRequestHandlingEndpointSupport` (mentioned prior to the example), except that we use `@EnableWebFlux` to add the WebFlux infrastructure to our integration application.
Also, the `WebFluxInboundEndpoint` performs `sendAndReceive` operations to the downstream flow by using back-pressure, on-demand based capabilities, provided by the reactive HTTP server implementation.
| |The reply part is non-blocking as well and is based on the internal `FutureReplyChannel`, which is flat-mapped to a reply `Mono` for on-demand resolution.|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------|
You can configure the `WebFluxInboundEndpoint` with a custom `ServerCodecConfigurer`, a `RequestedContentTypeResolver`, and even a `ReactiveAdapterRegistry`.
The latter provides a mechanism you can use to return a reply as any reactive type: Reactor `Flux`, RxJava `Observable`, `Flowable`, and others.
This way, we can implement [Server Sent Events](https://en.wikipedia.org/wiki/Server-sent_events) scenarios with Spring Integration components, as the following example shows:
Java DSL
```
@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlows
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
.handle((p, h) -> Flux.just("foo", "bar", "baz"))
.get();
}
```
Kotlin DSL
```
@Bean
fun sseFlow() =
integrationFlow(
WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
{
handle { (p, h) -> Flux.just("foo", "bar", "baz") }
}
```
Java
```
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/sse");
requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("requests");
return endpoint;
}
```
XML
```
```
See [Request Mapping Support](./http.html#http-request-mapping) and [Cross-origin Resource Sharing (CORS) Support](./http.html#http-cors) for more possible configuration options.
When the request body is empty or `payloadExpression` returns `null`, the request params (`MultiValueMap`) is used for a `payload` of the target message to process.
#### Payload Validation
Starting with version 5.2, the `WebFluxInboundEndpoint` can be configured with a `Validator`.
Unlike the MVC validation in the [HTTP Support](./http.html#http-validation), it is used to validate elements in the `Publisher` to which a request has been converted by the `HttpMessageReader`, before performing a fallback and `payloadExpression` functions.
The Framework can’t assume how complex the `Publisher` object can be after building the final payload.
If there is a requirements to restrict validation visibility for exactly final payload (or its `Publisher` elements), the validation should go downstream instead of WebFlux endpoint.
See more information in the Spring WebFlux [documentation](https://docs.spring.io/spring/docs/5.1.8.RELEASE/spring-framework-reference/web-reactive.html#webflux-fn-handler-validation).
An invalid payload is rejected with an `IntegrationWebExchangeBindException` (a `WebExchangeBindException` extension), containing all the validation `Errors`.
See more in Spring Framework [Reference Manual](https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#validation) about validation.
### WebFlux Outbound Components
The `WebFluxRequestExecutingMessageHandler` (starting with version 5.0) implementation is similar to `HttpRequestExecutingMessageHandler`.
It uses a `WebClient` from the Spring Framework WebFlux module.
To configure it, define a bean similar to the following:
Java DSL
```
@Bean
public IntegrationFlow outboundReactive() {
return f -> f
.handle(WebFlux.>outboundGateway(m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri())
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class));
}
```
Kotlin DSL
```
@Bean
fun outboundReactive() =
integrationFlow {
handle(
WebFlux.outboundGateway>({ m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri()
})
.httpMethod(HttpMethod.GET)
.expectedResponseType(String::class.java)
)
}
```
Java
```
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
WebFluxRequestExecutingMessageHandler handler =
new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
handler.setHttpMethod(HttpMethod.POST);
handler.setExpectedResponseType(String.class);
return handler;
}
```
XML
```
```
The `WebClient` `exchange()` operation returns a `Mono`, which is mapped (by using several `Mono.map()` steps) to an `AbstractIntegrationMessageBuilder` as the output from the `WebFluxRequestExecutingMessageHandler`.
Together with the `ReactiveChannel` as an `outputChannel`, the `Mono` evaluation is deferred until a downstream subscription is made.
Otherwise, it is treated as an `async` mode, and the `Mono` response is adapted to a `SettableListenableFuture` for an asynchronous reply from the `WebFluxRequestExecutingMessageHandler`.
The target payload of the output message depends on the `WebFluxRequestExecutingMessageHandler` configuration.
The `setExpectedResponseType(Class>)` or `setExpectedResponseTypeExpression(Expression)` identifies the target type of the response body element conversion.
If `replyPayloadToFlux` is set to `true`, the response body is converted to a `Flux` with the provided `expectedResponseType` for each element, and this `Flux` is sent as the payload downstream.
Afterwards, you can use a [splitter](./splitter.html#splitter) to iterate over this `Flux` in a reactive manner.
In addition a `BodyExtractor, ClientHttpResponse>` can be injected into the `WebFluxRequestExecutingMessageHandler` instead of the `expectedResponseType` and `replyPayloadToFlux` properties.
It can be used for low-level access to the `ClientHttpResponse` and more control over body and HTTP headers conversion.
Spring Integration provides `ClientHttpResponseBodyExtractor` as a identity function to produce (downstream) the whole `ClientHttpResponse` and any other possible custom logic.
Starting with version 5.2, the `WebFluxRequestExecutingMessageHandler` supports reactive `Publisher`, `Resource`, and `MultiValueMap` types as the request message payload.
A respective `BodyInserter` is used internally to be populated into the `WebClient.RequestBodySpec`.
When the payload is a reactive `Publisher`, a configured `publisherElementType` or `publisherElementTypeExpression` can be used to determine a type for the publisher’s element type.
The expression must be resolved to a `Class>`, `String` which is resolved to the target `Class>` or `ParameterizedTypeReference`.
Starting with version 5.5, the `WebFluxRequestExecutingMessageHandler` exposes an `extractResponseBody` flag (which is `true` by default) to return just the response body, or to return the whole `ResponseEntity` as the reply message payload, independently of the provided `expectedResponseType` or `replyPayloadToFlux`.
If a body is not present in the `ResponseEntity`, this flag is ignored and the whole `ResponseEntity` is returned.
See [HTTP Outbound Components](./http.html#http-outbound) for more possible configuration options.
### WebFlux Header Mappings
Since WebFlux components are fully based on the HTTP protocol, there is no difference in the HTTP headers mapping.
See [HTTP Header Mappings](./http.html#http-header-mapping) for more possible options and components to use for mapping headers.