# MQTT 支持 ## MQTT 支持 Spring 集成提供了入站和出站信道适配器,以支持消息队列遥测传输协议。 你需要在项目中包含此依赖项: Maven ``` org.springframework.integration spring-integration-mqtt 5.5.9 ``` Gradle ``` compile "org.springframework.integration:spring-integration-mqtt:5.5.9" ``` 当前的实现使用[Eclipse PAHO MQTT 客户端](https://www.eclipse.org/paho/)库。 | |XML 配置和本章的大部分内容都是关于 MQTTV3.1 协议支持和相应的 PAHO 客户机的。
有关相应的协议支持,请参见[MQTT V5 支持](#mqtt-v5)段。| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 这两个适配器的配置都是使用`DefaultMqttPahoClientFactory`实现的。有关配置选项的更多信息,请参阅泛美卫生组织的文档。 | |我们建议配置`MqttConnectOptions`对象并将其注入工厂,而不是在工厂本身上设置(不推荐的)选项。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------| ### 入站(消息驱动)通道适配器 入站通道适配器由`MqttPahoMessageDrivenChannelAdapter`实现。为了方便起见,你可以使用名称空间来配置它。最小配置可以如下所示: ``` ``` 下面的清单显示了可用的属性: ``` ``` |**1** |客户 ID。| |------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |**2** |代理 URL。| |**3** |以逗号分隔的主题列表,此适配器从中接收消息。| |**4** |用逗号分隔的 QoS 值列表。
它可以是应用于所有主题的单个值,也可以是每个主题的值(在这种情况下,列表的长度必须相同)。| |**5** |`MqttMessageConverter`(可选)。
默认情况下,默认的`DefaultPahoMessageConverter`生成带有`String`有效负载的消息,其标题如下:

*`mqtt_topic`:接收消息的主题
`mqtt_duplicate`:`true`如果消息是重复的

*`mqtt_qos`:服务质量
你可以将`DefaultPahoMessageConverter`配置为返回有效负载中的 RAW`byte[]`,方法是将其声明为``,并将`payloadAsBytes`属性设置为`true`。| |**6** |客户工厂。| |**7** |Send Timeout.
只有在信道可能阻塞时才适用(例如当前已满的有界`QueueChannel`)。| |**8** |错误通道。
下游异常被发送到此通道,如果提供的话,在`ErrorMessage`中。
有效负载是一个`MessagingException`,其中包含失败的消息和原因。| |**9** |恢复间隔。
它控制适配器在失败后尝试重新连接的间隔。
它默认为`10000ms`(十秒)。| |**10**|确认模式;为手动确认设置为 true。| | |从版本 4.1 开始,你可以省略 URL。
相反,你可以在`serverURIs`属性的`serverURIs`中提供服务器 URI。
这样做,例如,可以启用到高可用集群的连接。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 从版本 4.2.2 开始,当适配器成功订阅主题时,将发布`MqttSubscribedEvent`事件。当连接或订阅失败时,将发布`MqttConnectionFailedEvent`事件。实现`ApplicationListener`的 Bean 可以接收这些事件。 另外,一个名为`recoveryInterval`的新属性控制适配器在失败后尝试重新连接的时间间隔。它默认为`10000ms`(10 秒)。 | |在版本 4.2.3 之前,当适配器停止时,客户端总是取消订阅。
这是不正确的,因为如果客户端 QoS 大于 0,我们需要保持订阅活动,以便在适配器停止时到达
的消息在下一个开始时被传递。
这还需要将客户端工厂上的`cleanSession`属性设置为`false`。
它默认为`true`。
从 4.2.3 版本开始,如果`cleanSession`属性是`false`,则适配器不会取消订阅(默认情况下)。

可以通过在工厂上设置`consumerCloseAction`属性来重写此行为。,
它可以具有值:`UNSUBSCRIBE_ALWAYS`,`UNSUBSCRIBE_NEVER`,并且`UNSUBSCRIBE_CLEAN`.
只有当`cleanSession`属性是`true`时,后者(默认)才会取消订阅。

要恢复到 pre-4.2.3 行为,请使用`UNSUBSCRIBE_ALWAYS`。| |---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | |从版本 5.0 开始,`topic`、`qos`和`retained`属性被映射到`.RECEIVED_…​`标题(`MqttHeaders.RECEIVED_TOPIC`、`MqttHeaders.RECEIVED_QOS`和`MqttHeaders.RECEIVED_RETAINED`),以避免无意中传播到出站消息,即(默认情况下)使用`MqttHeaders.TOPIC`、`MqttHeaders.QOS`和`MqttHeaders.RETAINED`标题。| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| #### 在运行时添加和删除主题 从版本 4.1 开始,你可以通过编程方式更改订阅适配器的主题。 Spring 集成提供了`addTopic()`和`removeTopic()`方法。在添加主题时,你可以选择指定`QoS`(默认值:1)。你还可以通过向具有适当有效负载的``发送适当的消息来修改主题,例如:`"myMqttAdapter.addTopic('foo', 1)"`。 停止和启动适配器对主题列表没有影响(它不会恢复到配置中的原始设置)。更改不会保留到应用程序上下文的生命周期之后。新的应用程序上下文将恢复到已配置的设置。 当适配器停止(或与代理断开连接)时更改主题将在下一次建立连接时生效。 #### 手动 ACK 从版本 5.3 开始,你可以将`manualAcks`属性设置为 true。通常用于异步确认交付。当设置为`true`时,header(`IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK`)将添加到消息中,其值为`SimpleAcknowledgment`。你必须调用`acknowledge()`方法来完成交付。有关更多信息,请参见`IMqttClient``setManualAcks()`和`messageArrivedComplete()`的 Javadocs。为了方便起见,提供了一个头访问器: ``` StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge(); ``` 从版本`5.2.11`开始,当消息转换器抛出异常或从`MqttMessage`转换返回`null`时,`MqttPahoMessageDrivenChannelAdapter`将`ErrorMessage`发送到`errorChannel`,如果提供的话。将此转换错误重新抛出到 MQTT 客户端回调中。 #### 使用 Java 配置进行配置 下面的 Spring 引导应用程序展示了如何使用 Java 配置配置入站适配器的示例: ``` @SpringBootApplication public class MqttJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); } @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient", "topic1", "topic2"); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message message) throws MessagingException { System.out.println(message.getPayload()); } }; } } ``` #### 使用 Java DSL 进行配置 下面的 Spring 引导应用程序提供了一个使用 Java DSL 配置入站适配器的示例: ``` @SpringBootApplication public class MqttJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); } @Bean public IntegrationFlow mqttInbound() { return IntegrationFlows.from( new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient", "topic1", "topic2");) .handle(m -> System.out.println(m.getPayload())) .get(); } } ``` ### 出站通道适配器 出站通道适配器由`MqttPahoMessageHandler`实现,它包装在`ConsumerEndpoint`中。为了方便起见,你可以使用名称空间来配置它。 从版本 4.1 开始,适配器支持异步发送操作,在确认交付之前避免阻塞。如果需要,可以发出应用程序事件以使应用程序能够确认交付。 下面的清单显示了出站通道适配器可用的属性: ``` ``` |**1** |客户 ID。| |------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |**2** |代理 URL。| |**3** |`MqttMessageConverter`(可选)。
缺省的`DefaultPahoMessageConverter`可以识别以下标题:

*`mqtt_topic`:将消息发送到的主题

*`mqtt_retained`:`true`如果要保留消息
<><133"/>:服务质量<>| |**4** |客户工厂。| |**5** |默认的服务质量。
如果没有找到`mqtt_qos`标头,或者`qos-expression`返回`null`,则使用它。
如果你提供自定义`converter`,则不使用它。| |**6** |一个表达式来求值以确定 QoS。
默认值是`headers[mqtt_qos]`。| |**7** |保留标志的默认值。
如果没有`mqtt_retained`标头,则使用它。
如果提供了自定义`converter`,则不使用它。| |**8** |计算表达式以确定保留的布尔值。
默认值是`headers[mqtt_retained]`。| |**9** |消息发送到的默认主题(如果没有`mqtt_topic`头,则使用该主题)。| |**10**|一个表达式来求值以确定目标主题。
默认值是`headers['mqtt_topic']`。| |**11**|当`true`时,调用者不会阻塞。
相反,它会在发送消息时等待发送确认。
缺省值是`false`(发送阻塞直到确认发送)。| |**12**|当`async`和`async-events`都`true`时,会发出一个`MqttMessageSentEvent`(参见[Events](#mqtt-events))。
它包含消息、主题、客户库生成的`messageId`、`clientId`,和`clientInstance`(每次连接客户机时递增)。
当交付被客户库确认时,将发出一个`MqttMessageDeliveredEvent`。
它包含`messageId`、`clientId`和`clientInstance`,启用与发送相关的传递。
任何`ApplicationListener`或事件入站通道适配器都可以接收到这些事件。
注意,对于`MqttMessageDeliveredEvent`有可能在`MqttMessageSentEvent`之前接收到的
默认值是`false`。| | |从版本 4.1 开始,可以省略 URL。
相反,可以在`serverURIs`的`serverURIs`属性中提供服务器 URI。
例如,这可以实现到高可用集群的连接。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| #### 使用 Java 配置进行配置 下面的 Spring 引导应用程序展示了如何使用 Java 配置出站适配器的示例: ``` @SpringBootApplication @IntegrationComponentScan public class MqttJavaApplication { public static void main(String[] args) { ConfigurableApplicationContext context = new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); MyGateway gateway = context.getBean(MyGateway.class); gateway.sendToMqtt("foo"); } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" }); options.setUserName("username"); options.setPassword("password".toCharArray()); factory.setConnectionOptions(options); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("testClient", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic("testTopic"); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MyGateway { void sendToMqtt(String data); } } ``` #### 使用 Java DSL 进行配置 下面的 Spring 引导应用程序提供了一个使用 Java DSL 配置出站适配器的示例: ``` @SpringBootApplication public class MqttJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); } @Bean public IntegrationFlow mqttOutboundFlow() { return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient")); } } ``` ### 事件 某些应用程序事件由适配器发布。 * `MqttConnectionFailedEvent`-如果连接失败或随后丢失连接,则由两个适配器发布。 * `MqttMessageSentEvent`-如果以异步模式运行,则在消息已发送时由出站适配器发布。 * `MqttMessageDeliveredEvent`-如果以异步模式运行,则由出站适配器在客户端指示消息已被传递时发布。 * `MqttSubscribedEvent`-由入站适配器在订阅主题后发布。 这些事件可以通过`ApplicationListener`或`@EventListener`方法接收。 要确定事件的源,请使用以下方法;你可以检查 Bean 名称和/或连接选项(以访问服务器 URI 等)。 ``` MqttPahoComponent source = event.getSourceAsType(); String beanName = source.getBeanName(); MqttConnectOptions options = source.getConnectionInfo(); ``` ### MQTT v5 支持 从版本 5.5.5 开始,`spring-integration-mqtt`模块为 MQTV5 协议提供通道适配器实现。`org.eclipse.paho:org.eclipse.paho.mqttv5.client`是一个`optional`依赖项,因此必须显式地包含在目标项目中。 由于 MQTT V5 协议在 MQTT 消息中支持额外的任意属性,因此引入了`MqttHeaderMapper`实现,以在发布和接收操作时映射到/来自头。默认情况下(通过`*`模式),它映射所有接收到的`PUBLISH`帧属性(包括用户属性)。在出站方面,它映射了`PUBLISH`框架的这个子集的头:`contentType`,`mqtt_messageExpiryInterval`,`mqtt_responseTopic`,`mqtt_correlationData`。 MQTT V5 协议的出站通道适配器以`Mqttv5PahoMessageHandler`的形式存在。它需要`clientId`和 MQTT 代理 URL 或`MqttConnectionOptions`引用。它支持`MqttClientPersistence`选项,可以是`async`,并且在那种情况下可以发出`MqttIntegrationEvent`对象(参见`asyncEvents`选项)。如果请求消息的有效负载是`org.eclipse.paho.mqttv5.common.MqttMessage`,则它将通过内部`IMqttAsyncClient`按原样发布。如果有效载荷是`byte[]`,则按原样用于发布目标`MqttMessage`有效载荷。如果有效负载是`String`,则将其转换为`byte[]`以进行发布。剩余的用例被委托给所提供的`MessageConverter`,这是来自应用程序上下文的`IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME``ConfigurableCompositeMessageConverter` Bean。注意:当请求的消息有效负载已经是`MqttMessage`时,将不使用所提供的`HeaderMapper`。下面的 Java DSL 配置示例演示了如何在集成流中使用此通道适配器: ``` @Bean public IntegrationFlow mqttOutFlow() { Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout"); MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper(); mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE); messageHandler.setHeaderMapper(mqttHeaderMapper); messageHandler.setAsync(true); messageHandler.setAsyncEvents(true); messageHandler.setConverter(mqttStringToBytesConverter()); return f -> f.handle(messageHandler); } ``` | |`org.springframework.integration.mqtt.support.MqttMessageConverter`不能与`Mqttv5PahoMessageHandler`一起使用,因为它的契约仅针对 MQTV3 协议。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 有关更多信息,请参见`Mqttv5PahoMessageHandler`Javadocs 及其超类。 MQTT V5 协议的入站通道适配器以`Mqttv5PahoMessageDrivenChannelAdapter`的形式存在。它需要`clientId`和 MQTT 代理 URL 或`MqttConnectionOptions`引用,以及要订阅和使用的主题。它支持`MqttClientPersistence`选项,默认情况下是在内存中。可以配置预期的`payloadType`(默认情况下`byte[]`)并将其传播到所提供的`SmartMessageConverter`,用于从`byte[]`中转换所接收的`MqttMessage`。如果设置了`manualAck`选项,则在消息中添加一个`IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK`头,以生成`SimpleAcknowledgment`的实例。`HeaderMapper`用于将`PUBLISH`帧属性(包括用户属性)映射到目标消息头。标准的`MqttMessage`属性,如`qos`,`id`,`dup`,`retained`,加上接收到的主题总是映射到标题。有关更多信息,请参见`MqttHeaders`。 下面的 Java DSL 配置示例演示了如何在集成流中使用此通道适配器: ``` @Bean public IntegrationFlow mqttInFlow() { Mqttv5PahoMessageDrivenChannelAdapter messageProducer = new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest"); messageProducer.setPayloadType(String.class); messageProducer.setMessageConverter(mqttStringToBytesConverter()); messageProducer.setManualAcks(true); return IntegrationFlows.from(messageProducer) .channel(c -> c.queue("fromMqttChannel")) .get(); } ``` | |`org.springframework.integration.mqtt.support.MqttMessageConverter`不能与`Mqttv5PahoMessageDrivenChannelAdapter`一起使用,因为其契约仅针对 MQTT v3 协议。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 有关更多信息,请参见`Mqttv5PahoMessageDrivenChannelAdapter`Javadocs 及其超类。 | |建议将`MqttConnectionOptions#setAutomaticReconnect(boolean)`设置为 true,以让内部`IMqttAsyncClient`实例处理重新连接。否则,只有手动重新启动这些通道适配器才能处理重新连接,例如通过`MqttConnectionFailedEvent`处理断开连接。| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|