# Redis 支持 ## Redis 支持 Spring Integration2.1 引入了对[Redis](https://redis.io/)的支持:“一个开源的高级键值存储”。这种支持以基于 Redis 的`MessageStore`以及发布-订阅消息适配器的形式出现,Redis 通过其[`PUBLISH`,`SUBSCRIBE`和`UNSUBSCRIBE`](https://redis.io/topics/pubsub)命令支持这些消息适配器。 你需要在项目中包含此依赖项: Maven ``` org.springframework.integration spring-integration-redis 5.5.9 ``` Gradle ``` compile "org.springframework.integration:spring-integration-redis:5.5.9" ``` 你还需要包括 Redis 客户机依赖项,例如 Lettuce。 要下载、安装和运行 Redis,请参见[Redis 文档](https://redis.io/download)。 ### 连接到 Redis 要开始与 Redis 交互,你首先需要连接到它。 Spring 集成使用另一个 Spring 项目[Spring Data Redis](https://github.com/SpringSource/spring-data-redis)提供的支持,该项目提供了典型的 Spring 构造:`ConnectionFactory`和`Template`。这些抽象简化了与多个 Redis 客户机 Java API 的集成。目前 Spring 数据 Redis 支持[Jedis](https://github.com/xetorthio/jedis)和[Lettuce](https://lettuce.io/)。 #### 使用`RedisConnectionFactory` 要连接到 Redis,可以使用`RedisConnectionFactory`接口的一种实现。下面的清单显示了接口定义: ``` public interface RedisConnectionFactory extends PersistenceExceptionTranslator { /** * Provides a suitable connection for interacting with Redis. * @return connection for interacting with Redis. */ RedisConnection getConnection(); } ``` 下面的示例展示了如何在 Java 中创建`LettuceConnectionFactory`: ``` LettuceConnectionFactory cf = new LettuceConnectionFactory(); cf.afterPropertiesSet(); ``` 下面的示例展示了如何在 Spring 的 XML 配置中创建`LettuceConnectionFactory`: ``` ``` `RedisConnectionFactory`的实现提供了一组属性,例如端口和主机,你可以在需要时对其进行设置。一旦有了`RedisConnectionFactory`的实例,就可以创建`RedisTemplate`的实例,并将其注入`RedisConnectionFactory`。 #### 使用`RedisTemplate` 与 Spring 中的其他模板类(例如`JdbcTemplate`和`JmsTemplate`)一样,`RedisTemplate`是一个帮助类,它简化了 Redis 数据访问代码。有关`RedisTemplate`及其变体(例如`StringRedisTemplate`)的更多信息,请参见[Spring Data Redis documentation](https://docs.spring.io/spring-data/data-redis/docs/current/reference/html/)。 下面的示例展示了如何在 Java 中创建`RedisTemplate`实例: ``` RedisTemplate rt = new RedisTemplate(); rt.setConnectionFactory(redisConnectionFactory); ``` 下面的示例展示了如何在 Spring 的 XML 配置中创建`RedisTemplate`实例: ``` ``` ### 使用 Redis 进行消息传递 正如[导言](#redis)中提到的,Redis 通过其`PUBLISH`、`SUBSCRIBE`和`UNSUBSCRIBE`命令提供对发布-订阅消息的支持。 Spring 与 JMS 和 AMQP 一样,集成为通过 Redis 发送和接收消息提供了消息通道和适配器。 #### Redis 发布/订阅频道 与 JMS 类似,在某些情况下,生产者和消费者都是同一个应用程序的一部分,在同一个流程中运行。你可以通过使用一对入站和出站通道适配器来实现这一点。然而,与 Spring Integration 的 JMS 支持一样,有一种更简单的方法来解决这个用例。你可以创建一个发布-订阅通道,如下例所示: ``` ``` `publish-subscribe-channel`的行为很像来自主 Spring 集成名称空间的正常``元素。任何端点的`input-channel`和`output-channel`属性都可以引用它。不同之处在于,该通道由一个 Redis 主题名称支持:由`topic-name`属性指定的`String`值。然而,与 JMS 不同的是,这个主题不必预先创建,甚至不必由 Redis 自动创建。在 Redis 中,主题是起地址作用的简单`String`值。生产者和消费者可以通过使用相同的`String`值作为其主题名称进行通信。对此通道的简单订阅意味着在产生端点和使用端点之间可以进行异步发布-订阅消息传递。然而,与通过在简单的 Spring 集成``元素中添加``元素创建的异步消息通道不同,消息不存储在内存队列中。相反,这些消息是通过 Redis 传递的,它让你依赖于它对持久性和集群的支持,以及它与其他非 Java 平台的互操作性。 #### Redis 入站通道适配器 Redis 入站通道适配器(`RedisInboundChannelAdapter`)以与其他入站适配器相同的方式将传入的 Redis 消息调整为 Spring 消息。它接收特定于平台的消息(本例中为 Redis),并使用`MessageConverter`策略将它们转换为 Spring 消息。下面的示例展示了如何配置 Redis 入站通道适配器: ``` ``` 前面的示例展示了 Redis 入站通道适配器的简单但完整的配置。请注意,前面的配置依赖于熟悉的自动发现某些 bean 的范例 Spring。在这种情况下,`redisConnectionFactory`被隐式地注入到适配器中。你可以使用`connection-factory`属性来显式地指定它。 另外,请注意,前面的配置向适配器注入了自定义`MessageConverter`。这种方法类似于 JMS,其中`MessageConverter`实例用于在 Redis 消息和 Spring 集成消息有效负载之间进行转换。默认值是`SimpleMessageConverter`。 入站适配器可以订阅多个主题名称,因此在`topics`属性中使用逗号分隔的值集。 自 3.0 版本以来,入站适配器除了现有的`topics`属性外,现在还具有`topic-patterns`属性。此属性包含一组以逗号分隔的 Redis 主题模式。有关 Redis 发布-订阅的更多信息,请参见[Redis Pub/Sub](https://redis.io/topics/pubsub)。 入站适配器可以使用`RedisSerializer`来反序列化 Redis 消息体。可以将``属性的`serializer`属性设置为一个空字符串,这将为`null`属性生成一个`null`值。在这种情况下,REDIS 消息的 RAW`byte[]`主体被提供为消息有效负载。 从版本 5.0 开始,你可以使用``的`task-executor`属性向入站适配器提供`Executor`实例。另外,接收到的 Spring 集成消息现在具有`RedisHeaders.MESSAGE_SOURCE`头,以指示已发布消息的源:topic 或 pattern。你可以将此下游用于路由逻辑。 #### Redis 出站通道适配器 Redis 出站通道适配器以与其他出站适配器相同的方式将出站 Spring 集成消息调整为 Redis 消息。它接收 Spring 集成消息,并通过使用`MessageConverter`策略将它们转换为特定于平台的消息(在本例中为 Redis)。下面的示例展示了如何配置 Redis 出站通道适配器: ``` ``` 该配置与 Redis 入站通道适配器并行。适配器隐式地注入了一个`RedisConnectionFactory`,它以`redisConnectionFactory`作为其 Bean 名称。这个示例还包括可选的(和自定义的)`MessageConverter`(`testConverter` Bean)。 Spring Integration3.0 之后,``提供了`topic`属性的替代方案:你可以使用`topic-expression`属性在运行时确定消息的 Redis 主题。这些属性是相互排斥的。 #### Redis 队列入站通道适配器 Spring Integration3.0 引入了队列入站通道适配器,以从 Redis 列表中“弹出”消息。默认情况下,它使用“右 POP”,但你可以将其配置为使用“左 POP”。适配器是消息驱动的。它使用内部侦听器线程,而不使用 Poller。 下面的清单显示了`queue-inbound-channel-adapter`的所有可用属性: ``` (13) ``` |**1** |组件 Bean name.
如果不提供`channel`属性,则在应用程序上下文中创建并注册一个`DirectChannel`,并将该`id`属性作为 Bean 名称。
在这种情况下,端点本身以 Bean 名称`id`加上`.adapter`注册。
(如果 Bean 名称是`thing1`,则端点注册为`thing1.adapter`。)| |------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |**2** |从该端点向其发送`MessageChannel`实例的`Message`实例。| |**3** |一个`SmartLifecycle`属性,用于指定此端点是否应在应用程序上下文启动后自动启动。
它默认为`true`。| |**4** |一个`SmartLifecycle`属性来指定这个端点启动的阶段。
它默认为`0`。| |**5** |对`RedisConnectionFactory` Bean.
的引用默认为`redisConnectionFactory`。| |**6** |执行基于队列的“POP”操作以获取 Redis 消息的 Redis 列表的名称。| |**7** |当从端点的侦听任务接收到异常时,向其发送`MessageChannel`实例。
默认情况下,底层`MessagePublishingErrorHandler`使用应用程序上下文中的默认`errorChannel`。| |**8** |`RedisSerializer` Bean 引用。
它可以是一个空字符串,这意味着’没有序列化器’。
在这种情况下,来自入站 Redis 消息的原始`byte[]`被发送到`channel`作为`Message`有效载荷。
默认情况下它是`JdkSerializationRedisSerializer`。| |**9** |“pop”操作等待来自队列的 Redis 消息的超时(以毫秒为单位)。
默认值为 1 秒。| |**10**|侦听器任务在“pop”操作出现异常后,在重新启动侦听器任务之前,应该睡眠的时间,以毫秒为单位。| |**11**|指定此端点是否期望来自 Redis 队列的数据包含整个`Message`实例。
如果将此属性设置为`true`,则`serializer`不能是空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。
它的默认值是`false`。| |**12**|对 Spring `TaskExecutor`(或标准 JDK1.5+`Executor`) Bean 的引用。
它用于底层监听任务。
它默认为`SimpleAsyncTaskExecutor`。| |**13**|指定此端点是否应该使用“右 POP”(当`true`时)或“左 POP”(当`false`时)从 Redis 列表中读取消息,
如果`true`,当与默认的 Redis 队列出站通道适配器一起使用时,Redis 列表充当`FIFO`队列。
将其设置为`false`以便与软件一起使用它以“右推”或实现堆栈式消息顺序写入列表。
它的默认值是`true`。
自版本 4.3 起。| | |`task-executor`必须配置多个线程进行处理;否则,当`RedisQueueMessageDrivenEndpoint`在出现错误后试图重新启动侦听器任务时,可能会出现死锁,
`errorChannel`可以用来处理这些错误,以避免重新启动,但最好不要将你的应用程序暴露在可能的死锁情况下。
有关可能的`TaskExecutor`实现,请参见 Spring Framework[参考手册](https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#scheduling-task-executor-types)。| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| #### Redis 队列出站通道适配器 Spring Integration3.0 引入了队列出站通道适配器,以从 Spring Integration 消息“推送”到 Redis 列表。默认情况下,它使用“左推”,但你可以将其配置为使用“右推”。下面的清单显示了 Redis`queue-outbound-channel-adapter`的所有可用属性: ``` (8) ``` |**1**|组件 Bean name.
如果你不提供`channel`属性,那么将在应用程序上下文中创建并注册一个`DirectChannel`,并将这个`id`属性作为 Bean 名称。
在这种情况下,端点以 Bean 名`id`加上`.adapter`注册。
(如果 Bean 名是`thing1`,则端点注册为`thing1.adapter`。)| |-----|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |**2**|该端点接收`Message`实例的`MessageChannel`实例。| |**3**|对`RedisConnectionFactory` Bean.
的引用默认为`redisConnectionFactory`。| |**4**|用于执行基于队列的“推送”操作以发送 Redis 消息的 Redis 列表的名称。
此属性与`queue-expression`互斥。| |**5**|一个 spel`Expression`来确定 Redis 列表的名称。
它在运行时使用传入的`Message`作为`#root`变量。
此属性与`queue`互斥。| |**6**|a`RedisSerializer` Bean 引用。
它默认为 a`JdkSerializationRedisSerializer`。
但是,对于`String`有效载荷,如果不提供`serializer`引用,则使用`StringRedisSerializer`。| |**7**|指定这个端点应该只向 Redis 队列发送有效负载还是整个`Message`。
它默认为`true`。| |**8**|指定此端点是否应该使用“左推”(当`true`时)或“右推”(当`false`时)将消息写入 Redis 列表,
如果`true`,当与默认的 Redis 队列入站通道适配器一起使用时,Redis 列表充当`FIFO`队列。
将其设置为`false`以便与软件一起使用从列表中读取带有“left pop”或实现堆栈式消息顺序。
它默认为`true`。
自版本 4.3 起。| #### Redis 应用程序事件 Spring Integration3.0 以来,Redis 模块提供了`IntegrationEvent`的实现方式,这反过来是`org.springframework.context.ApplicationEvent`。`RedisExceptionEvent`封装了来自 Redis 操作的异常(端点是事件的“源”)。例如,``在捕获来自`BoundListOperations.rightPop`操作的异常后会发出这些事件。例外情况可以是任何通用的`org.springframework.data.redis.RedisSystemException`或`org.springframework.data.redis.RedisConnectionFailureException`。用``处理这些事件对于确定后台 Redis 任务的问题和采取管理操作是有用的。 ### Redis 消息存储 正如*Enterprise 整合模式*一书中所描述的,[消息存储](https://www.enterpriseintegrationpatterns.com/MessageStore.html)允许你持久化消息。在处理具有缓冲消息能力的组件(聚合器、重排序程序和其他组件)时,当可靠性受到关注时,这可能是有用的。在 Spring 集成中,`MessageStore`策略还为[索赔检查](https://www.enterpriseintegrationpatterns.com/StoreInLibrary.html)模式提供了基础,这也在 EIP 中进行了描述。 Spring Integration 的 Redis 模块提供`RedisMessageStore`。下面的示例展示了如何在聚合器中使用它: ``` ``` 前面的示例是 Bean 配置,它需要一个`RedisConnectionFactory`作为构造函数参数。 默认情况下,`RedisMessageStore`使用 Java 序列化来序列化消息。但是,如果你希望使用不同的序列化技术(例如 JSON),则可以通过设置`RedisMessageStore`的`valueSerializer`属性来提供自己的序列化器。 从版本 4.3.10 开始,该框架分别为`Message`实例和`MessageHeaders`实例——`MessageJacksonDeserializer`和`MessageHeadersJacksonSerializer`实例提供了 Jackson 序列化器和反序列化器实现。它们必须配置`SimpleModule``ObjectMapper`选项。此外,你应该在`ObjectMapper`上设置`enableDefaultTyping`,以便为每个序列化的复杂对象添加类型信息(如果你信任源)。然后在反序列化过程中使用该类型信息。该框架提供了一种名为`JacksonJsonUtils.messagingAwareMapper()`的实用方法,该方法已经提供了前面提到的所有属性和序列化器。此实用程序方法带有`trustedPackages`参数,用于限制用于反序列化的 Java 包,以避免安全漏洞。默认受信任的包:`java.util`,`java.lang`,`org.springframework.messaging.support`,`org.springframework.integration.support`,`org.springframework.integration.message`,`org.springframework.integration.store`。要在`RedisMessageStore`中管理 JSON 序列化,你必须以类似于以下示例的方式对其进行配置: ``` RedisMessageStore store = new RedisMessageStore(redisConnectionFactory); ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper(); RedisSerializer serializer = new GenericJackson2JsonRedisSerializer(mapper); store.setValueSerializer(serializer); ``` 从版本 4.3.12 开始,`RedisMessageStore`支持`prefix`选项,以允许区分相同 Redis 服务器上的存储实例。 #### Redis 频道消息存储 `RedisMessageStore`[显示在前面](#redis-message-store)将每个组保持为单个键(组 ID)下的值。虽然你可以使用它来支持用于持久性的`QueueChannel`,但为此目的提供了专门的`RedisChannelMessageStore`(自版本 4.0 以来)。此存储对每个通道使用`LIST`,在发送消息时使用`LPUSH`,在接收消息时使用`RPOP`。默认情况下,此存储还使用 JDK 序列化,但你可以修改值序列化器,如[前面描述的](#redis-message-store)。 我们建议使用此存储备份通道,而不是使用一般的`RedisMessageStore`。下面的示例定义了一个 Redis 消息存储,并在带有队列的通道中使用它: ``` ``` 用于存储数据的键的形式为:`:`(在前面的示例中,`redisMessageStore:somePersistentQueueChannel`)。 此外,还提供了一个子类`RedisChannelPriorityMessageStore`。当你使用`QueueChannel`时,将以优先顺序接收消息。它使用标准的`IntegrationMessageHeaderAccessor.PRIORITY`标头并支持优先值(`0 - 9`)。具有其他优先级的消息(以及不具有优先级的消息)将按照 FIFO 顺序在具有优先级的消息之后检索。 | |这些存储仅实现`BasicMessageGroupStore`,而不实现`MessageGroupStore`。
它们只能用于诸如支持`QueueChannel`的情况。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ### Redis 元数据存储 Spring Integration3.0 引入了一种新的基于 Redis 的[`MetadataStore`](https://DOCS. Spring.io/ Spring-integration/DOCS/latest-ga/api/org/springframework/integration/metadata/metadatastore.html)(参见[元数据存储](./meta-data-store.html#metadata-store))实现。你可以使用`RedisMetadataStore`在应用程序重新启动时维护`MetadataStore`的状态。你可以将这个新的`MetadataStore`实现与适配器一起使用,例如: * [Feed](./feed.html#feed-inbound-channel-adapter) * [File](./file.html#file-reading) * [FTP](./ftp.html#ftp-inbound) * [SFTP](./sftp.html#sftp-inbound) 要指示这些适配器使用新的`RedisMetadataStore`,请声明名为`metadataStore`的 Spring Bean。提要入站通道适配器和提要入站通道适配器都会自动拾取并使用声明的`RedisMetadataStore`。下面的示例展示了如何声明这样的 Bean: ``` ``` `RedisMetadataStore`由[`RedisProperties`](https://DOCS. Spring.io/ Spring-data/data-redis/DOCS/current/api/org/springframework/data/redis/support/collections/redisproperties.html)支持。与它的交互使用[`BoundHashOperations`](https://DOCS. Spring.io/ Spring-data/data-redis/DOCS/current/api/org/springframework/data/redis/core/boundhashoperations.html),而这反过来又要求整个`key`存储使用`key`。在`MetadataStore`的情况下,这个`key`扮演一个区域的角色,这在分布式环境中很有用,当几个应用程序使用相同的 Redis 服务器时。默认情况下,这个`key`的值是`MetaData`。 从版本 4.0 开始,这个存储实现了`ConcurrentMetadataStore`,让它在多个应用程序实例之间可靠地共享,在这些应用程序实例中,只允许一个实例存储或修改键值。 | |你不能将`RedisMetadataStore.replace()`(例如,在`AbstractPersistentAcceptOnceFileListFilter`中)与 Redis 集群一起使用,因为当前不支持原子性的`WATCH`命令。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ### Redis 存储入站通道适配器 Redis 存储入站通道适配器是一个轮询使用者,它从 Redis 集合中读取数据,并将其作为`Message`有效负载发送。下面的示例展示了如何配置 Redis 存储入站通道适配器: ``` ``` 前面的示例展示了如何通过使用`store-inbound-channel-adapter`元素来配置 Redis 存储入站通道适配器,该元素为各种属性提供值,例如: * `key`或`key-expression`:所使用的集合的键的名称。 * `collection-type`:此适配器支持的集合类型的枚举。支持的集合是`LIST`、`SET`、`ZSET`、`PROPERTIES`和`MAP`。 * `connection-factory`:引用`o.s.data.redis.connection.RedisConnectionFactory`的实例。 * `redis-template`:引用`o.s.data.redis.core.RedisTemplate`的实例。 * 在所有其他入站适配器中通用的其他属性(例如“channel”)。 | |你不能同时设置`redis-template`和`connection-factory`。| |---|--------------------------------------------------------------| | |默认情况下,适配器使用`StringRedisTemplate`。
这将使用`StringRedisSerializer`实例来表示键、值、散列键和散列值。
如果你的 Redis 存储中包含使用其他技术进行序列化的对象,则必须提供一个`RedisTemplate`配置了适当的序列化器,例如,
,如果存储被写入使用一个 Redis 存储出站适配器,该适配器的`extract-payload-elements`设置为`false`,你必须提供一个`RedisTemplate`配置如下:

```









```
`RedisTemplate``RedisTemplate``RedisTemplate`使用`String`序列化器来表示键和散列键,并使用默认的 JDK 序列化器来表示值和散列值。| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 因为它有一个`key`的文本值,所以前面的示例是相对简单和静态的。有时,你可能需要根据某些条件在运行时更改键的值。要做到这一点,请使用`key-expression`代替,其中提供的表达式可以是任何有效的 SPEL 表达式。 此外,你可能希望对从 Redis 集合中读取的已成功处理的数据执行一些后处理。例如,你可能希望在其被处理后移动或删除该值。你可以通过使用 Spring Integration2.2 中添加的事务同步特性来实现这一点。下面的示例使用`key-expression`和事务同步: ``` ``` 你可以使用`transactional`元素将你的 Poller 声明为事务性的。这个元素可以引用一个真正的事务管理器(例如,如果流的其他部分调用 JDBC)。如果没有“真正的”事务,则可以使用`o.s.i.transaction.PseudoTransactionManager`,这是 Spring 的`PlatformTransactionManager`的实现,并且在没有实际事务的情况下启用 Redis 适配器的事务同步功能。 | |这并不使 Redis 活动本身具有事务性。
它允许在成功之前或之后(提交)或失败之后(回滚)进行操作的同步。| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 一旦你的 poller 是事务性的,你就可以在`transactional`元素上设置`o.s.i.transaction.TransactionSynchronizationFactory`的实例。`TransactionSynchronizationFactory`创建`TransactionSynchronization`的实例。为了你的方便,我们公开了一个默认的基于 SPEL 的`TransactionSynchronizationFactory`,它允许你配置 SPEL 表达式,其执行将与事务协调(同步)。支持用于 before-commit、after-commit 和 after-rollback 的表达式,以及发送计算结果(如果有的话)的通道(每种事件的一个)。对于每个子元素,你可以指定`expression`和`channel`属性。如果只存在`channel`属性,则将接收到的消息作为特定同步场景的一部分发送到那里。如果只存在`expression`属性,并且表达式的结果是非空值,则生成一条结果为有效负载的消息,并将其发送到默认通道(`NullChannel`),并显示在日志中(在`DEBUG`级别)。如果你希望评估结果转到特定的通道,请添加`channel`属性。如果表达式的结果是空的或无效的,则不会生成任何消息。 有关事务同步的更多信息,请参见[事务同步](./transactions.html#transaction-synchronization)。 ### 恢复存储出站通道适配器 Redisstore 出站通道适配器允许你将消息有效负载写入 Redis 集合,如下例所示: ``` ``` 前面的配置 a Redis 通过使用`store-inbound-channel-adapter`元素存储出站通道适配器。它为各种属性提供了值,例如: * `key`或`key-expression`:所使用的集合的键的名称。 * `extract-payload-elements`:如果设置为`true`(默认值)并且有效负载是“多值”对象的实例(即`Collection`或`Map`),则通过使用“addall”和“putall”语义来存储它。否则,如果设置为`false`,则无论其类型如何,有效负载都将存储为单个条目。如果有效负载不是“多值”对象的实例,则忽略此属性的值,并且有效负载始终存储为单个条目。 * `collection-type`:此适配器支持的`Collection`类型的枚举。支持的集合是`LIST`、`SET`、`ZSET`、`PROPERTIES`和`MAP`。 * `map-key-expression`:spel 表达式,返回所存储条目的键名。它仅在`collection-type`为`MAP`或`PROPERTIES`且’extract-payload-elements’为 false 时才适用。 * `connection-factory`:引用`o.s.data.redis.connection.RedisConnectionFactory`的实例。 * `redis-template`:引用`o.s.data.redis.core.RedisTemplate`的实例。 * 在所有其他入站适配器中通用的其他属性(例如“channel”)。 | |你不能同时设置`redis-template`和`connection-factory`。| |---|--------------------------------------------------------------| | |默认情况下,适配器使用`StringRedisTemplate`。
这将使用`StringRedisSerializer`实例表示键、值、散列键和散列值,
但是,如果`extract-payload-elements`设置为`false`,将使用`RedisTemplate`的`StringRedisSerializer`实例表示键和散列键,`JdkSerializationRedisSerializer`实例表示值和散列值。
有了 JDK 序列化器,理解 Java 序列化用于所有值是很重要的,不管这个值实际上是不是集合。
如果你需要更多地控制值的序列化,请考虑提供你自己的`RedisTemplate`,而不是依赖这些默认值。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 因为它具有`key`和其他属性的文字值,所以前面的示例是相对简单且静态的。有时,你可能需要在运行时根据某些条件动态更改这些值。要做到这一点,使用它们的`-expression`等价物(`key-expression`,`map-key-expression`,以此类推),其中提供的表达式可以是任何有效的 SPEL 表达式。 ### Redis 出站命令网关 Spring Integration4.0 引入了 Redis 命令网关,以允许你通过使用通用的`RedisConnection#execute`方法执行任何标准的 Redis 命令。下面的清单显示了 Redis 出站网关的可用属性: ``` (11) ``` |**1** |该端点接收`Message`实例的`MessageChannel`实例。| |------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |**2** |在`MessageChannel`中,此端点发送回复`Message`实例。| |**3** |指定此出站网关是否必须返回非空值。
它默认为`true`。
`ReplyRequiredException`当 Redis 返回`null`值时抛出。| |**4** |等待回复消息发送的超时(以毫秒为单位)。
它通常应用于基于队列的有限回复通道。| |**5** |对`RedisConnectionFactory` Bean 的引用。
它默认为`redisConnectionFactory`。
它与’redis-template’属性互斥。| |**6** |对`RedisTemplate` Bean.
的引用与“connection-factory”属性互斥。| |**7** |对`org.springframework.data.redis.serializer.RedisSerializer`实例的引用。
它用于在必要时将每个命令参数序列化为字节[]。| |**8** |返回命令键的 SPEL 表达式。
它默认为`redis_command`消息头。
它不能求值为`null`。| |**9** |用逗号分隔的 SPEL 表达式,作为命令参数计算。
与`arguments-strategy`属性互斥。
如果两个属性都不提供,则`payload`用作命令参数。
参数表达式可以计算为“null”,以支持数量可变的参数。| |**10**|一个`boolean`标志,用于指定当`argument-expressions`被配置时,在`o.s.i.redis.outbound.ExpressionArgumentsStrategy`的表达式求值上下文中,是否将已计算的 Redis 命令字符串作为`#cmd`变量提供。
否则将忽略此属性。| |**11**|引用`o.s.i.redis.outbound.ArgumentsStrategy`的实例。
它与`argument-expressions`属性是互斥的。
如果你不提供这两个属性,`payload`将用作命令参数。| 你可以使用``作为公共组件来执行任何所需的 Redis 操作。下面的示例展示了如何从 Redis 原子序数获得递增的值: ``` ``` `Message`有效载荷的名称应该是`redisCounter`,这可以由`org.springframework.data.redis.support.atomic.RedisAtomicInteger` Bean 定义提供。 `RedisConnection#execute`方法有一个通用的`Object`作为其返回类型。实际结果取决于命令类型。例如,`MGET`返回一个`List`。有关命令、其参数和结果类型的更多信息,请参见[Redis 规范](https://redis.io/commands)。 ### Redis 队列出站网关 Spring 集成引入了 Redis 队列出站网关来执行请求和应答场景。它将一个对话`UUID`推送到所提供的`queue`,将带有`UUID`作为其键的值推送到一个 Redis 列表,并等待来自一个键为`UUID' plus '.reply`的 Redis 列表的回复。每个交互使用不同的 UUID。下面的清单显示了 Redis 出站网关的可用属性: ``` (9) ``` |**1**|该端点接收`Message`实例的`MessageChannel`实例。| |-----|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |**2**|在`MessageChannel`中,此端点发送回复`Message`实例。| |**3**|指定此出站网关是否必须返回非空值。
此值默认为`false`。
否则,当 Redis 返回`ReplyRequiredException`值时,将抛出一个`ReplyRequiredException`值。| |**4**|等待回复消息发送的超时(以毫秒为单位)。
它通常应用于基于队列的有限回复通道。| |**5**|对`RedisConnectionFactory` Bean 的引用。
它默认为`redisConnectionFactory`。
它与’redis-template’属性互斥。| |**6**|出站网关向其发送对话`UUID`的 Redis 列表的名称。| |**7**|当注册了多个网关时,此出站网关的顺序。| |**8**|`RedisSerializer` Bean 引用。
它可以是一个空字符串,这意味着“没有序列化器”。
在这种情况下,来自入站 Redis 消息的 RAW`byte[]`被发送到`channel`作为`Message`有效载荷。
默认情况下,它是`JdkSerializationRedisSerializer`。| |**9**|指定此端点是否期望来自 Redis 队列的数据包含整个`Message`实例。
如果将此属性设置为`true`,则`serializer`不能是空字符串,因为消息需要某种形式的反序列化(默认情况下是 JDK 序列化)。| ### Redis 队列入站网关 Spring 集成 4.1 引入了 REDIS 队列入站网关来执行请求和应答场景。它从提供的`queue`中弹出一个对话`UUID`,从 Redis 列表中弹出以`UUID`作为其键的值,并将该回复推到 Redis 列表中,其键为`UUID`加上`.reply`。下面的清单显示了 Redis 队列入站网关的可用属性: ``` (11) ``` |**1** |这个端点发送从 Redis 数据创建的`MessageChannel`实例的`Message`实例。| |------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |**2** |从该端点等待回复的`MessageChannel`实例。
可选-`replyChannel`标头仍在使用。| |**3** |对 Spring `TaskExecutor`(或标准 JDK`Executor`) Bean 的引用。
它用于底层监听任务。
它默认为`SimpleAsyncTaskExecutor`。| |**4** |等待回复消息发送的超时(以毫秒为单位)。
它通常应用于基于队列的有限回复通道。| |**5** |对`RedisConnectionFactory` Bean 的引用。
它默认为`redisConnectionFactory`。
它与’redis-template’属性互斥。| |**6** |对话`UUID`的 Redis 列表的名称。| |**7** |当注册了多个网关时,此入站网关的顺序。| |**8** |`RedisSerializer` Bean 引用。
它可以是一个空字符串,这意味着“没有序列化器”。
在这种情况下,来自入站 Redis 消息的 RAW`byte[]`被发送到作为有效负载。
它默认为`JdkSerializationRedisSerializer`。(请注意,在版本 4.3 之前的版本中,默认情况下是`StringRedisSerializer`。
要恢复该行为,请提供对`StringRedisSerializer`的引用)。| |**9** |等待接收消息后的超时(以毫秒为单位)。
它通常应用于基于队列的有限请求通道。| |**10**|指定此端点是否期望来自 Redis 队列的数据包含整个`Message`实例。
如果将此属性设置为`true`,则`serializer`不能是空字符串,因为消息需要某种形式的反序列化(默认情况下是 JDK 序列化)。| |**11**|侦听器任务在“右 POP”操作出现异常后应该休眠的时间(以毫秒为单位),然后再重新启动侦听器任务。| | |`task-executor`必须配置多个线程进行处理;否则,当`RedisQueueMessageDrivenEndpoint`在出现错误后试图重新启动侦听器任务时,可能会出现死锁,
`errorChannel`可以用来处理这些错误,以避免重新启动,但最好不要将你的应用程序暴露在可能的死锁情况下。
有关可能的`TaskExecutor`实现,请参见 Spring Framework[参考手册](https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#scheduling-task-executor-types)。| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ### Redis 流出站通道适配器 Spring 集成 5.4 引入了反应性 Redis 流出站信道适配器,以将消息有效负载写入 Redis 流。出站通道适配器使用`ReactiveStreamOperations.add(…​)`向流添加`Record`。下面的示例展示了如何为 Redis 流出站通道适配器使用 Java 配置和服务类。 ``` @Bean @ServiceActivator(inputChannel = "messageChannel") public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler( ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) { ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler = new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1) reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2) reactiveStreamMessageHandler.setHashMapper(hashMapper); (3) reactiveStreamMessageHandler.setExtractPayload(true); (4) return reactiveStreamMessageHandler; } ``` |**1**|使用`ReactiveRedisConnectionFactory`和流名构造`ReactiveRedisStreamMessageHandler`的实例来添加记录。
另一个构造函数变体是基于 SPEL 表达式来根据请求消息计算流键。| |-----|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |**2**|设置一个`RedisSerializationContext`,用于在向流添加之前序列化一个记录键和值。| |**3**|设置`HashMapper`,它提供 Java 类型和 Redis 散列/映射之间的契约。| |**4**|如果“true”,通道适配器将从请求消息中提取有效负载,用于添加流记录。
或将整个消息用作一个值。
它默认为`true`。| ### Redis 流入站通道适配器 Spring 集成 5.4 引入了用于从 Redis 流读取消息的反应流入站通道适配器。入站通道适配器基于自动确认标志使用`StreamReceiver.receive(…​)`或`StreamReceiver.receiveAutoAck()`从 Redis 流读取记录。下面的示例展示了如何为 Redis 流入站通道适配器使用 Java 配置。 ``` @Bean public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer( ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) { ReactiveRedisStreamMessageProducer messageProducer = new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1) messageProducer.setStreamReceiverOptions( (2) StreamReceiver.StreamReceiverOptions.builder() .pollTimeout(Duration.ofMillis(100)) .build()); messageProducer.setAutoStartup(true); (3) messageProducer.setAutoAck(false); (4) messageProducer.setCreateConsumerGroup(true); (5) messageProducer.setConsumerGroup("my-group"); (6) messageProducer.setConsumerName("my-consumer"); (7) messageProducer.setOutputChannel(fromRedisStreamChannel); (8) messageProducer.setReadOffset(ReadOffset.latest()); (9) messageProducer.extractPayload(true); (10) return messageProducer; } ``` |**1** |构造`ReactiveRedisStreamMessageProducer`的实例,使用`ReactiveRedisConnectionFactory`和流键来读取记录。| |------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |**2** |a`StreamReceiver.StreamReceiverOptions`使用反应性基础设施来消耗 Redis 流。| |**3** |一个`SmartLifecycle`属性,用于指定此端点是否应在应用程序上下文启动后自动启动。
它默认为`true`。
如果`false`,`RedisStreamMessageProducer`应该手动启动`messageProducer.start()`。| |**4** |如果`false`,则接收到的消息不是自动确认的。
消息的确认将推迟到客户端消费消息。
它默认为`true`。| |**5** |如果`true`,将创建一个消费者组。
在创建消费者组时也将创建一个流(如果还不存在的话)。
消费者组跟踪消息传递并区分消费者组。
它默认为`false`。| |**6** |设置消费者组名称。
它默认为定义的 Bean 名称。| |**7** |设置消费者名称。
将消息从组`my-group`中读取为`my-consumer`。| |**8** |从该端点向其发送消息的消息通道。| |**9** |定义偏移量来读取消息。
它默认为`ReadOffset.latest()`。| |**10**|如果“true”,通道适配器将从`Record`提取有效载荷值。
否则将整个`Record`用作有效负载。
它默认为`true`。| 如果`autoAck`设置为`false`,则 Redis 流中的`Record`不会由 Redis 驱动程序自动确认,而是将`IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK`头添加到消息中,以产生一个`SimpleAcknowledgment`实例作为值。每当基于这样的记录为消息执行业务逻辑时,目标集成流负责调用其`acknowledge()`回调。即使在反序列化过程中发生异常并且`errorChannel`被配置时,也需要类似的逻辑。因此,目标错误处理程序必须决定 ACK 或 NACK 这样的失败消息。除了`IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK`,`ReactiveRedisStreamMessageProducer`还将这些头填充到消息中,以生成:`RedisHeaders.STREAM_KEY`,`RedisHeaders.STREAM_MESSAGE_ID`,`RedisHeaders.CONSUMER_GROUP`和`RedisHeaders.CONSUMER`。 从版本 5.5 开始,你可以在`ReactiveRedisStreamMessageProducer`上显式地配置`StreamReceiver.StreamReceiverOptionsBuilder`选项,包括新引入的`onErrorResume`函数,如果出现反序列化错误时,Redis 流使用者应该继续轮询,则需要该函数。默认函数向错误通道(如果提供)发送一条消息,并对上面描述的失败消息进行可能的确认。所有这些`StreamReceiver.StreamReceiverOptionsBuilder`与外部提供的`StreamReceiver.StreamReceiverOptions`是互斥的。 ### Redis 锁定注册表 Spring Integration4.0 引入了`RedisLockRegistry`。某些组件(例如,聚合器和重排序程序)使用从`LockRegistry`实例获得的锁,以确保一次只有一个线程操作组。`DefaultLockRegistry`在单个组件中执行此功能。现在,你可以在这些组件上配置一个外部锁注册中心。当你将它与共享的`MessageGroupStore`一起使用时,你可以使用`RedisLockRegistry`来跨多个应用程序实例提供此功能,使得一次只有一个实例可以操作组。 当一个本地线程释放一个锁时,另一个本地线程通常可以立即获得该锁。如果一个线程使用不同的注册中心实例释放了一个锁,那么获取该锁可能需要多达 100ms 的时间。 为了避免“挂起”锁(当服务器发生故障时),此注册中心中的锁在默认的 60 秒后过期,但是你可以在注册中心上配置该值。通常锁的时间要短得多。 | |由于密钥可能过期,试图解锁过期的锁将导致引发异常,
但是,受此锁保护的资源可能已遭到破坏,所以这样的异常应该被认为是严重的。
你应该将过期时间设置为足够大的值,以防止出现这种情况,但是将过期时间设置得足够低,以便在服务器故障后在合理的时间内恢复锁。| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 从版本 5.0 开始,`RedisLockRegistry`实现了`ExpirableLockRegistry`,它删除了上次获得的超过`age`的锁,并且这些锁目前没有被锁定。 使用 5.5.6 版本的字符串,`RedisLockRegistry`支持通过`RedisLockRegistry.setCacheCapacity()`在`RedisLockRegistry.locks`中自动清理缓存以进行重新锁定。有关更多信息,请参见其 Javadocs。