zeromq.md 10.0 KB
Newer Older
dallascao's avatar
dallascao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
# ZeroMQ 支持

## [](#zeromq)zeromq 支持

Spring 集成提供组件以支持[ZeroMQ](https://zeromq.org/)在应用程序中的通信。该实现基于[JeroMQ](https://github.com/zeromq/jeromq)库中得到良好支持的 Java API。所有组件都封装了 ZeroMQ 套接字生命周期,并在内部为它们管理线程,从而与这些组件进行无锁和线程安全的交互。

你需要在项目中包含此依赖项:

Maven

```
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-zeromq</artifactId>
    <version>5.5.9</version>
</dependency>
```

Gradle

```
compile "org.springframework.integration:spring-integration-zeromq:5.5.9"
```

### [](#zeromq-proxy)Zeromq 代理

`ZeroMqProxy`是内置`ZMQ.proxy()`[function](https://zguide.zeromq.org/page:chapter2#toc15)的 Spring 友好的包装器。它封装了套接字生命周期和线程管理。该代理的客户端仍然可以使用标准的 Zeromq 套接字连接和交互 API。除了标准`ZContext`之外,它还需要一种著名的 ZeroMQ 代理模式:sub/pub、pull/push 或 router/dealer。这样,代理的前端和后端将使用一对适当的 ZeroMQ 套接字类型。详见`ZeroMqProxy.Type`

`ZeroMqProxy`实现`SmartLifecycle`来创建、绑定和配置套接字,并在专用线程中从`Executor`(如果有的话)启动`ZMQ.proxy()`。前端和后端套接字的绑定是通过`tcp://`协议在所有可用的网络接口和提供的端口上完成的。否则,它们被绑定到随机端口,这可以在以后通过相应的`getFrontendPort()``getBackendPort()`API 方法获得。

控制套接字作为`SocketType.PAIR`公开,并在`"inproc://" + beanName + ".control"`地址上具有线程间传输;可以通过`getControlAddress()`获得。它应该与来自另一个`SocketType.PAIR`套接字的相同应用程序一起使用,以发送`ZMQ.PROXY_TERMINATE``ZMQ.PROXY_PAUSE`和/或`ZMQ.PROXY_RESUME`命令。当`ZeroMqProxy`的生命周期调用`stop()`时,`ZMQ.PROXY_TERMINATE`执行一个`ZMQ.PROXY_TERMINATE`命令,以终止`ZMQ.proxy()`循环并优雅地关闭所有绑定的套接字。

`setExposeCaptureSocket(boolean)`选项将使该组件与`SocketType.PUB`绑定一个额外的线程间套接字,以捕获并发布前端和后端套接字之间的所有通信,因为它使用`ZMQ.proxy()`实现。这个套接字绑定到`"inproc://" + beanName + ".capture"`地址,并且不需要任何特定的订阅来进行过滤。

前端和后端套接字可以使用其他属性进行定制,例如读/写超时或安全性。这种定制可分别通过`setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)``setBackendSocketConfigurer(Consumer<ZMQ.Socket>)`回调进行。

`ZeroMqProxy`可以像这样简单地提供:

```
@Bean
ZeroMqProxy zeroMqProxy() {
    ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
    proxy.setExposeCaptureSocket(true);
    proxy.setFrontendPort(6001);
    proxy.setBackendPort(6002);
    return proxy;
}
```

所有的客户端节点都应该通过`tcp://`连接到这个代理的主机,并使用各自感兴趣的端口。

### [](#zeromq-message-channel)zeromq 消息通道

`ZeroMqChannel`是一个`SubscribableChannel`,它使用一对 Zeromq 套接字来连接发布者和订阅者以进行消息交互。它可以在 pub/sub 模式下工作(默认为 push/pull);它也可以用作本地线程间通道(使用`PAIR`套接字)-在这种情况下不提供`connectUrl`。在分布式模式下,它必须连接到外部管理的 ZeroMQ 代理,在该代理中,它可以与连接到同一代理的其他类似通道交换消息。Connect URL 选项是一个标准的 ZeroMQ 连接字符串,其中包含协议和主机,以及一对用于 ZeroMQ 代理的前端和后端套接字的冒号端口。为了方便起见,如果通道在与代理相同的应用程序中配置,则可以提供`ZeroMqProxy`实例,而不是连接字符串。

发送和接收套接字都在自己的专用线程中进行管理,这使得该通道易于并发。通过这种方式,我们可以在不进行同步的情况下,从`ZeroMqChannel`从不同的线程发布和使用`ZeroMqChannel`

默认情况下,`ZeroMqChannel`使用`EmbeddedJsonHeadersMessageMapper`使用 JacksonJSON 处理器将`Message`(包括头)从/到`byte[]`序列化。这个逻辑可以通过`setMessageMapper(BytesMessageMapper)`进行配置。

发送和接收套接字可以通过相应的`setSendSocketConfigurer(Consumer<ZMQ.Socket>)``setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>)`回调为任何选项(读/写超时,安全性等)定制。

`ZeroMqChannel`的内部逻辑是基于经由工程反应器的反应流`Flux``Mono`的操作符。这提供了更容易的线程控制,并允许无锁的并发发布和从通道到/从通道消费。本地发布/订阅逻辑被实现为一个`Flux.publish()`操作符,以允许该信道的所有本地订阅者接收相同的发布消息,作为`PUB`套接字的分布式订阅者。

以下是`ZeroMqChannel`配置的一个简单示例:

```
@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
    ZeroMqChannel channel = new ZeroMqChannel(context, true);
    channel.setConnectUrl("tcp://localhost:6001:6002");
    channel.setConsumeDelay(Duration.ofMillis(100));
    return channel;
}
```

### [](#zeromq-inbound-channel-adapter)zeromq 入站通道适配器

`ZeroMqMessageProducer`是一个具有反应语义的`MessageProducerSupport`实现。它以非阻塞的方式不断地从 ZeroMQ 套接字读取数据,并将消息发布到由`Flux`订阅的无限`FluxMessageChannel`,或者在`start()`方法中显式地订阅的`Flux`,如果输出通道不是反应性的。当套接字上没有接收到数据时,将在下一次读取尝试之前应用`consumeDelay`(默认值为 1 秒)。

只有`SocketType.PAIR``SocketType.PULL``SocketType.SUB`是由`ZeroMqMessageProducer`支持的。该组件可以连接到远程套接字,也可以通过提供的端口或随机端口绑定到 TCP 协议。在启动此组件并绑定 ZeroMQ 套接字之后,可以通过`getBoundPort()`获得实际的端口。可以通过`setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)`回调配置套接字选项(例如,安全性或写超时)。

如果`receiveRaw`选项被设置为`true`,则从套接字中消费的`ZMsg`将在产生的`Message`的有效负载中按原样发送:由下游流来解析和转换`ZMsg`。否则,将使用`InboundMessageMapper`将所消耗的数据转换为`Message`。如果接收到的`ZMsg`是多帧的,则将第一个帧作为`ZeroMqHeaders.TOPIC`头来处理此 zeromq 消息已发布到。

对于`SocketType.SUB``ZeroMqMessageProducer`使用所提供的`topics`选项进行订阅;默认情况下订阅所有内容。订阅可以在运行时使用`subscribeToTopics()``unsubscribeFromTopics()``@ManagedOperation`s 进行调整。

下面是`ZeroMqMessageProducer`配置的示例:

```
@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
    ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
    messageProducer.setOutputChannel(outputChannel);
    messageProducer.setTopics("some");
    messageProducer.setReceiveRaw(true);
    messageProducer.setBindPort(7070);
    messageProducer.setConsumeDelay(Duration.ofMillis(100));
    return messageProducer;
}
```

### [](#zeromq-outbound-channel-adapter)zeromq 出站通道适配器

`ZeroMqMessageHandler`是一个`ReactiveMessageHandler`实现,用于将发布消息生成到 Zeromq 套接字中。只支持`SocketType.PAIR``SocketType.PUSH``SocketType.PUB``ZeroMqMessageHandler`只支持连接 zeromq 套接字;不支持绑定。当使用`SocketType.PUB`时,将根据请求消息对`topicExpression`进行求值,以便在不为空的情况下将主题框架注入到 ZeroMQ 消息中。订阅方方(`SocketType.SUB`)在解析实际数据之前必须首先接收主题帧。当请求消息的有效负载是`ZMsg`时,不执行转换或主题提取:`ZMsg`按原样发送到套接字中,并且不会为可能的进一步重用而销毁它。否则,将使用`OutboundMessageMapper<byte[]>`将请求消息(或仅其有效负载)转换为要发布的 zeromq 帧。默认情况下,`ConvertingBytesMessageMapper`是与`ConfigurableCompositeMessageConverter`一起使用的。可以通过`setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)`回调配置套接字选项(例如,安全性或写超时)。

下面是`ZeroMqMessageHandler`配置的示例:

```
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
    ZeroMqMessageHandler messageHandler =
                  new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
    messageHandler.setTopicExpression(
                  new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
    messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}
```

### [](#zeromq-dsl)Zeromq Java DSL 支持

`spring-integration-zeromq`通过`ZeroMq`工厂和`IntegrationComponentSpec`实现,为上面提到的组件提供了一个方便的 Java DSL Fluent API。

这是`ZeroMqChannel`的 Java DSL 示例:

```
.channel(ZeroMq.zeroMqChannel(this.context)
            .connectUrl("tcp://localhost:6001:6002")
            .consumeDelay(Duration.ofMillis(100)))
}
```

Zeromq Java DSL 的入站通道适配器为:

```
IntegrationFlows.from(
            ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
                        .connectUrl("tcp://localhost:9000")
                        .topics("someTopic")
                        .receiveRaw(true)
                        .consumeDelay(Duration.ofMillis(100)))
}
```

Zeromq Java DSL 的出站通道适配器为:

```
.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
                  .topicFunction(message -> message.getHeaders().get("myTopic")))
}
```