# Redis 支持
## Redis 支持
Spring Integration2.1 引入了对[Redis](https://redis.io/)的支持:“一个开源的高级键值存储”。这种支持以基于 Redis 的`MessageStore`以及发布-订阅消息适配器的形式出现,Redis 通过其[`PUBLISH`,`SUBSCRIBE`和`UNSUBSCRIBE`](https://redis.io/topics/pubsub)命令支持这些消息适配器。
你需要在项目中包含此依赖项:
Maven
```
org.springframework.integration
spring-integration-redis
5.5.9
```
Gradle
```
compile "org.springframework.integration:spring-integration-redis:5.5.9"
```
你还需要包括 Redis 客户机依赖项,例如 Lettuce。
要下载、安装和运行 Redis,请参见[Redis 文档](https://redis.io/download)。
### 连接到 Redis
要开始与 Redis 交互,你首先需要连接到它。 Spring 集成使用另一个 Spring 项目[Spring Data Redis](https://github.com/SpringSource/spring-data-redis)提供的支持,该项目提供了典型的 Spring 构造:`ConnectionFactory`和`Template`。这些抽象简化了与多个 Redis 客户机 Java API 的集成。目前 Spring 数据 Redis 支持[Jedis](https://github.com/xetorthio/jedis)和[Lettuce](https://lettuce.io/)。
#### 使用`RedisConnectionFactory`
要连接到 Redis,可以使用`RedisConnectionFactory`接口的一种实现。下面的清单显示了接口定义:
```
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
/**
* Provides a suitable connection for interacting with Redis.
* @return connection for interacting with Redis.
*/
RedisConnection getConnection();
}
```
下面的示例展示了如何在 Java 中创建`LettuceConnectionFactory`:
```
LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();
```
下面的示例展示了如何在 Spring 的 XML 配置中创建`LettuceConnectionFactory`:
```
```
`RedisConnectionFactory`的实现提供了一组属性,例如端口和主机,你可以在需要时对其进行设置。一旦有了`RedisConnectionFactory`的实例,就可以创建`RedisTemplate`的实例,并将其注入`RedisConnectionFactory`。
#### 使用`RedisTemplate`
与 Spring 中的其他模板类(例如`JdbcTemplate`和`JmsTemplate`)一样,`RedisTemplate`是一个帮助类,它简化了 Redis 数据访问代码。有关`RedisTemplate`及其变体(例如`StringRedisTemplate`)的更多信息,请参见[Spring Data Redis documentation](https://docs.spring.io/spring-data/data-redis/docs/current/reference/html/)。
下面的示例展示了如何在 Java 中创建`RedisTemplate`实例:
```
RedisTemplate rt = new RedisTemplate();
rt.setConnectionFactory(redisConnectionFactory);
```
下面的示例展示了如何在 Spring 的 XML 配置中创建`RedisTemplate`实例:
```
```
### 使用 Redis 进行消息传递
正如[导言](#redis)中提到的,Redis 通过其`PUBLISH`、`SUBSCRIBE`和`UNSUBSCRIBE`命令提供对发布-订阅消息的支持。 Spring 与 JMS 和 AMQP 一样,集成为通过 Redis 发送和接收消息提供了消息通道和适配器。
#### Redis 发布/订阅频道
与 JMS 类似,在某些情况下,生产者和消费者都是同一个应用程序的一部分,在同一个流程中运行。你可以通过使用一对入站和出站通道适配器来实现这一点。然而,与 Spring Integration 的 JMS 支持一样,有一种更简单的方法来解决这个用例。你可以创建一个发布-订阅通道,如下例所示:
```
```
`publish-subscribe-channel`的行为很像来自主 Spring 集成名称空间的正常``元素。任何端点的`input-channel`和`output-channel`属性都可以引用它。不同之处在于,该通道由一个 Redis 主题名称支持:由`topic-name`属性指定的`String`值。然而,与 JMS 不同的是,这个主题不必预先创建,甚至不必由 Redis 自动创建。在 Redis 中,主题是起地址作用的简单`String`值。生产者和消费者可以通过使用相同的`String`值作为其主题名称进行通信。对此通道的简单订阅意味着在产生端点和使用端点之间可以进行异步发布-订阅消息传递。然而,与通过在简单的 Spring 集成``元素中添加``元素创建的异步消息通道不同,消息不存储在内存队列中。相反,这些消息是通过 Redis 传递的,它让你依赖于它对持久性和集群的支持,以及它与其他非 Java 平台的互操作性。
#### Redis 入站通道适配器
Redis 入站通道适配器(`RedisInboundChannelAdapter`)以与其他入站适配器相同的方式将传入的 Redis 消息调整为 Spring 消息。它接收特定于平台的消息(本例中为 Redis),并使用`MessageConverter`策略将它们转换为 Spring 消息。下面的示例展示了如何配置 Redis 入站通道适配器:
```
```
前面的示例展示了 Redis 入站通道适配器的简单但完整的配置。请注意,前面的配置依赖于熟悉的自动发现某些 bean 的范例 Spring。在这种情况下,`redisConnectionFactory`被隐式地注入到适配器中。你可以使用`connection-factory`属性来显式地指定它。
另外,请注意,前面的配置向适配器注入了自定义`MessageConverter`。这种方法类似于 JMS,其中`MessageConverter`实例用于在 Redis 消息和 Spring 集成消息有效负载之间进行转换。默认值是`SimpleMessageConverter`。
入站适配器可以订阅多个主题名称,因此在`topics`属性中使用逗号分隔的值集。
自 3.0 版本以来,入站适配器除了现有的`topics`属性外,现在还具有`topic-patterns`属性。此属性包含一组以逗号分隔的 Redis 主题模式。有关 Redis 发布-订阅的更多信息,请参见[Redis Pub/Sub](https://redis.io/topics/pubsub)。
入站适配器可以使用`RedisSerializer`来反序列化 Redis 消息体。可以将``属性的`serializer`属性设置为一个空字符串,这将为`null`属性生成一个`null`值。在这种情况下,REDIS 消息的 RAW`byte[]`主体被提供为消息有效负载。
从版本 5.0 开始,你可以使用``的`task-executor`属性向入站适配器提供`Executor`实例。另外,接收到的 Spring 集成消息现在具有`RedisHeaders.MESSAGE_SOURCE`头,以指示已发布消息的源:topic 或 pattern。你可以将此下游用于路由逻辑。
#### Redis 出站通道适配器
Redis 出站通道适配器以与其他出站适配器相同的方式将出站 Spring 集成消息调整为 Redis 消息。它接收 Spring 集成消息,并通过使用`MessageConverter`策略将它们转换为特定于平台的消息(在本例中为 Redis)。下面的示例展示了如何配置 Redis 出站通道适配器:
```
```
该配置与 Redis 入站通道适配器并行。适配器隐式地注入了一个`RedisConnectionFactory`,它以`redisConnectionFactory`作为其 Bean 名称。这个示例还包括可选的(和自定义的)`MessageConverter`(`testConverter` Bean)。
Spring Integration3.0 之后,``提供了`topic`属性的替代方案:你可以使用`topic-expression`属性在运行时确定消息的 Redis 主题。这些属性是相互排斥的。
#### Redis 队列入站通道适配器
Spring Integration3.0 引入了队列入站通道适配器,以从 Redis 列表中“弹出”消息。默认情况下,它使用“右 POP”,但你可以将其配置为使用“左 POP”。适配器是消息驱动的。它使用内部侦听器线程,而不使用 Poller。
下面的清单显示了`queue-inbound-channel-adapter`的所有可用属性:
```
(13)
```
|**1** |组件 Bean name.
如果不提供`channel`属性,则在应用程序上下文中创建并注册一个`DirectChannel`,并将该`id`属性作为 Bean 名称。
在这种情况下,端点本身以 Bean 名称`id`加上`.adapter`注册。
(如果 Bean 名称是`thing1`,则端点注册为`thing1.adapter`。)|
|------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2** |从该端点向其发送`MessageChannel`实例的`Message`实例。|
|**3** |一个`SmartLifecycle`属性,用于指定此端点是否应在应用程序上下文启动后自动启动。
它默认为`true`。|
|**4** |一个`SmartLifecycle`属性来指定这个端点启动的阶段。
它默认为`0`。|
|**5** |对`RedisConnectionFactory` Bean.
的引用默认为`redisConnectionFactory`。|
|**6** |执行基于队列的“POP”操作以获取 Redis 消息的 Redis 列表的名称。|
|**7** |当从端点的侦听任务接收到异常时,向其发送`MessageChannel`实例。
默认情况下,底层`MessagePublishingErrorHandler`使用应用程序上下文中的默认`errorChannel`。|
|**8** |`RedisSerializer` Bean 引用。
它可以是一个空字符串,这意味着’没有序列化器’。
在这种情况下,来自入站 Redis 消息的原始`byte[]`被发送到`channel`作为`Message`有效载荷。
默认情况下它是`JdkSerializationRedisSerializer`。|
|**9** |“pop”操作等待来自队列的 Redis 消息的超时(以毫秒为单位)。
默认值为 1 秒。|
|**10**|侦听器任务在“pop”操作出现异常后,在重新启动侦听器任务之前,应该睡眠的时间,以毫秒为单位。|
|**11**|指定此端点是否期望来自 Redis 队列的数据包含整个`Message`实例。
如果将此属性设置为`true`,则`serializer`不能是空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。
它的默认值是`false`。|
|**12**|对 Spring `TaskExecutor`(或标准 JDK1.5+`Executor`) Bean 的引用。
它用于底层监听任务。
它默认为`SimpleAsyncTaskExecutor`。|
|**13**|指定此端点是否应该使用“右 POP”(当`true`时)或“左 POP”(当`false`时)从 Redis 列表中读取消息,
如果`true`,当与默认的 Redis 队列出站通道适配器一起使用时,Redis 列表充当`FIFO`队列。
将其设置为`false`以便与软件一起使用它以“右推”或实现堆栈式消息顺序写入列表。
它的默认值是`true`。
自版本 4.3 起。|
| |`task-executor`必须配置多个线程进行处理;否则,当`RedisQueueMessageDrivenEndpoint`在出现错误后试图重新启动侦听器任务时,可能会出现死锁,
`errorChannel`可以用来处理这些错误,以避免重新启动,但最好不要将你的应用程序暴露在可能的死锁情况下。
有关可能的`TaskExecutor`实现,请参见 Spring Framework[参考手册](https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#scheduling-task-executor-types)。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
#### Redis 队列出站通道适配器
Spring Integration3.0 引入了队列出站通道适配器,以从 Spring Integration 消息“推送”到 Redis 列表。默认情况下,它使用“左推”,但你可以将其配置为使用“右推”。下面的清单显示了 Redis`queue-outbound-channel-adapter`的所有可用属性:
```
(8)
```
|**1**|组件 Bean name.
如果你不提供`channel`属性,那么将在应用程序上下文中创建并注册一个`DirectChannel`,并将这个`id`属性作为 Bean 名称。
在这种情况下,端点以 Bean 名`id`加上`.adapter`注册。
(如果 Bean 名是`thing1`,则端点注册为`thing1.adapter`。)|
|-----|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2**|该端点接收`Message`实例的`MessageChannel`实例。|
|**3**|对`RedisConnectionFactory` Bean.
的引用默认为`redisConnectionFactory`。|
|**4**|用于执行基于队列的“推送”操作以发送 Redis 消息的 Redis 列表的名称。
此属性与`queue-expression`互斥。|
|**5**|一个 spel`Expression`来确定 Redis 列表的名称。
它在运行时使用传入的`Message`作为`#root`变量。
此属性与`queue`互斥。|
|**6**|a`RedisSerializer` Bean 引用。
它默认为 a`JdkSerializationRedisSerializer`。
但是,对于`String`有效载荷,如果不提供`serializer`引用,则使用`StringRedisSerializer`。|
|**7**|指定这个端点应该只向 Redis 队列发送有效负载还是整个`Message`。
它默认为`true`。|
|**8**|指定此端点是否应该使用“左推”(当`true`时)或“右推”(当`false`时)将消息写入 Redis 列表,
如果`true`,当与默认的 Redis 队列入站通道适配器一起使用时,Redis 列表充当`FIFO`队列。
将其设置为`false`以便与软件一起使用从列表中读取带有“left pop”或实现堆栈式消息顺序。
它默认为`true`。
自版本 4.3 起。|
#### Redis 应用程序事件
Spring Integration3.0 以来,Redis 模块提供了`IntegrationEvent`的实现方式,这反过来是`org.springframework.context.ApplicationEvent`。`RedisExceptionEvent`封装了来自 Redis 操作的异常(端点是事件的“源”)。例如,``在捕获来自`BoundListOperations.rightPop`操作的异常后会发出这些事件。例外情况可以是任何通用的`org.springframework.data.redis.RedisSystemException`或`org.springframework.data.redis.RedisConnectionFailureException`。用``处理这些事件对于确定后台 Redis 任务的问题和采取管理操作是有用的。
### Redis 消息存储
正如*Enterprise 整合模式*一书中所描述的,[消息存储](https://www.enterpriseintegrationpatterns.com/MessageStore.html)允许你持久化消息。在处理具有缓冲消息能力的组件(聚合器、重排序程序和其他组件)时,当可靠性受到关注时,这可能是有用的。在 Spring 集成中,`MessageStore`策略还为[索赔检查](https://www.enterpriseintegrationpatterns.com/StoreInLibrary.html)模式提供了基础,这也在 EIP 中进行了描述。
Spring Integration 的 Redis 模块提供`RedisMessageStore`。下面的示例展示了如何在聚合器中使用它:
```
```
前面的示例是 Bean 配置,它需要一个`RedisConnectionFactory`作为构造函数参数。
默认情况下,`RedisMessageStore`使用 Java 序列化来序列化消息。但是,如果你希望使用不同的序列化技术(例如 JSON),则可以通过设置`RedisMessageStore`的`valueSerializer`属性来提供自己的序列化器。
从版本 4.3.10 开始,该框架分别为`Message`实例和`MessageHeaders`实例——`MessageJacksonDeserializer`和`MessageHeadersJacksonSerializer`实例提供了 Jackson 序列化器和反序列化器实现。它们必须配置`SimpleModule``ObjectMapper`选项。此外,你应该在`ObjectMapper`上设置`enableDefaultTyping`,以便为每个序列化的复杂对象添加类型信息(如果你信任源)。然后在反序列化过程中使用该类型信息。该框架提供了一种名为`JacksonJsonUtils.messagingAwareMapper()`的实用方法,该方法已经提供了前面提到的所有属性和序列化器。此实用程序方法带有`trustedPackages`参数,用于限制用于反序列化的 Java 包,以避免安全漏洞。默认受信任的包:`java.util`,`java.lang`,`org.springframework.messaging.support`,`org.springframework.integration.support`,`org.springframework.integration.message`,`org.springframework.integration.store`。要在`RedisMessageStore`中管理 JSON 序列化,你必须以类似于以下示例的方式对其进行配置:
```
RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer