# ZeroMQ 支持
## zeromq 支持
Spring 集成提供组件以支持[ZeroMQ](https://zeromq.org/)在应用程序中的通信。该实现基于[JeroMQ](https://github.com/zeromq/jeromq)库中得到良好支持的 Java API。所有组件都封装了 ZeroMQ 套接字生命周期,并在内部为它们管理线程,从而与这些组件进行无锁和线程安全的交互。
你需要在项目中包含此依赖项:
Maven
```
org.springframework.integration
spring-integration-zeromq
5.5.9
```
Gradle
```
compile "org.springframework.integration:spring-integration-zeromq:5.5.9"
```
### Zeromq 代理
`ZeroMqProxy`是内置`ZMQ.proxy()`[function](https://zguide.zeromq.org/page:chapter2#toc15)的 Spring 友好的包装器。它封装了套接字生命周期和线程管理。该代理的客户端仍然可以使用标准的 Zeromq 套接字连接和交互 API。除了标准`ZContext`之外,它还需要一种著名的 ZeroMQ 代理模式:sub/pub、pull/push 或 router/dealer。这样,代理的前端和后端将使用一对适当的 ZeroMQ 套接字类型。详见`ZeroMqProxy.Type`。
`ZeroMqProxy`实现`SmartLifecycle`来创建、绑定和配置套接字,并在专用线程中从`Executor`(如果有的话)启动`ZMQ.proxy()`。前端和后端套接字的绑定是通过`tcp://`协议在所有可用的网络接口和提供的端口上完成的。否则,它们被绑定到随机端口,这可以在以后通过相应的`getFrontendPort()`和`getBackendPort()`API 方法获得。
控制套接字作为`SocketType.PAIR`公开,并在`"inproc://" + beanName + ".control"`地址上具有线程间传输;可以通过`getControlAddress()`获得。它应该与来自另一个`SocketType.PAIR`套接字的相同应用程序一起使用,以发送`ZMQ.PROXY_TERMINATE`、`ZMQ.PROXY_PAUSE`和/或`ZMQ.PROXY_RESUME`命令。当`ZeroMqProxy`的生命周期调用`stop()`时,`ZMQ.PROXY_TERMINATE`执行一个`ZMQ.PROXY_TERMINATE`命令,以终止`ZMQ.proxy()`循环并优雅地关闭所有绑定的套接字。
`setExposeCaptureSocket(boolean)`选项将使该组件与`SocketType.PUB`绑定一个额外的线程间套接字,以捕获并发布前端和后端套接字之间的所有通信,因为它使用`ZMQ.proxy()`实现。这个套接字绑定到`"inproc://" + beanName + ".capture"`地址,并且不需要任何特定的订阅来进行过滤。
前端和后端套接字可以使用其他属性进行定制,例如读/写超时或安全性。这种定制可分别通过`setFrontendSocketConfigurer(Consumer)`和`setBackendSocketConfigurer(Consumer)`回调进行。
`ZeroMqProxy`可以像这样简单地提供:
```
@Bean
ZeroMqProxy zeroMqProxy() {
ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
proxy.setExposeCaptureSocket(true);
proxy.setFrontendPort(6001);
proxy.setBackendPort(6002);
return proxy;
}
```
所有的客户端节点都应该通过`tcp://`连接到这个代理的主机,并使用各自感兴趣的端口。
### zeromq 消息通道
`ZeroMqChannel`是一个`SubscribableChannel`,它使用一对 Zeromq 套接字来连接发布者和订阅者以进行消息交互。它可以在 pub/sub 模式下工作(默认为 push/pull);它也可以用作本地线程间通道(使用`PAIR`套接字)-在这种情况下不提供`connectUrl`。在分布式模式下,它必须连接到外部管理的 ZeroMQ 代理,在该代理中,它可以与连接到同一代理的其他类似通道交换消息。Connect URL 选项是一个标准的 ZeroMQ 连接字符串,其中包含协议和主机,以及一对用于 ZeroMQ 代理的前端和后端套接字的冒号端口。为了方便起见,如果通道在与代理相同的应用程序中配置,则可以提供`ZeroMqProxy`实例,而不是连接字符串。
发送和接收套接字都在自己的专用线程中进行管理,这使得该通道易于并发。通过这种方式,我们可以在不进行同步的情况下,从`ZeroMqChannel`从不同的线程发布和使用`ZeroMqChannel`。
默认情况下,`ZeroMqChannel`使用`EmbeddedJsonHeadersMessageMapper`使用 JacksonJSON 处理器将`Message`(包括头)从/到`byte[]`序列化。这个逻辑可以通过`setMessageMapper(BytesMessageMapper)`进行配置。
发送和接收套接字可以通过相应的`setSendSocketConfigurer(Consumer)`和`setSubscribeSocketConfigurer(Consumer)`回调为任何选项(读/写超时,安全性等)定制。
该`ZeroMqChannel`的内部逻辑是基于经由工程反应器的反应流`Flux`和`Mono`的操作符。这提供了更容易的线程控制,并允许无锁的并发发布和从通道到/从通道消费。本地发布/订阅逻辑被实现为一个`Flux.publish()`操作符,以允许该信道的所有本地订阅者接收相同的发布消息,作为`PUB`套接字的分布式订阅者。
以下是`ZeroMqChannel`配置的一个简单示例:
```
@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
ZeroMqChannel channel = new ZeroMqChannel(context, true);
channel.setConnectUrl("tcp://localhost:6001:6002");
channel.setConsumeDelay(Duration.ofMillis(100));
return channel;
}
```
### zeromq 入站通道适配器
`ZeroMqMessageProducer`是一个具有反应语义的`MessageProducerSupport`实现。它以非阻塞的方式不断地从 ZeroMQ 套接字读取数据,并将消息发布到由`Flux`订阅的无限`FluxMessageChannel`,或者在`start()`方法中显式地订阅的`Flux`,如果输出通道不是反应性的。当套接字上没有接收到数据时,将在下一次读取尝试之前应用`consumeDelay`(默认值为 1 秒)。
只有`SocketType.PAIR`、`SocketType.PULL`和`SocketType.SUB`是由`ZeroMqMessageProducer`支持的。该组件可以连接到远程套接字,也可以通过提供的端口或随机端口绑定到 TCP 协议。在启动此组件并绑定 ZeroMQ 套接字之后,可以通过`getBoundPort()`获得实际的端口。可以通过`setSocketConfigurer(Consumer socketConfigurer)`回调配置套接字选项(例如,安全性或写超时)。
如果`receiveRaw`选项被设置为`true`,则从套接字中消费的`ZMsg`将在产生的`Message`的有效负载中按原样发送:由下游流来解析和转换`ZMsg`。否则,将使用`InboundMessageMapper`将所消耗的数据转换为`Message`。如果接收到的`ZMsg`是多帧的,则将第一个帧作为`ZeroMqHeaders.TOPIC`头来处理此 zeromq 消息已发布到。
对于`SocketType.SUB`,`ZeroMqMessageProducer`使用所提供的`topics`选项进行订阅;默认情况下订阅所有内容。订阅可以在运行时使用`subscribeToTopics()`和`unsubscribeFromTopics()``@ManagedOperation`s 进行调整。
下面是`ZeroMqMessageProducer`配置的示例:
```
@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
messageProducer.setOutputChannel(outputChannel);
messageProducer.setTopics("some");
messageProducer.setReceiveRaw(true);
messageProducer.setBindPort(7070);
messageProducer.setConsumeDelay(Duration.ofMillis(100));
return messageProducer;
}
```
### zeromq 出站通道适配器
`ZeroMqMessageHandler`是一个`ReactiveMessageHandler`实现,用于将发布消息生成到 Zeromq 套接字中。只支持`SocketType.PAIR`、`SocketType.PUSH`和`SocketType.PUB`。`ZeroMqMessageHandler`只支持连接 zeromq 套接字;不支持绑定。当使用`SocketType.PUB`时,将根据请求消息对`topicExpression`进行求值,以便在不为空的情况下将主题框架注入到 ZeroMQ 消息中。订阅方方(`SocketType.SUB`)在解析实际数据之前必须首先接收主题帧。当请求消息的有效负载是`ZMsg`时,不执行转换或主题提取:`ZMsg`按原样发送到套接字中,并且不会为可能的进一步重用而销毁它。否则,将使用`OutboundMessageMapper`将请求消息(或仅其有效负载)转换为要发布的 zeromq 帧。默认情况下,`ConvertingBytesMessageMapper`是与`ConfigurableCompositeMessageConverter`一起使用的。可以通过`setSocketConfigurer(Consumer socketConfigurer)`回调配置套接字选项(例如,安全性或写超时)。
下面是`ZeroMqMessageHandler`配置的示例:
```
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}
```
### Zeromq Java DSL 支持
`spring-integration-zeromq`通过`ZeroMq`工厂和`IntegrationComponentSpec`实现,为上面提到的组件提供了一个方便的 Java DSL Fluent API。
这是`ZeroMqChannel`的 Java DSL 示例:
```
.channel(ZeroMq.zeroMqChannel(this.context)
.connectUrl("tcp://localhost:6001:6002")
.consumeDelay(Duration.ofMillis(100)))
}
```
Zeromq Java DSL 的入站通道适配器为:
```
IntegrationFlows.from(
ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
.connectUrl("tcp://localhost:9000")
.topics("someTopic")
.receiveRaw(true)
.consumeDelay(Duration.ofMillis(100)))
}
```
Zeromq Java DSL 的出站通道适配器为:
```
.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
.topicFunction(message -> message.getHeaders().get("myTopic")))
}
```