# ZeroMQ 支持 ## zeromq 支持 Spring 集成提供组件以支持[ZeroMQ](https://zeromq.org/)在应用程序中的通信。该实现基于[JeroMQ](https://github.com/zeromq/jeromq)库中得到良好支持的 Java API。所有组件都封装了 ZeroMQ 套接字生命周期,并在内部为它们管理线程,从而与这些组件进行无锁和线程安全的交互。 你需要在项目中包含此依赖项: Maven ``` org.springframework.integration spring-integration-zeromq 5.5.9 ``` Gradle ``` compile "org.springframework.integration:spring-integration-zeromq:5.5.9" ``` ### Zeromq 代理 `ZeroMqProxy`是内置`ZMQ.proxy()`[function](https://zguide.zeromq.org/page:chapter2#toc15)的 Spring 友好的包装器。它封装了套接字生命周期和线程管理。该代理的客户端仍然可以使用标准的 Zeromq 套接字连接和交互 API。除了标准`ZContext`之外,它还需要一种著名的 ZeroMQ 代理模式:sub/pub、pull/push 或 router/dealer。这样,代理的前端和后端将使用一对适当的 ZeroMQ 套接字类型。详见`ZeroMqProxy.Type`。 `ZeroMqProxy`实现`SmartLifecycle`来创建、绑定和配置套接字,并在专用线程中从`Executor`(如果有的话)启动`ZMQ.proxy()`。前端和后端套接字的绑定是通过`tcp://`协议在所有可用的网络接口和提供的端口上完成的。否则,它们被绑定到随机端口,这可以在以后通过相应的`getFrontendPort()`和`getBackendPort()`API 方法获得。 控制套接字作为`SocketType.PAIR`公开,并在`"inproc://" + beanName + ".control"`地址上具有线程间传输;可以通过`getControlAddress()`获得。它应该与来自另一个`SocketType.PAIR`套接字的相同应用程序一起使用,以发送`ZMQ.PROXY_TERMINATE`、`ZMQ.PROXY_PAUSE`和/或`ZMQ.PROXY_RESUME`命令。当`ZeroMqProxy`的生命周期调用`stop()`时,`ZMQ.PROXY_TERMINATE`执行一个`ZMQ.PROXY_TERMINATE`命令,以终止`ZMQ.proxy()`循环并优雅地关闭所有绑定的套接字。 `setExposeCaptureSocket(boolean)`选项将使该组件与`SocketType.PUB`绑定一个额外的线程间套接字,以捕获并发布前端和后端套接字之间的所有通信,因为它使用`ZMQ.proxy()`实现。这个套接字绑定到`"inproc://" + beanName + ".capture"`地址,并且不需要任何特定的订阅来进行过滤。 前端和后端套接字可以使用其他属性进行定制,例如读/写超时或安全性。这种定制可分别通过`setFrontendSocketConfigurer(Consumer)`和`setBackendSocketConfigurer(Consumer)`回调进行。 `ZeroMqProxy`可以像这样简单地提供: ``` @Bean ZeroMqProxy zeroMqProxy() { ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB); proxy.setExposeCaptureSocket(true); proxy.setFrontendPort(6001); proxy.setBackendPort(6002); return proxy; } ``` 所有的客户端节点都应该通过`tcp://`连接到这个代理的主机,并使用各自感兴趣的端口。 ### zeromq 消息通道 `ZeroMqChannel`是一个`SubscribableChannel`,它使用一对 Zeromq 套接字来连接发布者和订阅者以进行消息交互。它可以在 pub/sub 模式下工作(默认为 push/pull);它也可以用作本地线程间通道(使用`PAIR`套接字)-在这种情况下不提供`connectUrl`。在分布式模式下,它必须连接到外部管理的 ZeroMQ 代理,在该代理中,它可以与连接到同一代理的其他类似通道交换消息。Connect URL 选项是一个标准的 ZeroMQ 连接字符串,其中包含协议和主机,以及一对用于 ZeroMQ 代理的前端和后端套接字的冒号端口。为了方便起见,如果通道在与代理相同的应用程序中配置,则可以提供`ZeroMqProxy`实例,而不是连接字符串。 发送和接收套接字都在自己的专用线程中进行管理,这使得该通道易于并发。通过这种方式,我们可以在不进行同步的情况下,从`ZeroMqChannel`从不同的线程发布和使用`ZeroMqChannel`。 默认情况下,`ZeroMqChannel`使用`EmbeddedJsonHeadersMessageMapper`使用 JacksonJSON 处理器将`Message`(包括头)从/到`byte[]`序列化。这个逻辑可以通过`setMessageMapper(BytesMessageMapper)`进行配置。 发送和接收套接字可以通过相应的`setSendSocketConfigurer(Consumer)`和`setSubscribeSocketConfigurer(Consumer)`回调为任何选项(读/写超时,安全性等)定制。 该`ZeroMqChannel`的内部逻辑是基于经由工程反应器的反应流`Flux`和`Mono`的操作符。这提供了更容易的线程控制,并允许无锁的并发发布和从通道到/从通道消费。本地发布/订阅逻辑被实现为一个`Flux.publish()`操作符,以允许该信道的所有本地订阅者接收相同的发布消息,作为`PUB`套接字的分布式订阅者。 以下是`ZeroMqChannel`配置的一个简单示例: ``` @Bean ZeroMqChannel zeroMqPubSubChannel(ZContext context) { ZeroMqChannel channel = new ZeroMqChannel(context, true); channel.setConnectUrl("tcp://localhost:6001:6002"); channel.setConsumeDelay(Duration.ofMillis(100)); return channel; } ``` ### zeromq 入站通道适配器 `ZeroMqMessageProducer`是一个具有反应语义的`MessageProducerSupport`实现。它以非阻塞的方式不断地从 ZeroMQ 套接字读取数据,并将消息发布到由`Flux`订阅的无限`FluxMessageChannel`,或者在`start()`方法中显式地订阅的`Flux`,如果输出通道不是反应性的。当套接字上没有接收到数据时,将在下一次读取尝试之前应用`consumeDelay`(默认值为 1 秒)。 只有`SocketType.PAIR`、`SocketType.PULL`和`SocketType.SUB`是由`ZeroMqMessageProducer`支持的。该组件可以连接到远程套接字,也可以通过提供的端口或随机端口绑定到 TCP 协议。在启动此组件并绑定 ZeroMQ 套接字之后,可以通过`getBoundPort()`获得实际的端口。可以通过`setSocketConfigurer(Consumer socketConfigurer)`回调配置套接字选项(例如,安全性或写超时)。 如果`receiveRaw`选项被设置为`true`,则从套接字中消费的`ZMsg`将在产生的`Message`的有效负载中按原样发送:由下游流来解析和转换`ZMsg`。否则,将使用`InboundMessageMapper`将所消耗的数据转换为`Message`。如果接收到的`ZMsg`是多帧的,则将第一个帧作为`ZeroMqHeaders.TOPIC`头来处理此 zeromq 消息已发布到。 对于`SocketType.SUB`,`ZeroMqMessageProducer`使用所提供的`topics`选项进行订阅;默认情况下订阅所有内容。订阅可以在运行时使用`subscribeToTopics()`和`unsubscribeFromTopics()``@ManagedOperation`s 进行调整。 下面是`ZeroMqMessageProducer`配置的示例: ``` @Bean ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) { ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB); messageProducer.setOutputChannel(outputChannel); messageProducer.setTopics("some"); messageProducer.setReceiveRaw(true); messageProducer.setBindPort(7070); messageProducer.setConsumeDelay(Duration.ofMillis(100)); return messageProducer; } ``` ### zeromq 出站通道适配器 `ZeroMqMessageHandler`是一个`ReactiveMessageHandler`实现,用于将发布消息生成到 Zeromq 套接字中。只支持`SocketType.PAIR`、`SocketType.PUSH`和`SocketType.PUB`。`ZeroMqMessageHandler`只支持连接 zeromq 套接字;不支持绑定。当使用`SocketType.PUB`时,将根据请求消息对`topicExpression`进行求值,以便在不为空的情况下将主题框架注入到 ZeroMQ 消息中。订阅方方(`SocketType.SUB`)在解析实际数据之前必须首先接收主题帧。当请求消息的有效负载是`ZMsg`时,不执行转换或主题提取:`ZMsg`按原样发送到套接字中,并且不会为可能的进一步重用而销毁它。否则,将使用`OutboundMessageMapper`将请求消息(或仅其有效负载)转换为要发布的 zeromq 帧。默认情况下,`ConvertingBytesMessageMapper`是与`ConfigurableCompositeMessageConverter`一起使用的。可以通过`setSocketConfigurer(Consumer socketConfigurer)`回调配置套接字选项(例如,安全性或写超时)。 下面是`ZeroMqMessageHandler`配置的示例: ``` @Bean @ServiceActivator(inputChannel = "zeroMqPublisherChannel") ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) { ZeroMqMessageHandler messageHandler = new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB); messageHandler.setTopicExpression( new FunctionExpression>((message) -> message.getHeaders().get("topic"))); messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper()); } ``` ### Zeromq Java DSL 支持 `spring-integration-zeromq`通过`ZeroMq`工厂和`IntegrationComponentSpec`实现,为上面提到的组件提供了一个方便的 Java DSL Fluent API。 这是`ZeroMqChannel`的 Java DSL 示例: ``` .channel(ZeroMq.zeroMqChannel(this.context) .connectUrl("tcp://localhost:6001:6002") .consumeDelay(Duration.ofMillis(100))) } ``` Zeromq Java DSL 的入站通道适配器为: ``` IntegrationFlows.from( ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB) .connectUrl("tcp://localhost:9000") .topics("someTopic") .receiveRaw(true) .consumeDelay(Duration.ofMillis(100))) } ``` Zeromq Java DSL 的出站通道适配器为: ``` .handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB) .topicFunction(message -> message.getHeaders().get("myTopic"))) } ```