(window.webpackJsonp=window.webpackJsonp||[]).push([[504],{934:function(e,t,n){"use strict";n.r(t);var a=n(56),r=Object(a.a)({},(function(){var e=this,t=e.$createElement,n=e._self._c||t;return n("ContentSlotsDistributor",{attrs:{"slot-key":e.$parent.slotKey}},[n("h1",{attrs:{id:"webflux-支持"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#webflux-支持"}},[e._v("#")]),e._v(" WebFlux 支持")]),e._v(" "),n("h2",{attrs:{id:"webflux-支持-2"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#webflux-支持-2"}},[e._v("#")]),e._v(" WebFlux 支持")]),e._v(" "),n("p",[e._v("WebFlux Spring 集成模块("),n("code",[e._v("spring-integration-webflux")]),e._v(")允许以反应的方式执行 HTTP 请求和处理入站 HTTP 请求。")]),e._v(" "),n("p",[e._v("你需要在项目中包含此依赖项:")]),e._v(" "),n("p",[e._v("Maven")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v("\n org.springframework.integration\n spring-integration-webflux\n 5.5.9\n\n")])])]),n("p",[e._v("Gradle")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('compile "org.springframework.integration:spring-integration-webflux:5.5.9"\n')])])]),n("p",[e._v("在非基于 Servlet 的服务器配置的情况下,必须包含"),n("code",[e._v("io.projectreactor.netty:reactor-netty")]),e._v("依赖项。")]),e._v(" "),n("p",[e._v("WebFlux 支持由以下网关实现组成:"),n("code",[e._v("WebFluxInboundEndpoint")]),e._v("和"),n("code",[e._v("WebFluxRequestExecutingMessageHandler")]),e._v("。该支持完全基于 Spring "),n("a",{attrs:{href:"https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#spring-webflux",target:"_blank",rel:"noopener noreferrer"}},[e._v("WebFlux"),n("OutboundLink")],1),e._v("和"),n("a",{attrs:{href:"https://projectreactor.io/",target:"_blank",rel:"noopener noreferrer"}},[e._v("项目反应堆"),n("OutboundLink")],1),e._v("的基础。有关更多信息,请参见"),n("RouterLink",{attrs:{to:"/spring-integration/http.html#http"}},[e._v("HTTP 支持")]),e._v(",因为许多选项是在反应性和常规 HTTP 组件之间共享的。")],1),e._v(" "),n("h3",{attrs:{id:"webflux-名称空间支持"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#webflux-名称空间支持"}},[e._v("#")]),e._v(" WebFlux 名称空间支持")]),e._v(" "),n("p",[e._v("Spring 集成提供了"),n("code",[e._v("webflux")]),e._v("名称空间和相应的模式定义。要将其包含在配置中,请在应用程序上下文配置文件中添加以下名称空间声明:")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('\n\n ...\n\n')])])]),n("h3",{attrs:{id:"webflux-入站组件"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#webflux-入站组件"}},[e._v("#")]),e._v(" WebFlux 入站组件")]),e._v(" "),n("p",[e._v("从版本 5.0 开始,提供了"),n("code",[e._v("WebFluxInboundEndpoint")]),e._v("的"),n("code",[e._v("WebHandler")]),e._v("实现。这个组件类似于基于 MVC 的"),n("code",[e._v("HttpRequestHandlingEndpointSupport")]),e._v(",它通过新提取的"),n("code",[e._v("BaseHttpInboundEndpoint")]),e._v("与该组件共享一些公共选项。它被用于 Spring WebFlux 反应环境(而不是 MVC)。下面的示例展示了 WebFlux 端点的一个简单实现:")]),e._v(" "),n("p",[e._v("爪哇 DSL")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\npublic IntegrationFlow inboundChannelAdapterFlow() {\n return IntegrationFlows\n .from(WebFlux.inboundChannelAdapter("/reactivePost")\n .requestMapping(m -> m.methods(HttpMethod.POST))\n .requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))\n .statusCodeFunction(m -> HttpStatus.ACCEPTED))\n .channel(c -> c.queue("storeChannel"))\n .get();\n}\n')])])]),n("p",[e._v("Kotlin DSL")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\nfun inboundChannelAdapterFlow() =\n integrationFlow(\n WebFlux.inboundChannelAdapter("/reactivePost")\n .apply {\n requestMapping { m -> m.methods(HttpMethod.POST) }\n requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))\n statusCodeFunction { m -> HttpStatus.ACCEPTED }\n })\n {\n channel { queue("storeChannel") }\n }\n')])])]),n("p",[e._v("爪哇")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Configuration\n@EnableWebFlux\n@EnableIntegration\npublic class ReactiveHttpConfiguration {\n\n @Bean\n public WebFluxInboundEndpoint simpleInboundEndpoint() {\n WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();\n RequestMapping requestMapping = new RequestMapping();\n requestMapping.setPathPatterns("/test");\n endpoint.setRequestMapping(requestMapping);\n endpoint.setRequestChannelName("serviceChannel");\n return endpoint;\n }\n\n @ServiceActivator(inputChannel = "serviceChannel")\n String service() {\n return "It works!";\n }\n\n}\n')])])]),n("p",[e._v("XML")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('\n \n\n')])])]),n("p",[e._v("该配置类似于"),n("code",[e._v("HttpRequestHandlingEndpointSupport")]),e._v("(在示例之前提到),只是我们使用"),n("code",[e._v("@EnableWebFlux")]),e._v("将 WebFlux 基础架构添加到我们的集成应用程序中。此外,"),n("code",[e._v("WebFluxInboundEndpoint")]),e._v("通过使用反作用 HTTP 服务器实现提供的基于按需的能力,对下游流执行"),n("code",[e._v("sendAndReceive")]),e._v("操作。")]),e._v(" "),n("table",[n("thead",[n("tr",[n("th"),e._v(" "),n("th",[e._v("应答部分也是非阻塞的,并且基于内部"),n("code",[e._v("FutureReplyChannel")]),e._v(",该内部"),n("code",[e._v("Mono")]),e._v("被平映到一个响应"),n("code",[e._v("Mono")]),e._v("以进行按需解析。")])])]),e._v(" "),n("tbody")]),e._v(" "),n("p",[e._v("你可以使用自定义"),n("code",[e._v("ServerCodecConfigurer")]),e._v("、"),n("code",[e._v("RequestedContentTypeResolver")]),e._v("甚至"),n("code",[e._v("ReactiveAdapterRegistry")]),e._v("来配置"),n("code",[e._v("WebFluxInboundEndpoint")]),e._v("。后者提供了一种机制,你可以使用该机制以任何反应类型返回回复:reactor"),n("code",[e._v("Flux")]),e._v(",rxjava"),n("code",[e._v("Observable")]),e._v(","),n("code",[e._v("Flowable")]),e._v(",以及其他类型。通过这种方式,我们可以使用 Spring 集成组件实现"),n("a",{attrs:{href:"https://en.wikipedia.org/wiki/Server-sent_events",target:"_blank",rel:"noopener noreferrer"}},[e._v("服务器发送事件"),n("OutboundLink")],1),e._v("场景,如下例所示:")]),e._v(" "),n("p",[e._v("爪哇 DSL")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\npublic IntegrationFlow sseFlow() {\n return IntegrationFlows\n .from(WebFlux.inboundGateway("/sse")\n .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))\n .handle((p, h) -> Flux.just("foo", "bar", "baz"))\n .get();\n}\n')])])]),n("p",[e._v("Kotlin DSL")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\nfun sseFlow() =\n integrationFlow(\n WebFlux.inboundGateway("/sse")\n .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))\n {\n handle { (p, h) -> Flux.just("foo", "bar", "baz") }\n }\n')])])]),n("p",[e._v("Java")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\npublic WebFluxInboundEndpoint webfluxInboundGateway() {\n WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();\n RequestMapping requestMapping = new RequestMapping();\n requestMapping.setPathPatterns("/sse");\n requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);\n endpoint.setRequestMapping(requestMapping);\n endpoint.setRequestChannelName("requests");\n return endpoint;\n}\n')])])]),n("p",[e._v("XML")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('\n \n \n \n\n')])])]),n("p",[e._v("有关更多可能的配置选项,请参见"),n("RouterLink",{attrs:{to:"/spring-integration/http.html#http-request-mapping"}},[e._v("请求映射支持")]),e._v("和"),n("RouterLink",{attrs:{to:"/spring-integration/http.html#http-cors"}},[e._v("跨源资源共享支持")]),e._v("。")],1),e._v(" "),n("p",[e._v("当请求主体为空或"),n("code",[e._v("payloadExpression")]),e._v("返回"),n("code",[e._v("null")]),e._v("时,请求参数("),n("code",[e._v("MultiValueMap")]),e._v(")用于处理目标消息的"),n("code",[e._v("payload")]),e._v("。")]),e._v(" "),n("h4",{attrs:{id:"有效载荷验证"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#有效载荷验证"}},[e._v("#")]),e._v(" 有效载荷验证")]),e._v(" "),n("p",[e._v("从版本 5.2 开始,"),n("code",[e._v("WebFluxInboundEndpoint")]),e._v("可以配置为"),n("code",[e._v("Validator")]),e._v("。与"),n("RouterLink",{attrs:{to:"/spring-integration/http.html#http-validation"}},[e._v("HTTP 支持")]),e._v("中的 MVC 验证不同,它用于验证"),n("code",[e._v("Publisher")]),e._v("中的元素,在执行回退和"),n("code",[e._v("payloadExpression")]),e._v("函数之前,"),n("code",[e._v("Publisher")]),e._v("已将请求转换为该元素。该框架不能假设在构建最终有效负载之后"),n("code",[e._v("Publisher")]),e._v("对象的复杂程度。如果需要限制最终有效负载(或其"),n("code",[e._v("Publisher")]),e._v("元素)的验证可见性,则验证应该向下游进行,而不是向 WebFlux 端点进行。有关更多信息,请参见 Spring WebFlux"),n("a",{attrs:{href:"https://docs.spring.io/spring/docs/5.1.8.RELEASE/spring-framework-reference/web-reactive.html#webflux-fn-handler-validation",target:"_blank",rel:"noopener noreferrer"}},[e._v("文件"),n("OutboundLink")],1),e._v("。使用"),n("code",[e._v("IntegrationWebExchangeBindException")]),e._v("(一个"),n("code",[e._v("WebExchangeBindException")]),e._v("扩展)拒绝无效的负载,该扩展包含所有验证"),n("code",[e._v("Errors")]),e._v("。请参阅 Spring Framework"),n("a",{attrs:{href:"https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#validation",target:"_blank",rel:"noopener noreferrer"}},[e._v("参考手册"),n("OutboundLink")],1),e._v("中有关验证的更多内容。")],1),e._v(" "),n("h3",{attrs:{id:"webflux-出站组件"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#webflux-出站组件"}},[e._v("#")]),e._v(" WebFlux 出站组件")]),e._v(" "),n("p",[n("code",[e._v("WebFluxRequestExecutingMessageHandler")]),e._v("(从版本 5.0 开始)实现类似于"),n("code",[e._v("HttpRequestExecutingMessageHandler")]),e._v("。它使用了 Spring Framework WebFlux 模块中的"),n("code",[e._v("WebClient")]),e._v("。要对其进行配置,请定义一个类似于以下内容的 Bean:")]),e._v(" "),n("p",[e._v("Java DSL")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\npublic IntegrationFlow outboundReactive() {\n return f -> f\n .handle(WebFlux.>outboundGateway(m ->\n UriComponentsBuilder.fromUriString("http://localhost:8080/foo")\n .queryParams(m.getPayload())\n .build()\n .toUri())\n .httpMethod(HttpMethod.GET)\n .expectedResponseType(String.class));\n}\n')])])]),n("p",[e._v("Kotlin DSL")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\nfun outboundReactive() =\n integrationFlow {\n handle(\n WebFlux.outboundGateway>({ m ->\n UriComponentsBuilder.fromUriString("http://localhost:8080/foo")\n .queryParams(m.getPayload())\n .build()\n .toUri()\n })\n .httpMethod(HttpMethod.GET)\n .expectedResponseType(String::class.java)\n )\n }\n')])])]),n("p",[e._v("Java")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@ServiceActivator(inputChannel = "reactiveHttpOutRequest")\n@Bean\npublic WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {\n WebFluxRequestExecutingMessageHandler handler =\n new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);\n handler.setHttpMethod(HttpMethod.POST);\n handler.setExpectedResponseType(String.class);\n return handler;\n}\n')])])]),n("p",[e._v("XML")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('\n\n\n')])])]),n("p",[n("code",[e._v("WebClient``exchange()")]),e._v("操作返回一个"),n("code",[e._v("Mono")]),e._v(",将其映射(通过使用几个"),n("code",[e._v("Mono.map()")]),e._v("步骤)到一个"),n("code",[e._v("AbstractIntegrationMessageBuilder")]),e._v("作为"),n("code",[e._v("WebFluxRequestExecutingMessageHandler")]),e._v("的输出。连同"),n("code",[e._v("ReactiveChannel")]),e._v("作为"),n("code",[e._v("outputChannel")]),e._v(","),n("code",[e._v("Mono")]),e._v("评估将被推迟,直到进行下游订阅。否则,它将被视为"),n("code",[e._v("async")]),e._v("模式,并且对于来自"),n("code",[e._v("WebFluxRequestExecutingMessageHandler")]),e._v("的异步回复,将"),n("code",[e._v("Mono")]),e._v("响应调整为"),n("code",[e._v("SettableListenableFuture")]),e._v("。输出消息的目标负载取决于"),n("code",[e._v("WebFluxRequestExecutingMessageHandler")]),e._v("配置。"),n("code",[e._v("setExpectedResponseType(Class)")]),e._v("或"),n("code",[e._v("setExpectedResponseTypeExpression(Expression)")]),e._v("标识响应主体元素转换的目标类型。如果"),n("code",[e._v("replyPayloadToFlux")]),e._v("被设置为"),n("code",[e._v("true")]),e._v(",则响应主体将转换为"),n("code",[e._v("Flux")]),e._v(",并为每个元素提供"),n("code",[e._v("expectedResponseType")]),e._v(",然后将"),n("code",[e._v("Flux")]),e._v("作为下游的有效负载发送。之后,你可以使用"),n("RouterLink",{attrs:{to:"/spring-integration/splitter.html#splitter"}},[e._v("splitter")]),e._v("以反应式的方式迭代这个"),n("code",[e._v("Flux")]),e._v("。")],1),e._v(" "),n("p",[e._v("此外,可以将"),n("code",[e._v("BodyExtractor")]),e._v("注入到"),n("code",[e._v("WebFluxRequestExecutingMessageHandler")]),e._v("而不是"),n("code",[e._v("expectedResponseType")]),e._v("和"),n("code",[e._v("replyPayloadToFlux")]),e._v("属性中。它可以用于对"),n("code",[e._v("ClientHttpResponse")]),e._v("的低级访问以及对 body 和 HTTP 头转换的更多控制。 Spring 集成提供了"),n("code",[e._v("ClientHttpResponseBodyExtractor")]),e._v("作为标识函数来产生(下游)整个"),n("code",[e._v("ClientHttpResponse")]),e._v("和任何其他可能的自定义逻辑。")]),e._v(" "),n("p",[e._v("从版本 5.2 开始,"),n("code",[e._v("WebFluxRequestExecutingMessageHandler")]),e._v("支持反应式"),n("code",[e._v("Publisher")]),e._v("、"),n("code",[e._v("Resource")]),e._v("和"),n("code",[e._v("MultiValueMap")]),e._v("类型作为请求消息有效负载。内部使用相应的"),n("code",[e._v("BodyInserter")]),e._v("来将其填充到"),n("code",[e._v("WebClient.RequestBodySpec")]),e._v("中。当有效负载是反应性的"),n("code",[e._v("Publisher")]),e._v("时,可以使用配置的"),n("code",[e._v("publisherElementType")]),e._v("或"),n("code",[e._v("publisherElementTypeExpression")]),e._v("来确定发布者元素类型的类型。表达式必须解析为"),n("code",[e._v("Class")]),e._v(","),n("code",[e._v("String")]),e._v(",这将解析为目标"),n("code",[e._v("Class")]),e._v("或"),n("code",[e._v("ParameterizedTypeReference")]),e._v("。")]),e._v(" "),n("p",[e._v("从版本 5.5 开始,"),n("code",[e._v("WebFluxRequestExecutingMessageHandler")]),e._v("公开了一个"),n("code",[e._v("extractResponseBody")]),e._v("标志(默认情况下是"),n("code",[e._v("true")]),e._v("),以仅返回响应主体,或返回整个"),n("code",[e._v("ResponseEntity")]),e._v("作为回复消息有效负载,而不依赖于所提供的"),n("code",[e._v("expectedResponseType")]),e._v("或"),n("code",[e._v("replyPayloadToFlux")]),e._v("。如果"),n("code",[e._v("ResponseEntity")]),e._v("中不存在主体,则忽略此标志,并返回整个"),n("code",[e._v("ResponseEntity")]),e._v("。")]),e._v(" "),n("p",[e._v("有关更多可能的配置选项,请参见"),n("RouterLink",{attrs:{to:"/spring-integration/http.html#http-outbound"}},[e._v("HTTP 出站组件")]),e._v("。")],1),e._v(" "),n("h3",{attrs:{id:"webflux-标头映射"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#webflux-标头映射"}},[e._v("#")]),e._v(" WebFlux 标头映射")]),e._v(" "),n("p",[e._v("由于 WebFlux 组件完全基于 HTTP 协议,因此在 HTTP 头映射中没有区别。参见"),n("RouterLink",{attrs:{to:"/spring-integration/http.html#http-header-mapping"}},[e._v("HTTP 头映射")]),e._v(",以获得更多用于映射标头的可能选项和组件。")],1)])}),[],!1,null,null,null);t.default=r.exports}}]);