141.b08ac1ec.js 27.1 KB
Newer Older
茶陵後's avatar
茶陵後 已提交
1
/*
 * Generated webpack chunk: pushes chunk id 141, containing the single module 564,
 * onto the global `window.webpackJsonp` array (creating the array if it is absent).
 *
 * Module 564 (parameters: e = module, t = exports, o = require):
 *   - calls `o.r(t)` on its exports object and loads module 56 as `n`;
 *   - invokes `n.a` with an empty options object, a render function, an empty
 *     array and the fixed trailing arguments `!1, null, null, null`;
 *   - publishes the `.exports` of that result as `t.default`.
 *
 * The render function resolves its element factory as `e._self._c || e.$createElement`
 * and returns a `ContentSlotsDistributor` root (slot key taken from `e.$parent.slotKey`)
 * whose children are the static "RSocket Support" documentation page: headings with
 * anchor links, paragraphs, inline <code> spans and <pre> code samples, all written as
 * hard-coded string literals passed to `e._v(...)`.
 *
 * NOTE(review): this looks like VuePress / vue-loader build output, so `n.a` is presumably
 * the component normalizer and `o.r` the ES-module marker — confirm against the bundle runtime.
 * NOTE(review): presumably the page text should be changed in its source document and the
 * chunk regenerated rather than patched here — confirm with the docs build.
 * NOTE(review): in this copy several string literals continue across physical line breaks;
 * verify against the real artifact before reformatting or re-wrapping anything below.
 */
(window.webpackJsonp=window.webpackJsonp||[]).push([[141],{564:function(e,t,o){"use strict";o.r(t);var n=o(56),a=Object(n.a)({},(function(){var e=this,t=e.$createElement,o=e._self._c||t;return o("ContentSlotsDistributor",{attrs:{"slot-key":e.$parent.slotKey}},[o("h1",{attrs:{id:"rsocket-support"}},[o("a",{staticClass:"header-anchor",attrs:{href:"#rsocket-support"}},[e._v("#")]),e._v(" RSocket Support")]),e._v(" "),o("h2",{attrs:{id:"rsocket-support-2"}},[o("a",{staticClass:"header-anchor",attrs:{href:"#rsocket-support-2"}},[e._v("#")]),e._v(" RSocket Support")]),e._v(" "),o("p",[e._v("The RSocket Spring Integration module ("),o("code",[e._v("spring-integration-rsocket")]),e._v(") allows for executions of "),o("a",{attrs:{href:"https://rsocket.io/",target:"_blank",rel:"noopener noreferrer"}},[e._v("RSocket application protocol"),o("OutboundLink")],1),e._v(".")]),e._v(" "),o("p",[e._v("You need to include this dependency into your project:")]),e._v(" "),o("p",[e._v("Maven")]),e._v(" "),o("div",{staticClass:"language- extra-class"},[o("pre",{pre:!0,attrs:{class:"language-text"}},[o("code",[e._v("<dependency>\n    <groupId>org.springframework.integration</groupId>\n    <artifactId>spring-integration-rsocket</artifactId>\n    <version>5.5.9</version>\n</dependency>\n")])])]),o("p",[e._v("Gradle")]),e._v(" "),o("div",{staticClass:"language- extra-class"},[o("pre",{pre:!0,attrs:{class:"language-text"}},[o("code",[e._v('compile "org.springframework.integration:spring-integration-rsocket:5.5.9"\n')])])]),o("p",[e._v("This module is available starting with version 5.2 and is based on the Spring Messaging foundation with its RSocket component implementations, such as "),o("code",[e._v("RSocketRequester")]),e._v(", "),o("code",[e._v("RSocketMessageHandler")]),e._v(" and "),o("code",[e._v("RSocketStrategies")]),e._v(".\nSee "),o("a",{attrs:{href:"https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#rsocket",target:"_blank",rel:"noopener 
noreferrer"}},[e._v("Spring Framework RSocket Support"),o("OutboundLink")],1),e._v(" for more information about the RSocket protocol, terminology and components.")]),e._v(" "),o("p",[e._v("Before starting an integration flow processing via channel adapters, we need to establish an RSocket connection between server and client.\nFor this purpose, Spring Integration RSocket support provides the "),o("code",[e._v("ServerRSocketConnector")]),e._v(" and "),o("code",[e._v("ClientRSocketConnector")]),e._v(" implementations of the "),o("code",[e._v("AbstractRSocketConnector")]),e._v(".")]),e._v(" "),o("p",[e._v("The "),o("code",[e._v("ServerRSocketConnector")]),e._v(" exposes a listener on the host and port according to provided "),o("code",[e._v("io.rsocket.transport.ServerTransport")]),e._v(" for accepting connections from clients.\nAn internal "),o("code",[e._v("RSocketServer")]),e._v(" instance can be customized with the "),o("code",[e._v("setServerConfigurer()")]),e._v(", as well as other options that can be configured, e.g. 
"),o("code",[e._v("RSocketStrategies")]),e._v(" and "),o("code",[e._v("MimeType")]),e._v(" for payload data and headers metadata.\nWhen a "),o("code",[e._v("setupRoute")]),e._v(" is provided from the client requester (see "),o("code",[e._v("ClientRSocketConnector")]),e._v(" below), a connected client is stored as a "),o("code",[e._v("RSocketRequester")]),e._v(" under the key determined by the "),o("code",[e._v("clientRSocketKeyStrategy")]),e._v(" "),o("code",[e._v("BiFunction<Map<String, Object>, DataBuffer, Object>")]),e._v(".\nBy default a connect data is used for the key as a converted value to string with UTF-8 charset.\nSuch an "),o("code",[e._v("RSocketRequester")]),e._v(" registry can be used in the application logic to determine a particular client connection for interaction with it, or for publishing the same message to all connected clients.\nWhen a connection is established from the client, an "),o("code",[e._v("RSocketConnectedEvent")]),e._v(" is emitted from the "),o("code",[e._v("ServerRSocketConnector")]),e._v(".\nThis is similar to what is provided by the "),o("code",[e._v("@ConnectMapping")]),e._v(" annotation in Spring Messaging module.\nThe mapping pattern "),o("code",[e._v("*")]),e._v(" means accept all the client routes.\nThe "),o("code",[e._v("RSocketConnectedEvent")]),e._v(" can be used to distinguish different routes via "),o("code",[e._v("DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER")]),e._v(" header.")]),e._v(" "),o("p",[e._v("A typical server configuration might look like this:")]),e._v(" "),o("div",{staticClass:"language- extra-class"},[o("pre",{pre:!0,attrs:{class:"language-text"}},[o("code",[e._v('@Bean\npublic RSocketStrategies rsocketStrategies() {\n    return RSocketStrategies.builder()\n        .decoder(StringDecoder.textPlainOnly())\n        .encoder(CharSequenceEncoder.allMimeTypes())\n        .dataBufferFactory(new DefaultDataBufferFactory(true))\n        .build();\n}\n\n@Bean\npublic ServerRSocketConnector 
serverRSocketConnector() {\n    ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);\n    serverRSocketConnector.setRSocketStrategies(rsocketStrategies());\n    serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));\n    serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));\n    serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""\n                                    + headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));\n    return serverRSocketConnector;\n}\n\n@EventListener\npublic void onApplicationEvent(RSocketConnectedEvent event) {\n\t...\n}\n')])])]),o("p",[e._v("All the options, including "),o("code",[e._v("RSocketStrategies")]),e._v(" bean and "),o("code",[e._v("@EventListener")]),e._v(" for "),o("code",[e._v("RSocketConnectedEvent")]),e._v(", are optional.\nSee "),o("code",[e._v("ServerRSocketConnector")]),e._v(" JavaDocs for more information.")]),e._v(" "),o("p",[e._v("Starting with version 5.2.1, the "),o("code",[e._v("ServerRSocketMessageHandler")]),e._v(" is extracted to a public, top-level class for possible connection with an existing RSocket server.\nWhen a "),o("code",[e._v("ServerRSocketConnector")]),e._v(" is supplied with an external instance of "),o("code",[e._v("ServerRSocketMessageHandler")]),e._v(", it doesn’t create an RSocket server internally and just delegates all the handling logic to the provided instance.\nIn addition the "),o("code",[e._v("ServerRSocketMessageHandler")]),e._v(" can be configured with a "),o("code",[e._v("messageMappingCompatible")]),e._v(" flag to handle also "),o("code",[e._v("@MessageMapping")]),e._v(" for an RSocket controller, fully replacing the functionality provided by the standard "),o("code",[e._v("RSocketMessageHandler")]),e._v(".\nThis can be useful in mixed configurations, when classic "),o("code",[e._v("@MessageMapping")]),e._v(" 
methods are present in the same application along with RSocket channel adapters and an externally configured RSocket server is present in the application.")]),e._v(" "),o("p",[e._v("The "),o("code",[e._v("ClientRSocketConnector")]),e._v(" serves as a holder for "),o("code",[e._v("RSocketRequester")]),e._v(" based on the "),o("code",[e._v("RSocket")]),e._v(" connected via the provided "),o("code",[e._v("ClientTransport")]),e._v(".\nThe "),o("code",[e._v("RSocketConnector")]),e._v(" can be customized with the provided "),o("code",[e._v("RSocketConnectorConfigurer")]),e._v(".\nThe "),o("code",[e._v("setupRoute")]),e._v(" (with optional templates variables) and "),o("code",[e._v("setupData")]),e._v(" with metadata can be also configured on this component.")]),e._v(" "),o("p",[e._v("A typical client configuration might look like this:")]),e._v(" "),o("div",{staticClass:"language- extra-class"},[o("pre",{pre:!0,attrs:{class:"language-text"}},[o("code",[e._v('@Bean\npublic RSocketStrategies rsocketStrategies() {\n    return RSocketStrategies.builder()\n        .decoder(StringDecoder.textPlainOnly())\n        .encoder(CharSequenceEncoder.allMimeTypes())\n        .dataBufferFactory(new DefaultDataBufferFactory(true))\n        .build();\n}\n\n@Bean\npublic ClientRSocketConnector clientRSocketConnector() {\n    ClientRSocketConnector clientRSocketConnector =\n            new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());\n    clientRSocketConnector.setRSocketStrategies(rsocketStrategies());\n    clientRSocketConnector.setSetupRoute("clientConnect/{user}");\n    clientRSocketConnector.setSetupRouteVariables("myUser");\n    return clientRSocketConnector;\n}\n')])])]),o("p",[e._v("Most of these options (including "),o("code",[e._v("RSocketStrategies")]),e._v(" bean) are optional.\nNote how we connect to the locally started RSocket server on the arbitrary port.\nSee 
"),o("code",[e._v("ServerRSocketConnector.clientRSocketKeyStrategy")]),e._v(" for "),o("code",[e._v("setupData")]),e._v(" use cases.\nAlso see "),o("code",[e._v("ClientRSocketConnector")]),e._v(" and its "),o("code",[e._v("AbstractRSocketConnector")]),e._v(" superclass JavaDocs for more information.")]),e._v(" "),o("p",[e._v("Both "),o("code",[e._v("ClientRSocketConnector")]),e._v(" and "),o("code",[e._v("ServerRSocketConnector")]),e._v(" are responsible for mapping inbound channel adapters to their "),o("code",[e._v("path")]),e._v(" configuration for routing incoming RSocket requests.\nSee the next section for more information.")]),e._v(" "),o("h3",{attrs:{id:"rsocket-inbound-gateway"}},[o("a",{staticClass:"header-anchor",attrs:{href:"#rsocket-inbound-gateway"}},[e._v("#")]),e._v(" RSocket Inbound Gateway")]),e._v(" "),o("p",[e._v("The "),o("code",[e._v("RSocketInboundGateway")]),e._v(" is responsible for receiving RSocket requests and producing responses (if any).\nIt requires an array of "),o("code",[e._v("path")]),e._v(" mapping which could be as patterns similar to MVC request mapping or "),o("code",[e._v("@MessageMapping")]),e._v(" semantics.\nIn addition (since version 5.2.2), a set of interaction models (see "),o("code",[e._v("RSocketInteractionModel")]),e._v(") can be configured on the "),o("code",[e._v("RSocketInboundGateway")]),e._v(" to restrict RSocket requests to this endpoint by the particular frame type.\nBy default all the interaction models are supported.\nSuch a bean, according its "),o("code",[e._v("IntegrationRSocketEndpoint")]),e._v(" implementation (extension of a "),o("code",[e._v("ReactiveMessageHandler")]),e._v("), is auto detected either by the "),o("code",[e._v("ServerRSocketConnector")]),e._v(" or "),o("code",[e._v("ClientRSocketConnector")]),e._v(" for a routing logic in the internal "),o("code",[e._v("IntegrationRSocketMessageHandler")]),e._v(" for incoming requests.\nAn "),o("code",[e._v("AbstractRSocketConnector")]),e._v(" can be 
provided to the "),o("code",[e._v("RSocketInboundGateway")]),e._v(" for explicit endpoint registration.\nThis way, the auto-detection option is disabled on that "),o("code",[e._v("AbstractRSocketConnector")]),e._v(".\nThe "),o("code",[e._v("RSocketStrategies")]),e._v(" can also be injected into the "),o("code",[e._v("RSocketInboundGateway")]),e._v(" or they are obtained from the provided "),o("code",[e._v("AbstractRSocketConnector")]),e._v(" overriding any explicit injection.\nDecoders are used from those "),o("code",[e._v("RSocketStrategies")]),e._v(" to decode a request payload according to the provided "),o("code",[e._v("requestElementType")]),e._v(".\nIf an "),o("code",[e._v("RSocketPayloadReturnValueHandler.RESPONSE_HEADER")]),e._v(" header is not provided in incoming the "),o("code",[e._v("Message")]),e._v(", the "),o("code",[e._v("RSocketInboundGateway")]),e._v(" treats a request as a "),o("code",[e._v("fireAndForget")]),e._v(" RSocket interaction model.\nIn this case, an "),o("code",[e._v("RSocketInboundGateway")]),e._v(" performs a plain "),o("code",[e._v("send")]),e._v(" operation into the "),o("code",[e._v("outputChannel")]),e._v(".\nOtherwise a "),o("code",[e._v("MonoProcessor")]),e._v(" value from the "),o("code",[e._v("RSocketPayloadReturnValueHandler.RESPONSE_HEADER")]),e._v(" header is used for sending a reply to the RSocket.\nFor this purpose, an "),o("code",[e._v("RSocketInboundGateway")]),e._v(" performs a "),o("code",[e._v("sendAndReceiveMessageReactive")]),e._v(" operation on the "),o("code",[e._v("outputChannel")]),e._v(".\nThe "),o("code",[e._v("payload")]),e._v(" of the message to send downstream is always a "),o("code",[e._v("Flux")]),e._v(" according to "),o("code",[e._v("MessagingRSocket")]),e._v(" logic.\nWhen in a "),o("code",[e._v("fireAndForget")]),e._v(" RSocket interaction model, the message has a plain converted "),o("code",[e._v("payload")]),e._v(".\nThe reply "),o("code",[e._v("payload")]),e._v(" could be a plain object or a 
"),o("code",[e._v("Publisher")]),e._v(" - the "),o("code",[e._v("RSocketInboundGateway")]),e._v(" converts both of them properly into an RSocket response according to the encoders provided in the "),o("code",[e._v("RSocketStrategies")]),e._v(".")]),e._v(" "),o("p",[e._v("Starting with version 5.3, a "),o("code",[e._v("decodeFluxAsUnit")]),e._v(" option (default "),o("code",[e._v("false")]),e._v(") is added to the "),o("code",[e._v("RSocketInboundGateway")]),e._v(".\nBy default incoming "),o("code",[e._v("Flux")]),e._v(" is transformed the way that each its event is decoded separately.\nThis is an exact behavior present currently with "),o("code",[e._v("@MessageMapping")]),e._v(" semantics.\nTo restore a previous behavior or decode the whole "),o("code",[e._v("Flux")]),e._v(" as single unit according application requirements, the "),o("code",[e._v("decodeFluxAsUnit")]),e._v(" has to be set to "),o("code",[e._v("true")]),e._v(".\nHowever the target decoding logic depends on the "),o("code",[e._v("Decoder")]),e._v(" selected, e.g. 
a "),o("code",[e._v("StringDecoder")]),e._v(" requires a new line separator (by default) to be present in the stream to indicate a byte buffer end.")]),e._v(" "),o("p",[e._v("See "),o("a",{attrs:{href:"#rsocket-java-config"}},[e._v("Configuring RSocket Endpoints with Java")]),e._v(" for samples how to configure an "),o("code",[e._v("RSocketInboundGateway")]),e._v(" endpoint and deal with payloads downstream.")]),e._v(" "),o("h3",{attrs:{id:"rsocket-outbound-gateway"}},[o("a",{staticClass:"header-anchor",attrs:{href:"#rsocket-outbound-gateway"}},[e._v("#")]),e._v(" RSocket Outbound Gateway")]),e._v(" "),o("p",[e._v("The "),o("code",[e._v("RSocketOutboundGateway")]),e._v(" is an "),o("code",[e._v("AbstractReplyProducingMessageHandler")]),e._v(" to perform requests into RSocket and produce replies based on the RSocket replies (if any).\nA low level RSocket protocol interaction is delegated into an "),o("code",[e._v("RSocketRequester")]),e._v(" resolved from the provided "),o("code",[e._v("ClientRSocketConnector")]),e._v(" or from the "),o("code",[e._v("RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER")]),e._v(" header in the request message on the server side.\nA target "),o("code",[e._v("RSocketRequester")]),e._v(" on the server side can be resolved from an "),o("code",[e._v("RSocketConnectedEvent")]),e._v(" or using "),o("code",[e._v("ServerRSocketConnector.getClientRSocketRequester()")]),e._v(" API according some business key selected for connect request mappings via "),o("code",[e._v("ServerRSocketConnector.setClientRSocketKeyStrategy()")]),e._v(".\nSee "),o("code",[e._v("ServerRSocketConnector")]),e._v(" JavaDocs for more information.")]),e._v(" "),o("p",[e._v("The "),o("code",[e._v("route")]),e._v(" to send request has to be configured explicitly (together with path variables) or via a SpEL expression which is evaluated against request message.")]),e._v(" "),o("p",[e._v("The RSocket interaction model can be provided via 
"),o("code",[e._v("RSocketInteractionModel")]),e._v(" option or respective expression setting.\nBy default a "),o("code",[e._v("requestResponse")]),e._v(" is used for common gateway use-cases.")]),e._v(" "),o("p",[e._v("When request message payload is a "),o("code",[e._v("Publisher")]),e._v(", a "),o("code",[e._v("publisherElementType")]),e._v(" option can be provided to encode its elements according an "),o("code",[e._v("RSocketStrategies")]),e._v(" supplied in the target "),o("code",[e._v("RSocketRequester")]),e._v(".\nAn expression for this option can evaluate to a "),o("code",[e._v("ParameterizedTypeReference")]),e._v(".\nSee the "),o("code",[e._v("RSocketRequester.RequestSpec.data()")]),e._v(" JavaDocs for more information about data and its type.")]),e._v(" "),o("p",[e._v("An RSocket request can also be enhanced with a "),o("code",[e._v("metadata")]),e._v(".\nFor this purpose a "),o("code",[e._v("metadataExpression")]),e._v(" against request message can be configured on the "),o("code",[e._v("RSocketOutboundGateway")]),e._v(".\nSuch an expression must evaluate to a "),o("code",[e._v("Map<Object, MimeType>")]),e._v(".")]),e._v(" "),o("p",[e._v("When "),o("code",[e._v("interactionModel")]),e._v(" is not "),o("code",[e._v("fireAndForget")]),e._v(", an "),o("code",[e._v("expectedResponseType")]),e._v(" must be supplied.\nIt is a "),o("code",[e._v("String.class")]),e._v(" by default.\nAn expression for this option can evaluate to a "),o("code",[e._v("ParameterizedTypeReference")]),e._v(".\nSee the "),o("code",[e._v("RSocketRequester.RetrieveSpec.retrieveMono()")]),e._v(" and "),o("code",[e._v("RSocketRequester.RetrieveSpec.retrieveFlux()")]),e._v(" JavaDocs for more information about reply data and its type.")]),e._v(" "),o("p",[e._v("A reply "),o("code",[e._v("payload")]),e._v(" from the "),o("code",[e._v("RSocketOutboundGateway")]),e._v(" is a "),o("code",[e._v("Mono")]),e._v(" (even for a "),o("code",[e._v("fireAndForget")]),e._v(" interaction model it is 
"),o("code",[e._v("Mono<Void>")]),e._v(") always making this component as "),o("code",[e._v("async")]),e._v(".\nSuch a "),o("code",[e._v("Mono")]),e._v(" is subscribed before producing into the "),o("code",[e._v("outputChannel")]),e._v(" for regular channels or processed on demand by the "),o("code",[e._v("FluxMessageChannel")]),e._v(".\nA "),o("code",[e._v("Flux")]),e._v(" response for the "),o("code",[e._v("requestStream")]),e._v(" or "),o("code",[e._v("requestChannel")]),e._v(" interaction model is also wrapped into a reply "),o("code",[e._v("Mono")]),e._v(".\nIt can be flattened downstream by the "),o("code",[e._v("FluxMessageChannel")]),e._v(" with a passthrough service activator:")]),e._v(" "),o("div",{staticClass:"language- extra-class"},[o("pre",{pre:!0,attrs:{class:"language-text"}},[o("code",[e._v('@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")\npublic Flux<?> flattenRSocketResponse(Flux<?> payload) {\n    return payload;\n}\n')])])]),o("p",[e._v("Or subscribed explicitly in the target application logic.")]),e._v(" "),o("p",[e._v("The expected response type can also be configured (or evaluated via expression) to "),o("code",[e._v("void")]),e._v(" treating this gateway as an outbound channel adapter.\nHowever the "),o("code",[e._v("outputChannel")]),e._v(" still has to be configured (even if it just a "),o("code",[e._v("NullChannel")]),e._v(") to initiate a subscription to the returned "),o("code",[e._v("Mono")]),e._v(".")]),e._v(" "),o("p",[e._v("See "),o("a",{attrs:{href:"#rsocket-java-config"}},[e._v("Configuring RSocket Endpoints with Java")]),e._v(" for samples how to configure an "),o("code",[e._v("RSocketOutboundGateway")]),e._v(" endpoint a deal with payloads downstream.")]),e._v(" "),o("h3",{attrs:{id:"rsocket-namespace-support"}},[o("a",{staticClass:"header-anchor",attrs:{href:"#rsocket-namespace-support"}},[e._v("#")]),e._v(" RSocket Namespace Support")]),e._v(" "),o("p",[e._v("Spring Integration 
provides an "),o("code",[e._v("rsocket")]),e._v(" namespace and the corresponding schema definition.\nTo include it in your configuration, add the following namespace declaration in your application context configuration file:")]),e._v(" "),o("div",{staticClass:"language- extra-class"},[o("pre",{pre:!0,attrs:{class:"language-text"}},[o("code",[e._v('<?xml version="1.0" encoding="UTF-8"?>\n<beans xmlns="http://www.springframework.org/schema/beans"\n  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"\n  xmlns:int="http://www.springframework.org/schema/integration"\n  xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"\n  xsi:schemaLocation="\n    http://www.springframework.org/schema/beans\n    https://www.springframework.org/schema/beans/spring-beans.xsd\n    http://www.springframework.org/schema/integration\n    https://www.springframework.org/schema/integration/spring-integration.xsd\n    http://www.springframework.org/schema/integration/rsocket\n    https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">\n    ...\n</beans>\n')])])]),o("h4",{attrs:{id:"inbound"}},[o("a",{staticClass:"header-anchor",attrs:{href:"#inbound"}},[e._v("#")]),e._v(" Inbound")]),e._v(" "),o("p",[e._v("To configure Spring Integration RSocket inbound channel adapters with XML, you need to use an appropriate "),o("code",[e._v("inbound-gateway")]),e._v(" components from the "),o("code",[e._v("int-rsocket")]),e._v(" namespace.\nThe following example shows how to configure it:")]),e._v(" "),o("div",{staticClass:"language- extra-class"},[o("pre",{pre:!0,attrs:{class:"language-text"}},[o("code",[e._v('<int-rsocket:inbound-gateway id="inboundGateway"\n                             path="testPath"\n                             interaction-models="requestStream,requestChannel"\n                             rsocket-connector="clientRSocketConnector"\n                             request-channel="requestChannel"\n                         
    rsocket-strategies="rsocketStrategies"\n                             request-element-type="byte[]"/>\n')])])]),o("p",[e._v("A "),o("code",[e._v("ClientRSocketConnector")]),e._v(" and "),o("code",[e._v("ServerRSocketConnector")]),e._v(" should be configured as generic "),o("code",[e._v("<bean>")]),e._v(" definitions.")]),e._v(" "),o("h4",{attrs:{id:"outbound"}},[o("a",{staticClass:"header-anchor",attrs:{href:"#outbound"}},[e._v("#")]),e._v(" Outbound")]),e._v(" "),o("div",{staticClass:"language- extra-class"},[o("pre",{pre:!0,attrs:{class:"language-text"}},[o("code",[e._v('<int-rsocket:outbound-gateway id="outboundGateway"\n                              client-rsocket-connector="clientRSocketConnector"\n                              auto-startup="false"\n                              interaction-model="fireAndForget"\n                              route-expression="\'testRoute\'"\n                              request-channel="requestChannel"\n                              publisher-element-type="byte[]"\n                              expected-response-type="java.util.Date"\n                              metadata-expression="{\'metadata\': new org.springframework.util.MimeType(\'*\')}"/>\n')])])]),o("p",[e._v("See "),o("code",[e._v("spring-integration-rsocket.xsd")]),e._v(" for description for all those XML attributes.")]),e._v(" "),o("h3",{attrs:{id:"configuring-rsocket-endpoints-with-java"}},[o("a",{staticClass:"header-anchor",attrs:{href:"#configuring-rsocket-endpoints-with-java"}},[e._v("#")]),e._v(" Configuring RSocket Endpoints with Java")]),e._v(" "),o("p",[e._v("The following example shows how to configure an RSocket inbound endpoint with Java:")]),e._v(" "),o("div",{staticClass:"language- extra-class"},[o("pre",{pre:!0,attrs:{class:"language-text"}},[o("code",[e._v('@Bean\npublic RSocketInboundGateway rsocketInboundGatewayRequestReply() {\n    RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");\n    
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");\n    return rsocketInboundGateway;\n}\n\n@Transformer(inputChannel = "requestReplyChannel")\npublic Mono<String> echoTransformation(Flux<String> payload) {\n    return payload.next().map(String::toUpperCase);\n}\n')])])]),o("p",[e._v("A "),o("code",[e._v("ClientRSocketConnector")]),e._v(" or "),o("code",[e._v("ServerRSocketConnector")]),e._v(" is assumed in this configuration with meaning for auto-detection of such an endpoint on the “echo” path.\nPay attention to the "),o("code",[e._v("@Transformer")]),e._v(" signature with its fully reactive processing of the RSocket requests and producing reactive replies.")]),e._v(" "),o("p",[e._v("The following example shows how to configure a RSocket inbound gateway with the Java DSL:")]),e._v(" "),o("div",{staticClass:"language- extra-class"},[o("pre",{pre:!0,attrs:{class:"language-text"}},[o("code",[e._v('@Bean\npublic IntegrationFlow rsocketUpperCaseFlow() {\n    return IntegrationFlows\n        .from(RSockets.inboundGateway("/uppercase")\n                   .interactionModels(RSocketInteractionModel.requestChannel))\n        .<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))\n        .get();\n}\n')])])]),o("p",[e._v("A "),o("code",[e._v("ClientRSocketConnector")]),e._v(" or "),o("code",[e._v("ServerRSocketConnector")]),e._v(" is assumed in this configuration with meaning for auto-detection of such an endpoint on the “/uppercase” path and expected interaction model as “request channel”.")]),e._v(" "),o("p",[e._v("The following example shows how to configure a RSocket outbound gateway with Java:")]),e._v(" "),o("div",{staticClass:"language- extra-class"},[o("pre",{pre:!0,attrs:{class:"language-text"}},[o("code",[e._v('@Bean\n@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")\npublic RSocketOutboundGateway rsocketOutboundGateway() {\n    RSocketOutboundGateway rsocketOutboundGateway =\n      
      new RSocketOutboundGateway(\n                    new FunctionExpression<Message<?>>((m) ->\n                        m.getHeaders().get("route_header")));\n    rsocketOutboundGateway.setInteractionModelExpression(\n            new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));\n    rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());\n    return rsocketOutboundGateway;\n}\n')])])]),o("p",[e._v("The "),o("code",[e._v("setClientRSocketConnector()")]),e._v(" is required only for the client side.\nOn the server side, the "),o("code",[e._v("RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER")]),e._v(" header with an "),o("code",[e._v("RSocketRequester")]),e._v(" value must be supplied in the request message.")]),e._v(" "),o("p",[e._v("The following example shows how to configure a RSocket outbound gateway with the Java DSL:")]),e._v(" "),o("div",{staticClass:"language- extra-class"},[o("pre",{pre:!0,attrs:{class:"language-text"}},[o("code",[e._v('@Bean\npublic IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {\n    return IntegrationFlows\n        .from(Function.class)\n        .handle(RSockets.outboundGateway("/uppercase")\n            .interactionModel(RSocketInteractionModel.requestResponse)\n            .expectedResponseType(String.class)\n            .clientRSocketConnector(clientRSocketConnector))\n        .get();\n}\n')])])]),o("p",[e._v("See "),o("RouterLink",{attrs:{to:"/en/spring-integration/dsl.html#integration-flow-as-gateway"}},[o("code",[e._v("IntegrationFlow")]),e._v(" as a Gateway")]),e._v(" for more information how to use a mentioned "),o("code",[e._v("Function")]),e._v(" interface in the beginning of the flow above.")],1)])}),[],!1,null,null,null);t.default=a.exports}}]);