stomp.md 16.4 KB
Newer Older
dallascao's avatar
dallascao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
# Stomp 支持

## [](#stomp)stomp 支持

Spring 集成版本 4.2 引入了 STOMP(简单文本定向消息传递协议)客户端支持。它基于 Spring 框架的消息传递模块 STOMP 包中的体系结构、基础设施和 API。 Spring 集成使用 Spring Stomp 组件中的许多(例如`StompSession``StompClientSupport`)。有关更多信息,请参见 Spring 框架参考手册中的[Spring Framework STOMP Support](https://docs.spring.io/spring/docs/current/spring-framework-reference/web.html#websocket-stomp-client)章节。

你需要在项目中包含此依赖项:

Maven

```
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stomp</artifactId>
    <version>5.5.9</version>
</dependency>
```

Gradle

```
compile "org.springframework.integration:spring-integration-stomp:5.5.9"
```

对于服务器端组件,你需要添加`org.springframework:spring-websocket`和/或`io.projectreactor.netty:reactor-netty`依赖关系。

### [](#stomp-overview)概述

要配置 STOMP,你应该从 STOMP 客户机对象开始。 Spring 框架提供了以下实现方式:

* `WebSocketStompClient`:构建在 Spring  WebSocket API 之上,支持标准的 JSR-356 WebSocket、 Jetty 9,以及用于基于 HTTP 的 WebSocket SockJS 仿真的 SockJS 和 SockJS 客户端。

* `ReactorNettyTcpStompClient`:构建于`ReactorNettyTcpClient`项目的`reactor-netty`之上。

你可以提供任何其他`StompClientSupport`实现。参见这些类的[Javadoc](https://docs.spring.io/spring-integration/api/)

`StompClientSupport`类设计为*工厂*,以生成针对所提供的`StompSessionHandler``StompSession`,并通过*回调*`StompSessionHandler``StompSession`的抽象来完成所有剩余的工作。通过 Spring 集成*适配器*抽象,我们需要提供一些托管共享对象,以将我们的应用程序表示为具有唯一会话的 Stomp 客户机。为此, Spring 集成提供了`StompSessionManager`的抽象,以管理在任意提供的*单身*`StompSession`之间的`StompSessionHandler`。这允许对特定的 Stomp 代理使用*入站**出站*通道适配器(或两者)。有关更多信息,请参见`StompSessionManager`(及其实现)Javadocs。

### [](#stomp-inbound-adapter)Stomp 入站通道适配器

`StompInboundChannelAdapter`是一个一站式`MessageProducer`组件,它将你的 Spring 集成应用程序订阅到所提供的 stomp 目的地,并从它们接收消息(通过在连接的`StompSession`上使用所提供的`MessageConverter`从 stomp 帧转换)。通过在`StompInboundChannelAdapter`上使用适当的`@ManagedOperation`注释,你可以在运行时更改目的地(因此也可以更改 stomp 订阅)。

有关更多配置选项,请参见[Stomp 名称空间支持](#stomp-namespace)`StompInboundChannelAdapter`[Javadoc](https://docs.spring.io/spring-integration/api/org/springframework/integration/stomp/inbound/StompInboundChannelAdapter.html)

### [](#stomp-outbound-adapter)Stomp 出站通道适配器

`StompMessageHandler``MessageHandler``MessageHandler`,用于将传出的`Message<?>`实例通过`destination`发送到 stomp`destination`(在运行时通过 spel 表达式预先配置或确定)(由共享的`StompSession`提供)。

有关更多配置选项,请参见[Stomp 名称空间支持](#stomp-namespace)`StompMessageHandler`[Javadoc](https://docs.spring.io/spring-integration/api/org/springframework/integration/stomp/outbound/StompMessageHandler.html)

### [](#stomp-headers)Stomp 标头映射

STOMP 协议提供头作为其框架的一部分。Stomp 框架的整个结构具有以下格式:

```
....
COMMAND
header1:value1
header2:value2

Body^@
....
```

Spring Framework 提供`StompHeaders`来表示这些头。有关更多详细信息,请参见[Javadoc](https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/messaging/simp/stomp/StompHeaders.html)。Stomp 帧被转换为和转换为`Message<?>`实例,这些头被映射为和转换为`MessageHeaders`实例。 Spring 集成为 Stomp 适配器提供了一个默认的`HeaderMapper`实现。实现方式是`StompHeaderMapper`。它分别为入站适配器和出站适配器提供`fromHeaders()``toHeaders()`操作。

与许多其他 Spring 集成模块一样,引入了`IntegrationStompHeaders`类来将标准的 stomp 头映射到`MessageHeaders`,并以`stomp_`作为头名称前缀。此外,所有带有该前缀的`MessageHeaders`实例在发送到目标时都映射到`StompHeaders`

有关更多信息,请参见[Javadoc](https://docs.spring.io/spring-integration/api/)中的这些类和`mapped-headers`属性描述。

### [](#stomp-events)Stomp 集成事件

许多 STOMP 操作是异步的,包括错误处理。例如,Stomp 有一个`RECEIPT`服务器帧,当客户端帧通过添加`RECEIPT`头请求一个服务器帧时,它会返回这个服务器帧。为了提供对这些异步事件的访问, Spring 集成会发出`StompIntegrationEvent`实例,你可以通过实现`ApplicationListener`或使用`<int-event:inbound-channel-adapter>`(参见[Receiving Spring Application Events](./event.html#appevent-inbound))来获得这些实例。

具体地说,当`stompSessionListenableFuture`由于连接到 Stomp 代理失败而接收到`onFailure()`时,从`AbstractStompSessionManager`发出`StompExceptionEvent`。另一个例子是`StompMessageHandler`。它处理`ERROR`stomp 帧,这些帧是服务器对此`StompMessageHandler`发送的不正确(不接受)消息的响应。

在发送到`StompSession`的消息的异步答案中,`StompMessageHandler`发出`StompReceiptEvent`作为`StompSession.Receiptable`回调的一部分的`StompReceiptEvent``StompReceiptEvent`可以是正的,也可以是负的,这取决于`RECEIPT`帧是否在`receiptTimeLimit`期间从服务器接收,你可以在`StompClientSupport`实例上配置该帧。它默认为`15 * 1000`(以毫秒为单位,所以是 15 秒)。

|   |只有当要发送的消息的`RECEIPT`stomp 报头不是`null`时,才会添加`StompSession.Receiptable`回调。<br/>你可以分别通过其`StompSession`选项和`StompSessionManager`选项在`StompSession`报头上启用自动`RECEIPT`报头生成。|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

有关如何配置 Spring 集成以接受那些`ApplicationEvent`实例的详细信息,请参见[Stomp 适配器 Java 配置](#stomp-java-config)

### [](#stomp-java-config)Stomp 适配器 Java 配置

下面的示例展示了用于 Stomp 适配器的全面的 Java 配置:

```
@Configuration
@EnableIntegration
public class StompConfiguration {

    @Bean
    public ReactorNettyTcpStompClient stompClient() {
        ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
        stompClient.setMessageConverter(new PassThruMessageConverter());
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.afterPropertiesSet();
        stompClient.setTaskScheduler(taskScheduler);
        stompClient.setReceiptTimeLimit(5000);
        return stompClient;
    }

    @Bean
    public StompSessionManager stompSessionManager() {
        ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
        stompSessionManager.setAutoReceipt(true);
        return stompSessionManager;
    }

    @Bean
    public PollableChannel stompInputChannel() {
        return new QueueChannel();
    }

    @Bean
    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter =
        		new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
        adapter.setOutputChannel(stompInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "stompOutputChannel")
    public MessageHandler stompMessageHandler() {
        StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
        handler.setDestination("/topic/myTopic");
        return handler;
    }

    @Bean
    public PollableChannel stompEvents() {
        return new QueueChannel();
    }

    @Bean
    public ApplicationListener<ApplicationEvent> stompEventListener() {
        ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
        producer.setEventTypes(StompIntegrationEvent.class);
        producer.setOutputChannel(stompEvents());
        return producer;
    }

}
```

### [](#stomp-namespace)Stomp 名称空间支持

Spring 集成 Stomp 命名空间实现了入站和出站通道适配器组件。要将其包含在配置中,请在应用程序上下文配置文件中提供以下名称空间声明:

```
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/stomp
    https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
    ...
</beans>
```

#### [](#stomp-outbound-channel-adapter)理解`<int-stomp:outbound-channel-adapter>`元素

下面的清单显示了 STOMP 出站通道适配器的可用属性:

```
<int-stomp:outbound-channel-adapter
                           id=""                      (1)
                           channel=""                 (2)
                           stomp-session-manager=""   (3)
                           header-mapper=""           (4)
                           mapped-headers=""          (5)
                           destination=""             (6)
                           destination-expression=""  (7)
                           auto-startup=""            (8)
                           phase=""/>                 (9)
```

|**1**|组件 Bean 名.<br/>`MessageHandler``id`的 Bean 别名注册为`id`加上`.handler`<br/>如果不设置`channel`属性,在应用程序上下文中创建并注册了一个`DirectChannel`,并将这个`id`属性的值作为 Bean 名称。<br/>在这种情况下,端点以 Bean 名称`id`加上`.adapter`注册。|
|-----|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2**|如果`id`存在,则标识连接到此适配器的通道。<br/>参见`id`<br/>可选。|
|**3**|引用`StompSessionManager` Bean,它封装了低级连接和`StompSession`处理操作。<br/>需要。|
|**4**|引用一个实现`HeaderMapper<StompHeaders>`的 Bean,它将 Spring 集成`MessageHeaders`映射到和从<br/>stomp 帧头映射。<br/>它与`mapped-headers`是互斥的。<br/>它默认为`StompHeaderMapper`。|
|**5**|用逗号分隔的要映射到 Stomp 框架标头的 Stomp 标头的名称列表。<br/>只能提供如果`header-mapper`引用未设置。<br/>此列表中的值也可以是简单的模式,以与标题名称进行匹配(例如`myheader*``*myheader`)。<br/>一个特殊的标记(`STOMP_OUTBOUND_HEADERS`)表示所有标准的 stomp 标题(内容长度、收据、心跳),<br/>它们是默认包含的。<br/>如果你想添加自己的标头并希望同时映射标准标头,则必须包含该令牌或通过使用`header-mapper`提供自己的`HeaderMapper`实现。|
|**6**|STOMP 消息发送到的目的地的名称。<br/>它与`destination-expression`是互斥的。|
|**7**|在运行时针对每个 Spring 积分`Message`作为根对象求值的 SPEL 表达式。<br/>它与`destination`是互斥的。|
|**8**|布尔值,表示此端点是否应自动启动。<br/>默认为`true`。|
|**9**|这个端点应该在其中开始和停止的生命周期阶段。<br/>值越低,这个端点开始得越早,停止得越晚。<br/>默认值是`Integer.MIN_VALUE`<br/>值可以是负数。<br/>参见[`SmartLifeCycle`(https:/DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/org/api/context/smartlifycle.html)。|

#### [](#understanding-the-int-stompinbound-channel-adapter-element)理解`<int-stomp:inbound-channel-adapter>`元素

下面的清单显示了 STOMP 入站通道适配器的可用属性:

```
<int-stomp:inbound-channel-adapter
                           id=""                     (1)
                           channel=""                (2)
                           error-channel=""          (3)
                           stomp-session-manager=""  (4)
                           header-mapper=""          (5)
                           mapped-headers=""         (6)
                           destinations=""           (7)
                           send-timeout=""           (8)
                           payload-type=""           (9)
                           auto-startup=""           (10)
                           phase=""/>                (11)
```

|**1** |组件 Bean name.<br/>如果不设置`channel`属性,将在应用程序上下文中创建并注册一个`DirectChannel`,并将这个`id`属性的值作为 Bean 名称。<br/>在这种情况下,端点以 Bean 名`id`加上`.adapter`注册。|
|------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2** |标识连接到此适配器的通道。|
|**3** |应该发送`ErrorMessage`实例的`MessageChannel` Bean 引用。|
|**4** |请参阅[`<int-stomp:outbound-channel-adapter>`](#stomp-出站-通道-适配器)上的相同选项。|
|**5** |用逗号分隔的要从 stomp 帧头映射的 stomp 头的名称列表。<br/>只有在`header-mapper`引用未设置的情况下,才能提供此选项。<br/>此列表中的值也可以是与头名称匹配的简单模式(例如,`myheader*``*myheader`)。<br/>特殊令牌(`STOMP_INBOUND_HEADERS`)表示所有标准的 Stomp 标头(内容长度、收据、心跳等)。<br/>默认情况下,它们都包括在内。<br/>如果你想添加自己的标头,并希望同时映射标准标头,你还必须包括这个令牌,或者使用`header-mapper`提供你自己的`HeaderMapper`实现。|
|**6** |请参阅[`<int-stomp:outbound-channel-adapter>`](#stomp-出站-通道-适配器)上的相同选项。|
|**7** |用逗号分隔的要订阅的目标名称列表。<br/>可以在运行时通过`addDestination()``removeDestination()``@ManagedOperation`注释修改目标(因此也是订阅)列表。|
|**8** |如果信道可以阻塞,则在向信道发送消息时等待的最大时间量(以毫秒为单位)。<br/>例如,如果已达到其最大容量,则`QueueChannel`可以阻塞直到可用空间。|
|**9** |从传入的 Stomp 帧转换的目标`payload`的完全限定的 Java 类型名称。<br/>它默认为`String.class`。|
|**10**|请参阅[`<int-stomp:outbound-channel-adapter>`](#stomp-出站-通道-适配器)上的相同选项。|
|**11**|请参阅[`<int-stomp:outbound-channel-adapter>`](#stomp-出站-通道-适配器)上的相同选项。|