(window.webpackJsonp=window.webpackJsonp||[]).push([[509],{939:function(e,o,r){"use strict";r.r(o);var t=r(56),a=Object(t.a)({},(function(){var e=this,o=e.$createElement,r=e._self._c||o;return r("ContentSlotsDistributor",{attrs:{"slot-key":e.$parent.slotKey}},[r("h1",{attrs:{id:"zeromq-支持"}},[r("a",{staticClass:"header-anchor",attrs:{href:"#zeromq-支持"}},[e._v("#")]),e._v(" ZeroMQ 支持")]),e._v(" "),r("h2",{attrs:{id:"zeromq-支持-2"}},[r("a",{staticClass:"header-anchor",attrs:{href:"#zeromq-支持-2"}},[e._v("#")]),e._v(" zeromq 支持")]),e._v(" "),r("p",[e._v("Spring 集成提供组件以支持"),r("a",{attrs:{href:"https://zeromq.org/",target:"_blank",rel:"noopener noreferrer"}},[e._v("ZeroMQ"),r("OutboundLink")],1),e._v("在应用程序中的通信。该实现基于"),r("a",{attrs:{href:"https://github.com/zeromq/jeromq",target:"_blank",rel:"noopener noreferrer"}},[e._v("JeroMQ"),r("OutboundLink")],1),e._v("库中得到良好支持的 Java API。所有组件都封装了 ZeroMQ 套接字生命周期,并在内部为它们管理线程,从而与这些组件进行无锁和线程安全的交互。")]),e._v(" "),r("p",[e._v("你需要在项目中包含此依赖项:")]),e._v(" "),r("p",[e._v("Maven")]),e._v(" "),r("div",{staticClass:"language- extra-class"},[r("pre",{pre:!0,attrs:{class:"language-text"}},[r("code",[e._v("\n org.springframework.integration\n spring-integration-zeromq\n 5.5.9\n\n")])])]),r("p",[e._v("Gradle")]),e._v(" "),r("div",{staticClass:"language- extra-class"},[r("pre",{pre:!0,attrs:{class:"language-text"}},[r("code",[e._v('compile "org.springframework.integration:spring-integration-zeromq:5.5.9"\n')])])]),r("h3",{attrs:{id:"zeromq-代理"}},[r("a",{staticClass:"header-anchor",attrs:{href:"#zeromq-代理"}},[e._v("#")]),e._v(" Zeromq 代理")]),e._v(" "),r("p",[r("code",[e._v("ZeroMqProxy")]),e._v("是内置"),r("code",[e._v("ZMQ.proxy()")]),r("a",{attrs:{href:"https://zguide.zeromq.org/page:chapter2#toc15",target:"_blank",rel:"noopener noreferrer"}},[e._v("function"),r("OutboundLink")],1),e._v("的 Spring 友好的包装器。它封装了套接字生命周期和线程管理。该代理的客户端仍然可以使用标准的 Zeromq 套接字连接和交互 API。除了标准"),r("code",[e._v("ZContext")]),e._v("之外,它还需要一种著名的 ZeroMQ 代理模式:sub/pub、pull/push 或 router/dealer。这样,代理的前端和后端将使用一对适当的 ZeroMQ 套接字类型。详见"),r("code",[e._v("ZeroMqProxy.Type")]),e._v("。")]),e._v(" "),r("p",[r("code",[e._v("ZeroMqProxy")]),e._v("实现"),r("code",[e._v("SmartLifecycle")]),e._v("来创建、绑定和配置套接字,并在专用线程中从"),r("code",[e._v("Executor")]),e._v("(如果有的话)启动"),r("code",[e._v("ZMQ.proxy()")]),e._v("。前端和后端套接字的绑定是通过"),r("code",[e._v("tcp://")]),e._v("协议在所有可用的网络接口和提供的端口上完成的。否则,它们被绑定到随机端口,这可以在以后通过相应的"),r("code",[e._v("getFrontendPort()")]),e._v("和"),r("code",[e._v("getBackendPort()")]),e._v("API 方法获得。")]),e._v(" "),r("p",[e._v("控制套接字作为"),r("code",[e._v("SocketType.PAIR")]),e._v("公开,并在"),r("code",[e._v('"inproc://" + beanName + ".control"')]),e._v("地址上具有线程间传输;可以通过"),r("code",[e._v("getControlAddress()")]),e._v("获得。它应该与来自另一个"),r("code",[e._v("SocketType.PAIR")]),e._v("套接字的相同应用程序一起使用,以发送"),r("code",[e._v("ZMQ.PROXY_TERMINATE")]),e._v("、"),r("code",[e._v("ZMQ.PROXY_PAUSE")]),e._v("和/或"),r("code",[e._v("ZMQ.PROXY_RESUME")]),e._v("命令。当"),r("code",[e._v("ZeroMqProxy")]),e._v("的生命周期调用"),r("code",[e._v("stop()")]),e._v("时,"),r("code",[e._v("ZMQ.PROXY_TERMINATE")]),e._v("执行一个"),r("code",[e._v("ZMQ.PROXY_TERMINATE")]),e._v("命令,以终止"),r("code",[e._v("ZMQ.proxy()")]),e._v("循环并优雅地关闭所有绑定的套接字。")]),e._v(" "),r("p",[r("code",[e._v("setExposeCaptureSocket(boolean)")]),e._v("选项将使该组件与"),r("code",[e._v("SocketType.PUB")]),e._v("绑定一个额外的线程间套接字,以捕获并发布前端和后端套接字之间的所有通信,因为它使用"),r("code",[e._v("ZMQ.proxy()")]),e._v("实现。这个套接字绑定到"),r("code",[e._v('"inproc://" + beanName + ".capture"')]),e._v("地址,并且不需要任何特定的订阅来进行过滤。")]),e._v(" "),r("p",[e._v("前端和后端套接字可以使用其他属性进行定制,例如读/写超时或安全性。这种定制可分别通过"),r("code",[e._v("setFrontendSocketConfigurer(Consumer)")]),e._v("和"),r("code",[e._v("setBackendSocketConfigurer(Consumer)")]),e._v("回调进行。")]),e._v(" "),r("p",[r("code",[e._v("ZeroMqProxy")]),e._v("可以像这样简单地提供:")]),e._v(" "),r("div",{staticClass:"language- extra-class"},[r("pre",{pre:!0,attrs:{class:"language-text"}},[r("code",[e._v("@Bean\nZeroMqProxy zeroMqProxy() {\n ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);\n proxy.setExposeCaptureSocket(true);\n proxy.setFrontendPort(6001);\n proxy.setBackendPort(6002);\n return proxy;\n}\n")])])]),r("p",[e._v("所有的客户端节点都应该通过"),r("code",[e._v("tcp://")]),e._v("连接到这个代理的主机,并使用各自感兴趣的端口。")]),e._v(" "),r("h3",{attrs:{id:"zeromq-消息通道"}},[r("a",{staticClass:"header-anchor",attrs:{href:"#zeromq-消息通道"}},[e._v("#")]),e._v(" zeromq 消息通道")]),e._v(" "),r("p",[r("code",[e._v("ZeroMqChannel")]),e._v("是一个"),r("code",[e._v("SubscribableChannel")]),e._v(",它使用一对 Zeromq 套接字来连接发布者和订阅者以进行消息交互。它可以在 pub/sub 模式下工作(默认为 push/pull);它也可以用作本地线程间通道(使用"),r("code",[e._v("PAIR")]),e._v("套接字)-在这种情况下不提供"),r("code",[e._v("connectUrl")]),e._v("。在分布式模式下,它必须连接到外部管理的 ZeroMQ 代理,在该代理中,它可以与连接到同一代理的其他类似通道交换消息。Connect URL 选项是一个标准的 ZeroMQ 连接字符串,其中包含协议和主机,以及一对用于 ZeroMQ 代理的前端和后端套接字的冒号端口。为了方便起见,如果通道在与代理相同的应用程序中配置,则可以提供"),r("code",[e._v("ZeroMqProxy")]),e._v("实例,而不是连接字符串。")]),e._v(" "),r("p",[e._v("发送和接收套接字都在自己的专用线程中进行管理,这使得该通道易于并发。通过这种方式,我们可以在不进行同步的情况下,从"),r("code",[e._v("ZeroMqChannel")]),e._v("从不同的线程发布和使用"),r("code",[e._v("ZeroMqChannel")]),e._v("。")]),e._v(" "),r("p",[e._v("默认情况下,"),r("code",[e._v("ZeroMqChannel")]),e._v("使用"),r("code",[e._v("EmbeddedJsonHeadersMessageMapper")]),e._v("使用 JacksonJSON 处理器将"),r("code",[e._v("Message")]),e._v("(包括头)从/到"),r("code",[e._v("byte[]")]),e._v("序列化。这个逻辑可以通过"),r("code",[e._v("setMessageMapper(BytesMessageMapper)")]),e._v("进行配置。")]),e._v(" "),r("p",[e._v("发送和接收套接字可以通过相应的"),r("code",[e._v("setSendSocketConfigurer(Consumer)")]),e._v("和"),r("code",[e._v("setSubscribeSocketConfigurer(Consumer)")]),e._v("回调为任何选项(读/写超时,安全性等)定制。")]),e._v(" "),r("p",[e._v("该"),r("code",[e._v("ZeroMqChannel")]),e._v("的内部逻辑是基于经由工程反应器的反应流"),r("code",[e._v("Flux")]),e._v("和"),r("code",[e._v("Mono")]),e._v("的操作符。这提供了更容易的线程控制,并允许无锁的并发发布和从通道到/从通道消费。本地发布/订阅逻辑被实现为一个"),r("code",[e._v("Flux.publish()")]),e._v("操作符,以允许该信道的所有本地订阅者接收相同的发布消息,作为"),r("code",[e._v("PUB")]),e._v("套接字的分布式订阅者。")]),e._v(" "),r("p",[e._v("以下是"),r("code",[e._v("ZeroMqChannel")]),e._v("配置的一个简单示例:")]),e._v(" "),r("div",{staticClass:"language- extra-class"},[r("pre",{pre:!0,attrs:{class:"language-text"}},[r("code",[e._v('@Bean\nZeroMqChannel zeroMqPubSubChannel(ZContext context) {\n ZeroMqChannel channel = new ZeroMqChannel(context, true);\n channel.setConnectUrl("tcp://localhost:6001:6002");\n channel.setConsumeDelay(Duration.ofMillis(100));\n return channel;\n}\n')])])]),r("h3",{attrs:{id:"zeromq-入站通道适配器"}},[r("a",{staticClass:"header-anchor",attrs:{href:"#zeromq-入站通道适配器"}},[e._v("#")]),e._v(" zeromq 入站通道适配器")]),e._v(" "),r("p",[r("code",[e._v("ZeroMqMessageProducer")]),e._v("是一个具有反应语义的"),r("code",[e._v("MessageProducerSupport")]),e._v("实现。它以非阻塞的方式不断地从 ZeroMQ 套接字读取数据,并将消息发布到由"),r("code",[e._v("Flux")]),e._v("订阅的无限"),r("code",[e._v("FluxMessageChannel")]),e._v(",或者在"),r("code",[e._v("start()")]),e._v("方法中显式地订阅的"),r("code",[e._v("Flux")]),e._v(",如果输出通道不是反应性的。当套接字上没有接收到数据时,将在下一次读取尝试之前应用"),r("code",[e._v("consumeDelay")]),e._v("(默认值为 1 秒)。")]),e._v(" "),r("p",[e._v("只有"),r("code",[e._v("SocketType.PAIR")]),e._v("、"),r("code",[e._v("SocketType.PULL")]),e._v("和"),r("code",[e._v("SocketType.SUB")]),e._v("是由"),r("code",[e._v("ZeroMqMessageProducer")]),e._v("支持的。该组件可以连接到远程套接字,也可以通过提供的端口或随机端口绑定到 TCP 协议。在启动此组件并绑定 ZeroMQ 套接字之后,可以通过"),r("code",[e._v("getBoundPort()")]),e._v("获得实际的端口。可以通过"),r("code",[e._v("setSocketConfigurer(Consumer socketConfigurer)")]),e._v("回调配置套接字选项(例如,安全性或写超时)。")]),e._v(" "),r("p",[e._v("如果"),r("code",[e._v("receiveRaw")]),e._v("选项被设置为"),r("code",[e._v("true")]),e._v(",则从套接字中消费的"),r("code",[e._v("ZMsg")]),e._v("将在产生的"),r("code",[e._v("Message")]),e._v("的有效负载中按原样发送:由下游流来解析和转换"),r("code",[e._v("ZMsg")]),e._v("。否则,将使用"),r("code",[e._v("InboundMessageMapper")]),e._v("将所消耗的数据转换为"),r("code",[e._v("Message")]),e._v("。如果接收到的"),r("code",[e._v("ZMsg")]),e._v("是多帧的,则将第一个帧作为"),r("code",[e._v("ZeroMqHeaders.TOPIC")]),e._v("头来处理此 zeromq 消息已发布到。")]),e._v(" "),r("p",[e._v("对于"),r("code",[e._v("SocketType.SUB")]),e._v(","),r("code",[e._v("ZeroMqMessageProducer")]),e._v("使用所提供的"),r("code",[e._v("topics")]),e._v("选项进行订阅;默认情况下订阅所有内容。订阅可以在运行时使用"),r("code",[e._v("subscribeToTopics()")]),e._v("和"),r("code",[e._v("unsubscribeFromTopics()``@ManagedOperation")]),e._v("s 进行调整。")]),e._v(" "),r("p",[e._v("下面是"),r("code",[e._v("ZeroMqMessageProducer")]),e._v("配置的示例:")]),e._v(" "),r("div",{staticClass:"language- extra-class"},[r("pre",{pre:!0,attrs:{class:"language-text"}},[r("code",[e._v('@Bean\nZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {\n ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);\n messageProducer.setOutputChannel(outputChannel);\n messageProducer.setTopics("some");\n messageProducer.setReceiveRaw(true);\n messageProducer.setBindPort(7070);\n messageProducer.setConsumeDelay(Duration.ofMillis(100));\n return messageProducer;\n}\n')])])]),r("h3",{attrs:{id:"zeromq-出站通道适配器"}},[r("a",{staticClass:"header-anchor",attrs:{href:"#zeromq-出站通道适配器"}},[e._v("#")]),e._v(" zeromq 出站通道适配器")]),e._v(" "),r("p",[r("code",[e._v("ZeroMqMessageHandler")]),e._v("是一个"),r("code",[e._v("ReactiveMessageHandler")]),e._v("实现,用于将发布消息生成到 Zeromq 套接字中。只支持"),r("code",[e._v("SocketType.PAIR")]),e._v("、"),r("code",[e._v("SocketType.PUSH")]),e._v("和"),r("code",[e._v("SocketType.PUB")]),e._v("。"),r("code",[e._v("ZeroMqMessageHandler")]),e._v("只支持连接 zeromq 套接字;不支持绑定。当使用"),r("code",[e._v("SocketType.PUB")]),e._v("时,将根据请求消息对"),r("code",[e._v("topicExpression")]),e._v("进行求值,以便在不为空的情况下将主题框架注入到 ZeroMQ 消息中。订阅方方("),r("code",[e._v("SocketType.SUB")]),e._v(")在解析实际数据之前必须首先接收主题帧。当请求消息的有效负载是"),r("code",[e._v("ZMsg")]),e._v("时,不执行转换或主题提取:"),r("code",[e._v("ZMsg")]),e._v("按原样发送到套接字中,并且不会为可能的进一步重用而销毁它。否则,将使用"),r("code",[e._v("OutboundMessageMapper")]),e._v("将请求消息(或仅其有效负载)转换为要发布的 zeromq 帧。默认情况下,"),r("code",[e._v("ConvertingBytesMessageMapper")]),e._v("是与"),r("code",[e._v("ConfigurableCompositeMessageConverter")]),e._v("一起使用的。可以通过"),r("code",[e._v("setSocketConfigurer(Consumer socketConfigurer)")]),e._v("回调配置套接字选项(例如,安全性或写超时)。")]),e._v(" "),r("p",[e._v("下面是"),r("code",[e._v("ZeroMqMessageHandler")]),e._v("配置的示例:")]),e._v(" "),r("div",{staticClass:"language- extra-class"},[r("pre",{pre:!0,attrs:{class:"language-text"}},[r("code",[e._v('@Bean\n@ServiceActivator(inputChannel = "zeroMqPublisherChannel")\nZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {\n ZeroMqMessageHandler messageHandler =\n new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);\n messageHandler.setTopicExpression(\n new FunctionExpression>((message) -> message.getHeaders().get("topic")));\n messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());\n}\n')])])]),r("h3",{attrs:{id:"zeromq-java-dsl-支持"}},[r("a",{staticClass:"header-anchor",attrs:{href:"#zeromq-java-dsl-支持"}},[e._v("#")]),e._v(" Zeromq Java DSL 支持")]),e._v(" "),r("p",[r("code",[e._v("spring-integration-zeromq")]),e._v("通过"),r("code",[e._v("ZeroMq")]),e._v("工厂和"),r("code",[e._v("IntegrationComponentSpec")]),e._v("实现,为上面提到的组件提供了一个方便的 Java DSL Fluent API。")]),e._v(" "),r("p",[e._v("这是"),r("code",[e._v("ZeroMqChannel")]),e._v("的 Java DSL 示例:")]),e._v(" "),r("div",{staticClass:"language- extra-class"},[r("pre",{pre:!0,attrs:{class:"language-text"}},[r("code",[e._v('.channel(ZeroMq.zeroMqChannel(this.context)\n .connectUrl("tcp://localhost:6001:6002")\n .consumeDelay(Duration.ofMillis(100)))\n}\n')])])]),r("p",[e._v("Zeromq Java DSL 的入站通道适配器为:")]),e._v(" "),r("div",{staticClass:"language- extra-class"},[r("pre",{pre:!0,attrs:{class:"language-text"}},[r("code",[e._v('IntegrationFlows.from(\n ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)\n .connectUrl("tcp://localhost:9000")\n .topics("someTopic")\n .receiveRaw(true)\n .consumeDelay(Duration.ofMillis(100)))\n}\n')])])]),r("p",[e._v("Zeromq Java DSL 的出站通道适配器为:")]),e._v(" "),r("div",{staticClass:"language- extra-class"},[r("pre",{pre:!0,attrs:{class:"language-text"}},[r("code",[e._v('.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)\n .topicFunction(message -> message.getHeaders().get("myTopic")))\n}\n')])])])])}),[],!1,null,null,null);o.default=a.exports}}]);