# RSocket 支持 ## RSocket 支持 RSocket Spring 集成模块(`spring-integration-rsocket`)允许执行[RSocket 应用程序协议](https://rsocket.io/)。 你需要在项目中包含此依赖项: Maven ``` org.springframework.integration spring-integration-rsocket 5.5.9 ``` Gradle ``` compile "org.springframework.integration:spring-integration-rsocket:5.5.9" ``` 该模块从版本 5.2 开始可用,并且基于 Spring 消息传递基础及其 RSocket 组件实现,例如`RSocketRequester`,`RSocketMessageHandler`和`RSocketStrategies`。有关 RSocket 协议、术语和组件的更多信息,请参见[Spring Framework RSocket Support](https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#rsocket)。 在通过通道适配器启动集成流处理之前,我们需要在服务器和客户机之间建立一个 RSocket 连接。为此, Spring 集成 RSocket 支持提供了`ServerRSocketConnector`和`ClientRSocketConnector`的`AbstractRSocketConnector`实现。 `ServerRSocketConnector`根据提供的`io.rsocket.transport.ServerTransport`在主机和端口上公开一个侦听器,用于接受来自客户端的连接。可以使用`setServerConfigurer()`自定义内部`RSocketServer`实例,以及可以配置的其他选项,例如。`RSocketStrategies`和`MimeType`用于负载数据和标头元数据。当一个`setupRoute`是从客户端请求者提供的(参见下面的`ClientRSocketConnector`)时,一个连接的客户端被存储为`RSocketRequester`在由`clientRSocketKeyStrategy``BiFunction, DataBuffer, Object>`确定的键下。默认情况下,连接数据被用来作为转换为具有 UTF-8 字符集的字符串的值。这样的`RSocketRequester`注册中心可以在应用程序逻辑中用于确定用于与其进行交互的特定客户端连接,或者用于向所有连接的客户端发布相同的消息。当从客户端建立连接时,`RSocketConnectedEvent`将从`ServerRSocketConnector`发出一个`RSocketConnectedEvent`。这类似于 Spring 消息传递模块中的`@ConnectMapping`注释所提供的内容。映射模式`*`表示接受所有的客户端路由。`RSocketConnectedEvent`可用于通过`DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER`报头来区分不同的路线。 典型的服务器配置可能如下所示: ``` @Bean public RSocketStrategies rsocketStrategies() { return RSocketStrategies.builder() .decoder(StringDecoder.textPlainOnly()) .encoder(CharSequenceEncoder.allMimeTypes()) .dataBufferFactory(new DefaultDataBufferFactory(true)) .build(); } @Bean public ServerRSocketConnector serverRSocketConnector() { ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0); serverRSocketConnector.setRSocketStrategies(rsocketStrategies()); serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0")); serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY)); serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> "" + headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER)); return serverRSocketConnector; } @EventListener public void onApplicationEvent(RSocketConnectedEvent event) { ... } ``` 所有选项,包括`RSocketStrategies` Bean 和`@EventListener`对于`RSocketConnectedEvent`,都是可选的。有关更多信息,请参见`ServerRSocketConnector`Javadocs。 从版本 5.2.1 开始,将`ServerRSocketMessageHandler`提取到一个公共的顶级类,以便与现有的 RSocket 服务器进行可能的连接。当`ServerRSocketConnector`与`ServerRSocketMessageHandler`的外部实例一起提供时,它不会在内部创建 RSocket 服务器,而只是将所有处理逻辑委托给所提供的实例。此外,`ServerRSocketMessageHandler`还可以配置一个`messageMappingCompatible`标志来处理用于 RSocket 控制器的`@MessageMapping`,从而完全取代了标准`RSocketMessageHandler`所提供的功能。这在混合配置中是有用的,当经典的`@MessageMapping`方法与 RSocket 通道适配器一起存在于同一个应用程序中,并且在应用程序中存在外部配置的 RSocket 服务器时。 基于通过所提供的`ClientTransport`连接的`RSocket`,`ClientRSocketConnector`充当`RSocketRequester`的持有者。可以使用提供的`RSocketConnectorConfigurer`来定制`RSocketConnector`。还可以在此组件上配置带有元数据的`setupRoute`(带有可选模板变量)和`setupData`。 典型的客户端配置可能如下所示: ``` @Bean public RSocketStrategies rsocketStrategies() { return RSocketStrategies.builder() .decoder(StringDecoder.textPlainOnly()) .encoder(CharSequenceEncoder.allMimeTypes()) .dataBufferFactory(new DefaultDataBufferFactory(true)) .build(); } @Bean public ClientRSocketConnector clientRSocketConnector() { ClientRSocketConnector clientRSocketConnector = new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block()); clientRSocketConnector.setRSocketStrategies(rsocketStrategies()); clientRSocketConnector.setSetupRoute("clientConnect/{user}"); clientRSocketConnector.setSetupRouteVariables("myUser"); return clientRSocketConnector; } ``` 这些选项中的大多数(包括`RSocketStrategies` Bean)都是可选的。请注意我们是如何连接到任意端口上本地启动的 RSocket 服务器的。关于`setupData`用例,请参见`ServerRSocketConnector.clientRSocketKeyStrategy`。另请参见`ClientRSocketConnector`及其`AbstractRSocketConnector`超类 Javadocs 以获取更多信息。 `ClientRSocketConnector`和`ServerRSocketConnector`都负责将入站通道适配器映射到它们的`path`配置,用于路由传入的 RSocket 请求。有关更多信息,请参见下一节。 ### RSocket 入站网关 `RSocketInboundGateway`负责接收 RSocket 请求并产生响应(如果有的话)。它需要一个`path`映射的数组,它可以作为类似于 MVC 请求映射或`@MessageMapping`语义的模式。此外(自 5.2.2 版本以来),可以在`RSocketInboundGateway`上配置一组交互模型(参见`RSocketInteractionModel`),以根据特定的帧类型将 RSocket 请求限制到该端点。默认情况下,所有的交互模型都是支持的。这样的 Bean,根据其`IntegrationRSocketEndpoint`实现(扩展 a`ReactiveMessageHandler`),可以通过`ServerRSocketConnector`或`ClientRSocketConnector`对于内部路由逻辑中的`IntegrationRSocketMessageHandler`对传入请求进行自动检测。可以将`AbstractRSocketConnector`提供给`RSocketInboundGateway`以用于显式端点注册。通过这种方式,在`AbstractRSocketConnector`上禁用自动检测选项。也可以将`RSocketStrategies`注入到`RSocketInboundGateway`中,或者它们是从提供的覆盖任何显式注入的`AbstractRSocketConnector`中获得的。解码器是使用来自那些`RSocketStrategies`来根据所提供的`requestElementType`来解码请求有效负载的。如果在传入的`RSocketPayloadReturnValueHandler.RESPONSE_HEADER`中没有提供`RSocketPayloadReturnValueHandler.RESPONSE_HEADER`头,则`RSocketInboundGateway`将请求视为`fireAndForget`rsocket 交互模型。在这种情况下,`RSocketInboundGateway`在`outputChannel`中执行普通的`send`操作。否则,将使用来自`RSocketPayloadReturnValueHandler.RESPONSE_HEADER`头的`MonoProcessor`值向 RSocket 发送回复。为此,`RSocketInboundGateway`在`outputChannel`上执行`sendAndReceiveMessageReactive`操作。根据`MessagingRSocket`逻辑,向下游发送的消息的`payload`始终是`Flux`。当在`fireAndForget`RSocket 交互模型中时,消息有一个简单的转换`payload`。回复`payload`可以是一个普通对象,也可以是`Publisher`-`RSocketInboundGateway`根据`RSocketStrategies`中提供的编码器,将它们正确地转换为 RSocket 响应。 从版本 5.3 开始,向`RSocketInboundGateway`添加一个`decodeFluxAsUnit`选项(默认`false`)。默认情况下,传入`Flux`的转换方式是将其每个事件分别解码。这是当前使用`@MessageMapping`语义存在的精确行为。要根据应用程序的要求恢复以前的行为或将整个`Flux`解码为单个单元,必须将`decodeFluxAsUnit`设置为`true`。然而,目标解码逻辑依赖于所选择的`Decoder`,例如,a`StringDecoder`需要一个新的行分隔符(默认情况下)来表示流中的字节缓冲区结束。 有关如何配置`RSocketInboundGateway`端点和处理下游有效载荷的示例,请参见[用 Java 配置 RSocket 端点](#rsocket-java-config)。 ### RSocket 出站网关 `RSocketOutboundGateway`是一个`AbstractReplyProducingMessageHandler`,用于向 RSocket 执行请求并根据 RSocket 答复(如果有的话)生成答复。从服务器端的请求消息中提供的`ClientRSocketConnector`或`RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER`头,将低级别的 RSocket 协议交互委托给一个`RSocketRequester`解析。服务器端的目标`RSocketRequester`可以通过`RSocketConnectedEvent`或使用`ServerRSocketConnector.getClientRSocketRequester()`API 根据通过`ServerRSocketConnector.setClientRSocketKeyStrategy()`为连接请求映射选择的一些业务密钥进行解析。有关更多信息,请参见`ServerRSocketConnector`Javadocs。 发送请求的`route`必须显式地配置(连同路径变量)或通过 SPEL 表达式配置,SPEL 表达式根据请求消息进行求值。 可以通过`RSocketInteractionModel`选项或相应的表达式设置提供 RSocket 交互模型。默认情况下,`requestResponse`用于通用网关用例。 当请求消息有效负载是`Publisher`时,可以提供一个`publisherElementType`选项来根据目标`RSocketStrategies`中提供的`RSocketStrategies`对其元素进行编码。这个选项的表达式可以计算为`ParameterizedTypeReference`。有关数据及其类型的更多信息,请参见`RSocketRequester.RequestSpec.data()`Javadocs。 还可以使用`metadata`增强 RSocket 请求。为此,可以在`RSocketOutboundGateway`上配置针对请求的`metadataExpression`消息。这样的表达式必须求值为`Map`。 当`interactionModel`不是`fireAndForget`时,必须提供`expectedResponseType`。默认情况下是`String.class`。这个选项的表达式可以计算为`ParameterizedTypeReference`。有关回复数据及其类型的更多信息,请参见`RSocketRequester.RetrieveSpec.retrieveMono()`和`RSocketRequester.RetrieveSpec.retrieveFlux()`Javadocs。 来自`RSocketOutboundGateway`的回复`payload`是`Mono`(即使对于`fireAndForget`交互模型,它也是`Mono`),始终将此组件设置为`async`。这样的`Mono`是在生产前订阅的`outputChannel`为常规通道或按需处理的`FluxMessageChannel`。对于`requestStream`或`requestChannel`交互模型的`Flux`响应也被包装到一个回复`Mono`中。它可以通过`FluxMessageChannel`的传递服务激活器在下游进行平坦化: ``` @ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel") public Flux flattenRSocketResponse(Flux payload) { return payload; } ``` 或在目标应用程序逻辑中显式订阅。 还可以将期望的响应类型配置(或通过表达式进行评估)为`void`将此网关视为出站通道适配器。然而,`outputChannel`仍然必须被配置(即使它只是一个`NullChannel`)以启动对返回的`Mono`的订阅。 有关示例,请参见[用 Java 配置 RSocket 端点](#rsocket-java-config)如何配置`RSocketOutboundGateway`端点与下游有效载荷的交易。 ### RSocket 名称空间支持 Spring 集成提供了`rsocket`名称空间和相应的模式定义。要将其包含在配置中,请在应用程序上下文配置文件中添加以下名称空间声明: ``` ... ``` #### 入站 要用 XML 配置 Spring 集成 RSocket 入站通道适配器,需要使用来自`int-rsocket`名称空间的适当的`inbound-gateway`组件。下面的示例展示了如何配置它: ``` ``` a`ClientRSocketConnector`和`ServerRSocketConnector`应配置为通用的``定义。 #### 出站 ``` ``` 有关所有这些 XML 属性的描述,请参见`spring-integration-rsocket.xsd`。 ### 使用 Java 配置 RSocket 端点 下面的示例展示了如何使用 Java 配置 RSocket 入站端点: ``` @Bean public RSocketInboundGateway rsocketInboundGatewayRequestReply() { RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo"); rsocketInboundGateway.setRequestChannelName("requestReplyChannel"); return rsocketInboundGateway; } @Transformer(inputChannel = "requestReplyChannel") public Mono echoTransformation(Flux payload) { return payload.next().map(String::toUpperCase); } ``` 在此配置中,假设`ClientRSocketConnector`或`ServerRSocketConnector`具有自动检测“echo”路径上的此类端点的意义。请注意`@Transformer`签名及其对 RSocket 请求的完全反应性处理和产生反应性答复。 下面的示例展示了如何使用 Java DSL 配置 RSocket 入站网关: ``` @Bean public IntegrationFlow rsocketUpperCaseFlow() { return IntegrationFlows .from(RSockets.inboundGateway("/uppercase") .interactionModels(RSocketInteractionModel.requestChannel)) ., Mono>transform((flux) -> flux.next().map(String::toUpperCase)) .get(); } ``` 在此配置中假定`ClientRSocketConnector`或`ServerRSocketConnector`,其含义是自动检测“/大写”路径上的此类端点,并将期望的交互模型称为“请求通道”。 下面的示例展示了如何使用 Java 配置 RSocket 出站网关: ``` @Bean @ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel") public RSocketOutboundGateway rsocketOutboundGateway() { RSocketOutboundGateway rsocketOutboundGateway = new RSocketOutboundGateway( new FunctionExpression>((m) -> m.getHeaders().get("route_header"))); rsocketOutboundGateway.setInteractionModelExpression( new FunctionExpression>((m) -> m.getHeaders().get("rsocket_interaction_model"))); rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector()); return rsocketOutboundGateway; } ``` `setClientRSocketConnector()`仅对客户端是必需的。在服务器端,必须在请求消息中提供带有`RSocketRequester`值的`RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER`头。 下面的示例展示了如何使用 Java DSL 配置 RSocket 出站网关: ``` @Bean public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) { return IntegrationFlows .from(Function.class) .handle(RSockets.outboundGateway("/uppercase") .interactionModel(RSocketInteractionModel.requestResponse) .expectedResponseType(String.class) .clientRSocketConnector(clientRSocketConnector)) .get(); } ``` 参见[`IntegrationFlow`作为网关](./dsl.html#integration-flow-as-gateway)以获取更多信息,如何在上面的流的开头使用提到的`Function`接口。