# RSocket Support

The RSocket Spring Integration module (`spring-integration-rsocket`) allows for the execution of the [RSocket application protocol](https://rsocket.io/).

You need to include this dependency in your project:

Maven

```
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
    <version>5.5.9</version>
</dependency>
```

Gradle

```
compile "org.springframework.integration:spring-integration-rsocket:5.5.9"
```

This module is available starting with version 5.2 and is based on the Spring Messaging foundation with its RSocket component implementations, such as `RSocketRequester`, `RSocketMessageHandler` and `RSocketStrategies`. See [Spring Framework RSocket Support](https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#rsocket) for more information about the RSocket protocol, terminology and components.

Before an integration flow can process requests via channel adapters, we need to establish an RSocket connection between server and client. For this purpose, Spring Integration RSocket support provides the `ServerRSocketConnector` and `ClientRSocketConnector` implementations of the `AbstractRSocketConnector`.

The `ServerRSocketConnector` exposes a listener on the host and port according to the provided `io.rsocket.transport.ServerTransport` for accepting connections from clients. The internal `RSocketServer` instance can be customized with `setServerConfigurer()`; other options can be configured as well, e.g. the `RSocketStrategies` and the `MimeType` for payload data and headers metadata. When a `setupRoute` is provided from the client requester (see `ClientRSocketConnector` below), a connected client is stored as an `RSocketRequester` under the key determined by the `clientRSocketKeyStrategy` `BiFunction<Map<String, Object>, DataBuffer, Object>`. By default, the connect data is used for the key, converted to a string with the UTF-8 charset.
Such an `RSocketRequester` registry can be used in the application logic to select a particular client connection for interaction, or to publish the same message to all connected clients. When a connection is established from the client, an `RSocketConnectedEvent` is emitted from the `ServerRSocketConnector`. This is similar to what is provided by the `@ConnectMapping` annotation in the Spring Messaging module. The mapping pattern `*` means accept all the client routes. The `RSocketConnectedEvent` can be used to distinguish different routes via the `DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER` header.

A typical server configuration might look like this:

```
@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ServerRSocketConnector serverRSocketConnector() {
    // Port 0 asks for an arbitrary free port
    ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
    serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
    serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
    serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
    // Store each connected client under the route it connected with
    serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
        + headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
    return serverRSocketConnector;
}

@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
    ...
}
```

All the options, including the `RSocketStrategies` bean and the `@EventListener` for `RSocketConnectedEvent`, are optional. See the `ServerRSocketConnector` JavaDocs for more information.

Starting with version 5.2.1, the `ServerRSocketMessageHandler` is extracted to a public, top-level class for possible connection with an existing RSocket server.
When a `ServerRSocketConnector` is supplied with an external instance of `ServerRSocketMessageHandler`, it doesn’t create an RSocket server internally and just delegates all the handling logic to the provided instance. In addition, the `ServerRSocketMessageHandler` can be configured with a `messageMappingCompatible` flag to also handle `@MessageMapping` for an RSocket controller, fully replacing the functionality provided by the standard `RSocketMessageHandler`. This can be useful in mixed configurations, when classic `@MessageMapping` methods are present in the same application along with RSocket channel adapters and an externally configured RSocket server is present in the application.

The `ClientRSocketConnector` serves as a holder for an `RSocketRequester` based on the `RSocket` connected via the provided `ClientTransport`. The `RSocketConnector` can be customized with the provided `RSocketConnectorConfigurer`. The `setupRoute` (with optional template variables) and `setupData` with metadata can also be configured on this component.

A typical client configuration might look like this:

```
@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ClientRSocketConnector clientRSocketConnector() {
    // Connect to whichever port the local server connector was bound to
    ClientRSocketConnector clientRSocketConnector =
        new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
    clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
    // The {user} template variable is filled from setSetupRouteVariables()
    clientRSocketConnector.setSetupRoute("clientConnect/{user}");
    clientRSocketConnector.setSetupRouteVariables("myUser");
    return clientRSocketConnector;
}
```

Most of these options (including the `RSocketStrategies` bean) are optional. Note how we connect to the locally started RSocket server on an arbitrary port.
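
The relationship between the client `setupData` and the server-side registry key can be sketched as follows. This is a minimal illustration, not a definitive setup: it assumes the default `clientRSocketKeyStrategy` (connect data converted to a UTF-8 string) and a client `RSocketStrategies` able to encode a `String`. The class name, the `clientConnect` route, the `client-A` value and port `7000` are hypothetical, and the client and server parts would normally live in different applications.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.rsocket.ClientRSocketConnector;
import org.springframework.integration.rsocket.ServerRSocketConnector;
import org.springframework.messaging.rsocket.RSocketRequester;

@Configuration
public class SetupDataKeySketch {

    // Client side: a setup route is needed for the server to store the connection;
    // the setup data becomes the registry key under the default key strategy.
    @Bean
    public ClientRSocketConnector clientRSocketConnector() {
        ClientRSocketConnector connector = new ClientRSocketConnector("localhost", 7000); // hypothetical port
        connector.setSetupRoute("clientConnect");   // hypothetical route
        connector.setSetupData("client-A");         // hypothetical business key
        return connector;
    }

    // Server side: look up the stored requester by the same key.
    // Returns null when no client has connected with that key.
    static RSocketRequester lookUpClient(ServerRSocketConnector server) {
        return server.getClientRSocketRequester("client-A");
    }
}
```

With a custom `clientRSocketKeyStrategy`, the lookup key would be whatever that function returns instead of the decoded setup data.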
See `ServerRSocketConnector.clientRSocketKeyStrategy` for `setupData` use cases. Also see the JavaDocs of `ClientRSocketConnector` and its `AbstractRSocketConnector` superclass for more information.

Both `ClientRSocketConnector` and `ServerRSocketConnector` are responsible for mapping inbound channel adapters to their `path` configuration for routing incoming RSocket requests. See the next section for more information.

### RSocket Inbound Gateway

The `RSocketInboundGateway` is responsible for receiving RSocket requests and producing responses (if any). It requires an array of `path` mappings, which can be patterns similar to MVC request mapping or `@MessageMapping` semantics. In addition (since version 5.2.2), a set of interaction models (see `RSocketInteractionModel`) can be configured on the `RSocketInboundGateway` to restrict RSocket requests to this endpoint by the particular frame type. By default, all the interaction models are supported. Such a bean, according to its `IntegrationRSocketEndpoint` implementation (an extension of `ReactiveMessageHandler`), is auto-detected by either the `ServerRSocketConnector` or the `ClientRSocketConnector` for routing logic in the internal `IntegrationRSocketMessageHandler` for incoming requests. An `AbstractRSocketConnector` can be provided to the `RSocketInboundGateway` for explicit endpoint registration. This way, the auto-detection option is disabled on that `AbstractRSocketConnector`. The `RSocketStrategies` can also be injected into the `RSocketInboundGateway`, or they are obtained from the provided `AbstractRSocketConnector`, overriding any explicit injection. Decoders from those `RSocketStrategies` are used to decode a request payload according to the provided `requestElementType`. If an `RSocketPayloadReturnValueHandler.RESPONSE_HEADER` header is not provided in the incoming `Message`, the `RSocketInboundGateway` treats a request as a `fireAndForget` RSocket interaction model.
In this case, an `RSocketInboundGateway` performs a plain `send` operation into the `outputChannel`. Otherwise, a `MonoProcessor` value from the `RSocketPayloadReturnValueHandler.RESPONSE_HEADER` header is used for sending a reply to the RSocket. For this purpose, an `RSocketInboundGateway` performs a `sendAndReceiveMessageReactive` operation on the `outputChannel`. The `payload` of the message to send downstream is always a `Flux` according to `MessagingRSocket` logic. In the `fireAndForget` RSocket interaction model, however, the message has a plain converted `payload`. The reply `payload` can be a plain object or a `Publisher`; the `RSocketInboundGateway` converts both of them properly into an RSocket response according to the encoders provided in the `RSocketStrategies`.

Starting with version 5.3, a `decodeFluxAsUnit` option (default `false`) is added to the `RSocketInboundGateway`. By default, the incoming `Flux` is transformed so that each of its events is decoded separately. This is exactly the behavior currently present with `@MessageMapping` semantics. To restore the previous behavior, or to decode the whole `Flux` as a single unit according to application requirements, `decodeFluxAsUnit` has to be set to `true`. However, the target decoding logic depends on the `Decoder` selected, e.g. a `StringDecoder` requires a new line separator (by default) to be present in the stream to indicate a byte buffer end.

See [Configuring RSocket Endpoints with Java](#rsocket-java-config) for samples of how to configure an `RSocketInboundGateway` endpoint and deal with payloads downstream.

### RSocket Outbound Gateway

The `RSocketOutboundGateway` is an `AbstractReplyProducingMessageHandler` that performs requests into RSocket and produces replies based on the RSocket replies (if any).
The low-level RSocket protocol interaction is delegated to an `RSocketRequester` resolved from the provided `ClientRSocketConnector` or from the `RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER` header in the request message on the server side. A target `RSocketRequester` on the server side can be resolved from an `RSocketConnectedEvent` or by using the `ServerRSocketConnector.getClientRSocketRequester()` API according to some business key selected for connect request mappings via `ServerRSocketConnector.setClientRSocketKeyStrategy()`. See the `ServerRSocketConnector` JavaDocs for more information.

The `route` to send the request to has to be configured explicitly (together with path variables) or via a SpEL expression which is evaluated against the request message.

The RSocket interaction model can be provided via the `RSocketInteractionModel` option or the respective expression setting. By default, `requestResponse` is used for common gateway use cases.

When the request message payload is a `Publisher`, a `publisherElementType` option can be provided to encode its elements according to the `RSocketStrategies` supplied in the target `RSocketRequester`. An expression for this option can evaluate to a `ParameterizedTypeReference`. See the `RSocketRequester.RequestSpec.data()` JavaDocs for more information about data and its type.

An RSocket request can also be enhanced with `metadata`. For this purpose, a `metadataExpression` against the request message can be configured on the `RSocketOutboundGateway`. Such an expression must evaluate to a `Map<Object, MimeType>`.

When `interactionModel` is not `fireAndForget`, an `expectedResponseType` must be supplied. It is `String.class` by default. An expression for this option can evaluate to a `ParameterizedTypeReference`. See the `RSocketRequester.RetrieveSpec.retrieveMono()` and `RSocketRequester.RetrieveSpec.retrieveFlux()` JavaDocs for more information about reply data and its type.
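
The options described above can be combined on a single gateway. The following is a minimal sketch rather than a reference configuration: the class name, the channel names, the `quotes/{symbol}` route, its `SPR` variable and the `trace-1` metadata entry are hypothetical, and the extra metadata entry assumes the connection uses a composite metadata MIME type.

```java
import java.util.Collections;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.integration.rsocket.ClientRSocketConnector;
import org.springframework.integration.rsocket.RSocketInteractionModel;
import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway;
import org.springframework.messaging.Message;
import org.springframework.util.MimeType;

@Configuration
public class OutboundOptionsSketch {

    @Bean
    @ServiceActivator(inputChannel = "quoteRequests") // hypothetical channel name
    public RSocketOutboundGateway quoteStreamGateway(ClientRSocketConnector clientRSocketConnector) {
        // Explicit route with one path variable (both values are hypothetical)
        RSocketOutboundGateway gateway = new RSocketOutboundGateway("quotes/{symbol}", "SPR");
        gateway.setClientRSocketConnector(clientRSocketConnector);
        // Override the default requestResponse interaction model
        gateway.setInteractionModel(RSocketInteractionModel.requestStream);
        // Element type of the reply; String.class is also the default
        gateway.setExpectedResponseType(String.class);
        // The expression evaluates to a Map of metadata value -> MimeType
        gateway.setMetadataExpression(new FunctionExpression<Message<?>>(
                (m) -> Collections.singletonMap("trace-1", new MimeType("text", "plain"))));
        gateway.setOutputChannelName("quoteReplies"); // hypothetical channel name
        return gateway;
    }
}
```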
A reply `payload` from the `RSocketOutboundGateway` is always a `Mono` (even for a `fireAndForget` interaction model, where it is a `Mono<Void>`), which makes this component `async`. Such a `Mono` is subscribed to before producing into the `outputChannel` for regular channels, or processed on demand by the `FluxMessageChannel`. A `Flux` response for the `requestStream` or `requestChannel` interaction model is also wrapped into a reply `Mono`. It can be flattened downstream by the `FluxMessageChannel` with a passthrough service activator:

```
@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel = "fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
    return payload;
}
```

Alternatively, it can be subscribed to explicitly in the target application logic.

The expected response type can also be configured (or evaluated via an expression) to `void`, treating this gateway as an outbound channel adapter. However, the `outputChannel` still has to be configured (even if it is just a `NullChannel`) to initiate a subscription to the returned `Mono`.

See [Configuring RSocket Endpoints with Java](#rsocket-java-config) for samples of how to configure an `RSocketOutboundGateway` endpoint and deal with payloads downstream.

### RSocket Namespace Support

Spring Integration provides an `rsocket` namespace and the corresponding schema definition. To include it in your configuration, add the following namespace declaration in your application context configuration file:

```
...
```

#### Inbound

To configure Spring Integration RSocket inbound channel adapters with XML, you need to use the appropriate `inbound-gateway` component from the `int-rsocket` namespace. The following example shows how to configure it:

```
```

A `ClientRSocketConnector` and `ServerRSocketConnector` should be configured as generic `<bean>` definitions.

#### Outbound

```
```

See `spring-integration-rsocket.xsd` for a description of all those XML attributes.
### Configuring RSocket Endpoints with Java

The following example shows how to configure an RSocket inbound endpoint with Java:

```
@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
    RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
    rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
    return rsocketInboundGateway;
}

// The request payload arrives as a Flux; the reply is produced reactively as a Mono
@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
    return payload.next().map(String::toUpperCase);
}
```

A `ClientRSocketConnector` or `ServerRSocketConnector` is assumed in this configuration for auto-detection of such an endpoint on the “echo” path. Pay attention to the `@Transformer` signature with its fully reactive processing of the RSocket requests and producing reactive replies.

The following example shows how to configure an RSocket inbound gateway with the Java DSL:

```
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
    return IntegrationFlows
        .from(RSockets.inboundGateway("/uppercase")
            .interactionModels(RSocketInteractionModel.requestChannel))
        .<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
        .get();
}
```

A `ClientRSocketConnector` or `ServerRSocketConnector` is assumed in this configuration for auto-detection of such an endpoint on the “/uppercase” path, with “request channel” as the expected interaction model.
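
The other inbound gateway options (restricted interaction models, explicit connector registration, `requestElementType` and `decodeFluxAsUnit`) might be set as in the following sketch. The class name, the `orders` path and the `ordersChannel` name are hypothetical, and whether `decodeFluxAsUnit` should be `true` depends on the decoder in use.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.ResolvableType;
import org.springframework.integration.rsocket.ClientRSocketConnector;
import org.springframework.integration.rsocket.RSocketInteractionModel;
import org.springframework.integration.rsocket.inbound.RSocketInboundGateway;

@Configuration
public class InboundOptionsSketch {

    @Bean
    public RSocketInboundGateway ordersInboundGateway(ClientRSocketConnector clientRSocketConnector) {
        RSocketInboundGateway gateway = new RSocketInboundGateway("orders"); // hypothetical path
        // Accept only these frame types on this endpoint
        gateway.setInteractionModels(
                RSocketInteractionModel.requestResponse,
                RSocketInteractionModel.requestStream);
        // Explicit registration; auto-detection is then disabled on this connector
        gateway.setRSocketConnector(clientRSocketConnector);
        // Decode each request element as a String
        gateway.setRequestElementType(ResolvableType.forClass(String.class));
        // Decode the whole incoming Flux as a single unit (since 5.3; default is false)
        gateway.setDecodeFluxAsUnit(true);
        gateway.setRequestChannelName("ordersChannel"); // hypothetical channel name
        return gateway;
    }
}
```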
The following example shows how to configure an RSocket outbound gateway with Java:

```
@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
    // The route is resolved at runtime from a header of the request message
    RSocketOutboundGateway rsocketOutboundGateway =
        new RSocketOutboundGateway(
            new FunctionExpression<Message<?>>((m) ->
                m.getHeaders().get("route_header")));
    // The interaction model is resolved from a header as well
    rsocketOutboundGateway.setInteractionModelExpression(
        new FunctionExpression<Message<?>>((m) ->
            m.getHeaders().get("rsocket_interaction_model")));
    rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
    return rsocketOutboundGateway;
}
```

The `setClientRSocketConnector()` is required only for the client side. On the server side, the `RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER` header with an `RSocketRequester` value must be supplied in the request message.

The following example shows how to configure an RSocket outbound gateway with the Java DSL:

```
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
    return IntegrationFlows
        .from(Function.class)
        .handle(RSockets.outboundGateway("/uppercase")
            .interactionModel(RSocketInteractionModel.requestResponse)
            .expectedResponseType(String.class)
            .clientRSocketConnector(clientRSocketConnector))
        .get();
}
```

See [`IntegrationFlow` as a Gateway](./dsl.html#integration-flow-as-gateway) for more information on how to use the `Function` interface mentioned at the beginning of the flow above.
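
For the server-side case, where the requester travels in a message header, a request message could be prepared roughly as follows. This is a sketch under assumptions: the helper class and method names are hypothetical, and the `RSocketRequester` is expected to come from an `RSocketConnectedEvent` or from `ServerRSocketConnector.getClientRSocketRequester()`.

```java
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.annotation.support.RSocketRequesterMethodArgumentResolver;

public final class ServerSideRequestSketch {

    private ServerSideRequestSketch() {
    }

    // Builds a request message for an RSocketOutboundGateway that has no
    // ClientRSocketConnector: the target client connection is carried in the header.
    static Message<String> requestFor(RSocketRequester clientRequester, String payload) {
        return MessageBuilder.withPayload(payload)
                .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, clientRequester)
                .build();
    }
}
```

The resulting message is then sent to the input channel of the outbound gateway in the usual way.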