mqtt.md 23.2 KB
Newer Older
dallascao's avatar
dallascao 已提交
1 2
# MQTT 支持

3
## MQTT 支持
dallascao's avatar
dallascao 已提交
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34

Spring 集成提供了入站和出站信道适配器,以支持消息队列遥测传输协议。

你需要在项目中包含此依赖项:

Maven

```
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.9</version>
</dependency>
```

Gradle

```
compile "org.springframework.integration:spring-integration-mqtt:5.5.9"
```

当前的实现使用[Eclipse PAHO MQTT 客户端](https://www.eclipse.org/paho/)库。

|   |XML 配置和本章的大部分内容都是关于 MQTTV3.1 协议支持和相应的 PAHO 客户机的。<br/>有关相应的协议支持,请参见[MQTT V5 支持](#mqtt-v5)段。|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

这两个适配器的配置都是使用`DefaultMqttPahoClientFactory`实现的。有关配置选项的更多信息,请参阅泛美卫生组织的文档。

|   |我们建议配置`MqttConnectOptions`对象并将其注入工厂,而不是在工厂本身上设置(不推荐的)选项。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------|

35
### 入站(消息驱动)通道适配器
dallascao's avatar
dallascao 已提交
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99

入站通道适配器由`MqttPahoMessageDrivenChannelAdapter`实现。为了方便起见,你可以使用名称空间来配置它。最小配置可以如下所示:

```
<bean id="clientFactory"
        class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="connectionOptions">
        <bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
            <property name="userName" value="${mqtt.username}"/>
            <property name="password" value="${mqtt.password}"/>
        </bean>
    </property>
</bean>

<int-mqtt:message-driven-channel-adapter id="mqttInbound"
    client-id="${mqtt.default.client.id}.src"
    url="${mqtt.url}"
    topics="sometopic"
    client-factory="clientFactory"
    channel="output"/>
```

下面的清单显示了可用的属性:

```
<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
    client-id="foo"  (1)
    url="tcp://localhost:1883"  (2)
    topics="bar,baz"  (3)
    qos="1,2"  (4)
    converter="myConverter"  (5)
    client-factory="clientFactory"  (6)
    send-timeout="123"  (7)
    error-channel="errors"  (8)
    recovery-interval="10000"  (9)
    manual-acks="false" (10)
    channel="out" />
```

|**1** |客户 ID。|
|------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2** |代理 URL。|
|**3** |以逗号分隔的主题列表,此适配器从中接收消息。|
|**4** |用逗号分隔的 QoS 值列表。<br/>它可以是应用于所有主题的单个值,也可以是每个主题的值(在这种情况下,列表的长度必须相同)。|
|**5** |`MqttMessageConverter`(可选)。<br/>默认情况下,默认的`DefaultPahoMessageConverter`生成带有`String`有效负载的消息,其标题如下:<br/><br/>*`mqtt_topic`:接收消息的主题<br/>`mqtt_duplicate`:`true`如果消息是重复的<br/><br/>*`mqtt_qos`:服务质量<br/>你可以将`DefaultPahoMessageConverter`配置为返回有效负载中的 RAW`byte[]`,方法是将其声明为`<bean/>`,并将`payloadAsBytes`属性设置为`true`。|
|**6** |客户工厂。|
|**7** |Send Timeout.<br/>只有在信道可能阻塞时才适用(例如当前已满的有界`QueueChannel`)。|
|**8** |错误通道。<br/>下游异常被发送到此通道,如果提供的话,在`ErrorMessage`中。<br/>有效负载是一个`MessagingException`,其中包含失败的消息和原因。|
|**9** |恢复间隔。<br/>它控制适配器在失败后尝试重新连接的间隔。<br/>它默认为`10000ms`(十秒)。|
|**10**|确认模式;为手动确认设置为 true。|

|   |从版本 4.1 开始,你可以省略 URL。<br/>相反,你可以在`serverURIs`属性的`serverURIs`中提供服务器 URI。<br/>这样做,例如,可以启用到高可用集群的连接。|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

从版本 4.2.2 开始,当适配器成功订阅主题时,将发布`MqttSubscribedEvent`事件。当连接或订阅失败时,将发布`MqttConnectionFailedEvent`事件。实现`ApplicationListener`的 Bean 可以接收这些事件。

另外,一个名为`recoveryInterval`的新属性控制适配器在失败后尝试重新连接的时间间隔。它默认为`10000ms`(10 秒)。

|   |在版本 4.2.3 之前,当适配器停止时,客户端总是取消订阅。<br/>这是不正确的,因为如果客户端 QoS 大于 0,我们需要保持订阅活动,以便在适配器停止时到达<br/>的消息在下一个开始时被传递。<br/>这还需要将客户端工厂上的`cleanSession`属性设置为`false`<br/>它默认为`true`<br/>从 4.2.3 版本开始,如果`cleanSession`属性是`false`,则适配器不会取消订阅(默认情况下)。<br/><br/>可以通过在工厂上设置`consumerCloseAction`属性来重写此行为。,<br/>它可以具有值:`UNSUBSCRIBE_ALWAYS``UNSUBSCRIBE_NEVER`,并且`UNSUBSCRIBE_CLEAN`.<br/>只有当`cleanSession`属性是`true`时,后者(默认)才会取消订阅。<br/><br/>要恢复到 pre-4.2.3 行为,请使用`UNSUBSCRIBE_ALWAYS`。|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

|   |从版本 5.0 开始,`topic``qos``retained`属性被映射到`.RECEIVED_…​`标题(`MqttHeaders.RECEIVED_TOPIC``MqttHeaders.RECEIVED_QOS``MqttHeaders.RECEIVED_RETAINED`),以避免无意中传播到出站消息,即(默认情况下)使用`MqttHeaders.TOPIC``MqttHeaders.QOS``MqttHeaders.RETAINED`标题。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

100
#### 在运行时添加和删除主题
dallascao's avatar
dallascao 已提交
101 102 103 104 105 106 107

从版本 4.1 开始,你可以通过编程方式更改订阅适配器的主题。 Spring 集成提供了`addTopic()``removeTopic()`方法。在添加主题时,你可以选择指定`QoS`(默认值:1)。你还可以通过向具有适当有效负载的`<control-bus/>`发送适当的消息来修改主题,例如:`"myMqttAdapter.addTopic('foo', 1)"`

停止和启动适配器对主题列表没有影响(它不会恢复到配置中的原始设置)。更改不会保留到应用程序上下文的生命周期之后。新的应用程序上下文将恢复到已配置的设置。

当适配器停止(或与代理断开连接)时更改主题将在下一次建立连接时生效。

108
#### 手动 ACK
dallascao's avatar
dallascao 已提交
109 110 111 112 113 114 115 116 117

从版本 5.3 开始,你可以将`manualAcks`属性设置为 true。通常用于异步确认交付。当设置为`true`时,header(`IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK`)将添加到消息中,其值为`SimpleAcknowledgment`。你必须调用`acknowledge()`方法来完成交付。有关更多信息,请参见`IMqttClient``setManualAcks()``messageArrivedComplete()`的 Javadocs。为了方便起见,提供了一个头访问器:

```
StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();
```

从版本`5.2.11`开始,当消息转换器抛出异常或从`MqttMessage`转换返回`null`时,`MqttPahoMessageDrivenChannelAdapter``ErrorMessage`发送到`errorChannel`,如果提供的话。将此转换错误重新抛出到 MQTT 客户端回调中。

118
#### 使用 Java 配置进行配置
dallascao's avatar
dallascao 已提交
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164

下面的 Spring 引导应用程序展示了如何使用 Java 配置配置入站适配器的示例:

```
@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
                .web(false)
                .run(args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
                                                 "topic1", "topic2");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}
```

165
#### 使用 Java DSL 进行配置
dallascao's avatar
dallascao 已提交
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190

下面的 Spring 引导应用程序提供了一个使用 Java DSL 配置入站适配器的示例:

```
@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow mqttInbound() {
        return IntegrationFlows.from(
                         new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
                                        "testClient", "topic1", "topic2");)
                .handle(m -> System.out.println(m.getPayload()))
                .get();
    }

}
```

191
### 出站通道适配器
dallascao's avatar
dallascao 已提交
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232

出站通道适配器由`MqttPahoMessageHandler`实现,它包装在`ConsumerEndpoint`中。为了方便起见,你可以使用名称空间来配置它。

从版本 4.1 开始,适配器支持异步发送操作,在确认交付之前避免阻塞。如果需要,可以发出应用程序事件以使应用程序能够确认交付。

下面的清单显示了出站通道适配器可用的属性:

```
<int-mqtt:outbound-channel-adapter id="withConverter"
    client-id="foo"  (1)
    url="tcp://localhost:1883"  (2)
    converter="myConverter"  (3)
    client-factory="clientFactory"  (4)
    default-qos="1"  (5)
    qos-expression="" (6)
    default-retained="true"  (7)
    retained-expression="" (8)
    default-topic="bar"  (9)
    topic-expression="" (10)
    async="false"  (11)
    async-events="false"  (12)
    channel="target" />
```

|**1** |客户 ID。|
|------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2** |代理 URL。|
|**3** |`MqttMessageConverter`(可选)。<br/>缺省的`DefaultPahoMessageConverter`可以识别以下标题:<br/><br/>*`mqtt_topic`:将消息发送到的主题<br/><br/>*`mqtt_retained`:`true`如果要保留消息<br/><><133"/>:服务质量<>|
|**4** |客户工厂。|
|**5** |默认的服务质量。<br/>如果没有找到`mqtt_qos`标头,或者`qos-expression`返回`null`,则使用它。<br/>如果你提供自定义`converter`,则不使用它。|
|**6** |一个表达式来求值以确定 QoS。<br/>默认值是`headers[mqtt_qos]`。|
|**7** |保留标志的默认值。<br/>如果没有`mqtt_retained`标头,则使用它。<br/>如果提供了自定义`converter`,则不使用它。|
|**8** |计算表达式以确定保留的布尔值。<br/>默认值是`headers[mqtt_retained]`。|
|**9** |消息发送到的默认主题(如果没有`mqtt_topic`头,则使用该主题)。|
|**10**|一个表达式来求值以确定目标主题。<br/>默认值是`headers['mqtt_topic']`。|
|**11**|当`true`时,调用者不会阻塞。<br/>相反,它会在发送消息时等待发送确认。<br/>缺省值是`false`(发送阻塞直到确认发送)。|
|**12**|当`async``async-events``true`时,会发出一个`MqttMessageSentEvent`(参见[Events](#mqtt-events))。<br/>它包含消息、主题、客户库生成的`messageId``clientId`,和`clientInstance`(每次连接客户机时递增)。<br/>当交付被客户库确认时,将发出一个`MqttMessageDeliveredEvent`<br/>它包含`messageId``clientId``clientInstance`,启用与发送相关的传递。<br/>任何`ApplicationListener`或事件入站通道适配器都可以接收到这些事件。<br/>注意,对于`MqttMessageDeliveredEvent`有可能在`MqttMessageSentEvent`之前接收到的<br/>默认值是`false`。|

|   |从版本 4.1 开始,可以省略 URL。<br/>相反,可以在`serverURIs``serverURIs`属性中提供服务器 URI。<br/>例如,这可以实现到高可用集群的连接。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

233
#### 使用 Java 配置进行配置
dallascao's avatar
dallascao 已提交
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286

下面的 Spring 引导应用程序展示了如何使用 Java 配置出站适配器的示例:

```
@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo");
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
        options.setUserName("username");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data);

    }

}
```

287
#### 使用 Java DSL 进行配置
dallascao's avatar
dallascao 已提交
288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308

下面的 Spring 引导应用程序提供了一个使用 Java DSL 配置出站适配器的示例:

```
@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

   	@Bean
   	public IntegrationFlow mqttOutboundFlow() {
   	    return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
    }

}
```

309
### 事件
dallascao's avatar
dallascao 已提交
310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330

某些应用程序事件由适配器发布。

* `MqttConnectionFailedEvent`-如果连接失败或随后丢失连接,则由两个适配器发布。

* `MqttMessageSentEvent`-如果以异步模式运行,则在消息已发送时由出站适配器发布。

* `MqttMessageDeliveredEvent`-如果以异步模式运行,则由出站适配器在客户端指示消息已被传递时发布。

* `MqttSubscribedEvent`-由入站适配器在订阅主题后发布。

这些事件可以通过`ApplicationListener<MqttIntegrationEvent>``@EventListener`方法接收。

要确定事件的源,请使用以下方法;你可以检查 Bean 名称和/或连接选项(以访问服务器 URI 等)。

```
MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();
```

331
### MQTT v5 支持
dallascao's avatar
dallascao 已提交
332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384

从版本 5.5.5 开始,`spring-integration-mqtt`模块为 MQTV5 协议提供通道适配器实现。`org.eclipse.paho:org.eclipse.paho.mqttv5.client`是一个`optional`依赖项,因此必须显式地包含在目标项目中。

由于 MQTT V5 协议在 MQTT 消息中支持额外的任意属性,因此引入了`MqttHeaderMapper`实现,以在发布和接收操作时映射到/来自头。默认情况下(通过`*`模式),它映射所有接收到的`PUBLISH`帧属性(包括用户属性)。在出站方面,它映射了`PUBLISH`框架的这个子集的头:`contentType``mqtt_messageExpiryInterval``mqtt_responseTopic``mqtt_correlationData`

MQTT V5 协议的出站通道适配器以`Mqttv5PahoMessageHandler`的形式存在。它需要`clientId`和 MQTT 代理 URL 或`MqttConnectionOptions`引用。它支持`MqttClientPersistence`选项,可以是`async`,并且在那种情况下可以发出`MqttIntegrationEvent`对象(参见`asyncEvents`选项)。如果请求消息的有效负载是`org.eclipse.paho.mqttv5.common.MqttMessage`,则它将通过内部`IMqttAsyncClient`按原样发布。如果有效载荷是`byte[]`,则按原样用于发布目标`MqttMessage`有效载荷。如果有效负载是`String`,则将其转换为`byte[]`以进行发布。剩余的用例被委托给所提供的`MessageConverter`,这是来自应用程序上下文的`IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME``ConfigurableCompositeMessageConverter` Bean。注意:当请求的消息有效负载已经是`MqttMessage`时,将不使用所提供的`HeaderMapper<MqttProperties>`。下面的 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:

```
@Bean
public IntegrationFlow mqttOutFlow() {
    Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
    MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
    mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
    messageHandler.setHeaderMapper(mqttHeaderMapper);
    messageHandler.setAsync(true);
    messageHandler.setAsyncEvents(true);
    messageHandler.setConverter(mqttStringToBytesConverter());

    return f -> f.handle(messageHandler);
}
```

|   |`org.springframework.integration.mqtt.support.MqttMessageConverter`不能与`Mqttv5PahoMessageHandler`一起使用,因为它的契约仅针对 MQTV3 协议。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

有关更多信息,请参见`Mqttv5PahoMessageHandler`Javadocs 及其超类。

MQTT V5 协议的入站通道适配器以`Mqttv5PahoMessageDrivenChannelAdapter`的形式存在。它需要`clientId`和 MQTT 代理 URL 或`MqttConnectionOptions`引用,以及要订阅和使用的主题。它支持`MqttClientPersistence`选项,默认情况下是在内存中。可以配置预期的`payloadType`(默认情况下`byte[]`)并将其传播到所提供的`SmartMessageConverter`,用于从`byte[]`中转换所接收的`MqttMessage`。如果设置了`manualAck`选项,则在消息中添加一个`IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK`头,以生成`SimpleAcknowledgment`的实例。`HeaderMapper<MqttProperties>`用于将`PUBLISH`帧属性(包括用户属性)映射到目标消息头。标准的`MqttMessage`属性,如`qos``id``dup``retained`,加上接收到的主题总是映射到标题。有关更多信息,请参见`MqttHeaders`

下面的 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:

```
@Bean
public IntegrationFlow mqttInFlow() {
    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
    messageProducer.setPayloadType(String.class);
    messageProducer.setMessageConverter(mqttStringToBytesConverter());
    messageProducer.setManualAcks(true);

    return IntegrationFlows.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}
```

|   |`org.springframework.integration.mqtt.support.MqttMessageConverter`不能与`Mqttv5PahoMessageDrivenChannelAdapter`一起使用,因为其契约仅针对 MQTT v3 协议。|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

有关更多信息,请参见`Mqttv5PahoMessageDrivenChannelAdapter`Javadocs 及其超类。

|   |建议将`MqttConnectionOptions#setAutomaticReconnect(boolean)`设置为 true,以让内部`IMqttAsyncClient`实例处理重新连接。否则,只有手动重新启动这些通道适配器才能处理重新连接,例如通过`MqttConnectionFailedEvent`处理断开连接。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|