# ZeroMQ Support ## ZeroMQ Support Spring Integration provides components to support [ZeroMQ](https://zeromq.org/) communication in the application. The implementation is based on the well-supported Java API of the [JeroMQ](https://github.com/zeromq/jeromq) library. All components encapsulate ZeroMQ socket lifecycles and manage threads for them internally making interactions with these components lock-free and thread-safe. You need to include this dependency into your project: Maven ``` org.springframework.integration spring-integration-zeromq 5.5.9 ``` Gradle ``` compile "org.springframework.integration:spring-integration-zeromq:5.5.9" ``` ### ZeroMQ Proxy The `ZeroMqProxy` is a Spring-friendly wrapper for the built-in `ZMQ.proxy()` [function](https://zguide.zeromq.org/page:chapter2#toc15). It encapsulates socket lifecycles and thread management. The clients of this proxy still can use a standard ZeroMQ socket connection and interaction API. Alongside with the standard `ZContext` it requires one of the well-known ZeroMQ proxy modes: SUB/PUB, PULL/PUSH or ROUTER/DEALER. This way an appropriate pair of ZeroMQ socket types are used for the frontend and backend of the proxy. See `ZeroMqProxy.Type` for details. The `ZeroMqProxy` implements `SmartLifecycle` to create, bind and configure the sockets and to start `ZMQ.proxy()` in a dedicated thread from an `Executor` (if any). The binding for frontend and backend sockets is done over the `tcp://` protocol onto all of the available network interfaces with the provided ports. Otherwise they are bound to random ports which can be obtained later via the respective `getFrontendPort()` and `getBackendPort()` API methods. The control socket is exposed as a `SocketType.PAIR` with an inter-thread transport on the `"inproc://" + beanName + ".control"` address; it can be obtained via `getControlAddress()`. It should be used with the same application from another `SocketType.PAIR` socket to send `ZMQ.PROXY_TERMINATE`, `ZMQ.PROXY_PAUSE` and/or `ZMQ.PROXY_RESUME` commands. The `ZeroMqProxy` performs a `ZMQ.PROXY_TERMINATE` command when `stop()` is called for its lifecycle to terminate the `ZMQ.proxy()` loop and close all the bound sockets gracefully. The `setExposeCaptureSocket(boolean)` option causes this component to bind an additional inter-thread socket with `SocketType.PUB` to capture and publish all the communication between the frontend and backend sockets as it states with `ZMQ.proxy()` implementation. This socket is bound to the `"inproc://" + beanName + ".capture"` address and doesn’t expect any specific subscription for filtering. The frontend and backend sockets can be customized with additional properties, such as read/write timeout or security. This customization is available through `setFrontendSocketConfigurer(Consumer)` and `setBackendSocketConfigurer(Consumer)` callbacks, respectively. The `ZeroMqProxy` could be provided as simple bean like this: ``` @Bean ZeroMqProxy zeroMqProxy() { ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB); proxy.setExposeCaptureSocket(true); proxy.setFrontendPort(6001); proxy.setBackendPort(6002); return proxy; } ``` All the client nodes should connect to the host of this proxy via `tcp://` and use the respective port of their interest. ### ZeroMQ Message Channel The `ZeroMqChannel` is a `SubscribableChannel` which uses a pair of ZeroMQ sockets to connect publishers and subscribers for messaging interaction. It can work in a PUB/SUB mode (defaults to PUSH/PULL); it can also be used as a local inter-thread channel (uses `PAIR` sockets) - the `connectUrl` is not provided in this case. In distributed mode it has to be connected to an externally managed ZeroMQ proxy, where it can exchange messages with other similar channels connected to the same proxy. The connect url option is a standard ZeroMQ connection string with the protocol and host and a pair of ports over colon for frontend and backend sockets of the ZeroMQ proxy. For convenience, the channel could be supplied with the `ZeroMqProxy` instance instead of connection string, if it is configured in the same application as the proxy. Both sending and receiving sockets are managed in their own dedicated threads making this channel concurrency-friendly. This way we can publish and consume to/from a `ZeroMqChannel` from different threads without synchronization. By default the `ZeroMqChannel` uses an `EmbeddedJsonHeadersMessageMapper` to (de)serialize the `Message` (including headers) from/to `byte[]` using a Jackson JSON processor. This logic can be configured via `setMessageMapper(BytesMessageMapper)`. Sending and receiving sockets can be customized for any options (read/write timeout, security etc.) via respective `setSendSocketConfigurer(Consumer)` and `setSubscribeSocketConfigurer(Consumer)` callbacks. The internal logic of the `ZeroMqChannel` is based on the reactive streams via Project Reactor `Flux` and `Mono` operators. This provides easier threading control and allows lock-free concurrent publication and consumption to/from the channel. Local PUB/SUB logic is implemented as a `Flux.publish()` operator to allow all of the local subscribers to this channel to receive the same published message, as distributed subscribers to the `PUB` socket. The following is a simple example of a `ZeroMqChannel` configuration: ``` @Bean ZeroMqChannel zeroMqPubSubChannel(ZContext context) { ZeroMqChannel channel = new ZeroMqChannel(context, true); channel.setConnectUrl("tcp://localhost:6001:6002"); channel.setConsumeDelay(Duration.ofMillis(100)); return channel; } ``` ### ZeroMQ Inbound Channel Adapter The `ZeroMqMessageProducer` is a `MessageProducerSupport` implementation with reactive semantics. It constantly reads the data from a ZeroMQ socket in a non-blocking manner and publishes the messages to an infinite `Flux` which is subscribed to by a `FluxMessageChannel` or explicitly in the `start()` method, if the output channel is not reactive. When no data are received on the socket, a `consumeDelay` (defaults to 1 second) is applied before the next read attempt. Only `SocketType.PAIR`, `SocketType.PULL` and `SocketType.SUB` are supported by the `ZeroMqMessageProducer`. This component can connect to the remote socket or bind onto TCP protocol with the provided or random port. The actual port can be obtained via `getBoundPort()` after this component is started and ZeroMQ socket is bound. The socket options (e.g. security or write timeout) can be configured via `setSocketConfigurer(Consumer socketConfigurer)` callback. If the `receiveRaw` option is set to `true`, a `ZMsg`, consumed from the socket, is sent as is in the payload of the produced `Message`: it’s up to the downstream flow to parse and convert the `ZMsg`. Otherwise an `InboundMessageMapper` is used to convert the consumed data into a `Message`. If the received `ZMsg` is multi-frame, the first frame is treated as the `ZeroMqHeaders.TOPIC` header this ZeroMQ message was published to. With `SocketType.SUB`, the `ZeroMqMessageProducer` uses the provided `topics` option for subscriptions; defaults to subscribe to all. Subscriptions can be adjusted at runtime using `subscribeToTopics()` and `unsubscribeFromTopics()` `@ManagedOperation` s. Here is a sample of `ZeroMqMessageProducer` configuration: ``` @Bean ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) { ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB); messageProducer.setOutputChannel(outputChannel); messageProducer.setTopics("some"); messageProducer.setReceiveRaw(true); messageProducer.setBindPort(7070); messageProducer.setConsumeDelay(Duration.ofMillis(100)); return messageProducer; } ``` ### ZeroMQ Outbound Channel Adapter The `ZeroMqMessageHandler` is a `ReactiveMessageHandler` implementation to produce publish messages into a ZeroMQ socket. Only `SocketType.PAIR`, `SocketType.PUSH` and `SocketType.PUB` are supported. The `ZeroMqMessageHandler` only supports connecting the ZeroMQ socket; binding is not supported. When the `SocketType.PUB` is used, the `topicExpression` is evaluated against a request message to inject a topic frame into a ZeroMQ message if it is not null. The subscriber side (`SocketType.SUB`) must receive the topic frame first before parsing the actual data. When the payload of the request message is a `ZMsg`, no conversion or topic extraction is performed: the `ZMsg` is sent into a socket as is and it is not destroyed for possible further reuse. Otherwise an `OutboundMessageMapper` is used to convert a request message (or just its payload) into a ZeroMQ frame to publish. By default a `ConvertingBytesMessageMapper` is used supplied with a `ConfigurableCompositeMessageConverter`. The socket options (e.g. security or write timeout) can be configured via `setSocketConfigurer(Consumer socketConfigurer)` callback. Here is a sample of `ZeroMqMessageHandler` configuration: ``` @Bean @ServiceActivator(inputChannel = "zeroMqPublisherChannel") ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) { ZeroMqMessageHandler messageHandler = new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB); messageHandler.setTopicExpression( new FunctionExpression>((message) -> message.getHeaders().get("topic"))); messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper()); } ``` ### ZeroMQ Java DSL Support The `spring-integration-zeromq` provide a convenient Java DSL fluent API via `ZeroMq` factory and `IntegrationComponentSpec` implementations for the components mentioned above. This is a sample of Java DSL for `ZeroMqChannel`: ``` .channel(ZeroMq.zeroMqChannel(this.context) .connectUrl("tcp://localhost:6001:6002") .consumeDelay(Duration.ofMillis(100))) } ``` The Inbound Channel Adapter for ZeroMQ Java DSL is: ``` IntegrationFlows.from( ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB) .connectUrl("tcp://localhost:9000") .topics("someTopic") .receiveRaw(true) .consumeDelay(Duration.ofMillis(100))) } ``` The Outbound Channel Adapter for ZeroMQ Java DSL is: ``` .handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB) .topicFunction(message -> message.getHeaders().get("myTopic"))) } ```